Merge "Extend logging in amqpdriver"
This commit is contained in:
commit
6aa9276ef8
@ -63,6 +63,7 @@ class AMQPIncomingMessage(base.IncomingMessage):
|
||||
msg['ending'] = True
|
||||
|
||||
rpc_amqp._add_unique_id(msg)
|
||||
unique_id = msg[rpc_amqp.UNIQUE_ID]
|
||||
|
||||
# If a reply_q exists, add the msg_id to the reply and pass the
|
||||
# reply_q to direct_send() to use it as the response queue.
|
||||
@ -70,6 +71,12 @@ class AMQPIncomingMessage(base.IncomingMessage):
|
||||
if self.reply_q:
|
||||
msg['_msg_id'] = self.msg_id
|
||||
try:
|
||||
if ending:
|
||||
LOG.debug("sending reply msg_id: %(msg_id)s "
|
||||
"reply queue: %(reply_q)s" % {
|
||||
'msg_id': self.msg_id,
|
||||
'unique_id': unique_id,
|
||||
'reply_q': self.reply_q})
|
||||
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
|
||||
except rpc_amqp.AMQPDestinationNotFound:
|
||||
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
|
||||
@ -180,6 +187,13 @@ class AMQPListener(base.Listener):
|
||||
|
||||
unique_id = self.msg_id_cache.check_duplicate_message(message)
|
||||
|
||||
if ctxt.reply_q:
|
||||
LOG.debug(
|
||||
"received message msg_id: %(msg_id)s reply to %(queue)s" % {
|
||||
'queue': ctxt.reply_q, 'msg_id': ctxt.msg_id})
|
||||
else:
|
||||
LOG.debug("received message unique_id: %s " % unique_id)
|
||||
|
||||
self.incoming.append(AMQPIncomingMessage(self,
|
||||
ctxt.to_dict(),
|
||||
message,
|
||||
@ -276,6 +290,8 @@ class ReplyWaiter(object):
|
||||
def __call__(self, message):
|
||||
message.acknowledge()
|
||||
incoming_msg_id = message.pop('_msg_id', None)
|
||||
if message.get('ending'):
|
||||
LOG.debug("received reply msg_id: %s" % incoming_msg_id)
|
||||
self.waiters.put(incoming_msg_id, message)
|
||||
|
||||
def listen(self, msg_id):
|
||||
@ -389,10 +405,11 @@ class AMQPDriverBase(base.BaseDriver):
|
||||
if wait_for_reply:
|
||||
msg_id = uuid.uuid4().hex
|
||||
msg.update({'_msg_id': msg_id})
|
||||
LOG.debug('MSG_ID is %s', msg_id)
|
||||
msg.update({'_reply_q': self._get_reply_q()})
|
||||
|
||||
rpc_amqp._add_unique_id(msg)
|
||||
unique_id = msg[rpc_amqp.UNIQUE_ID]
|
||||
|
||||
rpc_amqp.pack_context(msg, context)
|
||||
|
||||
if envelope:
|
||||
@ -400,21 +417,37 @@ class AMQPDriverBase(base.BaseDriver):
|
||||
|
||||
if wait_for_reply:
|
||||
self._waiter.listen(msg_id)
|
||||
log_msg = "CALL msg_id: %s " % msg_id
|
||||
else:
|
||||
log_msg = "CAST unique_id: %s " % unique_id
|
||||
|
||||
try:
|
||||
with self._get_connection(rpc_amqp.PURPOSE_SEND) as conn:
|
||||
if notify:
|
||||
conn.notify_send(self._get_exchange(target),
|
||||
target.topic, msg, retry=retry)
|
||||
exchange = self._get_exchange(target)
|
||||
log_msg += "NOTIFY exchange '%(exchange)s'" \
|
||||
" topic '%(topic)s'" % {
|
||||
'exchange': exchange,
|
||||
'topic': target.topic}
|
||||
LOG.debug(log_msg)
|
||||
conn.notify_send(exchange, target.topic, msg, retry=retry)
|
||||
elif target.fanout:
|
||||
log_msg += "FANOUT topic '%(topic)s'" % {
|
||||
'topic': target.topic}
|
||||
LOG.debug(log_msg)
|
||||
conn.fanout_send(target.topic, msg, retry=retry)
|
||||
else:
|
||||
topic = target.topic
|
||||
exchange = self._get_exchange(target)
|
||||
if target.server:
|
||||
topic = '%s.%s' % (target.topic, target.server)
|
||||
conn.topic_send(exchange_name=self._get_exchange(target),
|
||||
topic=topic, msg=msg, timeout=timeout,
|
||||
retry=retry)
|
||||
log_msg += "exchange '%(exchange)s'" \
|
||||
" topic '%(topic)s'" % {
|
||||
'exchange': exchange,
|
||||
'topic': target.topic}
|
||||
LOG.debug(log_msg)
|
||||
conn.topic_send(exchange_name=exchange, topic=topic,
|
||||
msg=msg, timeout=timeout, retry=retry)
|
||||
|
||||
if wait_for_reply:
|
||||
result = self._waiter.wait(msg_id, timeout)
|
||||
|
Loading…
x
Reference in New Issue
Block a user