Allow to requeue the notification message

This patch allow to requeue the notification received by the
notification listener.

Partial implements blueprint notification-subscriber-server

Change-Id: I49c4ba91224c280e479edb19289ccb337a2ab843
This commit is contained in:
Mehdi Abaakouk 2013-12-11 16:50:09 +01:00 committed by Mehdi Abaakouk
parent 35f6d588a3
commit d8d2ad95d7
13 changed files with 178 additions and 25 deletions

View File

@ -318,12 +318,16 @@ class _MsgIdCache(object):
"""AMQP consumers may read same message twice when exceptions occur """AMQP consumers may read same message twice when exceptions occur
before ack is returned. This method prevents doing it. before ack is returned. This method prevents doing it.
""" """
if UNIQUE_ID in message_data:
msg_id = message_data.get(UNIQUE_ID)
if msg_id in self.prev_msgids:
raise rpc_common.DuplicateMessageError(msg_id=msg_id)
def add(self, message_data):
if UNIQUE_ID in message_data: if UNIQUE_ID in message_data:
msg_id = message_data.pop(UNIQUE_ID) msg_id = message_data.pop(UNIQUE_ID)
if msg_id not in self.prev_msgids: if msg_id not in self.prev_msgids:
self.prev_msgids.append(msg_id) self.prev_msgids.append(msg_id)
else:
raise rpc_common.DuplicateMessageError(msg_id=msg_id)
def _add_unique_id(msg): def _add_unique_id(msg):

View File

@ -37,7 +37,8 @@ class AMQPIncomingMessage(base.IncomingMessage):
self.msg_id = msg_id self.msg_id = msg_id
self.reply_q = reply_q self.reply_q = reply_q
self.acknowledge = message.acknowledge self.acknowledge_callback = message.acknowledge
self.requeue_callback = message.requeue
def _send_reply(self, conn, reply=None, failure=None, def _send_reply(self, conn, reply=None, failure=None,
ending=False, log_failure=True): ending=False, log_failure=True):
@ -65,6 +66,19 @@ class AMQPIncomingMessage(base.IncomingMessage):
self._send_reply(conn, reply, failure, log_failure=log_failure) self._send_reply(conn, reply, failure, log_failure=log_failure)
self._send_reply(conn, ending=True) self._send_reply(conn, ending=True)
def acknowledge(self):
self.listener.msg_id_cache.add(self.message)
self.acknowledge_callback()
def requeue(self):
# NOTE(sileht): In case of the connection is lost between receiving the
# message and requeing it, this requeue call fail
# but because the message is not acknowledged and not added to the
# msg_id_cache, the message will be reconsumed, the only difference is
# the message stay at the beginning of the queue instead of moving to
# the end.
self.requeue_callback()
class AMQPListener(base.Listener): class AMQPListener(base.Listener):

View File

@ -40,6 +40,10 @@ class IncomingMessage(object):
def acknowledge(self): def acknowledge(self):
"Acknowledge the message." "Acknowledge the message."
@abc.abstractmethod
def requeue(self):
"Requeue the message."
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class Listener(object): class Listener(object):

View File

@ -26,8 +26,9 @@ from oslo.messaging._drivers import base
class FakeIncomingMessage(base.IncomingMessage): class FakeIncomingMessage(base.IncomingMessage):
def __init__(self, listener, ctxt, message, reply_q): def __init__(self, listener, ctxt, message, reply_q, requeue):
super(FakeIncomingMessage, self).__init__(listener, ctxt, message) super(FakeIncomingMessage, self).__init__(listener, ctxt, message)
self.requeue_callback = requeue
self._reply_q = reply_q self._reply_q = reply_q
def reply(self, reply=None, failure=None, log_failure=True): def reply(self, reply=None, failure=None, log_failure=True):
@ -35,6 +36,9 @@ class FakeIncomingMessage(base.IncomingMessage):
failure = failure[1] if failure else None failure = failure[1] if failure else None
self._reply_q.put((reply, failure)) self._reply_q.put((reply, failure))
def requeue(self):
self.requeue_callback()
class FakeListener(base.Listener): class FakeListener(base.Listener):
@ -46,10 +50,11 @@ class FakeListener(base.Listener):
def poll(self): def poll(self):
while True: while True:
for target in self._targets: for target in self._targets:
(ctxt, message, reply_q) = self._exchange.poll(target) (ctxt, message, reply_q, requeue) = \
self._exchange.poll(target)
if message is not None: if message is not None:
message = FakeIncomingMessage(self, ctxt, message, reply_q) message = FakeIncomingMessage(self, ctxt, message,
message.acknowledge() reply_q, requeue)
return message return message
time.sleep(.05) time.sleep(.05)
@ -58,7 +63,7 @@ class FakeExchange(object):
def __init__(self, name): def __init__(self, name):
self.name = name self.name = name
self._queues_lock = threading.Lock() self._queues_lock = threading.RLock()
self._topic_queues = {} self._topic_queues = {}
self._server_queues = {} self._server_queues = {}
@ -78,8 +83,13 @@ class FakeExchange(object):
queues = [self._get_server_queue(topic, server)] queues = [self._get_server_queue(topic, server)]
else: else:
queues = [self._get_topic_queue(topic)] queues = [self._get_topic_queue(topic)]
def requeue():
self.deliver_message(topic, ctxt, message, server=server,
fanout=fanout, reply_q=reply_q)
for queue in queues: for queue in queues:
queue.append((ctxt, message, reply_q)) queue.append((ctxt, message, reply_q, requeue))
def poll(self, target): def poll(self, target):
with self._queues_lock: with self._queues_lock:
@ -87,7 +97,7 @@ class FakeExchange(object):
queue = self._get_server_queue(target.topic, target.server) queue = self._get_server_queue(target.topic, target.server)
else: else:
queue = self._get_topic_queue(target.topic) queue = self._get_topic_queue(target.topic)
return queue.pop(0) if queue else (None, None, None) return queue.pop(0) if queue else (None, None, None, None)
class FakeDriver(base.BaseDriver): class FakeDriver(base.BaseDriver):

View File

@ -99,6 +99,10 @@ class QpidMessage(dict):
def acknowledge(self): def acknowledge(self):
self._session.acknowledge(self._raw_message) self._session.acknowledge(self._raw_message)
def requeue(self):
raise NotImplementedError('The QPID driver does not yet support '
'requeuing messages')
class ConsumerBase(object): class ConsumerBase(object):
"""Consumer base class.""" """Consumer base class."""

View File

@ -128,6 +128,9 @@ class RabbitMessage(dict):
def acknowledge(self): def acknowledge(self):
self._raw_message.ack() self._raw_message.ack()
def requeue(self):
self._raw_message.requeue()
class ConsumerBase(object): class ConsumerBase(object):
"""Consumer base class.""" """Consumer base class."""

View File

@ -843,6 +843,10 @@ class ZmqIncomingMessage(base.IncomingMessage):
with self.condition: with self.condition:
self.condition.notify() self.condition.notify()
def requeue(self):
raise NotImplementedError('The ZeroMQ driver does not yet support '
'requeuing messages')
class ZmqListener(base.Listener): class ZmqListener(base.Listener):
@ -960,6 +964,8 @@ class ZmqDriver(base.BaseDriver):
return listener return listener
def listen_for_notifications(self, targets_and_priorities): def listen_for_notifications(self, targets_and_priorities):
# NOTE(sileht): this listener implementation is limited
# because zeromq doesn't support requeing message
conn = create_connection(self.conf) conn = create_connection(self.conf)
listener = ZmqListener(self, None) listener = ZmqListener(self, None)

View File

@ -15,8 +15,10 @@
__all__ = ['Notifier', __all__ = ['Notifier',
'LoggingNotificationHandler', 'LoggingNotificationHandler',
'get_notification_listener'] 'get_notification_listener',
'NotificationResult']
from .notifier import * from .notifier import *
from .listener import * from .listener import *
from .logger import * from .logger import *
from .dispatcher import NotificationResult

View File

@ -28,6 +28,11 @@ LOG = logging.getLogger(__name__)
PRIORITIES = ['audit', 'debug', 'info', 'warn', 'error', 'critical', 'sample'] PRIORITIES = ['audit', 'debug', 'info', 'warn', 'error', 'critical', 'sample']
class NotificationResult(object):
HANDLED = 'handled'
REQUEUE = 'requeue'
class NotificationDispatcher(object): class NotificationDispatcher(object):
"""A message dispatcher which understands Notification messages. """A message dispatcher which understands Notification messages.
@ -59,8 +64,15 @@ class NotificationDispatcher(object):
@contextlib.contextmanager @contextlib.contextmanager
def __call__(self, incoming): def __call__(self, incoming):
yield lambda: self._dispatch_and_handle_error(incoming) result_wrapper = []
incoming.acknowledge()
yield lambda: result_wrapper.append(
self._dispatch_and_handle_error(incoming))
if result_wrapper[0] == NotificationResult.HANDLED:
incoming.acknowledge()
else:
incoming.requeue()
def _dispatch_and_handle_error(self, incoming): def _dispatch_and_handle_error(self, incoming):
"""Dispatch a notification message to the appropriate endpoint method. """Dispatch a notification message to the appropriate endpoint method.
@ -69,12 +81,13 @@ class NotificationDispatcher(object):
:type ctxt: IncomingMessage :type ctxt: IncomingMessage
""" """
try: try:
self._dispatch(incoming.ctxt, incoming.message) return self._dispatch(incoming.ctxt, incoming.message)
except Exception: except Exception:
# sys.exc_info() is deleted by LOG.exception(). # sys.exc_info() is deleted by LOG.exception().
exc_info = sys.exc_info() exc_info = sys.exc_info()
LOG.error('Exception during message handling', LOG.error('Exception during message handling',
exc_info=exc_info) exc_info=exc_info)
return NotificationResult.HANDLED
def _dispatch(self, ctxt, message): def _dispatch(self, ctxt, message):
"""Dispatch an RPC message to the appropriate endpoint method. """Dispatch an RPC message to the appropriate endpoint method.
@ -99,6 +112,10 @@ class NotificationDispatcher(object):
for callback in self._callbacks_by_priority.get(priority, []): for callback in self._callbacks_by_priority.get(priority, []):
localcontext.set_local_context(ctxt) localcontext.set_local_context(ctxt)
try: try:
callback(ctxt, publisher_id, event_type, payload) ret = callback(ctxt, publisher_id, event_type, payload)
ret = NotificationResult.HANDLED if ret is None else ret
if ret != NotificationResult.HANDLED:
return ret
finally: finally:
localcontext.clear_local_context() localcontext.clear_local_context()
return NotificationResult.HANDLED

View File

@ -73,6 +73,24 @@ priority
Parameters to endpoint methods are the request context supplied by the client, Parameters to endpoint methods are the request context supplied by the client,
the publisher_id of the notification message, the event_type, the payload. the publisher_id of the notification message, the event_type, the payload.
An endpoint method can return explicitly messaging.NotificationResult.HANDLED
to acknowledge a message or messaging.NotificationResult.REQUEUE to requeue the
message.
The message is acknowledge only if all endpoints return
messaging.NotificationResult.HANDLED
If nothing is returned by an endpoint, this is considered like
messaging.NotificationResult.HANDLED
messaging.NotificationResult values needs to be handled by drivers:
* HANDLED: supported by all drivers
* REQUEUE: supported by drivers: fake://, rabbit://
In case of an unsupported driver nothing is done to the message and a
NotImplementedError is raised and logged.
By supplying a serializer object, a listener can deserialize a request context By supplying a serializer object, a listener can deserialize a request context
and arguments from - and serialize return values to - primitive types. and arguments from - and serialize return values to - primitive types.
""" """

View File

@ -41,24 +41,58 @@ class TestDispatcher(test_utils.BaseTestCase):
('no_endpoints', ('no_endpoints',
dict(endpoints=[], dict(endpoints=[],
endpoints_expect_calls=[], endpoints_expect_calls=[],
priority='info')), priority='info',
ex=None,
return_value=messaging.NotificationResult.HANDLED)),
('one_endpoints', ('one_endpoints',
dict(endpoints=[['warn']], dict(endpoints=[['warn']],
endpoints_expect_calls=['warn'], endpoints_expect_calls=['warn'],
priority='warn')), priority='warn',
ex=None,
return_value=messaging.NotificationResult.HANDLED)),
('two_endpoints_only_one_match', ('two_endpoints_only_one_match',
dict(endpoints=[['warn'], ['info']], dict(endpoints=[['warn'], ['info']],
endpoints_expect_calls=[None, 'info'], endpoints_expect_calls=[None, 'info'],
priority='info')), priority='info',
ex=None,
return_value=messaging.NotificationResult.HANDLED)),
('two_endpoints_both_match', ('two_endpoints_both_match',
dict(endpoints=[['debug', 'info'], ['info', 'debug']], dict(endpoints=[['debug', 'info'], ['info', 'debug']],
endpoints_expect_calls=['debug', 'debug'], endpoints_expect_calls=['debug', 'debug'],
priority='debug')), priority='debug',
ex=None,
return_value=messaging.NotificationResult.HANDLED)),
('no_return_value',
dict(endpoints=[['warn']],
endpoints_expect_calls=['warn'],
priority='warn',
ex=None, return_value=None)),
('requeue',
dict(endpoints=[['debug', 'warn']],
endpoints_expect_calls=['debug'],
priority='debug', msg=notification_msg,
ex=None,
return_value=messaging.NotificationResult.REQUEUE)),
('exception',
dict(endpoints=[['debug', 'warn']],
endpoints_expect_calls=['debug'],
priority='debug', msg=notification_msg,
ex=Exception,
return_value=messaging.NotificationResult.HANDLED)),
] ]
def test_dispatcher(self): def test_dispatcher(self):
endpoints = [mock.Mock(spec=endpoint_methods) endpoints = []
for endpoint_methods in self.endpoints] for endpoint_methods in self.endpoints:
e = mock.Mock(spec=endpoint_methods)
endpoints.append(e)
for m in endpoint_methods:
method = getattr(e, m)
if self.ex:
method.side_effect = self.ex()
else:
method.return_value = self.return_value
msg = notification_msg.copy() msg = notification_msg.copy()
msg['priority'] = self.priority msg['priority'] = self.priority
@ -89,6 +123,17 @@ class TestDispatcher(test_utils.BaseTestCase):
else: else:
self.assertEqual(endpoints[i].call_count, 0) self.assertEqual(endpoints[i].call_count, 0)
if self.ex:
self.assertEqual(incoming.acknowledge.call_count, 1)
self.assertEqual(incoming.requeue.call_count, 0)
elif self.return_value == messaging.NotificationResult.HANDLED \
or self.return_value is None:
self.assertEqual(incoming.acknowledge.call_count, 1)
self.assertEqual(incoming.requeue.call_count, 0)
elif self.return_value == messaging.NotificationResult.REQUEUE:
self.assertEqual(incoming.acknowledge.call_count, 0)
self.assertEqual(incoming.requeue.call_count, 1)
@mock.patch('oslo.messaging.notify.dispatcher.LOG') @mock.patch('oslo.messaging.notify.dispatcher.LOG')
def test_dispatcher_unknown_prio(self, mylog): def test_dispatcher_unknown_prio(self, mylog):
msg = notification_msg.copy() msg = notification_msg.copy()

View File

@ -123,7 +123,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
transport = messaging.get_transport(self.conf, url='fake:') transport = messaging.get_transport(self.conf, url='fake:')
endpoint = mock.Mock() endpoint = mock.Mock()
endpoint.info = mock.Mock() endpoint.info.return_value = None
listener_thread = self._setup_listener(transport, [endpoint], 1) listener_thread = self._setup_listener(transport, [endpoint], 1)
notifier = self._setup_notifier(transport) notifier = self._setup_notifier(transport)
@ -138,7 +138,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
transport = messaging.get_transport(self.conf, url='fake:') transport = messaging.get_transport(self.conf, url='fake:')
endpoint = mock.Mock() endpoint = mock.Mock()
endpoint.info = mock.Mock() endpoint.info.return_value = None
topics = ["topic1", "topic2"] topics = ["topic1", "topic2"]
listener_thread = self._setup_listener(transport, [endpoint], 2, listener_thread = self._setup_listener(transport, [endpoint], 2,
topics=topics) topics=topics)
@ -157,9 +157,9 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
transport = messaging.get_transport(self.conf, url='fake:') transport = messaging.get_transport(self.conf, url='fake:')
endpoint1 = mock.Mock() endpoint1 = mock.Mock()
endpoint1.info = mock.Mock() endpoint1.info.return_value = None
endpoint2 = mock.Mock() endpoint2 = mock.Mock()
endpoint2.info = mock.Mock() endpoint2.info.return_value = messaging.NotificationResult.HANDLED
listener_thread = self._setup_listener(transport, listener_thread = self._setup_listener(transport,
[endpoint1, endpoint2], 1) [endpoint1, endpoint2], 1)
notifier = self._setup_notifier(transport) notifier = self._setup_notifier(transport)
@ -171,3 +171,25 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
{}, 'testpublisher', 'an_event.start', 'test') {}, 'testpublisher', 'an_event.start', 'test')
endpoint2.info.assert_called_once_with( endpoint2.info.assert_called_once_with(
{}, 'testpublisher', 'an_event.start', 'test') {}, 'testpublisher', 'an_event.start', 'test')
def test_requeue(self):
transport = messaging.get_transport(self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info = mock.Mock()
def side_effect_requeue(*args, **kwargs):
if endpoint.info.call_count == 1:
return messaging.NotificationResult.REQUEUE
return messaging.NotificationResult.HANDLED
endpoint.info.side_effect = side_effect_requeue
listener_thread = self._setup_listener(transport,
[endpoint], 2)
notifier = self._setup_notifier(transport)
notifier.info({}, 'an_event.start', 'test')
self._stop_listener(listener_thread)
expected = [mock.call({}, 'testpublisher', 'an_event.start', 'test'),
mock.call({}, 'testpublisher', 'an_event.start', 'test')]
self.assertEqual(endpoint.info.call_args_list, expected)

View File

@ -206,6 +206,7 @@ class TestSendReceive(test_utils.BaseTestCase):
senders[i].start() senders[i].start()
received = listener.poll() received = listener.poll()
received.message.pop('_unique_id')
self.assertIsNotNone(received) self.assertIsNotNone(received)
self.assertEqual(received.ctxt, self.ctxt) self.assertEqual(received.ctxt, self.ctxt)
self.assertEqual(received.message, {'tx_id': i}) self.assertEqual(received.message, {'tx_id': i})
@ -302,12 +303,14 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
senders[0].start() senders[0].start()
msgs.append(listener.poll()) msgs.append(listener.poll())
msgs[-1].message.pop('_unique_id')
self.assertEqual(msgs[-1].message, {'tx_id': 0}) self.assertEqual(msgs[-1].message, {'tx_id': 0})
# Start the second guy, receive his message # Start the second guy, receive his message
senders[1].start() senders[1].start()
msgs.append(listener.poll()) msgs.append(listener.poll())
msgs[-1].message.pop('_unique_id')
self.assertEqual(msgs[-1].message, {'tx_id': 1}) self.assertEqual(msgs[-1].message, {'tx_id': 1})
# Reply to both in order, making the second thread queue # Reply to both in order, making the second thread queue
@ -602,6 +605,7 @@ class TestReplyWireFormat(test_utils.BaseTestCase):
producer.publish(msg) producer.publish(msg)
received = listener.poll() received = listener.poll()
received.message.pop('_unique_id')
self.assertIsNotNone(received) self.assertIsNotNone(received)
self.assertEqual(self.expected_ctxt, received.ctxt) self.assertEqual(self.expected_ctxt, received.ctxt)
self.assertEqual(self.expected, received.message) self.assertEqual(self.expected, received.message)