Add get_rpc_transport call

The get_rpc_transport wraps get_transport to unify the API in
anticipation of comprehensive separation of RPC and Notification
messaging backends.

Related-Bug: 1680192
Change-Id: Ic6af07b98ff43806c2af38a3ba129991f1e0ec86
This commit is contained in:
Andrew Smith 2017-04-06 09:47:34 -04:00
parent e569c92cd9
commit ec4d6639bc
10 changed files with 144 additions and 54 deletions

View File

@ -4,8 +4,6 @@ Transport
.. currentmodule:: oslo_messaging .. currentmodule:: oslo_messaging
.. autofunction:: get_transport
.. autoclass:: Transport .. autoclass:: Transport
.. autoclass:: TransportURL .. autoclass:: TransportURL

View File

@ -171,8 +171,8 @@ def get_notification_transport(conf, url=None,
group='oslo_messaging_notifications') group='oslo_messaging_notifications')
if url is None: if url is None:
url = conf.oslo_messaging_notifications.transport_url url = conf.oslo_messaging_notifications.transport_url
return msg_transport.get_transport(conf, url, return msg_transport._get_transport(conf, url,
allowed_remote_exmods, aliases) allowed_remote_exmods, aliases)
class Notifier(object): class Notifier(object):

View File

@ -28,10 +28,12 @@ __all__ = [
'RemoteError', 'RemoteError',
'UnsupportedVersion', 'UnsupportedVersion',
'expected_exceptions', 'expected_exceptions',
'get_rpc_transport',
'get_rpc_server', 'get_rpc_server',
'expose' 'expose'
] ]
from .client import * from .client import *
from .dispatcher import * from .dispatcher import *
from .transport import *
from .server import * from .server import *

View File

@ -282,7 +282,7 @@ class RPCClient(_BaseCallContext):
However, this class can be used directly without wrapping it another class. However, this class can be used directly without wrapping it another class.
For example:: For example::
transport = messaging.get_transport(cfg.CONF) transport = messaging.get_rpc_transport(cfg.CONF)
target = messaging.Target(topic='test', version='2.0') target = messaging.Target(topic='test', version='2.0')
client = messaging.RPCClient(transport, target) client = messaging.RPCClient(transport, target)
client.call(ctxt, 'test', arg=arg) client.call(ctxt, 'test', arg=arg)
@ -440,12 +440,12 @@ class RPCClient(_BaseCallContext):
method are handled are quite subtle. method are handled are quite subtle.
Firstly, if the remote exception is contained in one of the modules Firstly, if the remote exception is contained in one of the modules
listed in the allow_remote_exmods messaging.get_transport() parameter, listed in the allow_remote_exmods messaging.get_rpc_transport()
then it this exception will be re-raised by call(). However, such parameter, then it this exception will be re-raised by call(). However,
locally re-raised remote exceptions are distinguishable from the same such locally re-raised remote exceptions are distinguishable from the
exception type raised locally because re-raised remote exceptions are same exception type raised locally because re-raised remote exceptions
modified such that their class name ends with the '_Remote' suffix so are modified such that their class name ends with the '_Remote' suffix
you may do:: so you may do::
if ex.__class__.__name__.endswith('_Remote'): if ex.__class__.__name__.endswith('_Remote'):
# Some special case for locally re-raised remote exceptions # Some special case for locally re-raised remote exceptions

View File

@ -20,12 +20,12 @@ methods which may be invoked remotely by clients over a given transport.
To create an RPC server, you supply a transport, target and a list of To create an RPC server, you supply a transport, target and a list of
endpoints. endpoints.
A transport can be obtained simply by calling the get_transport() method:: A transport can be obtained simply by calling the get_rpc_transport() method::
transport = messaging.get_transport(conf) transport = messaging.get_rpc_transport(conf)
which will load the appropriate transport driver according to the user's which will load the appropriate transport driver according to the user's
messaging configuration. See get_transport() for more details. messaging configuration. See get_rpc_transport() for more details.
The target supplied when creating an RPC server expresses the topic, server The target supplied when creating an RPC server expresses the topic, server
name and - optionally - the exchange to listen on. See Target for more details name and - optionally - the exchange to listen on. See Target for more details
@ -98,7 +98,7 @@ A simple example of an RPC server with multiple endpoints might be::
def test(self, ctx, arg): def test(self, ctx, arg):
return arg return arg
transport = oslo_messaging.get_transport(cfg.CONF) transport = oslo_messaging.get_rpc_transport(cfg.CONF)
target = oslo_messaging.Target(topic='test', server='server1') target = oslo_messaging.Target(topic='test', server='server1')
endpoints = [ endpoints = [
ServerControlEndpoint(None), ServerControlEndpoint(None),

View File

@ -0,0 +1,47 @@
# Copyright 2017 OpenStack Foundation.
# All Rights Reserved.
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__all__ = [
'get_rpc_transport'
]
from oslo_messaging import transport as msg_transport
def get_rpc_transport(conf, url=None,
allowed_remote_exmods=None):
"""A factory method for Transport objects for RPCs.
This method should be used to ensure the correct messaging functionality
for RPCs. RPCs and Notifications may use separate messaging systems
that utilize different drivers, different access permissions,
message delivery, etc.
Presently, this function works exactly the same as get_transport. It's
use is recommended as disambiguates the intended use for the transport
and may in the future extend functionality related to the separation of
messaging backends.
:param conf: the user configuration
:type conf: cfg.ConfigOpts
:param url: a transport URL
:type url: str or TransportURL
:param allowed_remote_exmods: a list of modules which a client using this
transport will deserialize remote exceptions
from
:type allowed_remote_exmods: list
"""
return msg_transport._get_transport(conf, url,
allowed_remote_exmods)

View File

@ -58,7 +58,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
@mock.patch('oslo_utils.timeutils.utcnow') @mock.patch('oslo_utils.timeutils.utcnow')
def test_logger(self, mock_utcnow): def test_logger(self, mock_utcnow):
with mock.patch('oslo_messaging.transport.get_transport', with mock.patch('oslo_messaging.transport._get_transport',
return_value=test_notifier._FakeTransport(self.conf)): return_value=test_notifier._FakeTransport(self.conf)):
self.logger = oslo_messaging.LoggingNotificationHandler('test://') self.logger = oslo_messaging.LoggingNotificationHandler('test://')
@ -102,7 +102,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
@mock.patch('oslo_utils.timeutils.utcnow') @mock.patch('oslo_utils.timeutils.utcnow')
def test_logging_conf(self, mock_utcnow): def test_logging_conf(self, mock_utcnow):
with mock.patch('oslo_messaging.transport.get_transport', with mock.patch('oslo_messaging.transport._get_transport',
return_value=test_notifier._FakeTransport(self.conf)): return_value=test_notifier._FakeTransport(self.conf)):
logging.config.dictConfig({ logging.config.dictConfig({
'version': 1, 'version': 1,

View File

@ -113,7 +113,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts()) super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts())
def test_constructor(self): def test_constructor(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo', server='bar') target = oslo_messaging.Target(topic='foo', server='bar')
endpoints = [object()] endpoints = [object()]
serializer = object() serializer = object()
@ -135,7 +135,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertEqual('blocking', server.executor_type) self.assertEqual('blocking', server.executor_type)
def test_constructor_without_explicit_RPCAccessPolicy(self): def test_constructor_without_explicit_RPCAccessPolicy(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo', server='bar') target = oslo_messaging.Target(topic='foo', server='bar')
endpoints = [object()] endpoints = [object()]
serializer = object() serializer = object()
@ -148,7 +148,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertEqual(FutureWarning, w.category) self.assertEqual(FutureWarning, w.category)
def test_server_wait_method(self): def test_server_wait_method(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo', server='bar') target = oslo_messaging.Target(topic='foo', server='bar')
endpoints = [object()] endpoints = [object()]
serializer = object() serializer = object()
@ -180,7 +180,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertEqual(1, listener.cleanup.call_count) self.assertEqual(1, listener.cleanup.call_count)
def test_no_target_server(self): def test_no_target_server(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
server = oslo_messaging.get_rpc_server( server = oslo_messaging.get_rpc_server(
transport, transport,
@ -195,7 +195,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertTrue(False) self.assertTrue(False)
def test_no_server_topic(self): def test_no_server_topic(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(server='testserver') target = oslo_messaging.Target(server='testserver')
server = oslo_messaging.get_rpc_server(transport, target, []) server = oslo_messaging.get_rpc_server(transport, target, [])
try: try:
@ -207,7 +207,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertTrue(False) self.assertTrue(False)
def _test_no_client_topic(self, call=True): def _test_no_client_topic(self, call=True):
transport = oslo_messaging.get_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = self._setup_client(transport, topic=None) client = self._setup_client(transport, topic=None)
@ -228,7 +228,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self._test_no_client_topic(call=False) self._test_no_client_topic(call=False)
def test_client_call_timeout(self): def test_client_call_timeout(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
finished = False finished = False
wait = threading.Condition() wait = threading.Condition()
@ -256,7 +256,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self._stop_server(client, server_thread) self._stop_server(client, server_thread)
def test_unknown_executor(self): def test_unknown_executor(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
try: try:
oslo_messaging.get_rpc_server(transport, None, [], executor='foo') oslo_messaging.get_rpc_server(transport, None, [], executor='foo')
@ -267,7 +267,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertTrue(False) self.assertTrue(False)
def test_cast(self): def test_cast(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
class TestEndpoint(object): class TestEndpoint(object):
def __init__(self): def __init__(self):
@ -288,7 +288,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertEqual(['dsfoo', 'dsbar'], endpoint.pings) self.assertEqual(['dsfoo', 'dsbar'], endpoint.pings)
def test_call(self): def test_call(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
class TestEndpoint(object): class TestEndpoint(object):
def ping(self, ctxt, arg): def ping(self, ctxt, arg):
@ -307,7 +307,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self._stop_server(client, server_thread) self._stop_server(client, server_thread)
def test_direct_call(self): def test_direct_call(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
class TestEndpoint(object): class TestEndpoint(object):
def ping(self, ctxt, arg): def ping(self, ctxt, arg):
@ -327,7 +327,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self._stop_server(client, server_thread) self._stop_server(client, server_thread)
def test_context(self): def test_context(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
class TestEndpoint(object): class TestEndpoint(object):
def ctxt_check(self, ctxt, key): def ctxt_check(self, ctxt, key):
@ -344,7 +344,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self._stop_server(client, server_thread) self._stop_server(client, server_thread)
def test_failure(self): def test_failure(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
class TestEndpoint(object): class TestEndpoint(object):
def ping(self, ctxt, arg): def ping(self, ctxt, arg):
@ -384,7 +384,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self._stop_server(client, server_thread) self._stop_server(client, server_thread)
def test_expected_failure(self): def test_expected_failure(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
debugs = [] debugs = []
errors = [] errors = []
@ -529,9 +529,9 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
url1 = 'fake:///' + (self.exchange1 or '') url1 = 'fake:///' + (self.exchange1 or '')
url2 = 'fake:///' + (self.exchange2 or '') url2 = 'fake:///' + (self.exchange2 or '')
transport1 = oslo_messaging.get_transport(self.conf, url=url1) transport1 = oslo_messaging.get_rpc_transport(self.conf, url=url1)
if url1 != url2: if url1 != url2:
transport2 = oslo_messaging.get_transport(self.conf, url=url1) transport2 = oslo_messaging.get_rpc_transport(self.conf, url=url1)
else: else:
transport2 = transport1 transport2 = transport1

View File

@ -77,8 +77,33 @@ class Transport(object):
This is a mostly opaque handle for an underlying messaging transport This is a mostly opaque handle for an underlying messaging transport
driver. driver.
It has a single 'conf' property which is the cfg.ConfigOpts instance used RPCs and Notifications may use separate messaging systems that utilize
to construct the transport object. different drivers, access permissions, message delivery, etc. To ensure
the correct messaging functionality, the corresponding method should be
used to construct a Transport object from transport configuration
gleaned from the user's configuration and, optionally, a transport URL.
The factory method for RPC Transport objects::
def get_rpc_transport(conf, url=None,
allowed_remote_exmods=None)
If a transport URL is supplied as a parameter, any transport configuration
contained in it takes precedence. If no transport URL is supplied, but
there is a transport URL supplied in the user's configuration then that
URL will take the place of the URL parameter.
The factory method for Notification Transport objects::
def get_notification_transport(conf, url=None,
allowed_remote_exmods=None)
If no transport URL is provided, the URL in the notifications section of
the config file will be used. If that URL is also absent, the same
transport as specified in the user's default section will be used.
The Transport has a single 'conf' property which is the cfg.ConfigOpts
instance used to construct the transport object.
""" """
def __init__(self, driver): def __init__(self, driver):
@ -146,6 +171,31 @@ class DriverLoadFailure(exceptions.MessagingException):
self.ex = ex self.ex = ex
def _get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
allowed_remote_exmods = allowed_remote_exmods or []
conf.register_opts(_transport_opts)
if not isinstance(url, TransportURL):
url = TransportURL.parse(conf, url, aliases)
kwargs = dict(default_exchange=conf.control_exchange,
allowed_remote_exmods=allowed_remote_exmods)
try:
mgr = driver.DriverManager('oslo.messaging.drivers',
url.transport.split('+')[0],
invoke_on_load=True,
invoke_args=[conf, url],
invoke_kwds=kwargs)
except RuntimeError as ex:
raise DriverLoadFailure(url.transport, ex)
return Transport(mgr.driver)
@removals.remove(
message='use get_rpc_transport or get_notification_transport'
)
@removals.removed_kwarg('aliases', @removals.removed_kwarg('aliases',
'Parameter aliases is deprecated for removal.') 'Parameter aliases is deprecated for removal.')
def get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None): def get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
@ -178,25 +228,8 @@ def get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
:param aliases: DEPRECATED: A map of transport alias to transport name :param aliases: DEPRECATED: A map of transport alias to transport name
:type aliases: dict :type aliases: dict
""" """
allowed_remote_exmods = allowed_remote_exmods or [] return _get_transport(conf, url,
conf.register_opts(_transport_opts) allowed_remote_exmods, aliases)
if not isinstance(url, TransportURL):
url = TransportURL.parse(conf, url, aliases)
kwargs = dict(default_exchange=conf.control_exchange,
allowed_remote_exmods=allowed_remote_exmods)
try:
mgr = driver.DriverManager('oslo.messaging.drivers',
url.transport.split('+')[0],
invoke_on_load=True,
invoke_args=[conf, url],
invoke_kwds=kwargs)
except RuntimeError as ex:
raise DriverLoadFailure(url.transport, ex)
return Transport(mgr.driver)
class TransportHost(object): class TransportHost(object):

View File

@ -0,0 +1,10 @@
---
features:
- |
Add get_rpc_transport call to make the API clear for the separation
of RPC and Notification messaging backends.
deprecations:
- |
Deprecate get_transport and use get_rpc_transport or
get_notification_transport to make the API usage clear for the
separation of RPC and Notification messaging backends.