Merge "Implement the transport options"

This commit is contained in:
Zuul 2019-06-26 13:09:53 +00:00 committed by Gerrit Code Review
commit 6cdd4cb007
4 changed files with 35 additions and 14 deletions

View File

@ -49,7 +49,6 @@ from oslo_messaging import exceptions
# NOTE(sileht): don't exists in py2 socket module
TCP_USER_TIMEOUT = 18
rabbit_opts = [
cfg.BoolOpt('ssl',
default=False,
@ -1150,11 +1149,13 @@ class Connection(object):
'transport_options': str(transport_options)}
LOG.trace('Connection._publish: sending message %(msg)s to'
' %(who)s with routing key %(key)s', log_info)
# NOTE(sileht): no need to wait more, caller expects
# a answer before timeout is reached
with self._transport_socket_timeout(timeout):
self._producer.publish(msg,
self._producer.publish(
msg,
mandatory=transport_options.at_least_once if
transport_options else False,
exchange=exchange,
routing_key=routing_key,
expiration=timeout,

View File

@ -199,6 +199,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
'msg', expiration=1,
exchange=exchange_mock,
compression=self.conf.oslo_messaging_rabbit.kombu_compression,
mandatory=False,
routing_key='routing_key')
@mock.patch('kombu.messaging.Producer.publish')
@ -212,6 +213,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
conn._publish(exchange_mock, 'msg', routing_key='routing_key')
fake_publish.assert_called_with(
'msg', expiration=None,
mandatory=False,
compression=self.conf.oslo_messaging_rabbit.kombu_compression,
exchange=exchange_mock,
routing_key='routing_key')

View File

@ -42,14 +42,15 @@ class TestCastCall(test_utils.BaseTestCase):
def test_cast_call(self):
self.config(rpc_response_timeout=None)
transport_options = oslo_messaging.TransportOptions()
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target())
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
transport_options=transport_options)
transport._send = mock.Mock()
msg = dict(method='foo', args=self.args)
kwargs = {'retry': None, 'transport_options': None}
kwargs = {'retry': None, 'transport_options': transport_options}
if self.call:
kwargs['wait_for_reply'] = True
kwargs['timeout'] = None
@ -57,7 +58,7 @@ class TestCastCall(test_utils.BaseTestCase):
method = client.call if self.call else client.cast
method(self.ctxt, 'foo', **self.args)
self.assertFalse(transport_options.at_least_once)
transport._send.assert_called_once_with(oslo_messaging.Target(),
self.ctxt,
msg,
@ -67,13 +68,18 @@ class TestCastCall(test_utils.BaseTestCase):
self.config(rpc_response_timeout=None)
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
transport_options={'my_k': 'my_val'})
transport_options = oslo_messaging.TransportOptions(at_least_once=True)
client = oslo_messaging.RPCClient(
transport,
oslo_messaging.Target(),
transport_options=transport_options)
transport._send = mock.Mock()
msg = dict(method='foo', args=self.args)
kwargs = {'retry': None, 'transport_options': {'my_k': 'my_val'}}
kwargs = {'retry': None,
'transport_options': transport_options}
if self.call:
kwargs['wait_for_reply'] = True
kwargs['timeout'] = None
@ -82,6 +88,7 @@ class TestCastCall(test_utils.BaseTestCase):
method = client.call if self.call else client.cast
method(self.ctxt, 'foo', **self.args)
self.assertTrue(transport_options.at_least_once)
transport._send.assert_called_once_with(oslo_messaging.Target(),
self.ctxt,
msg,

View File

@ -33,6 +33,7 @@ __all__ = [
'Transport',
'TransportHost',
'TransportURL',
'TransportOptions',
'get_transport',
'set_transport_defaults',
]
@ -277,6 +278,16 @@ class TransportHost(object):
return '<TransportHost ' + values + '>'
class TransportOptions(object):
def __init__(self, at_least_once=False):
self._at_least_once = at_least_once
@property
def at_least_once(self):
return self._at_least_once
class TransportURL(object):
"""A parsed transport URL.