Merge "Update latest OSLO files"
commit a70fdbca13

@@ -13,7 +13,6 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-
 from oslo.config import cfg
 
 from quantum.openstack.common import jsonutils
@@ -13,7 +13,6 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-
 from oslo.config import cfg
 
 from quantum.openstack.common import context as req_context
@@ -32,13 +32,27 @@ import uuid
 from eventlet import greenpool
 from eventlet import pools
 from eventlet import semaphore
+from eventlet import queue
 
+# TODO(pekowsk): Remove import cfg and below comment in Havana.
+# This import should no longer be needed when the amqp_rpc_single_reply_queue
+# option is removed.
+from oslo.config import cfg
 from quantum.openstack.common import excutils
 from quantum.openstack.common.gettextutils import _
 from quantum.openstack.common import local
 from quantum.openstack.common import log as logging
 from quantum.openstack.common.rpc import common as rpc_common
 
+# TODO(pekowski): Remove this option in Havana.
+amqp_opts = [
+    cfg.BoolOpt('amqp_rpc_single_reply_queue',
+                default=False,
+                help='Enable a fast single reply queue if using AMQP based '
+                     'RPC like RabbitMQ or Qpid.'),
+]
+
+cfg.CONF.register_opts(amqp_opts)
 
 LOG = logging.getLogger(__name__)
 
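The hunk above registers the new amqp_rpc_single_reply_queue option. As a rough, hypothetical illustration of how that option behaves with the oslo.config API of this era (only the option name, default and help text come from the diff; the standalone ConfigOpts instance, the override and the prints are assumptions), it could be exercised as below. Deployers would normally just set amqp_rpc_single_reply_queue = True in the [DEFAULT] section of the service configuration instead.

    from oslo.config import cfg

    # Same option the diff registers; the surrounding setup is illustrative
    # only and is not part of the commit.
    amqp_opts = [
        cfg.BoolOpt('amqp_rpc_single_reply_queue',
                    default=False,
                    help='Enable a fast single reply queue if using AMQP '
                         'based RPC like RabbitMQ or Qpid.'),
    ]

    conf = cfg.ConfigOpts()
    conf.register_opts(amqp_opts)
    conf([])                                             # parse no arguments
    print(conf.amqp_rpc_single_reply_queue)              # False (the default)
    conf.set_override('amqp_rpc_single_reply_queue', True)
    print(conf.amqp_rpc_single_reply_queue)              # True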
@@ -51,6 +65,7 @@ class Pool(pools.Pool):
         kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
         kwargs.setdefault("order_as_stack", True)
         super(Pool, self).__init__(*args, **kwargs)
+        self.reply_proxy = None
 
     # TODO(comstud): Timeout connections not used in a while
     def create(self):
@@ -60,6 +75,16 @@ class Pool(pools.Pool):
     def empty(self):
         while self.free_items:
             self.get().close()
+        # Force a new connection pool to be created.
+        # Note that this was added due to failing unit test cases. The issue
+        # is the above "while loop" gets all the cached connections from the
+        # pool and closes them, but never returns them to the pool, a pool
+        # leak. The unit tests hang waiting for an item to be returned to the
+        # pool. The unit tests get here via the teatDown() method. In the run
+        # time code, it gets here via cleanup() and only appears in service.py
+        # just before doing a sys.exit(), so cleanup() only happens once and
+        # the leakage is not a problem.
+        self.connection_cls.pool = None
 
 
 _pool_create_sem = semaphore.Semaphore()
@@ -137,6 +162,12 @@ class ConnectionContext(rpc_common.Connection):
     def create_worker(self, topic, proxy, pool_name):
         self.connection.create_worker(topic, proxy, pool_name)
 
+    def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+        self.connection.join_consumer_pool(callback,
+                                           pool_name,
+                                           topic,
+                                           exchange_name)
+
     def consume_in_thread(self):
         self.connection.consume_in_thread()
 
@@ -148,8 +179,45 @@ class ConnectionContext(rpc_common.Connection):
             raise rpc_common.InvalidRPCConnectionReuse()
 
 
-def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
-              ending=False, log_failure=True):
+class ReplyProxy(ConnectionContext):
+    """ Connection class for RPC replies / callbacks """
+    def __init__(self, conf, connection_pool):
+        self._call_waiters = {}
+        self._num_call_waiters = 0
+        self._num_call_waiters_wrn_threshhold = 10
+        self._reply_q = 'reply_' + uuid.uuid4().hex
+        super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
+        self.declare_direct_consumer(self._reply_q, self._process_data)
+        self.consume_in_thread()
+
+    def _process_data(self, message_data):
+        msg_id = message_data.pop('_msg_id', None)
+        waiter = self._call_waiters.get(msg_id)
+        if not waiter:
+            LOG.warn(_('no calling threads waiting for msg_id : %s'
+                       ', message : %s') % (msg_id, message_data))
+        else:
+            waiter.put(message_data)
+
+    def add_call_waiter(self, waiter, msg_id):
+        self._num_call_waiters += 1
+        if self._num_call_waiters > self._num_call_waiters_wrn_threshhold:
+            LOG.warn(_('Number of call waiters is greater than warning '
+                       'threshhold: %d. There could be a MulticallProxyWaiter '
+                       'leak.') % self._num_call_waiters_wrn_threshhold)
+            self._num_call_waiters_wrn_threshhold *= 2
+        self._call_waiters[msg_id] = waiter
+
+    def del_call_waiter(self, msg_id):
+        self._num_call_waiters -= 1
+        del self._call_waiters[msg_id]
+
+    def get_reply_q(self):
+        return self._reply_q
+
+
+def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
+              failure=None, ending=False, log_failure=True):
     """Sends a reply or an error on the channel signified by msg_id.
 
     Failure should be a sys.exc_info() tuple.
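The ReplyProxy class added above owns a single direct reply queue per process and dispatches each incoming reply to the waiter registered under its _msg_id. Below is a stripped-down, standard-library sketch of that bookkeeping; it is not the oslo code, and a plain list stands in for the eventlet queue a real waiter uses.

    class TinyReplyDispatcher(object):
        """Toy model of ReplyProxy's waiter bookkeeping."""

        def __init__(self):
            self._call_waiters = {}

        def add_call_waiter(self, waiter, msg_id):
            self._call_waiters[msg_id] = waiter

        def del_call_waiter(self, msg_id):
            self._call_waiters.pop(msg_id, None)

        def process_data(self, message_data):
            # Route an incoming reply to whoever waits on its _msg_id.
            msg_id = message_data.pop('_msg_id', None)
            waiter = self._call_waiters.get(msg_id)
            if waiter is not None:
                waiter.append(message_data)

    proxy = TinyReplyDispatcher()
    waiter = []                      # stands in for a MulticallProxyWaiter
    proxy.add_call_waiter(waiter, 'abc123')
    proxy.process_data({'_msg_id': 'abc123', 'result': 42})
    assert waiter == [{'result': 42}]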
@@ -168,13 +236,21 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
                    'failure': failure}
         if ending:
             msg['ending'] = True
-        conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
+        # If a reply_q exists, add the msg_id to the reply and pass the
+        # reply_q to direct_send() to use it as the response queue.
+        # Otherwise use the msg_id for backward compatibilty.
+        if reply_q:
+            msg['_msg_id'] = msg_id
+            conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
+        else:
+            conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
 
 
 class RpcContext(rpc_common.CommonRpcContext):
     """Context that supports replying to a rpc.call"""
     def __init__(self, **kwargs):
         self.msg_id = kwargs.pop('msg_id', None)
+        self.reply_q = kwargs.pop('reply_q', None)
         self.conf = kwargs.pop('conf')
         super(RpcContext, self).__init__(**kwargs)
 
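The msg_reply() change reduces to one routing rule: if the caller advertised a reply queue, tag the reply with the original msg_id and send it to that queue, otherwise fall back to the old per-call msg_id queue. A tiny hypothetical helper, not part of the commit, makes the rule explicit:

    def pick_reply_destination(msg_id, reply_q, msg):
        """Illustrative only: mirrors the if/else added to msg_reply()."""
        if reply_q:
            msg['_msg_id'] = msg_id
            return reply_q
        return msg_id

    assert pick_reply_destination('id-1', 'reply_deadbeef', {}) == 'reply_deadbeef'
    assert pick_reply_destination('id-1', None, {}) == 'id-1'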
@@ -182,13 +258,14 @@ class RpcContext(rpc_common.CommonRpcContext):
         values = self.to_dict()
         values['conf'] = self.conf
         values['msg_id'] = self.msg_id
+        values['reply_q'] = self.reply_q
         return self.__class__(**values)
 
     def reply(self, reply=None, failure=None, ending=False,
               connection_pool=None, log_failure=True):
         if self.msg_id:
-            msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
-                      ending, log_failure)
+            msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool,
+                      reply, failure, ending, log_failure)
             if ending:
                 self.msg_id = None
 
@@ -204,6 +281,7 @@ def unpack_context(conf, msg):
             value = msg.pop(key)
             context_dict[key[9:]] = value
     context_dict['msg_id'] = msg.pop('_msg_id', None)
+    context_dict['reply_q'] = msg.pop('_reply_q', None)
     context_dict['conf'] = conf
     ctx = RpcContext.from_dict(context_dict)
     rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
@@ -224,15 +302,54 @@ def pack_context(msg, context):
     msg.update(context_d)
 
 
-class ProxyCallback(object):
-    """Calls methods on a proxy object based on method and args."""
+class _ThreadPoolWithWait(object):
+    """Base class for a delayed invocation manager used by
+    the Connection class to start up green threads
+    to handle incoming messages.
+    """
 
-    def __init__(self, conf, proxy, connection_pool):
-        self.proxy = proxy
+    def __init__(self, conf, connection_pool):
         self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
         self.connection_pool = connection_pool
         self.conf = conf
 
+    def wait(self):
+        """Wait for all callback threads to exit."""
+        self.pool.waitall()
+
+
+class CallbackWrapper(_ThreadPoolWithWait):
+    """Wraps a straight callback to allow it to be invoked in a green
+    thread.
+    """
+
+    def __init__(self, conf, callback, connection_pool):
+        """
+        :param conf: cfg.CONF instance
+        :param callback: a callable (probably a function)
+        :param connection_pool: connection pool as returned by
+                                get_connection_pool()
+        """
+        super(CallbackWrapper, self).__init__(
+            conf=conf,
+            connection_pool=connection_pool,
+        )
+        self.callback = callback
+
+    def __call__(self, message_data):
+        self.pool.spawn_n(self.callback, message_data)
+
+
+class ProxyCallback(_ThreadPoolWithWait):
+    """Calls methods on a proxy object based on method and args."""
+
+    def __init__(self, conf, proxy, connection_pool):
+        super(ProxyCallback, self).__init__(
+            conf=conf,
+            connection_pool=connection_pool,
+        )
+        self.proxy = proxy
+
     def __call__(self, message_data):
         """Consumer callback to call a method on a proxy object.
 
@@ -293,11 +410,66 @@ class ProxyCallback(object):
             ctxt.reply(None, sys.exc_info(),
                        connection_pool=self.connection_pool)
 
-    def wait(self):
-        """Wait for all callback threads to exit."""
-        self.pool.waitall()
+
+class MulticallProxyWaiter(object):
+    def __init__(self, conf, msg_id, timeout, connection_pool):
+        self._msg_id = msg_id
+        self._timeout = timeout or conf.rpc_response_timeout
+        self._reply_proxy = connection_pool.reply_proxy
+        self._done = False
+        self._got_ending = False
+        self._conf = conf
+        self._dataqueue = queue.LightQueue()
+        # Add this caller to the reply proxy's call_waiters
+        self._reply_proxy.add_call_waiter(self, self._msg_id)
+
+    def put(self, data):
+        self._dataqueue.put(data)
+
+    def done(self):
+        if self._done:
+            return
+        self._done = True
+        # Remove this caller from reply proxy's call_waiters
+        self._reply_proxy.del_call_waiter(self._msg_id)
+
+    def _process_data(self, data):
+        result = None
+        if data['failure']:
+            failure = data['failure']
+            result = rpc_common.deserialize_remote_exception(self._conf,
+                                                             failure)
+        elif data.get('ending', False):
+            self._got_ending = True
+        else:
+            result = data['result']
+        return result
+
+    def __iter__(self):
+        """Return a result until we get a reply with an 'ending" flag"""
+        if self._done:
+            raise StopIteration
+        while True:
+            try:
+                data = self._dataqueue.get(timeout=self._timeout)
+                result = self._process_data(data)
+            except queue.Empty:
+                LOG.exception(_('Timed out waiting for RPC response.'))
+                self.done()
+                raise rpc_common.Timeout()
+            except Exception:
+                with excutils.save_and_reraise_exception():
+                    self.done()
+            if self._got_ending:
+                self.done()
+                raise StopIteration
+            if isinstance(result, Exception):
+                self.done()
+                raise result
+            yield result
 
 
+#TODO(pekowski): Remove MulticallWaiter() in Havana.
 class MulticallWaiter(object):
     def __init__(self, conf, connection, timeout):
         self._connection = connection
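MulticallProxyWaiter.__iter__ above drains its private queue, yielding each result until a reply flagged 'ending' arrives, and raising a timeout if nothing shows up in time. The sketch below reproduces that loop with the standard-library queue module standing in for eventlet's LightQueue; it is illustrative only, not the oslo code.

    import queue

    def iter_replies(q, timeout):
        """Yield results until a reply carries 'ending', or time out."""
        while True:
            try:
                data = q.get(timeout=timeout)
            except queue.Empty:
                raise TimeoutError('Timed out waiting for RPC response.')
            if data.get('ending'):
                return
            yield data['result']

    q = queue.Queue()
    for item in ({'result': 1}, {'result': 2}, {'ending': True}):
        q.put(item)
    print(list(iter_replies(q, timeout=0.1)))   # [1, 2]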
@@ -353,22 +525,40 @@ def create_connection(conf, new, connection_pool):
     return ConnectionContext(conf, connection_pool, pooled=not new)
 
 
+_reply_proxy_create_sem = semaphore.Semaphore()
+
+
 def multicall(conf, context, topic, msg, timeout, connection_pool):
     """Make a call that returns multiple times."""
+    # TODO(pekowski): Remove all these comments in Havana.
+    # For amqp_rpc_single_reply_queue = False,
     # Can't use 'with' for multicall, as it returns an iterator
     # that will continue to use the connection. When it's done,
     # connection.close() will get called which will put it back into
     # the pool
+    # For amqp_rpc_single_reply_queue = True,
+    # The 'with' statement is mandatory for closing the connection
     LOG.debug(_('Making synchronous call on %s ...'), topic)
     msg_id = uuid.uuid4().hex
     msg.update({'_msg_id': msg_id})
     LOG.debug(_('MSG_ID is %s') % (msg_id))
     pack_context(msg, context)
 
-    conn = ConnectionContext(conf, connection_pool)
-    wait_msg = MulticallWaiter(conf, conn, timeout)
-    conn.declare_direct_consumer(msg_id, wait_msg)
-    conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+    # TODO(pekowski): Remove this flag and the code under the if clause
+    # in Havana.
+    if not conf.amqp_rpc_single_reply_queue:
+        conn = ConnectionContext(conf, connection_pool)
+        wait_msg = MulticallWaiter(conf, conn, timeout)
+        conn.declare_direct_consumer(msg_id, wait_msg)
+        conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+    else:
+        with _reply_proxy_create_sem:
+            if not connection_pool.reply_proxy:
+                connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
+        msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
+        wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
+        with ConnectionContext(conf, connection_pool) as conn:
+            conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
     return wait_msg
 
 
@@ -197,6 +197,28 @@ class Connection(object):
         """
         raise NotImplementedError()
 
+    def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+        """Register as a member of a group of consumers for a given topic from
+        the specified exchange.
+
+        Exactly one member of a given pool will receive each message.
+
+        A message will be delivered to multiple pools, if more than
+        one is created.
+
+        :param callback: Callable to be invoked for each message.
+        :type callback: callable accepting one argument
+        :param pool_name: The name of the consumer pool.
+        :type pool_name: str
+        :param topic: The routing topic for desired messages.
+        :type topic: str
+        :param exchange_name: The name of the message exchange where
+                              the client should attach. Defaults to
+                              the configured exchange.
+        :type exchange_name: str
+        """
+        raise NotImplementedError()
+
     def consume_in_thread(self):
         """Spawn a thread to handle incoming messages.
 
@@ -165,9 +165,10 @@ class ConsumerBase(object):
             try:
                 msg = rpc_common.deserialize_msg(message.payload)
                 callback(msg)
-                message.ack()
             except Exception:
                 LOG.exception(_("Failed to process message... skipping it."))
+            finally:
+                message.ack()
 
         self.queue.consume(*args, callback=_callback, **options)
 
@@ -750,6 +751,30 @@ class Connection(object):
         self.proxy_callbacks.append(proxy_cb)
         self.declare_topic_consumer(topic, proxy_cb, pool_name)
 
+    def join_consumer_pool(self, callback, pool_name, topic,
+                           exchange_name=None):
+        """Register as a member of a group of consumers for a given topic from
+        the specified exchange.
+
+        Exactly one member of a given pool will receive each message.
+
+        A message will be delivered to multiple pools, if more than
+        one is created.
+        """
+        callback_wrapper = rpc_amqp.CallbackWrapper(
+            conf=self.conf,
+            callback=callback,
+            connection_pool=rpc_amqp.get_connection_pool(self.conf,
+                                                         Connection),
+        )
+        self.proxy_callbacks.append(callback_wrapper)
+        self.declare_topic_consumer(
+            queue_name=pool_name,
+            topic=topic,
+            exchange_name=exchange_name,
+            callback=callback_wrapper,
+        )
+
 
 def create_connection(conf, new=True):
     """Create a connection"""
|
@ -560,6 +560,34 @@ class Connection(object):
|
|||||||
|
|
||||||
return consumer
|
return consumer
|
||||||
|
|
||||||
|
def join_consumer_pool(self, callback, pool_name, topic,
|
||||||
|
exchange_name=None):
|
||||||
|
"""Register as a member of a group of consumers for a given topic from
|
||||||
|
the specified exchange.
|
||||||
|
|
||||||
|
Exactly one member of a given pool will receive each message.
|
||||||
|
|
||||||
|
A message will be delivered to multiple pools, if more than
|
||||||
|
one is created.
|
||||||
|
"""
|
||||||
|
callback_wrapper = rpc_amqp.CallbackWrapper(
|
||||||
|
conf=self.conf,
|
||||||
|
callback=callback,
|
||||||
|
connection_pool=rpc_amqp.get_connection_pool(self.conf,
|
||||||
|
Connection),
|
||||||
|
)
|
||||||
|
self.proxy_callbacks.append(callback_wrapper)
|
||||||
|
|
||||||
|
consumer = TopicConsumer(conf=self.conf,
|
||||||
|
session=self.session,
|
||||||
|
topic=topic,
|
||||||
|
callback=callback_wrapper,
|
||||||
|
name=pool_name,
|
||||||
|
exchange_name=exchange_name)
|
||||||
|
|
||||||
|
self._register_consumer(consumer)
|
||||||
|
return consumer
|
||||||
|
|
||||||
|
|
||||||
def create_connection(conf, new=True):
|
def create_connection(conf, new=True):
|
||||||
"""Create a connection"""
|
"""Create a connection"""
|
||||||
|
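The two driver hunks above give the kombu and qpid connections a join_consumer_pool() with the semantics spelled out in the docstrings: every pool receives every message, and exactly one member within a pool handles any given message. A self-contained toy model of those semantics follows; it uses round-robin inside a pool, and none of these names exist in the tree.

    import collections
    import itertools

    class ToyTopic(object):
        def __init__(self):
            self._members = collections.defaultdict(list)
            self._next = {}

        def join_consumer_pool(self, callback, pool_name):
            self._members[pool_name].append(callback)
            # Rebuild the round-robin cycle so new members are included.
            self._next[pool_name] = itertools.cycle(self._members[pool_name])

        def publish(self, message):
            # Every pool sees the message; one member per pool handles it.
            for pool_name in self._members:
                next(self._next[pool_name])(message)

    topic = ToyTopic()
    seen = []
    topic.join_consumer_pool(lambda m: seen.append(('a1', m)), 'pool_a')
    topic.join_consumer_pool(lambda m: seen.append(('a2', m)), 'pool_a')
    topic.join_consumer_pool(lambda m: seen.append(('b1', m)), 'pool_b')
    topic.publish('msg-0')
    topic.publish('msg-1')
    print(seen)   # pool_a alternates a1/a2; pool_b's only member sees both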