Merge "Issue blocking ACK for RPC requests from the consumer thread"
This commit is contained in:
commit
01a37733eb
@ -167,8 +167,34 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
|
||||
'duration': duration})
|
||||
return
|
||||
|
||||
def heartbeat(self):
|
||||
with self.listener.driver._get_connection(
|
||||
rpc_common.PURPOSE_SEND) as conn:
|
||||
self._send_reply(conn, None, None, ending=False)
|
||||
|
||||
# NOTE(sileht): Those have already be ack in RpcListener IO thread
|
||||
# We keep them as noop until all drivers do the same
|
||||
def acknowledge(self):
|
||||
self._message_operations_handler.do(self.message.acknowledge)
|
||||
pass
|
||||
|
||||
def requeue(self):
|
||||
pass
|
||||
|
||||
|
||||
class NotificationAMQPIncomingMessage(AMQPIncomingMessage):
|
||||
def acknowledge(self):
|
||||
def _do_ack():
|
||||
try:
|
||||
self.message.acknowledge()
|
||||
except Exception as exc:
|
||||
# NOTE(kgiusti): this failure is likely due to a loss of the
|
||||
# connection to the broker. Not much we can do in this case,
|
||||
# especially considering the Notification has already been
|
||||
# dispatched. This *could* result in message duplication
|
||||
# (unacked msg is returned to the queue by the broker), but the
|
||||
# driver tries to catch that using the msg_id_cache.
|
||||
LOG.warning("Failed to acknowledge received message: %s", exc)
|
||||
self._message_operations_handler.do(_do_ack)
|
||||
self.listener.msg_id_cache.add(self.unique_id)
|
||||
|
||||
def requeue(self):
|
||||
@ -178,12 +204,12 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
|
||||
# msg_id_cache, the message will be reconsumed, the only difference is
|
||||
# the message stay at the beginning of the queue instead of moving to
|
||||
# the end.
|
||||
self._message_operations_handler.do(self.message.requeue)
|
||||
|
||||
def heartbeat(self):
|
||||
with self.listener.driver._get_connection(
|
||||
rpc_common.PURPOSE_SEND) as conn:
|
||||
self._send_reply(conn, None, None, ending=False)
|
||||
def _do_requeue():
|
||||
try:
|
||||
self.message.requeue()
|
||||
except Exception as exc:
|
||||
LOG.warning("Failed to requeue received message: %s", exc)
|
||||
self._message_operations_handler.do(_do_requeue)
|
||||
|
||||
|
||||
class ObsoleteReplyQueuesCache(object):
|
||||
@ -256,7 +282,7 @@ class AMQPListener(base.PollStyleListener):
|
||||
else:
|
||||
LOG.debug("received message with unique_id: %s", unique_id)
|
||||
|
||||
self.incoming.append(AMQPIncomingMessage(
|
||||
self.incoming.append(self.message_cls(
|
||||
self,
|
||||
ctxt.to_dict(),
|
||||
message,
|
||||
@ -319,6 +345,41 @@ class AMQPListener(base.PollStyleListener):
|
||||
self.conn.close()
|
||||
|
||||
|
||||
class RpcAMQPListener(AMQPListener):
|
||||
message_cls = AMQPIncomingMessage
|
||||
|
||||
def __call__(self, message):
|
||||
# NOTE(kgiusti): In the original RPC implementation the RPC server
|
||||
# would acknowledge the request THEN process it. The goal of this was
|
||||
# to prevent duplication if the ack failed. Should the ack fail the
|
||||
# request would be discarded since the broker would not remove the
|
||||
# request from the queue since no ack was received. That would lead to
|
||||
# the request being redelivered at some point. However this approach
|
||||
# meant that the ack was issued from the dispatch thread, not the
|
||||
# consumer thread, which is bad since kombu is not thread safe. So a
|
||||
# change was made to schedule the ack to be sent on the consumer thread
|
||||
# - breaking the ability to catch ack errors before dispatching the
|
||||
# request. To fix this we do the actual ack here in the consumer
|
||||
# callback and avoid the upcall if the ack fails. See
|
||||
# https://bugs.launchpad.net/oslo.messaging/+bug/1695746
|
||||
# for all the gory details...
|
||||
try:
|
||||
message.acknowledge()
|
||||
except Exception as exc:
|
||||
LOG.warning("Discarding RPC request due to failed acknowlege: %s",
|
||||
exc)
|
||||
else:
|
||||
# NOTE(kgiusti): be aware that even if the acknowledge call
|
||||
# succeeds there is no guarantee the broker actually gets the ACK
|
||||
# since acknowledge() simply writes the ACK to the socket (there is
|
||||
# no ACK confirmation coming back from the broker)
|
||||
super(RpcAMQPListener, self).__call__(message)
|
||||
|
||||
|
||||
class NotificationAMQPListener(AMQPListener):
|
||||
message_cls = NotificationAMQPIncomingMessage
|
||||
|
||||
|
||||
class ReplyWaiters(object):
|
||||
|
||||
WAKE_UP = object()
|
||||
@ -590,7 +651,7 @@ class AMQPDriverBase(base.BaseDriver):
|
||||
def listen(self, target, batch_size, batch_timeout):
|
||||
conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
|
||||
|
||||
listener = AMQPListener(self, conn)
|
||||
listener = RpcAMQPListener(self, conn)
|
||||
|
||||
conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
|
||||
topic=target.topic,
|
||||
@ -608,7 +669,7 @@ class AMQPDriverBase(base.BaseDriver):
|
||||
batch_size, batch_timeout):
|
||||
conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
|
||||
|
||||
listener = AMQPListener(self, conn)
|
||||
listener = NotificationAMQPListener(self, conn)
|
||||
for target, priority in targets_and_priorities:
|
||||
conn.declare_topic_consumer(
|
||||
exchange_name=self._get_exchange(target),
|
||||
|
@ -152,6 +152,9 @@ class RPCServer(msg_server.MessageHandlingServer):
|
||||
|
||||
def _process_incoming(self, incoming):
|
||||
message = incoming[0]
|
||||
|
||||
# TODO(sileht): We should remove that at some point and do
|
||||
# this directly in the driver
|
||||
try:
|
||||
message.acknowledge()
|
||||
except Exception:
|
||||
|
Loading…
x
Reference in New Issue
Block a user