From f059bba6ae9642675bc1c9004a4a4adc5d1b19a6 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Mon, 24 Jul 2017 13:36:17 -0400 Subject: [PATCH] Prevent rabbit from raising unexpected exceptions Publishing a message using the kombu connection autoretry method may allow exceptions from the py-amqp library to be raised up to the application. This does not conform to the documented oslo.messaging API. Enhance the try except block to capture any exception and translate it into a MessageDeliveryFailure. There are a few cases where exceptions will be raised during autoretry publishing: recoverable connection or channel errors, and non-recoverable connection or channel errors. autoretry will only retry if the error is recoverable. Non recoverable errors are re-raised immediately regardless of the retry count. In the case of a recoverable error it seems unlikely that retrying either the connection or the channel yet again is going to get us anywhere, so in this case we simply clean up the channel state, log an error and fail the operation. In the case of non-recoverable error we are out of luck (think authentication failure) - further retrying will not help. Best we can do is clean up state and log the heck out of it. Change-Id: I2f65d2ee19a8c3e9a323b30404abbf0cbb45a216 Closes-Bug: #1705351 Closes-Bug: #1707160 --- oslo_messaging/_drivers/impl_rabbit.py | 15 +++++---------- oslo_messaging/tests/drivers/test_impl_rabbit.py | 15 +++++++++++++++ 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index c001ae4c8..35b824fcc 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -807,9 +807,11 @@ class Connection(object): ret, channel = autoretry_method() self._set_current_channel(channel) return ret - except kombu.exceptions.OperationalError as exc: - LOG.debug("Received recoverable error from kombu:", - exc_info=True) + except rpc_amqp.AMQPDestinationNotFound: + # NOTE(sileht): we must reraise this without + # trigger error_callback + raise + except Exception as exc: error_callback and error_callback(exc) self._set_current_channel(None) # NOTE(sileht): number of retry exceeded and the connection @@ -821,13 +823,6 @@ class Connection(object): 'tries: %(err_str)s') % info LOG.error(msg) raise exceptions.MessageDeliveryFailure(msg) - except rpc_amqp.AMQPDestinationNotFound: - # NOTE(sileht): we must reraise this without - # trigger error_callback - raise - except Exception as exc: - error_callback and error_callback(exc) - raise def _set_current_channel(self, new_channel): """Change the channel to use. diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 94be527b6..1ba3d6324 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -30,6 +30,7 @@ import oslo_messaging from oslo_messaging._drivers import amqpdriver from oslo_messaging._drivers import common as driver_common from oslo_messaging._drivers import impl_rabbit as rabbit_driver +from oslo_messaging.exceptions import MessageDeliveryFailure from oslo_messaging.tests import utils as test_utils from six.moves import mock @@ -285,6 +286,20 @@ class TestRabbitPublisher(test_utils.BaseTestCase): try_send(e_active) self.assertIn('foobar', conn._declared_exchanges) + def test_send_exception_remap(self): + bad_exc = Exception("Non-oslo.messaging exception") + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + exchange_mock = mock.Mock() + with transport._driver._get_connection( + driver_common.PURPOSE_SEND) as pool_conn: + conn = pool_conn.connection + with mock.patch('kombu.messaging.Producer.publish', + side_effect=bad_exc): + self.assertRaises(MessageDeliveryFailure, + conn._ensure_publishing, + conn._publish, exchange_mock, 'msg') + class TestRabbitConsume(test_utils.BaseTestCase):