Merge "Adding support for rabbitmq quorum queues"
This commit is contained in:
commit
2d090b5d6b
@ -240,6 +240,7 @@ Consuming Options
|
||||
^^^^^^^^^^^^^^^^^
|
||||
|
||||
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_ha_queues`
|
||||
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quorum_queue`
|
||||
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_transient_queues_ttl`
|
||||
|
||||
Connection Options
|
||||
|
@ -32,7 +32,9 @@ from oslo_messaging._drivers import common as rpc_common
|
||||
amqp_opts = [
|
||||
cfg.BoolOpt('amqp_durable_queues',
|
||||
default=False,
|
||||
help='Use durable queues in AMQP.'),
|
||||
help='Use durable queues in AMQP. If rabbit_quorum_queue '
|
||||
'is enabled, queues will be durable and this value will '
|
||||
'be ignored.'),
|
||||
cfg.BoolOpt('amqp_auto_delete',
|
||||
default=False,
|
||||
deprecated_group='DEFAULT',
|
||||
|
@ -136,6 +136,17 @@ rabbit_opts = [
|
||||
'nodes, run: '
|
||||
"""\"rabbitmqctl set_policy HA '^(?!amq\\.).*' """
|
||||
"""'{"ha-mode": "all"}' \""""),
|
||||
cfg.BoolOpt('rabbit_quorum_queue',
|
||||
default=False,
|
||||
help='Use quorum queues in RabbitMQ (x-queue-type: quorum). '
|
||||
'The quorum queue is a modern queue type for RabbitMQ '
|
||||
'implementing a durable, replicated FIFO queue based on the '
|
||||
'Raft consensus algorithm. It is available as of '
|
||||
'RabbitMQ 3.8.0. If set this option will conflict with '
|
||||
'the HA queues (``rabbit_ha_queues``) aka mirrored queues, '
|
||||
'in other words the HA queues should be disabled, quorum '
|
||||
'queues durable by default so the amqp_durable_queues '
|
||||
'opion is ignored when this option enabled.'),
|
||||
cfg.IntOpt('rabbit_transient_queues_ttl',
|
||||
min=1,
|
||||
default=1800,
|
||||
@ -178,7 +189,8 @@ rabbit_opts = [
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl):
|
||||
def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
|
||||
rabbit_quorum_queue):
|
||||
"""Construct the arguments for declaring a queue.
|
||||
|
||||
If the rabbit_ha_queues option is set, we try to declare a mirrored queue
|
||||
@ -201,12 +213,31 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl):
|
||||
Setting a queue TTL causes the queue to be automatically deleted
|
||||
if it is unused for the TTL duration. This is a helpful safeguard
|
||||
to prevent queues with zero consumers from growing without bound.
|
||||
|
||||
If the rabbit_quorum_queue option is set, we try to declare a mirrored
|
||||
queue as described here:
|
||||
|
||||
https://www.rabbitmq.com/quorum-queues.html
|
||||
|
||||
Setting x-queue-type to quorum means that replicated FIFO queue based on
|
||||
the Raft consensus algorithm will be used. It is available as of
|
||||
RabbitMQ 3.8.0. If set this option will conflict with
|
||||
the HA queues (``rabbit_ha_queues``) aka mirrored queues,
|
||||
in other words HA queues should be disabled.
|
||||
"""
|
||||
args = {}
|
||||
|
||||
if rabbit_quorum_queue and rabbit_ha_queues:
|
||||
raise RuntimeError('Configuration Error: rabbit_quorum_queue '
|
||||
'and rabbit_ha_queues both enabled, queue '
|
||||
'type is quorum or HA (mirrored) not both')
|
||||
|
||||
if rabbit_ha_queues:
|
||||
args['x-ha-policy'] = 'all'
|
||||
|
||||
if rabbit_quorum_queue:
|
||||
args['x-queue-type'] = 'quorum'
|
||||
|
||||
if rabbit_queue_ttl > 0:
|
||||
args['x-expires'] = rabbit_queue_ttl * 1000
|
||||
|
||||
@ -235,7 +266,7 @@ class Consumer(object):
|
||||
def __init__(self, exchange_name, queue_name, routing_key, type, durable,
|
||||
exchange_auto_delete, queue_auto_delete, callback,
|
||||
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0,
|
||||
enable_cancel_on_failover=False):
|
||||
enable_cancel_on_failover=False, rabbit_quorum_queue=False):
|
||||
"""Init the Consumer class with the exchange_name, routing_key,
|
||||
type, durable auto_delete
|
||||
"""
|
||||
@ -249,7 +280,8 @@ class Consumer(object):
|
||||
self.type = type
|
||||
self.nowait = nowait
|
||||
self.queue_arguments = _get_queue_arguments(rabbit_ha_queues,
|
||||
rabbit_queue_ttl)
|
||||
rabbit_queue_ttl,
|
||||
rabbit_quorum_queue)
|
||||
self.queue = None
|
||||
self._declared_on = None
|
||||
self.exchange = kombu.entity.Exchange(
|
||||
@ -463,6 +495,7 @@ class Connection(object):
|
||||
|
||||
self.login_method = driver_conf.rabbit_login_method
|
||||
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
|
||||
self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue
|
||||
self.rabbit_transient_queues_ttl = \
|
||||
driver_conf.rabbit_transient_queues_ttl
|
||||
self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
|
||||
@ -662,6 +695,12 @@ class Connection(object):
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
@property
|
||||
def durable(self):
|
||||
# Quorum queues are durable by default, durable option should
|
||||
# be enabled by default with quorum queues
|
||||
return self.amqp_durable_queues or self.rabbit_quorum_queue
|
||||
|
||||
@classmethod
|
||||
def validate_ssl_version(cls, version):
|
||||
key = version.lower()
|
||||
@ -1157,12 +1196,13 @@ class Connection(object):
|
||||
queue_name=queue_name or topic,
|
||||
routing_key=topic,
|
||||
type='topic',
|
||||
durable=self.amqp_durable_queues,
|
||||
durable=self.durable,
|
||||
exchange_auto_delete=self.amqp_auto_delete,
|
||||
queue_auto_delete=self.amqp_auto_delete,
|
||||
callback=callback,
|
||||
rabbit_ha_queues=self.rabbit_ha_queues,
|
||||
enable_cancel_on_failover=self.enable_cancel_on_failover)
|
||||
enable_cancel_on_failover=self.enable_cancel_on_failover,
|
||||
rabbit_quorum_queue=self.rabbit_quorum_queue)
|
||||
|
||||
self.declare_consumer(consumer)
|
||||
|
||||
@ -1274,7 +1314,10 @@ class Connection(object):
|
||||
auto_delete=exchange.auto_delete,
|
||||
name=routing_key,
|
||||
routing_key=routing_key,
|
||||
queue_arguments=_get_queue_arguments(self.rabbit_ha_queues, 0))
|
||||
queue_arguments=_get_queue_arguments(
|
||||
self.rabbit_ha_queues,
|
||||
0,
|
||||
self.rabbit_quorum_queue))
|
||||
log_info = {'key': routing_key, 'exchange': exchange}
|
||||
LOG.trace(
|
||||
'Connection._publish_and_creates_default_queue: '
|
||||
@ -1330,7 +1373,7 @@ class Connection(object):
|
||||
exchange = kombu.entity.Exchange(
|
||||
name=exchange_name,
|
||||
type='topic',
|
||||
durable=self.amqp_durable_queues,
|
||||
durable=self.durable,
|
||||
auto_delete=self.amqp_auto_delete)
|
||||
|
||||
self._ensure_publishing(self._publish, exchange, msg,
|
||||
@ -1352,7 +1395,7 @@ class Connection(object):
|
||||
exchange = kombu.entity.Exchange(
|
||||
name=exchange_name,
|
||||
type='topic',
|
||||
durable=self.amqp_durable_queues,
|
||||
durable=self.durable,
|
||||
auto_delete=self.amqp_auto_delete)
|
||||
|
||||
self._ensure_publishing(self._publish_and_creates_default_queue,
|
||||
|
@ -0,0 +1,11 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Adding support for quorum queues. Quorum queues are enabled if the
|
||||
``rabbit_quorum_queue`` parameter is sets (``x-queue-type: quorum``).
|
||||
Setting x-queue-type to quorum means that replicated FIFO queue based on
|
||||
the Raft consensus algorithm will be used. It is available as of
|
||||
RabbitMQ 3.8.0. The quorum queues are durable by default
|
||||
(``amqp_durable_queues``) will be ignored.
|
||||
when enabled the HA queues (``rabbit_ha_queues``) aka mirrored queues
|
||||
should be disabled since the queue can't be both types at the same time
|
Loading…
Reference in New Issue
Block a user