diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py index f2677c262..ce5c21e0e 100644 --- a/oslo_messaging/_drivers/amqp.py +++ b/oslo_messaging/_drivers/amqp.py @@ -49,12 +49,6 @@ amqp_opts = [ default=False, deprecated_group='DEFAULT', help='Auto-delete queues in AMQP.'), - - # FIXME(markmc): this was toplevel in openstack.common.rpc - cfg.IntOpt('rpc_conn_pool_size', - default=30, - deprecated_group='DEFAULT', - help='Size of RPC connection pool.'), ] UNIQUE_ID = '_unique_id' diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py index af0866c91..73066bf0d 100644 --- a/oslo_messaging/_drivers/base.py +++ b/oslo_messaging/_drivers/base.py @@ -17,8 +17,15 @@ import abc import six +from oslo.config import cfg from oslo_messaging import exceptions +base_opts = [ + cfg.IntOpt('rpc_conn_pool_size', + default=30, + help='Size of RPC connection pool.'), +] + class TransportDriverError(exceptions.MessagingException): """Base class for transport driver specific exceptions.""" diff --git a/oslo_messaging/_drivers/impl_qpid.py b/oslo_messaging/_drivers/impl_qpid.py index 487952f9f..25b8a103a 100644 --- a/oslo_messaging/_drivers/impl_qpid.py +++ b/oslo_messaging/_drivers/impl_qpid.py @@ -28,6 +28,7 @@ import six from oslo_messaging._drivers import amqp as rpc_amqp from oslo_messaging._drivers import amqpdriver +from oslo_messaging._drivers import base from oslo_messaging._drivers import common as rpc_common from oslo_messaging._i18n import _ from oslo_messaging import exceptions @@ -783,6 +784,7 @@ class QpidDriver(amqpdriver.AMQPDriverBase): conf.register_group(opt_group) conf.register_opts(qpid_opts, group=opt_group) conf.register_opts(rpc_amqp.amqp_opts, group=opt_group) + conf.register_opts(base.base_opts, group=opt_group) connection_pool = rpc_amqp.ConnectionPool( conf, conf.oslo_messaging_qpid.rpc_conn_pool_size, diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index ee2da428e..d5582f007 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -34,6 +34,7 @@ from six.moves.urllib import parse from oslo_messaging._drivers import amqp as rpc_amqp from oslo_messaging._drivers import amqpdriver +from oslo_messaging._drivers import base from oslo_messaging._drivers import common as rpc_common from oslo_messaging._i18n import _ from oslo_messaging._i18n import _LE @@ -1161,6 +1162,7 @@ class RabbitDriver(amqpdriver.AMQPDriverBase): conf.register_group(opt_group) conf.register_opts(rabbit_opts, group=opt_group) conf.register_opts(rpc_amqp.amqp_opts, group=opt_group) + conf.register_opts(base.base_opts, group=opt_group) connection_pool = rpc_amqp.ConnectionPool( conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size, diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 0e0684309..5f97e7820 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -37,6 +37,7 @@ from oslo_messaging._drivers import base from oslo_messaging._drivers import common as rpc_common from oslo_messaging._executors import base as executor_base # FIXME(markmc) from oslo_messaging._i18n import _, _LE, _LW +from oslo_messaging._drivers import pool zmq = importutils.try_import('eventlet.green.zmq') @@ -117,8 +118,8 @@ class ZmqSocket(object): Can be used as a Context (supports the 'with' statement). """ - def __init__(self, addr, zmq_type, bind=True, subscribe=None): - self.ctxt = zmq.Context(CONF.rpc_zmq_contexts) + def __init__(self, addr, zmq_type, bind=True, subscribe=None, ctxt=None): + self.ctxt = ctxt or zmq.Context(CONF.rpc_zmq_contexts) self.sock = self.ctxt.socket(zmq_type) # Enable IPv6-support in libzmq. @@ -236,8 +237,9 @@ class ZmqSocket(object): class ZmqClient(object): """Client for ZMQ sockets.""" - def __init__(self, addr): - self.outq = ZmqSocket(addr, zmq.PUSH, bind=False) + def __init__(self, addr, ctxt=None): + self.address = addr + self.outq = ZmqSocket(addr, zmq.PUSH, bind=False, ctxt=ctxt) def cast(self, msg_id, topic, data, envelope): msg_id = msg_id or 0 @@ -259,6 +261,67 @@ class ZmqClient(object): self.outq.close() +class ZmqClientContext(object): + """This is essentially a wrapper around ZmqClient that supports 'with'. + It can also return a new ZmqClient, or one from a pool. + + The function will also catch when an instance of this class is to be + deleted. With that we can return ZmqClients to the pool on exceptions + and so forth without making the caller be responsible for catching them. + If possible the function makes sure to return a client to the pool. + + Based on amqp.ConnectionContext. + """ + + def __init__(self, address, connection_pool=None, pooled=False): + self.connection = None + self.connection_pool = connection_pool + self.pooled = pooled + if self.pooled and self.connection_pool is not None: + self.connection = self.connection_pool.get(address) + else: + self.connection = ZmqClient(address) + + def __enter__(self): + """When with ZmqClientContext() is used, return self.""" + return self + + def _done(self): + """If the client came from a pool, clean it up and put it back. + If it did not come from a pool, close it. + """ + if self.connection: + if self.pooled and self.connection_pool is not None: + # Reset the connection so it's ready for the next caller + # to grab from the pool + self.connection_pool.put(self.connection) + else: + try: + self.connection.close() + except Exception: + pass + self.connection = None + + def __exit__(self, exc_type, exc_value, tb): + """End of 'with' statement. We're done here.""" + self._done() + + def __del__(self): + """Caller is done with this client. Make sure we cleaned up.""" + self._done() + + def close(self): + """Caller is done with this client.""" + self._done() + + def __getattr__(self, key): + """Proxy all other calls to the ZmqClient instance.""" + if self.connection: + return getattr(self.connection, key) + else: + raise rpc_common.InvalidRPCConnectionReuse() + + class RpcContext(rpc_common.CommonRpcContext): """Context that supports replying to a rpc.call.""" def __init__(self, **kwargs): @@ -320,7 +383,7 @@ class InternalContext(object): return {'exc': rpc_common.serialize_remote_exception(sys.exc_info())} - def reply(self, ctx, proxy, + def reply(self, driver, ctx, proxy, msg_id=None, context=None, topic=None, msg=None): """Reply to a casted call.""" # NOTE(ewindisch): context kwarg exists for Grizzly compat. @@ -336,19 +399,20 @@ class InternalContext(object): ctx.replies) LOG.debug("Sending reply") - _multi_send(_cast, ctx, topic, { + _multi_send(driver, _cast, ctx, topic, { 'method': '-process_reply', 'args': { 'msg_id': msg_id, # Include for Folsom compat. 'response': response } - }, _msg_id=msg_id) + }, _msg_id=msg_id, pooled=True) class ConsumerBase(object): """Base Consumer.""" - def __init__(self): + def __init__(self, driver): + self.driver = driver self.private_ctx = InternalContext(None) @classmethod @@ -371,7 +435,7 @@ class ConsumerBase(object): # Internal method # uses internal context for safety. if method == '-reply': - self.private_ctx.reply(ctx, proxy, **data['args']) + self.private_ctx.reply(self.driver, ctx, proxy, **data['args']) return proxy.dispatch(ctx, data) @@ -383,9 +447,10 @@ class ZmqBaseReactor(ConsumerBase): Used for RoundRobin requests. """ - def __init__(self, conf): - super(ZmqBaseReactor, self).__init__() + def __init__(self, conf, driver=None): + super(ZmqBaseReactor, self).__init__(driver) + self.driver = driver self.proxies = {} self.threads = [] self.sockets = [] @@ -564,8 +629,8 @@ class ZmqReactor(ZmqBaseReactor): Can also be used as a 1:1 proxy """ - def __init__(self, conf): - super(ZmqReactor, self).__init__(conf) + def __init__(self, conf, driver): + super(ZmqReactor, self).__init__(conf, driver) def consume(self, sock): # TODO(ewindisch): use zero-copy (i.e. references, not copying) @@ -598,9 +663,9 @@ class ZmqReactor(ZmqBaseReactor): class Connection(rpc_common.Connection): """Manages connections and threads.""" - def __init__(self, conf): + def __init__(self, conf, driver): self.topics = [] - self.reactor = ZmqReactor(conf) + self.reactor = ZmqReactor(conf, driver) def create_consumer(self, topic, proxy, fanout=False): # Register with matchmaker. @@ -653,8 +718,8 @@ class Connection(rpc_common.Connection): self.reactor.consume_in_thread() -def _cast(addr, context, topic, msg, timeout=None, envelope=False, - _msg_id=None, allowed_remote_exmods=None): +def _cast(driver, addr, context, topic, msg, timeout=None, envelope=False, + _msg_id=None, allowed_remote_exmods=None, pooled=False): allowed_remote_exmods = allowed_remote_exmods or [] timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] @@ -662,21 +727,16 @@ def _cast(addr, context, topic, msg, timeout=None, envelope=False, topic = topic.encode('utf-8') with Timeout(timeout_cast, exception=rpc_common.Timeout): - conn = None - try: - conn = ZmqClient(addr) - - # assumes cast can't return an exception - conn.cast(_msg_id, topic, payload, envelope) - except zmq.ZMQError: - raise RPCException("Cast failed. ZMQ Socket Exception") - finally: - if conn is not None: - conn.close() + with driver.get_connection(addr, pooled) as conn: + try: + # assumes cast can't return an exception + conn.cast(_msg_id, topic, payload, envelope) + except zmq.ZMQError: + raise RPCException("Cast failed. ZMQ Socket Exception") -def _call(addr, context, topic, msg, timeout=None, - envelope=False, allowed_remote_exmods=None): +def _call(driver, addr, context, topic, msg, timeout=None, + envelope=False, allowed_remote_exmods=None, pooled=False): allowed_remote_exmods = allowed_remote_exmods or [] # timeout_response is how long we wait for a response timeout = timeout or CONF.rpc_response_timeout @@ -714,7 +774,8 @@ def _call(addr, context, topic, msg, timeout=None, ) LOG.debug("Sending cast: %s", topic) - _cast(addr, context, topic, payload, envelope=envelope) + _cast(driver, addr, context, topic, payload, envelope=envelope, + pooled=pooled) LOG.debug("Cast sent; Waiting reply") # Blocks until receives reply @@ -755,8 +816,9 @@ def _call(addr, context, topic, msg, timeout=None, return responses[-1] -def _multi_send(method, context, topic, msg, timeout=None, - envelope=False, _msg_id=None, allowed_remote_exmods=None): +def _multi_send(driver, method, context, topic, msg, timeout=None, + envelope=False, _msg_id=None, allowed_remote_exmods=None, + pooled=False): """Wraps the sending of messages. Dispatches to the matchmaker and sends message to all relevant hosts. @@ -787,11 +849,12 @@ def _multi_send(method, context, topic, msg, timeout=None, _addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port) if method.__name__ == '_cast': - eventlet.spawn_n(method, _addr, context, - _topic, msg, timeout, envelope, _msg_id) + eventlet.spawn_n(method, driver, _addr, context, + _topic, msg, timeout, envelope, _msg_id, + None, pooled) else: - return_val = method(_addr, context, _topic, msg, timeout, - envelope, allowed_remote_exmods) + return_val = method(driver, _addr, context, _topic, msg, timeout, + envelope, allowed_remote_exmods, pooled) return return_val @@ -871,6 +934,50 @@ class ZmqListener(base.Listener): return None +class ZmqClientPool(pool.Pool): + """Class that implements a pool of Zmq Clients for a single endpoint""" + def __init__(self, conf, address, connection_cls, ctxt): + self.connection_cls = connection_cls + self.ctxt = ctxt + self.address = address + super(ZmqClientPool, self).__init__(conf.rpc_conn_pool_size) + + def create(self): + LOG.debug('Pool creating new ZMQ connection for %s' % self.address) + return self.connection_cls(self.address, self.ctxt) + + def empty(self): + for item in self.iter_free(): + item.close() + + +class ZmqClientPoolManager(object): + """Class that manages pools of clients for Zmq endpoints""" + + def __init__(self, conf, ctxt=None): + self._pools = {} + self._lock = threading.Lock() + self.conf = conf + self.ctxt = ctxt + + def get(self, address): + if address not in self._pools: + with self._lock: + if address not in self._pools: + self._pools[address] = ZmqClientPool(self.conf, + address, + ZmqClient, + self.ctxt) + return self._pools[address].get() + + def put(self, item): + self._pools[item.address].put(item) + + def empty(self): + for p in self._pools: + self._pools[p].empty() + + class ZmqDriver(base.BaseDriver): # FIXME(markmc): allow this driver to be used without eventlet @@ -881,6 +988,7 @@ class ZmqDriver(base.BaseDriver): raise ImportError("Failed to import eventlet.green.zmq") conf.register_opts(zmq_opts) conf.register_opts(executor_base._pool_opts) + conf.register_opts(base.base_opts) super(ZmqDriver, self).__init__(conf, url, default_exchange, allowed_remote_exmods) @@ -899,6 +1007,33 @@ class ZmqDriver(base.BaseDriver): self.listeners = [] + # NOTE(jamespage): Create pool manager on first use to deal with + # os.fork calls in openstack daemons. + self._pool = None + self._pid = None + self._lock = threading.Lock() + + def _configure_pool_manager(func): + """Causes a new pool manager to be created when the messaging service + is first used by the current process. This is important as all + connections in the pools manager by the pool manager will share the + same ZMQ context, which must not be shared across OS processes. + """ + def wrap(self, *args, **kws): + with self._lock: + old_pid = self._pid + self._pid = os.getpid() + + if old_pid != self._pid: + # Create fresh pool manager for the current process + # along with a new ZMQ context. + self._pool = ZmqClientPoolManager( + self.conf, + zmq.Context(self.conf.rpc_zmq_contexts) + ) + return func(self, *args, **kws) + return wrap + def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None, envelope=False): @@ -915,19 +1050,22 @@ class ZmqDriver(base.BaseDriver): elif target.server: topic = '%s.%s' % (topic, target.server) - reply = _multi_send(method, ctxt, topic, message, + reply = _multi_send(self, method, ctxt, topic, message, envelope=envelope, - allowed_remote_exmods=self._allowed_remote_exmods) + allowed_remote_exmods=self._allowed_remote_exmods, + pooled=True) if wait_for_reply: return reply[-1] + @_configure_pool_manager def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, retry=None): # NOTE(sileht): retry is not implemented because this driver never # retry anything return self._send(target, ctxt, message, wait_for_reply, timeout) + @_configure_pool_manager def send_notification(self, target, ctxt, message, version, retry=None): # NOTE(ewindisch): dot-priority in rpc notifier does not # work with our assumptions. @@ -936,8 +1074,9 @@ class ZmqDriver(base.BaseDriver): target = target(topic=target.topic.replace('.', '-')) return self._send(target, ctxt, message, envelope=(version == 2.0)) + @_configure_pool_manager def listen(self, target): - conn = Connection(self.conf) + conn = Connection(self.conf, self) listener = ZmqListener(self) @@ -951,12 +1090,13 @@ class ZmqDriver(base.BaseDriver): return listener + @_configure_pool_manager def listen_for_notifications(self, targets_and_priorities, pool): # NOTE(sileht): this listener implementation is limited # because zeromq doesn't support: # * requeing message # * pool - conn = Connection(self.conf) + conn = Connection(self.conf, self) listener = ZmqListener(self) for target, priority in targets_and_priorities: @@ -974,3 +1114,8 @@ class ZmqDriver(base.BaseDriver): for c in self.listeners: c.close() self.listeners = [] + if self._pool: + self._pool.empty() + + def get_connection(self, address, pooled=False): + return ZmqClientContext(address, self._pool, pooled) diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py index 664e586b9..5911b69d7 100644 --- a/oslo_messaging/opts.py +++ b/oslo_messaging/opts.py @@ -21,6 +21,7 @@ import copy import itertools from oslo_messaging._drivers import amqp +from oslo_messaging._drivers import base as drivers_base from oslo_messaging._drivers import impl_qpid from oslo_messaging._drivers import impl_rabbit from oslo_messaging._drivers import impl_zmq @@ -34,6 +35,7 @@ from oslo_messaging.rpc import client from oslo_messaging import transport _global_opt_lists = [ + drivers_base.base_opts, impl_zmq.zmq_opts, matchmaker.matchmaker_opts, base._pool_opts, diff --git a/oslo_messaging/tests/drivers/test_impl_zmq.py b/oslo_messaging/tests/drivers/test_impl_zmq.py index 626c35bf5..85e5dd377 100644 --- a/oslo_messaging/tests/drivers/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/test_impl_zmq.py @@ -150,10 +150,11 @@ class TestZmqBasics(ZmqBaseTestCase): self.assertEqual(result, True) mock_call.assert_called_once_with( + self.driver, 'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'], {}, 'fanout~testtopic.127.0.0.1', {'tx_id': 1, 'method': 'hello-world'}, - None, False, []) + None, False, [], True) @mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True) def test_send_receive_direct(self, mock_call): @@ -171,10 +172,11 @@ class TestZmqBasics(ZmqBaseTestCase): self.assertEqual(result, True) mock_call.assert_called_once_with( + self.driver, 'tcp://localhost:%s' % self.conf['rpc_zmq_port'], {}, 'testtopic.localhost', {'tx_id': 1, 'method': 'hello-world'}, - None, False, []) + None, False, [], True) class TestZmqSocket(test_utils.BaseTestCase): @@ -291,7 +293,7 @@ class TestZmqConnection(ZmqBaseTestCase): def test_zmqconnection_create_consumer(self, mock_reactor): mock_reactor.register = mock.Mock() - conn = impl_zmq.Connection(self.driver) + conn = impl_zmq.Connection(self.driver.conf, self.driver) topic = 'topic.foo' context = mock.Mock() inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' % @@ -317,7 +319,7 @@ class TestZmqConnection(ZmqBaseTestCase): @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor): mock_reactor.register = mock.Mock() - conn = impl_zmq.Connection(self.driver) + conn = impl_zmq.Connection(self.driver.conf, self.driver) topic = 'topic.foo' context = mock.Mock() inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' % @@ -335,7 +337,7 @@ class TestZmqConnection(ZmqBaseTestCase): autospec=True) @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker): - conn = impl_zmq.Connection(self.driver) + conn = impl_zmq.Connection(self.driver.conf, self.driver) conn.reactor.close = mock.Mock() mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock() conn.close() @@ -344,7 +346,7 @@ class TestZmqConnection(ZmqBaseTestCase): @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) def test_zmqconnection_wait(self, mock_reactor): - conn = impl_zmq.Connection(self.driver) + conn = impl_zmq.Connection(self.driver, self.driver) conn.reactor.wait = mock.Mock() conn.wait() self.assertTrue(conn.reactor.wait.called) @@ -355,7 +357,7 @@ class TestZmqConnection(ZmqBaseTestCase): def test_zmqconnection_consume_in_thread(self, mock_reactor, mock_getmatchmaker): mock_getmatchmaker.return_value.start_heartbeat = mock.Mock() - conn = impl_zmq.Connection(self.driver) + conn = impl_zmq.Connection(self.driver, self.driver) conn.reactor.consume_in_thread = mock.Mock() conn.consume_in_thread() self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called) @@ -397,7 +399,8 @@ class TestZmqDriver(ZmqBaseTestCase): with mock.patch.object(impl_zmq.LOG, 'warn') as flog: mock_queues.return_value = None - impl_zmq._multi_send(mock_cast, context, topic, msg) + impl_zmq._multi_send(self.driver, mock_cast, + context, topic, msg) self.assertEqual(1, flog.call_count) args, kwargs = flog.call_args self.assertIn('No matchmaker results', args[0]) @@ -414,7 +417,7 @@ class TestZmqDriver(ZmqBaseTestCase): mock_queues.return_value = None self.assertRaises(rpc_common.Timeout, - impl_zmq._multi_send, + impl_zmq._multi_send, self.driver, mock_call, context, topic, msg) @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True) @@ -425,9 +428,10 @@ class TestZmqDriver(ZmqBaseTestCase): msg = 'jeronimo' self.driver.send(oslo_messaging.Target(topic=topic), context, msg, False, 0, False) - mock_multi_send.assert_called_with(mock_cast, context, topic, msg, + mock_multi_send.assert_called_with(self.driver, mock_cast, context, + topic, msg, allowed_remote_exmods=[], - envelope=False) + envelope=False, pooled=True) @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True) @mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True) @@ -438,9 +442,10 @@ class TestZmqDriver(ZmqBaseTestCase): msg = 'jeronimo' self.driver.send_notification(oslo_messaging.Target(topic=topic), context, msg, False, False) - mock_multi_send.assert_called_with(mock_cast, context, topic_reformat, - msg, allowed_remote_exmods=[], - envelope=False) + mock_multi_send.assert_called_with(self.driver, mock_cast, context, + topic_reformat, msg, + allowed_remote_exmods=[], + envelope=False, pooled=True) @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True) @mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True) diff --git a/tests/drivers/test_impl_zmq.py b/tests/drivers/test_impl_zmq.py index 6af1716e3..ddc6753ea 100644 --- a/tests/drivers/test_impl_zmq.py +++ b/tests/drivers/test_impl_zmq.py @@ -150,10 +150,11 @@ class TestZmqBasics(ZmqBaseTestCase): self.assertEqual(result, True) mock_call.assert_called_once_with( + self.driver, 'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'], {}, 'fanout~testtopic.127.0.0.1', {'tx_id': 1, 'method': 'hello-world'}, - None, False, []) + None, False, [], True) @mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True) def test_send_receive_direct(self, mock_call): @@ -171,10 +172,11 @@ class TestZmqBasics(ZmqBaseTestCase): self.assertEqual(result, True) mock_call.assert_called_once_with( + self.driver, 'tcp://localhost:%s' % self.conf['rpc_zmq_port'], {}, 'testtopic.localhost', {'tx_id': 1, 'method': 'hello-world'}, - None, False, []) + None, False, [], True) class TestZmqSocket(test_utils.BaseTestCase): @@ -291,7 +293,7 @@ class TestZmqConnection(ZmqBaseTestCase): def test_zmqconnection_create_consumer(self, mock_reactor): mock_reactor.register = mock.Mock() - conn = impl_zmq.Connection(self.driver) + conn = impl_zmq.Connection(self.driver.conf, self.driver) topic = 'topic.foo' context = mock.Mock() inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' % @@ -317,7 +319,7 @@ class TestZmqConnection(ZmqBaseTestCase): @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor): mock_reactor.register = mock.Mock() - conn = impl_zmq.Connection(self.driver) + conn = impl_zmq.Connection(self.driver.conf, self.driver) topic = 'topic.foo' context = mock.Mock() inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' % @@ -335,7 +337,7 @@ class TestZmqConnection(ZmqBaseTestCase): autospec=True) @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker): - conn = impl_zmq.Connection(self.driver) + conn = impl_zmq.Connection(self.driver.conf, self.driver) conn.reactor.close = mock.Mock() mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock() conn.close() @@ -344,7 +346,7 @@ class TestZmqConnection(ZmqBaseTestCase): @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) def test_zmqconnection_wait(self, mock_reactor): - conn = impl_zmq.Connection(self.driver) + conn = impl_zmq.Connection(self.driver.conf, self.driver) conn.reactor.wait = mock.Mock() conn.wait() self.assertTrue(conn.reactor.wait.called) @@ -355,7 +357,7 @@ class TestZmqConnection(ZmqBaseTestCase): def test_zmqconnection_consume_in_thread(self, mock_reactor, mock_getmatchmaker): mock_getmatchmaker.return_value.start_heartbeat = mock.Mock() - conn = impl_zmq.Connection(self.driver) + conn = impl_zmq.Connection(self.driver.conf, self.driver) conn.reactor.consume_in_thread = mock.Mock() conn.consume_in_thread() self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called) @@ -393,9 +395,10 @@ class TestZmqDriver(ZmqBaseTestCase): msg = 'jeronimo' self.driver.send(messaging.Target(topic=topic), context, msg, False, 0, False) - mock_multi_send.assert_called_with(mock_cast, context, topic, msg, + mock_multi_send.assert_called_with(self.driver, mock_cast, context, + topic, msg, allowed_remote_exmods=[], - envelope=False) + envelope=False, pooled=True) @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True) @mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True) @@ -406,9 +409,10 @@ class TestZmqDriver(ZmqBaseTestCase): msg = 'jeronimo' self.driver.send_notification(messaging.Target(topic=topic), context, msg, False, False) - mock_multi_send.assert_called_with(mock_cast, context, topic_reformat, - msg, allowed_remote_exmods=[], - envelope=False) + mock_multi_send.assert_called_with(self.driver, mock_cast, context, + topic_reformat, msg, + allowed_remote_exmods=[], + envelope=False, pooled=True) @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True) @mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)