Merge "Sync rpc/impl_qpid.py from oslo-incubator."
This commit is contained in:
commit
58e25937de
@ -24,7 +24,8 @@ import eventlet
|
|||||||
import greenlet
|
import greenlet
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
|
|
||||||
from neutron.openstack.common.gettextutils import _
|
from neutron.openstack.common import excutils
|
||||||
|
from neutron.openstack.common.gettextutils import _ # noqa
|
||||||
from neutron.openstack.common import importutils
|
from neutron.openstack.common import importutils
|
||||||
from neutron.openstack.common import jsonutils
|
from neutron.openstack.common import jsonutils
|
||||||
from neutron.openstack.common import log as logging
|
from neutron.openstack.common import log as logging
|
||||||
@ -118,10 +119,17 @@ class ConsumerBase(object):
|
|||||||
|
|
||||||
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
||||||
|
|
||||||
self.reconnect(session)
|
self.connect(session)
|
||||||
|
|
||||||
|
def connect(self, session):
|
||||||
|
"""Declare the reciever on connect."""
|
||||||
|
self._declare_receiver(session)
|
||||||
|
|
||||||
def reconnect(self, session):
|
def reconnect(self, session):
|
||||||
"""Re-declare the receiver after a qpid reconnect."""
|
"""Re-declare the receiver after a qpid reconnect."""
|
||||||
|
self._declare_receiver(session)
|
||||||
|
|
||||||
|
def _declare_receiver(self, session):
|
||||||
self.session = session
|
self.session = session
|
||||||
self.receiver = session.receiver(self.address)
|
self.receiver = session.receiver(self.address)
|
||||||
self.receiver.capacity = 1
|
self.receiver.capacity = 1
|
||||||
@ -152,11 +160,15 @@ class ConsumerBase(object):
|
|||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception(_("Failed to process message... skipping it."))
|
LOG.exception(_("Failed to process message... skipping it."))
|
||||||
finally:
|
finally:
|
||||||
|
# TODO(sandy): Need support for optional ack_on_error.
|
||||||
self.session.acknowledge(message)
|
self.session.acknowledge(message)
|
||||||
|
|
||||||
def get_receiver(self):
|
def get_receiver(self):
|
||||||
return self.receiver
|
return self.receiver
|
||||||
|
|
||||||
|
def get_node_name(self):
|
||||||
|
return self.address.split(';')[0]
|
||||||
|
|
||||||
|
|
||||||
class DirectConsumer(ConsumerBase):
|
class DirectConsumer(ConsumerBase):
|
||||||
"""Queue/consumer class for 'direct'."""
|
"""Queue/consumer class for 'direct'."""
|
||||||
@ -169,11 +181,16 @@ class DirectConsumer(ConsumerBase):
|
|||||||
'callback' is the callback to call when messages are received
|
'callback' is the callback to call when messages are received
|
||||||
"""
|
"""
|
||||||
|
|
||||||
super(DirectConsumer, self).__init__(session, callback,
|
super(DirectConsumer, self).__init__(
|
||||||
|
session, callback,
|
||||||
"%s/%s" % (msg_id, msg_id),
|
"%s/%s" % (msg_id, msg_id),
|
||||||
{"type": "direct"},
|
{"type": "direct"},
|
||||||
msg_id,
|
msg_id,
|
||||||
{"exclusive": True})
|
{
|
||||||
|
"auto-delete": conf.amqp_auto_delete,
|
||||||
|
"exclusive": True,
|
||||||
|
"durable": conf.amqp_durable_queues,
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
class TopicConsumer(ConsumerBase):
|
class TopicConsumer(ConsumerBase):
|
||||||
@ -191,9 +208,14 @@ class TopicConsumer(ConsumerBase):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
|
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
|
||||||
super(TopicConsumer, self).__init__(session, callback,
|
super(TopicConsumer, self).__init__(
|
||||||
|
session, callback,
|
||||||
"%s/%s" % (exchange_name, topic),
|
"%s/%s" % (exchange_name, topic),
|
||||||
{}, name or topic, {})
|
{}, name or topic,
|
||||||
|
{
|
||||||
|
"auto-delete": conf.amqp_auto_delete,
|
||||||
|
"durable": conf.amqp_durable_queues,
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
class FanoutConsumer(ConsumerBase):
|
class FanoutConsumer(ConsumerBase):
|
||||||
@ -206,6 +228,7 @@ class FanoutConsumer(ConsumerBase):
|
|||||||
'topic' is the topic to listen on
|
'topic' is the topic to listen on
|
||||||
'callback' is the callback to call when messages are received
|
'callback' is the callback to call when messages are received
|
||||||
"""
|
"""
|
||||||
|
self.conf = conf
|
||||||
|
|
||||||
super(FanoutConsumer, self).__init__(
|
super(FanoutConsumer, self).__init__(
|
||||||
session, callback,
|
session, callback,
|
||||||
@ -214,6 +237,18 @@ class FanoutConsumer(ConsumerBase):
|
|||||||
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
|
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
|
||||||
{"exclusive": True})
|
{"exclusive": True})
|
||||||
|
|
||||||
|
def reconnect(self, session):
|
||||||
|
topic = self.get_node_name().rpartition('_fanout')[0]
|
||||||
|
params = {
|
||||||
|
'session': session,
|
||||||
|
'topic': topic,
|
||||||
|
'callback': self.callback,
|
||||||
|
}
|
||||||
|
|
||||||
|
self.__init__(conf=self.conf, **params)
|
||||||
|
|
||||||
|
super(FanoutConsumer, self).reconnect(session)
|
||||||
|
|
||||||
|
|
||||||
class Publisher(object):
|
class Publisher(object):
|
||||||
"""Base Publisher class."""
|
"""Base Publisher class."""
|
||||||
@ -285,7 +320,7 @@ class DirectPublisher(Publisher):
|
|||||||
def __init__(self, conf, session, msg_id):
|
def __init__(self, conf, session, msg_id):
|
||||||
"""Init a 'direct' publisher."""
|
"""Init a 'direct' publisher."""
|
||||||
super(DirectPublisher, self).__init__(session, msg_id,
|
super(DirectPublisher, self).__init__(session, msg_id,
|
||||||
{"type": "Direct"})
|
{"type": "direct"})
|
||||||
|
|
||||||
|
|
||||||
class TopicPublisher(Publisher):
|
class TopicPublisher(Publisher):
|
||||||
@ -575,6 +610,7 @@ class Connection(object):
|
|||||||
|
|
||||||
def consume_in_thread(self):
|
def consume_in_thread(self):
|
||||||
"""Consumer from all queues/consumers in a greenthread."""
|
"""Consumer from all queues/consumers in a greenthread."""
|
||||||
|
@excutils.forever_retry_uncaught_exceptions
|
||||||
def _consumer_thread():
|
def _consumer_thread():
|
||||||
try:
|
try:
|
||||||
self.consume()
|
self.consume()
|
||||||
@ -615,7 +651,7 @@ class Connection(object):
|
|||||||
return consumer
|
return consumer
|
||||||
|
|
||||||
def join_consumer_pool(self, callback, pool_name, topic,
|
def join_consumer_pool(self, callback, pool_name, topic,
|
||||||
exchange_name=None):
|
exchange_name=None, ack_on_error=True):
|
||||||
"""Register as a member of a group of consumers for a given topic from
|
"""Register as a member of a group of consumers for a given topic from
|
||||||
the specified exchange.
|
the specified exchange.
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user