zmq: Add support for ZmqClient pooling
To avoid creating a new ZMQ connection for every message sent to a remote broker, implement pooling and re-use of ZmqClient objects and associated ZMQ context. A pool is created for each remote endpoint (keyed by address); the size of each pool is configured using rpc_conn_pool_size. All outbound message client connections are pooled. Closes-Bug: 1384113 Change-Id: Ia55d5c310a56e51df5e2f5d39e561a4da3fe4d83
This commit is contained in:
parent
287a4f56f4
commit
de015d5c83
@ -49,12 +49,6 @@ amqp_opts = [
|
||||
default=False,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Auto-delete queues in AMQP.'),
|
||||
|
||||
# FIXME(markmc): this was toplevel in openstack.common.rpc
|
||||
cfg.IntOpt('rpc_conn_pool_size',
|
||||
default=30,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Size of RPC connection pool.'),
|
||||
]
|
||||
|
||||
UNIQUE_ID = '_unique_id'
|
||||
|
@ -17,8 +17,15 @@ import abc
|
||||
|
||||
import six
|
||||
|
||||
from oslo.config import cfg
|
||||
from oslo_messaging import exceptions
|
||||
|
||||
base_opts = [
|
||||
cfg.IntOpt('rpc_conn_pool_size',
|
||||
default=30,
|
||||
help='Size of RPC connection pool.'),
|
||||
]
|
||||
|
||||
|
||||
class TransportDriverError(exceptions.MessagingException):
|
||||
"""Base class for transport driver specific exceptions."""
|
||||
|
@ -28,6 +28,7 @@ import six
|
||||
|
||||
from oslo_messaging._drivers import amqp as rpc_amqp
|
||||
from oslo_messaging._drivers import amqpdriver
|
||||
from oslo_messaging._drivers import base
|
||||
from oslo_messaging._drivers import common as rpc_common
|
||||
from oslo_messaging._i18n import _
|
||||
from oslo_messaging import exceptions
|
||||
@ -783,6 +784,7 @@ class QpidDriver(amqpdriver.AMQPDriverBase):
|
||||
conf.register_group(opt_group)
|
||||
conf.register_opts(qpid_opts, group=opt_group)
|
||||
conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
|
||||
conf.register_opts(base.base_opts, group=opt_group)
|
||||
|
||||
connection_pool = rpc_amqp.ConnectionPool(
|
||||
conf, conf.oslo_messaging_qpid.rpc_conn_pool_size,
|
||||
|
@ -34,6 +34,7 @@ from six.moves.urllib import parse
|
||||
|
||||
from oslo_messaging._drivers import amqp as rpc_amqp
|
||||
from oslo_messaging._drivers import amqpdriver
|
||||
from oslo_messaging._drivers import base
|
||||
from oslo_messaging._drivers import common as rpc_common
|
||||
from oslo_messaging._i18n import _
|
||||
from oslo_messaging._i18n import _LE
|
||||
@ -1161,6 +1162,7 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
|
||||
conf.register_group(opt_group)
|
||||
conf.register_opts(rabbit_opts, group=opt_group)
|
||||
conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
|
||||
conf.register_opts(base.base_opts, group=opt_group)
|
||||
|
||||
connection_pool = rpc_amqp.ConnectionPool(
|
||||
conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size,
|
||||
|
@ -37,6 +37,7 @@ from oslo_messaging._drivers import base
|
||||
from oslo_messaging._drivers import common as rpc_common
|
||||
from oslo_messaging._executors import base as executor_base # FIXME(markmc)
|
||||
from oslo_messaging._i18n import _, _LE, _LW
|
||||
from oslo_messaging._drivers import pool
|
||||
|
||||
|
||||
zmq = importutils.try_import('eventlet.green.zmq')
|
||||
@ -117,8 +118,8 @@ class ZmqSocket(object):
|
||||
Can be used as a Context (supports the 'with' statement).
|
||||
"""
|
||||
|
||||
def __init__(self, addr, zmq_type, bind=True, subscribe=None):
|
||||
self.ctxt = zmq.Context(CONF.rpc_zmq_contexts)
|
||||
def __init__(self, addr, zmq_type, bind=True, subscribe=None, ctxt=None):
|
||||
self.ctxt = ctxt or zmq.Context(CONF.rpc_zmq_contexts)
|
||||
self.sock = self.ctxt.socket(zmq_type)
|
||||
|
||||
# Enable IPv6-support in libzmq.
|
||||
@ -236,8 +237,9 @@ class ZmqSocket(object):
|
||||
class ZmqClient(object):
|
||||
"""Client for ZMQ sockets."""
|
||||
|
||||
def __init__(self, addr):
|
||||
self.outq = ZmqSocket(addr, zmq.PUSH, bind=False)
|
||||
def __init__(self, addr, ctxt=None):
|
||||
self.address = addr
|
||||
self.outq = ZmqSocket(addr, zmq.PUSH, bind=False, ctxt=ctxt)
|
||||
|
||||
def cast(self, msg_id, topic, data, envelope):
|
||||
msg_id = msg_id or 0
|
||||
@ -259,6 +261,67 @@ class ZmqClient(object):
|
||||
self.outq.close()
|
||||
|
||||
|
||||
class ZmqClientContext(object):
|
||||
"""This is essentially a wrapper around ZmqClient that supports 'with'.
|
||||
It can also return a new ZmqClient, or one from a pool.
|
||||
|
||||
The function will also catch when an instance of this class is to be
|
||||
deleted. With that we can return ZmqClients to the pool on exceptions
|
||||
and so forth without making the caller be responsible for catching them.
|
||||
If possible the function makes sure to return a client to the pool.
|
||||
|
||||
Based on amqp.ConnectionContext.
|
||||
"""
|
||||
|
||||
def __init__(self, address, connection_pool=None, pooled=False):
|
||||
self.connection = None
|
||||
self.connection_pool = connection_pool
|
||||
self.pooled = pooled
|
||||
if self.pooled and self.connection_pool is not None:
|
||||
self.connection = self.connection_pool.get(address)
|
||||
else:
|
||||
self.connection = ZmqClient(address)
|
||||
|
||||
def __enter__(self):
|
||||
"""When with ZmqClientContext() is used, return self."""
|
||||
return self
|
||||
|
||||
def _done(self):
|
||||
"""If the client came from a pool, clean it up and put it back.
|
||||
If it did not come from a pool, close it.
|
||||
"""
|
||||
if self.connection:
|
||||
if self.pooled and self.connection_pool is not None:
|
||||
# Reset the connection so it's ready for the next caller
|
||||
# to grab from the pool
|
||||
self.connection_pool.put(self.connection)
|
||||
else:
|
||||
try:
|
||||
self.connection.close()
|
||||
except Exception:
|
||||
pass
|
||||
self.connection = None
|
||||
|
||||
def __exit__(self, exc_type, exc_value, tb):
|
||||
"""End of 'with' statement. We're done here."""
|
||||
self._done()
|
||||
|
||||
def __del__(self):
|
||||
"""Caller is done with this client. Make sure we cleaned up."""
|
||||
self._done()
|
||||
|
||||
def close(self):
|
||||
"""Caller is done with this client."""
|
||||
self._done()
|
||||
|
||||
def __getattr__(self, key):
|
||||
"""Proxy all other calls to the ZmqClient instance."""
|
||||
if self.connection:
|
||||
return getattr(self.connection, key)
|
||||
else:
|
||||
raise rpc_common.InvalidRPCConnectionReuse()
|
||||
|
||||
|
||||
class RpcContext(rpc_common.CommonRpcContext):
|
||||
"""Context that supports replying to a rpc.call."""
|
||||
def __init__(self, **kwargs):
|
||||
@ -320,7 +383,7 @@ class InternalContext(object):
|
||||
return {'exc':
|
||||
rpc_common.serialize_remote_exception(sys.exc_info())}
|
||||
|
||||
def reply(self, ctx, proxy,
|
||||
def reply(self, driver, ctx, proxy,
|
||||
msg_id=None, context=None, topic=None, msg=None):
|
||||
"""Reply to a casted call."""
|
||||
# NOTE(ewindisch): context kwarg exists for Grizzly compat.
|
||||
@ -336,19 +399,20 @@ class InternalContext(object):
|
||||
ctx.replies)
|
||||
|
||||
LOG.debug("Sending reply")
|
||||
_multi_send(_cast, ctx, topic, {
|
||||
_multi_send(driver, _cast, ctx, topic, {
|
||||
'method': '-process_reply',
|
||||
'args': {
|
||||
'msg_id': msg_id, # Include for Folsom compat.
|
||||
'response': response
|
||||
}
|
||||
}, _msg_id=msg_id)
|
||||
}, _msg_id=msg_id, pooled=True)
|
||||
|
||||
|
||||
class ConsumerBase(object):
|
||||
"""Base Consumer."""
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, driver):
|
||||
self.driver = driver
|
||||
self.private_ctx = InternalContext(None)
|
||||
|
||||
@classmethod
|
||||
@ -371,7 +435,7 @@ class ConsumerBase(object):
|
||||
# Internal method
|
||||
# uses internal context for safety.
|
||||
if method == '-reply':
|
||||
self.private_ctx.reply(ctx, proxy, **data['args'])
|
||||
self.private_ctx.reply(self.driver, ctx, proxy, **data['args'])
|
||||
return
|
||||
|
||||
proxy.dispatch(ctx, data)
|
||||
@ -383,9 +447,10 @@ class ZmqBaseReactor(ConsumerBase):
|
||||
Used for RoundRobin requests.
|
||||
"""
|
||||
|
||||
def __init__(self, conf):
|
||||
super(ZmqBaseReactor, self).__init__()
|
||||
def __init__(self, conf, driver=None):
|
||||
super(ZmqBaseReactor, self).__init__(driver)
|
||||
|
||||
self.driver = driver
|
||||
self.proxies = {}
|
||||
self.threads = []
|
||||
self.sockets = []
|
||||
@ -564,8 +629,8 @@ class ZmqReactor(ZmqBaseReactor):
|
||||
Can also be used as a 1:1 proxy
|
||||
"""
|
||||
|
||||
def __init__(self, conf):
|
||||
super(ZmqReactor, self).__init__(conf)
|
||||
def __init__(self, conf, driver):
|
||||
super(ZmqReactor, self).__init__(conf, driver)
|
||||
|
||||
def consume(self, sock):
|
||||
# TODO(ewindisch): use zero-copy (i.e. references, not copying)
|
||||
@ -598,9 +663,9 @@ class ZmqReactor(ZmqBaseReactor):
|
||||
class Connection(rpc_common.Connection):
|
||||
"""Manages connections and threads."""
|
||||
|
||||
def __init__(self, conf):
|
||||
def __init__(self, conf, driver):
|
||||
self.topics = []
|
||||
self.reactor = ZmqReactor(conf)
|
||||
self.reactor = ZmqReactor(conf, driver)
|
||||
|
||||
def create_consumer(self, topic, proxy, fanout=False):
|
||||
# Register with matchmaker.
|
||||
@ -653,8 +718,8 @@ class Connection(rpc_common.Connection):
|
||||
self.reactor.consume_in_thread()
|
||||
|
||||
|
||||
def _cast(addr, context, topic, msg, timeout=None, envelope=False,
|
||||
_msg_id=None, allowed_remote_exmods=None):
|
||||
def _cast(driver, addr, context, topic, msg, timeout=None, envelope=False,
|
||||
_msg_id=None, allowed_remote_exmods=None, pooled=False):
|
||||
allowed_remote_exmods = allowed_remote_exmods or []
|
||||
timeout_cast = timeout or CONF.rpc_cast_timeout
|
||||
payload = [RpcContext.marshal(context), msg]
|
||||
@ -662,21 +727,16 @@ def _cast(addr, context, topic, msg, timeout=None, envelope=False,
|
||||
topic = topic.encode('utf-8')
|
||||
|
||||
with Timeout(timeout_cast, exception=rpc_common.Timeout):
|
||||
conn = None
|
||||
try:
|
||||
conn = ZmqClient(addr)
|
||||
|
||||
# assumes cast can't return an exception
|
||||
conn.cast(_msg_id, topic, payload, envelope)
|
||||
except zmq.ZMQError:
|
||||
raise RPCException("Cast failed. ZMQ Socket Exception")
|
||||
finally:
|
||||
if conn is not None:
|
||||
conn.close()
|
||||
with driver.get_connection(addr, pooled) as conn:
|
||||
try:
|
||||
# assumes cast can't return an exception
|
||||
conn.cast(_msg_id, topic, payload, envelope)
|
||||
except zmq.ZMQError:
|
||||
raise RPCException("Cast failed. ZMQ Socket Exception")
|
||||
|
||||
|
||||
def _call(addr, context, topic, msg, timeout=None,
|
||||
envelope=False, allowed_remote_exmods=None):
|
||||
def _call(driver, addr, context, topic, msg, timeout=None,
|
||||
envelope=False, allowed_remote_exmods=None, pooled=False):
|
||||
allowed_remote_exmods = allowed_remote_exmods or []
|
||||
# timeout_response is how long we wait for a response
|
||||
timeout = timeout or CONF.rpc_response_timeout
|
||||
@ -714,7 +774,8 @@ def _call(addr, context, topic, msg, timeout=None,
|
||||
)
|
||||
|
||||
LOG.debug("Sending cast: %s", topic)
|
||||
_cast(addr, context, topic, payload, envelope=envelope)
|
||||
_cast(driver, addr, context, topic, payload, envelope=envelope,
|
||||
pooled=pooled)
|
||||
|
||||
LOG.debug("Cast sent; Waiting reply")
|
||||
# Blocks until receives reply
|
||||
@ -755,8 +816,9 @@ def _call(addr, context, topic, msg, timeout=None,
|
||||
return responses[-1]
|
||||
|
||||
|
||||
def _multi_send(method, context, topic, msg, timeout=None,
|
||||
envelope=False, _msg_id=None, allowed_remote_exmods=None):
|
||||
def _multi_send(driver, method, context, topic, msg, timeout=None,
|
||||
envelope=False, _msg_id=None, allowed_remote_exmods=None,
|
||||
pooled=False):
|
||||
"""Wraps the sending of messages.
|
||||
|
||||
Dispatches to the matchmaker and sends message to all relevant hosts.
|
||||
@ -787,11 +849,12 @@ def _multi_send(method, context, topic, msg, timeout=None,
|
||||
_addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port)
|
||||
|
||||
if method.__name__ == '_cast':
|
||||
eventlet.spawn_n(method, _addr, context,
|
||||
_topic, msg, timeout, envelope, _msg_id)
|
||||
eventlet.spawn_n(method, driver, _addr, context,
|
||||
_topic, msg, timeout, envelope, _msg_id,
|
||||
None, pooled)
|
||||
else:
|
||||
return_val = method(_addr, context, _topic, msg, timeout,
|
||||
envelope, allowed_remote_exmods)
|
||||
return_val = method(driver, _addr, context, _topic, msg, timeout,
|
||||
envelope, allowed_remote_exmods, pooled)
|
||||
|
||||
return return_val
|
||||
|
||||
@ -871,6 +934,50 @@ class ZmqListener(base.Listener):
|
||||
return None
|
||||
|
||||
|
||||
class ZmqClientPool(pool.Pool):
|
||||
"""Class that implements a pool of Zmq Clients for a single endpoint"""
|
||||
def __init__(self, conf, address, connection_cls, ctxt):
|
||||
self.connection_cls = connection_cls
|
||||
self.ctxt = ctxt
|
||||
self.address = address
|
||||
super(ZmqClientPool, self).__init__(conf.rpc_conn_pool_size)
|
||||
|
||||
def create(self):
|
||||
LOG.debug('Pool creating new ZMQ connection for %s' % self.address)
|
||||
return self.connection_cls(self.address, self.ctxt)
|
||||
|
||||
def empty(self):
|
||||
for item in self.iter_free():
|
||||
item.close()
|
||||
|
||||
|
||||
class ZmqClientPoolManager(object):
|
||||
"""Class that manages pools of clients for Zmq endpoints"""
|
||||
|
||||
def __init__(self, conf, ctxt=None):
|
||||
self._pools = {}
|
||||
self._lock = threading.Lock()
|
||||
self.conf = conf
|
||||
self.ctxt = ctxt
|
||||
|
||||
def get(self, address):
|
||||
if address not in self._pools:
|
||||
with self._lock:
|
||||
if address not in self._pools:
|
||||
self._pools[address] = ZmqClientPool(self.conf,
|
||||
address,
|
||||
ZmqClient,
|
||||
self.ctxt)
|
||||
return self._pools[address].get()
|
||||
|
||||
def put(self, item):
|
||||
self._pools[item.address].put(item)
|
||||
|
||||
def empty(self):
|
||||
for p in self._pools:
|
||||
self._pools[p].empty()
|
||||
|
||||
|
||||
class ZmqDriver(base.BaseDriver):
|
||||
|
||||
# FIXME(markmc): allow this driver to be used without eventlet
|
||||
@ -881,6 +988,7 @@ class ZmqDriver(base.BaseDriver):
|
||||
raise ImportError("Failed to import eventlet.green.zmq")
|
||||
conf.register_opts(zmq_opts)
|
||||
conf.register_opts(executor_base._pool_opts)
|
||||
conf.register_opts(base.base_opts)
|
||||
|
||||
super(ZmqDriver, self).__init__(conf, url, default_exchange,
|
||||
allowed_remote_exmods)
|
||||
@ -899,6 +1007,33 @@ class ZmqDriver(base.BaseDriver):
|
||||
|
||||
self.listeners = []
|
||||
|
||||
# NOTE(jamespage): Create pool manager on first use to deal with
|
||||
# os.fork calls in openstack daemons.
|
||||
self._pool = None
|
||||
self._pid = None
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def _configure_pool_manager(func):
|
||||
"""Causes a new pool manager to be created when the messaging service
|
||||
is first used by the current process. This is important as all
|
||||
connections in the pools manager by the pool manager will share the
|
||||
same ZMQ context, which must not be shared across OS processes.
|
||||
"""
|
||||
def wrap(self, *args, **kws):
|
||||
with self._lock:
|
||||
old_pid = self._pid
|
||||
self._pid = os.getpid()
|
||||
|
||||
if old_pid != self._pid:
|
||||
# Create fresh pool manager for the current process
|
||||
# along with a new ZMQ context.
|
||||
self._pool = ZmqClientPoolManager(
|
||||
self.conf,
|
||||
zmq.Context(self.conf.rpc_zmq_contexts)
|
||||
)
|
||||
return func(self, *args, **kws)
|
||||
return wrap
|
||||
|
||||
def _send(self, target, ctxt, message,
|
||||
wait_for_reply=None, timeout=None, envelope=False):
|
||||
|
||||
@ -915,19 +1050,22 @@ class ZmqDriver(base.BaseDriver):
|
||||
elif target.server:
|
||||
topic = '%s.%s' % (topic, target.server)
|
||||
|
||||
reply = _multi_send(method, ctxt, topic, message,
|
||||
reply = _multi_send(self, method, ctxt, topic, message,
|
||||
envelope=envelope,
|
||||
allowed_remote_exmods=self._allowed_remote_exmods)
|
||||
allowed_remote_exmods=self._allowed_remote_exmods,
|
||||
pooled=True)
|
||||
|
||||
if wait_for_reply:
|
||||
return reply[-1]
|
||||
|
||||
@_configure_pool_manager
|
||||
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
|
||||
retry=None):
|
||||
# NOTE(sileht): retry is not implemented because this driver never
|
||||
# retry anything
|
||||
return self._send(target, ctxt, message, wait_for_reply, timeout)
|
||||
|
||||
@_configure_pool_manager
|
||||
def send_notification(self, target, ctxt, message, version, retry=None):
|
||||
# NOTE(ewindisch): dot-priority in rpc notifier does not
|
||||
# work with our assumptions.
|
||||
@ -936,8 +1074,9 @@ class ZmqDriver(base.BaseDriver):
|
||||
target = target(topic=target.topic.replace('.', '-'))
|
||||
return self._send(target, ctxt, message, envelope=(version == 2.0))
|
||||
|
||||
@_configure_pool_manager
|
||||
def listen(self, target):
|
||||
conn = Connection(self.conf)
|
||||
conn = Connection(self.conf, self)
|
||||
|
||||
listener = ZmqListener(self)
|
||||
|
||||
@ -951,12 +1090,13 @@ class ZmqDriver(base.BaseDriver):
|
||||
|
||||
return listener
|
||||
|
||||
@_configure_pool_manager
|
||||
def listen_for_notifications(self, targets_and_priorities, pool):
|
||||
# NOTE(sileht): this listener implementation is limited
|
||||
# because zeromq doesn't support:
|
||||
# * requeing message
|
||||
# * pool
|
||||
conn = Connection(self.conf)
|
||||
conn = Connection(self.conf, self)
|
||||
|
||||
listener = ZmqListener(self)
|
||||
for target, priority in targets_and_priorities:
|
||||
@ -974,3 +1114,8 @@ class ZmqDriver(base.BaseDriver):
|
||||
for c in self.listeners:
|
||||
c.close()
|
||||
self.listeners = []
|
||||
if self._pool:
|
||||
self._pool.empty()
|
||||
|
||||
def get_connection(self, address, pooled=False):
|
||||
return ZmqClientContext(address, self._pool, pooled)
|
||||
|
@ -21,6 +21,7 @@ import copy
|
||||
import itertools
|
||||
|
||||
from oslo_messaging._drivers import amqp
|
||||
from oslo_messaging._drivers import base as drivers_base
|
||||
from oslo_messaging._drivers import impl_qpid
|
||||
from oslo_messaging._drivers import impl_rabbit
|
||||
from oslo_messaging._drivers import impl_zmq
|
||||
@ -34,6 +35,7 @@ from oslo_messaging.rpc import client
|
||||
from oslo_messaging import transport
|
||||
|
||||
_global_opt_lists = [
|
||||
drivers_base.base_opts,
|
||||
impl_zmq.zmq_opts,
|
||||
matchmaker.matchmaker_opts,
|
||||
base._pool_opts,
|
||||
|
@ -150,10 +150,11 @@ class TestZmqBasics(ZmqBaseTestCase):
|
||||
|
||||
self.assertEqual(result, True)
|
||||
mock_call.assert_called_once_with(
|
||||
self.driver,
|
||||
'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'],
|
||||
{}, 'fanout~testtopic.127.0.0.1',
|
||||
{'tx_id': 1, 'method': 'hello-world'},
|
||||
None, False, [])
|
||||
None, False, [], True)
|
||||
|
||||
@mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
|
||||
def test_send_receive_direct(self, mock_call):
|
||||
@ -171,10 +172,11 @@ class TestZmqBasics(ZmqBaseTestCase):
|
||||
|
||||
self.assertEqual(result, True)
|
||||
mock_call.assert_called_once_with(
|
||||
self.driver,
|
||||
'tcp://localhost:%s' % self.conf['rpc_zmq_port'],
|
||||
{}, 'testtopic.localhost',
|
||||
{'tx_id': 1, 'method': 'hello-world'},
|
||||
None, False, [])
|
||||
None, False, [], True)
|
||||
|
||||
|
||||
class TestZmqSocket(test_utils.BaseTestCase):
|
||||
@ -291,7 +293,7 @@ class TestZmqConnection(ZmqBaseTestCase):
|
||||
def test_zmqconnection_create_consumer(self, mock_reactor):
|
||||
|
||||
mock_reactor.register = mock.Mock()
|
||||
conn = impl_zmq.Connection(self.driver)
|
||||
conn = impl_zmq.Connection(self.driver.conf, self.driver)
|
||||
topic = 'topic.foo'
|
||||
context = mock.Mock()
|
||||
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
|
||||
@ -317,7 +319,7 @@ class TestZmqConnection(ZmqBaseTestCase):
|
||||
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
|
||||
def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor):
|
||||
mock_reactor.register = mock.Mock()
|
||||
conn = impl_zmq.Connection(self.driver)
|
||||
conn = impl_zmq.Connection(self.driver.conf, self.driver)
|
||||
topic = 'topic.foo'
|
||||
context = mock.Mock()
|
||||
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
|
||||
@ -335,7 +337,7 @@ class TestZmqConnection(ZmqBaseTestCase):
|
||||
autospec=True)
|
||||
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
|
||||
def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker):
|
||||
conn = impl_zmq.Connection(self.driver)
|
||||
conn = impl_zmq.Connection(self.driver.conf, self.driver)
|
||||
conn.reactor.close = mock.Mock()
|
||||
mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock()
|
||||
conn.close()
|
||||
@ -344,7 +346,7 @@ class TestZmqConnection(ZmqBaseTestCase):
|
||||
|
||||
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
|
||||
def test_zmqconnection_wait(self, mock_reactor):
|
||||
conn = impl_zmq.Connection(self.driver)
|
||||
conn = impl_zmq.Connection(self.driver, self.driver)
|
||||
conn.reactor.wait = mock.Mock()
|
||||
conn.wait()
|
||||
self.assertTrue(conn.reactor.wait.called)
|
||||
@ -355,7 +357,7 @@ class TestZmqConnection(ZmqBaseTestCase):
|
||||
def test_zmqconnection_consume_in_thread(self, mock_reactor,
|
||||
mock_getmatchmaker):
|
||||
mock_getmatchmaker.return_value.start_heartbeat = mock.Mock()
|
||||
conn = impl_zmq.Connection(self.driver)
|
||||
conn = impl_zmq.Connection(self.driver, self.driver)
|
||||
conn.reactor.consume_in_thread = mock.Mock()
|
||||
conn.consume_in_thread()
|
||||
self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called)
|
||||
@ -397,7 +399,8 @@ class TestZmqDriver(ZmqBaseTestCase):
|
||||
|
||||
with mock.patch.object(impl_zmq.LOG, 'warn') as flog:
|
||||
mock_queues.return_value = None
|
||||
impl_zmq._multi_send(mock_cast, context, topic, msg)
|
||||
impl_zmq._multi_send(self.driver, mock_cast,
|
||||
context, topic, msg)
|
||||
self.assertEqual(1, flog.call_count)
|
||||
args, kwargs = flog.call_args
|
||||
self.assertIn('No matchmaker results', args[0])
|
||||
@ -414,7 +417,7 @@ class TestZmqDriver(ZmqBaseTestCase):
|
||||
|
||||
mock_queues.return_value = None
|
||||
self.assertRaises(rpc_common.Timeout,
|
||||
impl_zmq._multi_send,
|
||||
impl_zmq._multi_send, self.driver,
|
||||
mock_call, context, topic, msg)
|
||||
|
||||
@mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
|
||||
@ -425,9 +428,10 @@ class TestZmqDriver(ZmqBaseTestCase):
|
||||
msg = 'jeronimo'
|
||||
self.driver.send(oslo_messaging.Target(topic=topic), context, msg,
|
||||
False, 0, False)
|
||||
mock_multi_send.assert_called_with(mock_cast, context, topic, msg,
|
||||
mock_multi_send.assert_called_with(self.driver, mock_cast, context,
|
||||
topic, msg,
|
||||
allowed_remote_exmods=[],
|
||||
envelope=False)
|
||||
envelope=False, pooled=True)
|
||||
|
||||
@mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
|
||||
@mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
|
||||
@ -438,9 +442,10 @@ class TestZmqDriver(ZmqBaseTestCase):
|
||||
msg = 'jeronimo'
|
||||
self.driver.send_notification(oslo_messaging.Target(topic=topic),
|
||||
context, msg, False, False)
|
||||
mock_multi_send.assert_called_with(mock_cast, context, topic_reformat,
|
||||
msg, allowed_remote_exmods=[],
|
||||
envelope=False)
|
||||
mock_multi_send.assert_called_with(self.driver, mock_cast, context,
|
||||
topic_reformat, msg,
|
||||
allowed_remote_exmods=[],
|
||||
envelope=False, pooled=True)
|
||||
|
||||
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True)
|
||||
@mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)
|
||||
|
@ -150,10 +150,11 @@ class TestZmqBasics(ZmqBaseTestCase):
|
||||
|
||||
self.assertEqual(result, True)
|
||||
mock_call.assert_called_once_with(
|
||||
self.driver,
|
||||
'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'],
|
||||
{}, 'fanout~testtopic.127.0.0.1',
|
||||
{'tx_id': 1, 'method': 'hello-world'},
|
||||
None, False, [])
|
||||
None, False, [], True)
|
||||
|
||||
@mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
|
||||
def test_send_receive_direct(self, mock_call):
|
||||
@ -171,10 +172,11 @@ class TestZmqBasics(ZmqBaseTestCase):
|
||||
|
||||
self.assertEqual(result, True)
|
||||
mock_call.assert_called_once_with(
|
||||
self.driver,
|
||||
'tcp://localhost:%s' % self.conf['rpc_zmq_port'],
|
||||
{}, 'testtopic.localhost',
|
||||
{'tx_id': 1, 'method': 'hello-world'},
|
||||
None, False, [])
|
||||
None, False, [], True)
|
||||
|
||||
|
||||
class TestZmqSocket(test_utils.BaseTestCase):
|
||||
@ -291,7 +293,7 @@ class TestZmqConnection(ZmqBaseTestCase):
|
||||
def test_zmqconnection_create_consumer(self, mock_reactor):
|
||||
|
||||
mock_reactor.register = mock.Mock()
|
||||
conn = impl_zmq.Connection(self.driver)
|
||||
conn = impl_zmq.Connection(self.driver.conf, self.driver)
|
||||
topic = 'topic.foo'
|
||||
context = mock.Mock()
|
||||
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
|
||||
@ -317,7 +319,7 @@ class TestZmqConnection(ZmqBaseTestCase):
|
||||
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
|
||||
def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor):
|
||||
mock_reactor.register = mock.Mock()
|
||||
conn = impl_zmq.Connection(self.driver)
|
||||
conn = impl_zmq.Connection(self.driver.conf, self.driver)
|
||||
topic = 'topic.foo'
|
||||
context = mock.Mock()
|
||||
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
|
||||
@ -335,7 +337,7 @@ class TestZmqConnection(ZmqBaseTestCase):
|
||||
autospec=True)
|
||||
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
|
||||
def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker):
|
||||
conn = impl_zmq.Connection(self.driver)
|
||||
conn = impl_zmq.Connection(self.driver.conf, self.driver)
|
||||
conn.reactor.close = mock.Mock()
|
||||
mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock()
|
||||
conn.close()
|
||||
@ -344,7 +346,7 @@ class TestZmqConnection(ZmqBaseTestCase):
|
||||
|
||||
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
|
||||
def test_zmqconnection_wait(self, mock_reactor):
|
||||
conn = impl_zmq.Connection(self.driver)
|
||||
conn = impl_zmq.Connection(self.driver.conf, self.driver)
|
||||
conn.reactor.wait = mock.Mock()
|
||||
conn.wait()
|
||||
self.assertTrue(conn.reactor.wait.called)
|
||||
@ -355,7 +357,7 @@ class TestZmqConnection(ZmqBaseTestCase):
|
||||
def test_zmqconnection_consume_in_thread(self, mock_reactor,
|
||||
mock_getmatchmaker):
|
||||
mock_getmatchmaker.return_value.start_heartbeat = mock.Mock()
|
||||
conn = impl_zmq.Connection(self.driver)
|
||||
conn = impl_zmq.Connection(self.driver.conf, self.driver)
|
||||
conn.reactor.consume_in_thread = mock.Mock()
|
||||
conn.consume_in_thread()
|
||||
self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called)
|
||||
@ -393,9 +395,10 @@ class TestZmqDriver(ZmqBaseTestCase):
|
||||
msg = 'jeronimo'
|
||||
self.driver.send(messaging.Target(topic=topic), context, msg,
|
||||
False, 0, False)
|
||||
mock_multi_send.assert_called_with(mock_cast, context, topic, msg,
|
||||
mock_multi_send.assert_called_with(self.driver, mock_cast, context,
|
||||
topic, msg,
|
||||
allowed_remote_exmods=[],
|
||||
envelope=False)
|
||||
envelope=False, pooled=True)
|
||||
|
||||
@mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
|
||||
@mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
|
||||
@ -406,9 +409,10 @@ class TestZmqDriver(ZmqBaseTestCase):
|
||||
msg = 'jeronimo'
|
||||
self.driver.send_notification(messaging.Target(topic=topic), context,
|
||||
msg, False, False)
|
||||
mock_multi_send.assert_called_with(mock_cast, context, topic_reformat,
|
||||
msg, allowed_remote_exmods=[],
|
||||
envelope=False)
|
||||
mock_multi_send.assert_called_with(self.driver, mock_cast, context,
|
||||
topic_reformat, msg,
|
||||
allowed_remote_exmods=[],
|
||||
envelope=False, pooled=True)
|
||||
|
||||
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True)
|
||||
@mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)
|
||||
|
Loading…
Reference in New Issue
Block a user