[zmq] Support transport URL
Opposite to central-broker transport in zmq we don't have such, but we can use transport URL to pass credentials to redis (or other registry) for example. Closes-Bug #1538095 Change-Id: I58069a20c129bab4a25fd17add9eb9c428fe3ff3
This commit is contained in:
parent
92c4f76c79
commit
fda27b0899
@ -12,6 +12,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
import threading
|
||||
@ -29,6 +30,10 @@ from oslo_messaging._i18n import _LE
|
||||
|
||||
|
||||
RPCException = rpc_common.RPCException
|
||||
_MATCHMAKER_BACKENDS = ('redis', 'dummy')
|
||||
_MATCHMAKER_DEFAULT = 'redis'
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
zmq_opts = [
|
||||
cfg.StrOpt('rpc_zmq_bind_address', default='*',
|
||||
@ -37,7 +42,8 @@ zmq_opts = [
|
||||
'The "host" option should point or resolve to this '
|
||||
'address.'),
|
||||
|
||||
cfg.StrOpt('rpc_zmq_matchmaker', default='redis',
|
||||
cfg.StrOpt('rpc_zmq_matchmaker', default=_MATCHMAKER_DEFAULT,
|
||||
choices=_MATCHMAKER_BACKENDS,
|
||||
help='MatchMaker driver.'),
|
||||
|
||||
cfg.StrOpt('rpc_zmq_concurrency', default='eventlet',
|
||||
@ -167,8 +173,8 @@ class ZmqDriver(base.BaseDriver):
|
||||
|
||||
self.matchmaker = driver.DriverManager(
|
||||
'oslo.messaging.zmq.matchmaker',
|
||||
self.conf.rpc_zmq_matchmaker,
|
||||
).driver(self.conf)
|
||||
self.get_matchmaker_backend(url),
|
||||
).driver(self.conf, url=url)
|
||||
|
||||
self.client = LazyDriverItem(
|
||||
zmq_client.ZmqClient, self.conf, self.matchmaker,
|
||||
@ -181,6 +187,19 @@ class ZmqDriver(base.BaseDriver):
|
||||
super(ZmqDriver, self).__init__(conf, url, default_exchange,
|
||||
allowed_remote_exmods)
|
||||
|
||||
def get_matchmaker_backend(self, url):
|
||||
zmq_transport, p, matchmaker_backend = url.transport.partition('+')
|
||||
assert zmq_transport == 'zmq', "Needs to be zmq for this transport!"
|
||||
if not matchmaker_backend:
|
||||
return self.conf.rpc_zmq_matchmaker
|
||||
elif matchmaker_backend not in _MATCHMAKER_BACKENDS:
|
||||
raise rpc_common.RPCException(
|
||||
_LE("Incorrect matchmaker backend name %(backend_name)s!"
|
||||
"Available names are: %(available_names)s") %
|
||||
{"backend_name": matchmaker_backend,
|
||||
"available_names": _MATCHMAKER_BACKENDS})
|
||||
return matchmaker_backend
|
||||
|
||||
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
|
||||
retry=None):
|
||||
"""Send RPC message to server
|
||||
|
@ -23,8 +23,9 @@ from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
class MatchMakerBase(object):
|
||||
|
||||
def __init__(self, conf, *args, **kwargs):
|
||||
super(MatchMakerBase, self).__init__(*args, **kwargs)
|
||||
super(MatchMakerBase, self).__init__()
|
||||
self.conf = conf
|
||||
self.url = kwargs.get('url')
|
||||
|
||||
@abc.abstractmethod
|
||||
def register_publisher(self, hostname):
|
||||
|
@ -89,18 +89,18 @@ class RedisMatchMaker(base.MatchMakerBase):
|
||||
super(RedisMatchMaker, self).__init__(conf, *args, **kwargs)
|
||||
self.conf.register_opts(matchmaker_redis_opts, "matchmaker_redis")
|
||||
|
||||
if not self.conf.matchmaker_redis.sentinel_hosts:
|
||||
self.sentinel_hosts = self._extract_sentinel_options()
|
||||
if not self.sentinel_hosts:
|
||||
self.standalone_redis = self._extract_standalone_redis_options()
|
||||
self._redis = redis.StrictRedis(
|
||||
host=self.conf.matchmaker_redis.host,
|
||||
port=self.conf.matchmaker_redis.port,
|
||||
password=self.conf.matchmaker_redis.password,
|
||||
host=self.standalone_redis["host"],
|
||||
port=self.standalone_redis["port"],
|
||||
password=self.standalone_redis["password"]
|
||||
)
|
||||
else:
|
||||
socket_timeout = self.conf.matchmaker_redis.socket_timeout / 1000.
|
||||
s = self.conf.matchmaker_redis.sentinel_hosts
|
||||
sentinel_hosts = [tuple(i.split(":")) for i in s]
|
||||
sentinel = redis.sentinel.Sentinel(
|
||||
sentinels=sentinel_hosts,
|
||||
sentinels=self.sentinel_hosts,
|
||||
socket_timeout=socket_timeout
|
||||
)
|
||||
|
||||
@ -108,9 +108,27 @@ class RedisMatchMaker(base.MatchMakerBase):
|
||||
self.conf.matchmaker_redis.sentinel_group_name,
|
||||
socket_timeout=socket_timeout
|
||||
)
|
||||
|
||||
apply_retrying(self, self.conf)
|
||||
|
||||
def _extract_sentinel_options(self):
|
||||
if self.url and self.url.hosts:
|
||||
if len(self.url.hosts) > 1:
|
||||
return [(host.hostname, host.port) for host in self.url.hosts]
|
||||
elif self.conf.matchmaker_redis.sentinel_hosts:
|
||||
s = self.conf.matchmaker_redis.sentinel_hosts
|
||||
return [tuple(i.split(":")) for i in s]
|
||||
|
||||
def _extract_standalone_redis_options(self):
|
||||
if self.url and self.url.hosts:
|
||||
redis_host = self.url.hosts[0]
|
||||
return {"host": redis_host.hostname,
|
||||
"port": redis_host.port,
|
||||
"password": redis_host.password}
|
||||
else:
|
||||
return {"host": self.conf.matchmaker_redis.host,
|
||||
"port": self.conf.matchmaker_redis.port,
|
||||
"password": self.conf.matchmaker_redis.password}
|
||||
|
||||
def register_publisher(self, hostname):
|
||||
host_str = ",".join(hostname)
|
||||
if host_str not in self._get_hosts_by_key(_PUBLISHERS_KEY):
|
||||
|
87
oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py
Normal file
87
oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py
Normal file
@ -0,0 +1,87 @@
|
||||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import testtools
|
||||
|
||||
import oslo_messaging
|
||||
from oslo_messaging._drivers import common
|
||||
from oslo_messaging._drivers.zmq_driver.matchmaker.base import DummyMatchMaker
|
||||
from oslo_messaging._drivers.zmq_driver.matchmaker import matchmaker_redis
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class TestZmqTransportUrl(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestZmqTransportUrl, self).setUp()
|
||||
|
||||
def setup_url(self, url):
|
||||
transport = oslo_messaging.get_transport(self.conf, url)
|
||||
self.addCleanup(transport.cleanup)
|
||||
driver = transport._driver
|
||||
return driver, url
|
||||
|
||||
def test_empty_url(self):
|
||||
driver, url = self.setup_url("zmq:///")
|
||||
self.assertIs(matchmaker_redis.RedisMatchMaker,
|
||||
driver.matchmaker.__class__)
|
||||
self.assertEqual('zmq', driver.matchmaker.url.transport)
|
||||
|
||||
def test_error_name(self):
|
||||
self.assertRaises(common.RPCException, self.setup_url, "zmq+error:///")
|
||||
|
||||
def test_dummy_url(self):
|
||||
driver, url = self.setup_url("zmq+dummy:///")
|
||||
self.assertIs(DummyMatchMaker,
|
||||
driver.matchmaker.__class__)
|
||||
self.assertEqual('zmq+dummy', driver.matchmaker.url.transport)
|
||||
|
||||
def test_redis_url(self):
|
||||
driver, url = self.setup_url("zmq+redis:///")
|
||||
self.assertIs(matchmaker_redis.RedisMatchMaker,
|
||||
driver.matchmaker.__class__)
|
||||
self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
|
||||
|
||||
def test_redis_url_no_creds(self):
|
||||
driver, url = self.setup_url("zmq+redis://host:65123/")
|
||||
self.assertIs(matchmaker_redis.RedisMatchMaker,
|
||||
driver.matchmaker.__class__)
|
||||
self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
|
||||
self.assertEqual("host", driver.matchmaker.standalone_redis["host"])
|
||||
self.assertEqual(65123, driver.matchmaker.standalone_redis["port"])
|
||||
|
||||
def test_redis_url_no_port(self):
|
||||
driver, url = self.setup_url("zmq+redis://:p12@host:65123/")
|
||||
self.assertIs(matchmaker_redis.RedisMatchMaker,
|
||||
driver.matchmaker.__class__)
|
||||
self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
|
||||
self.assertEqual("host", driver.matchmaker.standalone_redis["host"])
|
||||
self.assertEqual(65123, driver.matchmaker.standalone_redis["port"])
|
||||
self.assertEqual("p12", driver.matchmaker.standalone_redis["password"])
|
||||
|
||||
def test_sentinel_multiple_hosts_url(self):
|
||||
driver, url = self.setup_url(
|
||||
"zmq+redis://sentinel1:20001,sentinel2:20001,sentinel3:20001/")
|
||||
self.assertIs(matchmaker_redis.RedisMatchMaker,
|
||||
driver.matchmaker.__class__)
|
||||
self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
|
||||
self.assertEqual(3, len(driver.matchmaker.sentinel_hosts))
|
||||
expected = [("sentinel1", 20001), ("sentinel2", 20001),
|
||||
("sentinel3", 20001)]
|
||||
self.assertEqual(expected, driver.matchmaker.sentinel_hosts)
|
Loading…
x
Reference in New Issue
Block a user