From d8d2ad95d79bad991b471e97a6b7985552aba91b Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Wed, 11 Dec 2013 16:50:09 +0100 Subject: [PATCH] Allow to requeue the notification message This patch allow to requeue the notification received by the notification listener. Partial implements blueprint notification-subscriber-server Change-Id: I49c4ba91224c280e479edb19289ccb337a2ab843 --- oslo/messaging/_drivers/amqp.py | 8 +++- oslo/messaging/_drivers/amqpdriver.py | 16 +++++++- oslo/messaging/_drivers/base.py | 4 ++ oslo/messaging/_drivers/impl_fake.py | 24 +++++++---- oslo/messaging/_drivers/impl_qpid.py | 4 ++ oslo/messaging/_drivers/impl_rabbit.py | 3 ++ oslo/messaging/_drivers/impl_zmq.py | 6 +++ oslo/messaging/notify/__init__.py | 4 +- oslo/messaging/notify/dispatcher.py | 25 +++++++++-- oslo/messaging/notify/listener.py | 18 ++++++++ tests/test_notify_dispatcher.py | 57 +++++++++++++++++++++++--- tests/test_notify_listener.py | 30 ++++++++++++-- tests/test_rabbit.py | 4 ++ 13 files changed, 178 insertions(+), 25 deletions(-) diff --git a/oslo/messaging/_drivers/amqp.py b/oslo/messaging/_drivers/amqp.py index 9be6f0eaf..6b9c88ec9 100644 --- a/oslo/messaging/_drivers/amqp.py +++ b/oslo/messaging/_drivers/amqp.py @@ -318,12 +318,16 @@ class _MsgIdCache(object): """AMQP consumers may read same message twice when exceptions occur before ack is returned. This method prevents doing it. """ + if UNIQUE_ID in message_data: + msg_id = message_data.get(UNIQUE_ID) + if msg_id in self.prev_msgids: + raise rpc_common.DuplicateMessageError(msg_id=msg_id) + + def add(self, message_data): if UNIQUE_ID in message_data: msg_id = message_data.pop(UNIQUE_ID) if msg_id not in self.prev_msgids: self.prev_msgids.append(msg_id) - else: - raise rpc_common.DuplicateMessageError(msg_id=msg_id) def _add_unique_id(msg): diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index 1b1cac494..8ccba92ac 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -37,7 +37,8 @@ class AMQPIncomingMessage(base.IncomingMessage): self.msg_id = msg_id self.reply_q = reply_q - self.acknowledge = message.acknowledge + self.acknowledge_callback = message.acknowledge + self.requeue_callback = message.requeue def _send_reply(self, conn, reply=None, failure=None, ending=False, log_failure=True): @@ -65,6 +66,19 @@ class AMQPIncomingMessage(base.IncomingMessage): self._send_reply(conn, reply, failure, log_failure=log_failure) self._send_reply(conn, ending=True) + def acknowledge(self): + self.listener.msg_id_cache.add(self.message) + self.acknowledge_callback() + + def requeue(self): + # NOTE(sileht): In case of the connection is lost between receiving the + # message and requeing it, this requeue call fail + # but because the message is not acknowledged and not added to the + # msg_id_cache, the message will be reconsumed, the only difference is + # the message stay at the beginning of the queue instead of moving to + # the end. + self.requeue_callback() + class AMQPListener(base.Listener): diff --git a/oslo/messaging/_drivers/base.py b/oslo/messaging/_drivers/base.py index 83b9b9ef2..dcf08287e 100644 --- a/oslo/messaging/_drivers/base.py +++ b/oslo/messaging/_drivers/base.py @@ -40,6 +40,10 @@ class IncomingMessage(object): def acknowledge(self): "Acknowledge the message." + @abc.abstractmethod + def requeue(self): + "Requeue the message." + @six.add_metaclass(abc.ABCMeta) class Listener(object): diff --git a/oslo/messaging/_drivers/impl_fake.py b/oslo/messaging/_drivers/impl_fake.py index a588db035..2bfbe16b3 100644 --- a/oslo/messaging/_drivers/impl_fake.py +++ b/oslo/messaging/_drivers/impl_fake.py @@ -26,8 +26,9 @@ from oslo.messaging._drivers import base class FakeIncomingMessage(base.IncomingMessage): - def __init__(self, listener, ctxt, message, reply_q): + def __init__(self, listener, ctxt, message, reply_q, requeue): super(FakeIncomingMessage, self).__init__(listener, ctxt, message) + self.requeue_callback = requeue self._reply_q = reply_q def reply(self, reply=None, failure=None, log_failure=True): @@ -35,6 +36,9 @@ class FakeIncomingMessage(base.IncomingMessage): failure = failure[1] if failure else None self._reply_q.put((reply, failure)) + def requeue(self): + self.requeue_callback() + class FakeListener(base.Listener): @@ -46,10 +50,11 @@ class FakeListener(base.Listener): def poll(self): while True: for target in self._targets: - (ctxt, message, reply_q) = self._exchange.poll(target) + (ctxt, message, reply_q, requeue) = \ + self._exchange.poll(target) if message is not None: - message = FakeIncomingMessage(self, ctxt, message, reply_q) - message.acknowledge() + message = FakeIncomingMessage(self, ctxt, message, + reply_q, requeue) return message time.sleep(.05) @@ -58,7 +63,7 @@ class FakeExchange(object): def __init__(self, name): self.name = name - self._queues_lock = threading.Lock() + self._queues_lock = threading.RLock() self._topic_queues = {} self._server_queues = {} @@ -78,8 +83,13 @@ class FakeExchange(object): queues = [self._get_server_queue(topic, server)] else: queues = [self._get_topic_queue(topic)] + + def requeue(): + self.deliver_message(topic, ctxt, message, server=server, + fanout=fanout, reply_q=reply_q) + for queue in queues: - queue.append((ctxt, message, reply_q)) + queue.append((ctxt, message, reply_q, requeue)) def poll(self, target): with self._queues_lock: @@ -87,7 +97,7 @@ class FakeExchange(object): queue = self._get_server_queue(target.topic, target.server) else: queue = self._get_topic_queue(target.topic) - return queue.pop(0) if queue else (None, None, None) + return queue.pop(0) if queue else (None, None, None, None) class FakeDriver(base.BaseDriver): diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py index bac02c7e5..c53ad8946 100644 --- a/oslo/messaging/_drivers/impl_qpid.py +++ b/oslo/messaging/_drivers/impl_qpid.py @@ -99,6 +99,10 @@ class QpidMessage(dict): def acknowledge(self): self._session.acknowledge(self._raw_message) + def requeue(self): + raise NotImplementedError('The QPID driver does not yet support ' + 'requeuing messages') + class ConsumerBase(object): """Consumer base class.""" diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index 0bc6a8bda..be41a4ec1 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -128,6 +128,9 @@ class RabbitMessage(dict): def acknowledge(self): self._raw_message.ack() + def requeue(self): + self._raw_message.requeue() + class ConsumerBase(object): """Consumer base class.""" diff --git a/oslo/messaging/_drivers/impl_zmq.py b/oslo/messaging/_drivers/impl_zmq.py index d9869389c..6b2b53957 100644 --- a/oslo/messaging/_drivers/impl_zmq.py +++ b/oslo/messaging/_drivers/impl_zmq.py @@ -843,6 +843,10 @@ class ZmqIncomingMessage(base.IncomingMessage): with self.condition: self.condition.notify() + def requeue(self): + raise NotImplementedError('The ZeroMQ driver does not yet support ' + 'requeuing messages') + class ZmqListener(base.Listener): @@ -960,6 +964,8 @@ class ZmqDriver(base.BaseDriver): return listener def listen_for_notifications(self, targets_and_priorities): + # NOTE(sileht): this listener implementation is limited + # because zeromq doesn't support requeing message conn = create_connection(self.conf) listener = ZmqListener(self, None) diff --git a/oslo/messaging/notify/__init__.py b/oslo/messaging/notify/__init__.py index 4b87d72c3..92d726ba7 100644 --- a/oslo/messaging/notify/__init__.py +++ b/oslo/messaging/notify/__init__.py @@ -15,8 +15,10 @@ __all__ = ['Notifier', 'LoggingNotificationHandler', - 'get_notification_listener'] + 'get_notification_listener', + 'NotificationResult'] from .notifier import * from .listener import * from .logger import * +from .dispatcher import NotificationResult diff --git a/oslo/messaging/notify/dispatcher.py b/oslo/messaging/notify/dispatcher.py index 1bcf84c93..87deee854 100644 --- a/oslo/messaging/notify/dispatcher.py +++ b/oslo/messaging/notify/dispatcher.py @@ -28,6 +28,11 @@ LOG = logging.getLogger(__name__) PRIORITIES = ['audit', 'debug', 'info', 'warn', 'error', 'critical', 'sample'] +class NotificationResult(object): + HANDLED = 'handled' + REQUEUE = 'requeue' + + class NotificationDispatcher(object): """A message dispatcher which understands Notification messages. @@ -59,8 +64,15 @@ class NotificationDispatcher(object): @contextlib.contextmanager def __call__(self, incoming): - yield lambda: self._dispatch_and_handle_error(incoming) - incoming.acknowledge() + result_wrapper = [] + + yield lambda: result_wrapper.append( + self._dispatch_and_handle_error(incoming)) + + if result_wrapper[0] == NotificationResult.HANDLED: + incoming.acknowledge() + else: + incoming.requeue() def _dispatch_and_handle_error(self, incoming): """Dispatch a notification message to the appropriate endpoint method. @@ -69,12 +81,13 @@ class NotificationDispatcher(object): :type ctxt: IncomingMessage """ try: - self._dispatch(incoming.ctxt, incoming.message) + return self._dispatch(incoming.ctxt, incoming.message) except Exception: # sys.exc_info() is deleted by LOG.exception(). exc_info = sys.exc_info() LOG.error('Exception during message handling', exc_info=exc_info) + return NotificationResult.HANDLED def _dispatch(self, ctxt, message): """Dispatch an RPC message to the appropriate endpoint method. @@ -99,6 +112,10 @@ class NotificationDispatcher(object): for callback in self._callbacks_by_priority.get(priority, []): localcontext.set_local_context(ctxt) try: - callback(ctxt, publisher_id, event_type, payload) + ret = callback(ctxt, publisher_id, event_type, payload) + ret = NotificationResult.HANDLED if ret is None else ret + if ret != NotificationResult.HANDLED: + return ret finally: localcontext.clear_local_context() + return NotificationResult.HANDLED diff --git a/oslo/messaging/notify/listener.py b/oslo/messaging/notify/listener.py index f7384c148..06a2e24ce 100644 --- a/oslo/messaging/notify/listener.py +++ b/oslo/messaging/notify/listener.py @@ -73,6 +73,24 @@ priority Parameters to endpoint methods are the request context supplied by the client, the publisher_id of the notification message, the event_type, the payload. +An endpoint method can return explicitly messaging.NotificationResult.HANDLED +to acknowledge a message or messaging.NotificationResult.REQUEUE to requeue the +message. + +The message is acknowledge only if all endpoints return +messaging.NotificationResult.HANDLED + +If nothing is returned by an endpoint, this is considered like +messaging.NotificationResult.HANDLED + +messaging.NotificationResult values needs to be handled by drivers: + +* HANDLED: supported by all drivers +* REQUEUE: supported by drivers: fake://, rabbit:// + +In case of an unsupported driver nothing is done to the message and a +NotImplementedError is raised and logged. + By supplying a serializer object, a listener can deserialize a request context and arguments from - and serialize return values to - primitive types. """ diff --git a/tests/test_notify_dispatcher.py b/tests/test_notify_dispatcher.py index 24b6e6b32..3e2c966a1 100644 --- a/tests/test_notify_dispatcher.py +++ b/tests/test_notify_dispatcher.py @@ -41,24 +41,58 @@ class TestDispatcher(test_utils.BaseTestCase): ('no_endpoints', dict(endpoints=[], endpoints_expect_calls=[], - priority='info')), + priority='info', + ex=None, + return_value=messaging.NotificationResult.HANDLED)), ('one_endpoints', dict(endpoints=[['warn']], endpoints_expect_calls=['warn'], - priority='warn')), + priority='warn', + ex=None, + return_value=messaging.NotificationResult.HANDLED)), ('two_endpoints_only_one_match', dict(endpoints=[['warn'], ['info']], endpoints_expect_calls=[None, 'info'], - priority='info')), + priority='info', + ex=None, + return_value=messaging.NotificationResult.HANDLED)), ('two_endpoints_both_match', dict(endpoints=[['debug', 'info'], ['info', 'debug']], endpoints_expect_calls=['debug', 'debug'], - priority='debug')), + priority='debug', + ex=None, + return_value=messaging.NotificationResult.HANDLED)), + ('no_return_value', + dict(endpoints=[['warn']], + endpoints_expect_calls=['warn'], + priority='warn', + ex=None, return_value=None)), + ('requeue', + dict(endpoints=[['debug', 'warn']], + endpoints_expect_calls=['debug'], + priority='debug', msg=notification_msg, + ex=None, + return_value=messaging.NotificationResult.REQUEUE)), + ('exception', + dict(endpoints=[['debug', 'warn']], + endpoints_expect_calls=['debug'], + priority='debug', msg=notification_msg, + ex=Exception, + return_value=messaging.NotificationResult.HANDLED)), ] def test_dispatcher(self): - endpoints = [mock.Mock(spec=endpoint_methods) - for endpoint_methods in self.endpoints] + endpoints = [] + for endpoint_methods in self.endpoints: + e = mock.Mock(spec=endpoint_methods) + endpoints.append(e) + for m in endpoint_methods: + method = getattr(e, m) + if self.ex: + method.side_effect = self.ex() + else: + method.return_value = self.return_value + msg = notification_msg.copy() msg['priority'] = self.priority @@ -89,6 +123,17 @@ class TestDispatcher(test_utils.BaseTestCase): else: self.assertEqual(endpoints[i].call_count, 0) + if self.ex: + self.assertEqual(incoming.acknowledge.call_count, 1) + self.assertEqual(incoming.requeue.call_count, 0) + elif self.return_value == messaging.NotificationResult.HANDLED \ + or self.return_value is None: + self.assertEqual(incoming.acknowledge.call_count, 1) + self.assertEqual(incoming.requeue.call_count, 0) + elif self.return_value == messaging.NotificationResult.REQUEUE: + self.assertEqual(incoming.acknowledge.call_count, 0) + self.assertEqual(incoming.requeue.call_count, 1) + @mock.patch('oslo.messaging.notify.dispatcher.LOG') def test_dispatcher_unknown_prio(self, mylog): msg = notification_msg.copy() diff --git a/tests/test_notify_listener.py b/tests/test_notify_listener.py index 7bb30b41b..3b35a3bb1 100644 --- a/tests/test_notify_listener.py +++ b/tests/test_notify_listener.py @@ -123,7 +123,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): transport = messaging.get_transport(self.conf, url='fake:') endpoint = mock.Mock() - endpoint.info = mock.Mock() + endpoint.info.return_value = None listener_thread = self._setup_listener(transport, [endpoint], 1) notifier = self._setup_notifier(transport) @@ -138,7 +138,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): transport = messaging.get_transport(self.conf, url='fake:') endpoint = mock.Mock() - endpoint.info = mock.Mock() + endpoint.info.return_value = None topics = ["topic1", "topic2"] listener_thread = self._setup_listener(transport, [endpoint], 2, topics=topics) @@ -157,9 +157,9 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): transport = messaging.get_transport(self.conf, url='fake:') endpoint1 = mock.Mock() - endpoint1.info = mock.Mock() + endpoint1.info.return_value = None endpoint2 = mock.Mock() - endpoint2.info = mock.Mock() + endpoint2.info.return_value = messaging.NotificationResult.HANDLED listener_thread = self._setup_listener(transport, [endpoint1, endpoint2], 1) notifier = self._setup_notifier(transport) @@ -171,3 +171,25 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): {}, 'testpublisher', 'an_event.start', 'test') endpoint2.info.assert_called_once_with( {}, 'testpublisher', 'an_event.start', 'test') + + def test_requeue(self): + transport = messaging.get_transport(self.conf, url='fake:') + endpoint = mock.Mock() + endpoint.info = mock.Mock() + + def side_effect_requeue(*args, **kwargs): + if endpoint.info.call_count == 1: + return messaging.NotificationResult.REQUEUE + return messaging.NotificationResult.HANDLED + + endpoint.info.side_effect = side_effect_requeue + listener_thread = self._setup_listener(transport, + [endpoint], 2) + notifier = self._setup_notifier(transport) + notifier.info({}, 'an_event.start', 'test') + + self._stop_listener(listener_thread) + + expected = [mock.call({}, 'testpublisher', 'an_event.start', 'test'), + mock.call({}, 'testpublisher', 'an_event.start', 'test')] + self.assertEqual(endpoint.info.call_args_list, expected) diff --git a/tests/test_rabbit.py b/tests/test_rabbit.py index 8c0874191..77afd41bb 100644 --- a/tests/test_rabbit.py +++ b/tests/test_rabbit.py @@ -206,6 +206,7 @@ class TestSendReceive(test_utils.BaseTestCase): senders[i].start() received = listener.poll() + received.message.pop('_unique_id') self.assertIsNotNone(received) self.assertEqual(received.ctxt, self.ctxt) self.assertEqual(received.message, {'tx_id': i}) @@ -302,12 +303,14 @@ class TestRacyWaitForReply(test_utils.BaseTestCase): senders[0].start() msgs.append(listener.poll()) + msgs[-1].message.pop('_unique_id') self.assertEqual(msgs[-1].message, {'tx_id': 0}) # Start the second guy, receive his message senders[1].start() msgs.append(listener.poll()) + msgs[-1].message.pop('_unique_id') self.assertEqual(msgs[-1].message, {'tx_id': 1}) # Reply to both in order, making the second thread queue @@ -602,6 +605,7 @@ class TestReplyWireFormat(test_utils.BaseTestCase): producer.publish(msg) received = listener.poll() + received.message.pop('_unique_id') self.assertIsNotNone(received) self.assertEqual(self.expected_ctxt, received.ctxt) self.assertEqual(self.expected, received.message)