diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index bd1bf0c09..e4268648a 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -14,6 +14,7 @@ import collections import contextlib +import functools import logging import os import socket @@ -248,134 +249,6 @@ class Consumer(object): message.ack() -class Publisher(object): - """Publisher that silently creates exchange but no queues.""" - - passive = False - - def __init__(self, conf, exchange_name, routing_key, type, durable, - auto_delete): - """Init the Publisher class with the exchange_name, routing_key, - type, durable auto_delete - """ - self.queue_arguments = _get_queue_arguments(conf) - self.exchange_name = exchange_name - self.routing_key = routing_key - self.auto_delete = auto_delete - self.durable = durable - self.exchange = kombu.entity.Exchange(name=self.exchange_name, - type=type, - durable=durable, - auto_delete=auto_delete, - passive=self.passive) - - def send(self, conn, msg, timeout=None): - """Send a message on an channel.""" - producer = kombu.messaging.Producer(exchange=self.exchange, - channel=conn.channel, - routing_key=self.routing_key) - - headers = {} - if timeout: - # AMQP TTL is in milliseconds when set in the property. - # Details: http://www.rabbitmq.com/ttl.html#per-message-ttl - # NOTE(sileht): this amqp header doesn't exists ... LP#1444854 - headers['ttl'] = timeout * 1000 - - # NOTE(sileht): no need to wait more, caller expects - # a answer before timeout is reached - transport_timeout = timeout - - heartbeat_timeout = conn.driver_conf.heartbeat_timeout_threshold - if (conn._heartbeat_supported_and_enabled() and ( - transport_timeout is None or - transport_timeout > heartbeat_timeout)): - # NOTE(sileht): we are supposed to send heartbeat every - # heartbeat_timeout, no need to wait more otherwise will - # disconnect us, so raise timeout earlier ourself - transport_timeout = heartbeat_timeout - - with conn._transport_socket_timeout(transport_timeout): - producer.publish(msg, headers=headers) - - -class DeclareQueuePublisher(Publisher): - """Publisher that declares a default queue - - When the exchange is missing instead of silently creating an exchange - not binded to a queue, this publisher creates a default queue - named with the routing_key. - - This is mainly used to not miss notifications in case of nobody consumes - them yet. If the future consumer binds the default queue it can retrieve - missing messages. - """ - - DECLARED_QUEUES = collections.defaultdict(set) - - def send(self, conn, msg, timeout=None): - queue_indentifier = (self.exchange_name, - self.routing_key) - # NOTE(sileht): We only do it once per reconnection - # the Connection._set_current_channel() is responsible to clear - # this cache - if queue_indentifier not in self.DECLARED_QUEUES[conn.channel]: - queue = kombu.entity.Queue( - channel=conn.channel, - exchange=self.exchange, - durable=self.durable, - auto_delete=self.auto_delete, - name=self.routing_key, - routing_key=self.routing_key, - queue_arguments=self.queue_arguments) - queue.declare() - self.DECLARED_QUEUES[conn.channel].add(queue_indentifier) - super(DeclareQueuePublisher, self).send( - conn, msg, timeout) - - @classmethod - def reset_cache(cls, channel): - cls.DECLARED_QUEUES.pop(channel, None) - - -class RetryOnMissingExchangePublisher(Publisher): - """Publisher that retry during 60 seconds if the exchange is missing.""" - - passive = True - - def send(self, conn, msg, timeout=None): - # TODO(sileht): - # * use timeout parameter when available - # * use rpc_timeout if not instead of hardcoded 60 - # * use @retrying - timer = rpc_common.DecayingTimer(duration=60) - timer.start() - - while True: - try: - super(RetryOnMissingExchangePublisher, self).send(conn, msg, - timeout) - return - except conn.connection.channel_errors as exc: - # NOTE(noelbk/sileht): - # If rabbit dies, the consumer can be disconnected before the - # publisher sends, and if the consumer hasn't declared the - # queue, the publisher's will send a message to an exchange - # that's not bound to a queue, and the message wll be lost. - # So we set passive=True to the publisher exchange and catch - # the 404 kombu ChannelError and retry until the exchange - # appears - if exc.code == 404 and timer.check_return() > 0: - LOG.info(_LI("The exchange %(exchange)s to send to " - "%(routing_key)s doesn't exist yet, " - "retrying...") % { - 'exchange': self.exchange, - 'routing_key': self.routing_key}) - time.sleep(1) - continue - raise - - class DummyConnectionLock(object): def acquire(self): pass @@ -795,7 +668,7 @@ class Connection(object): NOTE(sileht): Must be called within the connection lock """ if self.channel is not None and new_channel != self.channel: - DeclareQueuePublisher.reset_cache(self.channel) + self.PUBLISHER_DECLARED_QUEUES.pop(self.channel, None) self.connection.maybe_close_channel(self.channel) self.channel = new_channel @@ -980,20 +853,8 @@ class Connection(object): recoverable_error_callback=_recoverable_error_callback, error_callback=_error_callback) - def publisher_send(self, publisher, msg, timeout=None, retry=None): - """Send to a publisher based on the publisher class.""" - - def _error_callback(exc): - log_info = {'topic': publisher.exchange_name, 'err_str': exc} - LOG.error(_("Failed to publish message to topic " - "'%(topic)s': %(err_str)s"), log_info) - LOG.debug('Exception', exc_info=exc) - - def _publish(): - publisher.send(self, msg, timeout) - - with self._connection_lock: - self.ensure(_publish, retry=retry, error_callback=_error_callback) + def stop_consuming(self): + self._consume_loop_stopped = True def declare_direct_consumer(self, topic, callback): """Create a 'direct' queue. @@ -1045,54 +906,166 @@ class Connection(object): self.declare_consumer(consumer) + def _ensure_publishing(self, method, exchange, msg, routing_key=None, + timeout=None, retry=None): + """Send to a publisher based on the publisher class.""" + + def _error_callback(exc): + log_info = {'topic': exchange.name, 'err_str': exc} + LOG.error(_("Failed to publish message to topic " + "'%(topic)s': %(err_str)s"), log_info) + LOG.debug('Exception', exc_info=exc) + + method = functools.partial(method, exchange, msg, routing_key, timeout) + + with self._connection_lock: + self.ensure(method, retry=retry, error_callback=_error_callback) + + def _publish(self, exchange, msg, routing_key=None, timeout=None): + """Publish a message.""" + producer = kombu.messaging.Producer(exchange=exchange, + channel=self.channel, + routing_key=routing_key) + + headers = {} + if timeout: + # AMQP TTL is in milliseconds when set in the property. + # Details: http://www.rabbitmq.com/ttl.html#per-message-ttl + # NOTE(sileht): this amqp header doesn't exists ... LP#1444854 + headers['ttl'] = timeout * 1000 + + # NOTE(sileht): no need to wait more, caller expects + # a answer before timeout is reached + transport_timeout = timeout + + heartbeat_timeout = self.driver_conf.heartbeat_timeout_threshold + if (self._heartbeat_supported_and_enabled() and ( + transport_timeout is None or + transport_timeout > heartbeat_timeout)): + # NOTE(sileht): we are supposed to send heartbeat every + # heartbeat_timeout, no need to wait more otherwise will + # disconnect us, so raise timeout earlier ourself + transport_timeout = heartbeat_timeout + + with self._transport_socket_timeout(transport_timeout): + producer.publish(msg, headers=headers) + + PUBLISHER_DECLARED_QUEUES = collections.defaultdict(set) + + def _publish_and_creates_default_queue(self, exchange, msg, + routing_key=None, timeout=None): + """Publisher that declares a default queue + + When the exchange is missing instead of silency creates an exchange + not binded to a queue, this publisher creates a default queue + named with the routing_key + + This is mainly used to not miss notification in case of nobody consumes + them yet. If the futur consumer bind the default queue it can retrieve + missing messages. + + _set_current_channel is responsible to cleanup the cache. + """ + queue_indentifier = (exchange.name, routing_key) + # NOTE(sileht): We only do it once per reconnection + # the Connection._set_current_channel() is responsible to clear + # this cache + if (queue_indentifier not in + self.PUBLISHER_DECLARED_QUEUES[self.channel]): + queue = kombu.entity.Queue( + channel=self.channel, + exchange=exchange, + durable=exchange.durable, + auto_delete=exchange.auto_delete, + name=routing_key, + routing_key=routing_key, + queue_arguments=_get_queue_arguments(self.driver_conf)) + queue.declare() + self.PUBLISHER_DECLARED_QUEUES[self.channel].add(queue_indentifier) + + self._publish(exchange, msg, routing_key=routing_key, timeout=timeout) + + def _publish_and_retry_on_missing_exchange(self, exchange, msg, + routing_key=None, timeout=None): + """Publisher that retry during 60 seconds if the exchange is missing. + """ + + if not exchange.passive: + RuntimeError("_publish_and_retry_on_missing_exchange() must be " + "called with an passive exchange.") + + # TODO(sileht): + # * use timeout parameter when available + # * use rpc_timeout if not instead of hardcoded 60 + # * use @retrying + timer = rpc_common.DecayingTimer(duration=60) + timer.start() + + while True: + try: + self._publish(exchange, msg, routing_key=routing_key, + timeout=timeout) + return + except self.connection.channel_errors as exc: + # NOTE(noelbk/sileht): + # If rabbit dies, the consumer can be disconnected before the + # publisher sends, and if the consumer hasn't declared the + # queue, the publisher's will send a message to an exchange + # that's not bound to a queue, and the message wll be lost. + # So we set passive=True to the publisher exchange and catch + # the 404 kombu ChannelError and retry until the exchange + # appears + if exc.code == 404 and timer.check_return() > 0: + LOG.info(_LI("The exchange %(exchange)s to send to " + "%(routing_key)s doesn't exist yet, " + "retrying...") % { + 'exchange': exchange.name, + 'routing_key': routing_key}) + time.sleep(1) + continue + raise + def direct_send(self, msg_id, msg): """Send a 'direct' message.""" + exchange = kombu.entity.Exchange(name=msg_id, + type='direct', + durable=False, + auto_delete=True, + passive=True) - p = RetryOnMissingExchangePublisher(self.driver_conf, - exchange_name=msg_id, - routing_key=msg_id, - type='direct', - durable=False, - auto_delete=True) - - self.publisher_send(p, msg) + self._ensure_publishing(self._publish_and_retry_on_missing_exchange, + exchange, msg, routing_key=msg_id) def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None): """Send a 'topic' message.""" - p = Publisher(self.driver_conf, - exchange_name=exchange_name, - routing_key=topic, - type='topic', - durable=self.driver_conf.amqp_durable_queues, - auto_delete=self.driver_conf.amqp_auto_delete) - self.publisher_send(p, msg, timeout, retry=retry) - - def fanout_send(self, topic, msg, retry=None): - """Send a 'fanout' message.""" - - p = Publisher(self.driver_conf, - exchange_name='%s_fanout' % topic, - routing_key=None, - type='fanout', - durable=False, - auto_delete=True) - - self.publisher_send(p, msg, retry=retry) - - def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs): - """Send a notify message on a topic.""" - p = DeclareQueuePublisher( - self.driver_conf, - exchange_name=exchange_name, - routing_key=topic, + exchange = kombu.entity.Exchange( + name=exchange_name, type='topic', durable=self.driver_conf.amqp_durable_queues, auto_delete=self.driver_conf.amqp_auto_delete) - self.publisher_send(p, msg, timeout=None, retry=retry) + self._ensure_publishing(self._publish, exchange, msg, + routing_key=topic, retry=retry) - def stop_consuming(self): - self._consume_loop_stopped = True + def fanout_send(self, topic, msg, retry=None): + """Send a 'fanout' message.""" + exchange = kombu.entity.Exchange(name='%s_fanout' % topic, + type='fanout', + durable=False, + auto_delete=True) + + self._ensure_publishing(self._publish, exchange, msg, retry=retry) + + def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs): + """Send a notify message on a topic.""" + exchange = kombu.entity.Exchange( + name=exchange_name, + type='topic', + durable=self.driver_conf.amqp_durable_queues, + auto_delete=self.driver_conf.amqp_auto_delete) + + self._ensure_publishing(self._publish_and_creates_default_queue, + exchange, msg, routing_key=topic, retry=retry) class RabbitDriver(amqpdriver.AMQPDriverBase): diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 6f5b0bd7a..69bc1bada 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -173,10 +173,6 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase): heartbeat=0, failover_strategy="shuffle") -class RaiseOnNoExchangePublisher(rabbit_driver.Publisher): - passive = True - - class TestRabbitPublisher(test_utils.BaseTestCase): def test_declared_queue_publisher(self): @@ -184,41 +180,40 @@ class TestRabbitPublisher(test_utils.BaseTestCase): 'kombu+memory:////') self.addCleanup(transport.cleanup) - p1 = RaiseOnNoExchangePublisher( - self.conf.oslo_messaging_rabbit, - exchange_name='foobar', - routing_key='foobar', + e_passive = kombu.entity.Exchange( + name='foobar', type='topic', - durable=False, - auto_delete=False) + passive=True) - p2 = rabbit_driver.DeclareQueuePublisher( - self.conf.oslo_messaging_rabbit, - exchange_name='foobar', - routing_key='foobar', + e_active = kombu.entity.Exchange( + name='foobar', type='topic', - durable=False, - auto_delete=False) + passive=False) with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn: conn = pool_conn.connection exc = conn.connection.channel_errors[0] - # Ensure the exchange does not exists - self.assertRaises(exc, conn.publisher_send, p1, {}) - # Creates it - conn.publisher_send(p2, {}) - # Ensure it creates it - conn.publisher_send(p1, {}) - with mock.patch('kombu.messaging.Producer', - side_effect=exc): + def try_send(exchange): + conn._ensure_publishing( + conn._publish_and_creates_default_queue, + exchange, {}, routing_key='foobar') + + # Ensure the exchange does not exists + self.assertRaises(exc, try_send, e_passive) + # Create it + try_send(e_active) + # Ensure it creates it + try_send(e_passive) + + with mock.patch('kombu.messaging.Producer', side_effect=exc): # Shoud reset the cache and ensures the exchange does - # not exitsts - self.assertRaises(exc, conn.publisher_send, p1, {}) - # Recreates it - conn.publisher_send(p2, {}) + # not exists + self.assertRaises(exc, try_send, e_passive) + # Recreate it + try_send(e_active) # Ensure it have been recreated - conn.publisher_send(p1, {}) + try_send(e_passive) class TestRabbitConsume(test_utils.BaseTestCase):