diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py index 86c92a32b..3fcada48c 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py @@ -114,18 +114,15 @@ class DealerPublisherDirectStatic(DealerPublisherDirect): conf, matchmaker, zmq.DEALER) def acquire_connection(self, request): + target_key = zmq_address.target_to_key( + request.target, zmq_names.socket_type_str(zmq.ROUTER)) if request.msg_type in zmq_names.MULTISEND_TYPES: hosts = self.routing_table.get_fanout_hosts(request.target) - target_key = zmq_address.prefix_str( - request.target.topic, - zmq_names.socket_type_str(zmq.ROUTER)) return self.fanout_sockets.get_cached_socket(target_key, hosts, immediate=False) else: hosts = self.routing_table.get_all_round_robin_hosts( request.target) - target_key = zmq_address.target_to_key( - request.target, zmq_names.socket_type_str(zmq.ROUTER)) return self.sockets_manager.get_cached_socket(target_key, hosts) def _finally_unregister(self, socket, request): diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py index 03979a5a9..458563314 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py @@ -72,8 +72,8 @@ class RoutingTableAdaptor(object): return target_key def get_fanout_hosts(self, target): - target_key = zmq_address.prefix_str( - target.topic, zmq_names.socket_type_str(self.listener_type)) + target_key = zmq_address.target_to_key( + target, zmq_names.socket_type_str(self.listener_type)) LOG.debug("Processing target %s for fanout." % target_key) diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_base.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_base.py index 22eaf898a..d495d2f54 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_base.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_base.py @@ -246,7 +246,7 @@ class MatchmakerDummy(MatchmakerBase): return self.get_hosts(target, listener_type) def get_hosts_fanout(self, target, listener_type): - key = zmq_address.prefix_str(target.topic, listener_type) + key = zmq_address.target_to_key(target, listener_type) hosts = list(self._cache[key]) LOG.debug("[Dummy] get_hosts_fanout for target %(target)s: %(hosts)s", diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py index a6ba084a6..a7191f22e 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py @@ -224,7 +224,7 @@ class MatchmakerRedisBase(zmq_matchmaker_base.MatchmakerBase): return self._retry_method(target, listener_type, self.get_hosts) def get_hosts_fanout(self, target, listener_type): - key = zmq_address.prefix_str(target.topic, listener_type) + key = zmq_address.target_to_key(target, listener_type) hosts = list(self._smembers(key)) LOG.debug("[Redis] get_hosts_fanout for target %(target)s: %(hosts)s", diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_address.py b/oslo_messaging/_drivers/zmq_driver/zmq_address.py index 906267b42..aa32c665e 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_address.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_address.py @@ -33,7 +33,7 @@ def prefix_str(key, listener_type): def target_to_key(target, listener_type=None): key = target.topic - if target.server: + if target.server and not target.fanout: # FIXME(ozamiatin): Workaround for Cinder. # Remove split when Bug #1630975 is being fixed. key += "/" + target.server.split('@')[0] diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_address.py b/oslo_messaging/tests/drivers/zmq/test_zmq_address.py new file mode 100644 index 000000000..519c294cc --- /dev/null +++ b/oslo_messaging/tests/drivers/zmq/test_zmq_address.py @@ -0,0 +1,67 @@ +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import testscenarios +import testtools + +import oslo_messaging +from oslo_messaging._drivers.zmq_driver import zmq_address +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging.tests import utils as test_utils + + +zmq = zmq_async.import_zmq() + +load_tests = testscenarios.load_tests_apply_scenarios + + +class TestZmqAddress(test_utils.BaseTestCase): + + scenarios = [ + ('router', {'listener_type': zmq_names.socket_type_str(zmq.ROUTER)}), + ('dealer', {'listener_type': zmq_names.socket_type_str(zmq.DEALER)}) + ] + + @testtools.skipIf(zmq is None, "zmq not available") + def test_target_to_key_topic_only(self): + target = oslo_messaging.Target(topic='topic') + key = zmq_address.target_to_key(target, self.listener_type) + self.assertEqual(self.listener_type + '/topic', key) + + @testtools.skipIf(zmq is None, "zmq not available") + def test_target_to_key_topic_server_round_robin(self): + target = oslo_messaging.Target(topic='topic', server='server') + key = zmq_address.target_to_key(target, self.listener_type) + self.assertEqual(self.listener_type + '/topic/server', key) + + @testtools.skipIf(zmq is None, "zmq not available") + def test_target_to_key_topic_fanout(self): + target = oslo_messaging.Target(topic='topic', fanout=True) + key = zmq_address.target_to_key(target, self.listener_type) + self.assertEqual(self.listener_type + '/topic', key) + + @testtools.skipIf(zmq is None, "zmq not available") + def test_target_to_key_topic_server_fanout(self): + target = oslo_messaging.Target(topic='topic', server='server', + fanout=True) + key = zmq_address.target_to_key(target, self.listener_type) + self.assertEqual(self.listener_type + '/topic', key) + + @testtools.skipIf(zmq is None, "zmq not available") + def test_target_to_key_topic_server_fanout_no_prefix(self): + target = oslo_messaging.Target(topic='topic', server='server', + fanout=True) + key = zmq_address.target_to_key(target) + self.assertEqual('topic', key)