[rabbit] use retry parameters during notification sending

The rabbit backend now applies the [oslo_messaging_notifications]retry,
[oslo_messaging_rabbit]rabbit_retry_interval, rabbit_retry_backoff and
rabbit_interval_max configuration parameters when tries to establish the
connection to the message bus during notification sending.

This patch also clarifies the differences between the behavior
of the kafka and the rabbit drivers in this regard.

Closes-Bug: #1917645
Change-Id: Id4ccafc95314c86ae918336e42cca64a6acd4d94
This commit is contained in:
Balazs Gibizer 2021-11-23 16:58:05 +01:00
parent 1db6de63a8
commit 7b3968d9b0
8 changed files with 60 additions and 26 deletions

View File

@ -601,9 +601,10 @@ class AMQPDriverBase(base.BaseDriver):
def _get_exchange(self, target): def _get_exchange(self, target):
return target.exchange or self._default_exchange return target.exchange or self._default_exchange
def _get_connection(self, purpose=rpc_common.PURPOSE_SEND): def _get_connection(self, purpose=rpc_common.PURPOSE_SEND, retry=None):
return rpc_common.ConnectionContext(self._connection_pool, return rpc_common.ConnectionContext(self._connection_pool,
purpose=purpose) purpose=purpose,
retry=retry)
def _get_reply_q(self): def _get_reply_q(self):
with self._reply_q_lock: with self._reply_q_lock:
@ -649,7 +650,7 @@ class AMQPDriverBase(base.BaseDriver):
log_msg = "CAST unique_id: %s " % unique_id log_msg = "CAST unique_id: %s " % unique_id
try: try:
with self._get_connection(rpc_common.PURPOSE_SEND) as conn: with self._get_connection(rpc_common.PURPOSE_SEND, retry) as conn:
if notify: if notify:
exchange = self._get_exchange(target) exchange = self._get_exchange(target)
LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'" LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'"

View File

@ -392,7 +392,7 @@ class ConnectionContext(Connection):
If possible the function makes sure to return a connection to the pool. If possible the function makes sure to return a connection to the pool.
""" """
def __init__(self, connection_pool, purpose): def __init__(self, connection_pool, purpose, retry):
"""Create a new connection, or get one from the pool.""" """Create a new connection, or get one from the pool."""
self.connection = None self.connection = None
self.connection_pool = connection_pool self.connection_pool = connection_pool
@ -420,7 +420,7 @@ class ConnectionContext(Connection):
pooled = purpose == PURPOSE_SEND pooled = purpose == PURPOSE_SEND
if pooled: if pooled:
self.connection = connection_pool.get() self.connection = connection_pool.get(retry=retry)
else: else:
self.connection = connection_pool.create(purpose) self.connection = connection_pool.create(purpose)
self.pooled = pooled self.pooled = pooled

View File

@ -452,13 +452,14 @@ class ConnectionLock(DummyConnectionLock):
class Connection(object): class Connection(object):
"""Connection object.""" """Connection object."""
def __init__(self, conf, url, purpose): def __init__(self, conf, url, purpose, retry=None):
# NOTE(viktors): Parse config options # NOTE(viktors): Parse config options
driver_conf = conf.oslo_messaging_rabbit driver_conf = conf.oslo_messaging_rabbit
self.interval_start = driver_conf.rabbit_retry_interval self.interval_start = driver_conf.rabbit_retry_interval
self.interval_stepping = driver_conf.rabbit_retry_backoff self.interval_stepping = driver_conf.rabbit_retry_backoff
self.interval_max = driver_conf.rabbit_interval_max self.interval_max = driver_conf.rabbit_interval_max
self.max_retries = retry
self.login_method = driver_conf.rabbit_login_method self.login_method = driver_conf.rabbit_login_method
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
@ -728,7 +729,13 @@ class Connection(object):
str(exc), interval) str(exc), interval)
self._set_current_channel(None) self._set_current_channel(None)
self.connection.ensure_connection(errback=on_error) self.connection.ensure_connection(
errback=on_error,
max_retries=self.max_retries,
interval_start=self.interval_start or 1,
interval_step=self.interval_stepping,
interval_max=self.interval_max,
)
self._set_current_channel(self.connection.channel()) self._set_current_channel(self.connection.channel())
self.set_transport_socket_timeout() self.set_transport_socket_timeout()

View File

@ -69,7 +69,7 @@ class Pool(object, metaclass=abc.ABCMeta):
self._items.append((ttl_watch, item)) self._items.append((ttl_watch, item))
self._cond.notify() self._cond.notify()
def get(self): def get(self, retry=None):
"""Return an item from the pool, when one is available. """Return an item from the pool, when one is available.
This may cause the calling thread to block. This may cause the calling thread to block.
@ -95,7 +95,7 @@ class Pool(object, metaclass=abc.ABCMeta):
# We've grabbed a slot and dropped the lock, now do the creation # We've grabbed a slot and dropped the lock, now do the creation
try: try:
return self.create() return self.create(retry=retry)
except Exception: except Exception:
with self._cond: with self._cond:
self._current_size -= 1 self._current_size -= 1
@ -111,7 +111,7 @@ class Pool(object, metaclass=abc.ABCMeta):
return return
@abc.abstractmethod @abc.abstractmethod
def create(self): def create(self, retry=None):
"""Construct a new item.""" """Construct a new item."""
@ -130,9 +130,9 @@ class ConnectionPool(Pool):
LOG.debug("Idle connection has expired and been closed." LOG.debug("Idle connection has expired and been closed."
" Pool size: %d" % len(self._items)) " Pool size: %d" % len(self._items))
def create(self, purpose=common.PURPOSE_SEND): def create(self, purpose=common.PURPOSE_SEND, retry=None):
LOG.debug('Pool creating new connection') LOG.debug('Pool creating new connection')
return self.connection_cls(self.conf, self.url, purpose) return self.connection_cls(self.conf, self.url, purpose, retry=retry)
def empty(self): def empty(self):
for item in self.iter_free(): for item in self.iter_free():

View File

@ -21,19 +21,30 @@ Notification drivers for sending notifications via messaging.
The messaging drivers publish notification messages to notification The messaging drivers publish notification messages to notification
listeners. listeners.
The driver will block the notifier's thread until the notification message has In case of the rabbit backend the driver will block the notifier's thread
been passed to the messaging transport. There is no guarantee that the until the notification message has been passed to the messaging transport.
notification message will be consumed by a notification listener. There is no guarantee that the notification message will be consumed by a
notification listener.
In case of the kafka backend the driver will not block the notifier's thread
but return immediately. The driver will try to deliver the message in the
background.
Notification messages are sent 'at-most-once' - ensuring that they are not Notification messages are sent 'at-most-once' - ensuring that they are not
duplicated. duplicated.
If the connection to the messaging service is not active when a notification is If the connection to the messaging service is not active when a notification is
sent this driver will block waiting for the connection to complete. If the sent the rabbit backend will block waiting for the connection to complete.
connection fails to complete, the driver will try to re-establish that If the connection fails to complete, the driver will try to re-establish that
connection. By default this will continue indefinitely until the connection connection. By default this will continue indefinitely until the connection
completes. However, the retry parameter can be used to have the notification completes. However, the retry parameter can be used to have the notification
send fail with a MessageDeliveryFailure after the given number of retries. send fail. In this case an error is logged and the notifier's thread is resumed
without any error.
If the connection to the messaging service is not active when a notification is
sent the kafka backend will return immediately and the backend tries to
establish the connection and deliver the messages in the background.
""" """
import logging import logging

View File

@ -44,7 +44,7 @@ class PoolTestCase(test_utils.BaseTestCase):
class TestPool(pool.Pool): class TestPool(pool.Pool):
def create(self): def create(self, retry=None):
return uuid.uuid4() return uuid.uuid4()
class ThreadWaitWaiter(object): class ThreadWaitWaiter(object):
@ -82,7 +82,7 @@ class PoolTestCase(test_utils.BaseTestCase):
p = self.TestPool(**kwargs) p = self.TestPool(**kwargs)
if self.create_error: if self.create_error:
def create_error(): def create_error(retry=None):
raise RuntimeError raise RuntimeError
orig_create = p.create orig_create = p.create
self.useFixture(fixtures.MockPatchObject( self.useFixture(fixtures.MockPatchObject(

View File

@ -244,6 +244,10 @@ class TestMessagingNotifierRetry(test_utils.BaseTestCase):
topics=["test-retry"], topics=["test-retry"],
retry=2, retry=2,
group="oslo_messaging_notifications") group="oslo_messaging_notifications")
self.config(
# just to speed up the test execution
rabbit_retry_backoff=0,
group="oslo_messaging_rabbit")
transport = oslo_messaging.get_notification_transport( transport = oslo_messaging.get_notification_transport(
self.conf, url='rabbit://') self.conf, url='rabbit://')
notifier = oslo_messaging.Notifier(transport) notifier = oslo_messaging.Notifier(transport)
@ -264,12 +268,15 @@ class TestMessagingNotifierRetry(test_utils.BaseTestCase):
'kombu.connection.Connection._establish_connection', 'kombu.connection.Connection._establish_connection',
new=wrapped_establish_connection new=wrapped_establish_connection
): ):
# FIXME(gibi) This is bug 1917645 as the driver does not stop with mock.patch(
# retrying the connection after two retries only our test fixture 'oslo_messaging.notify.messaging.LOG.exception'
# stops the retry by raising TestingException ) as mock_log:
self.assertRaises( notifier.info({}, "test", {})
self.TestingException,
notifier.info, {}, "test", {}) # one normal call plus two retries
self.assertEqual(3, len(calls))
# the error was caught and logged
mock_log.assert_called_once()
def test_notifier_retry_connection_fails_kafka(self): def test_notifier_retry_connection_fails_kafka(self):
"""This test sets a small retry number for notification sending and """This test sets a small retry number for notification sending and

View File

@ -0,0 +1,8 @@
---
fixes:
- |
As a fix for `bug 1917645 <https://launchpad.net/bugs/1917645>`_ the rabbit
backend is changed to use the ``[oslo_messaging_notifications]retry``
parameter when driver tries to connect to the message bus during
notification sending. Before this fix the rabbit backend retried the
connection forever blocking the caller thread.