Add quorum queue control configurations

the quorum queue type add features that did not exist before or not
handled in rabbitmq the following link shows some of them
https://blog.rabbitmq.com/posts/2020/04/rabbitmq-gets-an-ha-upgrade/

the options below control the quorum queue and ensure the stability of
the quorum system
x-max-in-memory-length
x-max-in-memory-bytes
x-delivery-limit

which control the memory usage and handle message poisoning

Closes-Bug: #1962348
Change-Id: I570227d6102681f4f9d8813ed0d7693a1160c21d
This commit is contained in:
hamza alqtaishat 2022-02-25 22:08:51 +00:00
parent 2d090b5d6b
commit 8932ad237b
3 changed files with 80 additions and 7 deletions

View File

@ -241,6 +241,9 @@ Consuming Options
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_ha_queues` - :oslo.config:option:`oslo_messaging_rabbit.rabbit_ha_queues`
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quorum_queue` - :oslo.config:option:`oslo_messaging_rabbit.rabbit_quorum_queue`
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quorum_delivery_limit`
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quroum_max_memory_length`
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quroum_max_memory_bytes`
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_transient_queues_ttl` - :oslo.config:option:`oslo_messaging_rabbit.rabbit_transient_queues_ttl`
Connection Options Connection Options

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import collections
import contextlib import contextlib
import errno import errno
import functools import functools
@ -45,6 +46,13 @@ from oslo_messaging._drivers import pool
from oslo_messaging import _utils from oslo_messaging import _utils
from oslo_messaging import exceptions from oslo_messaging import exceptions
# The QuorumMemConfig will hold the quorum queue memory configurations
QuorumMemConfig = collections.namedtuple('QuorumMemConfig',
'delivery_limit'
' max_memory_length'
' max_memory_bytes')
# NOTE(sileht): don't exist in py2 socket module # NOTE(sileht): don't exist in py2 socket module
TCP_USER_TIMEOUT = 18 TCP_USER_TIMEOUT = 18
@ -147,6 +155,30 @@ rabbit_opts = [
'in other words the HA queues should be disabled, quorum ' 'in other words the HA queues should be disabled, quorum '
'queues durable by default so the amqp_durable_queues ' 'queues durable by default so the amqp_durable_queues '
'opion is ignored when this option enabled.'), 'opion is ignored when this option enabled.'),
cfg.IntOpt('rabbit_quorum_delivery_limit',
default=0,
help='Each time a message is redelivered to a consumer, '
'a counter is incremented. Once the redelivery count '
'exceeds the delivery limit the message gets dropped '
'or dead-lettered (if a DLX exchange has been configured) '
'Used only when rabbit_quorum_queue is enabled, '
'Default 0 which means dont set a limit.'),
cfg.IntOpt('rabbit_quroum_max_memory_length',
default=0,
help='By default all messages are maintained in memory '
'if a quorum queue grows in length it can put memory '
'pressure on a cluster. This option can limit the number '
'of messages in the quorum queue. '
'Used only when rabbit_quorum_queue is enabled, '
'Default 0 which means dont set a limit.'),
cfg.IntOpt('rabbit_quroum_max_memory_bytes',
default=0,
help='By default all messages are maintained in memory '
'if a quorum queue grows in length it can put memory '
'pressure on a cluster. This option can limit the number '
'of memory bytes used by the quorum queue. '
'Used only when rabbit_quorum_queue is enabled, '
'Default 0 which means dont set a limit.'),
cfg.IntOpt('rabbit_transient_queues_ttl', cfg.IntOpt('rabbit_transient_queues_ttl',
min=1, min=1,
default=1800, default=1800,
@ -190,7 +222,8 @@ LOG = logging.getLogger(__name__)
def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl, def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
rabbit_quorum_queue): rabbit_quorum_queue,
rabbit_quorum_queue_config):
"""Construct the arguments for declaring a queue. """Construct the arguments for declaring a queue.
If the rabbit_ha_queues option is set, we try to declare a mirrored queue If the rabbit_ha_queues option is set, we try to declare a mirrored queue
@ -224,6 +257,14 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
RabbitMQ 3.8.0. If set this option will conflict with RabbitMQ 3.8.0. If set this option will conflict with
the HA queues (``rabbit_ha_queues``) aka mirrored queues, the HA queues (``rabbit_ha_queues``) aka mirrored queues,
in other words HA queues should be disabled. in other words HA queues should be disabled.
rabbit_quorum_queue_config:
Quorum queues provides three options to handle message poisoning
and limit the resources the qourum queue can use
x-delivery-limit number of times the queue will try to deliver
a message before it decide to discard it
x-max-in-memory-length, x-max-in-memory-bytes control the size
of memory used by quorum queue
""" """
args = {} args = {}
@ -237,6 +278,15 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
if rabbit_quorum_queue: if rabbit_quorum_queue:
args['x-queue-type'] = 'quorum' args['x-queue-type'] = 'quorum'
if rabbit_quorum_queue_config.delivery_limit:
args['x-delivery-limit'] = \
rabbit_quorum_queue_config.delivery_limit
if rabbit_quorum_queue_config.max_memory_length:
args['x-max-in-memory-length'] = \
rabbit_quorum_queue_config.max_memory_length
if rabbit_quorum_queue_config.max_memory_bytes:
args['x-max-in-memory-bytes'] = \
rabbit_quorum_queue_config.max_memory_bytes
if rabbit_queue_ttl > 0: if rabbit_queue_ttl > 0:
args['x-expires'] = rabbit_queue_ttl * 1000 args['x-expires'] = rabbit_queue_ttl * 1000
@ -266,7 +316,8 @@ class Consumer(object):
def __init__(self, exchange_name, queue_name, routing_key, type, durable, def __init__(self, exchange_name, queue_name, routing_key, type, durable,
exchange_auto_delete, queue_auto_delete, callback, exchange_auto_delete, queue_auto_delete, callback,
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0, nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0,
enable_cancel_on_failover=False, rabbit_quorum_queue=False): enable_cancel_on_failover=False, rabbit_quorum_queue=False,
rabbit_quorum_queue_config=QuorumMemConfig(0, 0, 0)):
"""Init the Consumer class with the exchange_name, routing_key, """Init the Consumer class with the exchange_name, routing_key,
type, durable auto_delete type, durable auto_delete
""" """
@ -279,9 +330,10 @@ class Consumer(object):
self.callback = callback self.callback = callback
self.type = type self.type = type
self.nowait = nowait self.nowait = nowait
self.queue_arguments = _get_queue_arguments(rabbit_ha_queues, rabbit_quorum_queue_config = rabbit_quorum_queue_config or {}
rabbit_queue_ttl, self.queue_arguments = _get_queue_arguments(
rabbit_quorum_queue) rabbit_ha_queues, rabbit_queue_ttl, rabbit_quorum_queue,
rabbit_quorum_queue_config)
self.queue = None self.queue = None
self._declared_on = None self._declared_on = None
self.exchange = kombu.entity.Exchange( self.exchange = kombu.entity.Exchange(
@ -496,6 +548,8 @@ class Connection(object):
self.login_method = driver_conf.rabbit_login_method self.login_method = driver_conf.rabbit_login_method
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue
self.rabbit_quorum_queue_config = self._get_quorum_configurations(
driver_conf)
self.rabbit_transient_queues_ttl = \ self.rabbit_transient_queues_ttl = \
driver_conf.rabbit_transient_queues_ttl driver_conf.rabbit_transient_queues_ttl
self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
@ -709,6 +763,14 @@ class Connection(object):
except KeyError: except KeyError:
raise RuntimeError("Invalid SSL version : %s" % version) raise RuntimeError("Invalid SSL version : %s" % version)
def _get_quorum_configurations(self, driver_conf):
"""Get the quorum queue configurations"""
delivery_limit = driver_conf.rabbit_quorum_delivery_limit
max_memory_length = driver_conf.rabbit_quroum_max_memory_length
max_memory_bytes = driver_conf.rabbit_quroum_max_memory_bytes
return QuorumMemConfig(delivery_limit, max_memory_length,
max_memory_bytes)
# NOTE(moguimar): default_password in this function's context is just # NOTE(moguimar): default_password in this function's context is just
# a fallback option, not a hardcoded password. # a fallback option, not a hardcoded password.
def _transform_transport_url(self, url, host, default_username='', # nosec def _transform_transport_url(self, url, host, default_username='', # nosec
@ -1202,7 +1264,8 @@ class Connection(object):
callback=callback, callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues, rabbit_ha_queues=self.rabbit_ha_queues,
enable_cancel_on_failover=self.enable_cancel_on_failover, enable_cancel_on_failover=self.enable_cancel_on_failover,
rabbit_quorum_queue=self.rabbit_quorum_queue) rabbit_quorum_queue=self.rabbit_quorum_queue,
rabbit_quorum_queue_config=self.rabbit_quorum_queue_config)
self.declare_consumer(consumer) self.declare_consumer(consumer)
@ -1317,7 +1380,8 @@ class Connection(object):
queue_arguments=_get_queue_arguments( queue_arguments=_get_queue_arguments(
self.rabbit_ha_queues, self.rabbit_ha_queues,
0, 0,
self.rabbit_quorum_queue)) self.rabbit_quorum_queue,
self.rabbit_quorum_queue_config))
log_info = {'key': routing_key, 'exchange': exchange} log_info = {'key': routing_key, 'exchange': exchange}
LOG.trace( LOG.trace(
'Connection._publish_and_creates_default_queue: ' 'Connection._publish_and_creates_default_queue: '

View File

@ -0,0 +1,6 @@
---
features:
- |
Add quorum configuration x-max-in-memory-length,
x-max-in-memory-bytes, x-delivery-limit which control the quorum
queue memory usage and handle the message poisoning problem