Merge "Revert "Sync rpc/impl_qpid.py from oslo-incubator.""
This commit is contained in:
commit
a0a315486a
@ -24,8 +24,7 @@ import eventlet
|
|||||||
import greenlet
|
import greenlet
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
|
|
||||||
from neutron.openstack.common import excutils
|
from neutron.openstack.common.gettextutils import _
|
||||||
from neutron.openstack.common.gettextutils import _ # noqa
|
|
||||||
from neutron.openstack.common import importutils
|
from neutron.openstack.common import importutils
|
||||||
from neutron.openstack.common import jsonutils
|
from neutron.openstack.common import jsonutils
|
||||||
from neutron.openstack.common import log as logging
|
from neutron.openstack.common import log as logging
|
||||||
@ -119,17 +118,10 @@ class ConsumerBase(object):
|
|||||||
|
|
||||||
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
||||||
|
|
||||||
self.connect(session)
|
self.reconnect(session)
|
||||||
|
|
||||||
def connect(self, session):
|
|
||||||
"""Declare the reciever on connect."""
|
|
||||||
self._declare_receiver(session)
|
|
||||||
|
|
||||||
def reconnect(self, session):
|
def reconnect(self, session):
|
||||||
"""Re-declare the receiver after a qpid reconnect."""
|
"""Re-declare the receiver after a qpid reconnect."""
|
||||||
self._declare_receiver(session)
|
|
||||||
|
|
||||||
def _declare_receiver(self, session):
|
|
||||||
self.session = session
|
self.session = session
|
||||||
self.receiver = session.receiver(self.address)
|
self.receiver = session.receiver(self.address)
|
||||||
self.receiver.capacity = 1
|
self.receiver.capacity = 1
|
||||||
@ -160,15 +152,11 @@ class ConsumerBase(object):
|
|||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception(_("Failed to process message... skipping it."))
|
LOG.exception(_("Failed to process message... skipping it."))
|
||||||
finally:
|
finally:
|
||||||
# TODO(sandy): Need support for optional ack_on_error.
|
|
||||||
self.session.acknowledge(message)
|
self.session.acknowledge(message)
|
||||||
|
|
||||||
def get_receiver(self):
|
def get_receiver(self):
|
||||||
return self.receiver
|
return self.receiver
|
||||||
|
|
||||||
def get_node_name(self):
|
|
||||||
return self.address.split(';')[0]
|
|
||||||
|
|
||||||
|
|
||||||
class DirectConsumer(ConsumerBase):
|
class DirectConsumer(ConsumerBase):
|
||||||
"""Queue/consumer class for 'direct'."""
|
"""Queue/consumer class for 'direct'."""
|
||||||
@ -181,16 +169,11 @@ class DirectConsumer(ConsumerBase):
|
|||||||
'callback' is the callback to call when messages are received
|
'callback' is the callback to call when messages are received
|
||||||
"""
|
"""
|
||||||
|
|
||||||
super(DirectConsumer, self).__init__(
|
super(DirectConsumer, self).__init__(session, callback,
|
||||||
session, callback,
|
"%s/%s" % (msg_id, msg_id),
|
||||||
"%s/%s" % (msg_id, msg_id),
|
{"type": "direct"},
|
||||||
{"type": "direct"},
|
msg_id,
|
||||||
msg_id,
|
{"exclusive": True})
|
||||||
{
|
|
||||||
"auto-delete": conf.amqp_auto_delete,
|
|
||||||
"exclusive": True,
|
|
||||||
"durable": conf.amqp_durable_queues,
|
|
||||||
})
|
|
||||||
|
|
||||||
|
|
||||||
class TopicConsumer(ConsumerBase):
|
class TopicConsumer(ConsumerBase):
|
||||||
@ -208,14 +191,9 @@ class TopicConsumer(ConsumerBase):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
|
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
|
||||||
super(TopicConsumer, self).__init__(
|
super(TopicConsumer, self).__init__(session, callback,
|
||||||
session, callback,
|
"%s/%s" % (exchange_name, topic),
|
||||||
"%s/%s" % (exchange_name, topic),
|
{}, name or topic, {})
|
||||||
{}, name or topic,
|
|
||||||
{
|
|
||||||
"auto-delete": conf.amqp_auto_delete,
|
|
||||||
"durable": conf.amqp_durable_queues,
|
|
||||||
})
|
|
||||||
|
|
||||||
|
|
||||||
class FanoutConsumer(ConsumerBase):
|
class FanoutConsumer(ConsumerBase):
|
||||||
@ -228,7 +206,6 @@ class FanoutConsumer(ConsumerBase):
|
|||||||
'topic' is the topic to listen on
|
'topic' is the topic to listen on
|
||||||
'callback' is the callback to call when messages are received
|
'callback' is the callback to call when messages are received
|
||||||
"""
|
"""
|
||||||
self.conf = conf
|
|
||||||
|
|
||||||
super(FanoutConsumer, self).__init__(
|
super(FanoutConsumer, self).__init__(
|
||||||
session, callback,
|
session, callback,
|
||||||
@ -237,18 +214,6 @@ class FanoutConsumer(ConsumerBase):
|
|||||||
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
|
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
|
||||||
{"exclusive": True})
|
{"exclusive": True})
|
||||||
|
|
||||||
def reconnect(self, session):
|
|
||||||
topic = self.get_node_name().rpartition('_fanout')[0]
|
|
||||||
params = {
|
|
||||||
'session': session,
|
|
||||||
'topic': topic,
|
|
||||||
'callback': self.callback,
|
|
||||||
}
|
|
||||||
|
|
||||||
self.__init__(conf=self.conf, **params)
|
|
||||||
|
|
||||||
super(FanoutConsumer, self).reconnect(session)
|
|
||||||
|
|
||||||
|
|
||||||
class Publisher(object):
|
class Publisher(object):
|
||||||
"""Base Publisher class."""
|
"""Base Publisher class."""
|
||||||
@ -320,7 +285,7 @@ class DirectPublisher(Publisher):
|
|||||||
def __init__(self, conf, session, msg_id):
|
def __init__(self, conf, session, msg_id):
|
||||||
"""Init a 'direct' publisher."""
|
"""Init a 'direct' publisher."""
|
||||||
super(DirectPublisher, self).__init__(session, msg_id,
|
super(DirectPublisher, self).__init__(session, msg_id,
|
||||||
{"type": "direct"})
|
{"type": "Direct"})
|
||||||
|
|
||||||
|
|
||||||
class TopicPublisher(Publisher):
|
class TopicPublisher(Publisher):
|
||||||
@ -610,7 +575,6 @@ class Connection(object):
|
|||||||
|
|
||||||
def consume_in_thread(self):
|
def consume_in_thread(self):
|
||||||
"""Consumer from all queues/consumers in a greenthread."""
|
"""Consumer from all queues/consumers in a greenthread."""
|
||||||
@excutils.forever_retry_uncaught_exceptions
|
|
||||||
def _consumer_thread():
|
def _consumer_thread():
|
||||||
try:
|
try:
|
||||||
self.consume()
|
self.consume()
|
||||||
@ -651,7 +615,7 @@ class Connection(object):
|
|||||||
return consumer
|
return consumer
|
||||||
|
|
||||||
def join_consumer_pool(self, callback, pool_name, topic,
|
def join_consumer_pool(self, callback, pool_name, topic,
|
||||||
exchange_name=None, ack_on_error=True):
|
exchange_name=None):
|
||||||
"""Register as a member of a group of consumers for a given topic from
|
"""Register as a member of a group of consumers for a given topic from
|
||||||
the specified exchange.
|
the specified exchange.
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user