Sync rpc/impl_qpid.py from oslo-incubator.

Fixes bug #1211338 (seen in Neutron, marked in launchpad as oslo
since the bug was in oslo code).

Change-Id: I11971c1213b095979bd4f2e878ea2bcad3ceb617
This commit is contained in:
David Ripton 2013-08-12 10:38:17 -04:00
parent ea6d30ee3e
commit 092dcefba4

View File

@ -24,7 +24,8 @@ import eventlet
import greenlet
from oslo.config import cfg
from neutron.openstack.common.gettextutils import _
from neutron.openstack.common import excutils
from neutron.openstack.common.gettextutils import _ # noqa
from neutron.openstack.common import importutils
from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
@ -118,10 +119,17 @@ class ConsumerBase(object):
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
self.reconnect(session)
self.connect(session)
def connect(self, session):
"""Declare the reciever on connect."""
self._declare_receiver(session)
def reconnect(self, session):
"""Re-declare the receiver after a qpid reconnect."""
self._declare_receiver(session)
def _declare_receiver(self, session):
self.session = session
self.receiver = session.receiver(self.address)
self.receiver.capacity = 1
@ -152,11 +160,15 @@ class ConsumerBase(object):
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
# TODO(sandy): Need support for optional ack_on_error.
self.session.acknowledge(message)
def get_receiver(self):
return self.receiver
def get_node_name(self):
return self.address.split(';')[0]
class DirectConsumer(ConsumerBase):
"""Queue/consumer class for 'direct'."""
@ -169,11 +181,16 @@ class DirectConsumer(ConsumerBase):
'callback' is the callback to call when messages are received
"""
super(DirectConsumer, self).__init__(session, callback,
"%s/%s" % (msg_id, msg_id),
{"type": "direct"},
msg_id,
{"exclusive": True})
super(DirectConsumer, self).__init__(
session, callback,
"%s/%s" % (msg_id, msg_id),
{"type": "direct"},
msg_id,
{
"auto-delete": conf.amqp_auto_delete,
"exclusive": True,
"durable": conf.amqp_durable_queues,
})
class TopicConsumer(ConsumerBase):
@ -191,9 +208,14 @@ class TopicConsumer(ConsumerBase):
"""
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
super(TopicConsumer, self).__init__(session, callback,
"%s/%s" % (exchange_name, topic),
{}, name or topic, {})
super(TopicConsumer, self).__init__(
session, callback,
"%s/%s" % (exchange_name, topic),
{}, name or topic,
{
"auto-delete": conf.amqp_auto_delete,
"durable": conf.amqp_durable_queues,
})
class FanoutConsumer(ConsumerBase):
@ -206,6 +228,7 @@ class FanoutConsumer(ConsumerBase):
'topic' is the topic to listen on
'callback' is the callback to call when messages are received
"""
self.conf = conf
super(FanoutConsumer, self).__init__(
session, callback,
@ -214,6 +237,18 @@ class FanoutConsumer(ConsumerBase):
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
{"exclusive": True})
def reconnect(self, session):
topic = self.get_node_name().rpartition('_fanout')[0]
params = {
'session': session,
'topic': topic,
'callback': self.callback,
}
self.__init__(conf=self.conf, **params)
super(FanoutConsumer, self).reconnect(session)
class Publisher(object):
"""Base Publisher class."""
@ -285,7 +320,7 @@ class DirectPublisher(Publisher):
def __init__(self, conf, session, msg_id):
"""Init a 'direct' publisher."""
super(DirectPublisher, self).__init__(session, msg_id,
{"type": "Direct"})
{"type": "direct"})
class TopicPublisher(Publisher):
@ -575,6 +610,7 @@ class Connection(object):
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread."""
@excutils.forever_retry_uncaught_exceptions
def _consumer_thread():
try:
self.consume()
@ -615,7 +651,7 @@ class Connection(object):
return consumer
def join_consumer_pool(self, callback, pool_name, topic,
exchange_name=None):
exchange_name=None, ack_on_error=True):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.