From 206c19e99eeea4d6fb215fba5210873435baa6d3 Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Thu, 1 Aug 2013 23:32:17 +0100 Subject: [PATCH] Add a driver method specifically for sending notifications Notifications are an unusual case in that we need users to manually opt in to new incompatible message formats by editing configuration because there may be external consumers expecting the old format. Add a send_notification() method to the driver interface and add a format version paramater to the method, to make it clear that this version selection is specifically for notifications. In the case of the rabbit/qpid drivers, the 2.0 format is where we added the message envelope. Change-Id: Ib4925c308b1252503749962aa16f043281f2b429 --- oslo/messaging/_drivers/amqpdriver.py | 14 ++++++++++---- oslo/messaging/_drivers/base.py | 4 ++++ oslo/messaging/_drivers/impl_fake.py | 9 +++++++-- oslo/messaging/notify/_impl_messaging.py | 10 +++++----- oslo/messaging/transport.py | 12 ++++++++---- tests/test_notifier.py | 12 ++++++------ tests/test_transport.py | 21 +++++++++++++++------ 7 files changed, 55 insertions(+), 27 deletions(-) diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index fb001f10b..ab77f4f78 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -277,8 +277,8 @@ class AMQPDriverBase(base.BaseDriver): return self._reply_q - def send(self, target, ctxt, message, - wait_for_reply=None, timeout=None, envelope=False): + def _send(self, target, ctxt, message, + wait_for_reply=None, timeout=None, envelope=True): # FIXME(markmc): remove this temporary hack class Context(object): @@ -299,8 +299,8 @@ class AMQPDriverBase(base.BaseDriver): msg.update({'_reply_q': self._get_reply_q()}) - # FIXME(markmc): handle envelope param - msg = rpc_common.serialize_msg(msg) + if envelope: + msg = rpc_common.serialize_msg(msg) if wait_for_reply: self._waiter.listen(msg_id) @@ -324,6 +324,12 @@ class AMQPDriverBase(base.BaseDriver): if wait_for_reply: self._waiter.unlisten(msg_id) + def send(self, target, ctxt, message, wait_for_reply=None, timeout=None): + return self._send(target, ctxt, message, wait_for_reply, timeout) + + def send_notification(self, target, ctxt, message, version): + return self._send(target, ctxt, message, envelope=(version == 2.0)) + def listen(self, target): conn = self._get_connection(pooled=False) diff --git a/oslo/messaging/_drivers/base.py b/oslo/messaging/_drivers/base.py index 2b7b8e028..1dd49f4e6 100644 --- a/oslo/messaging/_drivers/base.py +++ b/oslo/messaging/_drivers/base.py @@ -65,6 +65,10 @@ class BaseDriver(object): wait_for_reply=None, timeout=None, envelope=False): """Send a message to the given target.""" + @abc.abstractmethod + def send_notification(self, target, ctxt, message, version): + """Send a notification message to the given target.""" + @abc.abstractmethod def listen(self, target): """Construct a Listener for the given target.""" diff --git a/oslo/messaging/_drivers/impl_fake.py b/oslo/messaging/_drivers/impl_fake.py index 6afbfad37..3b4c99e18 100644 --- a/oslo/messaging/_drivers/impl_fake.py +++ b/oslo/messaging/_drivers/impl_fake.py @@ -112,8 +112,7 @@ class FakeDriver(base.BaseDriver): while self._exchanges_lock: return self._exchanges.setdefault(name, FakeExchange(name)) - def send(self, target, ctxt, message, - wait_for_reply=None, timeout=None, envelope=False): + def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None): self._check_serialize(message) exchange = self._get_exchange(target.exchange or @@ -137,6 +136,12 @@ class FakeDriver(base.BaseDriver): return None + def send(self, target, ctxt, message, wait_for_reply=None, timeout=None): + return self._send(target, ctxt, message, wait_for_reply, timeout) + + def send_notification(self, target, ctxt, message, version): + self._send(target, ctxt, message) + def listen(self, target): exchange = self._get_exchange(target.exchange or self._default_exchange) diff --git a/oslo/messaging/notify/_impl_messaging.py b/oslo/messaging/notify/_impl_messaging.py index d422fbd6e..1eca98ec5 100644 --- a/oslo/messaging/notify/_impl_messaging.py +++ b/oslo/messaging/notify/_impl_messaging.py @@ -34,17 +34,17 @@ class MessagingDriver(notifier._Driver): deployed which do not support the 2.0 message format. """ - def __init__(self, conf, topics, transport, envelope=False): + def __init__(self, conf, topics, transport, version=1.0): super(MessagingDriver, self).__init__(conf, topics, transport) - self.envelope = envelope + self.version = version def notify(self, ctxt, message, priority): priority = priority.lower() for topic in self.topics: target = messaging.Target(topic='%s.%s' % (topic, priority)) try: - self.transport._send(target, ctxt, message, - envelope=self.envelope) + self.transport._send_notification(target, ctxt, message, + version=self.version) except Exception: LOG.exception("Could not send notification to %(topic)s. " "Payload=%(message)s", @@ -56,4 +56,4 @@ class MessagingV2Driver(MessagingDriver): "Send notifications using the 2.0 message format." def __init__(self, conf, **kwargs): - super(MessagingV2Driver, self).__init__(conf, envelope=True, **kwargs) + super(MessagingV2Driver, self).__init__(conf, version=2.0, **kwargs) diff --git a/oslo/messaging/transport.py b/oslo/messaging/transport.py index f4667c83b..20083688f 100644 --- a/oslo/messaging/transport.py +++ b/oslo/messaging/transport.py @@ -75,15 +75,19 @@ class Transport(object): self.conf = driver.conf self._driver = driver - def _send(self, target, ctxt, message, - wait_for_reply=None, timeout=None, envelope=False): + def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None): if not target.topic: raise exceptions.InvalidTarget('A topic is required to send', target) return self._driver.send(target, ctxt, message, wait_for_reply=wait_for_reply, - timeout=timeout, - envelope=envelope) + timeout=timeout) + + def _send_notification(self, target, ctxt, message, version): + if not target.topic: + raise exceptions.InvalidTarget('A topic is required to send', + target) + self._driver.send(target, ctxt, message, version) def _listen(self, target): if not (target.topic and target.server): diff --git a/tests/test_notifier.py b/tests/test_notifier.py index a8ebc8691..4a7ed29c6 100644 --- a/tests/test_notifier.py +++ b/tests/test_notifier.py @@ -37,8 +37,7 @@ class _FakeTransport(object): def __init__(self, conf): self.conf = conf - def _send(self, target, ctxt, message, - wait_for_reply=None, timeout=None, envelope=False): + def _send_notification(self, target, ctxt, message, version): pass @@ -139,7 +138,7 @@ class TestMessagingNotifier(test_utils.BaseTestCase): notifier = messaging.Notifier(transport, 'test.localhost') - self.mox.StubOutWithMock(transport, '_send') + self.mox.StubOutWithMock(transport, '_send_notification') message_id = uuid.uuid4() self.mox.StubOutWithMock(uuid, 'uuid4') @@ -158,15 +157,16 @@ class TestMessagingNotifier(test_utils.BaseTestCase): sends = [] if self.v1: - sends.append(dict(envelope=False)) + sends.append(dict(version=1.0)) if self.v2: - sends.append(dict(envelope=True)) + sends.append(dict(version=2.0)) for send_kwargs in sends: for topic in self.topics: target = messaging.Target(topic='%s.%s' % (topic, self.priority)) - transport._send(target, self.ctxt, message, **send_kwargs) + transport._send_notification(target, self.ctxt, message, + **send_kwargs) self.mox.ReplayAll() diff --git a/tests/test_transport.py b/tests/test_transport.py index 55e581c37..0722ac554 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -36,6 +36,9 @@ class _FakeDriver(object): def send(self, *args, **kwargs): pass + def send_notification(self, *args, **kwargs): + pass + def listen(self, target): pass @@ -239,8 +242,7 @@ class TestTransportMethodArgs(test_utils.BaseTestCase): self.mox.StubOutWithMock(t._driver, 'send') t._driver.send(self._target, 'ctxt', 'message', wait_for_reply=None, - timeout=None, - envelope=False) + timeout=None) self.mox.ReplayAll() t._send(self._target, 'ctxt', 'message') @@ -251,14 +253,21 @@ class TestTransportMethodArgs(test_utils.BaseTestCase): self.mox.StubOutWithMock(t._driver, 'send') t._driver.send(self._target, 'ctxt', 'message', wait_for_reply='wait_for_reply', - timeout='timeout', - envelope='envelope') + timeout='timeout') self.mox.ReplayAll() t._send(self._target, 'ctxt', 'message', wait_for_reply='wait_for_reply', - timeout='timeout', - envelope='envelope') + timeout='timeout') + + def test_send_notification(self): + t = transport.Transport(_FakeDriver(cfg.CONF)) + + self.mox.StubOutWithMock(t._driver, 'send') + t._driver.send(self._target, 'ctxt', 'message', 1.0) + self.mox.ReplayAll() + + t._send_notification(self._target, 'ctxt', 'message', version=1.0) def test_listen(self): t = transport.Transport(_FakeDriver(cfg.CONF))