From fda27b0899e6304b3eaa39fd8e5463802ce14f46 Mon Sep 17 00:00:00 2001 From: Oleksii Zamiatin Date: Tue, 26 Jan 2016 17:13:16 +0200 Subject: [PATCH] [zmq] Support transport URL Opposite to central-broker transport in zmq we don't have such, but we can use transport URL to pass credentials to redis (or other registry) for example. Closes-Bug #1538095 Change-Id: I58069a20c129bab4a25fd17add9eb9c428fe3ff3 --- oslo_messaging/_drivers/impl_zmq.py | 25 +++++- .../_drivers/zmq_driver/matchmaker/base.py | 3 +- .../zmq_driver/matchmaker/matchmaker_redis.py | 34 ++++++-- .../drivers/zmq/test_zmq_transport_url.py | 87 +++++++++++++++++++ 4 files changed, 137 insertions(+), 12 deletions(-) create mode 100644 oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 8c1a836ba..5f3455f0f 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import logging import os import socket import threading @@ -29,6 +30,10 @@ from oslo_messaging._i18n import _LE RPCException = rpc_common.RPCException +_MATCHMAKER_BACKENDS = ('redis', 'dummy') +_MATCHMAKER_DEFAULT = 'redis' +LOG = logging.getLogger(__name__) + zmq_opts = [ cfg.StrOpt('rpc_zmq_bind_address', default='*', @@ -37,7 +42,8 @@ zmq_opts = [ 'The "host" option should point or resolve to this ' 'address.'), - cfg.StrOpt('rpc_zmq_matchmaker', default='redis', + cfg.StrOpt('rpc_zmq_matchmaker', default=_MATCHMAKER_DEFAULT, + choices=_MATCHMAKER_BACKENDS, help='MatchMaker driver.'), cfg.StrOpt('rpc_zmq_concurrency', default='eventlet', @@ -167,8 +173,8 @@ class ZmqDriver(base.BaseDriver): self.matchmaker = driver.DriverManager( 'oslo.messaging.zmq.matchmaker', - self.conf.rpc_zmq_matchmaker, - ).driver(self.conf) + self.get_matchmaker_backend(url), + ).driver(self.conf, url=url) self.client = LazyDriverItem( zmq_client.ZmqClient, self.conf, self.matchmaker, @@ -181,6 +187,19 @@ class ZmqDriver(base.BaseDriver): super(ZmqDriver, self).__init__(conf, url, default_exchange, allowed_remote_exmods) + def get_matchmaker_backend(self, url): + zmq_transport, p, matchmaker_backend = url.transport.partition('+') + assert zmq_transport == 'zmq', "Needs to be zmq for this transport!" + if not matchmaker_backend: + return self.conf.rpc_zmq_matchmaker + elif matchmaker_backend not in _MATCHMAKER_BACKENDS: + raise rpc_common.RPCException( + _LE("Incorrect matchmaker backend name %(backend_name)s!" + "Available names are: %(available_names)s") % + {"backend_name": matchmaker_backend, + "available_names": _MATCHMAKER_BACKENDS}) + return matchmaker_backend + def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, retry=None): """Send RPC message to server diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py index 8e23d7afa..cc99aed22 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py @@ -23,8 +23,9 @@ from oslo_messaging._drivers.zmq_driver import zmq_address class MatchMakerBase(object): def __init__(self, conf, *args, **kwargs): - super(MatchMakerBase, self).__init__(*args, **kwargs) + super(MatchMakerBase, self).__init__() self.conf = conf + self.url = kwargs.get('url') @abc.abstractmethod def register_publisher(self, hostname): diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py index 970a155ef..1ef48ac19 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py @@ -89,18 +89,18 @@ class RedisMatchMaker(base.MatchMakerBase): super(RedisMatchMaker, self).__init__(conf, *args, **kwargs) self.conf.register_opts(matchmaker_redis_opts, "matchmaker_redis") - if not self.conf.matchmaker_redis.sentinel_hosts: + self.sentinel_hosts = self._extract_sentinel_options() + if not self.sentinel_hosts: + self.standalone_redis = self._extract_standalone_redis_options() self._redis = redis.StrictRedis( - host=self.conf.matchmaker_redis.host, - port=self.conf.matchmaker_redis.port, - password=self.conf.matchmaker_redis.password, + host=self.standalone_redis["host"], + port=self.standalone_redis["port"], + password=self.standalone_redis["password"] ) else: socket_timeout = self.conf.matchmaker_redis.socket_timeout / 1000. - s = self.conf.matchmaker_redis.sentinel_hosts - sentinel_hosts = [tuple(i.split(":")) for i in s] sentinel = redis.sentinel.Sentinel( - sentinels=sentinel_hosts, + sentinels=self.sentinel_hosts, socket_timeout=socket_timeout ) @@ -108,9 +108,27 @@ class RedisMatchMaker(base.MatchMakerBase): self.conf.matchmaker_redis.sentinel_group_name, socket_timeout=socket_timeout ) - apply_retrying(self, self.conf) + def _extract_sentinel_options(self): + if self.url and self.url.hosts: + if len(self.url.hosts) > 1: + return [(host.hostname, host.port) for host in self.url.hosts] + elif self.conf.matchmaker_redis.sentinel_hosts: + s = self.conf.matchmaker_redis.sentinel_hosts + return [tuple(i.split(":")) for i in s] + + def _extract_standalone_redis_options(self): + if self.url and self.url.hosts: + redis_host = self.url.hosts[0] + return {"host": redis_host.hostname, + "port": redis_host.port, + "password": redis_host.password} + else: + return {"host": self.conf.matchmaker_redis.host, + "port": self.conf.matchmaker_redis.port, + "password": self.conf.matchmaker_redis.password} + def register_publisher(self, hostname): host_str = ",".join(hostname) if host_str not in self._get_hosts_by_key(_PUBLISHERS_KEY): diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py b/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py new file mode 100644 index 000000000..f80ee912f --- /dev/null +++ b/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py @@ -0,0 +1,87 @@ +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import testtools + +import oslo_messaging +from oslo_messaging._drivers import common +from oslo_messaging._drivers.zmq_driver.matchmaker.base import DummyMatchMaker +from oslo_messaging._drivers.zmq_driver.matchmaker import matchmaker_redis +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging.tests import utils as test_utils + + +zmq = zmq_async.import_zmq() + + +class TestZmqTransportUrl(test_utils.BaseTestCase): + + @testtools.skipIf(zmq is None, "zmq not available") + def setUp(self): + super(TestZmqTransportUrl, self).setUp() + + def setup_url(self, url): + transport = oslo_messaging.get_transport(self.conf, url) + self.addCleanup(transport.cleanup) + driver = transport._driver + return driver, url + + def test_empty_url(self): + driver, url = self.setup_url("zmq:///") + self.assertIs(matchmaker_redis.RedisMatchMaker, + driver.matchmaker.__class__) + self.assertEqual('zmq', driver.matchmaker.url.transport) + + def test_error_name(self): + self.assertRaises(common.RPCException, self.setup_url, "zmq+error:///") + + def test_dummy_url(self): + driver, url = self.setup_url("zmq+dummy:///") + self.assertIs(DummyMatchMaker, + driver.matchmaker.__class__) + self.assertEqual('zmq+dummy', driver.matchmaker.url.transport) + + def test_redis_url(self): + driver, url = self.setup_url("zmq+redis:///") + self.assertIs(matchmaker_redis.RedisMatchMaker, + driver.matchmaker.__class__) + self.assertEqual('zmq+redis', driver.matchmaker.url.transport) + + def test_redis_url_no_creds(self): + driver, url = self.setup_url("zmq+redis://host:65123/") + self.assertIs(matchmaker_redis.RedisMatchMaker, + driver.matchmaker.__class__) + self.assertEqual('zmq+redis', driver.matchmaker.url.transport) + self.assertEqual("host", driver.matchmaker.standalone_redis["host"]) + self.assertEqual(65123, driver.matchmaker.standalone_redis["port"]) + + def test_redis_url_no_port(self): + driver, url = self.setup_url("zmq+redis://:p12@host:65123/") + self.assertIs(matchmaker_redis.RedisMatchMaker, + driver.matchmaker.__class__) + self.assertEqual('zmq+redis', driver.matchmaker.url.transport) + self.assertEqual("host", driver.matchmaker.standalone_redis["host"]) + self.assertEqual(65123, driver.matchmaker.standalone_redis["port"]) + self.assertEqual("p12", driver.matchmaker.standalone_redis["password"]) + + def test_sentinel_multiple_hosts_url(self): + driver, url = self.setup_url( + "zmq+redis://sentinel1:20001,sentinel2:20001,sentinel3:20001/") + self.assertIs(matchmaker_redis.RedisMatchMaker, + driver.matchmaker.__class__) + self.assertEqual('zmq+redis', driver.matchmaker.url.transport) + self.assertEqual(3, len(driver.matchmaker.sentinel_hosts)) + expected = [("sentinel1", 20001), ("sentinel2", 20001), + ("sentinel3", 20001)] + self.assertEqual(expected, driver.matchmaker.sentinel_hosts)