Bring in oslo-common rpc ack() changes.
Requires https://review.openstack.org/#/c/34573/ Change-Id: I520e792b0ffb4eceac5d97efef606ee98a153744
This commit is contained in:
parent
df2f9d59c3
commit
ae4caf8ea0
@ -151,11 +151,13 @@ class ConnectionContext(rpc_common.Connection):
|
||||
def create_worker(self, topic, proxy, pool_name):
|
||||
self.connection.create_worker(topic, proxy, pool_name)
|
||||
|
||||
def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
|
||||
def join_consumer_pool(self, callback, pool_name, topic, exchange_name,
|
||||
ack_on_error=True):
|
||||
self.connection.join_consumer_pool(callback,
|
||||
pool_name,
|
||||
topic,
|
||||
exchange_name)
|
||||
exchange_name,
|
||||
ack_on_error)
|
||||
|
||||
def consume_in_thread(self):
|
||||
self.connection.consume_in_thread()
|
||||
|
@ -129,6 +129,7 @@ class ConsumerBase(object):
|
||||
self.tag = str(tag)
|
||||
self.kwargs = kwargs
|
||||
self.queue = None
|
||||
self.ack_on_error = kwargs.get('ack_on_error', True)
|
||||
self.reconnect(channel)
|
||||
|
||||
def reconnect(self, channel):
|
||||
@ -138,6 +139,36 @@ class ConsumerBase(object):
|
||||
self.queue = kombu.entity.Queue(**self.kwargs)
|
||||
self.queue.declare()
|
||||
|
||||
def _callback_handler(self, message, callback):
|
||||
"""Call callback with deserialized message.
|
||||
|
||||
Messages that are processed without exception are ack'ed.
|
||||
|
||||
If the message processing generates an exception, it will be
|
||||
ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed.
|
||||
Rejection is better than waiting for the message to timeout.
|
||||
Rejected messages are immediately requeued.
|
||||
"""
|
||||
|
||||
ack_msg = False
|
||||
try:
|
||||
msg = rpc_common.deserialize_msg(message.payload)
|
||||
callback(msg)
|
||||
ack_msg = True
|
||||
except Exception:
|
||||
if self.ack_on_error:
|
||||
ack_msg = True
|
||||
LOG.exception(_("Failed to process message"
|
||||
" ... skipping it."))
|
||||
else:
|
||||
LOG.exception(_("Failed to process message"
|
||||
" ... will requeue."))
|
||||
finally:
|
||||
if ack_msg:
|
||||
message.ack()
|
||||
else:
|
||||
message.reject()
|
||||
|
||||
def consume(self, *args, **kwargs):
|
||||
"""Actually declare the consumer on the amqp channel. This will
|
||||
start the flow of messages from the queue. Using the
|
||||
@ -150,8 +181,6 @@ class ConsumerBase(object):
|
||||
If kwargs['nowait'] is True, then this call will block until
|
||||
a message is read.
|
||||
|
||||
Messages will automatically be acked if the callback doesn't
|
||||
raise an exception
|
||||
"""
|
||||
|
||||
options = {'consumer_tag': self.tag}
|
||||
@ -162,13 +191,7 @@ class ConsumerBase(object):
|
||||
|
||||
def _callback(raw_message):
|
||||
message = self.channel.message_to_python(raw_message)
|
||||
try:
|
||||
msg = rpc_common.deserialize_msg(message.payload)
|
||||
callback(msg)
|
||||
except Exception:
|
||||
LOG.exception(_("Failed to process message... skipping it."))
|
||||
finally:
|
||||
message.ack()
|
||||
self._callback_handler(message, callback)
|
||||
|
||||
self.queue.consume(*args, callback=_callback, **options)
|
||||
|
||||
@ -635,8 +658,8 @@ class Connection(object):
|
||||
|
||||
def _consume():
|
||||
if info['do_consume']:
|
||||
queues_head = self.consumers[:-1]
|
||||
queues_tail = self.consumers[-1]
|
||||
queues_head = self.consumers[:-1] # not fanout.
|
||||
queues_tail = self.consumers[-1] # fanout
|
||||
for queue in queues_head:
|
||||
queue.consume(nowait=True)
|
||||
queues_tail.consume(nowait=False)
|
||||
@ -685,11 +708,12 @@ class Connection(object):
|
||||
self.declare_consumer(DirectConsumer, topic, callback)
|
||||
|
||||
def declare_topic_consumer(self, topic, callback=None, queue_name=None,
|
||||
exchange_name=None):
|
||||
exchange_name=None, ack_on_error=True):
|
||||
"""Create a 'topic' consumer."""
|
||||
self.declare_consumer(functools.partial(TopicConsumer,
|
||||
name=queue_name,
|
||||
exchange_name=exchange_name,
|
||||
ack_on_error=ack_on_error,
|
||||
),
|
||||
topic, callback)
|
||||
|
||||
@ -754,7 +778,7 @@ class Connection(object):
|
||||
self.declare_topic_consumer(topic, proxy_cb, pool_name)
|
||||
|
||||
def join_consumer_pool(self, callback, pool_name, topic,
|
||||
exchange_name=None):
|
||||
exchange_name=None, ack_on_error=True):
|
||||
"""Register as a member of a group of consumers for a given topic from
|
||||
the specified exchange.
|
||||
|
||||
@ -775,6 +799,7 @@ class Connection(object):
|
||||
topic=topic,
|
||||
exchange_name=exchange_name,
|
||||
callback=callback_wrapper,
|
||||
ack_on_error=ack_on_error,
|
||||
)
|
||||
|
||||
|
||||
|
@ -152,6 +152,7 @@ class ConsumerBase(object):
|
||||
except Exception:
|
||||
LOG.exception(_("Failed to process message... skipping it."))
|
||||
finally:
|
||||
# TODO(sandy): Need support for optional ack_on_error.
|
||||
self.session.acknowledge(message)
|
||||
|
||||
def get_receiver(self):
|
||||
@ -615,7 +616,7 @@ class Connection(object):
|
||||
return consumer
|
||||
|
||||
def join_consumer_pool(self, callback, pool_name, topic,
|
||||
exchange_name=None):
|
||||
exchange_name=None, ack_on_error=True):
|
||||
"""Register as a member of a group of consumers for a given topic from
|
||||
the specified exchange.
|
||||
|
||||
|
@ -358,7 +358,6 @@ class ZmqBaseReactor(ConsumerBase):
|
||||
def __init__(self, conf):
|
||||
super(ZmqBaseReactor, self).__init__()
|
||||
|
||||
self.mapping = {}
|
||||
self.proxies = {}
|
||||
self.threads = []
|
||||
self.sockets = []
|
||||
@ -366,9 +365,8 @@ class ZmqBaseReactor(ConsumerBase):
|
||||
|
||||
self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
|
||||
|
||||
def register(self, proxy, in_addr, zmq_type_in, out_addr=None,
|
||||
zmq_type_out=None, in_bind=True, out_bind=True,
|
||||
subscribe=None):
|
||||
def register(self, proxy, in_addr, zmq_type_in,
|
||||
in_bind=True, subscribe=None):
|
||||
|
||||
LOG.info(_("Registering reactor"))
|
||||
|
||||
@ -384,21 +382,6 @@ class ZmqBaseReactor(ConsumerBase):
|
||||
|
||||
LOG.info(_("In reactor registered"))
|
||||
|
||||
if not out_addr:
|
||||
return
|
||||
|
||||
if zmq_type_out not in (zmq.PUSH, zmq.PUB):
|
||||
raise RPCException("Bad output socktype")
|
||||
|
||||
# Items push out.
|
||||
outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)
|
||||
|
||||
self.mapping[inq] = outq
|
||||
self.mapping[outq] = inq
|
||||
self.sockets.append(outq)
|
||||
|
||||
LOG.info(_("Out reactor registered"))
|
||||
|
||||
def consume_in_thread(self):
|
||||
def _consume(sock):
|
||||
LOG.info(_("Consuming socket"))
|
||||
@ -516,8 +499,7 @@ class ZmqProxy(ZmqBaseReactor):
|
||||
try:
|
||||
self.register(consumption_proxy,
|
||||
consume_in,
|
||||
zmq.PULL,
|
||||
out_bind=True)
|
||||
zmq.PULL)
|
||||
except zmq.ZMQError:
|
||||
if os.access(ipc_dir, os.X_OK):
|
||||
with excutils.save_and_reraise_exception():
|
||||
@ -559,11 +541,6 @@ class ZmqReactor(ZmqBaseReactor):
|
||||
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
|
||||
data = sock.recv()
|
||||
LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
|
||||
if sock in self.mapping:
|
||||
LOG.debug(_("ROUTER RELAY-OUT %(data)s") % {
|
||||
'data': data})
|
||||
self.mapping[sock].send(data)
|
||||
return
|
||||
|
||||
proxy = self.proxies[sock]
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user