diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index e4268648a..a1fab724f 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -927,12 +927,11 @@ class Connection(object): channel=self.channel, routing_key=routing_key) - headers = {} + expiration = None if timeout: # AMQP TTL is in milliseconds when set in the property. # Details: http://www.rabbitmq.com/ttl.html#per-message-ttl - # NOTE(sileht): this amqp header doesn't exists ... LP#1444854 - headers['ttl'] = timeout * 1000 + expiration = int(timeout * 1000) # NOTE(sileht): no need to wait more, caller expects # a answer before timeout is reached @@ -948,7 +947,7 @@ class Connection(object): transport_timeout = heartbeat_timeout with self._transport_socket_timeout(transport_timeout): - producer.publish(msg, headers=headers) + producer.publish(msg, expiration=expiration) PUBLISHER_DECLARED_QUEUES = collections.defaultdict(set) diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 69bc1bada..c8f58bdbe 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -174,6 +174,24 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase): class TestRabbitPublisher(test_utils.BaseTestCase): + @mock.patch('kombu.messaging.Producer.publish') + def test_send_with_timeout(self, fake_publish): + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn: + conn = pool_conn.connection + conn._publish(mock.Mock(), 'msg', routing_key='routing_key', + timeout=1) + fake_publish.assert_called_with('msg', expiration=1000) + + @mock.patch('kombu.messaging.Producer.publish') + def test_send_no_timeout(self, fake_publish): + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn: + conn = pool_conn.connection + conn._publish(mock.Mock(), 'msg', routing_key='routing_key') + fake_publish.assert_called_with('msg', expiration=None) def test_declared_queue_publisher(self): transport = oslo_messaging.get_transport(self.conf,