From bc46e647118f5d8e52f9b43fcc369823e1f65455 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Tue, 11 Oct 2016 10:06:37 -0400 Subject: [PATCH] [AMQP 1.0] Avoid unnecessary thread switch on ack Avoid blocking the RPC Server thread when it attempts to acknowledge the RPC request message. Blocking is unnecessary as the acknowledge command does not return a status and can be processed asynchronously from the server thread. Avoiding this context switch improves overall RPC call throughput according to the simulator tool by approximately ten percent on my test systems. Change-Id: I71548eb6f9f7dcaf74cb426d4e9b369b54856419 --- oslo_messaging/_drivers/amqp1_driver/controller.py | 12 +++++++----- oslo_messaging/_drivers/impl_amqp1.py | 6 ------ 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py index d2d59b642..2e7ec9e18 100644 --- a/oslo_messaging/_drivers/amqp1_driver/controller.py +++ b/oslo_messaging/_drivers/amqp1_driver/controller.py @@ -235,17 +235,19 @@ class MessageDispositionTask(Task): super(MessageDispositionTask, self).__init__() self._disposition = disposition self._released = released - self._wakeup = threading.Event() def wait(self): - self._wakeup.wait() + # disposition update does not have to block the sender since there is + # no result to pend for. This avoids a thread context switch with + # every RPC call + pass def _execute(self, controller): try: self._disposition(self._released) - except Exception: - pass - self._wakeup.set() + except Exception as e: + # there's really nothing we can do about a failed disposition. + LOG.exception(_LE("Message acknowledgment failed: %s"), e) class Sender(pyngus.SenderEventHandler): diff --git a/oslo_messaging/_drivers/impl_amqp1.py b/oslo_messaging/_drivers/impl_amqp1.py index 16da05fec..e48c47824 100644 --- a/oslo_messaging/_drivers/impl_amqp1.py +++ b/oslo_messaging/_drivers/impl_amqp1.py @@ -124,18 +124,12 @@ class ProtonIncomingMessage(base.RpcIncomingMessage): task = controller.MessageDispositionTask(self._disposition, released=False) self.listener.driver._ctrl.add_task(task) - rc = task.wait() - if rc: - LOG.debug("Message acknowledge failed: %s", str(rc)) def requeue(self): """Schedule a MessageDispositionTask to release the message""" task = controller.MessageDispositionTask(self._disposition, released=True) self.listener.driver._ctrl.add_task(task) - rc = task.wait() - if rc: - LOG.debug("Message requeue failed: %s", str(rc)) class Queue(object):