From 5c3314a63284f94a3f8563bc23795ce7df92e167 Mon Sep 17 00:00:00 2001 From: Dan Prince Date: Tue, 13 Aug 2013 09:52:12 -0400 Subject: [PATCH] Revert "Sync rpc/impl_qpid.py from oslo-incubator." This reverts commit 3f4bb0443e96e9b7d04e3e4e77a8bb4e8647e01e. Fixes LP Bug #1211778. Change-Id: I254995a1bf5416fb70fbcac28ee399b1373efde7 --- neutron/openstack/common/rpc/impl_qpid.py | 60 +++++------------------ 1 file changed, 12 insertions(+), 48 deletions(-) diff --git a/neutron/openstack/common/rpc/impl_qpid.py b/neutron/openstack/common/rpc/impl_qpid.py index 6443513e7e..a5f934e4d4 100644 --- a/neutron/openstack/common/rpc/impl_qpid.py +++ b/neutron/openstack/common/rpc/impl_qpid.py @@ -24,8 +24,7 @@ import eventlet import greenlet from oslo.config import cfg -from neutron.openstack.common import excutils -from neutron.openstack.common.gettextutils import _ # noqa +from neutron.openstack.common.gettextutils import _ from neutron.openstack.common import importutils from neutron.openstack.common import jsonutils from neutron.openstack.common import log as logging @@ -119,17 +118,10 @@ class ConsumerBase(object): self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) - self.connect(session) - - def connect(self, session): - """Declare the reciever on connect.""" - self._declare_receiver(session) + self.reconnect(session) def reconnect(self, session): """Re-declare the receiver after a qpid reconnect.""" - self._declare_receiver(session) - - def _declare_receiver(self, session): self.session = session self.receiver = session.receiver(self.address) self.receiver.capacity = 1 @@ -160,15 +152,11 @@ class ConsumerBase(object): except Exception: LOG.exception(_("Failed to process message... skipping it.")) finally: - # TODO(sandy): Need support for optional ack_on_error. self.session.acknowledge(message) def get_receiver(self): return self.receiver - def get_node_name(self): - return self.address.split(';')[0] - class DirectConsumer(ConsumerBase): """Queue/consumer class for 'direct'.""" @@ -181,16 +169,11 @@ class DirectConsumer(ConsumerBase): 'callback' is the callback to call when messages are received """ - super(DirectConsumer, self).__init__( - session, callback, - "%s/%s" % (msg_id, msg_id), - {"type": "direct"}, - msg_id, - { - "auto-delete": conf.amqp_auto_delete, - "exclusive": True, - "durable": conf.amqp_durable_queues, - }) + super(DirectConsumer, self).__init__(session, callback, + "%s/%s" % (msg_id, msg_id), + {"type": "direct"}, + msg_id, + {"exclusive": True}) class TopicConsumer(ConsumerBase): @@ -208,14 +191,9 @@ class TopicConsumer(ConsumerBase): """ exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) - super(TopicConsumer, self).__init__( - session, callback, - "%s/%s" % (exchange_name, topic), - {}, name or topic, - { - "auto-delete": conf.amqp_auto_delete, - "durable": conf.amqp_durable_queues, - }) + super(TopicConsumer, self).__init__(session, callback, + "%s/%s" % (exchange_name, topic), + {}, name or topic, {}) class FanoutConsumer(ConsumerBase): @@ -228,7 +206,6 @@ class FanoutConsumer(ConsumerBase): 'topic' is the topic to listen on 'callback' is the callback to call when messages are received """ - self.conf = conf super(FanoutConsumer, self).__init__( session, callback, @@ -237,18 +214,6 @@ class FanoutConsumer(ConsumerBase): "%s_fanout_%s" % (topic, uuid.uuid4().hex), {"exclusive": True}) - def reconnect(self, session): - topic = self.get_node_name().rpartition('_fanout')[0] - params = { - 'session': session, - 'topic': topic, - 'callback': self.callback, - } - - self.__init__(conf=self.conf, **params) - - super(FanoutConsumer, self).reconnect(session) - class Publisher(object): """Base Publisher class.""" @@ -320,7 +285,7 @@ class DirectPublisher(Publisher): def __init__(self, conf, session, msg_id): """Init a 'direct' publisher.""" super(DirectPublisher, self).__init__(session, msg_id, - {"type": "direct"}) + {"type": "Direct"}) class TopicPublisher(Publisher): @@ -610,7 +575,6 @@ class Connection(object): def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread.""" - @excutils.forever_retry_uncaught_exceptions def _consumer_thread(): try: self.consume() @@ -651,7 +615,7 @@ class Connection(object): return consumer def join_consumer_pool(self, callback, pool_name, topic, - exchange_name=None, ack_on_error=True): + exchange_name=None): """Register as a member of a group of consumers for a given topic from the specified exchange.