From 708d9d842b7f78cd544917841796b92b1cbd7b41 Mon Sep 17 00:00:00 2001 From: gtt116 Date: Thu, 16 Apr 2015 08:08:14 +0000 Subject: [PATCH] rabbit: Fix message ttl not work impl_rabbit set timeout into message's header with {'ttl': (timeout * 1000)}, this mean doesn't work in real, messages still stays in queue after the ttl. As RabbitMQ document said (http://www.rabbitmq.com/ttl.html#per-message-ttl), we should passing "expiration" into message's property rather than header to make it work. Change-Id: I5d6ae72e69f856c56fb83fb939ed35246716e04d Closes-bug: #1444854 --- oslo_messaging/_drivers/impl_rabbit.py | 7 +++---- .../tests/drivers/test_impl_rabbit.py | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index e4268648a..a1fab724f 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -927,12 +927,11 @@ class Connection(object): channel=self.channel, routing_key=routing_key) - headers = {} + expiration = None if timeout: # AMQP TTL is in milliseconds when set in the property. # Details: http://www.rabbitmq.com/ttl.html#per-message-ttl - # NOTE(sileht): this amqp header doesn't exists ... LP#1444854 - headers['ttl'] = timeout * 1000 + expiration = int(timeout * 1000) # NOTE(sileht): no need to wait more, caller expects # a answer before timeout is reached @@ -948,7 +947,7 @@ class Connection(object): transport_timeout = heartbeat_timeout with self._transport_socket_timeout(transport_timeout): - producer.publish(msg, headers=headers) + producer.publish(msg, expiration=expiration) PUBLISHER_DECLARED_QUEUES = collections.defaultdict(set) diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 69bc1bada..c8f58bdbe 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -174,6 +174,24 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase): class TestRabbitPublisher(test_utils.BaseTestCase): + @mock.patch('kombu.messaging.Producer.publish') + def test_send_with_timeout(self, fake_publish): + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn: + conn = pool_conn.connection + conn._publish(mock.Mock(), 'msg', routing_key='routing_key', + timeout=1) + fake_publish.assert_called_with('msg', expiration=1000) + + @mock.patch('kombu.messaging.Producer.publish') + def test_send_no_timeout(self, fake_publish): + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn: + conn = pool_conn.connection + conn._publish(mock.Mock(), 'msg', routing_key='routing_key') + fake_publish.assert_called_with('msg', expiration=None) def test_declared_queue_publisher(self): transport = oslo_messaging.get_transport(self.conf,