Merge "Add QManager to amqp driver"
This commit is contained in:
commit
6ad1ccf89c
@ -14,12 +14,14 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
import queue
|
import queue
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
import cachetools
|
import cachetools
|
||||||
|
from oslo_concurrency import lockutils
|
||||||
from oslo_utils import eventletutils
|
from oslo_utils import eventletutils
|
||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
|
|
||||||
@ -40,6 +42,63 @@ ACK_REQUEUE_EVERY_SECONDS_MIN = 0.001
|
|||||||
ACK_REQUEUE_EVERY_SECONDS_MAX = 5.0
|
ACK_REQUEUE_EVERY_SECONDS_MAX = 5.0
|
||||||
|
|
||||||
|
|
||||||
|
class QManager(object):
|
||||||
|
"""Queue Manager to build queue name for reply (and fanout) type.
|
||||||
|
This class is used only when use_queue_manager is set to True in config
|
||||||
|
file.
|
||||||
|
It rely on a shared memory accross processes (reading/writing data to
|
||||||
|
/dev/shm/xyz) and oslo_concurrency.lockutils to avoid assigning the same
|
||||||
|
queue name twice (or more) to different processes.
|
||||||
|
The original idea of this queue manager was to avoid random queue names,
|
||||||
|
so on service restart, the previously created queues can be reused,
|
||||||
|
avoiding deletion/creation of queues on rabbitmq side (which cost a lot
|
||||||
|
at scale).
|
||||||
|
"""
|
||||||
|
def __init__(self, hostname, processname):
|
||||||
|
# We will use hostname and processname in queue names to help identify
|
||||||
|
# them easily.
|
||||||
|
# This is also ensuring consistency between service restart.
|
||||||
|
self.hostname = hostname
|
||||||
|
self.processname = processname
|
||||||
|
# This is where the counter is kept
|
||||||
|
self.file_name = '/dev/shm/%s_%s_qmanager' % (self.hostname, # nosec
|
||||||
|
self.processname)
|
||||||
|
# We use the process group to restart the counter on service restart
|
||||||
|
self.pg = os.getpgrp()
|
||||||
|
|
||||||
|
def get(self):
|
||||||
|
lock_name = 'oslo_read_shm_%s_%s' % (self.hostname, self.processname)
|
||||||
|
|
||||||
|
@lockutils.synchronized(lock_name, external=True)
|
||||||
|
def read_from_shm():
|
||||||
|
# Grab the counter from shm
|
||||||
|
# This function is thread and process safe thanks to lockutils
|
||||||
|
try:
|
||||||
|
with open(self.file_name, 'r') as f:
|
||||||
|
pg, c = f.readline().split(':')
|
||||||
|
pg = int(pg)
|
||||||
|
c = int(c)
|
||||||
|
except (FileNotFoundError, ValueError):
|
||||||
|
pg = self.pg
|
||||||
|
c = 0
|
||||||
|
|
||||||
|
# Increment the counter
|
||||||
|
if pg == self.pg:
|
||||||
|
c += 1
|
||||||
|
else:
|
||||||
|
# The process group changed, maybe service restarted?
|
||||||
|
# Start over the counter
|
||||||
|
c = 1
|
||||||
|
|
||||||
|
# Write the new counter
|
||||||
|
with open(self.file_name, 'w') as f:
|
||||||
|
f.write(str(self.pg) + ':' + str(c))
|
||||||
|
return c
|
||||||
|
|
||||||
|
c = read_from_shm()
|
||||||
|
return self.hostname + ":" + self.processname + ":" + str(c)
|
||||||
|
|
||||||
|
|
||||||
class MessageOperationsHandler(object):
|
class MessageOperationsHandler(object):
|
||||||
"""Queue used by message operations to ensure that all tasks are
|
"""Queue used by message operations to ensure that all tasks are
|
||||||
serialized and run in the same thread, since underlying drivers like kombu
|
serialized and run in the same thread, since underlying drivers like kombu
|
||||||
@ -445,6 +504,7 @@ class ReplyWaiters(object):
|
|||||||
'to message ID %s' % msg_id)
|
'to message ID %s' % msg_id)
|
||||||
|
|
||||||
def put(self, msg_id, message_data):
|
def put(self, msg_id, message_data):
|
||||||
|
LOG.info('Received RPC response for msg %s', msg_id)
|
||||||
queue = self._queues.get(msg_id)
|
queue = self._queues.get(msg_id)
|
||||||
if not queue:
|
if not queue:
|
||||||
LOG.info('No calling threads waiting for msg_id : %s', msg_id)
|
LOG.info('No calling threads waiting for msg_id : %s', msg_id)
|
||||||
@ -597,6 +657,12 @@ class AMQPDriverBase(base.BaseDriver):
|
|||||||
self._reply_q = None
|
self._reply_q = None
|
||||||
self._reply_q_conn = None
|
self._reply_q_conn = None
|
||||||
self._waiter = None
|
self._waiter = None
|
||||||
|
if conf.oslo_messaging_rabbit.use_queue_manager:
|
||||||
|
self._q_manager = QManager(
|
||||||
|
hostname=conf.oslo_messaging_rabbit.hostname,
|
||||||
|
processname=conf.oslo_messaging_rabbit.processname)
|
||||||
|
else:
|
||||||
|
self._q_manager = None
|
||||||
|
|
||||||
def _get_exchange(self, target):
|
def _get_exchange(self, target):
|
||||||
return target.exchange or self._default_exchange
|
return target.exchange or self._default_exchange
|
||||||
@ -608,10 +674,16 @@ class AMQPDriverBase(base.BaseDriver):
|
|||||||
|
|
||||||
def _get_reply_q(self):
|
def _get_reply_q(self):
|
||||||
with self._reply_q_lock:
|
with self._reply_q_lock:
|
||||||
|
# NOTE(amorin) Re-use reply_q when it already exists
|
||||||
|
# This avoid creating too many queues on AMQP server (rabbit)
|
||||||
if self._reply_q is not None:
|
if self._reply_q is not None:
|
||||||
return self._reply_q
|
return self._reply_q
|
||||||
|
|
||||||
reply_q = 'reply_' + uuid.uuid4().hex
|
if self._q_manager:
|
||||||
|
reply_q = 'reply_' + self._q_manager.get()
|
||||||
|
else:
|
||||||
|
reply_q = 'reply_' + uuid.uuid4().hex
|
||||||
|
LOG.info('Creating reply queue: %s', reply_q)
|
||||||
|
|
||||||
conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
|
conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
|
||||||
|
|
||||||
@ -628,12 +700,20 @@ class AMQPDriverBase(base.BaseDriver):
|
|||||||
envelope=True, notify=False, retry=None, transport_options=None):
|
envelope=True, notify=False, retry=None, transport_options=None):
|
||||||
|
|
||||||
msg = message
|
msg = message
|
||||||
|
if 'method' in msg:
|
||||||
|
LOG.debug('Calling RPC method %s on target %s', msg.get('method'),
|
||||||
|
target.topic)
|
||||||
|
else:
|
||||||
|
LOG.debug('Sending message to topic %s', target.topic)
|
||||||
|
|
||||||
if wait_for_reply:
|
if wait_for_reply:
|
||||||
|
_reply_q = self._get_reply_q()
|
||||||
msg_id = uuid.uuid4().hex
|
msg_id = uuid.uuid4().hex
|
||||||
msg.update({'_msg_id': msg_id})
|
msg.update({'_msg_id': msg_id})
|
||||||
msg.update({'_reply_q': self._get_reply_q()})
|
msg.update({'_reply_q': _reply_q})
|
||||||
msg.update({'_timeout': call_monitor_timeout})
|
msg.update({'_timeout': call_monitor_timeout})
|
||||||
|
LOG.info('Expecting reply to msg %s in queue %s', msg_id,
|
||||||
|
_reply_q)
|
||||||
|
|
||||||
rpc_amqp._add_unique_id(msg)
|
rpc_amqp._add_unique_id(msg)
|
||||||
unique_id = msg[rpc_amqp.UNIQUE_ID]
|
unique_id = msg[rpc_amqp.UNIQUE_ID]
|
||||||
|
@ -241,7 +241,16 @@ rabbit_opts = [
|
|||||||
default=False,
|
default=False,
|
||||||
help="Enable x-cancel-on-ha-failover flag so that "
|
help="Enable x-cancel-on-ha-failover flag so that "
|
||||||
"rabbitmq server will cancel and notify consumers"
|
"rabbitmq server will cancel and notify consumers"
|
||||||
"when queue is down")
|
"when queue is down"),
|
||||||
|
cfg.BoolOpt('use_queue_manager',
|
||||||
|
default=False,
|
||||||
|
help='Should we use consistant queue names or random ones'),
|
||||||
|
cfg.StrOpt('hostname',
|
||||||
|
default=socket.gethostname(),
|
||||||
|
help='Hostname used by queue manager'),
|
||||||
|
cfg.StrOpt('processname',
|
||||||
|
default=os.path.basename(sys.argv[0]),
|
||||||
|
help='Process name used by queue manager'),
|
||||||
]
|
]
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -665,6 +674,7 @@ class Connection(object):
|
|||||||
self.heartbeat_in_pthread = driver_conf.heartbeat_in_pthread
|
self.heartbeat_in_pthread = driver_conf.heartbeat_in_pthread
|
||||||
self.ssl_enforce_fips_mode = driver_conf.ssl_enforce_fips_mode
|
self.ssl_enforce_fips_mode = driver_conf.ssl_enforce_fips_mode
|
||||||
self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover
|
self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover
|
||||||
|
self.use_queue_manager = driver_conf.use_queue_manager
|
||||||
|
|
||||||
if self.heartbeat_in_pthread:
|
if self.heartbeat_in_pthread:
|
||||||
# NOTE(hberaud): Experimental: threading module is in use to run
|
# NOTE(hberaud): Experimental: threading module is in use to run
|
||||||
@ -844,6 +854,13 @@ class Connection(object):
|
|||||||
self.connection.port = 1234
|
self.connection.port = 1234
|
||||||
self._poll_timeout = 0.05
|
self._poll_timeout = 0.05
|
||||||
|
|
||||||
|
if self.use_queue_manager:
|
||||||
|
self._q_manager = amqpdriver.QManager(
|
||||||
|
hostname=driver_conf.hostname,
|
||||||
|
processname=driver_conf.processname)
|
||||||
|
else:
|
||||||
|
self._q_manager = None
|
||||||
|
|
||||||
# FIXME(markmc): use oslo sslutils when it is available as a library
|
# FIXME(markmc): use oslo sslutils when it is available as a library
|
||||||
_SSL_PROTOCOLS = {
|
_SSL_PROTOCOLS = {
|
||||||
"tlsv1": ssl.PROTOCOL_TLSv1,
|
"tlsv1": ssl.PROTOCOL_TLSv1,
|
||||||
@ -1388,9 +1405,13 @@ class Connection(object):
|
|||||||
def declare_fanout_consumer(self, topic, callback):
|
def declare_fanout_consumer(self, topic, callback):
|
||||||
"""Create a 'fanout' consumer."""
|
"""Create a 'fanout' consumer."""
|
||||||
|
|
||||||
unique = uuid.uuid4().hex
|
if self._q_manager:
|
||||||
|
unique = self._q_manager.get()
|
||||||
|
else:
|
||||||
|
unique = uuid.uuid4().hex
|
||||||
exchange_name = '%s_fanout' % topic
|
exchange_name = '%s_fanout' % topic
|
||||||
queue_name = '%s_fanout_%s' % (topic, unique)
|
queue_name = '%s_fanout_%s' % (topic, unique)
|
||||||
|
LOG.info('Creating fanout queue: %s', queue_name)
|
||||||
|
|
||||||
consumer = Consumer(
|
consumer = Consumer(
|
||||||
exchange_name=exchange_name,
|
exchange_name=exchange_name,
|
||||||
|
@ -0,0 +1,21 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
Add three new options (``use_queue_manager``, ``hostname``,
|
||||||
|
``processname``) to switch oslo.messaging from random queue names
|
||||||
|
(for reply_q and fanouts) to consistent naming.
|
||||||
|
The default value is False, so oslo.messaging will still use random queue
|
||||||
|
names if nothing is set in configuration file of services.
|
||||||
|
When switching use_queue_manager to True, the uuid4 random string from the
|
||||||
|
queue name is replaced with a combination of hostname, processname and
|
||||||
|
counter.
|
||||||
|
The counter will be kept in shared memory (/dev/shm/x_y_qmanager).
|
||||||
|
This way, when a service using oslo.messaging restarts (e.g. neutron),
|
||||||
|
it will re-create the queues using the same name as the previous run, so
|
||||||
|
no new queues are created and no need for rabbitmq to delete the previous
|
||||||
|
queues.
|
||||||
|
This is extremely useful for operator to debug which queue belong to which
|
||||||
|
server/process.
|
||||||
|
It's also higlhy recommended to enable this feature when using quorum
|
||||||
|
queues for transient (option named ``rabbit_transient_quorum_queue``) to
|
||||||
|
avoid consuming all erlang atoms after some time.
|
Loading…
Reference in New Issue
Block a user