[rabbit] Ignore stream offset header when stream queue is not used
Consumer callback should look for x-stream-offset header only if the stream mode is used. Otherwise the message headers attribute could be None and the error appears when using get method. Closes-Bug: #2084168 Change-Id: I7f9c742f8f557d9faae2cd749d34dcb15d8005c0
This commit is contained in:
parent
b3d7270b3a
commit
930d97599c
@ -631,12 +631,12 @@ class Consumer(object):
|
||||
|
||||
Messages that are processed and ack'ed.
|
||||
"""
|
||||
|
||||
offset = message.headers.get("x-stream-offset")
|
||||
if offset is not None:
|
||||
LOG.debug("Stream for %s current offset: %s", self.queue_name,
|
||||
offset)
|
||||
self.next_stream_offset = offset + 1
|
||||
if self.rabbit_stream_fanout:
|
||||
offset = message.headers.get("x-stream-offset")
|
||||
if offset is not None:
|
||||
LOG.debug("Stream for %s current offset: %s",
|
||||
self.queue_name, offset)
|
||||
self.next_stream_offset = offset + 1
|
||||
|
||||
m2p = getattr(self.queue.channel, 'message_to_python', None)
|
||||
if m2p:
|
||||
@ -1099,7 +1099,7 @@ class Connection(object):
|
||||
info = {'err_str': exc, 'sleep_time': interval}
|
||||
info.update(self._get_connection_info(conn_error=True))
|
||||
|
||||
if 'Basic.cancel' in str(exc):
|
||||
if self.rabbit_stream_fanout and 'Basic.cancel' in str(exc):
|
||||
# This branch allows for consumer offset reset
|
||||
# in the unlikely case consumers are cancelled. This may
|
||||
# happen, for example, when we delete the stream queue.
|
||||
|
Loading…
x
Reference in New Issue
Block a user