From 4937949dffecdf8863a7876e5a6b0b18e811c3ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Herv=C3=A9=20Beraud?= Date: Fri, 8 Jan 2021 15:54:54 +0100 Subject: [PATCH] Correctly handle missing RabbitMQ queues Currently, setting the '[oslo_messaging] direct_mandatory_flag' config option to 'True' (the default) will result in a 'MessageUndeliverable' exception being raised when sending a reply if a RabbitMQ queue is missing [1]. It was the responsibility of the application to handle this exception, however, many applications are not doing so. This has resulted in a number of bug reports. Start handling this error condition, using a retry loop to attempt to resend the message and work around any temporary glitches. Since attempting to send a reply will will no longer raise an exception, there is little benefit in retaining the '[oslo_messaging] direct_mandatory_flag' config option: users setting this to False will simply not benefit from the retry logic and improved logging added here. This option is already deprecated though and will be fully removed in a future release. [1] https://www.rabbitmq.com/channels.html Change-Id: Id5cddbefbe24ef100f1cc522f44430df77d217cb Closes-Bug: #1905965 --- doc/source/admin/rabbit.rst | 3 +- oslo_messaging/_drivers/amqpdriver.py | 72 +++++++++++++------ oslo_messaging/_drivers/impl_rabbit.py | 3 + ...handle-missing-queue-553a803f94976be7.yaml | 5 ++ 4 files changed, 60 insertions(+), 23 deletions(-) create mode 100644 releasenotes/notes/handle-missing-queue-553a803f94976be7.yaml diff --git a/doc/source/admin/rabbit.rst b/doc/source/admin/rabbit.rst index 7e75f6ab1..142bdf70e 100644 --- a/doc/source/admin/rabbit.rst +++ b/doc/source/admin/rabbit.rst @@ -66,7 +66,8 @@ flag is used`_. through the *Connection* class. With mandatory flag RabbitMQ raises a callback if the message is not routed to -any queue. +any queue. This callback will be used to loop for a timeout and let's a chance +to sender to recover. .. _Exchange is a AMQP mechanism: https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges .. _queues: https://www.rabbitmq.com/queues.html diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 011222cee..cdc21c5a5 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -145,39 +145,67 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): while True: try: with self.listener.driver._get_connection( - rpc_common.PURPOSE_SEND) as conn: + rpc_common.PURPOSE_SEND, + ) as conn: self._send_reply(conn, reply, failure) + return - except rpc_amqp.AMQPDestinationNotFound: - if timer.check_return() > 0: - LOG.debug(("The reply %(msg_id)s cannot be sent " - "%(reply_q)s reply queue doesn't exist, " - "retrying..."), { - 'msg_id': self.msg_id, - 'reply_q': self.reply_q}) - time.sleep(0.25) - else: + except oslo_messaging.MessageUndeliverable: + # queue not found + if timer.check_return() <= 0: self._obsolete_reply_queues.add(self.reply_q, self.msg_id) - infos = { - 'msg_id': self.msg_id, - 'reply_q': self.reply_q, - 'duration': duration - } - LOG.info("The reply %(msg_id)s cannot be sent " - "%(reply_q)s reply queue don't exist after " - "%(duration)s sec abandoning...", infos) + LOG.error( + 'The reply %(msg_id)s failed to send after ' + '%(duration)d seconds due to a missing queue ' + '(%(reply_q)s). Abandoning...', { + 'msg_id': self.msg_id, + 'duration': duration, + 'reply_q': self.reply_q}) return + LOG.debug( + 'The reply %(msg_id)s could not be sent due to a missing ' + 'queue (%(reply_q)s). Retrying...', { + 'msg_id': self.msg_id, + 'reply_q': self.reply_q}) + time.sleep(0.25) + except rpc_amqp.AMQPDestinationNotFound as exc: + # exchange not found/down + if timer.check_return() <= 0: + self._obsolete_reply_queues.add(self.reply_q, self.msg_id) + LOG.error( + 'The reply %(msg_id)s failed to send after ' + '%(duration)d seconds due to a broker issue ' + '(%(exc)s). Abandoning...', { + 'msg_id': self.msg_id, + 'duration': duration, + 'exc': exc}) + return + + LOG.debug( + 'The reply %(msg_id)s could not be sent due to a broker ' + 'issue (%(exc)s). Retrying...', { + 'msg_id': self.msg_id, + 'exc': exc}) + time.sleep(0.25) + def heartbeat(self): # generate a keep alive for RPC call monitoring with self.listener.driver._get_connection( - rpc_common.PURPOSE_SEND) as conn: + rpc_common.PURPOSE_SEND, + ) as conn: try: self._send_reply(conn, None, None, ending=False) - except rpc_amqp.AMQPDestinationNotFound: - # internal exception that indicates queue/exchange gone - + except oslo_messaging.MessageUndeliverable: + # internal exception that indicates queue gone - # broker unreachable. - raise MessageDeliveryFailure("Heartbeat send failed") + raise MessageDeliveryFailure( + "Heartbeat send failed. Missing queue") + except rpc_amqp.AMQPDestinationNotFound: + # internal exception that indicates exchange gone - + # broker unreachable. + raise MessageDeliveryFailure( + "Heartbeat send failed. Missing exchange") # NOTE(sileht): Those have already be ack in RpcListener IO thread # We keep them as noop until all drivers do the same diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 0d7673e1a..9d99822d5 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -177,6 +177,8 @@ rabbit_opts = [ 'flag for direct send. The direct send is used as reply, ' 'so the MessageUndeliverable exception is raised ' 'in case the client queue does not exist.' + 'MessageUndeliverable exception will be used to loop for a ' + 'timeout to lets a chance to sender to recover.' 'This flag is deprecated and it will not be possible to ' 'deactivate this functionality anymore'), cfg.BoolOpt('enable_cancel_on_failover', @@ -516,6 +518,7 @@ class Connection(object): # if it was already monkey patched by eventlet/greenlet. global threading threading = stdlib_threading + self.direct_mandatory_flag = driver_conf.direct_mandatory_flag if self.ssl: diff --git a/releasenotes/notes/handle-missing-queue-553a803f94976be7.yaml b/releasenotes/notes/handle-missing-queue-553a803f94976be7.yaml new file mode 100644 index 000000000..0407e6238 --- /dev/null +++ b/releasenotes/notes/handle-missing-queue-553a803f94976be7.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + Adding retry strategy based on the mandatory flag. Missing exchanges and + queues are now identified separately for logging purposes.