Extend logging in amqpdriver

Log message specific information (unique_id/msg_id etc) to make
debugging easier.
 
Log msg_type, exchange_type, unique_id, msg_id, exchange,
reply queue, topic before actual send;
Log unique_id, msg_id, reply queue before sending reply;
Log msg_id, unique_id and reply_q if needed when receive message.

Change-Id: I10d7d3eb6676ea60e5a932f5932e14bafa50532c
This commit is contained in:
Yulia Portnova 2015-09-09 17:08:25 +03:00
parent 88667c6159
commit a672218ea5

View File

@ -63,6 +63,7 @@ class AMQPIncomingMessage(base.IncomingMessage):
msg['ending'] = True
rpc_amqp._add_unique_id(msg)
unique_id = msg[rpc_amqp.UNIQUE_ID]
# If a reply_q exists, add the msg_id to the reply and pass the
# reply_q to direct_send() to use it as the response queue.
@ -70,6 +71,12 @@ class AMQPIncomingMessage(base.IncomingMessage):
if self.reply_q:
msg['_msg_id'] = self.msg_id
try:
if ending:
LOG.debug("sending reply msg_id: %(msg_id)s "
"reply queue: %(reply_q)s" % {
'msg_id': self.msg_id,
'unique_id': unique_id,
'reply_q': self.reply_q})
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
except rpc_amqp.AMQPDestinationNotFound:
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
@ -180,6 +187,13 @@ class AMQPListener(base.Listener):
unique_id = self.msg_id_cache.check_duplicate_message(message)
if ctxt.reply_q:
LOG.debug(
"received message msg_id: %(msg_id)s reply to %(queue)s" % {
'queue': ctxt.reply_q, 'msg_id': ctxt.msg_id})
else:
LOG.debug("received message unique_id: %s " % unique_id)
self.incoming.append(AMQPIncomingMessage(self,
ctxt.to_dict(),
message,
@ -276,6 +290,8 @@ class ReplyWaiter(object):
def __call__(self, message):
message.acknowledge()
incoming_msg_id = message.pop('_msg_id', None)
if message.get('ending'):
LOG.debug("received reply msg_id: %s" % incoming_msg_id)
self.waiters.put(incoming_msg_id, message)
def listen(self, msg_id):
@ -389,10 +405,11 @@ class AMQPDriverBase(base.BaseDriver):
if wait_for_reply:
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug('MSG_ID is %s', msg_id)
msg.update({'_reply_q': self._get_reply_q()})
rpc_amqp._add_unique_id(msg)
unique_id = msg[rpc_amqp.UNIQUE_ID]
rpc_amqp.pack_context(msg, context)
if envelope:
@ -400,21 +417,37 @@ class AMQPDriverBase(base.BaseDriver):
if wait_for_reply:
self._waiter.listen(msg_id)
log_msg = "CALL msg_id: %s " % msg_id
else:
log_msg = "CAST unique_id: %s " % unique_id
try:
with self._get_connection(rpc_amqp.PURPOSE_SEND) as conn:
if notify:
conn.notify_send(self._get_exchange(target),
target.topic, msg, retry=retry)
exchange = self._get_exchange(target)
log_msg += "NOTIFY exchange '%(exchange)s'" \
" topic '%(topic)s'" % {
'exchange': exchange,
'topic': target.topic}
LOG.debug(log_msg)
conn.notify_send(exchange, target.topic, msg, retry=retry)
elif target.fanout:
log_msg += "FANOUT topic '%(topic)s'" % {
'topic': target.topic}
LOG.debug(log_msg)
conn.fanout_send(target.topic, msg, retry=retry)
else:
topic = target.topic
exchange = self._get_exchange(target)
if target.server:
topic = '%s.%s' % (target.topic, target.server)
conn.topic_send(exchange_name=self._get_exchange(target),
topic=topic, msg=msg, timeout=timeout,
retry=retry)
log_msg += "exchange '%(exchange)s'" \
" topic '%(topic)s'" % {
'exchange': exchange,
'topic': target.topic}
LOG.debug(log_msg)
conn.topic_send(exchange_name=exchange, topic=topic,
msg=msg, timeout=timeout, retry=retry)
if wait_for_reply:
result = self._waiter.wait(msg_id, timeout)