From f0f3d4b5f2613f0e2082a1ef5d106d3044dd55a4 Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Fri, 14 Jun 2013 11:41:49 +0100 Subject: [PATCH] Re-work server API to eliminate server subclasses This is something I think Doug has been trying to tell me to do from the start :-) The main idea is to remove all the MessageHandlingServer subclasses and, instead, if you want a server which is hooked up with the RPC dispatcher you just use this convenience function: server = rpc_server.get_rpc_server(transport, target, endpoints) This means the dispatcher interface is now part of the public API, but that should be fine since it's very simple - it's a callable that takes a request context and message. However, we also need to be able to construct a MessageHandlingServer with a specific executor. By having an executor_cls parameter to the constructor as part of the public API, we'd be exposing the executor interface which is quite likely to change. Instead - and this seems obvious in retrospect - just use stevedore to load executors and allow them to be requested by name: server = rpc_server.get_rpc_server(transport, target, endpoints, executor='eventlet') This also means we can get rid of openstack.common.messaging.eventlet. --- openstack/common/messaging/__init__.py | 22 ++++++- .../messaging/_executors/impl_blocking.py | 11 ++++ .../messaging/_executors/impl_eventlet.py | 9 +++ openstack/common/messaging/eventlet.py | 48 -------------- .../rpc/{_dispatcher.py => dispatcher.py} | 46 +++++++++---- openstack/common/messaging/rpc/server.py | 65 +++++++------------ .../messaging/{_server.py => server.py} | 48 ++++++++++++-- tests/unit/messaging/test_rpc_dispatcher.py | 17 +++-- 8 files changed, 146 insertions(+), 120 deletions(-) delete mode 100644 openstack/common/messaging/eventlet.py rename openstack/common/messaging/rpc/{_dispatcher.py => dispatcher.py} (64%) rename openstack/common/messaging/{_server.py => server.py} (64%) diff --git a/openstack/common/messaging/__init__.py b/openstack/common/messaging/__init__.py index 9413f3fb7..aa6972370 100644 --- a/openstack/common/messaging/__init__.py +++ b/openstack/common/messaging/__init__.py @@ -15,8 +15,10 @@ from openstack.common.messaging import exceptions from openstack.common.messaging.rpc import client -from openstack.common.messaging.rpc import server +from openstack.common.messaging.rpc import dispatcher as rpc_dispatcher +from openstack.common.messaging.rpc import server as rpc_server from openstack.common.messaging import serializer +from openstack.common.messaging import server from openstack.common.messaging import target from openstack.common.messaging import transport @@ -25,11 +27,27 @@ get_transport = transport.get_transport Target = target.Target RPCClient = client.RPCClient -BlockingRPCServer = server.BlockingRPCServer + +MessageHandlingServer = server.MessageHandlingServer +get_rpc_server = rpc_server.get_rpc_server +RPCDispatcher = rpc_dispatcher.RPCDispatcher Serializer = serializer.Serializer + +# +# Exceptions +# MessagingException = exceptions.MessagingException MessagingTimeout = exceptions.MessagingTimeout + DriverLoadFailure = transport.DriverLoadFailure InvalidTransportURL = transport.InvalidTransportURL + RPCVersionCapError = client.RPCVersionCapError + +MessagingServerError = server.MessagingServerError +ExecutorLoadFailure = server.ExecutorLoadFailure + +RPCDispatcherError = rpc_dispatcher.RPCDispatcherError +NoSuchMethod = rpc_dispatcher.NoSuchMethod +UnsupportedVersion = rpc_dispatcher.UnsupportedVersion diff --git a/openstack/common/messaging/_executors/impl_blocking.py b/openstack/common/messaging/_executors/impl_blocking.py index 90f449e80..2fd30a8aa 100644 --- a/openstack/common/messaging/_executors/impl_blocking.py +++ b/openstack/common/messaging/_executors/impl_blocking.py @@ -18,6 +18,17 @@ from openstack.common.messaging._executors import base class BlockingExecutor(base.ExecutorBase): + """A message executor which blocks the current thread. + + The blocking executor's start() method functions as a request processing + loop - i.e. it blocks, processes messages and only returns when stop() is + called from a dispatched method. + + Method calls are dispatched in the current thread, so only a single method + call can be executing at once. This executor is likely to only be useful + for simple demo programs. + """ + def __init__(self, conf, listener, callback): super(BlockingExecutor, self).__init__(conf, listener, callback) self._running = False diff --git a/openstack/common/messaging/_executors/impl_eventlet.py b/openstack/common/messaging/_executors/impl_eventlet.py index df42432e7..4253104c7 100644 --- a/openstack/common/messaging/_executors/impl_eventlet.py +++ b/openstack/common/messaging/_executors/impl_eventlet.py @@ -30,6 +30,15 @@ _eventlet_opts = [ class EventletExecutor(base.ExecutorBase): + """A message exector which integrates with eventlet. + + This is an executor which polls for incoming messages from a greenthread + and dispatches each message in its own greenthread. + + The stop() method kills the message polling greenthread and the wait() + method waits for all message dispatch greenthreads to complete. + """ + def __init__(self, conf, listener, callback): super(EventletExecutor, self).__init__(conf, listener, callback) self.conf.register_opts(_eventlet_opts) diff --git a/openstack/common/messaging/eventlet.py b/openstack/common/messaging/eventlet.py deleted file mode 100644 index be3d33b75..000000000 --- a/openstack/common/messaging/eventlet.py +++ /dev/null @@ -1,48 +0,0 @@ - -# Copyright 2013 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from openstack.common.messaging._executors import impl_eventlet -from openstack.common.messaging.rpc import server - - -class EventletRPCServer(server._RPCServer): - - """An RPC server which integrates with eventlet. - - This is an RPC server which polls for incoming messages from a greenthread - and dispatches each message in its own greenthread. - - The stop() method kills the message polling greenthread and the wait() - method waits for all message dispatch greenthreads to complete. - """ - - def __init__(self, transport, target, endpoints, serializer=None): - """Construct a new eventlet RPC server. - - :param transport: the messaging transport - :type transport: Transport - :param target: the exchange, topic and server to listen on - :type target: Target - :param endpoints: a list of endpoint objects - :type endpoints: list - :param serializer: an optional entity serializer - :type serializer: Serializer - """ - executor_cls = impl_eventlet.EventletExecutor - super(EventletRPCServer, self).__init__(transport, - target, - endpoints, - serializer, - executor_cls) diff --git a/openstack/common/messaging/rpc/_dispatcher.py b/openstack/common/messaging/rpc/dispatcher.py similarity index 64% rename from openstack/common/messaging/rpc/_dispatcher.py rename to openstack/common/messaging/rpc/dispatcher.py index 8a2c0a69a..70bfae711 100644 --- a/openstack/common/messaging/rpc/_dispatcher.py +++ b/openstack/common/messaging/rpc/dispatcher.py @@ -16,40 +16,52 @@ # License for the specific language governing permissions and limitations # under the License. -from openstack.common.gettextutils import _ from openstack.common import log as logging -from openstack.common.messaging import _server as server from openstack.common.messaging import _utils as utils from openstack.common.messaging import serializer as msg_serializer +from openstack.common.messaging import server as msg_server from openstack.common.messaging import target _LOG = logging.getLogger(__name__) -class RPCDispatcherError(server.MessagingServerError): - pass +class RPCDispatcherError(msg_server.MessagingServerError): + "A base class for all RPC dispatcher exceptions." -class NoSuchMethodError(RPCDispatcherError, AttributeError): +class NoSuchMethod(RPCDispatcherError, AttributeError): + "Raised if there is no endpoint which exposes the requested method." def __init__(self, method): + msg = "Endpoint does not support RPC method %s" % method + super(NoSuchMethod, self).__init__(msg) self.method = method - def __str__(self): - return _("Endpoint does not support RPC method %s") % self.method - class UnsupportedVersion(RPCDispatcherError): + "Raised if there is no endpoint which supports the requested version." def __init__(self, version): + msg = "Endpoint does not support RPC version %s" % version + super(UnsupportedVersion, self).__init__(msg) self.version = version - def __str__(self): - return _("Endpoint does not support RPC version %s") % self.version - class RPCDispatcher(object): - "Pass messages to the API objects for processing." + """A message dispatcher which understands RPC messages. + + A MessageHandlingServer is constructed by passing a callable dispatcher + which is invoked with context and message dictionaries each time a message + is received. + + RPCDispatcher is one such dispatcher which understands the format of RPC + messages. The dispatcher looks at the namespace, version and method values + in the message and matches those against a list of available endpoints. + + Endpoints may have a target attribute describing the namespace and version + of the methods exposed by that object. All public methods on an endpoint + object are remotely invokable by clients. + """ def __init__(self, endpoints, serializer): self.endpoints = endpoints @@ -73,6 +85,14 @@ class RPCDispatcher(object): return self.serializer.serialize_entity(ctxt, result) def __call__(self, ctxt, message): + """Dispatch an RPC message to the appropriate endpoint method. + + :param ctxt: the request context + :type ctxt: dict + :param message: the message payload + :type message: dict + :raises: NoSuchMethod, UnsupportedVersion + """ method = message.get('method') args = message.get('args', {}) namespace = message.get('namespace') @@ -94,6 +114,6 @@ class RPCDispatcher(object): found_compatible = True if found_compatible: - raise NoSuchMethodError(method) + raise NoSuchMethod(method) else: raise UnsupportedVersion(version) diff --git a/openstack/common/messaging/rpc/server.py b/openstack/common/messaging/rpc/server.py index 3604dfa54..1482d57e2 100644 --- a/openstack/common/messaging/rpc/server.py +++ b/openstack/common/messaging/rpc/server.py @@ -13,9 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. -from openstack.common.messaging._executors import impl_blocking -from openstack.common.messaging import _server -from openstack.common.messaging.rpc import _dispatcher +from openstack.common.messaging.rpc import dispatcher as rpc_dispatcher +from openstack.common.messaging import server as msg_server """ An RPC server exposes a number of endpoints, each of which contain a set of @@ -74,7 +73,7 @@ A simple example of an RPC server with multiple endpoints might be: ServerControlEndpoint(self), TestEndpoint(), ] - server = messaging.BlockingRPCServer(transport, target, endpoints) + server = messaging.get_rpc_server(transport, target, endpoints) server.start() server.wait() @@ -94,45 +93,25 @@ deserialize arguments from - serialize return values to - primitive types. """ -class _RPCServer(_server.MessageHandlingServer): +def get_rpc_server(transport, target, endpoints, + executor='blocking', serializer=None): + """Construct an RPC server. - def __init__(self, transport, target, endpoints, serializer, executor_cls): - dispatcher = _dispatcher.RPCDispatcher(endpoints, serializer) - super(_RPCServer, self).__init__(transport, - target, - dispatcher, - executor_cls) + The executor parameter controls how incoming messages will be received and + dispatched. By default, the most simple executor is used - the blocking + executor. - -class BlockingRPCServer(_RPCServer): - - """An RPC server which blocks and dispatches in the current thread. - - The blocking RPC server is a very simple RPC server whose start() method - functions as a request processing loop - i.e. it blocks, processes messages - and only returns when stop() is called from a dispatched method. - - Method calls are dispatched in the current thread, so only a single method - call can be executing at once. - - This class is likely to only be useful for simple demo programs. + :param transport: the messaging transport + :type transport: Transport + :param target: the exchange, topic and server to listen on + :type target: Target + :param endpoints: a list of endpoint objects + :type endpoints: list + :param executor: name of a message executor - e.g. 'eventlet', 'blocking' + :type executor: str + :param serializer: an optional entity serializer + :type serializer: Serializer """ - - def __init__(self, transport, target, endpoints, serializer=None): - """Construct a new blocking RPC server. - - :param transport: the messaging transport - :type transport: Transport - :param target: the exchange, topic and server to listen on - :type target: Target - :param endpoints: a list of endpoint objects - :type endpoints: list - :param serializer: an optional entity serializer - :type serializer: Serializer - """ - executor_cls = impl_blocking.BlockingExecutor - super(BlockingRPCServer, self).__init__(transport, - target, - endpoints, - serializer, - executor_cls) + dispatcher = rpc_dispatcher.RPCDispatcher(endpoints, serializer) + return msg_server.MessageHandlingServer(transport, target, + dispatcher, executor) diff --git a/openstack/common/messaging/_server.py b/openstack/common/messaging/server.py similarity index 64% rename from openstack/common/messaging/_server.py rename to openstack/common/messaging/server.py index 14b1e6255..74b7c8c4a 100644 --- a/openstack/common/messaging/_server.py +++ b/openstack/common/messaging/server.py @@ -16,13 +16,26 @@ # License for the specific language governing permissions and limitations # under the License. +from stevedore import driver + from openstack.common import log as logging +from openstack.common.messaging import exceptions _LOG = logging.getLogger(__name__) -class MessagingServerError(Exception): - pass +class MessagingServerError(exceptions.MessagingException): + """Base class for all MessageHandlingServer exceptions.""" + + +class ExecutorLoadFailure(MessagingServerError): + """Raised if an executor can't be loaded.""" + + def __init__(self, executor, ex): + msg = 'Failed to load executor "%s": %s' % (executor, ex) + super(ExecutorLoadFailure, self).__init__(msg) + self.executor = executor + self.ex = ex class MessageHandlingServer(object): @@ -33,15 +46,40 @@ class MessageHandlingServer(object): new tasks. """ - def __init__(self, transport, target, dispatcher, executor_cls): + def __init__(self, transport, target, dispatcher, executor='blocking'): + """Construct a message handling server. + + The dispatcher parameter is a callable which is invoked with context + and message dictionaries each time a message is received. + + The executor parameter controls how incoming messages will be received + and dispatched. By default, the most simple executor is used - the + blocking executor. + + :param transport: the messaging transport + :type transport: Transport + :param target: the exchange, topic and server to listen on + :type target: Target + :param dispatcher: a callable which is invoked for each method + :type dispatcher: callable + :param executor: name of message executor - e.g. 'eventlet', 'blocking' + :type executor: str + """ self.conf = transport.conf self.transport = transport self.target = target self.dispatcher = dispatcher + self.executor = executor - self._executor_cls = executor_cls - self._executor = None + try: + mgr = driver.DriverManager('openstack.common.messaging.executors', + self.executor) + except RuntimeError as ex: + raise ExecutorLoadFailure(self.executor, ex) + else: + self._executor_cls = mgr.driver + self._executor = None super(MessageHandlingServer, self).__init__() diff --git a/tests/unit/messaging/test_rpc_dispatcher.py b/tests/unit/messaging/test_rpc_dispatcher.py index 8b407b3f4..364bb6f0b 100644 --- a/tests/unit/messaging/test_rpc_dispatcher.py +++ b/tests/unit/messaging/test_rpc_dispatcher.py @@ -14,7 +14,6 @@ # under the License. from openstack.common import messaging -from openstack.common.messaging.rpc import _dispatcher as rpc_dispatcher from openstack.common.messaging import serializer as msg_serializer from tests import utils as test_utils @@ -53,7 +52,7 @@ class TestDispatcher(test_utils.BaseTestCase): dict(endpoints=[], dispatch_to=None, ctxt={}, msg=dict(method='foo'), - success=False, ex=rpc_dispatcher.UnsupportedVersion)), + success=False, ex=messaging.UnsupportedVersion)), ('default_target', dict(endpoints=[{}], dispatch_to=dict(endpoint=0, method='foo'), @@ -79,7 +78,7 @@ class TestDispatcher(test_utils.BaseTestCase): dict(endpoints=[{}], dispatch_to=None, ctxt={}, msg=dict(method='foobar'), - success=False, ex=rpc_dispatcher.NoSuchMethodError)), + success=False, ex=messaging.NoSuchMethod)), ('namespace', dict(endpoints=[{}, dict(namespace='testns')], dispatch_to=dict(endpoint=1, method='foo'), @@ -89,7 +88,7 @@ class TestDispatcher(test_utils.BaseTestCase): dict(endpoints=[{}, dict(namespace='testns')], dispatch_to=None, ctxt={}, msg=dict(method='foo', namespace='nstest'), - success=False, ex=rpc_dispatcher.UnsupportedVersion)), + success=False, ex=messaging.UnsupportedVersion)), ('version', dict(endpoints=[dict(version='1.5'), dict(version='3.4')], dispatch_to=dict(endpoint=1, method='foo'), @@ -99,7 +98,7 @@ class TestDispatcher(test_utils.BaseTestCase): dict(endpoints=[dict(version='1.5'), dict(version='3.0')], dispatch_to=None, ctxt={}, msg=dict(method='foo', version='3.2'), - success=False, ex=rpc_dispatcher.UnsupportedVersion)), + success=False, ex=messaging.UnsupportedVersion)), ] def _test_dispatcher(self): @@ -109,7 +108,7 @@ class TestDispatcher(test_utils.BaseTestCase): endpoints.append(_FakeEndpoint(target)) serializer = None - dispatcher = rpc_dispatcher.RPCDispatcher(endpoints, serializer) + dispatcher = messaging.RPCDispatcher(endpoints, serializer) if self.dispatch_to is not None: endpoint = endpoints[self.dispatch_to['endpoint']] @@ -128,9 +127,9 @@ class TestDispatcher(test_utils.BaseTestCase): self.assertFalse(self.success, ex) self.assertTrue(self.ex is not None, ex) self.assertTrue(isinstance(ex, self.ex), ex) - if isinstance(ex, rpc_dispatcher.NoSuchMethodError): + if isinstance(ex, messaging.NoSuchMethod): self.assertEquals(ex.method, self.msg.get('method')) - elif isinstance(ex, rpc_dispatcher.UnsupportedVersion): + elif isinstance(ex, messaging.UnsupportedVersion): self.assertEquals(ex.version, self.msg.get('version', '1.0')) else: self.assertTrue(self.success) @@ -183,7 +182,7 @@ class TestSerializer(test_utils.BaseTestCase): def _test_serializer(self): endpoint = _FakeEndpoint() serializer = msg_serializer.NoOpSerializer - dispatcher = rpc_dispatcher.RPCDispatcher([endpoint], serializer) + dispatcher = messaging.RPCDispatcher([endpoint], serializer) self.mox.StubOutWithMock(endpoint, 'foo') args = dict([(k, 'd' + v) for k, v in self.args.items()])