Add RPC incoming and reply log

Typically a simple log will not narrow down the performance, but give
us more information about the service status.

The commit 859e0d4eaa directly revert
the log due to the msg_id is not accessable for non-rabbit drivers.
It's not the right way to make things better.

So add the log back with the attribute msg_id for base IncomingMessage
class.

Closes-Bug: #2072156

Related-Bug: #1847747
Related-Bug: #1855775

Change-Id: Ib35c8fbb24d5c51d3b54e8ca63e663428318eca5
This commit is contained in:
LIU Yulong 2019-10-11 23:22:23 +08:00 committed by Takashi Kajinami
parent 0d3338fd1d
commit 104a63b9aa
4 changed files with 28 additions and 5 deletions

View File

@ -148,11 +148,11 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q, def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q,
client_timeout, obsolete_reply_queues, client_timeout, obsolete_reply_queues,
message_operations_handler): message_operations_handler):
super(AMQPIncomingMessage, self).__init__(ctxt, message) super(AMQPIncomingMessage, self).__init__(ctxt, message, msg_id)
self.orig_msg_id = msg_id
self.listener = listener self.listener = listener
self.unique_id = unique_id self.unique_id = unique_id
self.msg_id = msg_id
self.reply_q = reply_q self.reply_q = reply_q
self.client_timeout = client_timeout self.client_timeout = client_timeout
self._obsolete_reply_queues = obsolete_reply_queues self._obsolete_reply_queues = obsolete_reply_queues
@ -184,7 +184,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg)) conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
def reply(self, reply=None, failure=None): def reply(self, reply=None, failure=None):
if not self.msg_id: if not self.orig_msg_id:
# NOTE(Alexei_987) not sending reply, if msg_id is empty # NOTE(Alexei_987) not sending reply, if msg_id is empty
# because reply should not be expected by caller side # because reply should not be expected by caller side
return return

View File

@ -14,6 +14,7 @@
import abc import abc
import threading import threading
import uuid
from oslo_config import cfg from oslo_config import cfg
from oslo_utils import excutils from oslo_utils import excutils
@ -78,10 +79,14 @@ class IncomingMessage(object, metaclass=abc.ABCMeta):
:type message: dict :type message: dict
""" """
def __init__(self, ctxt, message): def __init__(self, ctxt, message, msg_id=None):
self.ctxt = ctxt self.ctxt = ctxt
self.message = message self.message = message
self.client_timeout = None self.client_timeout = None
if msg_id is None:
self.msg_id = str(uuid.uuid4())
else:
self.msg_id = msg_id
def acknowledge(self): def acknowledge(self):
"""Called by the server to acknowledge receipt of the message. When """Called by the server to acknowledge receipt of the message. When

View File

@ -122,6 +122,7 @@ A simple example of an RPC server with multiple endpoints might be::
import logging import logging
import sys import sys
import time
from oslo_messaging import exceptions from oslo_messaging import exceptions
from oslo_messaging.rpc import dispatcher as rpc_dispatcher from oslo_messaging.rpc import dispatcher as rpc_dispatcher
@ -151,6 +152,12 @@ class RPCServer(msg_server.MessageHandlingServer):
def _process_incoming(self, incoming): def _process_incoming(self, incoming):
message = incoming[0] message = incoming[0]
rpc_method = message.message.get('method')
start = time.time()
LOG.debug("Receive incoming message with id %(msg_id)s and "
"method: %(method)s.",
{"msg_id": message.msg_id,
"method": rpc_method})
# TODO(sileht): We should remove that at some point and do # TODO(sileht): We should remove that at some point and do
# this directly in the driver # this directly in the driver
@ -182,8 +189,19 @@ class RPCServer(msg_server.MessageHandlingServer):
try: try:
if failure is None: if failure is None:
message.reply(res) message.reply(res)
LOG.debug("Replied success message with id %(msg_id)s and "
"method: %(method)s. Time elapsed: %(elapsed).3f",
{"msg_id": message.msg_id,
"method": rpc_method,
"elapsed": (time.time() - start)})
else: else:
message.reply(failure=failure) message.reply(failure=failure)
LOG.debug("Replied failure for incoming message with "
"id %(msg_id)s and method: %(method)s. "
"Time elapsed: %(elapsed).3f",
{"msg_id": message.msg_id,
"method": rpc_method,
"elapsed": (time.time() - start)})
except exceptions.MessageUndeliverable as e: except exceptions.MessageUndeliverable as e:
LOG.exception( LOG.exception(
"MessageUndeliverable error, " "MessageUndeliverable error, "

View File

@ -412,7 +412,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
except Exception as ex: except Exception as ex:
self.assertIsInstance(ex, ValueError) self.assertIsInstance(ex, ValueError)
self.assertEqual('dsfoo', str(ex)) self.assertEqual('dsfoo', str(ex))
self.assertTrue(len(debugs) == 0) self.assertTrue(len(debugs) == 2)
self.assertGreater(len(errors), 0) self.assertGreater(len(errors), 0)
else: else:
self.assertTrue(False) self.assertTrue(False)