Fix kombu accept different TTL since version 3.0.25

kombu accept TTL as seconds instead of millisecond since version 3.0.25,
We remove TTL conversion in commit d49ddc3b9828f097d5bb39bca0381386a9de7762,
which is valid only when kombu >=3.0.25, so need do conversion according
to kombu version.

Note: Remove this workaround when all supported branches with
requirement kombu >=3.0.25

Closes-Bug: #1531148
Change-Id: I762265f23084a95f2d24cb434bad7d9556d4edd5
This commit is contained in:
ChangBo Guo(gcb) 2016-02-18 16:12:56 +08:00
parent b80fdc850d
commit 9cfaf50c45
2 changed files with 31 additions and 4 deletions

View File

@ -30,6 +30,8 @@ import kombu.messaging
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import netutils
from oslo_utils import versionutils
import pkg_resources
import six
from six.moves.urllib import parse
@ -1042,6 +1044,18 @@ class Connection(object):
with self._connection_lock:
self.ensure(method, retry=retry, error_callback=_error_callback)
def _get_expiration(self, timeout):
# NOTE(gcb) kombu accept TTL as seconds instead of millisecond since
# version 3.0.25, so do conversion according to kombu version.
# TODO(gcb) remove this workaround when all supported branches
# with requirement kombu >=3.0.25
if timeout is not None:
kombu_version = pkg_resources.get_distribution('kombu').version
if not versionutils.is_compatible('3.0.25', kombu_version):
timeout = int(timeout * 1000)
return timeout
def _publish(self, exchange, msg, routing_key=None, timeout=None):
"""Publish a message."""
producer = kombu.messaging.Producer(exchange=exchange,
@ -1068,7 +1082,7 @@ class Connection(object):
LOG.trace('Connection._publish: sending message %(msg)s to'
' %(who)s with routing key %(key)s', log_info)
with self._transport_socket_timeout(transport_timeout):
producer.publish(msg, expiration=timeout,
producer.publish(msg, expiration=self._get_expiration(timeout),
compression=self.kombu_compression)
# List of notification queue declared on the channel to avoid

View File

@ -24,7 +24,9 @@ import kombu
import kombu.transport.memory
from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_utils import versionutils
from oslotest import mockpatch
import pkg_resources
import testscenarios
import oslo_messaging
@ -206,9 +208,20 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
conn = pool_conn.connection
conn._publish(mock.Mock(), 'msg', routing_key='routing_key',
timeout=1)
fake_publish.assert_called_with(
'msg', expiration=1,
compression=self.conf.oslo_messaging_rabbit.kombu_compression)
# NOTE(gcb) kombu accept TTL as seconds instead of millisecond since
# version 3.0.25, so do conversion according to kombu version.
# TODO(gcb) remove this workaround when all supported branches
# with requirement kombu >=3.0.25
kombu_version = pkg_resources.get_distribution('kombu').version
if versionutils.is_compatible('3.0.25', kombu_version):
fake_publish.assert_called_with(
'msg', expiration=1,
compression=self.conf.oslo_messaging_rabbit.kombu_compression)
else:
fake_publish.assert_called_with(
'msg', expiration=1000,
compression=self.conf.oslo_messaging_rabbit.kombu_compression)
@mock.patch('kombu.messaging.Producer.publish')
def test_send_no_timeout(self, fake_publish):