Adding support for rabbitmq quorum queues
https://www.rabbitmq.com/quorum-queues.html The quorum queue is a modern queue type for RabbitMQ implementing a durable, replicated FIFO queue based on the Raft consensus algorithm. It is available as of RabbitMQ 3.8.0. the quorum queues can not be set by policy so this should be done when declaring the queue. To declare a quorum queue set the x-queue-type queue argument to quorum (the default is classic). This argument must be provided by a client at queue declaration time; it cannot be set or changed using a policy. This is because policy definition or applicable policy can be changed dynamically but queue type cannot. It must be specified at the time of declaration. its good for the oslo messaging to add support for that type of queue that have multiple advantaged over mirroring. If quorum queues are sets mirrored queues will be ignored. Closes-Bug: #1942933 Change-Id: Id573e04c287e034e50626daf6e18a34735d45251
This commit is contained in:
parent
f9de265f39
commit
7e8acbf870
@ -240,6 +240,7 @@ Consuming Options
|
|||||||
^^^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_ha_queues`
|
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_ha_queues`
|
||||||
|
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quorum_queue`
|
||||||
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_transient_queues_ttl`
|
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_transient_queues_ttl`
|
||||||
|
|
||||||
Connection Options
|
Connection Options
|
||||||
|
@ -32,7 +32,9 @@ from oslo_messaging._drivers import common as rpc_common
|
|||||||
amqp_opts = [
|
amqp_opts = [
|
||||||
cfg.BoolOpt('amqp_durable_queues',
|
cfg.BoolOpt('amqp_durable_queues',
|
||||||
default=False,
|
default=False,
|
||||||
help='Use durable queues in AMQP.'),
|
help='Use durable queues in AMQP. If rabbit_quorum_queue '
|
||||||
|
'is enabled, queues will be durable and this value will '
|
||||||
|
'be ignored.'),
|
||||||
cfg.BoolOpt('amqp_auto_delete',
|
cfg.BoolOpt('amqp_auto_delete',
|
||||||
default=False,
|
default=False,
|
||||||
deprecated_group='DEFAULT',
|
deprecated_group='DEFAULT',
|
||||||
|
@ -149,6 +149,17 @@ rabbit_opts = [
|
|||||||
'nodes, run: '
|
'nodes, run: '
|
||||||
"""\"rabbitmqctl set_policy HA '^(?!amq\\.).*' """
|
"""\"rabbitmqctl set_policy HA '^(?!amq\\.).*' """
|
||||||
"""'{"ha-mode": "all"}' \""""),
|
"""'{"ha-mode": "all"}' \""""),
|
||||||
|
cfg.BoolOpt('rabbit_quorum_queue',
|
||||||
|
default=False,
|
||||||
|
help='Use quorum queues in RabbitMQ (x-queue-type: quorum). '
|
||||||
|
'The quorum queue is a modern queue type for RabbitMQ '
|
||||||
|
'implementing a durable, replicated FIFO queue based on the '
|
||||||
|
'Raft consensus algorithm. It is available as of '
|
||||||
|
'RabbitMQ 3.8.0. If set this option will conflict with '
|
||||||
|
'the HA queues (``rabbit_ha_queues``) aka mirrored queues, '
|
||||||
|
'in other words the HA queues should be disabled, quorum '
|
||||||
|
'queues durable by default so the amqp_durable_queues '
|
||||||
|
'opion is ignored when this option enabled.'),
|
||||||
cfg.IntOpt('rabbit_transient_queues_ttl',
|
cfg.IntOpt('rabbit_transient_queues_ttl',
|
||||||
min=1,
|
min=1,
|
||||||
default=1800,
|
default=1800,
|
||||||
@ -191,7 +202,8 @@ rabbit_opts = [
|
|||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl):
|
def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
|
||||||
|
rabbit_quorum_queue):
|
||||||
"""Construct the arguments for declaring a queue.
|
"""Construct the arguments for declaring a queue.
|
||||||
|
|
||||||
If the rabbit_ha_queues option is set, we try to declare a mirrored queue
|
If the rabbit_ha_queues option is set, we try to declare a mirrored queue
|
||||||
@ -214,12 +226,31 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl):
|
|||||||
Setting a queue TTL causes the queue to be automatically deleted
|
Setting a queue TTL causes the queue to be automatically deleted
|
||||||
if it is unused for the TTL duration. This is a helpful safeguard
|
if it is unused for the TTL duration. This is a helpful safeguard
|
||||||
to prevent queues with zero consumers from growing without bound.
|
to prevent queues with zero consumers from growing without bound.
|
||||||
|
|
||||||
|
If the rabbit_quorum_queue option is set, we try to declare a mirrored
|
||||||
|
queue as described here:
|
||||||
|
|
||||||
|
https://www.rabbitmq.com/quorum-queues.html
|
||||||
|
|
||||||
|
Setting x-queue-type to quorum means that replicated FIFO queue based on
|
||||||
|
the Raft consensus algorithm will be used. It is available as of
|
||||||
|
RabbitMQ 3.8.0. If set this option will conflict with
|
||||||
|
the HA queues (``rabbit_ha_queues``) aka mirrored queues,
|
||||||
|
in other words HA queues should be disabled.
|
||||||
"""
|
"""
|
||||||
args = {}
|
args = {}
|
||||||
|
|
||||||
|
if rabbit_quorum_queue and rabbit_ha_queues:
|
||||||
|
raise RuntimeError('Configuration Error: rabbit_quorum_queue '
|
||||||
|
'and rabbit_ha_queues both enabled, queue '
|
||||||
|
'type is quorum or HA (mirrored) not both')
|
||||||
|
|
||||||
if rabbit_ha_queues:
|
if rabbit_ha_queues:
|
||||||
args['x-ha-policy'] = 'all'
|
args['x-ha-policy'] = 'all'
|
||||||
|
|
||||||
|
if rabbit_quorum_queue:
|
||||||
|
args['x-queue-type'] = 'quorum'
|
||||||
|
|
||||||
if rabbit_queue_ttl > 0:
|
if rabbit_queue_ttl > 0:
|
||||||
args['x-expires'] = rabbit_queue_ttl * 1000
|
args['x-expires'] = rabbit_queue_ttl * 1000
|
||||||
|
|
||||||
@ -248,7 +279,7 @@ class Consumer(object):
|
|||||||
def __init__(self, exchange_name, queue_name, routing_key, type, durable,
|
def __init__(self, exchange_name, queue_name, routing_key, type, durable,
|
||||||
exchange_auto_delete, queue_auto_delete, callback,
|
exchange_auto_delete, queue_auto_delete, callback,
|
||||||
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0,
|
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0,
|
||||||
enable_cancel_on_failover=False):
|
enable_cancel_on_failover=False, rabbit_quorum_queue=False):
|
||||||
"""Init the Consumer class with the exchange_name, routing_key,
|
"""Init the Consumer class with the exchange_name, routing_key,
|
||||||
type, durable auto_delete
|
type, durable auto_delete
|
||||||
"""
|
"""
|
||||||
@ -262,7 +293,8 @@ class Consumer(object):
|
|||||||
self.type = type
|
self.type = type
|
||||||
self.nowait = nowait
|
self.nowait = nowait
|
||||||
self.queue_arguments = _get_queue_arguments(rabbit_ha_queues,
|
self.queue_arguments = _get_queue_arguments(rabbit_ha_queues,
|
||||||
rabbit_queue_ttl)
|
rabbit_queue_ttl,
|
||||||
|
rabbit_quorum_queue)
|
||||||
self.queue = None
|
self.queue = None
|
||||||
self._declared_on = None
|
self._declared_on = None
|
||||||
self.exchange = kombu.entity.Exchange(
|
self.exchange = kombu.entity.Exchange(
|
||||||
@ -475,6 +507,7 @@ class Connection(object):
|
|||||||
|
|
||||||
self.login_method = driver_conf.rabbit_login_method
|
self.login_method = driver_conf.rabbit_login_method
|
||||||
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
|
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
|
||||||
|
self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue
|
||||||
self.rabbit_transient_queues_ttl = \
|
self.rabbit_transient_queues_ttl = \
|
||||||
driver_conf.rabbit_transient_queues_ttl
|
driver_conf.rabbit_transient_queues_ttl
|
||||||
self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
|
self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
|
||||||
@ -674,6 +707,12 @@ class Connection(object):
|
|||||||
except AttributeError:
|
except AttributeError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@property
|
||||||
|
def durable(self):
|
||||||
|
# Quorum queues are durable by default, durable option should
|
||||||
|
# be enabled by default with quorum queues
|
||||||
|
return self.amqp_durable_queues or self.rabbit_quorum_queue
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def validate_ssl_version(cls, version):
|
def validate_ssl_version(cls, version):
|
||||||
key = version.lower()
|
key = version.lower()
|
||||||
@ -1163,12 +1202,13 @@ class Connection(object):
|
|||||||
queue_name=queue_name or topic,
|
queue_name=queue_name or topic,
|
||||||
routing_key=topic,
|
routing_key=topic,
|
||||||
type='topic',
|
type='topic',
|
||||||
durable=self.amqp_durable_queues,
|
durable=self.durable,
|
||||||
exchange_auto_delete=self.amqp_auto_delete,
|
exchange_auto_delete=self.amqp_auto_delete,
|
||||||
queue_auto_delete=self.amqp_auto_delete,
|
queue_auto_delete=self.amqp_auto_delete,
|
||||||
callback=callback,
|
callback=callback,
|
||||||
rabbit_ha_queues=self.rabbit_ha_queues,
|
rabbit_ha_queues=self.rabbit_ha_queues,
|
||||||
enable_cancel_on_failover=self.enable_cancel_on_failover)
|
enable_cancel_on_failover=self.enable_cancel_on_failover,
|
||||||
|
rabbit_quorum_queue=self.rabbit_quorum_queue)
|
||||||
|
|
||||||
self.declare_consumer(consumer)
|
self.declare_consumer(consumer)
|
||||||
|
|
||||||
@ -1280,7 +1320,10 @@ class Connection(object):
|
|||||||
auto_delete=exchange.auto_delete,
|
auto_delete=exchange.auto_delete,
|
||||||
name=routing_key,
|
name=routing_key,
|
||||||
routing_key=routing_key,
|
routing_key=routing_key,
|
||||||
queue_arguments=_get_queue_arguments(self.rabbit_ha_queues, 0))
|
queue_arguments=_get_queue_arguments(
|
||||||
|
self.rabbit_ha_queues,
|
||||||
|
0,
|
||||||
|
self.rabbit_quorum_queue))
|
||||||
log_info = {'key': routing_key, 'exchange': exchange}
|
log_info = {'key': routing_key, 'exchange': exchange}
|
||||||
LOG.trace(
|
LOG.trace(
|
||||||
'Connection._publish_and_creates_default_queue: '
|
'Connection._publish_and_creates_default_queue: '
|
||||||
@ -1336,7 +1379,7 @@ class Connection(object):
|
|||||||
exchange = kombu.entity.Exchange(
|
exchange = kombu.entity.Exchange(
|
||||||
name=exchange_name,
|
name=exchange_name,
|
||||||
type='topic',
|
type='topic',
|
||||||
durable=self.amqp_durable_queues,
|
durable=self.durable,
|
||||||
auto_delete=self.amqp_auto_delete)
|
auto_delete=self.amqp_auto_delete)
|
||||||
|
|
||||||
self._ensure_publishing(self._publish, exchange, msg,
|
self._ensure_publishing(self._publish, exchange, msg,
|
||||||
@ -1358,7 +1401,7 @@ class Connection(object):
|
|||||||
exchange = kombu.entity.Exchange(
|
exchange = kombu.entity.Exchange(
|
||||||
name=exchange_name,
|
name=exchange_name,
|
||||||
type='topic',
|
type='topic',
|
||||||
durable=self.amqp_durable_queues,
|
durable=self.durable,
|
||||||
auto_delete=self.amqp_auto_delete)
|
auto_delete=self.amqp_auto_delete)
|
||||||
|
|
||||||
self._ensure_publishing(self._publish_and_creates_default_queue,
|
self._ensure_publishing(self._publish_and_creates_default_queue,
|
||||||
|
@ -0,0 +1,11 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
Adding support for quorum queues. Quorum queues are enabled if the
|
||||||
|
``rabbit_quorum_queue`` parameter is sets (``x-queue-type: quorum``).
|
||||||
|
Setting x-queue-type to quorum means that replicated FIFO queue based on
|
||||||
|
the Raft consensus algorithm will be used. It is available as of
|
||||||
|
RabbitMQ 3.8.0. The quorum queues are durable by default
|
||||||
|
(``amqp_durable_queues``) will be ignored.
|
||||||
|
when enabled the HA queues (``rabbit_ha_queues``) aka mirrored queues
|
||||||
|
should be disabled since the queue can't be both types at the same time
|
Loading…
Reference in New Issue
Block a user