diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index b8b9fcefe..3a2303d39 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -358,7 +358,8 @@ class AMQPDriverBase(base.BaseDriver): return self._reply_q def _send(self, target, ctxt, message, - wait_for_reply=None, timeout=None, envelope=True): + wait_for_reply=None, timeout=None, + envelope=True, notify=False): # FIXME(markmc): remove this temporary hack class Context(object): @@ -388,7 +389,9 @@ class AMQPDriverBase(base.BaseDriver): try: with self._get_connection() as conn: - if target.fanout: + if notify: + conn.notify_send(target.topic, msg) + elif target.fanout: conn.fanout_send(target.topic, msg) else: topic = target.topic @@ -409,7 +412,8 @@ class AMQPDriverBase(base.BaseDriver): return self._send(target, ctxt, message, wait_for_reply, timeout) def send_notification(self, target, ctxt, message, version): - return self._send(target, ctxt, message, envelope=(version == 2.0)) + return self._send(target, ctxt, message, + envelope=(version == 2.0), notify=True) def listen(self, target): conn = self._get_connection(pooled=False)