From c50076b4efb79cef46d618d6d80eecbcebb72898 Mon Sep 17 00:00:00 2001 From: Gabriele Date: Thu, 27 Jun 2019 13:13:47 +0200 Subject: [PATCH] Implement mandatory flag for RabbitMQ driver With this feature it is possible to use the mandatory RabbitMQ mandatory flag. Implements: blueprint transport-options (point 3) The blueprint link is [1] Please follow the link [2] to use and test the feature. 1- https://blueprints.launchpad.net/oslo.messaging/+spec/transport-options 2- https://github.com/Gsantomaggio/rabbitmq-utils/ tree/master/openstack/mandatory_test Change-Id: Ie269fc08ba80c4b94a24a8207c1e86c19c3b3fcb --- lower-constraints.txt | 2 +- oslo_messaging/_drivers/impl_rabbit.py | 12 ++++++- oslo_messaging/exceptions.py | 13 ++++++- .../tests/functional/test_functional.py | 34 +++++++++++++++++++ oslo_messaging/tests/functional/utils.py | 9 +++-- requirements.txt | 2 +- 6 files changed, 66 insertions(+), 6 deletions(-) diff --git a/lower-constraints.txt b/lower-constraints.txt index 3df8f1a83..338c12a42 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -28,7 +28,7 @@ imagesize==0.7.1 iso8601==0.1.11 Jinja2==2.10 keystoneauth1==3.4.0 -kombu==4.0.0 +kombu==4.6.1 linecache2==1.0.0 MarkupSafe==1.0 mccabe==0.2.1 diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 1726fec2d..124e7ef6d 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -756,6 +756,10 @@ class Connection(object): # NOTE(sileht): we must reraise this without # trigger error_callback raise + except exceptions.MessageUndeliverable: + # NOTE(gsantomaggio): we must reraise this without + # trigger error_callback + raise except Exception as exc: error_callback and error_callback(exc) self._set_current_channel(None) @@ -769,6 +773,11 @@ class Connection(object): LOG.error(msg) raise exceptions.MessageDeliveryFailure(msg) + @staticmethod + def on_return(exception, exchange, routing_key, message): + raise exceptions.MessageUndeliverable(exception, exchange, routing_key, + message) + def _set_current_channel(self, new_channel): """Change the channel to use. @@ -787,7 +796,8 @@ class Connection(object): if new_channel is not None: if self.purpose == rpc_common.PURPOSE_LISTEN: self._set_qos(new_channel) - self._producer = kombu.messaging.Producer(new_channel) + self._producer = kombu.messaging.Producer(new_channel, + on_return=self.on_return) for consumer in self._consumers: consumer.declare(self) diff --git a/oslo_messaging/exceptions.py b/oslo_messaging/exceptions.py index cfe6a7efb..f6ba20a76 100644 --- a/oslo_messaging/exceptions.py +++ b/oslo_messaging/exceptions.py @@ -16,7 +16,7 @@ import six __all__ = ['MessagingException', 'MessagingTimeout', 'MessageDeliveryFailure', - 'InvalidTarget'] + 'InvalidTarget', 'MessageUndeliverable'] class MessagingException(Exception): @@ -38,3 +38,14 @@ class InvalidTarget(MessagingException, ValueError): msg = msg + ":" + six.text_type(target) super(InvalidTarget, self).__init__(msg) self.target = target + + +class MessageUndeliverable(Exception): + """Raised if message is not routed with mandatory flag""" + + def __init__(self, exception, exchange, routing_key, message): + super(MessageUndeliverable, self).__init__() + self.exception = exception + self.exchange = exchange + self.routing_key = routing_key + self.message = message diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py index 4fa8b48f1..6800f596f 100644 --- a/oslo_messaging/tests/functional/test_functional.py +++ b/oslo_messaging/tests/functional/test_functional.py @@ -152,6 +152,40 @@ class CallTestCase(utils.SkipIfNoTransportURL): self.assertEqual(10, server.endpoint.ival) + def test_mandatory_call(self): + if not self.url.startswith("rabbit://"): + self.skipTest("backend does not support call monitoring") + + transport = self.useFixture(utils.RPCTransportFixture(self.conf, + self.url)) + target = oslo_messaging.Target(topic='topic_' + str(uuid.uuid4()), + server='server_' + str(uuid.uuid4())) + + # test for mandatory flag using transport-options, see: + # https://blueprints.launchpad.net/oslo.messaging/+spec/transport-options + # first test with `at_least_once=False` raises a "MessagingTimeout" + # error since there is no control if the queue actually exists. + # (Default behavior) + options = oslo_messaging.TransportOptions(at_least_once=False) + client1 = utils.ClientStub(transport.transport, target, + cast=False, timeout=1, + transport_options=options) + + self.assertRaises(oslo_messaging.MessagingTimeout, + client1.delay) + + # second test with `at_least_once=True` raises a "MessageUndeliverable" + # caused by mandatory flag. + # the MessageUndeliverable error is raised immediately without waiting + # any timeout + options2 = oslo_messaging.TransportOptions(at_least_once=True) + client2 = utils.ClientStub(transport.transport, target, + cast=False, timeout=60, + transport_options=options2) + + self.assertRaises(oslo_messaging.MessageUndeliverable, + client2.delay) + def test_monitor_long_call(self): if not (self.url.startswith("rabbit://") or self.url.startswith("amqp://")): diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index 4d403a07b..700c16277 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -226,10 +226,15 @@ class RpcCast(RpcCall): class ClientStub(object): - def __init__(self, transport, target, cast=False, name=None, **kwargs): + def __init__(self, transport, target, cast=False, name=None, + transport_options=None, **kwargs): self.name = name or "functional-tests" self.cast = cast - self.client = oslo_messaging.RPCClient(transport, target, **kwargs) + self.client = oslo_messaging.RPCClient( + transport=transport, + target=target, + transport_options=transport_options, + **kwargs) def __getattr__(self, name): context = {"application": self.name} diff --git a/requirements.txt b/requirements.txt index 2e05118b4..7ed73945c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,7 +26,7 @@ PyYAML>=3.12 # MIT # rabbit driver is the default # we set the amqp version to ensure heartbeat works amqp>=2.4.1 # BSD -kombu!=4.0.2,>=4.0.0 # BSD +kombu!=4.0.2,>=4.6.1 # BSD # middleware oslo.middleware>=3.31.0 # Apache-2.0