diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 4a49f6ad5..394d46668 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -148,11 +148,11 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q, client_timeout, obsolete_reply_queues, message_operations_handler): - super(AMQPIncomingMessage, self).__init__(ctxt, message) + super(AMQPIncomingMessage, self).__init__(ctxt, message, msg_id) + self.orig_msg_id = msg_id self.listener = listener self.unique_id = unique_id - self.msg_id = msg_id self.reply_q = reply_q self.client_timeout = client_timeout self._obsolete_reply_queues = obsolete_reply_queues @@ -184,7 +184,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg)) def reply(self, reply=None, failure=None): - if not self.msg_id: + if not self.orig_msg_id: # NOTE(Alexei_987) not sending reply, if msg_id is empty # because reply should not be expected by caller side return diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py index a03c1d7c4..d8533a388 100644 --- a/oslo_messaging/_drivers/base.py +++ b/oslo_messaging/_drivers/base.py @@ -14,6 +14,7 @@ import abc import threading +import uuid from oslo_config import cfg from oslo_utils import excutils @@ -78,10 +79,14 @@ class IncomingMessage(object, metaclass=abc.ABCMeta): :type message: dict """ - def __init__(self, ctxt, message): + def __init__(self, ctxt, message, msg_id=None): self.ctxt = ctxt self.message = message self.client_timeout = None + if msg_id is None: + self.msg_id = str(uuid.uuid4()) + else: + self.msg_id = msg_id def acknowledge(self): """Called by the server to acknowledge receipt of the message. When diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py index 94d488812..a9f71ce75 100644 --- a/oslo_messaging/rpc/server.py +++ b/oslo_messaging/rpc/server.py @@ -122,6 +122,7 @@ A simple example of an RPC server with multiple endpoints might be:: import logging import sys +import time from oslo_messaging import exceptions from oslo_messaging.rpc import dispatcher as rpc_dispatcher @@ -151,6 +152,12 @@ class RPCServer(msg_server.MessageHandlingServer): def _process_incoming(self, incoming): message = incoming[0] + rpc_method = message.message.get('method') + start = time.time() + LOG.debug("Receive incoming message with id %(msg_id)s and " + "method: %(method)s.", + {"msg_id": message.msg_id, + "method": rpc_method}) # TODO(sileht): We should remove that at some point and do # this directly in the driver @@ -182,8 +189,19 @@ class RPCServer(msg_server.MessageHandlingServer): try: if failure is None: message.reply(res) + LOG.debug("Replied success message with id %(msg_id)s and " + "method: %(method)s. Time elapsed: %(elapsed).3f", + {"msg_id": message.msg_id, + "method": rpc_method, + "elapsed": (time.time() - start)}) else: message.reply(failure=failure) + LOG.debug("Replied failure for incoming message with " + "id %(msg_id)s and method: %(method)s. " + "Time elapsed: %(elapsed).3f", + {"msg_id": message.msg_id, + "method": rpc_method, + "elapsed": (time.time() - start)}) except exceptions.MessageUndeliverable as e: LOG.exception( "MessageUndeliverable error, " diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py index 06cf1c7ca..bc5ff7f85 100644 --- a/oslo_messaging/tests/rpc/test_server.py +++ b/oslo_messaging/tests/rpc/test_server.py @@ -412,7 +412,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): except Exception as ex: self.assertIsInstance(ex, ValueError) self.assertEqual('dsfoo', str(ex)) - self.assertTrue(len(debugs) == 0) + self.assertTrue(len(debugs) == 2) self.assertGreater(len(errors), 0) else: self.assertTrue(False)