oslo.messaging/tests/drivers/test_impl_zmq.py
James Page cd71c47d32 Add functional and unit 0mq driver tests
Basic functional and unit tests for zmq driver.

Note as the zmq driver is directly dependent on eventlet, this
change also updates the notify logger tests to remove the
direct dependency on threading which was being monkey patched,
causing test failures.

As the zmq driver has a direct dependency on eventlet, tests are
skipped under py3.

Co-Authored-By: Kapil Thangavelu <kapil.thangavelu@canonical.com>
Co-Authored-By: Edward Hope-Morley <edward.hope-morley@canonical.com>

Change-Id: I93b8b2e92d0f2a353d3357a5e61f6d472ec84944
Partial-bug: #1302941
2014-12-09 15:02:31 +00:00

505 lines
20 KiB
Python

# Copyright 2014 Canonical, Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import socket
import fixtures
import mock
import testtools
from oslo import messaging
from oslo.utils import importutils
from tests import utils as test_utils
# NOTE(jamespage) the zmq driver implementation is currently tied
# to eventlet so we have to monkey_patch to support testing
# eventlet is not yet py3 compatible, so skip if not installed
eventlet = importutils.try_import('eventlet')
if eventlet:
eventlet.monkey_patch()
impl_zmq = importutils.try_import('oslo.messaging._drivers.impl_zmq')
LOG = logging.getLogger(__name__)
def get_unused_port():
"""Returns an unused port on localhost."""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('localhost', 0))
port = s.getsockname()[1]
s.close()
return port
class TestConfZmqDriverLoad(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestConfZmqDriverLoad, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
def test_driver_load(self):
transport = messaging.get_transport(self.conf)
self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
class stopRpc(object):
def __init__(self, attrs):
self.attrs = attrs
def __call__(self):
if self.attrs['reactor']:
self.attrs['reactor'].close()
if self.attrs['driver']:
self.attrs['driver'].cleanup()
class TestZmqBasics(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestZmqBasics, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_port': get_unused_port(),
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
self.config(**kwargs)
# Start RPC
LOG.info("Running internal zmq receiver.")
self.reactor = impl_zmq.ZmqProxy(self.conf)
self.reactor.consume_in_thread()
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
def test_start_stop_listener(self):
target = messaging.Target(topic='testtopic')
listener = self.driver.listen(target)
result = listener.poll(0.01)
self.assertEqual(result, None)
def test_send_receive_raises(self):
"""Call() without method."""
target = messaging.Target(topic='testtopic')
self.driver.listen(target)
self.assertRaises(
KeyError,
self.driver.send,
target, {}, {'tx_id': 1}, wait_for_reply=True)
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqIncomingMessage')
def test_send_receive_topic(self, mock_msg):
"""Call() with method."""
mock_msg.return_value = msg = mock.MagicMock()
msg.received = received = mock.MagicMock()
received.failure = False
received.reply = True
msg.condition = condition = mock.MagicMock()
condition.wait.return_value = True
target = messaging.Target(topic='testtopic')
self.driver.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True)
self.assertEqual(result, True)
@mock.patch('oslo.messaging._drivers.impl_zmq._call', autospec=True)
def test_send_receive_fanout(self, mock_call):
target = messaging.Target(topic='testtopic', fanout=True)
self.driver.listen(target)
mock_call.__name__ = '_call'
mock_call.return_value = [True]
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True)
self.assertEqual(result, True)
mock_call.assert_called_once_with(
'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'],
{}, 'fanout~testtopic.127.0.0.1',
{'tx_id': 1, 'method': 'hello-world'},
None, False, [])
@mock.patch('oslo.messaging._drivers.impl_zmq._call', autospec=True)
def test_send_receive_direct(self, mock_call):
# Also verifies fix for bug http://pad.lv/1301723
target = messaging.Target(topic='testtopic', server='localhost')
self.driver.listen(target)
mock_call.__name__ = '_call'
mock_call.return_value = [True]
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True)
self.assertEqual(result, True)
mock_call.assert_called_once_with(
'tcp://localhost:%s' % self.conf['rpc_zmq_port'],
{}, 'testtopic.localhost',
{'tx_id': 1, 'method': 'hello-world'},
None, False, [])
class TestZmqSocket(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestZmqSocket, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqSocket.subscribe')
@mock.patch('oslo.messaging._drivers.impl_zmq.zmq.Context')
def test_zmqsocket_init_type_pull(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PULL, bind=False,
subscribe=None)
self.assertTrue(sock.can_recv)
self.assertFalse(sock.can_send)
self.assertFalse(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqSocket.subscribe')
@mock.patch('oslo.messaging._drivers.impl_zmq.zmq.Context')
def test_zmqsocket_init_type_sub(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.SUB, bind=False,
subscribe=None)
self.assertTrue(sock.can_recv)
self.assertFalse(sock.can_send)
self.assertTrue(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqSocket.subscribe')
@mock.patch('oslo.messaging._drivers.impl_zmq.zmq.Context')
def test_zmqsocket_init_type_push(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUSH, bind=False,
subscribe=None)
self.assertFalse(sock.can_recv)
self.assertTrue(sock.can_send)
self.assertFalse(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqSocket.subscribe')
@mock.patch('oslo.messaging._drivers.impl_zmq.zmq.Context')
def test_zmqsocket_init_type_pub(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUB, bind=False,
subscribe=None)
self.assertFalse(sock.can_recv)
self.assertTrue(sock.can_send)
self.assertFalse(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
class TestZmqIncomingMessage(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestZmqIncomingMessage, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
def test_zmqincomingmessage(self):
msg = impl_zmq.ZmqIncomingMessage(mock.Mock(), None, 'msg.foo')
msg.reply("abc")
self.assertIsInstance(
msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
self.assertIsInstance(
msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
self.assertEqual(msg.received.reply, "abc")
msg.requeue()
class TestZmqConnection(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestZmqConnection, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_port': get_unused_port(),
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
self.config(**kwargs)
# Start RPC
LOG.info("Running internal zmq receiver.")
self.reactor = impl_zmq.ZmqProxy(self.conf)
self.reactor.consume_in_thread()
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_create_consumer(self, mock_reactor):
mock_reactor.register = mock.Mock()
conn = impl_zmq.Connection(self.driver)
topic = 'topic.foo'
context = mock.Mock()
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
(self.internal_ipc_dir))
# No Fanout
conn.create_consumer(topic, context)
conn.reactor.register.assert_called_with(context, inaddr,
impl_zmq.zmq.PULL,
subscribe=None, in_bind=False)
# Reset for next bunch of checks
conn.reactor.register.reset_mock()
# Fanout
inaddr = ('ipc://%s/zmq_topic_fanout~topic' %
(self.internal_ipc_dir))
conn.create_consumer(topic, context, fanout='subscriber.foo')
conn.reactor.register.assert_called_with(context, inaddr,
impl_zmq.zmq.SUB,
subscribe='subscriber.foo',
in_bind=False)
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor):
mock_reactor.register = mock.Mock()
conn = impl_zmq.Connection(self.driver)
topic = 'topic.foo'
context = mock.Mock()
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
(self.internal_ipc_dir))
conn.create_consumer(topic, context)
conn.reactor.register.assert_called_with(
context, inaddr, impl_zmq.zmq.PULL, subscribe=None, in_bind=False)
conn.reactor.register.reset_mock()
# Call again with same topic
conn.create_consumer(topic, context)
self.assertFalse(conn.reactor.register.called)
@mock.patch('oslo.messaging._drivers.impl_zmq._get_matchmaker',
autospec=True)
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker):
conn = impl_zmq.Connection(self.driver)
conn.reactor.close = mock.Mock()
mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock()
conn.close()
self.assertTrue(mock_getmatchmaker.return_value.stop_heartbeat.called)
self.assertTrue(conn.reactor.close.called)
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_wait(self, mock_reactor):
conn = impl_zmq.Connection(self.driver)
conn.reactor.wait = mock.Mock()
conn.wait()
self.assertTrue(conn.reactor.wait.called)
@mock.patch('oslo.messaging._drivers.impl_zmq._get_matchmaker',
autospec=True)
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_consume_in_thread(self, mock_reactor,
mock_getmatchmaker):
mock_getmatchmaker.return_value.start_heartbeat = mock.Mock()
conn = impl_zmq.Connection(self.driver)
conn.reactor.consume_in_thread = mock.Mock()
conn.consume_in_thread()
self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called)
self.assertTrue(conn.reactor.consume_in_thread.called)
class TestZmqListener(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestZmqListener, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_port': get_unused_port(),
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
self.config(**kwargs)
# Start RPC
LOG.info("Running internal zmq receiver.")
self.reactor = impl_zmq.ZmqProxy(self.conf)
self.reactor.consume_in_thread()
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
def test_zmqlistener_no_msg(self):
listener = impl_zmq.ZmqListener(self.driver)
# Timeout = 0 should return straight away since the queue is empty
listener.poll(timeout=0)
def test_zmqlistener_w_msg(self):
listener = impl_zmq.ZmqListener(self.driver)
kwargs = {'a': 1, 'b': 2}
m = mock.Mock()
ctxt = mock.Mock(autospec=impl_zmq.RpcContext)
eventlet.spawn_n(listener.dispatch, ctxt, 0,
m.fake_method, 'name.space', **kwargs)
resp = listener.poll(timeout=10)
msg = {'method': m.fake_method, 'namespace': 'name.space',
'args': kwargs}
self.assertEqual(resp.message, msg)
class TestZmqDriver(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestZmqDriver, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_port': get_unused_port(),
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
self.config(**kwargs)
# Start RPC
LOG.info("Running internal zmq receiver.")
self.reactor = impl_zmq.ZmqProxy(self.conf)
self.reactor.consume_in_thread()
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
@mock.patch('oslo.messaging._drivers.impl_zmq._cast', autospec=True)
@mock.patch('oslo.messaging._drivers.impl_zmq._multi_send', autospec=True)
def test_zmqdriver_send(self, mock_multi_send, mock_cast):
context = mock.Mock(autospec=impl_zmq.RpcContext)
topic = 'testtopic'
msg = 'jeronimo'
self.driver.send(messaging.Target(topic=topic), context, msg,
False, 0, False)
mock_multi_send.assert_called_with(mock_cast, context, topic, msg,
allowed_remote_exmods=[],
envelope=False)
@mock.patch('oslo.messaging._drivers.impl_zmq._cast', autospec=True)
@mock.patch('oslo.messaging._drivers.impl_zmq._multi_send', autospec=True)
def test_zmqdriver_send_notification(self, mock_multi_send, mock_cast):
context = mock.Mock(autospec=impl_zmq.RpcContext)
topic = 'testtopic.foo'
topic_reformat = 'testtopic-foo'
msg = 'jeronimo'
self.driver.send_notification(messaging.Target(topic=topic), context,
msg, False, False)
mock_multi_send.assert_called_with(mock_cast, context, topic_reformat,
msg, allowed_remote_exmods=[],
envelope=False)
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqListener', autospec=True)
@mock.patch('oslo.messaging._drivers.impl_zmq.Connection', autospec=True)
def test_zmqdriver_listen(self, mock_connection, mock_listener):
mock_listener.return_value = listener = mock.Mock()
mock_connection.return_value = conn = mock.Mock()
conn.create_consumer = mock.Mock()
conn.consume_in_thread = mock.Mock()
topic = 'testtopic.foo'
self.driver.listen(messaging.Target(topic=topic))
conn.create_consumer.assert_called_with(topic, listener, fanout=True)
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqListener', autospec=True)
@mock.patch('oslo.messaging._drivers.impl_zmq.Connection', autospec=True)
def test_zmqdriver_listen_for_notification(self, mock_connection,
mock_listener):
mock_listener.return_value = listener = mock.Mock()
mock_connection.return_value = conn = mock.Mock()
conn.create_consumer = mock.Mock()
conn.consume_in_thread = mock.Mock()
topic = 'testtopic.foo'
data = [(messaging.Target(topic=topic), 0)]
# NOTE(jamespage): Pooling not supported, just pass None for now.
self.driver.listen_for_notifications(data, None)
conn.create_consumer.assert_called_with("%s-%s" % (topic, 0), listener)