From 86e5737bf6620b5693d7e7acc54dd84524c56d68 Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Tue, 17 Dec 2013 19:44:51 +0100 Subject: [PATCH] Make the dispatcher responsible to listen() The dispatcher is now responsible to configure and to get the listener from the transport. The server just ask to the dispatcher to build and return a configured listener for a provider transport. Partial implements blueprint notification-subscriber-server Change-Id: I4a6d9620b8239f6d377bc5788b8a90a860b2f02c --- oslo/messaging/rpc/dispatcher.py | 18 +++++++++++++++--- oslo/messaging/rpc/server.py | 5 ++--- oslo/messaging/server.py | 8 ++------ tests/test_rpc_dispatcher.py | 6 ++++-- tests/test_rpc_server.py | 1 - 5 files changed, 23 insertions(+), 15 deletions(-) diff --git a/oslo/messaging/rpc/dispatcher.py b/oslo/messaging/rpc/dispatcher.py index 28148443f..a5bad113d 100644 --- a/oslo/messaging/rpc/dispatcher.py +++ b/oslo/messaging/rpc/dispatcher.py @@ -29,7 +29,7 @@ from oslo.messaging import _utils as utils from oslo.messaging import localcontext from oslo.messaging import serializer as msg_serializer from oslo.messaging import server as msg_server -from oslo.messaging import target +from oslo.messaging import target as msg_target class RPCDispatcherError(msg_server.MessagingServerError): @@ -68,12 +68,24 @@ class RPCDispatcher(object): Endpoints may have a target attribute describing the namespace and version of the methods exposed by that object. All public methods on an endpoint object are remotely invokable by clients. + + """ - def __init__(self, endpoints, serializer): + def __init__(self, target, endpoints, serializer): + """Construct a rpc server dispatcher. + + :param target: the exchange, topic and server to listen on + :type target: Target + """ + self.endpoints = endpoints self.serializer = serializer or msg_serializer.NoOpSerializer() - self._default_target = target.Target() + self._default_target = msg_target.Target() + self._target = target + + def _listen(self, transport): + return transport._listen(self._target) @staticmethod def _is_namespace(target, namespace): diff --git a/oslo/messaging/rpc/server.py b/oslo/messaging/rpc/server.py index 690b03dff..aaa071671 100644 --- a/oslo/messaging/rpc/server.py +++ b/oslo/messaging/rpc/server.py @@ -121,9 +121,8 @@ def get_rpc_server(transport, target, endpoints, :param serializer: an optional entity serializer :type serializer: Serializer """ - dispatcher = rpc_dispatcher.RPCDispatcher(endpoints, serializer) - return msg_server.MessageHandlingServer(transport, target, - dispatcher, executor) + dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer) + return msg_server.MessageHandlingServer(transport, dispatcher, executor) class ExpectedException(Exception): diff --git a/oslo/messaging/server.py b/oslo/messaging/server.py index 68559a9d3..bc96c5718 100644 --- a/oslo/messaging/server.py +++ b/oslo/messaging/server.py @@ -61,7 +61,7 @@ class MessageHandlingServer(object): new tasks. """ - def __init__(self, transport, target, dispatcher, executor='blocking'): + def __init__(self, transport, dispatcher, executor='blocking'): """Construct a message handling server. The dispatcher parameter is a callable which is invoked with context @@ -73,8 +73,6 @@ class MessageHandlingServer(object): :param transport: the messaging transport :type transport: Transport - :param target: the exchange, topic and server to listen on - :type target: Target :param dispatcher: a callable which is invoked for each method :type dispatcher: callable :param executor: name of message executor - e.g. 'eventlet', 'blocking' @@ -83,7 +81,6 @@ class MessageHandlingServer(object): self.conf = transport.conf self.transport = transport - self.target = target self.dispatcher = dispatcher self.executor = executor @@ -116,9 +113,8 @@ class MessageHandlingServer(object): """ if self._executor is not None: return - try: - listener = self.transport._listen(self.target) + listener = self.dispatcher._listen(self.transport) except driver_base.TransportDriverError as ex: raise ServerListenError(self.target, ex) diff --git a/tests/test_rpc_dispatcher.py b/tests/test_rpc_dispatcher.py index 5b367b95e..5d20813da 100644 --- a/tests/test_rpc_dispatcher.py +++ b/tests/test_rpc_dispatcher.py @@ -97,7 +97,8 @@ class TestDispatcher(test_utils.BaseTestCase): endpoints.append(_FakeEndpoint(target)) serializer = None - dispatcher = messaging.RPCDispatcher(endpoints, serializer) + target = messaging.Target() + dispatcher = messaging.RPCDispatcher(target, endpoints, serializer) if self.dispatch_to is not None: endpoint = endpoints[self.dispatch_to['endpoint']] @@ -139,7 +140,8 @@ class TestSerializer(test_utils.BaseTestCase): def test_serializer(self): endpoint = _FakeEndpoint() serializer = msg_serializer.NoOpSerializer() - dispatcher = messaging.RPCDispatcher([endpoint], serializer) + target = messaging.Target() + dispatcher = messaging.RPCDispatcher(target, [endpoint], serializer) self.mox.StubOutWithMock(endpoint, 'foo') args = dict([(k, 'd' + v) for k, v in self.args.items()]) diff --git a/tests/test_rpc_server.py b/tests/test_rpc_server.py index 4fe0a59a8..1f2df2a13 100644 --- a/tests/test_rpc_server.py +++ b/tests/test_rpc_server.py @@ -105,7 +105,6 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): self.assertIs(server.conf, self.conf) self.assertIs(server.transport, transport) - self.assertIs(server.target, target) self.assertIsInstance(server.dispatcher, messaging.RPCDispatcher) self.assertIs(server.dispatcher.endpoints, endpoints) self.assertIs(server.dispatcher.serializer, serializer)