Enable use of quorum queues for transient messages

Add a new flag rabbit_transient_quorum_queue to enable the use of quorum
for transient queues (reply_ and _fanout_)

This greatly helps OpenStack services to not fail on (and to recover from)
a rabbit node issue.

Related-bug: #2031497

Signed-off-by: Arnaud Morin <arnaud.morin@ovhcloud.com>
Change-Id: Icee5ee6938ca7c9651f281fb835708fc88b8464f
This commit is contained in:
Arnaud Morin 2023-04-13 17:53:18 +02:00
parent 8e3c523fd7
commit 989dbb8aad
2 changed files with 61 additions and 15 deletions

View File

@ -165,9 +165,16 @@ rabbit_opts = [
'Raft consensus algorithm. It is available as of ' 'Raft consensus algorithm. It is available as of '
'RabbitMQ 3.8.0. If set this option will conflict with ' 'RabbitMQ 3.8.0. If set this option will conflict with '
'the HA queues (``rabbit_ha_queues``) aka mirrored queues, ' 'the HA queues (``rabbit_ha_queues``) aka mirrored queues, '
'in other words the HA queues should be disabled, quorum ' 'in other words the HA queues should be disabled. '
'queues durable by default so the amqp_durable_queues ' 'Quorum queues are also durable by default so the '
'opion is ignored when this option enabled.'), 'amqp_durable_queues option is ignored when this option is '
'enabled.'),
cfg.BoolOpt('rabbit_transient_quorum_queue',
default=False,
help='Use quorum queues for transients queues in RabbitMQ. '
'Enabling this option will then make sure those queues are '
'also using quorum kind of rabbit queues, which are HA by '
'default.'),
cfg.IntOpt('rabbit_quorum_delivery_limit', cfg.IntOpt('rabbit_quorum_delivery_limit',
default=0, default=0,
help='Each time a message is redelivered to a consumer, ' help='Each time a message is redelivered to a consumer, '
@ -639,6 +646,8 @@ class Connection(object):
self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue
self.rabbit_quorum_queue_config = self._get_quorum_configurations( self.rabbit_quorum_queue_config = self._get_quorum_configurations(
driver_conf) driver_conf)
self.rabbit_transient_quorum_queue = \
driver_conf.rabbit_transient_quorum_queue
self.rabbit_transient_queues_ttl = \ self.rabbit_transient_queues_ttl = \
driver_conf.rabbit_transient_queues_ttl driver_conf.rabbit_transient_queues_ttl
self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
@ -1351,7 +1360,9 @@ class Connection(object):
callback=callback, callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues, rabbit_ha_queues=self.rabbit_ha_queues,
rabbit_queue_ttl=self.rabbit_transient_queues_ttl, rabbit_queue_ttl=self.rabbit_transient_queues_ttl,
enable_cancel_on_failover=self.enable_cancel_on_failover) enable_cancel_on_failover=self.enable_cancel_on_failover,
rabbit_quorum_queue=self.rabbit_transient_quorum_queue,
rabbit_quorum_queue_config=self.rabbit_quorum_queue_config)
self.declare_consumer(consumer) self.declare_consumer(consumer)
@ -1386,13 +1397,15 @@ class Connection(object):
queue_name=queue_name, queue_name=queue_name,
routing_key=topic, routing_key=topic,
type='fanout', type='fanout',
durable=False, durable=self.rabbit_transient_quorum_queue,
exchange_auto_delete=True, exchange_auto_delete=True,
queue_auto_delete=False, queue_auto_delete=False,
callback=callback, callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues, rabbit_ha_queues=self.rabbit_ha_queues,
rabbit_queue_ttl=self.rabbit_transient_queues_ttl, rabbit_queue_ttl=self.rabbit_transient_queues_ttl,
enable_cancel_on_failover=self.enable_cancel_on_failover) enable_cancel_on_failover=self.enable_cancel_on_failover,
rabbit_quorum_queue=self.rabbit_transient_quorum_queue,
rabbit_quorum_queue_config=self.rabbit_quorum_queue_config)
self.declare_consumer(consumer) self.declare_consumer(consumer)
@ -1538,11 +1551,12 @@ class Connection(object):
def direct_send(self, msg_id, msg): def direct_send(self, msg_id, msg):
"""Send a 'direct' message.""" """Send a 'direct' message."""
exchange = kombu.entity.Exchange(name='', # using default exchange exchange = kombu.entity.Exchange(
type='direct', name='', # using default exchange
durable=False, type='direct',
auto_delete=True, durable=self.rabbit_transient_quorum_queue,
passive=True) auto_delete=True,
passive=True)
options = oslo_messaging.TransportOptions( options = oslo_messaging.TransportOptions(
at_least_once=self.direct_mandatory_flag) at_least_once=self.direct_mandatory_flag)
@ -1569,10 +1583,11 @@ class Connection(object):
def fanout_send(self, topic, msg, retry=None): def fanout_send(self, topic, msg, retry=None):
"""Send a 'fanout' message.""" """Send a 'fanout' message."""
exchange = kombu.entity.Exchange(name='%s_fanout' % topic, exchange = kombu.entity.Exchange(
type='fanout', name='%s_fanout' % topic,
durable=False, type='fanout',
auto_delete=True) durable=self.rabbit_transient_quorum_queue,
auto_delete=True)
LOG.debug('Sending fanout to %s_fanout', topic) LOG.debug('Sending fanout to %s_fanout', topic)
self._ensure_publishing(self._publish, exchange, msg, retry=retry) self._ensure_publishing(self._publish, exchange, msg, retry=retry)

View File

@ -0,0 +1,31 @@
---
features:
- |
Add an option to enable transient queues to use quorum.
Transient queues in OpenStack are not so transient, they live the whole
process lifetime (e.g. until you restart a service, like nova-compute).
Transient here means they belong to a specific process, compared to
regular queues which may be used by more processes.
Usually, transient queues are the "fanout" and "reply" queues.
By default, without any rabbitmq policy tuning, they are neither durable
nor highly available.
By enabling quorum for transients, oslo.messaging will declare quorum
queues instead of classic ones on rabbitmq. As a result, those queues will
automatically become HA and durable.
Note that this may have an impact on your cluster, as rabbit will need
more CPU, RAM and network bandwidth to manage the queues. This was tested
at pretty large scale (2k hypervisors) with a cluster of 5 nodes.
Also note that the current rabbitmq implementation relies on a fixed number
of "Erlang atoms" (5M by default), and one atom is consumed each time a
quorum queue is created with a different name. If your deployment is doing
a lot of queue deletion/creation, you may consume all your atoms more quickly.
When enabling quorum for transients, you may also want to update your
rabbitmq policies accordingly (e.g. make sure they apply on quorum).
This option will stay disabled by default for now but may become the
default in the future.