Warn when wrong transport instance is used
Since RPC and notifications can have different backends, it is useful to warn users if they use a notification transport in RPC and vice versa. This patch introduces RPCTransport and NotificationTransport subclasses of Transport, so it's easier to add different behavior for them if need be. Related-Bug: #1680192 Change-Id: Iab60544d69053c8e74c28a2d5c84665be749013f
This commit is contained in:
parent
338b85eb4e
commit
03b6f18f80
@ -132,6 +132,7 @@ import logging
|
|||||||
from oslo_messaging._i18n import _LE
|
from oslo_messaging._i18n import _LE
|
||||||
from oslo_messaging.notify import dispatcher as notify_dispatcher
|
from oslo_messaging.notify import dispatcher as notify_dispatcher
|
||||||
from oslo_messaging import server as msg_server
|
from oslo_messaging import server as msg_server
|
||||||
|
from oslo_messaging import transport as msg_transport
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -163,6 +164,11 @@ class NotificationServerBase(msg_server.MessageHandlingServer):
|
|||||||
class NotificationServer(NotificationServerBase):
|
class NotificationServer(NotificationServerBase):
|
||||||
def __init__(self, transport, targets, dispatcher, executor='blocking',
|
def __init__(self, transport, targets, dispatcher, executor='blocking',
|
||||||
allow_requeue=True, pool=None):
|
allow_requeue=True, pool=None):
|
||||||
|
if not isinstance(transport, msg_transport.NotificationTransport):
|
||||||
|
LOG.warning("Using RPC transport for notifications. Please use "
|
||||||
|
"get_notification_transport to obtain a "
|
||||||
|
"notification transport instance.")
|
||||||
|
|
||||||
super(NotificationServer, self).__init__(
|
super(NotificationServer, self).__init__(
|
||||||
transport, targets, dispatcher, executor, allow_requeue, pool, 1,
|
transport, targets, dispatcher, executor, allow_requeue, pool, 1,
|
||||||
None
|
None
|
||||||
|
@ -171,8 +171,9 @@ def get_notification_transport(conf, url=None,
|
|||||||
group='oslo_messaging_notifications')
|
group='oslo_messaging_notifications')
|
||||||
if url is None:
|
if url is None:
|
||||||
url = conf.oslo_messaging_notifications.transport_url
|
url = conf.oslo_messaging_notifications.transport_url
|
||||||
return msg_transport._get_transport(conf, url,
|
return msg_transport._get_transport(
|
||||||
allowed_remote_exmods, aliases)
|
conf, url, allowed_remote_exmods, aliases,
|
||||||
|
transport_cls=msg_transport.NotificationTransport)
|
||||||
|
|
||||||
|
|
||||||
class Notifier(object):
|
class Notifier(object):
|
||||||
@ -245,6 +246,10 @@ class Notifier(object):
|
|||||||
conf.register_opts(_notifier_opts,
|
conf.register_opts(_notifier_opts,
|
||||||
group='oslo_messaging_notifications')
|
group='oslo_messaging_notifications')
|
||||||
|
|
||||||
|
if not isinstance(transport, msg_transport.NotificationTransport):
|
||||||
|
_LOG.warning("Using RPC transport for notifications. Please use "
|
||||||
|
"get_notification_transport to obtain a "
|
||||||
|
"notification transport instance.")
|
||||||
self.transport = transport
|
self.transport = transport
|
||||||
self.publisher_id = publisher_id
|
self.publisher_id = publisher_id
|
||||||
if retry is not None:
|
if retry is not None:
|
||||||
|
@ -24,6 +24,7 @@ __all__ = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
import abc
|
import abc
|
||||||
|
import logging
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
import six
|
import six
|
||||||
@ -32,6 +33,10 @@ from oslo_messaging._drivers import base as driver_base
|
|||||||
from oslo_messaging import _utils as utils
|
from oslo_messaging import _utils as utils
|
||||||
from oslo_messaging import exceptions
|
from oslo_messaging import exceptions
|
||||||
from oslo_messaging import serializer as msg_serializer
|
from oslo_messaging import serializer as msg_serializer
|
||||||
|
from oslo_messaging import transport as msg_transport
|
||||||
|
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
_client_opts = [
|
_client_opts = [
|
||||||
cfg.IntOpt('rpc_response_timeout',
|
cfg.IntOpt('rpc_response_timeout',
|
||||||
@ -331,6 +336,11 @@ class RPCClient(_BaseCallContext):
|
|||||||
if serializer is None:
|
if serializer is None:
|
||||||
serializer = msg_serializer.NoOpSerializer()
|
serializer = msg_serializer.NoOpSerializer()
|
||||||
|
|
||||||
|
if not isinstance(transport, msg_transport.RPCTransport):
|
||||||
|
LOG.warning("Using notification transport for RPC. Please use "
|
||||||
|
"get_rpc_transport to obtain an RPC transport "
|
||||||
|
"instance.")
|
||||||
|
|
||||||
super(RPCClient, self).__init__(
|
super(RPCClient, self).__init__(
|
||||||
transport, target, serializer, timeout, version_cap, retry
|
transport, target, serializer, timeout, version_cap, retry
|
||||||
)
|
)
|
||||||
|
@ -135,6 +135,7 @@ from debtcollector.updating import updated_kwarg_default_value
|
|||||||
from oslo_messaging._i18n import _LE
|
from oslo_messaging._i18n import _LE
|
||||||
from oslo_messaging.rpc import dispatcher as rpc_dispatcher
|
from oslo_messaging.rpc import dispatcher as rpc_dispatcher
|
||||||
from oslo_messaging import server as msg_server
|
from oslo_messaging import server as msg_server
|
||||||
|
from oslo_messaging import transport as msg_transport
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -142,6 +143,10 @@ LOG = logging.getLogger(__name__)
|
|||||||
class RPCServer(msg_server.MessageHandlingServer):
|
class RPCServer(msg_server.MessageHandlingServer):
|
||||||
def __init__(self, transport, target, dispatcher, executor='blocking'):
|
def __init__(self, transport, target, dispatcher, executor='blocking'):
|
||||||
super(RPCServer, self).__init__(transport, dispatcher, executor)
|
super(RPCServer, self).__init__(transport, dispatcher, executor)
|
||||||
|
if not isinstance(transport, msg_transport.RPCTransport):
|
||||||
|
LOG.warning("Using notification transport for RPC. Please use "
|
||||||
|
"get_rpc_transport to obtain an RPC transport "
|
||||||
|
"instance.")
|
||||||
self._target = target
|
self._target = target
|
||||||
|
|
||||||
def _create_listener(self):
|
def _create_listener(self):
|
||||||
|
@ -43,5 +43,6 @@ def get_rpc_transport(conf, url=None,
|
|||||||
from
|
from
|
||||||
:type allowed_remote_exmods: list
|
:type allowed_remote_exmods: list
|
||||||
"""
|
"""
|
||||||
return msg_transport._get_transport(conf, url,
|
return msg_transport._get_transport(
|
||||||
allowed_remote_exmods)
|
conf, url, allowed_remote_exmods,
|
||||||
|
transport_cls=msg_transport.RPCTransport)
|
||||||
|
@ -22,7 +22,6 @@ import oslo_messaging
|
|||||||
from oslo_messaging.notify import dispatcher
|
from oslo_messaging.notify import dispatcher
|
||||||
from oslo_messaging.notify import notifier as msg_notifier
|
from oslo_messaging.notify import notifier as msg_notifier
|
||||||
from oslo_messaging.tests import utils as test_utils
|
from oslo_messaging.tests import utils as test_utils
|
||||||
import six
|
|
||||||
from six.moves import mock
|
from six.moves import mock
|
||||||
|
|
||||||
load_tests = testscenarios.load_tests_apply_scenarios
|
load_tests = testscenarios.load_tests_apply_scenarios
|
||||||
@ -183,7 +182,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
|||||||
self.assertTrue(False)
|
self.assertTrue(False)
|
||||||
|
|
||||||
def test_batch_timeout(self):
|
def test_batch_timeout(self):
|
||||||
transport = oslo_messaging.get_transport(self.conf, url='fake:')
|
transport = oslo_messaging.get_notification_transport(self.conf,
|
||||||
|
url='fake:')
|
||||||
|
|
||||||
endpoint = mock.Mock()
|
endpoint = mock.Mock()
|
||||||
endpoint.info.return_value = None
|
endpoint.info.return_value = None
|
||||||
@ -191,7 +191,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
|||||||
batch=(5, 1))
|
batch=(5, 1))
|
||||||
|
|
||||||
notifier = self._setup_notifier(transport)
|
notifier = self._setup_notifier(transport)
|
||||||
for i in six.moves.range(12):
|
for _ in range(12):
|
||||||
notifier.info({}, 'an_event.start', 'test message')
|
notifier.info({}, 'an_event.start', 'test message')
|
||||||
|
|
||||||
self.wait_for_messages(3)
|
self.wait_for_messages(3)
|
||||||
@ -209,7 +209,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
|||||||
mock.call(messages * 2)])
|
mock.call(messages * 2)])
|
||||||
|
|
||||||
def test_batch_size(self):
|
def test_batch_size(self):
|
||||||
transport = oslo_messaging.get_transport(self.conf, url='fake:')
|
transport = oslo_messaging.get_notification_transport(self.conf,
|
||||||
|
url='fake:')
|
||||||
|
|
||||||
endpoint = mock.Mock()
|
endpoint = mock.Mock()
|
||||||
endpoint.info.return_value = None
|
endpoint.info.return_value = None
|
||||||
@ -217,7 +218,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
|||||||
batch=(5, None))
|
batch=(5, None))
|
||||||
|
|
||||||
notifier = self._setup_notifier(transport)
|
notifier = self._setup_notifier(transport)
|
||||||
for i in six.moves.range(10):
|
for _ in range(10):
|
||||||
notifier.info({}, 'an_event.start', 'test message')
|
notifier.info({}, 'an_event.start', 'test message')
|
||||||
|
|
||||||
self.wait_for_messages(2)
|
self.wait_for_messages(2)
|
||||||
@ -234,7 +235,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
|||||||
mock.call(messages * 5)])
|
mock.call(messages * 5)])
|
||||||
|
|
||||||
def test_batch_size_exception_path(self):
|
def test_batch_size_exception_path(self):
|
||||||
transport = oslo_messaging.get_transport(self.conf, url='fake:')
|
transport = oslo_messaging.get_notification_transport(self.conf,
|
||||||
|
url='fake:')
|
||||||
|
|
||||||
endpoint = mock.Mock()
|
endpoint = mock.Mock()
|
||||||
endpoint.info.side_effect = [None, Exception('boom!')]
|
endpoint.info.side_effect = [None, Exception('boom!')]
|
||||||
@ -242,7 +244,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
|||||||
batch=(5, None))
|
batch=(5, None))
|
||||||
|
|
||||||
notifier = self._setup_notifier(transport)
|
notifier = self._setup_notifier(transport)
|
||||||
for i in six.moves.range(10):
|
for _ in range(10):
|
||||||
notifier.info({}, 'an_event.start', 'test message')
|
notifier.info({}, 'an_event.start', 'test message')
|
||||||
|
|
||||||
self.wait_for_messages(2)
|
self.wait_for_messages(2)
|
||||||
@ -506,3 +508,18 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
|||||||
for call in mocked_endpoint1_calls:
|
for call in mocked_endpoint1_calls:
|
||||||
self.assertIn(call, endpoint2.info.mock_calls +
|
self.assertIn(call, endpoint2.info.mock_calls +
|
||||||
endpoint3.info.mock_calls)
|
endpoint3.info.mock_calls)
|
||||||
|
|
||||||
|
|
||||||
|
class TestListenerTransportWarning(test_utils.BaseTestCase):
|
||||||
|
|
||||||
|
@mock.patch('oslo_messaging.notify.listener.LOG')
|
||||||
|
def test_warning_when_rpc_transport(self, log):
|
||||||
|
transport = oslo_messaging.get_rpc_transport(self.conf)
|
||||||
|
target = oslo_messaging.Target(topic='foo')
|
||||||
|
endpoints = [object()]
|
||||||
|
oslo_messaging.get_notification_listener(
|
||||||
|
transport, [target], endpoints)
|
||||||
|
log.warning.assert_called_once_with(
|
||||||
|
"Using RPC transport for notifications. Please use "
|
||||||
|
"get_notification_transport to obtain a "
|
||||||
|
"notification transport instance.")
|
||||||
|
@ -16,7 +16,6 @@ import fixtures
|
|||||||
|
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
from oslo_messaging.notify import log_handler
|
from oslo_messaging.notify import log_handler
|
||||||
from oslo_messaging.tests.notify import test_notifier
|
|
||||||
from oslo_messaging.tests import utils as test_utils
|
from oslo_messaging.tests import utils as test_utils
|
||||||
from six.moves import mock
|
from six.moves import mock
|
||||||
|
|
||||||
@ -34,7 +33,7 @@ class PublishErrorsHandlerTestCase(test_utils.BaseTestCase):
|
|||||||
group='oslo_messaging_notifications')
|
group='oslo_messaging_notifications')
|
||||||
self.stub_flg = True
|
self.stub_flg = True
|
||||||
|
|
||||||
transport = test_notifier._FakeTransport(self.conf)
|
transport = oslo_messaging.get_notification_transport(self.conf)
|
||||||
notifier = oslo_messaging.Notifier(transport)
|
notifier = oslo_messaging.Notifier(transport)
|
||||||
|
|
||||||
def fake_notifier(*args, **kwargs):
|
def fake_notifier(*args, **kwargs):
|
||||||
|
@ -22,7 +22,6 @@ from oslo_utils import timeutils
|
|||||||
import testscenarios
|
import testscenarios
|
||||||
|
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
from oslo_messaging.tests.notify import test_notifier
|
|
||||||
from oslo_messaging.tests import utils as test_utils
|
from oslo_messaging.tests import utils as test_utils
|
||||||
from six.moves import mock
|
from six.moves import mock
|
||||||
|
|
||||||
@ -58,8 +57,9 @@ class TestLogNotifier(test_utils.BaseTestCase):
|
|||||||
|
|
||||||
@mock.patch('oslo_utils.timeutils.utcnow')
|
@mock.patch('oslo_utils.timeutils.utcnow')
|
||||||
def test_logger(self, mock_utcnow):
|
def test_logger(self, mock_utcnow):
|
||||||
|
fake_transport = oslo_messaging.get_notification_transport(self.conf)
|
||||||
with mock.patch('oslo_messaging.transport._get_transport',
|
with mock.patch('oslo_messaging.transport._get_transport',
|
||||||
return_value=test_notifier._FakeTransport(self.conf)):
|
return_value=fake_transport):
|
||||||
self.logger = oslo_messaging.LoggingNotificationHandler('test://')
|
self.logger = oslo_messaging.LoggingNotificationHandler('test://')
|
||||||
|
|
||||||
mock_utcnow.return_value = datetime.datetime.utcnow()
|
mock_utcnow.return_value = datetime.datetime.utcnow()
|
||||||
@ -102,8 +102,9 @@ class TestLogNotifier(test_utils.BaseTestCase):
|
|||||||
|
|
||||||
@mock.patch('oslo_utils.timeutils.utcnow')
|
@mock.patch('oslo_utils.timeutils.utcnow')
|
||||||
def test_logging_conf(self, mock_utcnow):
|
def test_logging_conf(self, mock_utcnow):
|
||||||
|
fake_transport = oslo_messaging.get_notification_transport(self.conf)
|
||||||
with mock.patch('oslo_messaging.transport._get_transport',
|
with mock.patch('oslo_messaging.transport._get_transport',
|
||||||
return_value=test_notifier._FakeTransport(self.conf)):
|
return_value=fake_transport):
|
||||||
logging.config.dictConfig({
|
logging.config.dictConfig({
|
||||||
'version': 1,
|
'version': 1,
|
||||||
'handlers': {
|
'handlers': {
|
||||||
|
@ -47,15 +47,6 @@ class JsonMessageMatcher(object):
|
|||||||
return self.message == jsonutils.loads(other)
|
return self.message == jsonutils.loads(other)
|
||||||
|
|
||||||
|
|
||||||
class _FakeTransport(object):
|
|
||||||
|
|
||||||
def __init__(self, conf):
|
|
||||||
self.conf = conf
|
|
||||||
|
|
||||||
def _send_notification(self, target, ctxt, message, version, retry=None):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class _ReRaiseLoggedExceptionsFixture(fixtures.Fixture):
|
class _ReRaiseLoggedExceptionsFixture(fixtures.Fixture):
|
||||||
|
|
||||||
"""Record logged exceptions and re-raise in cleanup.
|
"""Record logged exceptions and re-raise in cleanup.
|
||||||
@ -73,6 +64,9 @@ class _ReRaiseLoggedExceptionsFixture(fixtures.Fixture):
|
|||||||
def exception(self, msg, *args, **kwargs):
|
def exception(self, msg, *args, **kwargs):
|
||||||
self.exceptions.append(sys.exc_info()[1])
|
self.exceptions.append(sys.exc_info()[1])
|
||||||
|
|
||||||
|
def warning(self, msg, *args, **kwargs):
|
||||||
|
return
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(_ReRaiseLoggedExceptionsFixture, self).setUp()
|
super(_ReRaiseLoggedExceptionsFixture, self).setUp()
|
||||||
|
|
||||||
@ -170,7 +164,8 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
|
|||||||
topics=self.topics,
|
topics=self.topics,
|
||||||
group='oslo_messaging_notifications')
|
group='oslo_messaging_notifications')
|
||||||
|
|
||||||
transport = _FakeTransport(self.conf)
|
transport = oslo_messaging.get_notification_transport(self.conf,
|
||||||
|
url='fake:')
|
||||||
|
|
||||||
if hasattr(self, 'ctor_pub_id'):
|
if hasattr(self, 'ctor_pub_id'):
|
||||||
notifier = oslo_messaging.Notifier(transport,
|
notifier = oslo_messaging.Notifier(transport,
|
||||||
@ -241,7 +236,8 @@ class TestSerializer(test_utils.BaseTestCase):
|
|||||||
|
|
||||||
@mock.patch('oslo_utils.timeutils.utcnow')
|
@mock.patch('oslo_utils.timeutils.utcnow')
|
||||||
def test_serializer(self, mock_utcnow):
|
def test_serializer(self, mock_utcnow):
|
||||||
transport = _FakeTransport(self.conf)
|
transport = oslo_messaging.get_notification_transport(self.conf,
|
||||||
|
url='fake:')
|
||||||
|
|
||||||
serializer = msg_serializer.NoOpSerializer()
|
serializer = msg_serializer.NoOpSerializer()
|
||||||
|
|
||||||
@ -289,7 +285,8 @@ class TestNotifierTopics(test_utils.BaseTestCase):
|
|||||||
group='oslo_messaging_notifications')
|
group='oslo_messaging_notifications')
|
||||||
self.config(topics=['topic1', 'topic2'],
|
self.config(topics=['topic1', 'topic2'],
|
||||||
group='oslo_messaging_notifications')
|
group='oslo_messaging_notifications')
|
||||||
transport = _FakeTransport(self.conf)
|
transport = oslo_messaging.get_notification_transport(self.conf,
|
||||||
|
url='fake:')
|
||||||
|
|
||||||
notifier = oslo_messaging.Notifier(transport, 'test.localhost')
|
notifier = oslo_messaging.Notifier(transport, 'test.localhost')
|
||||||
self.assertEqual(['topic1', 'topic2'], notifier._topics)
|
self.assertEqual(['topic1', 'topic2'], notifier._topics)
|
||||||
@ -297,7 +294,8 @@ class TestNotifierTopics(test_utils.BaseTestCase):
|
|||||||
def test_topics_from_kwargs(self):
|
def test_topics_from_kwargs(self):
|
||||||
self.config(driver=['log'],
|
self.config(driver=['log'],
|
||||||
group='oslo_messaging_notifications')
|
group='oslo_messaging_notifications')
|
||||||
transport = _FakeTransport(self.conf)
|
transport = oslo_messaging.get_notification_transport(self.conf,
|
||||||
|
url='fake:')
|
||||||
|
|
||||||
notifier = oslo_messaging.Notifier(transport, 'test.localhost',
|
notifier = oslo_messaging.Notifier(transport, 'test.localhost',
|
||||||
topics=['topic1', 'topic2'])
|
topics=['topic1', 'topic2'])
|
||||||
@ -311,7 +309,8 @@ class TestLogNotifier(test_utils.BaseTestCase):
|
|||||||
self.config(driver=['log'],
|
self.config(driver=['log'],
|
||||||
group='oslo_messaging_notifications')
|
group='oslo_messaging_notifications')
|
||||||
|
|
||||||
transport = _FakeTransport(self.conf)
|
transport = oslo_messaging.get_notification_transport(self.conf,
|
||||||
|
url='fake:')
|
||||||
|
|
||||||
notifier = oslo_messaging.Notifier(transport, 'test.localhost')
|
notifier = oslo_messaging.Notifier(transport, 'test.localhost')
|
||||||
|
|
||||||
@ -386,7 +385,8 @@ class TestNotificationConfig(test_utils.BaseTestCase):
|
|||||||
group='oslo_messaging_notifications')
|
group='oslo_messaging_notifications')
|
||||||
|
|
||||||
conf.set_override('retry', 3, group='oslo_messaging_notifications')
|
conf.set_override('retry', 3, group='oslo_messaging_notifications')
|
||||||
transport = _FakeTransport(conf)
|
transport = oslo_messaging.get_notification_transport(self.conf,
|
||||||
|
url='fake:')
|
||||||
notifier = oslo_messaging.Notifier(transport)
|
notifier = oslo_messaging.Notifier(transport)
|
||||||
|
|
||||||
self.assertEqual(3, notifier.retry)
|
self.assertEqual(3, notifier.retry)
|
||||||
@ -397,7 +397,8 @@ class TestNotificationConfig(test_utils.BaseTestCase):
|
|||||||
group='oslo_messaging_notifications')
|
group='oslo_messaging_notifications')
|
||||||
|
|
||||||
conf.set_override('retry', 3, group='oslo_messaging_notifications')
|
conf.set_override('retry', 3, group='oslo_messaging_notifications')
|
||||||
transport = _FakeTransport(conf)
|
transport = oslo_messaging.get_notification_transport(self.conf,
|
||||||
|
url='fake:')
|
||||||
notifier = oslo_messaging.Notifier(transport, retry=5)
|
notifier = oslo_messaging.Notifier(transport, retry=5)
|
||||||
|
|
||||||
self.assertEqual(5, notifier.retry)
|
self.assertEqual(5, notifier.retry)
|
||||||
@ -409,7 +410,8 @@ class TestRoutingNotifier(test_utils.BaseTestCase):
|
|||||||
self.config(driver=['routing'],
|
self.config(driver=['routing'],
|
||||||
group='oslo_messaging_notifications')
|
group='oslo_messaging_notifications')
|
||||||
|
|
||||||
transport = _FakeTransport(self.conf)
|
transport = oslo_messaging.get_notification_transport(self.conf,
|
||||||
|
url='fake:')
|
||||||
self.notifier = oslo_messaging.Notifier(transport)
|
self.notifier = oslo_messaging.Notifier(transport)
|
||||||
self.router = self.notifier._driver_mgr['routing'].obj
|
self.router = self.notifier._driver_mgr['routing'].obj
|
||||||
|
|
||||||
@ -642,8 +644,21 @@ class TestNoOpNotifier(test_utils.BaseTestCase):
|
|||||||
self.config(driver=['noop'],
|
self.config(driver=['noop'],
|
||||||
group='oslo_messaging_notifications')
|
group='oslo_messaging_notifications')
|
||||||
|
|
||||||
transport = _FakeTransport(self.conf)
|
transport = oslo_messaging.get_notification_transport(self.conf,
|
||||||
|
url='fake:')
|
||||||
|
|
||||||
notifier = oslo_messaging.Notifier(transport, 'test.localhost')
|
notifier = oslo_messaging.Notifier(transport, 'test.localhost')
|
||||||
|
|
||||||
self.assertFalse(notifier.is_enabled())
|
self.assertFalse(notifier.is_enabled())
|
||||||
|
|
||||||
|
|
||||||
|
class TestNotifierTransportWarning(test_utils.BaseTestCase):
|
||||||
|
|
||||||
|
@mock.patch('oslo_messaging.notify.notifier._LOG')
|
||||||
|
def test_warning_when_rpc_transport(self, log):
|
||||||
|
transport = oslo_messaging.get_rpc_transport(self.conf)
|
||||||
|
oslo_messaging.Notifier(transport, 'test.localhost')
|
||||||
|
log.warning.assert_called_once_with(
|
||||||
|
"Using RPC transport for notifications. Please use "
|
||||||
|
"get_notification_transport to obtain a "
|
||||||
|
"notification transport instance.")
|
||||||
|
@ -25,15 +25,6 @@ from oslo_messaging.tests import utils as test_utils
|
|||||||
load_tests = testscenarios.load_tests_apply_scenarios
|
load_tests = testscenarios.load_tests_apply_scenarios
|
||||||
|
|
||||||
|
|
||||||
class _FakeTransport(object):
|
|
||||||
|
|
||||||
def __init__(self, conf):
|
|
||||||
self.conf = conf
|
|
||||||
|
|
||||||
def _send(self, *args, **kwargs):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class TestCastCall(test_utils.BaseTestCase):
|
class TestCastCall(test_utils.BaseTestCase):
|
||||||
|
|
||||||
scenarios = [
|
scenarios = [
|
||||||
@ -52,7 +43,7 @@ class TestCastCall(test_utils.BaseTestCase):
|
|||||||
def test_cast_call(self):
|
def test_cast_call(self):
|
||||||
self.config(rpc_response_timeout=None)
|
self.config(rpc_response_timeout=None)
|
||||||
|
|
||||||
transport = _FakeTransport(self.conf)
|
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
||||||
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target())
|
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target())
|
||||||
|
|
||||||
transport._send = mock.Mock()
|
transport._send = mock.Mock()
|
||||||
@ -191,7 +182,7 @@ class TestCastToTarget(test_utils.BaseTestCase):
|
|||||||
target = oslo_messaging.Target(**self.ctor)
|
target = oslo_messaging.Target(**self.ctor)
|
||||||
expect_target = oslo_messaging.Target(**self.expect)
|
expect_target = oslo_messaging.Target(**self.expect)
|
||||||
|
|
||||||
transport = _FakeTransport(self.conf)
|
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
||||||
client = oslo_messaging.RPCClient(transport, target)
|
client = oslo_messaging.RPCClient(transport, target)
|
||||||
|
|
||||||
transport._send = mock.Mock()
|
transport._send = mock.Mock()
|
||||||
@ -242,7 +233,7 @@ class TestCallTimeout(test_utils.BaseTestCase):
|
|||||||
def test_call_timeout(self):
|
def test_call_timeout(self):
|
||||||
self.config(rpc_response_timeout=self.confval)
|
self.config(rpc_response_timeout=self.confval)
|
||||||
|
|
||||||
transport = _FakeTransport(self.conf)
|
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
||||||
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
|
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
|
||||||
timeout=self.ctor)
|
timeout=self.ctor)
|
||||||
|
|
||||||
@ -273,7 +264,7 @@ class TestCallRetry(test_utils.BaseTestCase):
|
|||||||
]
|
]
|
||||||
|
|
||||||
def test_call_retry(self):
|
def test_call_retry(self):
|
||||||
transport = _FakeTransport(self.conf)
|
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
||||||
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
|
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
|
||||||
retry=self.ctor)
|
retry=self.ctor)
|
||||||
|
|
||||||
@ -302,7 +293,7 @@ class TestCallFanout(test_utils.BaseTestCase):
|
|||||||
]
|
]
|
||||||
|
|
||||||
def test_call_fanout(self):
|
def test_call_fanout(self):
|
||||||
transport = _FakeTransport(self.conf)
|
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
||||||
client = oslo_messaging.RPCClient(transport,
|
client = oslo_messaging.RPCClient(transport,
|
||||||
oslo_messaging.Target(**self.target))
|
oslo_messaging.Target(**self.target))
|
||||||
|
|
||||||
@ -331,7 +322,7 @@ class TestSerializer(test_utils.BaseTestCase):
|
|||||||
def test_call_serializer(self):
|
def test_call_serializer(self):
|
||||||
self.config(rpc_response_timeout=None)
|
self.config(rpc_response_timeout=None)
|
||||||
|
|
||||||
transport = _FakeTransport(self.conf)
|
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
||||||
serializer = msg_serializer.NoOpSerializer()
|
serializer = msg_serializer.NoOpSerializer()
|
||||||
|
|
||||||
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
|
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
|
||||||
@ -430,7 +421,7 @@ class TestVersionCap(test_utils.BaseTestCase):
|
|||||||
def test_version_cap(self):
|
def test_version_cap(self):
|
||||||
self.config(rpc_response_timeout=None)
|
self.config(rpc_response_timeout=None)
|
||||||
|
|
||||||
transport = _FakeTransport(self.conf)
|
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
||||||
|
|
||||||
target = oslo_messaging.Target(version=self.version)
|
target = oslo_messaging.Target(version=self.version)
|
||||||
client = oslo_messaging.RPCClient(transport, target,
|
client = oslo_messaging.RPCClient(transport, target,
|
||||||
@ -535,7 +526,7 @@ class TestCanSendVersion(test_utils.BaseTestCase):
|
|||||||
def test_version_cap(self):
|
def test_version_cap(self):
|
||||||
self.config(rpc_response_timeout=None)
|
self.config(rpc_response_timeout=None)
|
||||||
|
|
||||||
transport = _FakeTransport(self.conf)
|
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
||||||
|
|
||||||
target = oslo_messaging.Target(version=self.version)
|
target = oslo_messaging.Target(version=self.version)
|
||||||
client = oslo_messaging.RPCClient(transport, target,
|
client = oslo_messaging.RPCClient(transport, target,
|
||||||
@ -561,7 +552,7 @@ class TestCanSendVersion(test_utils.BaseTestCase):
|
|||||||
|
|
||||||
def test_invalid_version_type(self):
|
def test_invalid_version_type(self):
|
||||||
target = oslo_messaging.Target(topic='sometopic')
|
target = oslo_messaging.Target(topic='sometopic')
|
||||||
transport = _FakeTransport(self.conf)
|
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
||||||
client = oslo_messaging.RPCClient(transport, target)
|
client = oslo_messaging.RPCClient(transport, target)
|
||||||
self.assertRaises(exceptions.MessagingException,
|
self.assertRaises(exceptions.MessagingException,
|
||||||
client.prepare, version='5')
|
client.prepare, version='5')
|
||||||
@ -569,3 +560,15 @@ class TestCanSendVersion(test_utils.BaseTestCase):
|
|||||||
client.prepare, version='5.a')
|
client.prepare, version='5.a')
|
||||||
self.assertRaises(exceptions.MessagingException,
|
self.assertRaises(exceptions.MessagingException,
|
||||||
client.prepare, version='5.5.a')
|
client.prepare, version='5.5.a')
|
||||||
|
|
||||||
|
|
||||||
|
class TestTransportWarning(test_utils.BaseTestCase):
|
||||||
|
|
||||||
|
@mock.patch('oslo_messaging.rpc.client.LOG')
|
||||||
|
def test_warning_when_notifier_transport(self, log):
|
||||||
|
transport = oslo_messaging.get_notification_transport(self.conf)
|
||||||
|
oslo_messaging.RPCClient(transport, oslo_messaging.Target())
|
||||||
|
log.warning.assert_called_once_with(
|
||||||
|
"Using notification transport for RPC. Please use "
|
||||||
|
"get_rpc_transport to obtain an RPC transport "
|
||||||
|
"instance.")
|
||||||
|
@ -436,6 +436,20 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
|
|||||||
|
|
||||||
self._stop_server(client, server_thread)
|
self._stop_server(client, server_thread)
|
||||||
|
|
||||||
|
@mock.patch('oslo_messaging.rpc.server.LOG')
|
||||||
|
def test_warning_when_notifier_transport(self, log):
|
||||||
|
transport = oslo_messaging.get_notification_transport(self.conf)
|
||||||
|
target = oslo_messaging.Target(topic='foo', server='bar')
|
||||||
|
endpoints = [object()]
|
||||||
|
serializer = object()
|
||||||
|
|
||||||
|
oslo_messaging.get_rpc_server(transport, target,
|
||||||
|
endpoints, serializer=serializer)
|
||||||
|
log.warning.assert_called_once_with(
|
||||||
|
"Using notification transport for RPC. Please use "
|
||||||
|
"get_rpc_transport to obtain an RPC transport "
|
||||||
|
"instance.")
|
||||||
|
|
||||||
|
|
||||||
class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
|
class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
|
||||||
|
|
||||||
|
@ -148,6 +148,7 @@ class GetTransportTestCase(test_utils.BaseTestCase):
|
|||||||
self.assertIsNotNone(transport_)
|
self.assertIsNotNone(transport_)
|
||||||
self.assertIs(transport_.conf, self.conf)
|
self.assertIs(transport_.conf, self.conf)
|
||||||
self.assertIs(transport_._driver, drvr)
|
self.assertIs(transport_._driver, drvr)
|
||||||
|
self.assertTrue(isinstance(transport_, transport.RPCTransport))
|
||||||
|
|
||||||
driver.DriverManager.assert_called_once_with('oslo.messaging.drivers',
|
driver.DriverManager.assert_called_once_with('oslo.messaging.drivers',
|
||||||
self.expect['backend'],
|
self.expect['backend'],
|
||||||
|
@ -153,6 +153,20 @@ class Transport(object):
|
|||||||
self._driver.cleanup()
|
self._driver.cleanup()
|
||||||
|
|
||||||
|
|
||||||
|
class RPCTransport(Transport):
|
||||||
|
"""Transport object for RPC."""
|
||||||
|
|
||||||
|
def __init__(self, driver):
|
||||||
|
super(RPCTransport, self).__init__(driver)
|
||||||
|
|
||||||
|
|
||||||
|
class NotificationTransport(Transport):
|
||||||
|
"""Transport object for notifications."""
|
||||||
|
|
||||||
|
def __init__(self, driver):
|
||||||
|
super(NotificationTransport, self).__init__(driver)
|
||||||
|
|
||||||
|
|
||||||
class InvalidTransportURL(exceptions.MessagingException):
|
class InvalidTransportURL(exceptions.MessagingException):
|
||||||
"""Raised if transport URL is invalid."""
|
"""Raised if transport URL is invalid."""
|
||||||
|
|
||||||
@ -171,7 +185,8 @@ class DriverLoadFailure(exceptions.MessagingException):
|
|||||||
self.ex = ex
|
self.ex = ex
|
||||||
|
|
||||||
|
|
||||||
def _get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
|
def _get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None,
|
||||||
|
transport_cls=RPCTransport):
|
||||||
allowed_remote_exmods = allowed_remote_exmods or []
|
allowed_remote_exmods = allowed_remote_exmods or []
|
||||||
conf.register_opts(_transport_opts)
|
conf.register_opts(_transport_opts)
|
||||||
|
|
||||||
@ -190,7 +205,7 @@ def _get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
|
|||||||
except RuntimeError as ex:
|
except RuntimeError as ex:
|
||||||
raise DriverLoadFailure(url.transport, ex)
|
raise DriverLoadFailure(url.transport, ex)
|
||||||
|
|
||||||
return Transport(mgr.driver)
|
return transport_cls(mgr.driver)
|
||||||
|
|
||||||
|
|
||||||
@removals.remove(
|
@removals.remove(
|
||||||
@ -229,7 +244,8 @@ def get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
|
|||||||
:type aliases: dict
|
:type aliases: dict
|
||||||
"""
|
"""
|
||||||
return _get_transport(conf, url,
|
return _get_transport(conf, url,
|
||||||
allowed_remote_exmods, aliases)
|
allowed_remote_exmods, aliases,
|
||||||
|
transport_cls=RPCTransport)
|
||||||
|
|
||||||
|
|
||||||
class TransportHost(object):
|
class TransportHost(object):
|
||||||
|
Loading…
Reference in New Issue
Block a user