diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 5418c5897..fa932bca4 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -14,12 +14,14 @@ # under the License. import logging +import os import queue import threading import time import uuid import cachetools +from oslo_concurrency import lockutils from oslo_utils import eventletutils from oslo_utils import timeutils @@ -40,6 +42,63 @@ ACK_REQUEUE_EVERY_SECONDS_MIN = 0.001 ACK_REQUEUE_EVERY_SECONDS_MAX = 5.0 +class QManager(object): + """Queue Manager to build queue name for reply (and fanout) type. + This class is used only when use_queue_manager is set to True in config + file. + It rely on a shared memory accross processes (reading/writing data to + /dev/shm/xyz) and oslo_concurrency.lockutils to avoid assigning the same + queue name twice (or more) to different processes. + The original idea of this queue manager was to avoid random queue names, + so on service restart, the previously created queues can be reused, + avoiding deletion/creation of queues on rabbitmq side (which cost a lot + at scale). + """ + def __init__(self, hostname, processname): + # We will use hostname and processname in queue names to help identify + # them easily. + # This is also ensuring consistency between service restart. + self.hostname = hostname + self.processname = processname + # This is where the counter is kept + self.file_name = '/dev/shm/%s_%s_qmanager' % (self.hostname, # nosec + self.processname) + # We use the process group to restart the counter on service restart + self.pg = os.getpgrp() + + def get(self): + lock_name = 'oslo_read_shm_%s_%s' % (self.hostname, self.processname) + + @lockutils.synchronized(lock_name, external=True) + def read_from_shm(): + # Grab the counter from shm + # This function is thread and process safe thanks to lockutils + try: + with open(self.file_name, 'r') as f: + pg, c = f.readline().split(':') + pg = int(pg) + c = int(c) + except (FileNotFoundError, ValueError): + pg = self.pg + c = 0 + + # Increment the counter + if pg == self.pg: + c += 1 + else: + # The process group changed, maybe service restarted? + # Start over the counter + c = 1 + + # Write the new counter + with open(self.file_name, 'w') as f: + f.write(str(self.pg) + ':' + str(c)) + return c + + c = read_from_shm() + return self.hostname + ":" + self.processname + ":" + str(c) + + class MessageOperationsHandler(object): """Queue used by message operations to ensure that all tasks are serialized and run in the same thread, since underlying drivers like kombu @@ -445,6 +504,7 @@ class ReplyWaiters(object): 'to message ID %s' % msg_id) def put(self, msg_id, message_data): + LOG.info('Received RPC response for msg %s', msg_id) queue = self._queues.get(msg_id) if not queue: LOG.info('No calling threads waiting for msg_id : %s', msg_id) @@ -597,6 +657,12 @@ class AMQPDriverBase(base.BaseDriver): self._reply_q = None self._reply_q_conn = None self._waiter = None + if conf.oslo_messaging_rabbit.use_queue_manager: + self._q_manager = QManager( + hostname=conf.oslo_messaging_rabbit.hostname, + processname=conf.oslo_messaging_rabbit.processname) + else: + self._q_manager = None def _get_exchange(self, target): return target.exchange or self._default_exchange @@ -608,10 +674,16 @@ class AMQPDriverBase(base.BaseDriver): def _get_reply_q(self): with self._reply_q_lock: + # NOTE(amorin) Re-use reply_q when it already exists + # This avoid creating too many queues on AMQP server (rabbit) if self._reply_q is not None: return self._reply_q - reply_q = 'reply_' + uuid.uuid4().hex + if self._q_manager: + reply_q = 'reply_' + self._q_manager.get() + else: + reply_q = 'reply_' + uuid.uuid4().hex + LOG.info('Creating reply queue: %s', reply_q) conn = self._get_connection(rpc_common.PURPOSE_LISTEN) @@ -628,12 +700,20 @@ class AMQPDriverBase(base.BaseDriver): envelope=True, notify=False, retry=None, transport_options=None): msg = message + if 'method' in msg: + LOG.debug('Calling RPC method %s on target %s', msg.get('method'), + target.topic) + else: + LOG.debug('Sending message to topic %s', target.topic) if wait_for_reply: + _reply_q = self._get_reply_q() msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) - msg.update({'_reply_q': self._get_reply_q()}) + msg.update({'_reply_q': _reply_q}) msg.update({'_timeout': call_monitor_timeout}) + LOG.info('Expecting reply to msg %s in queue %s', msg_id, + _reply_q) rpc_amqp._add_unique_id(msg) unique_id = msg[rpc_amqp.UNIQUE_ID] diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 200ec1e1e..2150536a3 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -241,7 +241,16 @@ rabbit_opts = [ default=False, help="Enable x-cancel-on-ha-failover flag so that " "rabbitmq server will cancel and notify consumers" - "when queue is down") + "when queue is down"), + cfg.BoolOpt('use_queue_manager', + default=False, + help='Should we use consistant queue names or random ones'), + cfg.StrOpt('hostname', + default=socket.gethostname(), + help='Hostname used by queue manager'), + cfg.StrOpt('processname', + default=os.path.basename(sys.argv[0]), + help='Process name used by queue manager'), ] LOG = logging.getLogger(__name__) @@ -665,6 +674,7 @@ class Connection(object): self.heartbeat_in_pthread = driver_conf.heartbeat_in_pthread self.ssl_enforce_fips_mode = driver_conf.ssl_enforce_fips_mode self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover + self.use_queue_manager = driver_conf.use_queue_manager if self.heartbeat_in_pthread: # NOTE(hberaud): Experimental: threading module is in use to run @@ -844,6 +854,13 @@ class Connection(object): self.connection.port = 1234 self._poll_timeout = 0.05 + if self.use_queue_manager: + self._q_manager = amqpdriver.QManager( + hostname=driver_conf.hostname, + processname=driver_conf.processname) + else: + self._q_manager = None + # FIXME(markmc): use oslo sslutils when it is available as a library _SSL_PROTOCOLS = { "tlsv1": ssl.PROTOCOL_TLSv1, @@ -1388,9 +1405,13 @@ class Connection(object): def declare_fanout_consumer(self, topic, callback): """Create a 'fanout' consumer.""" - unique = uuid.uuid4().hex + if self._q_manager: + unique = self._q_manager.get() + else: + unique = uuid.uuid4().hex exchange_name = '%s_fanout' % topic queue_name = '%s_fanout_%s' % (topic, unique) + LOG.info('Creating fanout queue: %s', queue_name) consumer = Consumer( exchange_name=exchange_name, diff --git a/releasenotes/notes/rabbit_queue_manager-363209285cbbe257.yaml b/releasenotes/notes/rabbit_queue_manager-363209285cbbe257.yaml new file mode 100644 index 000000000..0fb960a2f --- /dev/null +++ b/releasenotes/notes/rabbit_queue_manager-363209285cbbe257.yaml @@ -0,0 +1,21 @@ +--- +features: + - | + Add three new options (``use_queue_manager``, ``hostname``, + ``processname``) to switch oslo.messaging from random queue names + (for reply_q and fanouts) to consistent naming. + The default value is False, so oslo.messaging will still use random queue + names if nothing is set in configuration file of services. + When switching use_queue_manager to True, the uuid4 random string from the + queue name is replaced with a combination of hostname, processname and + counter. + The counter will be kept in shared memory (/dev/shm/x_y_qmanager). + This way, when a service using oslo.messaging restarts (e.g. neutron), + it will re-create the queues using the same name as the previous run, so + no new queues are created and no need for rabbitmq to delete the previous + queues. + This is extremely useful for operator to debug which queue belong to which + server/process. + It's also higlhy recommended to enable this feature when using quorum + queues for transient (option named ``rabbit_transient_quorum_queue``) to + avoid consuming all erlang atoms after some time.