rabbit: Fix message ttl not work
impl_rabbit set timeout into message's header with {'ttl': (timeout * 1000)}, this mean doesn't work in real, messages still stays in queue after the ttl. As RabbitMQ document said (http://www.rabbitmq.com/ttl.html#per-message-ttl), we should passing "expiration" into message's property rather than header to make it work. Change-Id: I5d6ae72e69f856c56fb83fb939ed35246716e04d Closes-bug: #1444854
This commit is contained in:
parent
ecb7803d5f
commit
708d9d842b
@ -927,12 +927,11 @@ class Connection(object):
|
|||||||
channel=self.channel,
|
channel=self.channel,
|
||||||
routing_key=routing_key)
|
routing_key=routing_key)
|
||||||
|
|
||||||
headers = {}
|
expiration = None
|
||||||
if timeout:
|
if timeout:
|
||||||
# AMQP TTL is in milliseconds when set in the property.
|
# AMQP TTL is in milliseconds when set in the property.
|
||||||
# Details: http://www.rabbitmq.com/ttl.html#per-message-ttl
|
# Details: http://www.rabbitmq.com/ttl.html#per-message-ttl
|
||||||
# NOTE(sileht): this amqp header doesn't exists ... LP#1444854
|
expiration = int(timeout * 1000)
|
||||||
headers['ttl'] = timeout * 1000
|
|
||||||
|
|
||||||
# NOTE(sileht): no need to wait more, caller expects
|
# NOTE(sileht): no need to wait more, caller expects
|
||||||
# a answer before timeout is reached
|
# a answer before timeout is reached
|
||||||
@ -948,7 +947,7 @@ class Connection(object):
|
|||||||
transport_timeout = heartbeat_timeout
|
transport_timeout = heartbeat_timeout
|
||||||
|
|
||||||
with self._transport_socket_timeout(transport_timeout):
|
with self._transport_socket_timeout(transport_timeout):
|
||||||
producer.publish(msg, headers=headers)
|
producer.publish(msg, expiration=expiration)
|
||||||
|
|
||||||
PUBLISHER_DECLARED_QUEUES = collections.defaultdict(set)
|
PUBLISHER_DECLARED_QUEUES = collections.defaultdict(set)
|
||||||
|
|
||||||
|
@ -174,6 +174,24 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase):
|
|||||||
|
|
||||||
|
|
||||||
class TestRabbitPublisher(test_utils.BaseTestCase):
|
class TestRabbitPublisher(test_utils.BaseTestCase):
|
||||||
|
@mock.patch('kombu.messaging.Producer.publish')
|
||||||
|
def test_send_with_timeout(self, fake_publish):
|
||||||
|
transport = oslo_messaging.get_transport(self.conf,
|
||||||
|
'kombu+memory:////')
|
||||||
|
with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn:
|
||||||
|
conn = pool_conn.connection
|
||||||
|
conn._publish(mock.Mock(), 'msg', routing_key='routing_key',
|
||||||
|
timeout=1)
|
||||||
|
fake_publish.assert_called_with('msg', expiration=1000)
|
||||||
|
|
||||||
|
@mock.patch('kombu.messaging.Producer.publish')
|
||||||
|
def test_send_no_timeout(self, fake_publish):
|
||||||
|
transport = oslo_messaging.get_transport(self.conf,
|
||||||
|
'kombu+memory:////')
|
||||||
|
with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn:
|
||||||
|
conn = pool_conn.connection
|
||||||
|
conn._publish(mock.Mock(), 'msg', routing_key='routing_key')
|
||||||
|
fake_publish.assert_called_with('msg', expiration=None)
|
||||||
|
|
||||||
def test_declared_queue_publisher(self):
|
def test_declared_queue_publisher(self):
|
||||||
transport = oslo_messaging.get_transport(self.conf,
|
transport = oslo_messaging.get_transport(self.conf,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user