From 07822a0e7479480af81659e0027e2abf8a6587d0 Mon Sep 17 00:00:00 2001 From: dukhlov Date: Mon, 25 Jan 2016 18:10:46 +0200 Subject: [PATCH] Adds document and configuration guide Change-Id: I18ff502eceaf4ddaba607319038a5cc14fe14c5b --- doc/source/pika_driver.rst | 156 ++++++++++++++++++ oslo_messaging/_drivers/impl_pika.py | 24 ++- .../_drivers/pika_driver/pika_engine.py | 27 ++- .../_drivers/pika_driver/pika_poller.py | 8 +- .../tests/drivers/pika/test_poller.py | 6 +- 5 files changed, 192 insertions(+), 29 deletions(-) create mode 100644 doc/source/pika_driver.rst diff --git a/doc/source/pika_driver.rst b/doc/source/pika_driver.rst new file mode 100644 index 000000000..508aaa611 --- /dev/null +++ b/doc/source/pika_driver.rst @@ -0,0 +1,156 @@ +------------------------------ +Pika Driver Deployment Guide +------------------------------ + +.. currentmodule:: oslo_messaging + +============ +Introduction +============ + +Pika is a pure-Python implementation of the AMQP 0-9-1 protocol including +RabbitMQ's extensions. It is very actively supported and recommended by +RabbitMQ developers + +======== +Abstract +======== + +PikaDriver is one of oslo.messaging backend drivers. It supports RPC and Notify +patterns. Currently it could be the only oslo.messaging driver across the +OpenStack cluster. This document provides deployment information for this +driver in oslo_messaging. + +This driver is able to work with single instance of RabbitMQ server or +RabbitMQ cluster. + + +============= +Configuration +============= + +Enabling (mandatory) +-------------------- + +To enable the driver, in the section [DEFAULT] of the conf file, +the 'transport_url' parameter should be set to +`pika://user:pass@host1:port[,hostN:portN]` + + [DEFAULT] + transport_url = pika://guest:guest@localhost:5672 + + +Connection options (optional) +----------------------------- + +In section [oslo_messaging_pika]: +#. channel_max - Maximum number of channels to allow, + +#. frame_max (default - pika default value): The maximum byte size for + an AMQP frame, + +#. heartbeat_interval (default=1): How often to send heartbeats for + consumer's connections in seconds. If 0 - disable heartbeats, + +#. ssl (default=False): Enable SSL if True, + +#. ssl_options (default=None): Arguments passed to ssl.wrap_socket, + +#. socket_timeout (default=0.25): Set timeout for opening new connection's + socket, + +#. tcp_user_timeout (default=0.25): Set TCP_USER_TIMEOUT in seconds for + connection's socket, + +#. host_connection_reconnect_delay (default=0.25): Set delay for reconnection + to some host after connection error + + +Connection pool options (optional) +---------------------------------- + +In section [oslo_messaging_pika]: + +#. pool_max_size (default=10): Maximum number of connections to keep queued, + +#. pool_max_overflow (default=0): Maximum number of connections to create above + `pool_max_size`, + +#. pool_timeout (default=30): Default number of seconds to wait for a + connections to available, + +#. pool_recycle (default=600): Lifetime of a connection (since creation) in + seconds or None for no recycling. Expired connections are closed on acquire, + +#. pool_stale (default=60): Threshold at which inactive (since release) + connections are considered stale in seconds or None for no staleness. + Stale connections are closed on acquire.") + +RPC related options (optional) +------------------------------ + +In section [oslo_messaging_pika]: + +#. rpc_queue_expiration (default=60): Time to live for rpc queues without + consumers in seconds, + +#. default_rpc_exchange (default="${control_exchange}_rpc"): Exchange name for + sending RPC messages, + +#. rpc_reply_exchange', default=("${control_exchange}_rpc_reply"): Exchange + name for receiving RPC replies, + +#. rpc_listener_prefetch_count (default=100): Max number of not acknowledged + message which RabbitMQ can send to rpc listener, + +#. rpc_reply_listener_prefetch_count (default=100): Max number of not + acknowledged message which RabbitMQ can send to rpc reply listener, + +#. rpc_reply_retry_attempts (default=-1): Reconnecting retry count in case of + connectivity problem during sending reply. -1 means infinite retry during + rpc_timeout, + +#. rpc_reply_retry_delay (default=0.25) Reconnecting retry delay in case of + connectivity problem during sending reply, + +#. default_rpc_retry_attempts (default=-1): Reconnecting retry count in case of + connectivity problem during sending RPC message, -1 means infinite retry. If + actual retry attempts in not 0 the rpc request could be processed more then + one time, + +#. rpc_retry_delay (default=0.25): Reconnecting retry delay in case of + connectivity problem during sending RPC message + +$control_exchange in this code is value of [DEFAULT].control_exchange option, +which is "openstack" by default + +Notification related options (optional) +--------------------------------------- + +In section [oslo_messaging_pika]: + +#. notification_persistence (default=False): Persist notification messages, + +#. default_notification_exchange (default="${control_exchange}_notification"): + Exchange name for for sending notifications, + +#. notification_listener_prefetch_count (default=100): Max number of not + acknowledged message which RabbitMQ can send to notification listener, + +#. default_notification_retry_attempts (default=-1): Reconnecting retry count + in case of connectivity problem during sending notification, -1 means + infinite retry, + +#. notification_retry_delay (default=0.25): Reconnecting retry delay in case of + connectivity problem during sending notification message + +$control_exchange in this code is value of [DEFAULT].control_exchange option, +which is "openstack" by default + +DevStack Support +---------------- + +Pika driver is supported by DevStack. To enable it you should edit +local.conf [localrc] section and add next there: + + enable_plugin pika https://git.openstack.org/openstack/devstack-plugin-pika diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py index 3d633a5b1..796a8247d 100644 --- a/oslo_messaging/_drivers/impl_pika.py +++ b/oslo_messaging/_drivers/impl_pika.py @@ -74,6 +74,11 @@ notification_opts = [ cfg.StrOpt('default_notification_exchange', default="${control_exchange}_notification", help="Exchange name for for sending notifications"), + cfg.IntOpt( + 'notification_listener_prefetch_count', default=100, + help="Max number of not acknowledged message which RabbitMQ can send " + "to notification listener." + ), cfg.IntOpt( 'default_notification_retry_attempts', default=-1, help="Reconnecting retry count in case of connectivity problem during " @@ -91,19 +96,18 @@ rpc_opts = [ help="Time to live for rpc queues without consumers in " "seconds."), cfg.StrOpt('default_rpc_exchange', default="${control_exchange}_rpc", - help="Exchange name for for sending RPC messages"), + help="Exchange name for sending RPC messages"), cfg.StrOpt('rpc_reply_exchange', default="${control_exchange}_rpc_reply", - help="Exchange name for for receiving RPC replies"), + help="Exchange name for receiving RPC replies"), cfg.IntOpt( - 'rpc_listener_prefetch_count', default=10, + 'rpc_listener_prefetch_count', default=100, help="Max number of not acknowledged message which RabbitMQ can send " - "to rpc listener. Works only if rpc_listener_ack == True" + "to rpc listener." ), cfg.IntOpt( - 'rpc_reply_listener_prefetch_count', default=10, + 'rpc_reply_listener_prefetch_count', default=100, help="Max number of not acknowledged message which RabbitMQ can send " - "to rpc reply listener. Works only if rpc_reply_listener_ack == " - "True" + "to rpc reply listener." ), cfg.IntOpt( 'rpc_reply_retry_attempts', default=-1, @@ -267,7 +271,11 @@ class PikaDriver(object): def listen_for_notifications(self, targets_and_priorities, pool): listener = pika_drv_poller.NotificationPikaPoller( - self._pika_engine, targets_and_priorities, pool + self._pika_engine, targets_and_priorities, + prefetch_count=( + self._pika_engine.notification_listener_prefetch_count + ), + queue_name=pool ) listener.start() return listener diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py index 997b0d104..be1db685e 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_engine.py +++ b/oslo_messaging/_drivers/pika_driver/pika_engine.py @@ -119,6 +119,16 @@ class PikaEngine(object): conf.oslo_messaging_pika.rpc_listener_prefetch_count ) + self.default_rpc_retry_attempts = ( + conf.oslo_messaging_pika.default_rpc_retry_attempts + ) + + self.rpc_retry_delay = ( + conf.oslo_messaging_pika.rpc_retry_delay + ) + if self.rpc_retry_delay < 0: + raise ValueError("rpc_retry_delay should be non-negative integer") + self.rpc_reply_listener_prefetch_count = ( conf.oslo_messaging_pika.rpc_listener_prefetch_count ) @@ -126,13 +136,10 @@ class PikaEngine(object): self.rpc_reply_retry_attempts = ( conf.oslo_messaging_pika.rpc_reply_retry_attempts ) - if self.rpc_reply_retry_attempts is None: - raise ValueError("rpc_reply_retry_attempts should be integer") self.rpc_reply_retry_delay = ( conf.oslo_messaging_pika.rpc_reply_retry_delay ) - if (self.rpc_reply_retry_delay is None or - self.rpc_reply_retry_delay < 0): + if self.rpc_reply_retry_delay < 0: raise ValueError("rpc_reply_retry_delay should be non-negative " "integer") @@ -151,17 +158,9 @@ class PikaEngine(object): conf.oslo_messaging_pika.notification_persistence ) - self.default_rpc_retry_attempts = ( - conf.oslo_messaging_pika.default_rpc_retry_attempts + self.notification_listener_prefetch_count = ( + conf.oslo_messaging_pika.notification_listener_prefetch_count ) - if self.default_rpc_retry_attempts is None: - raise ValueError("default_rpc_retry_attempts should be an integer") - self.rpc_retry_delay = ( - conf.oslo_messaging_pika.rpc_retry_delay - ) - if (self.rpc_retry_delay is None or - self.rpc_retry_delay < 0): - raise ValueError("rpc_retry_delay should be non-negative integer") self.default_notification_retry_attempts = ( conf.oslo_messaging_pika.default_notification_retry_attempts diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py index 68edc1247..4edfa0d50 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_poller.py +++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py @@ -353,8 +353,8 @@ class NotificationPikaPoller(PikaPoller): """PikaPoller implementation for polling Notification messages. Overrides base functionality according to Notification specific """ - def __init__(self, pika_engine, targets_and_priorities, - queue_name=None, prefetch_count=100): + def __init__(self, pika_engine, targets_and_priorities, prefetch_count, + queue_name=None): """Adds targets_and_priorities and queue_name parameter for declaring exchanges and queues used for notification delivery @@ -362,10 +362,10 @@ class NotificationPikaPoller(PikaPoller): shared driver functionality :param targets_and_priorities: list of (target, priority), defines default queue names for corresponding notification types - :param queue: String, alternative queue name used for this poller - instead of default queue name :param prefetch_count: Integer, maximum count of unacknowledged messages which RabbitMQ broker sends to this consumer + :param queue: String, alternative queue name used for this poller + instead of default queue name """ self._targets_and_priorities = targets_and_priorities self._queue_name = queue_name diff --git a/oslo_messaging/tests/drivers/pika/test_poller.py b/oslo_messaging/tests/drivers/pika/test_poller.py index 77a3b6b29..d6e5e8ab8 100644 --- a/oslo_messaging/tests/drivers/pika/test_poller.py +++ b/oslo_messaging/tests/drivers/pika/test_poller.py @@ -416,8 +416,8 @@ class NotificationPikaPollerTestCase(unittest.TestCase): def test_declare_notification_queue_bindings_default_queue( self, pika_incoming_message_mock): poller = pika_poller.NotificationPikaPoller( - self._pika_engine, self._target_and_priorities, None, - self._prefetch_count, + self._pika_engine, self._target_and_priorities, + self._prefetch_count, None ) self._poller_connection_mock.process_data_events.side_effect = ( lambda time_limit: poller._on_message_with_ack_callback( @@ -479,7 +479,7 @@ class NotificationPikaPollerTestCase(unittest.TestCase): self, pika_incoming_message_mock): poller = pika_poller.NotificationPikaPoller( self._pika_engine, self._target_and_priorities, - "custom_queue_name", self._prefetch_count + self._prefetch_count, "custom_queue_name" ) self._poller_connection_mock.process_data_events.side_effect = ( lambda time_limit: poller._on_message_with_ack_callback(