Implement the transport options
With this feature, it is possible to specialize the parameters to send. `options = oslo_messaging.TransportOptions(at_least_once=True)` TransportOptions is used in every single driver, for example in RabbitMQ driver is used to handle the mandatory flag. Notes: - The idea of creating a new class TransportOptions is because I'd like to have an abstract class not related only to the RPCClient - at_least_once is the first parameter, when needed we can add the others. Implements: blueprint transport-options (second point) The blueprint link is [1] To test it you can use [2] 1- https://blueprints.launchpad.net/oslo.messaging/+spec/transport-options 2- https://github.com/Gsantomaggio/rabbitmq-utils/ tree/master/openstack/mandatory_test Change-Id: I1858e4a990507d3c2bac2ef7fbef75d8c2dbfce2
This commit is contained in:
parent
03ec779cdf
commit
e804874c50
@ -49,7 +49,6 @@ from oslo_messaging import exceptions
|
||||
# NOTE(sileht): don't exists in py2 socket module
|
||||
TCP_USER_TIMEOUT = 18
|
||||
|
||||
|
||||
rabbit_opts = [
|
||||
cfg.BoolOpt('ssl',
|
||||
default=False,
|
||||
@ -1150,15 +1149,17 @@ class Connection(object):
|
||||
'transport_options': str(transport_options)}
|
||||
LOG.trace('Connection._publish: sending message %(msg)s to'
|
||||
' %(who)s with routing key %(key)s', log_info)
|
||||
|
||||
# NOTE(sileht): no need to wait more, caller expects
|
||||
# a answer before timeout is reached
|
||||
with self._transport_socket_timeout(timeout):
|
||||
self._producer.publish(msg,
|
||||
exchange=exchange,
|
||||
routing_key=routing_key,
|
||||
expiration=timeout,
|
||||
compression=self.kombu_compression)
|
||||
self._producer.publish(
|
||||
msg,
|
||||
mandatory=transport_options.at_least_once if
|
||||
transport_options else False,
|
||||
exchange=exchange,
|
||||
routing_key=routing_key,
|
||||
expiration=timeout,
|
||||
compression=self.kombu_compression)
|
||||
|
||||
def _publish_and_creates_default_queue(self, exchange, msg,
|
||||
routing_key=None, timeout=None,
|
||||
|
@ -199,6 +199,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
|
||||
'msg', expiration=1,
|
||||
exchange=exchange_mock,
|
||||
compression=self.conf.oslo_messaging_rabbit.kombu_compression,
|
||||
mandatory=False,
|
||||
routing_key='routing_key')
|
||||
|
||||
@mock.patch('kombu.messaging.Producer.publish')
|
||||
@ -212,6 +213,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
|
||||
conn._publish(exchange_mock, 'msg', routing_key='routing_key')
|
||||
fake_publish.assert_called_with(
|
||||
'msg', expiration=None,
|
||||
mandatory=False,
|
||||
compression=self.conf.oslo_messaging_rabbit.kombu_compression,
|
||||
exchange=exchange_mock,
|
||||
routing_key='routing_key')
|
||||
|
@ -42,14 +42,15 @@ class TestCastCall(test_utils.BaseTestCase):
|
||||
|
||||
def test_cast_call(self):
|
||||
self.config(rpc_response_timeout=None)
|
||||
|
||||
transport_options = oslo_messaging.TransportOptions()
|
||||
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
||||
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target())
|
||||
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
|
||||
transport_options=transport_options)
|
||||
|
||||
transport._send = mock.Mock()
|
||||
|
||||
msg = dict(method='foo', args=self.args)
|
||||
kwargs = {'retry': None, 'transport_options': None}
|
||||
kwargs = {'retry': None, 'transport_options': transport_options}
|
||||
if self.call:
|
||||
kwargs['wait_for_reply'] = True
|
||||
kwargs['timeout'] = None
|
||||
@ -57,7 +58,7 @@ class TestCastCall(test_utils.BaseTestCase):
|
||||
|
||||
method = client.call if self.call else client.cast
|
||||
method(self.ctxt, 'foo', **self.args)
|
||||
|
||||
self.assertFalse(transport_options.at_least_once)
|
||||
transport._send.assert_called_once_with(oslo_messaging.Target(),
|
||||
self.ctxt,
|
||||
msg,
|
||||
@ -67,13 +68,18 @@ class TestCastCall(test_utils.BaseTestCase):
|
||||
self.config(rpc_response_timeout=None)
|
||||
|
||||
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
||||
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
|
||||
transport_options={'my_k': 'my_val'})
|
||||
|
||||
transport_options = oslo_messaging.TransportOptions(at_least_once=True)
|
||||
client = oslo_messaging.RPCClient(
|
||||
transport,
|
||||
oslo_messaging.Target(),
|
||||
transport_options=transport_options)
|
||||
|
||||
transport._send = mock.Mock()
|
||||
|
||||
msg = dict(method='foo', args=self.args)
|
||||
kwargs = {'retry': None, 'transport_options': {'my_k': 'my_val'}}
|
||||
kwargs = {'retry': None,
|
||||
'transport_options': transport_options}
|
||||
if self.call:
|
||||
kwargs['wait_for_reply'] = True
|
||||
kwargs['timeout'] = None
|
||||
@ -82,6 +88,7 @@ class TestCastCall(test_utils.BaseTestCase):
|
||||
method = client.call if self.call else client.cast
|
||||
method(self.ctxt, 'foo', **self.args)
|
||||
|
||||
self.assertTrue(transport_options.at_least_once)
|
||||
transport._send.assert_called_once_with(oslo_messaging.Target(),
|
||||
self.ctxt,
|
||||
msg,
|
||||
|
@ -33,6 +33,7 @@ __all__ = [
|
||||
'Transport',
|
||||
'TransportHost',
|
||||
'TransportURL',
|
||||
'TransportOptions',
|
||||
'get_transport',
|
||||
'set_transport_defaults',
|
||||
]
|
||||
@ -277,6 +278,16 @@ class TransportHost(object):
|
||||
return '<TransportHost ' + values + '>'
|
||||
|
||||
|
||||
class TransportOptions(object):
|
||||
|
||||
def __init__(self, at_least_once=False):
|
||||
self._at_least_once = at_least_once
|
||||
|
||||
@property
|
||||
def at_least_once(self):
|
||||
return self._at_least_once
|
||||
|
||||
|
||||
class TransportURL(object):
|
||||
|
||||
"""A parsed transport URL.
|
||||
|
Loading…
x
Reference in New Issue
Block a user