Adds document and configuration guide
Change-Id: I18ff502eceaf4ddaba607319038a5cc14fe14c5b
This commit is contained in:
parent
a053593c13
commit
07822a0e74
156
doc/source/pika_driver.rst
Normal file
156
doc/source/pika_driver.rst
Normal file
@ -0,0 +1,156 @@
|
||||
------------------------------
|
||||
Pika Driver Deployment Guide
|
||||
------------------------------
|
||||
|
||||
.. currentmodule:: oslo_messaging
|
||||
|
||||
============
|
||||
Introduction
|
||||
============
|
||||
|
||||
Pika is a pure-Python implementation of the AMQP 0-9-1 protocol including
|
||||
RabbitMQ's extensions. It is very actively supported and recommended by
|
||||
RabbitMQ developers
|
||||
|
||||
========
|
||||
Abstract
|
||||
========
|
||||
|
||||
PikaDriver is one of oslo.messaging backend drivers. It supports RPC and Notify
|
||||
patterns. Currently it could be the only oslo.messaging driver across the
|
||||
OpenStack cluster. This document provides deployment information for this
|
||||
driver in oslo_messaging.
|
||||
|
||||
This driver is able to work with single instance of RabbitMQ server or
|
||||
RabbitMQ cluster.
|
||||
|
||||
|
||||
=============
|
||||
Configuration
|
||||
=============
|
||||
|
||||
Enabling (mandatory)
|
||||
--------------------
|
||||
|
||||
To enable the driver, in the section [DEFAULT] of the conf file,
|
||||
the 'transport_url' parameter should be set to
|
||||
`pika://user:pass@host1:port[,hostN:portN]`
|
||||
|
||||
[DEFAULT]
|
||||
transport_url = pika://guest:guest@localhost:5672
|
||||
|
||||
|
||||
Connection options (optional)
|
||||
-----------------------------
|
||||
|
||||
In section [oslo_messaging_pika]:
|
||||
#. channel_max - Maximum number of channels to allow,
|
||||
|
||||
#. frame_max (default - pika default value): The maximum byte size for
|
||||
an AMQP frame,
|
||||
|
||||
#. heartbeat_interval (default=1): How often to send heartbeats for
|
||||
consumer's connections in seconds. If 0 - disable heartbeats,
|
||||
|
||||
#. ssl (default=False): Enable SSL if True,
|
||||
|
||||
#. ssl_options (default=None): Arguments passed to ssl.wrap_socket,
|
||||
|
||||
#. socket_timeout (default=0.25): Set timeout for opening new connection's
|
||||
socket,
|
||||
|
||||
#. tcp_user_timeout (default=0.25): Set TCP_USER_TIMEOUT in seconds for
|
||||
connection's socket,
|
||||
|
||||
#. host_connection_reconnect_delay (default=0.25): Set delay for reconnection
|
||||
to some host after connection error
|
||||
|
||||
|
||||
Connection pool options (optional)
|
||||
----------------------------------
|
||||
|
||||
In section [oslo_messaging_pika]:
|
||||
|
||||
#. pool_max_size (default=10): Maximum number of connections to keep queued,
|
||||
|
||||
#. pool_max_overflow (default=0): Maximum number of connections to create above
|
||||
`pool_max_size`,
|
||||
|
||||
#. pool_timeout (default=30): Default number of seconds to wait for a
|
||||
connections to available,
|
||||
|
||||
#. pool_recycle (default=600): Lifetime of a connection (since creation) in
|
||||
seconds or None for no recycling. Expired connections are closed on acquire,
|
||||
|
||||
#. pool_stale (default=60): Threshold at which inactive (since release)
|
||||
connections are considered stale in seconds or None for no staleness.
|
||||
Stale connections are closed on acquire.")
|
||||
|
||||
RPC related options (optional)
|
||||
------------------------------
|
||||
|
||||
In section [oslo_messaging_pika]:
|
||||
|
||||
#. rpc_queue_expiration (default=60): Time to live for rpc queues without
|
||||
consumers in seconds,
|
||||
|
||||
#. default_rpc_exchange (default="${control_exchange}_rpc"): Exchange name for
|
||||
sending RPC messages,
|
||||
|
||||
#. rpc_reply_exchange', default=("${control_exchange}_rpc_reply"): Exchange
|
||||
name for receiving RPC replies,
|
||||
|
||||
#. rpc_listener_prefetch_count (default=100): Max number of not acknowledged
|
||||
message which RabbitMQ can send to rpc listener,
|
||||
|
||||
#. rpc_reply_listener_prefetch_count (default=100): Max number of not
|
||||
acknowledged message which RabbitMQ can send to rpc reply listener,
|
||||
|
||||
#. rpc_reply_retry_attempts (default=-1): Reconnecting retry count in case of
|
||||
connectivity problem during sending reply. -1 means infinite retry during
|
||||
rpc_timeout,
|
||||
|
||||
#. rpc_reply_retry_delay (default=0.25) Reconnecting retry delay in case of
|
||||
connectivity problem during sending reply,
|
||||
|
||||
#. default_rpc_retry_attempts (default=-1): Reconnecting retry count in case of
|
||||
connectivity problem during sending RPC message, -1 means infinite retry. If
|
||||
actual retry attempts in not 0 the rpc request could be processed more then
|
||||
one time,
|
||||
|
||||
#. rpc_retry_delay (default=0.25): Reconnecting retry delay in case of
|
||||
connectivity problem during sending RPC message
|
||||
|
||||
$control_exchange in this code is value of [DEFAULT].control_exchange option,
|
||||
which is "openstack" by default
|
||||
|
||||
Notification related options (optional)
|
||||
---------------------------------------
|
||||
|
||||
In section [oslo_messaging_pika]:
|
||||
|
||||
#. notification_persistence (default=False): Persist notification messages,
|
||||
|
||||
#. default_notification_exchange (default="${control_exchange}_notification"):
|
||||
Exchange name for for sending notifications,
|
||||
|
||||
#. notification_listener_prefetch_count (default=100): Max number of not
|
||||
acknowledged message which RabbitMQ can send to notification listener,
|
||||
|
||||
#. default_notification_retry_attempts (default=-1): Reconnecting retry count
|
||||
in case of connectivity problem during sending notification, -1 means
|
||||
infinite retry,
|
||||
|
||||
#. notification_retry_delay (default=0.25): Reconnecting retry delay in case of
|
||||
connectivity problem during sending notification message
|
||||
|
||||
$control_exchange in this code is value of [DEFAULT].control_exchange option,
|
||||
which is "openstack" by default
|
||||
|
||||
DevStack Support
|
||||
----------------
|
||||
|
||||
Pika driver is supported by DevStack. To enable it you should edit
|
||||
local.conf [localrc] section and add next there:
|
||||
|
||||
enable_plugin pika https://git.openstack.org/openstack/devstack-plugin-pika
|
@ -74,6 +74,11 @@ notification_opts = [
|
||||
cfg.StrOpt('default_notification_exchange',
|
||||
default="${control_exchange}_notification",
|
||||
help="Exchange name for for sending notifications"),
|
||||
cfg.IntOpt(
|
||||
'notification_listener_prefetch_count', default=100,
|
||||
help="Max number of not acknowledged message which RabbitMQ can send "
|
||||
"to notification listener."
|
||||
),
|
||||
cfg.IntOpt(
|
||||
'default_notification_retry_attempts', default=-1,
|
||||
help="Reconnecting retry count in case of connectivity problem during "
|
||||
@ -91,19 +96,18 @@ rpc_opts = [
|
||||
help="Time to live for rpc queues without consumers in "
|
||||
"seconds."),
|
||||
cfg.StrOpt('default_rpc_exchange', default="${control_exchange}_rpc",
|
||||
help="Exchange name for for sending RPC messages"),
|
||||
help="Exchange name for sending RPC messages"),
|
||||
cfg.StrOpt('rpc_reply_exchange', default="${control_exchange}_rpc_reply",
|
||||
help="Exchange name for for receiving RPC replies"),
|
||||
help="Exchange name for receiving RPC replies"),
|
||||
cfg.IntOpt(
|
||||
'rpc_listener_prefetch_count', default=10,
|
||||
'rpc_listener_prefetch_count', default=100,
|
||||
help="Max number of not acknowledged message which RabbitMQ can send "
|
||||
"to rpc listener. Works only if rpc_listener_ack == True"
|
||||
"to rpc listener."
|
||||
),
|
||||
cfg.IntOpt(
|
||||
'rpc_reply_listener_prefetch_count', default=10,
|
||||
'rpc_reply_listener_prefetch_count', default=100,
|
||||
help="Max number of not acknowledged message which RabbitMQ can send "
|
||||
"to rpc reply listener. Works only if rpc_reply_listener_ack == "
|
||||
"True"
|
||||
"to rpc reply listener."
|
||||
),
|
||||
cfg.IntOpt(
|
||||
'rpc_reply_retry_attempts', default=-1,
|
||||
@ -267,7 +271,11 @@ class PikaDriver(object):
|
||||
|
||||
def listen_for_notifications(self, targets_and_priorities, pool):
|
||||
listener = pika_drv_poller.NotificationPikaPoller(
|
||||
self._pika_engine, targets_and_priorities, pool
|
||||
self._pika_engine, targets_and_priorities,
|
||||
prefetch_count=(
|
||||
self._pika_engine.notification_listener_prefetch_count
|
||||
),
|
||||
queue_name=pool
|
||||
)
|
||||
listener.start()
|
||||
return listener
|
||||
|
@ -119,6 +119,16 @@ class PikaEngine(object):
|
||||
conf.oslo_messaging_pika.rpc_listener_prefetch_count
|
||||
)
|
||||
|
||||
self.default_rpc_retry_attempts = (
|
||||
conf.oslo_messaging_pika.default_rpc_retry_attempts
|
||||
)
|
||||
|
||||
self.rpc_retry_delay = (
|
||||
conf.oslo_messaging_pika.rpc_retry_delay
|
||||
)
|
||||
if self.rpc_retry_delay < 0:
|
||||
raise ValueError("rpc_retry_delay should be non-negative integer")
|
||||
|
||||
self.rpc_reply_listener_prefetch_count = (
|
||||
conf.oslo_messaging_pika.rpc_listener_prefetch_count
|
||||
)
|
||||
@ -126,13 +136,10 @@ class PikaEngine(object):
|
||||
self.rpc_reply_retry_attempts = (
|
||||
conf.oslo_messaging_pika.rpc_reply_retry_attempts
|
||||
)
|
||||
if self.rpc_reply_retry_attempts is None:
|
||||
raise ValueError("rpc_reply_retry_attempts should be integer")
|
||||
self.rpc_reply_retry_delay = (
|
||||
conf.oslo_messaging_pika.rpc_reply_retry_delay
|
||||
)
|
||||
if (self.rpc_reply_retry_delay is None or
|
||||
self.rpc_reply_retry_delay < 0):
|
||||
if self.rpc_reply_retry_delay < 0:
|
||||
raise ValueError("rpc_reply_retry_delay should be non-negative "
|
||||
"integer")
|
||||
|
||||
@ -151,17 +158,9 @@ class PikaEngine(object):
|
||||
conf.oslo_messaging_pika.notification_persistence
|
||||
)
|
||||
|
||||
self.default_rpc_retry_attempts = (
|
||||
conf.oslo_messaging_pika.default_rpc_retry_attempts
|
||||
self.notification_listener_prefetch_count = (
|
||||
conf.oslo_messaging_pika.notification_listener_prefetch_count
|
||||
)
|
||||
if self.default_rpc_retry_attempts is None:
|
||||
raise ValueError("default_rpc_retry_attempts should be an integer")
|
||||
self.rpc_retry_delay = (
|
||||
conf.oslo_messaging_pika.rpc_retry_delay
|
||||
)
|
||||
if (self.rpc_retry_delay is None or
|
||||
self.rpc_retry_delay < 0):
|
||||
raise ValueError("rpc_retry_delay should be non-negative integer")
|
||||
|
||||
self.default_notification_retry_attempts = (
|
||||
conf.oslo_messaging_pika.default_notification_retry_attempts
|
||||
|
@ -353,8 +353,8 @@ class NotificationPikaPoller(PikaPoller):
|
||||
"""PikaPoller implementation for polling Notification messages. Overrides
|
||||
base functionality according to Notification specific
|
||||
"""
|
||||
def __init__(self, pika_engine, targets_and_priorities,
|
||||
queue_name=None, prefetch_count=100):
|
||||
def __init__(self, pika_engine, targets_and_priorities, prefetch_count,
|
||||
queue_name=None):
|
||||
"""Adds targets_and_priorities and queue_name parameter
|
||||
for declaring exchanges and queues used for notification delivery
|
||||
|
||||
@ -362,10 +362,10 @@ class NotificationPikaPoller(PikaPoller):
|
||||
shared driver functionality
|
||||
:param targets_and_priorities: list of (target, priority), defines
|
||||
default queue names for corresponding notification types
|
||||
:param queue: String, alternative queue name used for this poller
|
||||
instead of default queue name
|
||||
:param prefetch_count: Integer, maximum count of unacknowledged
|
||||
messages which RabbitMQ broker sends to this consumer
|
||||
:param queue: String, alternative queue name used for this poller
|
||||
instead of default queue name
|
||||
"""
|
||||
self._targets_and_priorities = targets_and_priorities
|
||||
self._queue_name = queue_name
|
||||
|
@ -416,8 +416,8 @@ class NotificationPikaPollerTestCase(unittest.TestCase):
|
||||
def test_declare_notification_queue_bindings_default_queue(
|
||||
self, pika_incoming_message_mock):
|
||||
poller = pika_poller.NotificationPikaPoller(
|
||||
self._pika_engine, self._target_and_priorities, None,
|
||||
self._prefetch_count,
|
||||
self._pika_engine, self._target_and_priorities,
|
||||
self._prefetch_count, None
|
||||
)
|
||||
self._poller_connection_mock.process_data_events.side_effect = (
|
||||
lambda time_limit: poller._on_message_with_ack_callback(
|
||||
@ -479,7 +479,7 @@ class NotificationPikaPollerTestCase(unittest.TestCase):
|
||||
self, pika_incoming_message_mock):
|
||||
poller = pika_poller.NotificationPikaPoller(
|
||||
self._pika_engine, self._target_and_priorities,
|
||||
"custom_queue_name", self._prefetch_count
|
||||
self._prefetch_count, "custom_queue_name"
|
||||
)
|
||||
self._poller_connection_mock.process_data_events.side_effect = (
|
||||
lambda time_limit: poller._on_message_with_ack_callback(
|
||||
|
Loading…
Reference in New Issue
Block a user