Display the reply queue's name in timeout logs
It would be helpful if "Timed out waiting for <service>" log messages at least specified which `reply_q` was being waited on. Example without the reply_q: ``` 12228 2020-09-14 14:56:37.187 7 WARNING nova.conductor.api [req-1e081db6-808b-4af1-afc1-b87db7839394 - - - - -] Timed out waiting for nova-conductor. Is it running? Or did this service start before nova-conductor? Reattempting establishment of nova-conductor connection...: oslo_messaging.exceptions.MessagingTimeout: Timed out waiting for a reply to message ID 1640e7ef6f314451ba9a75d9ff6136ad ``` Example after adding the reply_q: ``` 12228 2020-09-14 14:56:37.187 7 WARNING nova.conductor.api [req-1e081db6-808b-4af1-afc1-b87db7839394 - - - - -] Timed out waiting for nova-conductor. Is it running? Or did this service start before nova-conductor? Reattempting establishment of nova-conductor connection...: oslo_messaging.exceptions.MessagingTimeout: Timed out waiting for a reply (reply_2882766a63b540dabaf7d019cf0c0cda) to message ID 1640e7ef6f314451ba9a75d9ff6136ad ``` This makes it easier to debug and to see whether something went wrong with a reply queue. Change-Id: Ied2c881c71930dc631919113adc00112648f9d72 Closes-Bug: #1896925
This commit is contained in:
parent
b5244bd05a
commit
97d457f0af
@ -583,9 +583,11 @@ class ReplyWaiter(object):
|
||||
self.waiters.remove(msg_id)
|
||||
|
||||
@staticmethod
|
||||
def _raise_timeout_exception(msg_id):
|
||||
def _raise_timeout_exception(msg_id, reply_q):
|
||||
raise oslo_messaging.MessagingTimeout(
|
||||
'Timed out waiting for a reply to message ID %s.', msg_id)
|
||||
'Timed out waiting for a reply %(reply_q)s '
|
||||
'to message ID %(msg_id)s.',
|
||||
{'msg_id': msg_id, 'reply_q': reply_q})
|
||||
|
||||
def _process_reply(self, data):
|
||||
self.msg_id_cache.check_duplicate_message(data)
|
||||
@ -599,7 +601,7 @@ class ReplyWaiter(object):
|
||||
ending = data.get('ending', False)
|
||||
return result, ending
|
||||
|
||||
def wait(self, msg_id, timeout, call_monitor_timeout):
|
||||
def wait(self, msg_id, timeout, call_monitor_timeout, reply_q):
|
||||
# NOTE(sileht): for each msg_id we receive two amqp message
|
||||
# first one with the payload, a second one to ensure the other
|
||||
# have finish to send the payload
|
||||
@ -617,16 +619,26 @@ class ReplyWaiter(object):
|
||||
final_reply = None
|
||||
ending = False
|
||||
while not ending:
|
||||
timeout = timer.check_return(self._raise_timeout_exception, msg_id)
|
||||
timeout = timer.check_return(
|
||||
self._raise_timeout_exception,
|
||||
msg_id,
|
||||
reply_q
|
||||
)
|
||||
if call_monitor_timer and timeout > 0:
|
||||
cm_timeout = call_monitor_timer.check_return(
|
||||
self._raise_timeout_exception, msg_id)
|
||||
self._raise_timeout_exception,
|
||||
msg_id,
|
||||
reply_q
|
||||
)
|
||||
if cm_timeout < timeout:
|
||||
timeout = cm_timeout
|
||||
try:
|
||||
message = self.waiters.get(msg_id, timeout=timeout)
|
||||
except queue.Empty:
|
||||
self._raise_timeout_exception(msg_id)
|
||||
self._raise_timeout_exception(
|
||||
msg_id,
|
||||
reply_q
|
||||
)
|
||||
|
||||
reply, ending = self._process_reply(message)
|
||||
if reply is not None:
|
||||
@ -700,6 +712,7 @@ class AMQPDriverBase(base.BaseDriver):
|
||||
envelope=True, notify=False, retry=None, transport_options=None):
|
||||
|
||||
msg = message
|
||||
reply_q = None
|
||||
if 'method' in msg:
|
||||
LOG.debug('Calling RPC method %s on target %s', msg.get('method'),
|
||||
target.topic)
|
||||
@ -707,13 +720,13 @@ class AMQPDriverBase(base.BaseDriver):
|
||||
LOG.debug('Sending message to topic %s', target.topic)
|
||||
|
||||
if wait_for_reply:
|
||||
_reply_q = self._get_reply_q()
|
||||
reply_q = self._get_reply_q()
|
||||
msg_id = uuid.uuid4().hex
|
||||
msg.update({'_msg_id': msg_id})
|
||||
msg.update({'_reply_q': _reply_q})
|
||||
msg.update({'_reply_q': reply_q})
|
||||
msg.update({'_timeout': call_monitor_timeout})
|
||||
LOG.info('Expecting reply to msg %s in queue %s', msg_id,
|
||||
_reply_q)
|
||||
reply_q)
|
||||
|
||||
rpc_amqp._add_unique_id(msg)
|
||||
unique_id = msg[rpc_amqp.UNIQUE_ID]
|
||||
@ -756,7 +769,7 @@ class AMQPDriverBase(base.BaseDriver):
|
||||
|
||||
if wait_for_reply:
|
||||
result = self._waiter.wait(msg_id, timeout,
|
||||
call_monitor_timeout)
|
||||
call_monitor_timeout, reply_q)
|
||||
if isinstance(result, Exception):
|
||||
raise result
|
||||
return result
|
||||
|
@ -668,7 +668,7 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
|
||||
wait_conditions = []
|
||||
orig_reply_waiter = amqpdriver.ReplyWaiter.wait
|
||||
|
||||
def reply_waiter(self, msg_id, timeout, call_monitor_timeout):
|
||||
def reply_waiter(self, msg_id, timeout, call_monitor_timeout, reply_q):
|
||||
if wait_conditions:
|
||||
cond = wait_conditions.pop()
|
||||
with cond:
|
||||
@ -676,7 +676,7 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
|
||||
with cond:
|
||||
cond.wait()
|
||||
return orig_reply_waiter(self, msg_id, timeout,
|
||||
call_monitor_timeout)
|
||||
call_monitor_timeout, reply_q)
|
||||
|
||||
self.useFixture(fixtures.MockPatchObject(
|
||||
amqpdriver.ReplyWaiter, 'wait', reply_waiter))
|
||||
|
4
releasenotes/notes/reply_q-timeout-e3c3bae636e8bc74.yaml
Normal file
4
releasenotes/notes/reply_q-timeout-e3c3bae636e8bc74.yaml
Normal file
@ -0,0 +1,4 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
The name of the ``reply_q`` is now logged when a timeout occurs while waiting for a reply.
|
Loading…
Reference in New Issue
Block a user