Move zmq driver options into its own group
ZeroMQ driver options are current stored into the DEFAULT group. This change makes the zmq configuration clearer by putting its options into oslo_messaging_zmq group. Change-Id: Ia00fda005b1664750d2646f8c82ebdf295b156fb Closes-bug: #1417040 Co-Authored-By: Oleksii Zamiatin <ozamiatin@mirantis.com>
This commit is contained in:
parent
b259f88b09
commit
7c5d039fd3
@ -85,12 +85,14 @@ Configuration
|
||||
Enabling (mandatory)
|
||||
--------------------
|
||||
|
||||
To enable the driver, in the section [DEFAULT] of the conf file,
|
||||
the 'rpc_backend' flag must be set to 'zmq' and the 'rpc_zmq_host' flag
|
||||
To enable the driver the 'transport_url' option must be set to 'zmq://'
|
||||
in the section [DEFAULT] of the conf file, the 'rpc_zmq_host' flag
|
||||
must be set to the hostname of the current node. ::
|
||||
|
||||
[DEFAULT]
|
||||
rpc_backend = zmq
|
||||
transport_url = "zmq://"
|
||||
|
||||
[oslo_messaging_zmq]
|
||||
rpc_zmq_host = {hostname}
|
||||
|
||||
|
||||
@ -110,27 +112,17 @@ RedisMatchMaker: loads the hash table from a remote Redis server, supports
|
||||
dynamic host/topic registrations, host expiration, and hooks for consuming
|
||||
applications to acknowledge or neg-acknowledge topic.host service availability.
|
||||
|
||||
To set the MatchMaker class, use option 'rpc_zmq_matchmaker' in [DEFAULT]. ::
|
||||
For ZeroMQ driver Redis is configured in transport_url also. For using Redis
|
||||
specify the URL as follows::
|
||||
|
||||
rpc_zmq_matchmaker = dummy
|
||||
|
||||
or::
|
||||
|
||||
rpc_zmq_matchmaker = redis
|
||||
|
||||
To specify the Redis server for RedisMatchMaker, use options in
|
||||
[matchmaker_redis] of each project. ::
|
||||
|
||||
[matchmaker_redis]
|
||||
host = 127.0.0.1
|
||||
port = 6379
|
||||
[DEFAULT]
|
||||
transport_url = "zmq+redis://127.0.0.1:6379"
|
||||
|
||||
In order to cleanup redis storage from expired records (e.g. target listener
|
||||
goes down) TTL may be applied for keys. Configure 'zmq_target_expire' option
|
||||
which is 120 (seconds) by default. The option is related not specifically to
|
||||
redis so it is also defined in [DEFAULT] section. If option value is <= 0
|
||||
then keys don't expire and live forever in the storage.
|
||||
|
||||
redis so it is also defined in [oslo_messaging_zmq] section. If option value
|
||||
is <= 0 then keys don't expire and live forever in the storage.
|
||||
|
||||
MatchMaker Data Source (mandatory)
|
||||
----------------------------------
|
||||
@ -159,11 +151,10 @@ we use Sentinel solution and redis master-slave-slave configuration (if we have
|
||||
3 controllers and run Redis on each of them).
|
||||
|
||||
To deploy redis with HA follow the `sentinel-install`_ instructions. From the
|
||||
messaging driver's side you will need to setup following configuration which
|
||||
is different from a single-node redis deployment ::
|
||||
messaging driver's side you will need to setup following configuration ::
|
||||
|
||||
[matchmaker_redis]
|
||||
sentinel_hosts=host1:26379, host2:26379, host3:26379
|
||||
[DEFAULT]
|
||||
transport_url = "zmq+redis://host1:26379,host2:26379,host3:26379"
|
||||
|
||||
|
||||
Restrict the number of TCP sockets on controller
|
||||
@ -174,7 +165,7 @@ controller node in directly connected configuration. To solve the issue
|
||||
ROUTER proxy may be used.
|
||||
|
||||
In order to configure driver to use ROUTER proxy set up the 'use_router_proxy'
|
||||
option to true in [DEFAULT] section (false is set by default).
|
||||
option to true in [oslo_messaging_zmq] section (false is set by default).
|
||||
|
||||
For example::
|
||||
|
||||
@ -198,7 +189,7 @@ direct DEALER/ROUTER unicast which is possible but less efficient and therefore
|
||||
is not recommended. In a case of direct DEALER/ROUTER unicast proxy is not
|
||||
needed.
|
||||
|
||||
This option can be set in [DEFAULT] section.
|
||||
This option can be set in [oslo_messaging_zmq] section.
|
||||
|
||||
For example::
|
||||
|
||||
@ -218,7 +209,7 @@ All services bind to an IP address or Ethernet adapter. By default, all services
|
||||
bind to '*', effectively binding to 0.0.0.0. This may be changed with the option
|
||||
'rpc_zmq_bind_address' which accepts a wildcard, IP address, or Ethernet adapter.
|
||||
|
||||
This configuration can be set in [DEFAULT] section.
|
||||
This configuration can be set in [oslo_messaging_zmq] section.
|
||||
|
||||
For example::
|
||||
|
||||
|
@ -17,12 +17,13 @@ import logging
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from oslo_messaging._drivers import impl_zmq
|
||||
from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
|
||||
from oslo_messaging._drivers.zmq_driver.proxy import zmq_queue_proxy
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_options
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(impl_zmq.zmq_opts)
|
||||
|
||||
zmq_options.register_opts(CONF)
|
||||
|
||||
opt_group = cfg.OptGroup(name='zmq_proxy_opts',
|
||||
title='ZeroMQ proxy options')
|
||||
|
0
oslo_messaging/_drivers/impl_pika.py
Executable file → Normal file
0
oslo_messaging/_drivers/impl_pika.py
Executable file → Normal file
@ -14,10 +14,8 @@
|
||||
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
import threading
|
||||
|
||||
from oslo_config import cfg
|
||||
from stevedore import driver
|
||||
|
||||
from oslo_messaging._drivers import base
|
||||
@ -25,90 +23,14 @@ from oslo_messaging._drivers import common as rpc_common
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_client
|
||||
from oslo_messaging._drivers.zmq_driver.server import zmq_server
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_options
|
||||
from oslo_messaging._i18n import _LE
|
||||
from oslo_messaging import server
|
||||
|
||||
|
||||
RPCException = rpc_common.RPCException
|
||||
_MATCHMAKER_BACKENDS = ('redis', 'dummy')
|
||||
_MATCHMAKER_DEFAULT = 'redis'
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
zmq_opts = [
|
||||
cfg.StrOpt('rpc_zmq_bind_address', default='*',
|
||||
help='ZeroMQ bind address. Should be a wildcard (*), '
|
||||
'an ethernet interface, or IP. '
|
||||
'The "host" option should point or resolve to this '
|
||||
'address.'),
|
||||
|
||||
cfg.StrOpt('rpc_zmq_matchmaker', default=_MATCHMAKER_DEFAULT,
|
||||
choices=_MATCHMAKER_BACKENDS,
|
||||
help='MatchMaker driver.'),
|
||||
|
||||
cfg.IntOpt('rpc_zmq_contexts', default=1,
|
||||
help='Number of ZeroMQ contexts, defaults to 1.'),
|
||||
|
||||
cfg.IntOpt('rpc_zmq_topic_backlog',
|
||||
help='Maximum number of ingress messages to locally buffer '
|
||||
'per topic. Default is unlimited.'),
|
||||
|
||||
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
|
||||
help='Directory for holding IPC sockets.'),
|
||||
|
||||
cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
|
||||
sample_default='localhost',
|
||||
help='Name of this node. Must be a valid hostname, FQDN, or '
|
||||
'IP address. Must match "host" option, if running Nova.'),
|
||||
|
||||
cfg.IntOpt('rpc_cast_timeout', default=-1,
|
||||
help='Seconds to wait before a cast expires (TTL). '
|
||||
'The default value of -1 specifies an infinite linger '
|
||||
'period. The value of 0 specifies no linger period. '
|
||||
'Pending messages shall be discarded immediately '
|
||||
'when the socket is closed. Only supported by impl_zmq.'),
|
||||
|
||||
cfg.IntOpt('rpc_poll_timeout', default=1,
|
||||
help='The default number of seconds that poll should wait. '
|
||||
'Poll raises timeout exception when timeout expired.'),
|
||||
|
||||
cfg.IntOpt('zmq_target_expire', default=300,
|
||||
help='Expiration timeout in seconds of a name service record '
|
||||
'about existing target ( < 0 means no timeout).'),
|
||||
|
||||
cfg.IntOpt('zmq_target_update', default=180,
|
||||
help='Update period in seconds of a name service record '
|
||||
'about existing target.'),
|
||||
|
||||
cfg.BoolOpt('use_pub_sub', default=True,
|
||||
help='Use PUB/SUB pattern for fanout methods. '
|
||||
'PUB/SUB always uses proxy.'),
|
||||
|
||||
cfg.BoolOpt('use_router_proxy', default=True,
|
||||
help='Use ROUTER remote proxy.'),
|
||||
|
||||
cfg.PortOpt('rpc_zmq_min_port',
|
||||
default=49153,
|
||||
help='Minimal port number for random ports range.'),
|
||||
|
||||
cfg.IntOpt('rpc_zmq_max_port',
|
||||
min=1,
|
||||
max=65536,
|
||||
default=65536,
|
||||
help='Maximal port number for random ports range.'),
|
||||
|
||||
cfg.IntOpt('rpc_zmq_bind_port_retries',
|
||||
default=100,
|
||||
help='Number of retries to find free port number before '
|
||||
'fail with ZMQBindError.'),
|
||||
|
||||
cfg.StrOpt('rpc_zmq_serialization', default='json',
|
||||
choices=('json', 'msgpack'),
|
||||
help='Default serialization mechanism for '
|
||||
'serializing/deserializing outgoing/incoming messages')
|
||||
]
|
||||
|
||||
|
||||
class LazyDriverItem(object):
|
||||
|
||||
def __init__(self, item_cls, *args, **kwargs):
|
||||
@ -174,9 +96,7 @@ class ZmqDriver(base.BaseDriver):
|
||||
if zmq is None:
|
||||
raise ImportError(_LE("ZeroMQ is not available!"))
|
||||
|
||||
conf.register_opts(zmq_opts)
|
||||
conf.register_opts(server._pool_opts)
|
||||
conf.register_opts(base.base_opts)
|
||||
zmq_options.register_opts(conf)
|
||||
self.conf = conf
|
||||
self.allowed_remote_exmods = allowed_remote_exmods
|
||||
|
||||
@ -186,9 +106,11 @@ class ZmqDriver(base.BaseDriver):
|
||||
).driver(self.conf, url=url)
|
||||
|
||||
client_cls = zmq_client.ZmqClientProxy
|
||||
if conf.use_pub_sub and not conf.use_router_proxy:
|
||||
if conf.oslo_messaging_zmq.use_pub_sub and not \
|
||||
conf.oslo_messaging_zmq.use_router_proxy:
|
||||
client_cls = zmq_client.ZmqClientMixDirectPubSub
|
||||
elif not conf.use_pub_sub and not conf.use_router_proxy:
|
||||
elif not conf.oslo_messaging_zmq.use_pub_sub and not \
|
||||
conf.oslo_messaging_zmq.use_router_proxy:
|
||||
client_cls = zmq_client.ZmqClientDirect
|
||||
|
||||
self.client = LazyDriverItem(
|
||||
@ -206,13 +128,13 @@ class ZmqDriver(base.BaseDriver):
|
||||
zmq_transport, p, matchmaker_backend = url.transport.partition('+')
|
||||
assert zmq_transport == 'zmq', "Needs to be zmq for this transport!"
|
||||
if not matchmaker_backend:
|
||||
return self.conf.rpc_zmq_matchmaker
|
||||
elif matchmaker_backend not in _MATCHMAKER_BACKENDS:
|
||||
return self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker
|
||||
elif matchmaker_backend not in zmq_options.MATCHMAKER_BACKENDS:
|
||||
raise rpc_common.RPCException(
|
||||
_LE("Incorrect matchmaker backend name %(backend_name)s!"
|
||||
"Available names are: %(available_names)s") %
|
||||
{"backend_name": matchmaker_backend,
|
||||
"available_names": _MATCHMAKER_BACKENDS})
|
||||
"available_names": zmq_options.MATCHMAKER_BACKENDS})
|
||||
return matchmaker_backend
|
||||
|
||||
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
|
||||
|
@ -63,7 +63,7 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase):
|
||||
else:
|
||||
return \
|
||||
[zmq_address.target_to_subscribe_filter(request.target)] \
|
||||
if self.conf.use_pub_sub else \
|
||||
if self.conf.oslo_messaging_zmq.use_pub_sub else \
|
||||
self.routing_table.get_all_hosts(request.target)
|
||||
except retrying.RetryError:
|
||||
return []
|
||||
|
@ -39,7 +39,8 @@ class ZmqClientMixDirectPubSub(zmq_client_base.ZmqClientBase):
|
||||
|
||||
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
|
||||
|
||||
if conf.use_router_proxy or not conf.use_pub_sub:
|
||||
if conf.oslo_messaging_zmq.use_router_proxy or not \
|
||||
conf.oslo_messaging_zmq.use_pub_sub:
|
||||
raise WrongClientException()
|
||||
|
||||
publisher_direct = \
|
||||
@ -68,7 +69,8 @@ class ZmqClientDirect(zmq_client_base.ZmqClientBase):
|
||||
|
||||
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
|
||||
|
||||
if conf.use_pub_sub or conf.use_router_proxy:
|
||||
if conf.oslo_messaging_zmq.use_pub_sub or \
|
||||
conf.oslo_messaging_zmq.use_router_proxy:
|
||||
raise WrongClientException()
|
||||
|
||||
publisher = \
|
||||
@ -92,7 +94,7 @@ class ZmqClientProxy(zmq_client_base.ZmqClientBase):
|
||||
|
||||
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
|
||||
|
||||
if not conf.use_router_proxy:
|
||||
if not conf.oslo_messaging_zmq.use_router_proxy:
|
||||
raise WrongClientException()
|
||||
|
||||
publisher = \
|
||||
|
@ -87,7 +87,8 @@ class ReceiverBase(object):
|
||||
return self._requests.pop((message_id, message_type), None)
|
||||
|
||||
def _run_loop(self):
|
||||
data, socket = self._poller.poll(timeout=self.conf.rpc_poll_timeout)
|
||||
data, socket = self._poller.poll(
|
||||
timeout=self.conf.oslo_messaging_zmq.rpc_poll_timeout)
|
||||
if data is None:
|
||||
return
|
||||
reply_id, message_type, message_id, response = data
|
||||
|
@ -46,7 +46,8 @@ class RoutingTable(object):
|
||||
return host
|
||||
|
||||
def _is_tm_expired(self, tm):
|
||||
return 0 <= self.conf.zmq_target_expire <= time.time() - tm
|
||||
return 0 <= self.conf.oslo_messaging_zmq.zmq_target_expire \
|
||||
<= time.time() - tm
|
||||
|
||||
def _update_routing_table(self, target):
|
||||
routing_record = self.routing_table.get(str(target))
|
||||
|
@ -57,7 +57,8 @@ class SocketsManager(object):
|
||||
def _check_for_new_hosts(self, target):
|
||||
key = self._key_from_target(target)
|
||||
socket, tm = self.outbound_sockets[key]
|
||||
if 0 <= self.conf.zmq_target_expire <= time.time() - tm:
|
||||
if 0 <= self.conf.oslo_messaging_zmq.zmq_target_expire \
|
||||
<= time.time() - tm:
|
||||
self._get_hosts_and_connect(socket, target)
|
||||
return socket
|
||||
|
||||
|
@ -85,7 +85,7 @@ class ZmqProxy(object):
|
||||
self.conf = conf
|
||||
self.matchmaker = driver.DriverManager(
|
||||
'oslo.messaging.zmq.matchmaker',
|
||||
self.conf.rpc_zmq_matchmaker,
|
||||
self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker,
|
||||
).driver(self.conf)
|
||||
self.context = zmq.Context()
|
||||
self.proxy = proxy_cls(conf, self.context, self.matchmaker)
|
||||
|
@ -68,8 +68,9 @@ class UniversalQueueProxy(object):
|
||||
return
|
||||
|
||||
msg_type = message[0]
|
||||
if self.conf.use_pub_sub and msg_type in (zmq_names.CAST_FANOUT_TYPE,
|
||||
zmq_names.NOTIFY_TYPE):
|
||||
if self.conf.oslo_messaging_zmq.use_pub_sub and \
|
||||
msg_type in (zmq_names.CAST_FANOUT_TYPE,
|
||||
zmq_names.NOTIFY_TYPE):
|
||||
self.pub_publisher.send_request(message)
|
||||
else:
|
||||
self._redirect_message(self.be_router_socket.handle
|
||||
@ -133,12 +134,13 @@ class RouterUpdater(zmq_updater.UpdaterBase):
|
||||
def _update_records(self):
|
||||
self.matchmaker.register_publisher(
|
||||
(self.publisher_address, self.fe_router_address),
|
||||
expire=self.conf.zmq_target_expire)
|
||||
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
|
||||
LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Update PUB publisher"),
|
||||
{"pub": self.publisher_address,
|
||||
"router": self.fe_router_address})
|
||||
self.matchmaker.register_router(self.be_router_address,
|
||||
expire=self.conf.zmq_target_expire)
|
||||
self.matchmaker.register_router(
|
||||
self.be_router_address,
|
||||
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
|
||||
LOG.info(_LI("[Backend ROUTER:%(router)s] Update ROUTER"),
|
||||
{"router": self.be_router_address})
|
||||
|
||||
|
@ -79,8 +79,8 @@ class SingleSocketConsumer(ConsumerBase):
|
||||
{"stype": zmq_names.socket_type_str(socket_type),
|
||||
"addr": socket.bind_address,
|
||||
"port": socket.port})
|
||||
self.host = zmq_address.combine_address(self.conf.rpc_zmq_host,
|
||||
socket.port)
|
||||
self.host = zmq_address.combine_address(
|
||||
self.conf.oslo_messaging_zmq.rpc_zmq_host, socket.port)
|
||||
self.poller.register(socket, self.receive_message)
|
||||
return socket
|
||||
except zmq.ZMQError as e:
|
||||
@ -119,7 +119,7 @@ class TargetUpdater(zmq_updater.UpdaterBase):
|
||||
self.matchmaker.register(
|
||||
self.target, self.host,
|
||||
zmq_names.socket_type_str(self.socket_type),
|
||||
expire=self.conf.zmq_target_expire)
|
||||
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
|
||||
|
||||
def stop(self):
|
||||
super(TargetUpdater, self).stop()
|
||||
|
@ -41,11 +41,14 @@ class ZmqServer(base.PollStyleListener):
|
||||
self.poller = poller or zmq_async.get_poller()
|
||||
|
||||
self.router_consumer = zmq_router_consumer.RouterConsumer(
|
||||
conf, self.poller, self) if not conf.use_router_proxy else None
|
||||
conf, self.poller, self) \
|
||||
if not conf.oslo_messaging_zmq.use_router_proxy else None
|
||||
self.dealer_consumer = zmq_dealer_consumer.DealerConsumer(
|
||||
conf, self.poller, self) if conf.use_router_proxy else None
|
||||
conf, self.poller, self) \
|
||||
if conf.oslo_messaging_zmq.use_router_proxy else None
|
||||
self.sub_consumer = zmq_sub_consumer.SubConsumer(
|
||||
conf, self.poller, self) if conf.use_pub_sub else None
|
||||
conf, self.poller, self) \
|
||||
if conf.oslo_messaging_zmq.use_pub_sub else None
|
||||
|
||||
self.consumers = []
|
||||
if self.router_consumer is not None:
|
||||
@ -58,7 +61,7 @@ class ZmqServer(base.PollStyleListener):
|
||||
@base.batch_poll_helper
|
||||
def poll(self, timeout=None):
|
||||
message, socket = self.poller.poll(
|
||||
timeout or self.conf.rpc_poll_timeout)
|
||||
timeout or self.conf.oslo_messaging_zmq.rpc_poll_timeout)
|
||||
return message
|
||||
|
||||
def stop(self):
|
||||
@ -94,7 +97,7 @@ class ZmqNotificationServer(base.PollStyleListener):
|
||||
@base.batch_poll_helper
|
||||
def poll(self, timeout=None):
|
||||
message, socket = self.poller.poll(
|
||||
timeout or self.conf.rpc_poll_timeout)
|
||||
timeout or self.conf.oslo_messaging_zmq.rpc_poll_timeout)
|
||||
return message
|
||||
|
||||
def stop(self):
|
||||
|
@ -24,11 +24,11 @@ def get_tcp_direct_address(host):
|
||||
|
||||
|
||||
def get_tcp_random_address(conf):
|
||||
return "tcp://%s" % conf.rpc_zmq_bind_address
|
||||
return "tcp://%s" % conf.oslo_messaging_zmq.rpc_zmq_bind_address
|
||||
|
||||
|
||||
def get_broker_address(conf):
|
||||
return "ipc://%s/zmq-broker" % conf.rpc_zmq_ipc_dir
|
||||
return "ipc://%s/zmq-broker" % conf.oslo_messaging_zmq.rpc_zmq_ipc_dir
|
||||
|
||||
|
||||
def prefix_str(key, listener_type):
|
||||
|
122
oslo_messaging/_drivers/zmq_driver/zmq_options.py
Normal file
122
oslo_messaging/_drivers/zmq_driver/zmq_options.py
Normal file
@ -0,0 +1,122 @@
|
||||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import socket
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from oslo_messaging._drivers import base
|
||||
from oslo_messaging import server
|
||||
|
||||
|
||||
MATCHMAKER_BACKENDS = ('redis', 'dummy')
|
||||
MATCHMAKER_DEFAULT = 'redis'
|
||||
|
||||
|
||||
zmq_opts = [
|
||||
cfg.StrOpt('rpc_zmq_bind_address', default='*',
|
||||
deprecated_group='DEFAULT',
|
||||
help='ZeroMQ bind address. Should be a wildcard (*), '
|
||||
'an ethernet interface, or IP. '
|
||||
'The "host" option should point or resolve to this '
|
||||
'address.'),
|
||||
|
||||
cfg.StrOpt('rpc_zmq_matchmaker', default=MATCHMAKER_DEFAULT,
|
||||
choices=MATCHMAKER_BACKENDS,
|
||||
deprecated_group='DEFAULT',
|
||||
help='MatchMaker driver.'),
|
||||
|
||||
cfg.IntOpt('rpc_zmq_contexts', default=1,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Number of ZeroMQ contexts, defaults to 1.'),
|
||||
|
||||
cfg.IntOpt('rpc_zmq_topic_backlog',
|
||||
deprecated_group='DEFAULT',
|
||||
help='Maximum number of ingress messages to locally buffer '
|
||||
'per topic. Default is unlimited.'),
|
||||
|
||||
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
|
||||
deprecated_group='DEFAULT',
|
||||
help='Directory for holding IPC sockets.'),
|
||||
|
||||
cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
|
||||
sample_default='localhost',
|
||||
deprecated_group='DEFAULT',
|
||||
help='Name of this node. Must be a valid hostname, FQDN, or '
|
||||
'IP address. Must match "host" option, if running Nova.'),
|
||||
|
||||
cfg.IntOpt('rpc_cast_timeout', default=-1,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Seconds to wait before a cast expires (TTL). '
|
||||
'The default value of -1 specifies an infinite linger '
|
||||
'period. The value of 0 specifies no linger period. '
|
||||
'Pending messages shall be discarded immediately '
|
||||
'when the socket is closed. Only supported by impl_zmq.'),
|
||||
|
||||
cfg.IntOpt('rpc_poll_timeout', default=1,
|
||||
deprecated_group='DEFAULT',
|
||||
help='The default number of seconds that poll should wait. '
|
||||
'Poll raises timeout exception when timeout expired.'),
|
||||
|
||||
cfg.IntOpt('zmq_target_expire', default=300,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Expiration timeout in seconds of a name service record '
|
||||
'about existing target ( < 0 means no timeout).'),
|
||||
|
||||
cfg.IntOpt('zmq_target_update', default=180,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Update period in seconds of a name service record '
|
||||
'about existing target.'),
|
||||
|
||||
cfg.BoolOpt('use_pub_sub', default=True,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Use PUB/SUB pattern for fanout methods. '
|
||||
'PUB/SUB always uses proxy.'),
|
||||
|
||||
cfg.BoolOpt('use_router_proxy', default=True,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Use ROUTER remote proxy.'),
|
||||
|
||||
cfg.PortOpt('rpc_zmq_min_port',
|
||||
default=49153,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Minimal port number for random ports range.'),
|
||||
|
||||
cfg.IntOpt('rpc_zmq_max_port',
|
||||
min=1,
|
||||
max=65536,
|
||||
default=65536,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Maximal port number for random ports range.'),
|
||||
|
||||
cfg.IntOpt('rpc_zmq_bind_port_retries',
|
||||
default=100,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Number of retries to find free port number before '
|
||||
'fail with ZMQBindError.'),
|
||||
|
||||
cfg.StrOpt('rpc_zmq_serialization', default='json',
|
||||
choices=('json', 'msgpack'),
|
||||
deprecated_group='DEFAULT',
|
||||
help='Default serialization mechanism for '
|
||||
'serializing/deserializing outgoing/incoming messages')
|
||||
]
|
||||
|
||||
|
||||
def register_opts(conf):
|
||||
opt_group = cfg.OptGroup(name='oslo_messaging_zmq',
|
||||
title='ZeroMQ driver options')
|
||||
conf.register_opts(zmq_opts, group=opt_group)
|
||||
conf.register_opts(server._pool_opts)
|
||||
conf.register_opts(base.base_opts)
|
@ -47,8 +47,9 @@ class ZmqSocket(object):
|
||||
self.handle.set_hwm(high_watermark)
|
||||
|
||||
self.close_linger = -1
|
||||
if self.conf.rpc_cast_timeout > 0:
|
||||
self.close_linger = self.conf.rpc_cast_timeout * 1000
|
||||
if self.conf.oslo_messaging_zmq.rpc_cast_timeout > 0:
|
||||
self.close_linger = \
|
||||
self.conf.oslo_messaging_zmq.rpc_cast_timeout * 1000
|
||||
self.handle.setsockopt(zmq.LINGER, self.close_linger)
|
||||
# Put messages to only connected queues
|
||||
self.handle.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0)
|
||||
@ -96,8 +97,9 @@ class ZmqSocket(object):
|
||||
self.handle.send_multipart(*args, **kwargs)
|
||||
|
||||
def send_dumped(self, obj, *args, **kwargs):
|
||||
serialization = kwargs.pop('serialization',
|
||||
self.conf.rpc_zmq_serialization)
|
||||
serialization = kwargs.pop(
|
||||
'serialization',
|
||||
self.conf.oslo_messaging_zmq.rpc_zmq_serialization)
|
||||
serializer = self._get_serializer(serialization)
|
||||
s = serializer.dump_as_bytes(obj)
|
||||
self.handle.send(s, *args, **kwargs)
|
||||
@ -118,8 +120,9 @@ class ZmqSocket(object):
|
||||
return self.handle.recv_multipart(*args, **kwargs)
|
||||
|
||||
def recv_loaded(self, *args, **kwargs):
|
||||
serialization = kwargs.pop('serialization',
|
||||
self.conf.rpc_zmq_serialization)
|
||||
serialization = kwargs.pop(
|
||||
'serialization',
|
||||
self.conf.oslo_messaging_zmq.rpc_zmq_serialization)
|
||||
serializer = self._get_serializer(serialization)
|
||||
s = self.handle.recv(*args, **kwargs)
|
||||
obj = serializer.load_from_bytes(s)
|
||||
@ -170,13 +173,13 @@ class ZmqRandomPortSocket(ZmqSocket):
|
||||
high_watermark=high_watermark)
|
||||
self.bind_address = zmq_address.get_tcp_random_address(self.conf)
|
||||
if host is None:
|
||||
host = conf.rpc_zmq_host
|
||||
host = conf.oslo_messaging_zmq.rpc_zmq_host
|
||||
try:
|
||||
self.port = self.handle.bind_to_random_port(
|
||||
self.bind_address,
|
||||
min_port=conf.rpc_zmq_min_port,
|
||||
max_port=conf.rpc_zmq_max_port,
|
||||
max_tries=conf.rpc_zmq_bind_port_retries)
|
||||
min_port=conf.oslo_messaging_zmq.rpc_zmq_min_port,
|
||||
max_port=conf.oslo_messaging_zmq.rpc_zmq_max_port,
|
||||
max_tries=conf.oslo_messaging_zmq.rpc_zmq_bind_port_retries)
|
||||
self.connect_address = zmq_address.combine_address(host, self.port)
|
||||
except zmq.ZMQBindError:
|
||||
LOG.error(_LE("Random ports range exceeded!"))
|
||||
@ -192,7 +195,8 @@ class ZmqFixedPortSocket(ZmqSocket):
|
||||
high_watermark=high_watermark)
|
||||
self.connect_address = zmq_address.combine_address(host, port)
|
||||
self.bind_address = zmq_address.get_tcp_direct_address(
|
||||
zmq_address.combine_address(conf.rpc_zmq_bind_address, port))
|
||||
zmq_address.combine_address(
|
||||
conf.oslo_messaging_zmq.rpc_zmq_bind_address, port))
|
||||
self.host = host
|
||||
self.port = port
|
||||
|
||||
|
@ -41,7 +41,7 @@ class UpdaterBase(object):
|
||||
|
||||
def _update_loop(self):
|
||||
self.update_method()
|
||||
time.sleep(self.conf.zmq_target_update)
|
||||
time.sleep(self.conf.oslo_messaging_zmq.zmq_target_update)
|
||||
|
||||
def cleanup(self):
|
||||
self.executor.stop()
|
||||
|
@ -58,7 +58,8 @@ class ConfFixture(fixtures.Fixture):
|
||||
'oslo_messaging._drivers.amqp1_driver.opts',
|
||||
'amqp1_opts', 'oslo_messaging_amqp')
|
||||
_import_opts(self.conf,
|
||||
'oslo_messaging._drivers.impl_zmq', 'zmq_opts')
|
||||
'oslo_messaging._drivers.zmq_driver.zmq_options',
|
||||
'zmq_opts', 'oslo_messaging_zmq')
|
||||
_import_opts(self.conf,
|
||||
'oslo_messaging._drivers.zmq_driver.'
|
||||
'matchmaker.matchmaker_redis',
|
||||
|
@ -25,7 +25,7 @@ from oslo_messaging._drivers.amqp1_driver import opts as amqp_opts
|
||||
from oslo_messaging._drivers import base as drivers_base
|
||||
from oslo_messaging._drivers import impl_pika
|
||||
from oslo_messaging._drivers import impl_rabbit
|
||||
from oslo_messaging._drivers import impl_zmq
|
||||
from oslo_messaging._drivers.impl_zmq import zmq_options
|
||||
from oslo_messaging._drivers.pika_driver import pika_connection_factory
|
||||
from oslo_messaging._drivers.zmq_driver.matchmaker import matchmaker_redis
|
||||
from oslo_messaging.notify import notifier
|
||||
@ -36,7 +36,7 @@ from oslo_messaging import transport
|
||||
|
||||
_global_opt_lists = [
|
||||
drivers_base.base_opts,
|
||||
impl_zmq.zmq_opts,
|
||||
zmq_options.zmq_opts,
|
||||
server._pool_opts,
|
||||
client._client_opts,
|
||||
transport._transport_opts,
|
||||
@ -45,6 +45,7 @@ _global_opt_lists = [
|
||||
_opts = [
|
||||
(None, list(itertools.chain(*_global_opt_lists))),
|
||||
('matchmaker_redis', matchmaker_redis.matchmaker_redis_opts),
|
||||
('oslo_messaging_zmq', zmq_options.zmq_opts),
|
||||
('oslo_messaging_amqp', amqp_opts.amqp1_opts),
|
||||
('oslo_messaging_notifications', notifier._notifier_opts),
|
||||
('oslo_messaging_rabbit', list(
|
||||
|
@ -35,7 +35,7 @@ class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase):
|
||||
# Set config values
|
||||
kwargs = {'rpc_zmq_min_port': 5555,
|
||||
'rpc_zmq_max_port': 5560}
|
||||
self.config(**kwargs)
|
||||
self.config(group='oslo_messaging_zmq', **kwargs)
|
||||
|
||||
def test_ports_range(self):
|
||||
listeners = []
|
||||
|
@ -54,7 +54,7 @@ class TestPubSub(zmq_common.ZmqBaseTestCase):
|
||||
|
||||
kwargs = {'use_pub_sub': True,
|
||||
'rpc_zmq_serialization': self.serialization}
|
||||
self.config(**kwargs)
|
||||
self.config(group='oslo_messaging_zmq', **kwargs)
|
||||
|
||||
self.config(host="127.0.0.1", group="zmq_proxy_opts")
|
||||
self.config(publisher_port="0", group="zmq_proxy_opts")
|
||||
|
@ -20,6 +20,7 @@ import testtools
|
||||
|
||||
import oslo_messaging
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_options
|
||||
from oslo_messaging._i18n import _LE
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
|
||||
@ -71,17 +72,18 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(ZmqBaseTestCase, self).setUp()
|
||||
self.messaging_conf.transport_driver = 'zmq'
|
||||
zmq_options.register_opts(self.conf)
|
||||
|
||||
# Set config values
|
||||
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
|
||||
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
|
||||
'rpc_zmq_host': '127.0.0.1',
|
||||
'rpc_response_timeout': 5,
|
||||
'rpc_zmq_ipc_dir': self.internal_ipc_dir,
|
||||
'use_pub_sub': False,
|
||||
'use_router_proxy': False,
|
||||
'rpc_zmq_matchmaker': 'dummy'}
|
||||
self.config(**kwargs)
|
||||
self.config(group='oslo_messaging_zmq', **kwargs)
|
||||
self.config(rpc_response_timeout=5)
|
||||
|
||||
# Get driver
|
||||
transport = oslo_messaging.get_transport(self.conf)
|
||||
|
@ -293,10 +293,12 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
|
||||
|
||||
zmq_matchmaker = os.environ.get('ZMQ_MATCHMAKER')
|
||||
if zmq_matchmaker:
|
||||
self.config(rpc_zmq_matchmaker=zmq_matchmaker)
|
||||
self.config(rpc_zmq_matchmaker=zmq_matchmaker,
|
||||
group="oslo_messaging_zmq")
|
||||
zmq_ipc_dir = os.environ.get('ZMQ_IPC_DIR')
|
||||
if zmq_ipc_dir:
|
||||
self.config(rpc_zmq_ipc_dir=zmq_ipc_dir)
|
||||
self.config(group="oslo_messaging_zmq",
|
||||
rpc_zmq_ipc_dir=zmq_ipc_dir)
|
||||
zmq_redis_port = os.environ.get('ZMQ_REDIS_PORT')
|
||||
if zmq_redis_port:
|
||||
self.config(port=zmq_redis_port, group="matchmaker_redis")
|
||||
@ -304,10 +306,12 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
|
||||
self.config(wait_timeout=1000, group="matchmaker_redis")
|
||||
zmq_use_pub_sub = os.environ.get('ZMQ_USE_PUB_SUB')
|
||||
if zmq_use_pub_sub:
|
||||
self.config(use_pub_sub=zmq_use_pub_sub)
|
||||
self.config(use_pub_sub=zmq_use_pub_sub,
|
||||
group='oslo_messaging_zmq')
|
||||
zmq_use_router_proxy = os.environ.get('ZMQ_USE_ROUTER_PROXY')
|
||||
if zmq_use_router_proxy:
|
||||
self.config(use_router_proxy=zmq_use_router_proxy)
|
||||
self.config(use_router_proxy=zmq_use_router_proxy,
|
||||
group='oslo_messaging_zmq')
|
||||
|
||||
|
||||
class NotificationFixture(fixtures.Fixture):
|
||||
|
@ -70,7 +70,8 @@ def listener_configurer(conf):
|
||||
'%(levelname)-8s %(message)s')
|
||||
h.setFormatter(f)
|
||||
root.addHandler(h)
|
||||
log_path = conf.rpc_zmq_ipc_dir + "/" + "zmq_multiproc.log"
|
||||
log_path = conf.oslo_messaging_zmq.rpc_zmq_ipc_dir + \
|
||||
"/" + "zmq_multiproc.log"
|
||||
file_handler = logging.StreamHandler(open(log_path, 'w'))
|
||||
file_handler.setFormatter(f)
|
||||
root.addHandler(file_handler)
|
||||
|
@ -30,10 +30,10 @@ class StartupOrderTestCase(multiproc_utils.MutliprocTestCase):
|
||||
self.conf.prog = "test_prog"
|
||||
self.conf.project = "test_project"
|
||||
|
||||
kwargs = {'rpc_response_timeout': 30}
|
||||
self.config(**kwargs)
|
||||
self.config(rpc_response_timeout=30)
|
||||
|
||||
log_path = self.conf.rpc_zmq_ipc_dir + "/" + str(os.getpid()) + ".log"
|
||||
log_path = os.path.join(self.conf.oslo_messaging_zmq.rpc_zmq_ipc_dir,
|
||||
str(os.getpid()) + ".log")
|
||||
sys.stdout = open(log_path, "w", buffering=0)
|
||||
|
||||
def test_call_server_before_client(self):
|
||||
|
@ -32,11 +32,12 @@ class OptsTestCase(test_utils.BaseTestCase):
|
||||
super(OptsTestCase, self).setUp()
|
||||
|
||||
def _test_list_opts(self, result):
|
||||
self.assertEqual(5, len(result))
|
||||
self.assertEqual(6, len(result))
|
||||
|
||||
groups = [g for (g, l) in result]
|
||||
self.assertIn(None, groups)
|
||||
self.assertIn('matchmaker_redis', groups)
|
||||
self.assertIn('oslo_messaging_zmq', groups)
|
||||
self.assertIn('oslo_messaging_amqp', groups)
|
||||
self.assertIn('oslo_messaging_notifications', groups)
|
||||
self.assertIn('oslo_messaging_rabbit', groups)
|
||||
|
@ -18,6 +18,7 @@ export ZMQ_PROXY_HOST=127.0.0.1
|
||||
cat > ${DATADIR}/zmq.conf <<EOF
|
||||
[DEFAULT]
|
||||
transport_url=${TRANSPORT_URL}
|
||||
[oslo_messaging_zmq]
|
||||
rpc_zmq_matchmaker=${ZMQ_MATCHMAKER}
|
||||
rpc_zmq_ipc_dir=${ZMQ_IPC_DIR}
|
||||
use_pub_sub=${ZMQ_USE_PUB_SUB}
|
||||
|
@ -16,6 +16,7 @@ export ZMQ_USE_ROUTER_PROXY=true
|
||||
cat > ${DATADIR}/zmq.conf <<EOF
|
||||
[DEFAULT]
|
||||
transport_url=${TRANSPORT_URL}
|
||||
[oslo_messaging_zmq]
|
||||
rpc_zmq_matchmaker=${ZMQ_MATCHMAKER}
|
||||
rpc_zmq_ipc_dir=${ZMQ_IPC_DIR}
|
||||
use_pub_sub=${ZMQ_USE_PUB_SUB}
|
||||
|
@ -16,6 +16,7 @@ export ZMQ_USE_ROUTER_PROXY=false
|
||||
cat > ${DATADIR}/zmq.conf <<EOF
|
||||
[DEFAULT]
|
||||
transport_url=${TRANSPORT_URL}
|
||||
[oslo_messaging_zmq]
|
||||
rpc_zmq_matchmaker=${ZMQ_MATCHMAKER}
|
||||
rpc_zmq_ipc_dir=${ZMQ_IPC_DIR}
|
||||
use_pub_sub=${ZMQ_USE_PUB_SUB}
|
||||
|
@ -684,7 +684,7 @@ def main():
|
||||
if args.mode == 'rpc-server':
|
||||
target = messaging.Target(topic=args.topic, server=args.server)
|
||||
if args.url.startswith('zmq'):
|
||||
cfg.CONF.rpc_zmq_matchmaker = "redis"
|
||||
cfg.CONF.oslo_messaging_zmq.rpc_zmq_matchmaker = "redis"
|
||||
|
||||
endpoint = rpc_server(TRANSPORT, target, args.wait_before_answer,
|
||||
args.executor, args.duration)
|
||||
|
Loading…
Reference in New Issue
Block a user