From 8e3c523fd74257a78ceb384063f81db2e92a2ebd Mon Sep 17 00:00:00 2001 From: Arnaud Morin Date: Fri, 21 Jul 2023 16:51:51 +0200 Subject: [PATCH] Auto-delete the failed quorum rabbit queues When rabbit is failing for a specific quorum queue, the only thing to do is to delete the queue (as per the rabbit doc, see [1]). So, to avoid leaving the RPC service broken until an operator eventually fixes it manually, catch any INTERNAL ERROR (code 541) and trigger the deletion of the failed queues under those conditions. On the next queue declare (triggered from various retries), the queue will then be created again and the service will recover by itself. Closes-Bug: #2028384 Related-bug: #2031497 [1] https://www.rabbitmq.com/quorum-queues.html#availability Signed-off-by: Arnaud Morin Change-Id: Ib8dba833542973091a4e0bf23bb593aca89c5905 --- oslo_messaging/_drivers/impl_rabbit.py | 34 ++++++++++++++++--- ...eleted-failed-quorum-ca6a3923c3ed999a.yaml | 9 +++++ 2 files changed, 39 insertions(+), 4 deletions(-) create mode 100644 releasenotes/notes/auto-deleted-failed-quorum-ca6a3923c3ed999a.yaml diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 70fb5427b..e6b3dbe45 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -28,7 +28,7 @@ import time from urllib import parse import uuid -from amqp import exceptions as amqp_exec +from amqp import exceptions as amqp_ex import kombu import kombu.connection import kombu.entity @@ -419,7 +419,7 @@ class Consumer(object): conn.connection_id, self.queue_name) try: self.queue.declare() - except amqp_exec.PreconditionFailed as err: + except amqp_ex.PreconditionFailed as err: # NOTE(hberaud): This kind of exception may be triggered # when a control exchange is shared between services and # when services try to create it with configs that differ @@ -457,6 +457,14 @@ class Consumer(object): 'Queue: [%(queue)s], ' 'error message: [%(err_str)s]', info) 
time.sleep(interval) + if self.queue_arguments.get('x-queue-type') == 'quorum': + # Before re-declare queue, try to delete it + # This is helping with issue #2028384 + # NOTE(amorin) we need to make sure the connection is + # established again, because when an error occur, the + # connection is closed. + conn.ensure_connection() + self.queue.delete() self.queue.declare() else: raise @@ -499,6 +507,24 @@ class Consumer(object): nowait=self.nowait) else: raise + except amqp_ex.InternalError as exc: + if self.queue_arguments.get('x-queue-type') == 'quorum': + # Before re-consume queue, try to delete it + # This is helping with issue #2028384 + if exc.code == 541: + LOG.warning('Queue %s seems broken, will try delete it ' + 'before starting over.', self.queue.name) + # NOTE(amorin) we need to make sure the connection is + # established again, because when an error occur, the + # connection is closed. + conn.ensure_connection() + self.queue.delete() + self.declare(conn) + self.queue.consume(callback=self._callback, + consumer_tag=str(tag), + nowait=self.nowait) + else: + raise def cancel(self, tag): LOG.trace('ConsumerBase.cancel: canceling %s', tag) @@ -1208,7 +1234,7 @@ class Connection(object): ConnectionRefusedError, OSError, kombu.exceptions.OperationalError, - amqp_exec.ConnectionForced) as exc: + amqp_ex.ConnectionForced) as exc: LOG.info("A recoverable connection/channel error " "occurred, trying to reconnect: %s", exc) self.ensure_connection() @@ -1410,7 +1436,7 @@ class Connection(object): if not (exchange.passive or exchange.name in self._declared_exchanges): try: exchange(self.channel).declare() - except amqp_exec.PreconditionFailed as err: + except amqp_ex.PreconditionFailed as err: # NOTE(hberaud): This kind of exception may be triggered # when a control exchange is shared between services and # when services try to create it with configs that differ diff --git a/releasenotes/notes/auto-deleted-failed-quorum-ca6a3923c3ed999a.yaml 
b/releasenotes/notes/auto-deleted-failed-quorum-ca6a3923c3ed999a.yaml new file mode 100644 index 000000000..73c96df17 --- /dev/null +++ b/releasenotes/notes/auto-deleted-failed-quorum-ca6a3923c3ed999a.yaml @@ -0,0 +1,9 @@ +--- +fixes: + - | + Auto-delete the failed quorum rabbit queues. + When rabbit is failing for a specific quorum queue, the queue is now + deleted before trying to recreate it. + This may happen if the queue is not recoverable on the rabbit side. + See https://www.rabbitmq.com/quorum-queues.html#availability for more + info on this specific case.