359 lines
15 KiB
Python
359 lines
15 KiB
Python
# Copyright 2011 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from oslo_config import cfg
|
|
from oslo_log import log as logging
|
|
from oslo_utils import timeutils
|
|
import pika_pool
|
|
import tenacity
|
|
|
|
from oslo_messaging._drivers import base
|
|
from oslo_messaging._drivers.pika_driver import (pika_connection_factory as
|
|
pika_drv_conn_factory)
|
|
from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
|
|
from oslo_messaging._drivers.pika_driver import pika_engine as pika_drv_engine
|
|
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
|
|
from oslo_messaging._drivers.pika_driver import pika_listener as pika_drv_lstnr
|
|
from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
|
|
from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller
|
|
from oslo_messaging import exceptions
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
pika_pool_opts = [
|
|
cfg.IntOpt('pool_max_size', default=30,
|
|
help="Maximum number of connections to keep queued."),
|
|
cfg.IntOpt('pool_max_overflow', default=0,
|
|
help="Maximum number of connections to create above "
|
|
"`pool_max_size`."),
|
|
cfg.IntOpt('pool_timeout', default=30,
|
|
help="Default number of seconds to wait for a connections to "
|
|
"available"),
|
|
cfg.IntOpt('pool_recycle', default=600,
|
|
help="Lifetime of a connection (since creation) in seconds "
|
|
"or None for no recycling. Expired connections are "
|
|
"closed on acquire."),
|
|
cfg.IntOpt('pool_stale', default=60,
|
|
help="Threshold at which inactive (since release) connections "
|
|
"are considered stale in seconds or None for no "
|
|
"staleness. Stale connections are closed on acquire.")
|
|
]
|
|
|
|
message_opts = [
|
|
cfg.StrOpt('default_serializer_type', default='json',
|
|
choices=('json', 'msgpack'),
|
|
help="Default serialization mechanism for "
|
|
"serializing/deserializing outgoing/incoming messages")
|
|
]
|
|
|
|
notification_opts = [
|
|
cfg.BoolOpt('notification_persistence', default=False,
|
|
help="Persist notification messages."),
|
|
cfg.StrOpt('default_notification_exchange',
|
|
default="${control_exchange}_notification",
|
|
help="Exchange name for sending notifications"),
|
|
cfg.IntOpt(
|
|
'notification_listener_prefetch_count', default=100,
|
|
help="Max number of not acknowledged message which RabbitMQ can send "
|
|
"to notification listener."
|
|
),
|
|
cfg.IntOpt(
|
|
'default_notification_retry_attempts', default=-1,
|
|
help="Reconnecting retry count in case of connectivity problem during "
|
|
"sending notification, -1 means infinite retry."
|
|
),
|
|
cfg.FloatOpt(
|
|
'notification_retry_delay', default=0.25,
|
|
help="Reconnecting retry delay in case of connectivity problem during "
|
|
"sending notification message"
|
|
)
|
|
]
|
|
|
|
rpc_opts = [
|
|
cfg.IntOpt('rpc_queue_expiration', default=60,
|
|
help="Time to live for rpc queues without consumers in "
|
|
"seconds."),
|
|
cfg.StrOpt('default_rpc_exchange', default="${control_exchange}_rpc",
|
|
help="Exchange name for sending RPC messages"),
|
|
cfg.StrOpt('rpc_reply_exchange', default="${control_exchange}_rpc_reply",
|
|
help="Exchange name for receiving RPC replies"),
|
|
cfg.IntOpt(
|
|
'rpc_listener_prefetch_count', default=100,
|
|
help="Max number of not acknowledged message which RabbitMQ can send "
|
|
"to rpc listener."
|
|
),
|
|
cfg.IntOpt(
|
|
'rpc_reply_listener_prefetch_count', default=100,
|
|
help="Max number of not acknowledged message which RabbitMQ can send "
|
|
"to rpc reply listener."
|
|
),
|
|
cfg.IntOpt(
|
|
'rpc_reply_retry_attempts', default=-1,
|
|
help="Reconnecting retry count in case of connectivity problem during "
|
|
"sending reply. -1 means infinite retry during rpc_timeout"
|
|
),
|
|
cfg.FloatOpt(
|
|
'rpc_reply_retry_delay', default=0.25,
|
|
help="Reconnecting retry delay in case of connectivity problem during "
|
|
"sending reply."
|
|
),
|
|
cfg.IntOpt(
|
|
'default_rpc_retry_attempts', default=-1,
|
|
help="Reconnecting retry count in case of connectivity problem during "
|
|
"sending RPC message, -1 means infinite retry. If actual "
|
|
"retry attempts in not 0 the rpc request could be processed more "
|
|
"than one time"
|
|
),
|
|
cfg.FloatOpt(
|
|
'rpc_retry_delay', default=0.25,
|
|
help="Reconnecting retry delay in case of connectivity problem during "
|
|
"sending RPC message"
|
|
)
|
|
]
|
|
|
|
|
|
class PikaDriver(base.BaseDriver):
|
|
"""Pika Driver
|
|
|
|
The ``pika`` driver is the successor to the existing rabbit/kombu driver.
|
|
It is based on the Pika client library and supports the RabbitMQ broker as
|
|
the messaging back end.
|
|
|
|
See :doc:`pika_driver` for details.
|
|
"""
|
|
|
|
def __init__(self, conf, url, default_exchange=None,
|
|
allowed_remote_exmods=None):
|
|
opt_group = cfg.OptGroup(name='oslo_messaging_pika',
|
|
title='Pika driver options')
|
|
conf.register_group(opt_group)
|
|
conf.register_opts(pika_drv_conn_factory.pika_opts, group=opt_group)
|
|
conf.register_opts(pika_pool_opts, group=opt_group)
|
|
conf.register_opts(message_opts, group=opt_group)
|
|
conf.register_opts(rpc_opts, group=opt_group)
|
|
conf.register_opts(notification_opts, group=opt_group)
|
|
|
|
self._pika_engine = pika_drv_engine.PikaEngine(
|
|
conf, url, default_exchange, allowed_remote_exmods
|
|
)
|
|
self._reply_listener = pika_drv_lstnr.RpcReplyPikaListener(
|
|
self._pika_engine
|
|
)
|
|
super(PikaDriver, self).__init__(conf, url, default_exchange,
|
|
allowed_remote_exmods)
|
|
|
|
def require_features(self, requeue=False):
|
|
pass
|
|
|
|
def _declare_rpc_exchange(self, exchange, stopwatch):
|
|
timeout = stopwatch.leftover(return_none=True)
|
|
with (self._pika_engine.connection_without_confirmation_pool
|
|
.acquire(timeout=timeout)) as conn:
|
|
try:
|
|
self._pika_engine.declare_exchange_by_channel(
|
|
conn.channel,
|
|
self._pika_engine.get_rpc_exchange_name(
|
|
exchange
|
|
), "direct", False
|
|
)
|
|
except pika_pool.Timeout as e:
|
|
raise exceptions.MessagingTimeout(
|
|
"Timeout for current operation was expired. {}.".format(
|
|
str(e)
|
|
)
|
|
)
|
|
|
|
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
|
|
retry=None):
|
|
with timeutils.StopWatch(duration=timeout) as stopwatch:
|
|
if retry is None:
|
|
retry = self._pika_engine.default_rpc_retry_attempts
|
|
|
|
exchange = self._pika_engine.get_rpc_exchange_name(
|
|
target.exchange
|
|
)
|
|
|
|
def on_exception(ex):
|
|
if isinstance(ex, pika_drv_exc.ExchangeNotFoundException):
|
|
# it is desired to create exchange because if we sent to
|
|
# exchange which is not exists, we get ChannelClosed
|
|
# exception and need to reconnect
|
|
try:
|
|
self._declare_rpc_exchange(exchange, stopwatch)
|
|
except pika_drv_exc.ConnectionException as e:
|
|
LOG.warning("Problem during declaring exchange. %s", e)
|
|
return True
|
|
elif isinstance(ex, (pika_drv_exc.ConnectionException,
|
|
exceptions.MessageDeliveryFailure)):
|
|
LOG.warning("Problem during message sending. %s", ex)
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
if retry:
|
|
retrier = tenacity.retry(
|
|
stop=(tenacity.stop_never if retry == -1 else
|
|
tenacity.stop_after_attempt(retry)),
|
|
retry=tenacity.retry_if_exception(on_exception),
|
|
wait=tenacity.wait_fixed(self._pika_engine.rpc_retry_delay)
|
|
)
|
|
else:
|
|
retrier = None
|
|
|
|
if target.fanout:
|
|
return self.cast_all_workers(
|
|
exchange, target.topic, ctxt, message, stopwatch, retrier
|
|
)
|
|
|
|
routing_key = self._pika_engine.get_rpc_queue_name(
|
|
target.topic, target.server, retrier is None
|
|
)
|
|
|
|
msg = pika_drv_msg.RpcPikaOutgoingMessage(self._pika_engine,
|
|
message, ctxt)
|
|
try:
|
|
reply = msg.send(
|
|
exchange=exchange,
|
|
routing_key=routing_key,
|
|
reply_listener=(
|
|
self._reply_listener if wait_for_reply else None
|
|
),
|
|
stopwatch=stopwatch,
|
|
retrier=retrier
|
|
)
|
|
except pika_drv_exc.ExchangeNotFoundException as ex:
|
|
try:
|
|
self._declare_rpc_exchange(exchange, stopwatch)
|
|
except pika_drv_exc.ConnectionException as e:
|
|
LOG.warning("Problem during declaring exchange. %s", e)
|
|
raise ex
|
|
|
|
if reply is not None:
|
|
if reply.failure is not None:
|
|
raise reply.failure
|
|
|
|
return reply.result
|
|
|
|
def cast_all_workers(self, exchange, topic, ctxt, message, stopwatch,
|
|
retrier=None):
|
|
msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message,
|
|
ctxt)
|
|
try:
|
|
msg.send(
|
|
exchange=exchange,
|
|
routing_key=self._pika_engine.get_rpc_queue_name(
|
|
topic, "all_workers", retrier is None
|
|
),
|
|
mandatory=False,
|
|
stopwatch=stopwatch,
|
|
retrier=retrier
|
|
)
|
|
except pika_drv_exc.ExchangeNotFoundException:
|
|
try:
|
|
self._declare_rpc_exchange(exchange, stopwatch)
|
|
except pika_drv_exc.ConnectionException as e:
|
|
LOG.warning("Problem during declaring exchange. %s", e)
|
|
|
|
def _declare_notification_queue_binding(
|
|
self, target, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH):
|
|
if stopwatch.expired():
|
|
raise exceptions.MessagingTimeout(
|
|
"Timeout for current operation was expired."
|
|
)
|
|
try:
|
|
timeout = stopwatch.leftover(return_none=True)
|
|
with (self._pika_engine.connection_without_confirmation_pool
|
|
.acquire)(timeout=timeout) as conn:
|
|
self._pika_engine.declare_queue_binding_by_channel(
|
|
conn.channel,
|
|
exchange=(
|
|
target.exchange or
|
|
self._pika_engine.default_notification_exchange
|
|
),
|
|
queue=target.topic,
|
|
routing_key=target.topic,
|
|
exchange_type='direct',
|
|
queue_expiration=None,
|
|
durable=self._pika_engine.notification_persistence,
|
|
)
|
|
except pika_pool.Timeout as e:
|
|
raise exceptions.MessagingTimeout(
|
|
"Timeout for current operation was expired. {}.".format(str(e))
|
|
)
|
|
|
|
def send_notification(self, target, ctxt, message, version, retry=None):
|
|
if retry is None:
|
|
retry = self._pika_engine.default_notification_retry_attempts
|
|
|
|
def on_exception(ex):
|
|
if isinstance(ex, (pika_drv_exc.ExchangeNotFoundException,
|
|
pika_drv_exc.RoutingException)):
|
|
LOG.warning("Problem during sending notification. %s", ex)
|
|
try:
|
|
self._declare_notification_queue_binding(target)
|
|
except pika_drv_exc.ConnectionException as e:
|
|
LOG.warning("Problem during declaring notification queue "
|
|
"binding. %s", e)
|
|
return True
|
|
elif isinstance(ex, (pika_drv_exc.ConnectionException,
|
|
pika_drv_exc.MessageRejectedException)):
|
|
LOG.warning("Problem during sending notification. %s", ex)
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
if retry:
|
|
retrier = tenacity.retry(
|
|
stop=(tenacity.stop_never if retry == -1 else
|
|
tenacity.stop_after_attempt(retry)),
|
|
retry=tenacity.retry_if_exception(on_exception),
|
|
wait=tenacity.wait_fixed(
|
|
self._pika_engine.notification_retry_delay
|
|
)
|
|
)
|
|
else:
|
|
retrier = None
|
|
|
|
msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message,
|
|
ctxt)
|
|
return msg.send(
|
|
exchange=(
|
|
target.exchange or
|
|
self._pika_engine.default_notification_exchange
|
|
),
|
|
routing_key=target.topic,
|
|
confirm=True,
|
|
mandatory=True,
|
|
persistent=self._pika_engine.notification_persistence,
|
|
retrier=retrier
|
|
)
|
|
|
|
def listen(self, target, batch_size, batch_timeout):
|
|
return pika_drv_poller.RpcServicePikaPoller(
|
|
self._pika_engine, target, batch_size, batch_timeout,
|
|
self._pika_engine.rpc_listener_prefetch_count
|
|
)
|
|
|
|
def listen_for_notifications(self, targets_and_priorities, pool,
|
|
batch_size, batch_timeout):
|
|
return pika_drv_poller.NotificationPikaPoller(
|
|
self._pika_engine, targets_and_priorities, batch_size,
|
|
batch_timeout,
|
|
self._pika_engine.notification_listener_prefetch_count, pool
|
|
)
|
|
|
|
def cleanup(self):
|
|
self._reply_listener.cleanup()
|
|
self._pika_engine.cleanup()
|