From 930d97599c0048e84f7c47dca0d62d2cc0562166 Mon Sep 17 00:00:00 2001 From: Vladimir Kozhukalov Date: Wed, 9 Oct 2024 18:51:35 -0500 Subject: [PATCH] [rabbit] Ignore stream offset header when stream queue is not used Consumer callback should look for x-stream-offset header only if the stream mode is used. Otherwise the message headers attribute could be None and the error appears when using get method. Closes-Bug: #2084168 Change-Id: I7f9c742f8f557d9faae2cd749d34dcb15d8005c0 --- oslo_messaging/_drivers/impl_rabbit.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 17d224136..ba5c92dce 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -631,12 +631,12 @@ class Consumer(object): Messages that are processed and ack'ed. """ - - offset = message.headers.get("x-stream-offset") - if offset is not None: - LOG.debug("Stream for %s current offset: %s", self.queue_name, - offset) - self.next_stream_offset = offset + 1 + if self.rabbit_stream_fanout: + offset = message.headers.get("x-stream-offset") + if offset is not None: + LOG.debug("Stream for %s current offset: %s", + self.queue_name, offset) + self.next_stream_offset = offset + 1 m2p = getattr(self.queue.channel, 'message_to_python', None) if m2p: @@ -1099,7 +1099,7 @@ class Connection(object): info = {'err_str': exc, 'sleep_time': interval} info.update(self._get_connection_info(conn_error=True)) - if 'Basic.cancel' in str(exc): + if self.rabbit_stream_fanout and 'Basic.cancel' in str(exc): # This branch allows for consumer offset reset # in the unlikely case consumers are cancelled. This may # happen, for example, when we delete the stream queue.