Merge "[rabbit] use retry parameters during notification sending"
This commit is contained in:
commit
5d165cc713
@ -601,9 +601,10 @@ class AMQPDriverBase(base.BaseDriver):
|
|||||||
def _get_exchange(self, target):
|
def _get_exchange(self, target):
|
||||||
return target.exchange or self._default_exchange
|
return target.exchange or self._default_exchange
|
||||||
|
|
||||||
def _get_connection(self, purpose=rpc_common.PURPOSE_SEND):
|
def _get_connection(self, purpose=rpc_common.PURPOSE_SEND, retry=None):
|
||||||
return rpc_common.ConnectionContext(self._connection_pool,
|
return rpc_common.ConnectionContext(self._connection_pool,
|
||||||
purpose=purpose)
|
purpose=purpose,
|
||||||
|
retry=retry)
|
||||||
|
|
||||||
def _get_reply_q(self):
|
def _get_reply_q(self):
|
||||||
with self._reply_q_lock:
|
with self._reply_q_lock:
|
||||||
@ -649,7 +650,7 @@ class AMQPDriverBase(base.BaseDriver):
|
|||||||
log_msg = "CAST unique_id: %s " % unique_id
|
log_msg = "CAST unique_id: %s " % unique_id
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with self._get_connection(rpc_common.PURPOSE_SEND) as conn:
|
with self._get_connection(rpc_common.PURPOSE_SEND, retry) as conn:
|
||||||
if notify:
|
if notify:
|
||||||
exchange = self._get_exchange(target)
|
exchange = self._get_exchange(target)
|
||||||
LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'"
|
LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'"
|
||||||
|
@ -392,7 +392,7 @@ class ConnectionContext(Connection):
|
|||||||
If possible the function makes sure to return a connection to the pool.
|
If possible the function makes sure to return a connection to the pool.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, connection_pool, purpose):
|
def __init__(self, connection_pool, purpose, retry):
|
||||||
"""Create a new connection, or get one from the pool."""
|
"""Create a new connection, or get one from the pool."""
|
||||||
self.connection = None
|
self.connection = None
|
||||||
self.connection_pool = connection_pool
|
self.connection_pool = connection_pool
|
||||||
@ -420,7 +420,7 @@ class ConnectionContext(Connection):
|
|||||||
|
|
||||||
pooled = purpose == PURPOSE_SEND
|
pooled = purpose == PURPOSE_SEND
|
||||||
if pooled:
|
if pooled:
|
||||||
self.connection = connection_pool.get()
|
self.connection = connection_pool.get(retry=retry)
|
||||||
else:
|
else:
|
||||||
self.connection = connection_pool.create(purpose)
|
self.connection = connection_pool.create(purpose)
|
||||||
self.pooled = pooled
|
self.pooled = pooled
|
||||||
|
@ -452,13 +452,14 @@ class ConnectionLock(DummyConnectionLock):
|
|||||||
class Connection(object):
|
class Connection(object):
|
||||||
"""Connection object."""
|
"""Connection object."""
|
||||||
|
|
||||||
def __init__(self, conf, url, purpose):
|
def __init__(self, conf, url, purpose, retry=None):
|
||||||
# NOTE(viktors): Parse config options
|
# NOTE(viktors): Parse config options
|
||||||
driver_conf = conf.oslo_messaging_rabbit
|
driver_conf = conf.oslo_messaging_rabbit
|
||||||
|
|
||||||
self.interval_start = driver_conf.rabbit_retry_interval
|
self.interval_start = driver_conf.rabbit_retry_interval
|
||||||
self.interval_stepping = driver_conf.rabbit_retry_backoff
|
self.interval_stepping = driver_conf.rabbit_retry_backoff
|
||||||
self.interval_max = driver_conf.rabbit_interval_max
|
self.interval_max = driver_conf.rabbit_interval_max
|
||||||
|
self.max_retries = retry
|
||||||
|
|
||||||
self.login_method = driver_conf.rabbit_login_method
|
self.login_method = driver_conf.rabbit_login_method
|
||||||
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
|
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
|
||||||
@ -728,7 +729,13 @@ class Connection(object):
|
|||||||
str(exc), interval)
|
str(exc), interval)
|
||||||
|
|
||||||
self._set_current_channel(None)
|
self._set_current_channel(None)
|
||||||
self.connection.ensure_connection(errback=on_error)
|
self.connection.ensure_connection(
|
||||||
|
errback=on_error,
|
||||||
|
max_retries=self.max_retries,
|
||||||
|
interval_start=self.interval_start or 1,
|
||||||
|
interval_step=self.interval_stepping,
|
||||||
|
interval_max=self.interval_max,
|
||||||
|
)
|
||||||
self._set_current_channel(self.connection.channel())
|
self._set_current_channel(self.connection.channel())
|
||||||
self.set_transport_socket_timeout()
|
self.set_transport_socket_timeout()
|
||||||
|
|
||||||
|
@ -69,7 +69,7 @@ class Pool(object, metaclass=abc.ABCMeta):
|
|||||||
self._items.append((ttl_watch, item))
|
self._items.append((ttl_watch, item))
|
||||||
self._cond.notify()
|
self._cond.notify()
|
||||||
|
|
||||||
def get(self):
|
def get(self, retry=None):
|
||||||
"""Return an item from the pool, when one is available.
|
"""Return an item from the pool, when one is available.
|
||||||
|
|
||||||
This may cause the calling thread to block.
|
This may cause the calling thread to block.
|
||||||
@ -95,7 +95,7 @@ class Pool(object, metaclass=abc.ABCMeta):
|
|||||||
|
|
||||||
# We've grabbed a slot and dropped the lock, now do the creation
|
# We've grabbed a slot and dropped the lock, now do the creation
|
||||||
try:
|
try:
|
||||||
return self.create()
|
return self.create(retry=retry)
|
||||||
except Exception:
|
except Exception:
|
||||||
with self._cond:
|
with self._cond:
|
||||||
self._current_size -= 1
|
self._current_size -= 1
|
||||||
@ -111,7 +111,7 @@ class Pool(object, metaclass=abc.ABCMeta):
|
|||||||
return
|
return
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def create(self):
|
def create(self, retry=None):
|
||||||
"""Construct a new item."""
|
"""Construct a new item."""
|
||||||
|
|
||||||
|
|
||||||
@ -130,9 +130,9 @@ class ConnectionPool(Pool):
|
|||||||
LOG.debug("Idle connection has expired and been closed."
|
LOG.debug("Idle connection has expired and been closed."
|
||||||
" Pool size: %d" % len(self._items))
|
" Pool size: %d" % len(self._items))
|
||||||
|
|
||||||
def create(self, purpose=common.PURPOSE_SEND):
|
def create(self, purpose=common.PURPOSE_SEND, retry=None):
|
||||||
LOG.debug('Pool creating new connection')
|
LOG.debug('Pool creating new connection')
|
||||||
return self.connection_cls(self.conf, self.url, purpose)
|
return self.connection_cls(self.conf, self.url, purpose, retry=retry)
|
||||||
|
|
||||||
def empty(self):
|
def empty(self):
|
||||||
for item in self.iter_free():
|
for item in self.iter_free():
|
||||||
|
@ -21,19 +21,30 @@ Notification drivers for sending notifications via messaging.
|
|||||||
The messaging drivers publish notification messages to notification
|
The messaging drivers publish notification messages to notification
|
||||||
listeners.
|
listeners.
|
||||||
|
|
||||||
The driver will block the notifier's thread until the notification message has
|
In case of the rabbit backend the driver will block the notifier's thread
|
||||||
been passed to the messaging transport. There is no guarantee that the
|
until the notification message has been passed to the messaging transport.
|
||||||
notification message will be consumed by a notification listener.
|
There is no guarantee that the notification message will be consumed by a
|
||||||
|
notification listener.
|
||||||
|
|
||||||
|
In case of the kafka backend the driver will not block the notifier's thread
|
||||||
|
but return immediately. The driver will try to deliver the message in the
|
||||||
|
background.
|
||||||
|
|
||||||
Notification messages are sent 'at-most-once' - ensuring that they are not
|
Notification messages are sent 'at-most-once' - ensuring that they are not
|
||||||
duplicated.
|
duplicated.
|
||||||
|
|
||||||
If the connection to the messaging service is not active when a notification is
|
If the connection to the messaging service is not active when a notification is
|
||||||
sent this driver will block waiting for the connection to complete. If the
|
sent the rabbit backend will block waiting for the connection to complete.
|
||||||
connection fails to complete, the driver will try to re-establish that
|
If the connection fails to complete, the driver will try to re-establish that
|
||||||
connection. By default this will continue indefinitely until the connection
|
connection. By default this will continue indefinitely until the connection
|
||||||
completes. However, the retry parameter can be used to have the notification
|
completes. However, the retry parameter can be used to have the notification
|
||||||
send fail with a MessageDeliveryFailure after the given number of retries.
|
send fail. In this case an error is logged and the notifier's thread is resumed
|
||||||
|
without any error.
|
||||||
|
|
||||||
|
If the connection to the messaging service is not active when a notification is
|
||||||
|
sent the kafka backend will return immediately and the backend tries to
|
||||||
|
establish the connection and deliver the messages in the background.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
@ -44,7 +44,7 @@ class PoolTestCase(test_utils.BaseTestCase):
|
|||||||
|
|
||||||
class TestPool(pool.Pool):
|
class TestPool(pool.Pool):
|
||||||
|
|
||||||
def create(self):
|
def create(self, retry=None):
|
||||||
return uuid.uuid4()
|
return uuid.uuid4()
|
||||||
|
|
||||||
class ThreadWaitWaiter(object):
|
class ThreadWaitWaiter(object):
|
||||||
@ -82,7 +82,7 @@ class PoolTestCase(test_utils.BaseTestCase):
|
|||||||
p = self.TestPool(**kwargs)
|
p = self.TestPool(**kwargs)
|
||||||
|
|
||||||
if self.create_error:
|
if self.create_error:
|
||||||
def create_error():
|
def create_error(retry=None):
|
||||||
raise RuntimeError
|
raise RuntimeError
|
||||||
orig_create = p.create
|
orig_create = p.create
|
||||||
self.useFixture(fixtures.MockPatchObject(
|
self.useFixture(fixtures.MockPatchObject(
|
||||||
|
@ -244,6 +244,10 @@ class TestMessagingNotifierRetry(test_utils.BaseTestCase):
|
|||||||
topics=["test-retry"],
|
topics=["test-retry"],
|
||||||
retry=2,
|
retry=2,
|
||||||
group="oslo_messaging_notifications")
|
group="oslo_messaging_notifications")
|
||||||
|
self.config(
|
||||||
|
# just to speed up the test execution
|
||||||
|
rabbit_retry_backoff=0,
|
||||||
|
group="oslo_messaging_rabbit")
|
||||||
transport = oslo_messaging.get_notification_transport(
|
transport = oslo_messaging.get_notification_transport(
|
||||||
self.conf, url='rabbit://')
|
self.conf, url='rabbit://')
|
||||||
notifier = oslo_messaging.Notifier(transport)
|
notifier = oslo_messaging.Notifier(transport)
|
||||||
@ -264,12 +268,15 @@ class TestMessagingNotifierRetry(test_utils.BaseTestCase):
|
|||||||
'kombu.connection.Connection._establish_connection',
|
'kombu.connection.Connection._establish_connection',
|
||||||
new=wrapped_establish_connection
|
new=wrapped_establish_connection
|
||||||
):
|
):
|
||||||
# FIXME(gibi) This is bug 1917645 as the driver does not stop
|
with mock.patch(
|
||||||
# retrying the connection after two retries only our test fixture
|
'oslo_messaging.notify.messaging.LOG.exception'
|
||||||
# stops the retry by raising TestingException
|
) as mock_log:
|
||||||
self.assertRaises(
|
notifier.info({}, "test", {})
|
||||||
self.TestingException,
|
|
||||||
notifier.info, {}, "test", {})
|
# one normal call plus two retries
|
||||||
|
self.assertEqual(3, len(calls))
|
||||||
|
# the error was caught and logged
|
||||||
|
mock_log.assert_called_once()
|
||||||
|
|
||||||
def test_notifier_retry_connection_fails_kafka(self):
|
def test_notifier_retry_connection_fails_kafka(self):
|
||||||
"""This test sets a small retry number for notification sending and
|
"""This test sets a small retry number for notification sending and
|
||||||
|
@ -0,0 +1,8 @@
|
|||||||
|
---
|
||||||
|
fixes:
|
||||||
|
- |
|
||||||
|
As a fix for `bug 1917645 <https://launchpad.net/bugs/1917645>`_ the rabbit
|
||||||
|
backend is changed to use the ``[oslo_messaging_notifications]retry``
|
||||||
|
parameter when driver tries to connect to the message bus during
|
||||||
|
notification sending. Before this fix the rabbit backend retried the
|
||||||
|
connection forever blocking the caller thread.
|
Loading…
Reference in New Issue
Block a user