From 989dbb8aad8be68a9c63e2e6a4d445cc445c051c Mon Sep 17 00:00:00 2001 From: Arnaud Morin Date: Thu, 13 Apr 2023 17:53:18 +0200 Subject: [PATCH] Enable use of quorum queues for transient messages Add a new flag rabbit_transient_quorum_queue to enable the use of quorum for transient queues (reply_ and _fanout_) This is helping a lot OpenStack services to not fail (and recover) from a rabbit node issue. Related-bug: #2031497 Signed-off-by: Arnaud Morin Change-Id: Icee5ee6938ca7c9651f281fb835708fc88b8464f --- oslo_messaging/_drivers/impl_rabbit.py | 45 ++++++++++++------- ...bit_transient_quorum-fc3c3f88ead90034.yaml | 31 +++++++++++++ 2 files changed, 61 insertions(+), 15 deletions(-) create mode 100644 releasenotes/notes/rabbit_transient_quorum-fc3c3f88ead90034.yaml diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index e6b3dbe45..200ec1e1e 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -165,9 +165,16 @@ rabbit_opts = [ 'Raft consensus algorithm. It is available as of ' 'RabbitMQ 3.8.0. If set this option will conflict with ' 'the HA queues (``rabbit_ha_queues``) aka mirrored queues, ' - 'in other words the HA queues should be disabled, quorum ' - 'queues durable by default so the amqp_durable_queues ' - 'opion is ignored when this option enabled.'), + 'in other words the HA queues should be disabled. ' + 'Quorum queues are also durable by default so the ' + 'amqp_durable_queues option is ignored when this option is ' + 'enabled.'), + cfg.BoolOpt('rabbit_transient_quorum_queue', + default=False, + help='Use quorum queues for transients queues in RabbitMQ. ' + 'Enabling this option will then make sure those queues are ' + 'also using quorum kind of rabbit queues, which are HA by ' + 'default.'), cfg.IntOpt('rabbit_quorum_delivery_limit', default=0, help='Each time a message is redelivered to a consumer, ' @@ -639,6 +646,8 @@ class Connection(object): self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue self.rabbit_quorum_queue_config = self._get_quorum_configurations( driver_conf) + self.rabbit_transient_quorum_queue = \ + driver_conf.rabbit_transient_quorum_queue self.rabbit_transient_queues_ttl = \ driver_conf.rabbit_transient_queues_ttl self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count @@ -1351,7 +1360,9 @@ class Connection(object): callback=callback, rabbit_ha_queues=self.rabbit_ha_queues, rabbit_queue_ttl=self.rabbit_transient_queues_ttl, - enable_cancel_on_failover=self.enable_cancel_on_failover) + enable_cancel_on_failover=self.enable_cancel_on_failover, + rabbit_quorum_queue=self.rabbit_transient_quorum_queue, + rabbit_quorum_queue_config=self.rabbit_quorum_queue_config) self.declare_consumer(consumer) @@ -1386,13 +1397,15 @@ class Connection(object): queue_name=queue_name, routing_key=topic, type='fanout', - durable=False, + durable=self.rabbit_transient_quorum_queue, exchange_auto_delete=True, queue_auto_delete=False, callback=callback, rabbit_ha_queues=self.rabbit_ha_queues, rabbit_queue_ttl=self.rabbit_transient_queues_ttl, - enable_cancel_on_failover=self.enable_cancel_on_failover) + enable_cancel_on_failover=self.enable_cancel_on_failover, + rabbit_quorum_queue=self.rabbit_transient_quorum_queue, + rabbit_quorum_queue_config=self.rabbit_quorum_queue_config) self.declare_consumer(consumer) @@ -1538,11 +1551,12 @@ class Connection(object): def direct_send(self, msg_id, msg): """Send a 'direct' message.""" - exchange = kombu.entity.Exchange(name='', # using default exchange - type='direct', - durable=False, - auto_delete=True, - passive=True) + exchange = kombu.entity.Exchange( + name='', # using default exchange + type='direct', + durable=self.rabbit_transient_quorum_queue, + auto_delete=True, + passive=True) options = oslo_messaging.TransportOptions( at_least_once=self.direct_mandatory_flag) @@ -1569,10 +1583,11 @@ class Connection(object): def fanout_send(self, topic, msg, retry=None): """Send a 'fanout' message.""" - exchange = kombu.entity.Exchange(name='%s_fanout' % topic, - type='fanout', - durable=False, - auto_delete=True) + exchange = kombu.entity.Exchange( + name='%s_fanout' % topic, + type='fanout', + durable=self.rabbit_transient_quorum_queue, + auto_delete=True) LOG.debug('Sending fanout to %s_fanout', topic) self._ensure_publishing(self._publish, exchange, msg, retry=retry) diff --git a/releasenotes/notes/rabbit_transient_quorum-fc3c3f88ead90034.yaml b/releasenotes/notes/rabbit_transient_quorum-fc3c3f88ead90034.yaml new file mode 100644 index 000000000..25d323a0d --- /dev/null +++ b/releasenotes/notes/rabbit_transient_quorum-fc3c3f88ead90034.yaml @@ -0,0 +1,31 @@ +--- +features: + - | + Add an option to enable transient queues to use quorum. + + Transient queues in OpenStack are not so transient, they live the whole + process lifetime (e.g. until you restart a service, like nova-compute). + Transient here means they belong to a specific process, compared to + regular queues which may be used by more processes. + Usually, transients queues are the "fanout" and "reply" queues. + + By default, without any rabbitmq policy tuning, they are not durable + neither highly available. + + By enabling quorum for transients, oslo.messaging will declare quorum + queues instead of classic on rabbitmq. As a result, those queues will + automatically become HA and durable. + Note that this may have an impact on your cluster, as rabbit will need + more cpu, ram and network bandwith to manage the queues. This was tested + at pretty large scale (2k hypervisors) with a cluster of 5 nodes. + + Also note that the current rabbitmq implementation rely on a fixed number + of "erlang atom" (5M by default), and one atom is consumed each time a + quorum queue is created with a different name. If your deployment is doing + a lot of queue deletion/creation, you may consume all your atoms quicker. + + When enabling quorum for transients, you may also want to update your + rabbitmq policies accordingly (e.g. make sure they apply on quorum). + + This option will stay disabled by default for now but may become the + default in the future.