From a672218ea5db664f626d7a4d898cfc7670f2d9f3 Mon Sep 17 00:00:00 2001 From: Yulia Portnova Date: Wed, 9 Sep 2015 17:08:25 +0300 Subject: [PATCH] Extend logging in amqpdriver Log message specific information (unique_id/msg_id etc) to make debugging easier. Log msg_type, exchange_type, unique_id, msg_id, exchange, reply queue, topic before actual send; Log unique_id, msg_id, reply queue before sending reply; Log msg_id, unique_id and reply_q if needed when receive message. Change-Id: I10d7d3eb6676ea60e5a932f5932e14bafa50532c --- oslo_messaging/_drivers/amqpdriver.py | 45 +++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 6 deletions(-) diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index c60c6fc6d..d3405086a 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -63,6 +63,7 @@ class AMQPIncomingMessage(base.IncomingMessage): msg['ending'] = True rpc_amqp._add_unique_id(msg) + unique_id = msg[rpc_amqp.UNIQUE_ID] # If a reply_q exists, add the msg_id to the reply and pass the # reply_q to direct_send() to use it as the response queue. @@ -70,6 +71,12 @@ class AMQPIncomingMessage(base.IncomingMessage): if self.reply_q: msg['_msg_id'] = self.msg_id try: + if ending: + LOG.debug("sending reply msg_id: %(msg_id)s " + "reply queue: %(reply_q)s" % { + 'msg_id': self.msg_id, + 'unique_id': unique_id, + 'reply_q': self.reply_q}) conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg)) except rpc_amqp.AMQPDestinationNotFound: self._obsolete_reply_queues.add(self.reply_q, self.msg_id) @@ -180,6 +187,13 @@ class AMQPListener(base.Listener): unique_id = self.msg_id_cache.check_duplicate_message(message) + if ctxt.reply_q: + LOG.debug( + "received message msg_id: %(msg_id)s reply to %(queue)s" % { + 'queue': ctxt.reply_q, 'msg_id': ctxt.msg_id}) + else: + LOG.debug("received message unique_id: %s " % unique_id) + self.incoming.append(AMQPIncomingMessage(self, ctxt.to_dict(), message, @@ -276,6 +290,8 @@ class ReplyWaiter(object): def __call__(self, message): message.acknowledge() incoming_msg_id = message.pop('_msg_id', None) + if message.get('ending'): + LOG.debug("received reply msg_id: %s" % incoming_msg_id) self.waiters.put(incoming_msg_id, message) def listen(self, msg_id): @@ -389,10 +405,11 @@ class AMQPDriverBase(base.BaseDriver): if wait_for_reply: msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) - LOG.debug('MSG_ID is %s', msg_id) msg.update({'_reply_q': self._get_reply_q()}) rpc_amqp._add_unique_id(msg) + unique_id = msg[rpc_amqp.UNIQUE_ID] + rpc_amqp.pack_context(msg, context) if envelope: @@ -400,21 +417,37 @@ class AMQPDriverBase(base.BaseDriver): if wait_for_reply: self._waiter.listen(msg_id) + log_msg = "CALL msg_id: %s " % msg_id + else: + log_msg = "CAST unique_id: %s " % unique_id try: with self._get_connection(rpc_amqp.PURPOSE_SEND) as conn: if notify: - conn.notify_send(self._get_exchange(target), - target.topic, msg, retry=retry) + exchange = self._get_exchange(target) + log_msg += "NOTIFY exchange '%(exchange)s'" \ + " topic '%(topic)s'" % { + 'exchange': exchange, + 'topic': target.topic} + LOG.debug(log_msg) + conn.notify_send(exchange, target.topic, msg, retry=retry) elif target.fanout: + log_msg += "FANOUT topic '%(topic)s'" % { + 'topic': target.topic} + LOG.debug(log_msg) conn.fanout_send(target.topic, msg, retry=retry) else: topic = target.topic + exchange = self._get_exchange(target) if target.server: topic = '%s.%s' % (target.topic, target.server) - conn.topic_send(exchange_name=self._get_exchange(target), - topic=topic, msg=msg, timeout=timeout, - retry=retry) + log_msg += "exchange '%(exchange)s'" \ + " topic '%(topic)s'" % { + 'exchange': exchange, + 'topic': target.topic} + LOG.debug(log_msg) + conn.topic_send(exchange_name=exchange, topic=topic, + msg=msg, timeout=timeout, retry=retry) if wait_for_reply: result = self._waiter.wait(msg_id, timeout)