de015d5c83
To avoid creating a new ZMQ connection for every message sent to a remote broker, implement pooling and re-use of ZmqClient objects and associated ZMQ context. A pool is created for each remote endpoint (keyed by address); the size of each pool is configured using rpc_conn_pool_size. All outbound message client connections are pooled. Closes-Bug: 1384113 Change-Id: Ia55d5c310a56e51df5e2f5d39e561a4da3fe4d83
474 lines
19 KiB
Python
474 lines
19 KiB
Python
# Copyright 2014 Canonical, Ltd.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import logging
|
|
import socket
|
|
|
|
import fixtures
|
|
from oslo_utils import importutils
|
|
import testtools
|
|
|
|
try:
|
|
import zmq
|
|
except ImportError:
|
|
zmq = None
|
|
|
|
import oslo_messaging
|
|
from oslo_messaging._drivers import common as rpc_common
|
|
from oslo_messaging.tests import utils as test_utils
|
|
from six.moves import mock
|
|
|
|
# eventlet is not yet py3 compatible, so skip if not installed
|
|
eventlet = importutils.try_import('eventlet')
|
|
|
|
impl_zmq = importutils.try_import('oslo_messaging._drivers.impl_zmq')
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
def get_unused_port():
    """Return a TCP port number that is currently free on localhost.

    A throwaway socket is bound to port 0 so that the operating system
    picks the port; the number it chose is read back with
    ``getsockname()``.  The socket is closed before returning, so the port
    is not reserved for the caller once this function returns.

    :returns: the port number selected for the temporary socket.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(('localhost', 0))
        return s.getsockname()[1]
    finally:
        # Release the descriptor even when setsockopt()/bind() raises.
        s.close()
|
|
|
|
|
|
class ZmqBaseTestCase(test_utils.BaseTestCase):
    """Base test case for all ZMQ tests that make use of the ZMQ Proxy"""

    @testtools.skipIf(zmq is None, "zmq not available")
    def setUp(self):
        """Select the zmq driver, configure it and start a ZmqProxy.

        After this runs the test case exposes ``driver``,
        ``internal_ipc_dir``, ``reactor`` and ``matchmaker``.  The whole
        method is skipped when the ``zmq`` module could not be imported.
        """
        super(ZmqBaseTestCase, self).setUp()
        self.messaging_conf.transport_driver = 'zmq'
        # Get driver
        transport = oslo_messaging.get_transport(self.conf)
        self.driver = transport._driver

        # Set config values
        self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
        kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
                  'rpc_zmq_host': '127.0.0.1',
                  'rpc_response_timeout': 5,
                  'rpc_zmq_port': get_unused_port(),
                  'rpc_zmq_ipc_dir': self.internal_ipc_dir}
        self.config(**kwargs)

        # Start RPC
        LOG.info("Running internal zmq receiver.")
        self.reactor = impl_zmq.ZmqProxy(self.conf)
        # Register the teardown as soon as both 'driver' and 'reactor' are
        # set, so the proxy is still closed if a later setUp step raises.
        self.addCleanup(stopRpc(self.__dict__))
        self.reactor.consume_in_thread()

        self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
|
|
|
|
|
|
class TestConfZmqDriverLoad(test_utils.BaseTestCase):
    """Checks which driver class is loaded for the 'zmq' transport."""

    @testtools.skipIf(zmq is None, "zmq not available")
    def setUp(self):
        """Point the messaging configuration at the zmq driver."""
        super(TestConfZmqDriverLoad, self).setUp()
        self.messaging_conf.transport_driver = 'zmq'

    def test_driver_load(self):
        """The transport built from the config wraps a ZmqDriver."""
        loaded_driver = oslo_messaging.get_transport(self.conf)._driver
        self.assertIsInstance(loaded_driver, impl_zmq.ZmqDriver)
|
|
|
|
|
|
class stopRpc(object):
    """Callable that shuts down the reactor and driver stored in a mapping.

    The mapping is read when the instance is called, not when it is
    constructed, so entries added after construction are still honoured.
    """

    def __init__(self, attrs):
        """Remember ``attrs``, the mapping to look 'reactor'/'driver' up in."""
        self.attrs = attrs

    def __call__(self):
        """Close the reactor and clean up the driver, when present.

        Missing or falsy entries are skipped.  The driver cleanup runs even
        if closing the reactor raises; that exception still propagates.
        """
        reactor = self.attrs.get('reactor')
        driver = self.attrs.get('driver')
        try:
            if reactor:
                reactor.close()
        finally:
            # Do not let a failing reactor shutdown leak the driver.
            if driver:
                driver.cleanup()
|
|
|
|
|
|
class TestZmqBasics(ZmqBaseTestCase):
    """Basic listen/send checks driven through ``self.driver``."""

    def test_start_stop_listener(self):
        """Polling a listener that was sent nothing yields None."""
        target = oslo_messaging.Target(topic='testtopic')
        listener = self.driver.listen(target)
        result = listener.poll(0.01)
        self.assertIsNone(result)

    def test_send_receive_raises(self):
        """Call() without method."""
        target = oslo_messaging.Target(topic='testtopic')
        self.driver.listen(target)
        self.assertRaises(
            KeyError,
            self.driver.send,
            target, {}, {'tx_id': 1}, wait_for_reply=True)

    @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqIncomingMessage')
    def test_send_receive_topic(self, mock_msg):
        """Call() with method."""
        # Replace the incoming message with a mock that already carries a
        # successful reply and whose condition wait returns immediately.
        mock_msg.return_value = msg = mock.MagicMock()
        msg.received = received = mock.MagicMock()
        received.failure = False
        received.reply = True
        msg.condition = condition = mock.MagicMock()
        condition.wait.return_value = True

        target = oslo_messaging.Target(topic='testtopic')
        self.driver.listen(target)
        result = self.driver.send(
            target, {},
            {'method': 'hello-world', 'tx_id': 1},
            wait_for_reply=True)
        self.assertEqual(True, result)

    @mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
    def test_send_receive_fanout(self, mock_call):
        """A fanout send calls _call with the 'fanout~' prefixed topic."""
        target = oslo_messaging.Target(topic='testtopic', fanout=True)
        self.driver.listen(target)

        mock_call.__name__ = '_call'
        mock_call.return_value = [True]

        result = self.driver.send(
            target, {},
            {'method': 'hello-world', 'tx_id': 1},
            wait_for_reply=True)

        self.assertEqual(True, result)
        mock_call.assert_called_once_with(
            self.driver,
            'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'],
            {}, 'fanout~testtopic.127.0.0.1',
            {'tx_id': 1, 'method': 'hello-world'},
            None, False, [], True)

    @mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
    def test_send_receive_direct(self, mock_call):
        """A send to a named server calls _call with that server's address."""
        # Also verifies fix for bug http://pad.lv/1301723
        target = oslo_messaging.Target(topic='testtopic', server='localhost')
        self.driver.listen(target)

        mock_call.__name__ = '_call'
        mock_call.return_value = [True]

        result = self.driver.send(
            target, {},
            {'method': 'hello-world', 'tx_id': 1},
            wait_for_reply=True)

        self.assertEqual(True, result)
        mock_call.assert_called_once_with(
            self.driver,
            'tcp://localhost:%s' % self.conf['rpc_zmq_port'],
            {}, 'testtopic.localhost',
            {'tx_id': 1, 'method': 'hello-world'},
            None, False, [], True)
|
|
|
|
|
|
class TestZmqSocket(test_utils.BaseTestCase):
    """Checks the can_recv/can_send/can_sub flags of ZmqSocket per type."""

    @testtools.skipIf(zmq is None, "zmq not available")
    def setUp(self):
        """Select the zmq driver and keep a reference to it."""
        super(TestZmqSocket, self).setUp()
        self.messaging_conf.transport_driver = 'zmq'
        # Get driver
        self.driver = oslo_messaging.get_transport(self.conf)._driver

    def _check_socket_type(self, mock_context, zmq_type,
                           can_recv, can_send, can_sub):
        """Create a connecting ZmqSocket of ``zmq_type`` and check its flags.

        The patched ``zmq.Context`` is wired to hand out a mock socket, so
        nothing is opened for real.  Besides the three capability flags the
        helper asserts that the mock socket was connected and never bound.
        """
        raw_sock = mock.Mock()
        raw_sock.connect = mock.Mock()
        raw_sock.bind = mock.Mock()
        fake_ctxt = mock.Mock()
        fake_ctxt.socket = mock.Mock(return_value=raw_sock)
        mock_context.return_value = fake_ctxt

        sock = impl_zmq.ZmqSocket('127.0.0.1', zmq_type, bind=False,
                                  subscribe=None)

        expectations = (
            (can_recv, sock.can_recv),
            (can_send, sock.can_send),
            (can_sub, sock.can_sub),
            (True, raw_sock.connect.called),
            (False, raw_sock.bind.called),
        )
        for expected, observed in expectations:
            if expected:
                self.assertTrue(observed)
            else:
                self.assertFalse(observed)

    @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
    @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
    def test_zmqsocket_init_type_pull(self, mock_context, mock_subscribe):
        """PULL sockets can only receive."""
        self._check_socket_type(mock_context, impl_zmq.zmq.PULL,
                                can_recv=True, can_send=False,
                                can_sub=False)

    @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
    @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
    def test_zmqsocket_init_type_sub(self, mock_context, mock_subscribe):
        """SUB sockets can receive and subscribe."""
        self._check_socket_type(mock_context, impl_zmq.zmq.SUB,
                                can_recv=True, can_send=False,
                                can_sub=True)

    @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
    @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
    def test_zmqsocket_init_type_push(self, mock_context, mock_subscribe):
        """PUSH sockets can only send."""
        self._check_socket_type(mock_context, impl_zmq.zmq.PUSH,
                                can_recv=False, can_send=True,
                                can_sub=False)

    @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
    @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
    def test_zmqsocket_init_type_pub(self, mock_context, mock_subscribe):
        """PUB sockets can only send."""
        self._check_socket_type(mock_context, impl_zmq.zmq.PUB,
                                can_recv=False, can_send=True,
                                can_sub=False)
|
|
|
|
|
|
class TestZmqIncomingMessage(test_utils.BaseTestCase):
    """Exercises reply() and requeue() on ZmqIncomingMessage."""

    @testtools.skipIf(zmq is None, "zmq not available")
    def setUp(self):
        """Select the zmq driver and keep a reference to it."""
        super(TestZmqIncomingMessage, self).setUp()
        self.messaging_conf.transport_driver = 'zmq'
        # Get driver
        transport = oslo_messaging.get_transport(self.conf)
        self.driver = transport._driver

    def test_zmqincomingmessage(self):
        """reply() stores a ReceivedReply carrying the reply value."""
        msg = impl_zmq.ZmqIncomingMessage(mock.Mock(), None, 'msg.foo')
        msg.reply("abc")
        self.assertIsInstance(
            msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
        self.assertEqual("abc", msg.received.reply)
        # requeue() is only called to check that it does not raise.
        msg.requeue()
|
|
|
|
|
|
class TestZmqConnection(ZmqBaseTestCase):
    """Checks how impl_zmq.Connection drives its (mocked) ZmqReactor."""

    @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
    def test_zmqconnection_create_consumer(self, mock_reactor):
        """create_consumer registers a PULL socket, or SUB for fanout."""

        mock_reactor.register = mock.Mock()
        conn = impl_zmq.Connection(self.driver.conf, self.driver)
        topic = 'topic.foo'
        context = mock.Mock()
        inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
                  (self.internal_ipc_dir))
        # No Fanout
        conn.create_consumer(topic, context)
        conn.reactor.register.assert_called_with(context, inaddr,
                                                 impl_zmq.zmq.PULL,
                                                 subscribe=None, in_bind=False)

        # Reset for next bunch of checks
        conn.reactor.register.reset_mock()

        # Fanout
        inaddr = ('ipc://%s/zmq_topic_fanout~topic' %
                  (self.internal_ipc_dir))
        conn.create_consumer(topic, context, fanout='subscriber.foo')
        conn.reactor.register.assert_called_with(context, inaddr,
                                                 impl_zmq.zmq.SUB,
                                                 subscribe='subscriber.foo',
                                                 in_bind=False)

    @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
    def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor):
        """A second create_consumer for the same topic registers nothing."""
        mock_reactor.register = mock.Mock()
        conn = impl_zmq.Connection(self.driver.conf, self.driver)
        topic = 'topic.foo'
        context = mock.Mock()
        inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
                  (self.internal_ipc_dir))

        conn.create_consumer(topic, context)
        conn.reactor.register.assert_called_with(
            context, inaddr, impl_zmq.zmq.PULL, subscribe=None, in_bind=False)
        conn.reactor.register.reset_mock()
        # Call again with same topic
        conn.create_consumer(topic, context)
        self.assertFalse(conn.reactor.register.called)

    @mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker',
                autospec=True)
    @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
    def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker):
        """close() stops the matchmaker heartbeat and closes the reactor."""
        conn = impl_zmq.Connection(self.driver.conf, self.driver)
        conn.reactor.close = mock.Mock()
        mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock()
        conn.close()
        self.assertTrue(mock_getmatchmaker.return_value.stop_heartbeat.called)
        self.assertTrue(conn.reactor.close.called)

    @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
    def test_zmqconnection_wait(self, mock_reactor):
        """wait() delegates to the reactor's wait()."""
        # Pass the driver's conf (not the driver itself) as the first
        # argument, as the other tests in this class do.
        conn = impl_zmq.Connection(self.driver.conf, self.driver)
        conn.reactor.wait = mock.Mock()
        conn.wait()
        self.assertTrue(conn.reactor.wait.called)

    @mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker',
                autospec=True)
    @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
    def test_zmqconnection_consume_in_thread(self, mock_reactor,
                                             mock_getmatchmaker):
        """consume_in_thread() starts the heartbeat and the reactor."""
        mock_getmatchmaker.return_value.start_heartbeat = mock.Mock()
        # Pass the driver's conf (not the driver itself) as the first
        # argument, as the other tests in this class do.
        conn = impl_zmq.Connection(self.driver.conf, self.driver)
        conn.reactor.consume_in_thread = mock.Mock()
        conn.consume_in_thread()
        self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called)
        self.assertTrue(conn.reactor.consume_in_thread.called)
|
|
|
|
|
|
class TestZmqListener(ZmqBaseTestCase):
    """Polls a ZmqListener with and without a dispatched message."""

    def test_zmqlistener_no_msg(self):
        """poll(timeout=0) on an empty listener must not raise."""
        empty_listener = impl_zmq.ZmqListener(self.driver)
        # Timeout = 0 should return straight away since the queue is empty
        empty_listener.poll(timeout=0)

    def test_zmqlistener_w_msg(self):
        """A message handed to dispatch() is what poll() returns."""
        listener = impl_zmq.ZmqListener(self.driver)
        call_args = {'a': 1, 'b': 2}
        fake_method = mock.Mock().fake_method
        # NOTE(review): Mock(autospec=...) only sets an attribute named
        # 'autospec' on the mock; spec= was probably intended - confirm.
        ctxt = mock.Mock(autospec=impl_zmq.RpcContext)

        sent = {'namespace': 'name.space', 'method': fake_method,
                'args': call_args}
        expected = {'method': fake_method, 'namespace': 'name.space',
                    'args': call_args}

        # Run dispatch() in a green thread while this thread polls.
        eventlet.spawn_n(listener.dispatch, ctxt, sent)
        resp = listener.poll(timeout=10)
        self.assertEqual(resp.message, expected)
|
|
|
|
|
|
class TestZmqDriver(ZmqBaseTestCase):
    """Checks what ZmqDriver and _multi_send pass to their collaborators."""

    @staticmethod
    def _fake_rpc_context():
        """Build the mock used as the RPC context in these tests.

        NOTE(review): Mock(autospec=...) only sets an attribute named
        'autospec' on the mock; spec= was probably intended - confirm.
        """
        return mock.Mock(autospec=impl_zmq.RpcContext)

    @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
    @mock.patch('oslo_messaging._drivers.matchmaker.MatchMakerBase.queues',
                autospec=True)
    def test_zmqdriver_multi_send_cast_with_no_queues(self,
                                                      mock_queues,
                                                      mock_cast):
        """A cast with no matchmaker results logs exactly one warning."""
        ctxt = self._fake_rpc_context()
        mock_queues.return_value = None

        with mock.patch.object(impl_zmq.LOG, 'warn') as warn_log:
            impl_zmq._multi_send(self.driver, mock_cast,
                                 ctxt, 'testtopic', 'jeronimo')
            self.assertEqual(1, warn_log.call_count)
            positional = warn_log.call_args[0]
            self.assertIn('No matchmaker results', positional[0])

    @mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
    @mock.patch('oslo_messaging._drivers.matchmaker.MatchMakerBase.queues',
                autospec=True)
    def test_zmqdriver_multi_send_call_with_no_queues(self,
                                                      mock_queues,
                                                      mock_call):
        """A call with no matchmaker results raises rpc_common.Timeout."""
        ctxt = self._fake_rpc_context()
        mock_queues.return_value = None

        self.assertRaises(rpc_common.Timeout,
                          impl_zmq._multi_send, self.driver,
                          mock_call, ctxt, 'testtopic', 'jeronimo')

    @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
    @mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
    def test_zmqdriver_send(self, mock_multi_send, mock_cast):
        """send() forwards to _multi_send with _cast and pooled=True."""
        ctxt = self._fake_rpc_context()
        topic_name = 'testtopic'
        payload = 'jeronimo'
        target = oslo_messaging.Target(topic=topic_name)

        self.driver.send(target, ctxt, payload, False, 0, False)

        mock_multi_send.assert_called_with(self.driver, mock_cast, ctxt,
                                           topic_name, payload,
                                           allowed_remote_exmods=[],
                                           envelope=False, pooled=True)

    @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
    @mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
    def test_zmqdriver_send_notification(self, mock_multi_send, mock_cast):
        """send_notification() passes the topic with '.' turned into '-'."""
        ctxt = self._fake_rpc_context()
        payload = 'jeronimo'
        target = oslo_messaging.Target(topic='testtopic.foo')

        self.driver.send_notification(target, ctxt, payload, False, False)

        mock_multi_send.assert_called_with(self.driver, mock_cast, ctxt,
                                           'testtopic-foo', payload,
                                           allowed_remote_exmods=[],
                                           envelope=False, pooled=True)

    @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True)
    @mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)
    def test_zmqdriver_listen(self, mock_connection, mock_listener):
        """listen() creates a consumer for the topic with fanout=True."""
        fake_listener = mock.Mock()
        fake_conn = mock.Mock()
        mock_listener.return_value = fake_listener
        mock_connection.return_value = fake_conn
        fake_conn.create_consumer = mock.Mock()
        fake_conn.consume_in_thread = mock.Mock()

        topic_name = 'testtopic.foo'
        self.driver.listen(oslo_messaging.Target(topic=topic_name))

        fake_conn.create_consumer.assert_called_with(topic_name,
                                                     fake_listener,
                                                     fanout=True)

    @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True)
    @mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)
    def test_zmqdriver_listen_for_notification(self, mock_connection,
                                               mock_listener):
        """listen_for_notifications() consumes '<topic>-<priority>'."""
        fake_listener = mock.Mock()
        fake_conn = mock.Mock()
        mock_listener.return_value = fake_listener
        mock_connection.return_value = fake_conn
        fake_conn.create_consumer = mock.Mock()
        fake_conn.consume_in_thread = mock.Mock()

        topic_name = 'testtopic.foo'
        targets_and_priorities = [
            (oslo_messaging.Target(topic=topic_name), 0)]
        # NOTE(jamespage): Pooling not supported, just pass None for now.
        self.driver.listen_for_notifications(targets_and_priorities, None)

        fake_conn.create_consumer.assert_called_with(
            "%s-%s" % (topic_name, 0), fake_listener)
|