notification listener: add allow_requeue param
In commit d8d2ad9 we added support for notification listener endpoint methods to return REQUEUE, but if a driver does not support this we raise NotImplementedError when the application attempts to requeue a message. This requeuing behaviour might only be used by an application in unusual, exceptional circumstances and catch users by surprise. Instead, let's require the application to assert that it needs this feature in advance and raise NotImplementError at that point if the driver doesn't support it. Change-Id: Id0bb0e57d2dcc1ec7d752e98c9b1e8e48d99f35c
This commit is contained in:
parent
d8d2ad95d7
commit
5bd31315c2
@ -67,6 +67,11 @@ class BaseDriver(object):
|
||||
self._default_exchange = default_exchange
|
||||
self._allowed_remote_exmods = allowed_remote_exmods
|
||||
|
||||
def require_features(self, requeue=False):
|
||||
if requeue:
|
||||
raise NotImplementedError('Message requeueing not supported by '
|
||||
'this transport driver')
|
||||
|
||||
@abc.abstractmethod
|
||||
def send(self, target, ctxt, message,
|
||||
wait_for_reply=None, timeout=None, envelope=False):
|
||||
|
@ -112,6 +112,9 @@ class FakeDriver(base.BaseDriver):
|
||||
self._exchanges_lock = threading.Lock()
|
||||
self._exchanges = {}
|
||||
|
||||
def require_features(self, requeue=True):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def _check_serialize(message):
|
||||
"""Make sure a message intended for rpc can be serialized.
|
||||
|
@ -100,8 +100,7 @@ class QpidMessage(dict):
|
||||
self._session.acknowledge(self._raw_message)
|
||||
|
||||
def requeue(self):
|
||||
raise NotImplementedError('The QPID driver does not yet support '
|
||||
'requeuing messages')
|
||||
pass
|
||||
|
||||
|
||||
class ConsumerBase(object):
|
||||
|
@ -744,3 +744,6 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
|
||||
connection_pool,
|
||||
default_exchange,
|
||||
allowed_remote_exmods)
|
||||
|
||||
def require_features(self, requeue=True):
|
||||
pass
|
||||
|
@ -844,8 +844,7 @@ class ZmqIncomingMessage(base.IncomingMessage):
|
||||
self.condition.notify()
|
||||
|
||||
def requeue(self):
|
||||
raise NotImplementedError('The ZeroMQ driver does not yet support '
|
||||
'requeuing messages')
|
||||
pass
|
||||
|
||||
|
||||
class ZmqListener(base.Listener):
|
||||
|
@ -44,10 +44,11 @@ class NotificationDispatcher(object):
|
||||
message to the endpoints
|
||||
"""
|
||||
|
||||
def __init__(self, targets, endpoints, serializer):
|
||||
def __init__(self, targets, endpoints, serializer, allow_requeue):
|
||||
self.targets = targets
|
||||
self.endpoints = endpoints
|
||||
self.serializer = serializer or msg_serializer.NoOpSerializer()
|
||||
self.allow_requeue = allow_requeue
|
||||
|
||||
self._callbacks_by_priority = {}
|
||||
for endpoint, prio in itertools.product(endpoints, PRIORITIES):
|
||||
@ -114,7 +115,7 @@ class NotificationDispatcher(object):
|
||||
try:
|
||||
ret = callback(ctxt, publisher_id, event_type, payload)
|
||||
ret = NotificationResult.HANDLED if ret is None else ret
|
||||
if ret != NotificationResult.HANDLED:
|
||||
if self.allow_requeue and ret == NotificationResult.REQUEUE:
|
||||
return ret
|
||||
finally:
|
||||
localcontext.clear_local_context()
|
||||
|
@ -73,26 +73,20 @@ priority
|
||||
Parameters to endpoint methods are the request context supplied by the client,
|
||||
the publisher_id of the notification message, the event_type, the payload.
|
||||
|
||||
An endpoint method can return explicitly messaging.NotificationResult.HANDLED
|
||||
By supplying a serializer object, a listener can deserialize a request context
|
||||
and arguments from - and serialize return values to - primitive types.
|
||||
|
||||
An endpoint method can explicitly return messaging.NotificationResult.HANDLED
|
||||
to acknowledge a message or messaging.NotificationResult.REQUEUE to requeue the
|
||||
message.
|
||||
|
||||
The message is acknowledge only if all endpoints return
|
||||
messaging.NotificationResult.HANDLED
|
||||
The message is acknowledged only if all endpoints either return
|
||||
messaging.NotificationResult.HANDLED or None.
|
||||
|
||||
If nothing is returned by an endpoint, this is considered like
|
||||
messaging.NotificationResult.HANDLED
|
||||
|
||||
messaging.NotificationResult values needs to be handled by drivers:
|
||||
|
||||
* HANDLED: supported by all drivers
|
||||
* REQUEUE: supported by drivers: fake://, rabbit://
|
||||
|
||||
In case of an unsupported driver nothing is done to the message and a
|
||||
NotImplementedError is raised and logged.
|
||||
|
||||
By supplying a serializer object, a listener can deserialize a request context
|
||||
and arguments from - and serialize return values to - primitive types.
|
||||
Note that not all transport drivers implement support for requeueing. In order
|
||||
to use this feature, applications should assert that the feature is available
|
||||
by passing allow_requeue=True to get_notification_listener(). If the driver
|
||||
does not support requeueing, it will raise NotImplementedError at this point.
|
||||
"""
|
||||
|
||||
from oslo.messaging.notify import dispatcher as notify_dispatcher
|
||||
@ -100,7 +94,8 @@ from oslo.messaging import server as msg_server
|
||||
|
||||
|
||||
def get_notification_listener(transport, targets, endpoints,
|
||||
executor='blocking', serializer=None):
|
||||
executor='blocking', serializer=None,
|
||||
allow_requeue=False):
|
||||
"""Construct a notification listener
|
||||
|
||||
The executor parameter controls how incoming messages will be received and
|
||||
@ -117,7 +112,12 @@ def get_notification_listener(transport, targets, endpoints,
|
||||
:type executor: str
|
||||
:param serializer: an optional entity serializer
|
||||
:type serializer: Serializer
|
||||
:param allow_requeue: whether NotificationResult.REQUEUE support is needed
|
||||
:type allow_requeue: bool
|
||||
:raises: NotImplementedError
|
||||
"""
|
||||
transport._require_driver_features(requeue=allow_requeue)
|
||||
dispatcher = notify_dispatcher.NotificationDispatcher(targets, endpoints,
|
||||
serializer)
|
||||
serializer,
|
||||
allow_requeue)
|
||||
return msg_server.MessageHandlingServer(transport, dispatcher, executor)
|
||||
|
@ -78,6 +78,9 @@ class Transport(object):
|
||||
self.conf = driver.conf
|
||||
self._driver = driver
|
||||
|
||||
def _require_driver_features(self, requeue=False):
|
||||
self._driver.require_features(requeue=requeue)
|
||||
|
||||
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
|
||||
if not target.topic:
|
||||
raise exceptions.InvalidTarget('A topic is required to send',
|
||||
|
@ -97,9 +97,8 @@ class TestDispatcher(test_utils.BaseTestCase):
|
||||
msg['priority'] = self.priority
|
||||
|
||||
targets = [messaging.Target(topic='notifications')]
|
||||
dispatcher = notify_dispatcher.NotificationDispatcher(targets,
|
||||
endpoints,
|
||||
None)
|
||||
dispatcher = notify_dispatcher.NotificationDispatcher(
|
||||
targets, endpoints, None, allow_requeue=True)
|
||||
|
||||
# check it listen on wanted topics
|
||||
self.assertEqual(sorted(dispatcher._targets_priorities),
|
||||
@ -138,9 +137,8 @@ class TestDispatcher(test_utils.BaseTestCase):
|
||||
def test_dispatcher_unknown_prio(self, mylog):
|
||||
msg = notification_msg.copy()
|
||||
msg['priority'] = 'what???'
|
||||
dispatcher = notify_dispatcher.NotificationDispatcher([mock.Mock()],
|
||||
[mock.Mock()],
|
||||
None)
|
||||
dispatcher = notify_dispatcher.NotificationDispatcher(
|
||||
[mock.Mock()], [mock.Mock()], None, allow_requeue=True)
|
||||
with dispatcher(mock.Mock(ctxt={}, message=msg)) as callback:
|
||||
callback()
|
||||
mylog.warning.assert_called_once_with('Unknown priority "what???"')
|
||||
|
@ -35,7 +35,7 @@ class ListenerSetupMixin(object):
|
||||
self._expect_messages = expect_messages
|
||||
self._received_msgs = 0
|
||||
self._listener = messaging.get_notification_listener(
|
||||
transport, targets, endpoints + [self])
|
||||
transport, targets, endpoints + [self], allow_requeue=True)
|
||||
|
||||
def info(self, ctxt, publisher_id, event_type, payload):
|
||||
self._received_msgs += 1
|
||||
|
Loading…
x
Reference in New Issue
Block a user