[zmq] Fix fanout without PUB/SUB

This patch fix incorrect fanout behavior when use_pub_sub=False.
In addition it also makes dummy matchmaker behave similar to (already
fixed here) redis one in order to make future testing more realistic
(relevant tests were also fixed and refactored).

Change-Id: Ie131de189972250ea2d9b99fe65b5908b7144569
Closes-Bug: #1622968
This commit is contained in:
Gevorg Davoian 2016-09-13 13:58:25 +03:00
parent eaf433bf49
commit c3df59e1ab
5 changed files with 145 additions and 100 deletions

View File

@ -37,12 +37,17 @@ class SocketsManager(object):
return self.matchmaker.get_hosts_retry(
target, zmq_names.socket_type_str(self.listener_type))
def get_hosts_fanout(self, target):
return self.matchmaker.get_hosts_fanout_retry(
target, zmq_names.socket_type_str(self.listener_type))
@staticmethod
def _key_from_target(target):
return target.topic if target.fanout else str(target)
def _get_hosts_and_connect(self, socket, target):
hosts = self.get_hosts(target)
get_hosts = self.get_hosts_fanout if target.fanout else self.get_hosts
hosts = get_hosts(target)
self._connect_to_hosts(socket, target, hosts)
def _track_socket(self, socket, target):

View File

@ -13,6 +13,7 @@
import abc
import collections
import logging
import six
@ -20,6 +21,8 @@ from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
class MatchmakerUnavailable(rpc_common.RPCException):
"""Exception is raised on connection error to matchmaker service"""
@ -147,6 +150,28 @@ class MatchmakerBase(object):
:returns: a list of "hostname:port" hosts
"""
@abc.abstractmethod
def get_hosts_fanout(self, target, listener_type):
"""Get all hosts for fanout from nameserver by target.
:param target: the default target for invocations
:type target: Target
:param listener_type: listener socket type ROUTER, SUB etc.
:type listener_type: str
:returns: a list of "hostname:port" hosts
"""
@abc.abstractmethod
def get_hosts_fanout_retry(self, target, listener_type):
"""Retry if not host for fanout - used on client first time connection.
:param target: the default target for invocations
:type target: Target
:param listener_type: listener socket type ROUTER, SUB etc.
:type listener_type: str
:returns: a list of "hostname:port" hosts
"""
class MatchmakerDummy(MatchmakerBase):
@ -180,20 +205,56 @@ class MatchmakerDummy(MatchmakerBase):
return list(self._routers)
def register(self, target, hostname, listener_type, expire=-1):
key = zmq_address.target_to_key(target, listener_type)
if target.server:
key = zmq_address.target_to_key(target, listener_type)
if hostname not in self._cache[key]:
self._cache[key].append(hostname)
key = zmq_address.prefix_str(target.topic, listener_type)
if hostname not in self._cache[key]:
self._cache[key].append(hostname)
def unregister(self, target, hostname, listener_type):
key = zmq_address.target_to_key(target, listener_type)
if target.server:
key = zmq_address.target_to_key(target, listener_type)
if hostname in self._cache[key]:
self._cache[key].remove(hostname)
key = zmq_address.prefix_str(target.topic, listener_type)
if hostname in self._cache[key]:
self._cache[key].remove(hostname)
def get_hosts(self, target, listener_type):
key = zmq_address.target_to_key(target, listener_type)
return self._cache[key]
hosts = []
if target.server:
key = zmq_address.target_to_key(target, listener_type)
hosts.extend(self._cache[key])
if not hosts:
key = zmq_address.prefix_str(target.topic, listener_type)
hosts.extend(self._cache[key])
LOG.debug("[Dummy] get_hosts for target %(target)s: %(hosts)s",
{"target": target, "hosts": hosts})
return hosts
def get_hosts_retry(self, target, listener_type):
# Do not complicate dummy matchmaker
# This method will act smarter in real world matchmakers
return self.get_hosts(target, listener_type)
def get_hosts_fanout(self, target, listener_type):
key = zmq_address.prefix_str(target.topic, listener_type)
hosts = list(self._cache[key])
LOG.debug("[Dummy] get_hosts_fanout for target %(target)s: %(hosts)s",
{"target": target, "hosts": hosts})
return hosts
def get_hosts_fanout_retry(self, target, listener_type):
# Do not complicate dummy matchmaker
# This method will act smarter in real world matchmakers
return self.get_hosts_fanout(target, listener_type)

View File

@ -198,34 +198,32 @@ class MatchmakerRedis(zmq_matchmaker_base.MatchmakerBase):
@redis_connection_warn
def register(self, target, hostname, listener_type, expire=-1):
if target.topic and target.server:
if target.server:
key = zmq_address.target_to_key(target, listener_type)
self._add_key_with_expire(key, hostname, expire)
if target.topic:
key = zmq_address.prefix_str(target.topic, listener_type)
self._add_key_with_expire(key, hostname, expire)
key = zmq_address.prefix_str(target.topic, listener_type)
self._add_key_with_expire(key, hostname, expire)
@no_reraise
@redis_connection_warn
def unregister(self, target, hostname, listener_type):
if target.topic and target.server:
if target.server:
key = zmq_address.target_to_key(target, listener_type)
self._redis.srem(key, hostname)
if target.topic:
key = zmq_address.prefix_str(target.topic, listener_type)
self._redis.srem(key, hostname)
key = zmq_address.prefix_str(target.topic, listener_type)
self._redis.srem(key, hostname)
@redis_connection_warn
def get_hosts(self, target, listener_type):
hosts = []
if target.topic and target.server:
if target.server:
key = zmq_address.target_to_key(target, listener_type)
hosts.extend(self._get_hosts_by_key(key))
if not hosts and target.topic:
if not hosts:
key = zmq_address.prefix_str(target.topic, listener_type)
hosts.extend(self._get_hosts_by_key(key))
@ -239,14 +237,8 @@ class MatchmakerRedis(zmq_matchmaker_base.MatchmakerBase):
@redis_connection_warn
def get_hosts_fanout(self, target, listener_type):
hosts = []
if target.topic and target.server:
key = zmq_address.target_to_key(target, listener_type)
hosts.extend(self._get_hosts_by_key(key))
key = zmq_address.prefix_str(target.topic, listener_type)
hosts.extend(self._get_hosts_by_key(key))
hosts = list(self._get_hosts_by_key(key))
LOG.debug("[Redis] get_hosts_fanout for target %(target)s: %(hosts)s",
{"target": target, "hosts": hosts})

View File

@ -40,6 +40,10 @@ class ZmqServer(base.PollStyleListener):
self.target = target
self.poller = poller or zmq_async.get_poller()
LOG.info(_LI('[%(host)s] Run server %(target)s'),
{'host': self.conf.oslo_messaging_zmq.rpc_zmq_host,
'target': self.target})
self.router_consumer = zmq_router_consumer.RouterConsumer(
conf, self.poller, self) \
if not conf.oslo_messaging_zmq.use_router_proxy else None
@ -66,15 +70,22 @@ class ZmqServer(base.PollStyleListener):
def stop(self):
self.poller.close()
LOG.info(_LI("Stop server %(target)s"), {'target': self.target})
for consumer in self.consumers:
consumer.stop()
LOG.info(_LI('[%(host)s] Stop server %(target)s'),
{'host': self.conf.oslo_messaging_zmq.rpc_zmq_host,
'target': self.target})
def cleanup(self):
self.poller.close()
for consumer in self.consumers:
consumer.cleanup()
LOG.info(_LI('[%(host)s] Destroy server %(target)s'),
{'host': self.conf.oslo_messaging_zmq.rpc_zmq_host,
'target': self.target})
class ZmqNotificationServer(base.PollStyleListener):

View File

@ -13,7 +13,6 @@
# under the License.
import testtools
import time
import oslo_messaging
from oslo_messaging._drivers import impl_zmq
@ -68,86 +67,63 @@ class TestConfZmqDriverLoad(test_utils.BaseTestCase):
class TestZmqBasics(zmq_common.ZmqBaseTestCase):
def test_send_receive_raises(self):
"""Call() without method."""
target = oslo_messaging.Target(topic='testtopic')
self.listener.listen(target)
self.assertRaises(
KeyError,
self.driver.send,
target, {}, {'tx_id': 1},
wait_for_reply=True,
timeout=60)
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestZmqBasics, self).setUp()
self.target = oslo_messaging.Target(topic='topic')
self.ctxt = {'key': 'value'}
self.message = {'method': 'qwerty', 'args': {'int': 1, 'bool': True}}
def test_send_receive_topic(self):
"""Call() with topic."""
def test_send_call_without_method_failure(self):
self.message.pop('method')
self.listener.listen(self.target)
self.assertRaises(KeyError, self.driver.send,
self.target, self.ctxt, self.message,
wait_for_reply=True, timeout=10)
target = oslo_messaging.Target(topic='testtopic')
self.listener.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True,
timeout=60)
def _check_listener_received(self):
self.assertTrue(self.listener._received.isSet())
self.assertEqual(self.ctxt, self.listener.message.ctxt)
self.assertEqual(self.message, self.listener.message.message)
def test_send_call_success(self):
self.listener.listen(self.target)
result = self.driver.send(self.target, self.ctxt, self.message,
wait_for_reply=True, timeout=10)
self.assertTrue(result)
self._check_listener_received()
def test_send_noreply(self):
"""Cast() with topic."""
target = oslo_messaging.Target(topic='testtopic', server="my@server")
self.listener.listen(target)
time.sleep(0.01)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=False)
self.listener._received.wait(5)
self.assertIsNone(result)
self.assertTrue(self.listener._received.isSet())
method = self.listener.message.message[u'method']
self.assertEqual(u'hello-world', method)
def test_send_fanout(self):
target = oslo_messaging.Target(topic='testtopic', fanout=True)
self.listener.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=False)
self.listener._received.wait(5)
self.assertIsNone(result)
self.assertTrue(self.listener._received.isSet())
method = self.listener.message.message[u'method']
self.assertEqual(u'hello-world', method)
def test_send_receive_direct(self):
"""Call() without topic."""
target = oslo_messaging.Target(server='127.0.0.1')
self.listener.listen(target)
message = {'method': 'hello-world', 'tx_id': 1}
context = {}
result = self.driver.send(target, context, message,
wait_for_reply=True,
timeout=60)
def test_send_call_direct_success(self):
self.target.server = 'server'
self.listener.listen(self.target)
result = self.driver.send(self.target, self.ctxt, self.message,
wait_for_reply=True, timeout=10)
self.assertTrue(result)
self._check_listener_received()
def test_send_receive_notification(self):
"""Notify() test"""
target = oslo_messaging.Target(topic='t1',
server='notification@server')
self.listener.listen_notifications([(target, 'info')])
message = {'method': 'hello-world', 'tx_id': 1}
context = {}
target.topic += '.info'
self.driver.send_notification(target, context, message, '3.0')
def test_send_cast_direct_success(self):
self.target.server = 'server'
self.listener.listen(self.target)
result = self.driver.send(self.target, self.ctxt, self.message,
wait_for_reply=False)
self.listener._received.wait(5)
self.assertTrue(self.listener._received.isSet())
self.assertIsNone(result)
self._check_listener_received()
def test_send_fanout_success(self):
self.target.fanout = True
self.listener.listen(self.target)
result = self.driver.send(self.target, self.ctxt, self.message,
wait_for_reply=False)
self.listener._received.wait(5)
self.assertIsNone(result)
self._check_listener_received()
def test_send_notify_success(self):
self.listener.listen_notifications([(self.target, 'info')])
self.target.topic += '.info'
result = self.driver.send_notification(self.target, self.ctxt,
self.message, '3.0')
self.listener._received.wait(5)
self.assertIsNone(result)
self._check_listener_received()