Implement get_rpc_client function

We already expose functions to handle the instantiation
of classes such as RPCServer and RPCTransport but the
same was never done for RPCClient so the API is
inconsistent in its enforcement.

This adds a get_rpc_client function that should be used
instead of instatiating the RPCClient class directly to
be more consistent.

This also allows to handle more logic inside the function
in the future such as if implementations for an async client
is implemented, as investigation in [1] has shown.

[1] https://review.opendev.org/c/openstack/oslo.messaging/+/858936

Change-Id: Ia4d1f0497b9e2728bde02f4ff05fdc175ddffe66
This commit is contained in:
Tobias Urdin 2022-10-22 12:55:34 +02:00 committed by Tobias Urdin
parent e5e70a5d89
commit 4ead7cb2dc
7 changed files with 74 additions and 29 deletions

View File

@ -30,6 +30,7 @@ __all__ = [
'expected_exceptions', 'expected_exceptions',
'get_rpc_transport', 'get_rpc_transport',
'get_rpc_server', 'get_rpc_server',
'get_rpc_client',
'expose' 'expose'
] ]

View File

@ -32,6 +32,7 @@ __all__ = [
'RPCClient', 'RPCClient',
'RPCVersionCapError', 'RPCVersionCapError',
'RemoteError', 'RemoteError',
'get_rpc_client',
] ]
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -263,6 +264,9 @@ class RPCClient(_BaseCallContext):
The RPCClient class is responsible for sending method invocations to and The RPCClient class is responsible for sending method invocations to and
receiving return values from remote RPC servers via a messaging transport. receiving return values from remote RPC servers via a messaging transport.
The class should always be instantiated by using the get_rpc_client
function and not constructing the class directly.
Two RPC patterns are supported: RPC calls and RPC casts. Two RPC patterns are supported: RPC calls and RPC casts.
An RPC cast is used when an RPC method does *not* return a value to An RPC cast is used when an RPC method does *not* return a value to
@ -295,7 +299,7 @@ class RPCClient(_BaseCallContext):
def __init__(self, transport): def __init__(self, transport):
target = messaging.Target(topic='test', version='2.0') target = messaging.Target(topic='test', version='2.0')
self._client = messaging.RPCClient(transport, target) self._client = messaging.get_rpc_client(transport, target)
def test(self, ctxt, arg): def test(self, ctxt, arg):
return self._client.call(ctxt, 'test', arg=arg) return self._client.call(ctxt, 'test', arg=arg)
@ -320,7 +324,7 @@ class RPCClient(_BaseCallContext):
transport = messaging.get_rpc_transport(cfg.CONF) transport = messaging.get_rpc_transport(cfg.CONF)
target = messaging.Target(topic='test', version='2.0') target = messaging.Target(topic='test', version='2.0')
client = messaging.RPCClient(transport, target) client = messaging.get_rpc_client(transport, target)
client.call(ctxt, 'test', arg=arg) client.call(ctxt, 'test', arg=arg)
but this is probably only useful in limited circumstances as a wrapper but this is probably only useful in limited circumstances as a wrapper
@ -334,7 +338,7 @@ class RPCClient(_BaseCallContext):
have the RPC request fail with a MessageDeliveryFailure after the given have the RPC request fail with a MessageDeliveryFailure after the given
number of retries. For example:: number of retries. For example::
client = messaging.RPCClient(transport, target, retry=None) client = messaging.get_rpc_client(transport, target, retry=None)
client.call(ctxt, 'sync') client.call(ctxt, 'sync')
try: try:
client.prepare(retry=0).cast(ctxt, 'ping') client.prepare(retry=0).cast(ctxt, 'ping')
@ -346,9 +350,13 @@ class RPCClient(_BaseCallContext):
def __init__(self, transport, target, def __init__(self, transport, target,
timeout=None, version_cap=None, serializer=None, retry=None, timeout=None, version_cap=None, serializer=None, retry=None,
call_monitor_timeout=None, transport_options=None): call_monitor_timeout=None, transport_options=None,
_manual_load=True):
"""Construct an RPC client. """Construct an RPC client.
This should not be called directly, use the get_rpc_client function
to instantiate this class.
:param transport: a messaging transport handle :param transport: a messaging transport handle
:type transport: Transport :type transport: Transport
:param target: the default target for invocations :param target: the default target for invocations
@ -371,7 +379,17 @@ class RPCClient(_BaseCallContext):
(less than the overall timeout (less than the overall timeout
parameter). parameter).
:type call_monitor_timeout: int :type call_monitor_timeout: int
:param transport_options: Transport options passed to client.
:type transport_options: TransportOptions
:param _manual_load: Internal use only to check if class was
manually instantiated or not.
:type _manual_load: bool
""" """
if _manual_load:
LOG.warning("Using RPCClient manually to instantiate client. "
"Please use get_rpc_client to obtain an RPC client "
"instance.")
if serializer is None: if serializer is None:
serializer = msg_serializer.NoOpSerializer() serializer = msg_serializer.NoOpSerializer()
@ -530,3 +548,16 @@ class RPCClient(_BaseCallContext):
def can_send_version(self, version=_marker): def can_send_version(self, version=_marker):
"""Check to see if a version is compatible with the version cap.""" """Check to see if a version is compatible with the version cap."""
return self.prepare(version=version).can_send_version() return self.prepare(version=version).can_send_version()
def get_rpc_client(transport, target, **kwargs):
"""Construct an RPC client.
:param transport: the messaging transport
:type transport: Transport
:param target: the exchange, topic and server to listen on
:type target: Target
:param **kwargs: The kwargs will be passed down to the
RPCClient constructor
"""
return RPCClient(transport, target, _manual_load=False, **kwargs)

View File

@ -114,7 +114,7 @@ class RpcServerFixture(fixtures.Fixture):
target=self.target, target=self.target,
endpoints=endpoints, endpoints=endpoints,
executor=self.executor) executor=self.executor)
self._ctrl = oslo_messaging.RPCClient(transport.transport, self._ctrl = oslo_messaging.get_rpc_client(transport.transport,
self.ctrl_target) self.ctrl_target)
self._start() self._start()
transport.wait() transport.wait()
@ -230,7 +230,7 @@ class ClientStub(object):
transport_options=None, **kwargs): transport_options=None, **kwargs):
self.name = name or "functional-tests" self.name = name or "functional-tests"
self.cast = cast self.cast = cast
self.client = oslo_messaging.RPCClient( self.client = oslo_messaging.get_rpc_client(
transport=transport, transport=transport,
target=target, target=target,
transport_options=transport_options, transport_options=transport_options,

View File

@ -44,7 +44,8 @@ class TestCastCall(test_utils.BaseTestCase):
self.config(rpc_response_timeout=None) self.config(rpc_response_timeout=None)
transport_options = oslo_messaging.TransportOptions() transport_options = oslo_messaging.TransportOptions()
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), client = oslo_messaging.get_rpc_client(
transport, oslo_messaging.Target(),
transport_options=transport_options) transport_options=transport_options)
transport._send = mock.Mock() transport._send = mock.Mock()
@ -70,7 +71,7 @@ class TestCastCall(test_utils.BaseTestCase):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
transport_options = oslo_messaging.TransportOptions(at_least_once=True) transport_options = oslo_messaging.TransportOptions(at_least_once=True)
client = oslo_messaging.RPCClient( client = oslo_messaging.get_rpc_client(
transport, transport,
oslo_messaging.Target(), oslo_messaging.Target(),
transport_options=transport_options) transport_options=transport_options)
@ -215,7 +216,7 @@ class TestCastToTarget(test_utils.BaseTestCase):
expect_target = oslo_messaging.Target(**self.expect) expect_target = oslo_messaging.Target(**self.expect)
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, target) client = oslo_messaging.get_rpc_client(transport, target)
transport._send = mock.Mock() transport._send = mock.Mock()
@ -269,8 +270,8 @@ class TestCallTimeout(test_utils.BaseTestCase):
self.config(rpc_response_timeout=self.confval) self.config(rpc_response_timeout=self.confval)
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), client = oslo_messaging.get_rpc_client(
timeout=self.ctor, transport, oslo_messaging.Target(), timeout=self.ctor,
call_monitor_timeout=self.cm) call_monitor_timeout=self.cm)
transport._send = mock.Mock() transport._send = mock.Mock()
@ -302,7 +303,8 @@ class TestCallRetry(test_utils.BaseTestCase):
def test_call_retry(self): def test_call_retry(self):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), client = oslo_messaging.get_rpc_client(
transport, oslo_messaging.Target(),
retry=self.ctor) retry=self.ctor)
transport._send = mock.Mock() transport._send = mock.Mock()
@ -332,8 +334,8 @@ class TestCallFanout(test_utils.BaseTestCase):
def test_call_fanout(self): def test_call_fanout(self):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, client = oslo_messaging.get_rpc_client(
oslo_messaging.Target(**self.target)) transport, oslo_messaging.Target(**self.target))
if self.prepare is not _notset: if self.prepare is not _notset:
client = client.prepare(**self.prepare) client = client.prepare(**self.prepare)
@ -363,8 +365,8 @@ class TestSerializer(test_utils.BaseTestCase):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
serializer = msg_serializer.NoOpSerializer() serializer = msg_serializer.NoOpSerializer()
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), client = oslo_messaging.get_rpc_client(
serializer=serializer) transport, oslo_messaging.Target(), serializer=serializer)
transport._send = mock.Mock() transport._send = mock.Mock()
kwargs = dict(wait_for_reply=True, kwargs = dict(wait_for_reply=True,
@ -465,7 +467,7 @@ class TestVersionCap(test_utils.BaseTestCase):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(version=self.version) target = oslo_messaging.Target(version=self.version)
client = oslo_messaging.RPCClient(transport, target, client = oslo_messaging.get_rpc_client(transport, target,
version_cap=self.cap) version_cap=self.cap)
if self.success: if self.success:
@ -574,7 +576,7 @@ class TestCanSendVersion(test_utils.BaseTestCase):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(version=self.version) target = oslo_messaging.Target(version=self.version)
client = oslo_messaging.RPCClient(transport, target, client = oslo_messaging.get_rpc_client(transport, target,
version_cap=self.cap) version_cap=self.cap)
prep_kwargs = {} prep_kwargs = {}
@ -598,7 +600,7 @@ class TestCanSendVersion(test_utils.BaseTestCase):
def test_invalid_version_type(self): def test_invalid_version_type(self):
target = oslo_messaging.Target(topic='sometopic') target = oslo_messaging.Target(topic='sometopic')
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, target) client = oslo_messaging.get_rpc_client(transport, target)
self.assertRaises(exceptions.MessagingException, self.assertRaises(exceptions.MessagingException,
client.prepare, version='5') client.prepare, version='5')
self.assertRaises(exceptions.MessagingException, self.assertRaises(exceptions.MessagingException,
@ -612,7 +614,7 @@ class TestTransportWarning(test_utils.BaseTestCase):
@mock.patch('oslo_messaging.rpc.client.LOG') @mock.patch('oslo_messaging.rpc.client.LOG')
def test_warning_when_notifier_transport(self, log): def test_warning_when_notifier_transport(self, log):
transport = oslo_messaging.get_notification_transport(self.conf) transport = oslo_messaging.get_notification_transport(self.conf)
oslo_messaging.RPCClient(transport, oslo_messaging.Target()) oslo_messaging.get_rpc_client(transport, oslo_messaging.Target())
log.warning.assert_called_once_with( log.warning.assert_called_once_with(
"Using notification transport for RPC. Please use " "Using notification transport for RPC. Please use "
"get_rpc_transport to obtain an RPC transport " "get_rpc_transport to obtain an RPC transport "

View File

@ -102,7 +102,7 @@ class ServerSetupMixin(object):
def _setup_client(self, transport, topic='testtopic', exchange=None): def _setup_client(self, transport, topic='testtopic', exchange=None):
target = oslo_messaging.Target(topic=topic, exchange=exchange) target = oslo_messaging.Target(topic=topic, exchange=exchange)
return oslo_messaging.RPCClient(transport, target=target, return oslo_messaging.get_rpc_client(transport, target=target,
serializer=self.serializer) serializer=self.serializer)

View File

@ -0,0 +1,11 @@
---
features:
- |
Added new ``get_rpc_client`` function to instantiate the RPCClient
class
deprecations:
- |
Instantiating the RPCClient class directly is deprecated in favor
of using the new ``get_rpc_client`` function to expose a more
common API similar to existing functions such as ``get_rpc_server``
and ``get_rpc_transport``

View File

@ -410,7 +410,7 @@ class RPCClient(Client):
def __init__(self, client_id, transport, target, timeout, is_cast, def __init__(self, client_id, transport, target, timeout, is_cast,
wait_after_msg, sync_mode=False): wait_after_msg, sync_mode=False):
client = rpc.RPCClient(transport, target) client = rpc.get_rpc_client(transport, target)
method = _rpc_cast if is_cast else _rpc_call method = _rpc_cast if is_cast else _rpc_call
super(RPCClient, self).__init__(client_id, super(RPCClient, self).__init__(client_id,