Add functional and unit 0mq driver tests
Basic functional and unit tests for zmq driver. Note as the zmq driver is directly dependent on eventlet, this change also updates the notify logger tests to remove the direct dependency on threading which was being monkey patched, causing test failures. As the zmq driver has a direct dependency on eventlet, tests are skipped under py3. Co-Authored-By: Kapil Thangavelu <kapil.thangavelu@canonical.com> Co-Authored-By: Edward Hope-Morley <edward.hope-morley@canonical.com> Change-Id: I93b8b2e92d0f2a353d3357a5e61f6d472ec84944 Partial-bug: #1302941
This commit is contained in:
parent
f4bb707bc3
commit
cd71c47d32
@ -32,7 +32,7 @@ from oslo.config import cfg
|
||||
from oslo.messaging._drivers import base
|
||||
from oslo.messaging._drivers import common as rpc_common
|
||||
from oslo.messaging._executors import impl_eventlet # FIXME(markmc)
|
||||
from oslo.messaging._i18n import _
|
||||
from oslo.messaging._i18n import _, _LE
|
||||
from oslo.serialization import jsonutils
|
||||
from oslo.utils import excutils
|
||||
from oslo.utils import importutils
|
||||
@ -180,6 +180,10 @@ class ZmqSocket(object):
|
||||
self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter)
|
||||
self.subscriptions.remove(msg_filter)
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
return self.sock is None or self.sock.closed
|
||||
|
||||
def close(self):
|
||||
if self.sock is None or self.sock.closed:
|
||||
return
|
||||
@ -257,7 +261,10 @@ class RpcContext(rpc_common.CommonRpcContext):
|
||||
|
||||
@classmethod
|
||||
def marshal(self, ctx):
|
||||
if not isinstance(ctx, dict):
|
||||
ctx_data = ctx.to_dict()
|
||||
else:
|
||||
ctx_data = ctx
|
||||
return _serialize(ctx_data)
|
||||
|
||||
@classmethod
|
||||
@ -395,7 +402,7 @@ class ZmqBaseReactor(ConsumerBase):
|
||||
def consume_in_thread(self):
|
||||
def _consume(sock):
|
||||
LOG.info(_("Consuming socket"))
|
||||
while True:
|
||||
while not sock.closed:
|
||||
self.consume(sock)
|
||||
|
||||
for k in self.proxies.keys():
|
||||
@ -408,12 +415,12 @@ class ZmqBaseReactor(ConsumerBase):
|
||||
t.wait()
|
||||
|
||||
def close(self):
|
||||
for s in self.sockets:
|
||||
s.close()
|
||||
|
||||
for t in self.threads:
|
||||
t.kill()
|
||||
|
||||
for s in self.sockets:
|
||||
s.close()
|
||||
|
||||
|
||||
class ZmqProxy(ZmqBaseReactor):
|
||||
"""A consumer class implementing a topic-based proxy.
|
||||
@ -612,9 +619,15 @@ class Connection(rpc_common.Connection):
|
||||
self.topics.append(topic)
|
||||
|
||||
def close(self):
|
||||
_get_matchmaker().stop_heartbeat()
|
||||
mm = _get_matchmaker()
|
||||
mm.stop_heartbeat()
|
||||
for topic in self.topics:
|
||||
_get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
|
||||
try:
|
||||
mm.unregister(topic, CONF.rpc_zmq_host)
|
||||
except Exception as err:
|
||||
LOG.error(_LE('Unable to unregister topic %(topic)s'
|
||||
' from matchmaker: %(err)s') %
|
||||
{'topic': topic, 'err': err})
|
||||
|
||||
self.reactor.close()
|
||||
self.topics = []
|
||||
@ -634,6 +647,7 @@ def _cast(addr, context, topic, msg, timeout=None, envelope=False,
|
||||
payload = [RpcContext.marshal(context), msg]
|
||||
|
||||
with Timeout(timeout_cast, exception=rpc_common.Timeout):
|
||||
conn = None
|
||||
try:
|
||||
conn = ZmqClient(addr)
|
||||
|
||||
@ -642,7 +656,7 @@ def _cast(addr, context, topic, msg, timeout=None, envelope=False,
|
||||
except zmq.ZMQError:
|
||||
raise RPCException("Cast failed. ZMQ Socket Exception")
|
||||
finally:
|
||||
if 'conn' in vars():
|
||||
if conn is not None:
|
||||
conn.close()
|
||||
|
||||
|
||||
@ -684,12 +698,14 @@ def _call(addr, context, topic, msg, timeout=None,
|
||||
zmq.SUB, subscribe=msg_id, bind=False
|
||||
)
|
||||
|
||||
LOG.debug("Sending cast")
|
||||
LOG.debug("Sending cast: %s", topic)
|
||||
_cast(addr, context, topic, payload, envelope=envelope)
|
||||
|
||||
LOG.debug("Cast sent; Waiting reply")
|
||||
# Blocks until receives reply
|
||||
msg = msg_waiter.recv()
|
||||
if msg is None:
|
||||
raise rpc_common.Timeout()
|
||||
LOG.debug("Received message: %s", msg)
|
||||
LOG.debug("Unpacking response")
|
||||
|
||||
@ -789,7 +805,7 @@ class ZmqIncomingMessage(base.IncomingMessage):
|
||||
self.condition.notify()
|
||||
|
||||
def requeue(self):
|
||||
pass
|
||||
LOG.debug("WARNING: requeue not supported")
|
||||
|
||||
|
||||
class ZmqListener(base.Listener):
|
||||
@ -858,19 +874,11 @@ class ZmqDriver(base.BaseDriver):
|
||||
raise NotImplementedError('The ZeroMQ driver currently only works '
|
||||
'with oslo.config.cfg.CONF')
|
||||
|
||||
self.listeners = []
|
||||
|
||||
def _send(self, target, ctxt, message,
|
||||
wait_for_reply=None, timeout=None, envelope=False):
|
||||
|
||||
# FIXME(markmc): remove this temporary hack
|
||||
class Context(object):
|
||||
def __init__(self, d):
|
||||
self.d = d
|
||||
|
||||
def to_dict(self):
|
||||
return self.d
|
||||
|
||||
context = Context(ctxt)
|
||||
|
||||
if wait_for_reply:
|
||||
method = _call
|
||||
else:
|
||||
@ -884,7 +892,7 @@ class ZmqDriver(base.BaseDriver):
|
||||
elif target.server:
|
||||
topic = '%s.%s' % (topic, target.server)
|
||||
|
||||
reply = _multi_send(method, context, topic, message,
|
||||
reply = _multi_send(method, ctxt, topic, message,
|
||||
envelope=envelope,
|
||||
allowed_remote_exmods=self._allowed_remote_exmods)
|
||||
|
||||
@ -916,6 +924,7 @@ class ZmqDriver(base.BaseDriver):
|
||||
conn.create_consumer(target.topic, listener, fanout=True)
|
||||
|
||||
conn.consume_in_thread()
|
||||
self.listeners.append(conn)
|
||||
|
||||
return listener
|
||||
|
||||
@ -934,8 +943,11 @@ class ZmqDriver(base.BaseDriver):
|
||||
conn.create_consumer('%s-%s' % (target.topic, priority),
|
||||
listener)
|
||||
conn.consume_in_thread()
|
||||
self.listeners.append(conn)
|
||||
|
||||
return listener
|
||||
|
||||
def cleanup(self):
|
||||
pass
|
||||
for c in self.listeners:
|
||||
c.close()
|
||||
self.listeners = []
|
||||
|
@ -21,6 +21,9 @@ qpid-python
|
||||
# for test_matchmaker_redis
|
||||
redis>=2.10.0
|
||||
|
||||
# for test_impl_zmq
|
||||
pyzmq>=14.3.1
|
||||
|
||||
# when we can require tox>= 1.4, this can go into tox.ini:
|
||||
# [testenv:cover]
|
||||
# deps = {[testenv]deps} coverage
|
||||
|
504
tests/drivers/test_impl_zmq.py
Normal file
504
tests/drivers/test_impl_zmq.py
Normal file
@ -0,0 +1,504 @@
|
||||
# Copyright 2014 Canonical, Ltd.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import socket
|
||||
|
||||
import fixtures
|
||||
import mock
|
||||
import testtools
|
||||
|
||||
from oslo import messaging
|
||||
from oslo.utils import importutils
|
||||
from tests import utils as test_utils
|
||||
|
||||
# NOTE(jamespage) the zmq driver implementation is currently tied
|
||||
# to eventlet so we have to monkey_patch to support testing
|
||||
# eventlet is not yet py3 compatible, so skip if not installed
|
||||
eventlet = importutils.try_import('eventlet')
|
||||
if eventlet:
|
||||
eventlet.monkey_patch()
|
||||
|
||||
impl_zmq = importutils.try_import('oslo.messaging._drivers.impl_zmq')
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_unused_port():
|
||||
"""Returns an unused port on localhost."""
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
s.bind(('localhost', 0))
|
||||
port = s.getsockname()[1]
|
||||
s.close()
|
||||
return port
|
||||
|
||||
|
||||
class TestConfZmqDriverLoad(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(impl_zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestConfZmqDriverLoad, self).setUp()
|
||||
self.messaging_conf.transport_driver = 'zmq'
|
||||
|
||||
def test_driver_load(self):
|
||||
transport = messaging.get_transport(self.conf)
|
||||
self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
|
||||
|
||||
|
||||
class stopRpc(object):
|
||||
def __init__(self, attrs):
|
||||
self.attrs = attrs
|
||||
|
||||
def __call__(self):
|
||||
if self.attrs['reactor']:
|
||||
self.attrs['reactor'].close()
|
||||
if self.attrs['driver']:
|
||||
self.attrs['driver'].cleanup()
|
||||
|
||||
|
||||
class TestZmqBasics(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(impl_zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestZmqBasics, self).setUp()
|
||||
self.messaging_conf.transport_driver = 'zmq'
|
||||
# Get driver
|
||||
transport = messaging.get_transport(self.conf)
|
||||
self.driver = transport._driver
|
||||
|
||||
# Set config values
|
||||
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
|
||||
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
|
||||
'rpc_zmq_host': '127.0.0.1',
|
||||
'rpc_response_timeout': 5,
|
||||
'rpc_zmq_port': get_unused_port(),
|
||||
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
|
||||
self.config(**kwargs)
|
||||
|
||||
# Start RPC
|
||||
LOG.info("Running internal zmq receiver.")
|
||||
self.reactor = impl_zmq.ZmqProxy(self.conf)
|
||||
self.reactor.consume_in_thread()
|
||||
|
||||
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
|
||||
self.addCleanup(stopRpc(self.__dict__))
|
||||
|
||||
def test_start_stop_listener(self):
|
||||
target = messaging.Target(topic='testtopic')
|
||||
listener = self.driver.listen(target)
|
||||
result = listener.poll(0.01)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
def test_send_receive_raises(self):
|
||||
"""Call() without method."""
|
||||
target = messaging.Target(topic='testtopic')
|
||||
self.driver.listen(target)
|
||||
self.assertRaises(
|
||||
KeyError,
|
||||
self.driver.send,
|
||||
target, {}, {'tx_id': 1}, wait_for_reply=True)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqIncomingMessage')
|
||||
def test_send_receive_topic(self, mock_msg):
|
||||
"""Call() with method."""
|
||||
mock_msg.return_value = msg = mock.MagicMock()
|
||||
msg.received = received = mock.MagicMock()
|
||||
received.failure = False
|
||||
received.reply = True
|
||||
msg.condition = condition = mock.MagicMock()
|
||||
condition.wait.return_value = True
|
||||
|
||||
target = messaging.Target(topic='testtopic')
|
||||
self.driver.listen(target)
|
||||
result = self.driver.send(
|
||||
target, {},
|
||||
{'method': 'hello-world', 'tx_id': 1},
|
||||
wait_for_reply=True)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq._call', autospec=True)
|
||||
def test_send_receive_fanout(self, mock_call):
|
||||
target = messaging.Target(topic='testtopic', fanout=True)
|
||||
self.driver.listen(target)
|
||||
|
||||
mock_call.__name__ = '_call'
|
||||
mock_call.return_value = [True]
|
||||
|
||||
result = self.driver.send(
|
||||
target, {},
|
||||
{'method': 'hello-world', 'tx_id': 1},
|
||||
wait_for_reply=True)
|
||||
|
||||
self.assertEqual(result, True)
|
||||
mock_call.assert_called_once_with(
|
||||
'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'],
|
||||
{}, 'fanout~testtopic.127.0.0.1',
|
||||
{'tx_id': 1, 'method': 'hello-world'},
|
||||
None, False, [])
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq._call', autospec=True)
|
||||
def test_send_receive_direct(self, mock_call):
|
||||
# Also verifies fix for bug http://pad.lv/1301723
|
||||
target = messaging.Target(topic='testtopic', server='localhost')
|
||||
self.driver.listen(target)
|
||||
|
||||
mock_call.__name__ = '_call'
|
||||
mock_call.return_value = [True]
|
||||
|
||||
result = self.driver.send(
|
||||
target, {},
|
||||
{'method': 'hello-world', 'tx_id': 1},
|
||||
wait_for_reply=True)
|
||||
|
||||
self.assertEqual(result, True)
|
||||
mock_call.assert_called_once_with(
|
||||
'tcp://localhost:%s' % self.conf['rpc_zmq_port'],
|
||||
{}, 'testtopic.localhost',
|
||||
{'tx_id': 1, 'method': 'hello-world'},
|
||||
None, False, [])
|
||||
|
||||
|
||||
class TestZmqSocket(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(impl_zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestZmqSocket, self).setUp()
|
||||
self.messaging_conf.transport_driver = 'zmq'
|
||||
# Get driver
|
||||
transport = messaging.get_transport(self.conf)
|
||||
self.driver = transport._driver
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqSocket.subscribe')
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.zmq.Context')
|
||||
def test_zmqsocket_init_type_pull(self, mock_context, mock_subscribe):
|
||||
mock_ctxt = mock.Mock()
|
||||
mock_context.return_value = mock_ctxt
|
||||
mock_sock = mock.Mock()
|
||||
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
|
||||
mock_sock.connect = mock.Mock()
|
||||
mock_sock.bind = mock.Mock()
|
||||
addr = '127.0.0.1'
|
||||
|
||||
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PULL, bind=False,
|
||||
subscribe=None)
|
||||
self.assertTrue(sock.can_recv)
|
||||
self.assertFalse(sock.can_send)
|
||||
self.assertFalse(sock.can_sub)
|
||||
self.assertTrue(mock_sock.connect.called)
|
||||
self.assertFalse(mock_sock.bind.called)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqSocket.subscribe')
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.zmq.Context')
|
||||
def test_zmqsocket_init_type_sub(self, mock_context, mock_subscribe):
|
||||
mock_ctxt = mock.Mock()
|
||||
mock_context.return_value = mock_ctxt
|
||||
mock_sock = mock.Mock()
|
||||
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
|
||||
mock_sock.connect = mock.Mock()
|
||||
mock_sock.bind = mock.Mock()
|
||||
addr = '127.0.0.1'
|
||||
|
||||
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.SUB, bind=False,
|
||||
subscribe=None)
|
||||
self.assertTrue(sock.can_recv)
|
||||
self.assertFalse(sock.can_send)
|
||||
self.assertTrue(sock.can_sub)
|
||||
self.assertTrue(mock_sock.connect.called)
|
||||
self.assertFalse(mock_sock.bind.called)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqSocket.subscribe')
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.zmq.Context')
|
||||
def test_zmqsocket_init_type_push(self, mock_context, mock_subscribe):
|
||||
mock_ctxt = mock.Mock()
|
||||
mock_context.return_value = mock_ctxt
|
||||
mock_sock = mock.Mock()
|
||||
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
|
||||
mock_sock.connect = mock.Mock()
|
||||
mock_sock.bind = mock.Mock()
|
||||
addr = '127.0.0.1'
|
||||
|
||||
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUSH, bind=False,
|
||||
subscribe=None)
|
||||
self.assertFalse(sock.can_recv)
|
||||
self.assertTrue(sock.can_send)
|
||||
self.assertFalse(sock.can_sub)
|
||||
self.assertTrue(mock_sock.connect.called)
|
||||
self.assertFalse(mock_sock.bind.called)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqSocket.subscribe')
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.zmq.Context')
|
||||
def test_zmqsocket_init_type_pub(self, mock_context, mock_subscribe):
|
||||
mock_ctxt = mock.Mock()
|
||||
mock_context.return_value = mock_ctxt
|
||||
mock_sock = mock.Mock()
|
||||
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
|
||||
mock_sock.connect = mock.Mock()
|
||||
mock_sock.bind = mock.Mock()
|
||||
addr = '127.0.0.1'
|
||||
|
||||
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUB, bind=False,
|
||||
subscribe=None)
|
||||
self.assertFalse(sock.can_recv)
|
||||
self.assertTrue(sock.can_send)
|
||||
self.assertFalse(sock.can_sub)
|
||||
self.assertTrue(mock_sock.connect.called)
|
||||
self.assertFalse(mock_sock.bind.called)
|
||||
|
||||
|
||||
class TestZmqIncomingMessage(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(impl_zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestZmqIncomingMessage, self).setUp()
|
||||
self.messaging_conf.transport_driver = 'zmq'
|
||||
# Get driver
|
||||
transport = messaging.get_transport(self.conf)
|
||||
self.driver = transport._driver
|
||||
|
||||
def test_zmqincomingmessage(self):
|
||||
msg = impl_zmq.ZmqIncomingMessage(mock.Mock(), None, 'msg.foo')
|
||||
msg.reply("abc")
|
||||
self.assertIsInstance(
|
||||
msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
|
||||
self.assertIsInstance(
|
||||
msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
|
||||
self.assertEqual(msg.received.reply, "abc")
|
||||
msg.requeue()
|
||||
|
||||
|
||||
class TestZmqConnection(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(impl_zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestZmqConnection, self).setUp()
|
||||
self.messaging_conf.transport_driver = 'zmq'
|
||||
# Get driver
|
||||
transport = messaging.get_transport(self.conf)
|
||||
self.driver = transport._driver
|
||||
|
||||
# Set config values
|
||||
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
|
||||
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
|
||||
'rpc_zmq_host': '127.0.0.1',
|
||||
'rpc_response_timeout': 5,
|
||||
'rpc_zmq_port': get_unused_port(),
|
||||
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
|
||||
self.config(**kwargs)
|
||||
|
||||
# Start RPC
|
||||
LOG.info("Running internal zmq receiver.")
|
||||
self.reactor = impl_zmq.ZmqProxy(self.conf)
|
||||
self.reactor.consume_in_thread()
|
||||
|
||||
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
|
||||
self.addCleanup(stopRpc(self.__dict__))
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
|
||||
def test_zmqconnection_create_consumer(self, mock_reactor):
|
||||
|
||||
mock_reactor.register = mock.Mock()
|
||||
conn = impl_zmq.Connection(self.driver)
|
||||
topic = 'topic.foo'
|
||||
context = mock.Mock()
|
||||
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
|
||||
(self.internal_ipc_dir))
|
||||
# No Fanout
|
||||
conn.create_consumer(topic, context)
|
||||
conn.reactor.register.assert_called_with(context, inaddr,
|
||||
impl_zmq.zmq.PULL,
|
||||
subscribe=None, in_bind=False)
|
||||
|
||||
# Reset for next bunch of checks
|
||||
conn.reactor.register.reset_mock()
|
||||
|
||||
# Fanout
|
||||
inaddr = ('ipc://%s/zmq_topic_fanout~topic' %
|
||||
(self.internal_ipc_dir))
|
||||
conn.create_consumer(topic, context, fanout='subscriber.foo')
|
||||
conn.reactor.register.assert_called_with(context, inaddr,
|
||||
impl_zmq.zmq.SUB,
|
||||
subscribe='subscriber.foo',
|
||||
in_bind=False)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
|
||||
def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor):
|
||||
mock_reactor.register = mock.Mock()
|
||||
conn = impl_zmq.Connection(self.driver)
|
||||
topic = 'topic.foo'
|
||||
context = mock.Mock()
|
||||
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
|
||||
(self.internal_ipc_dir))
|
||||
|
||||
conn.create_consumer(topic, context)
|
||||
conn.reactor.register.assert_called_with(
|
||||
context, inaddr, impl_zmq.zmq.PULL, subscribe=None, in_bind=False)
|
||||
conn.reactor.register.reset_mock()
|
||||
# Call again with same topic
|
||||
conn.create_consumer(topic, context)
|
||||
self.assertFalse(conn.reactor.register.called)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq._get_matchmaker',
|
||||
autospec=True)
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
|
||||
def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker):
|
||||
conn = impl_zmq.Connection(self.driver)
|
||||
conn.reactor.close = mock.Mock()
|
||||
mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock()
|
||||
conn.close()
|
||||
self.assertTrue(mock_getmatchmaker.return_value.stop_heartbeat.called)
|
||||
self.assertTrue(conn.reactor.close.called)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
|
||||
def test_zmqconnection_wait(self, mock_reactor):
|
||||
conn = impl_zmq.Connection(self.driver)
|
||||
conn.reactor.wait = mock.Mock()
|
||||
conn.wait()
|
||||
self.assertTrue(conn.reactor.wait.called)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq._get_matchmaker',
|
||||
autospec=True)
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
|
||||
def test_zmqconnection_consume_in_thread(self, mock_reactor,
|
||||
mock_getmatchmaker):
|
||||
mock_getmatchmaker.return_value.start_heartbeat = mock.Mock()
|
||||
conn = impl_zmq.Connection(self.driver)
|
||||
conn.reactor.consume_in_thread = mock.Mock()
|
||||
conn.consume_in_thread()
|
||||
self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called)
|
||||
self.assertTrue(conn.reactor.consume_in_thread.called)
|
||||
|
||||
|
||||
class TestZmqListener(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(impl_zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestZmqListener, self).setUp()
|
||||
self.messaging_conf.transport_driver = 'zmq'
|
||||
# Get driver
|
||||
transport = messaging.get_transport(self.conf)
|
||||
self.driver = transport._driver
|
||||
|
||||
# Set config values
|
||||
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
|
||||
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
|
||||
'rpc_zmq_host': '127.0.0.1',
|
||||
'rpc_response_timeout': 5,
|
||||
'rpc_zmq_port': get_unused_port(),
|
||||
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
|
||||
self.config(**kwargs)
|
||||
|
||||
# Start RPC
|
||||
LOG.info("Running internal zmq receiver.")
|
||||
self.reactor = impl_zmq.ZmqProxy(self.conf)
|
||||
self.reactor.consume_in_thread()
|
||||
|
||||
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
|
||||
self.addCleanup(stopRpc(self.__dict__))
|
||||
|
||||
def test_zmqlistener_no_msg(self):
|
||||
listener = impl_zmq.ZmqListener(self.driver)
|
||||
# Timeout = 0 should return straight away since the queue is empty
|
||||
listener.poll(timeout=0)
|
||||
|
||||
def test_zmqlistener_w_msg(self):
|
||||
listener = impl_zmq.ZmqListener(self.driver)
|
||||
kwargs = {'a': 1, 'b': 2}
|
||||
m = mock.Mock()
|
||||
ctxt = mock.Mock(autospec=impl_zmq.RpcContext)
|
||||
eventlet.spawn_n(listener.dispatch, ctxt, 0,
|
||||
m.fake_method, 'name.space', **kwargs)
|
||||
resp = listener.poll(timeout=10)
|
||||
msg = {'method': m.fake_method, 'namespace': 'name.space',
|
||||
'args': kwargs}
|
||||
self.assertEqual(resp.message, msg)
|
||||
|
||||
|
||||
class TestZmqDriver(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(impl_zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestZmqDriver, self).setUp()
|
||||
self.messaging_conf.transport_driver = 'zmq'
|
||||
# Get driver
|
||||
transport = messaging.get_transport(self.conf)
|
||||
self.driver = transport._driver
|
||||
|
||||
# Set config values
|
||||
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
|
||||
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
|
||||
'rpc_zmq_host': '127.0.0.1',
|
||||
'rpc_response_timeout': 5,
|
||||
'rpc_zmq_port': get_unused_port(),
|
||||
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
|
||||
self.config(**kwargs)
|
||||
|
||||
# Start RPC
|
||||
LOG.info("Running internal zmq receiver.")
|
||||
self.reactor = impl_zmq.ZmqProxy(self.conf)
|
||||
self.reactor.consume_in_thread()
|
||||
|
||||
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
|
||||
self.addCleanup(stopRpc(self.__dict__))
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq._cast', autospec=True)
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq._multi_send', autospec=True)
|
||||
def test_zmqdriver_send(self, mock_multi_send, mock_cast):
|
||||
context = mock.Mock(autospec=impl_zmq.RpcContext)
|
||||
topic = 'testtopic'
|
||||
msg = 'jeronimo'
|
||||
self.driver.send(messaging.Target(topic=topic), context, msg,
|
||||
False, 0, False)
|
||||
mock_multi_send.assert_called_with(mock_cast, context, topic, msg,
|
||||
allowed_remote_exmods=[],
|
||||
envelope=False)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq._cast', autospec=True)
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq._multi_send', autospec=True)
|
||||
def test_zmqdriver_send_notification(self, mock_multi_send, mock_cast):
|
||||
context = mock.Mock(autospec=impl_zmq.RpcContext)
|
||||
topic = 'testtopic.foo'
|
||||
topic_reformat = 'testtopic-foo'
|
||||
msg = 'jeronimo'
|
||||
self.driver.send_notification(messaging.Target(topic=topic), context,
|
||||
msg, False, False)
|
||||
mock_multi_send.assert_called_with(mock_cast, context, topic_reformat,
|
||||
msg, allowed_remote_exmods=[],
|
||||
envelope=False)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqListener', autospec=True)
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.Connection', autospec=True)
|
||||
def test_zmqdriver_listen(self, mock_connection, mock_listener):
|
||||
mock_listener.return_value = listener = mock.Mock()
|
||||
mock_connection.return_value = conn = mock.Mock()
|
||||
conn.create_consumer = mock.Mock()
|
||||
conn.consume_in_thread = mock.Mock()
|
||||
topic = 'testtopic.foo'
|
||||
self.driver.listen(messaging.Target(topic=topic))
|
||||
conn.create_consumer.assert_called_with(topic, listener, fanout=True)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqListener', autospec=True)
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.Connection', autospec=True)
|
||||
def test_zmqdriver_listen_for_notification(self, mock_connection,
|
||||
mock_listener):
|
||||
mock_listener.return_value = listener = mock.Mock()
|
||||
mock_connection.return_value = conn = mock.Mock()
|
||||
conn.create_consumer = mock.Mock()
|
||||
conn.consume_in_thread = mock.Mock()
|
||||
topic = 'testtopic.foo'
|
||||
data = [(messaging.Target(topic=topic), 0)]
|
||||
# NOTE(jamespage): Pooling not supported, just pass None for now.
|
||||
self.driver.listen_for_notifications(data, None)
|
||||
conn.create_consumer.assert_called_with("%s-%s" % (topic, 0), listener)
|
@ -64,7 +64,7 @@ class ListenerSetupMixin(object):
|
||||
|
||||
def wait_for(self, expect_messages):
|
||||
while expect_messages != self._received_msgs:
|
||||
pass
|
||||
yield
|
||||
|
||||
def stop(self):
|
||||
for listener in self.listeners:
|
||||
|
@ -17,10 +17,6 @@ import logging
|
||||
import logging.config
|
||||
import os
|
||||
import sys
|
||||
try:
|
||||
import threading
|
||||
except ImportError:
|
||||
threading = None
|
||||
|
||||
import mock
|
||||
import testscenarios
|
||||
@ -39,13 +35,6 @@ logging.AUDIT = logging.INFO + 1
|
||||
logging.addLevelName(logging.AUDIT, 'AUDIT')
|
||||
|
||||
|
||||
def get_thread_ident():
|
||||
if threading is not None:
|
||||
return threading.current_thread().ident
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
class TestLogNotifier(test_utils.BaseTestCase):
|
||||
|
||||
scenarios = [
|
||||
@ -62,6 +51,10 @@ class TestLogNotifier(test_utils.BaseTestCase):
|
||||
super(TestLogNotifier, self).setUp()
|
||||
self.addCleanup(messaging.notify._impl_test.reset)
|
||||
self.config(notification_driver=['test'])
|
||||
# NOTE(jamespage) disable thread information logging for testing
|
||||
# as this causes test failures when zmq tests monkey_patch via
|
||||
# eventlet
|
||||
logging.logThreads = 0
|
||||
|
||||
@mock.patch('oslo.utils.timeutils.utcnow')
|
||||
def test_logger(self, mock_utcnow):
|
||||
@ -93,7 +86,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
|
||||
{'process': os.getpid(),
|
||||
'funcName': None,
|
||||
'name': 'foo',
|
||||
'thread': get_thread_ident(),
|
||||
'thread': None,
|
||||
'levelno': levelno,
|
||||
'processName': 'MainProcess',
|
||||
'pathname': '/foo/bar',
|
||||
@ -149,7 +142,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
|
||||
{'process': os.getpid(),
|
||||
'funcName': 'test_logging_conf',
|
||||
'name': 'default',
|
||||
'thread': get_thread_ident(),
|
||||
'thread': None,
|
||||
'levelno': levelno,
|
||||
'processName': 'MainProcess',
|
||||
'pathname': pathname,
|
||||
|
Loading…
Reference in New Issue
Block a user