Merge "rabbit: make ack/requeue thread-safe"
This commit is contained in:
commit
104c1da138
@ -35,11 +35,27 @@ from oslo_messaging._i18n import _LW
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
# Minimum/Maximum sleep between a poll and ack/requeue
|
||||
# Maximum should be small enough to not get rejected ack,
|
||||
# minimum should be big enough to not burn the CPU.
|
||||
ACK_REQUEUE_EVERY_SECONDS_MIN = 0.001
|
||||
ACK_REQUEUE_EVERY_SECONDS_MAX = 1.0
|
||||
|
||||
|
||||
def do_pending_tasks(tasks):
|
||||
while True:
|
||||
try:
|
||||
task = tasks.get(block=False)
|
||||
except moves.queue.Empty:
|
||||
break
|
||||
else:
|
||||
task()
|
||||
|
||||
|
||||
class AMQPIncomingMessage(base.RpcIncomingMessage):
|
||||
|
||||
def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q,
|
||||
obsolete_reply_queues):
|
||||
obsolete_reply_queues, pending_message_actions):
|
||||
super(AMQPIncomingMessage, self).__init__(ctxt, message)
|
||||
self.listener = listener
|
||||
|
||||
@ -47,6 +63,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
|
||||
self.msg_id = msg_id
|
||||
self.reply_q = reply_q
|
||||
self._obsolete_reply_queues = obsolete_reply_queues
|
||||
self._pending_tasks = pending_message_actions
|
||||
self.stopwatch = timeutils.StopWatch()
|
||||
self.stopwatch.start()
|
||||
|
||||
@ -116,7 +133,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
|
||||
return
|
||||
|
||||
def acknowledge(self):
|
||||
self.message.acknowledge()
|
||||
self._pending_tasks.put(self.message.acknowledge)
|
||||
self.listener.msg_id_cache.add(self.unique_id)
|
||||
|
||||
def requeue(self):
|
||||
@ -126,7 +143,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
|
||||
# msg_id_cache, the message will be reconsumed, the only difference is
|
||||
# the message stay at the beginning of the queue instead of moving to
|
||||
# the end.
|
||||
self.message.requeue()
|
||||
self._pending_tasks.put(self.message.requeue)
|
||||
|
||||
|
||||
class ObsoleteReplyQueuesCache(object):
|
||||
@ -184,6 +201,8 @@ class AMQPListener(base.PollStyleListener):
|
||||
self.incoming = []
|
||||
self._stopped = threading.Event()
|
||||
self._obsolete_reply_queues = ObsoleteReplyQueuesCache()
|
||||
self._pending_tasks = moves.queue.Queue()
|
||||
self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
|
||||
|
||||
def __call__(self, message):
|
||||
ctxt = rpc_amqp.unpack_context(message)
|
||||
@ -194,27 +213,45 @@ class AMQPListener(base.PollStyleListener):
|
||||
'msg_id': ctxt.msg_id})
|
||||
else:
|
||||
LOG.debug("received message with unique_id: %s", unique_id)
|
||||
self.incoming.append(AMQPIncomingMessage(self,
|
||||
ctxt.to_dict(),
|
||||
message,
|
||||
unique_id,
|
||||
ctxt.msg_id,
|
||||
ctxt.reply_q,
|
||||
self._obsolete_reply_queues))
|
||||
|
||||
self.incoming.append(AMQPIncomingMessage(
|
||||
self,
|
||||
ctxt.to_dict(),
|
||||
message,
|
||||
unique_id,
|
||||
ctxt.msg_id,
|
||||
ctxt.reply_q,
|
||||
self._obsolete_reply_queues,
|
||||
self._pending_tasks))
|
||||
|
||||
@base.batch_poll_helper
|
||||
def poll(self, timeout=None):
|
||||
stopwatch = timeutils.StopWatch(duration=timeout).start()
|
||||
|
||||
while not self._stopped.is_set():
|
||||
do_pending_tasks(self._pending_tasks)
|
||||
|
||||
if self.incoming:
|
||||
return self.incoming.pop(0)
|
||||
try:
|
||||
self.conn.consume(timeout=timeout)
|
||||
except rpc_common.Timeout:
|
||||
|
||||
left = stopwatch.leftover(return_none=True)
|
||||
if left is None:
|
||||
left = self._current_timeout
|
||||
if left <= 0:
|
||||
return None
|
||||
|
||||
try:
|
||||
self.conn.consume(timeout=min(self._current_timeout, left))
|
||||
except rpc_common.Timeout:
|
||||
self._current_timeout = max(self._current_timeout * 2,
|
||||
ACK_REQUEUE_EVERY_SECONDS_MAX)
|
||||
else:
|
||||
self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
|
||||
|
||||
def stop(self):
|
||||
self._stopped.set()
|
||||
self.conn.stop_consuming()
|
||||
do_pending_tasks(self._pending_tasks)
|
||||
|
||||
def cleanup(self):
|
||||
# Closes listener connection
|
||||
@ -269,6 +306,7 @@ class ReplyWaiter(object):
|
||||
self.allowed_remote_exmods = allowed_remote_exmods
|
||||
self.msg_id_cache = rpc_amqp._MsgIdCache()
|
||||
self.waiters = ReplyWaiters()
|
||||
self._pending_tasks = moves.queue.Queue()
|
||||
|
||||
self.conn.declare_direct_consumer(reply_q, self)
|
||||
|
||||
@ -283,17 +321,26 @@ class ReplyWaiter(object):
|
||||
self.conn.stop_consuming()
|
||||
self._thread.join()
|
||||
self._thread = None
|
||||
do_pending_tasks(self._pending_tasks)
|
||||
|
||||
def poll(self):
|
||||
current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
|
||||
while not self._thread_exit_event.is_set():
|
||||
do_pending_tasks(self._pending_tasks)
|
||||
try:
|
||||
self.conn.consume()
|
||||
# ack every ACK_REQUEUE_EVERY_SECONDS_MAX seconds
|
||||
self.conn.consume(timeout=current_timeout)
|
||||
except rpc_common.Timeout:
|
||||
current_timeout = max(current_timeout * 2,
|
||||
ACK_REQUEUE_EVERY_SECONDS_MAX)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Failed to process incoming message, "
|
||||
"retrying..."))
|
||||
else:
|
||||
current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
|
||||
|
||||
def __call__(self, message):
|
||||
message.acknowledge()
|
||||
self._pending_tasks.put(message.acknowledge)
|
||||
incoming_msg_id = message.pop('_msg_id', None)
|
||||
if message.get('ending'):
|
||||
LOG.debug("received reply msg_id: %s", incoming_msg_id)
|
||||
|
Loading…
x
Reference in New Issue
Block a user