Re-work server API to eliminate server subclasses

This is something I think Doug has been trying to tell me to do from the
start :-)

The main idea is to remove all the MessageHandlingServer subclasses and,
instead, if you want a server which is hooked up with the RPC dispatcher
you just use this convenience function:

  server = rpc_server.get_rpc_server(transport, target, endpoints)

This means the dispatcher interface is now part of the public API, but
that should be fine since it's very simple - it's a callable that takes
a request context and message.

However, we also need to be able to construct a MessageHandlingServer
with a specific executor. By having an executor_cls parameter to the
constructor as part of the public API, we'd be exposing the executor
interface which is quite likely to change. Instead - and this seems
obvious in retrospect - just use stevedore to load executors and allow
them to be requested by name:

  server = rpc_server.get_rpc_server(transport, target, endpoints,
                                     executor='eventlet')

This also means we can get rid of openstack.common.messaging.eventlet.
This commit is contained in:
Mark McLoughlin 2013-06-14 11:41:49 +01:00
parent 7a1c2730a0
commit f0f3d4b5f2
8 changed files with 146 additions and 120 deletions

View File

@ -15,8 +15,10 @@
from openstack.common.messaging import exceptions
from openstack.common.messaging.rpc import client
from openstack.common.messaging.rpc import server
from openstack.common.messaging.rpc import dispatcher as rpc_dispatcher
from openstack.common.messaging.rpc import server as rpc_server
from openstack.common.messaging import serializer
from openstack.common.messaging import server
from openstack.common.messaging import target
from openstack.common.messaging import transport
@ -25,11 +27,27 @@ get_transport = transport.get_transport
Target = target.Target
RPCClient = client.RPCClient
BlockingRPCServer = server.BlockingRPCServer
MessageHandlingServer = server.MessageHandlingServer
get_rpc_server = rpc_server.get_rpc_server
RPCDispatcher = rpc_dispatcher.RPCDispatcher
Serializer = serializer.Serializer
#
# Exceptions
#
MessagingException = exceptions.MessagingException
MessagingTimeout = exceptions.MessagingTimeout
DriverLoadFailure = transport.DriverLoadFailure
InvalidTransportURL = transport.InvalidTransportURL
RPCVersionCapError = client.RPCVersionCapError
MessagingServerError = server.MessagingServerError
ExecutorLoadFailure = server.ExecutorLoadFailure
RPCDispatcherError = rpc_dispatcher.RPCDispatcherError
NoSuchMethod = rpc_dispatcher.NoSuchMethod
UnsupportedVersion = rpc_dispatcher.UnsupportedVersion

View File

@ -18,6 +18,17 @@ from openstack.common.messaging._executors import base
class BlockingExecutor(base.ExecutorBase):
"""A message executor which blocks the current thread.
The blocking executor's start() method functions as a request processing
loop - i.e. it blocks, processes messages and only returns when stop() is
called from a dispatched method.
Method calls are dispatched in the current thread, so only a single method
call can be executing at once. This executor is likely to only be useful
for simple demo programs.
"""
def __init__(self, conf, listener, callback):
super(BlockingExecutor, self).__init__(conf, listener, callback)
self._running = False

View File

@ -30,6 +30,15 @@ _eventlet_opts = [
class EventletExecutor(base.ExecutorBase):
"""A message exector which integrates with eventlet.
This is an executor which polls for incoming messages from a greenthread
and dispatches each message in its own greenthread.
The stop() method kills the message polling greenthread and the wait()
method waits for all message dispatch greenthreads to complete.
"""
def __init__(self, conf, listener, callback):
super(EventletExecutor, self).__init__(conf, listener, callback)
self.conf.register_opts(_eventlet_opts)

View File

@ -1,48 +0,0 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.common.messaging._executors import impl_eventlet
from openstack.common.messaging.rpc import server
class EventletRPCServer(server._RPCServer):
"""An RPC server which integrates with eventlet.
This is an RPC server which polls for incoming messages from a greenthread
and dispatches each message in its own greenthread.
The stop() method kills the message polling greenthread and the wait()
method waits for all message dispatch greenthreads to complete.
"""
def __init__(self, transport, target, endpoints, serializer=None):
"""Construct a new eventlet RPC server.
:param transport: the messaging transport
:type transport: Transport
:param target: the exchange, topic and server to listen on
:type target: Target
:param endpoints: a list of endpoint objects
:type endpoints: list
:param serializer: an optional entity serializer
:type serializer: Serializer
"""
executor_cls = impl_eventlet.EventletExecutor
super(EventletRPCServer, self).__init__(transport,
target,
endpoints,
serializer,
executor_cls)

View File

@ -16,40 +16,52 @@
# License for the specific language governing permissions and limitations
# under the License.
from openstack.common.gettextutils import _
from openstack.common import log as logging
from openstack.common.messaging import _server as server
from openstack.common.messaging import _utils as utils
from openstack.common.messaging import serializer as msg_serializer
from openstack.common.messaging import server as msg_server
from openstack.common.messaging import target
_LOG = logging.getLogger(__name__)
class RPCDispatcherError(server.MessagingServerError):
pass
class RPCDispatcherError(msg_server.MessagingServerError):
"A base class for all RPC dispatcher exceptions."
class NoSuchMethodError(RPCDispatcherError, AttributeError):
class NoSuchMethod(RPCDispatcherError, AttributeError):
"Raised if there is no endpoint which exposes the requested method."
def __init__(self, method):
msg = "Endpoint does not support RPC method %s" % method
super(NoSuchMethod, self).__init__(msg)
self.method = method
def __str__(self):
return _("Endpoint does not support RPC method %s") % self.method
class UnsupportedVersion(RPCDispatcherError):
"Raised if there is no endpoint which supports the requested version."
def __init__(self, version):
msg = "Endpoint does not support RPC version %s" % version
super(UnsupportedVersion, self).__init__(msg)
self.version = version
def __str__(self):
return _("Endpoint does not support RPC version %s") % self.version
class RPCDispatcher(object):
"Pass messages to the API objects for processing."
"""A message dispatcher which understands RPC messages.
A MessageHandlingServer is constructed by passing a callable dispatcher
which is invoked with context and message dictionaries each time a message
is received.
RPCDispatcher is one such dispatcher which understands the format of RPC
messages. The dispatcher looks at the namespace, version and method values
in the message and matches those against a list of available endpoints.
Endpoints may have a target attribute describing the namespace and version
of the methods exposed by that object. All public methods on an endpoint
object are remotely invokable by clients.
"""
def __init__(self, endpoints, serializer):
self.endpoints = endpoints
@ -73,6 +85,14 @@ class RPCDispatcher(object):
return self.serializer.serialize_entity(ctxt, result)
def __call__(self, ctxt, message):
"""Dispatch an RPC message to the appropriate endpoint method.
:param ctxt: the request context
:type ctxt: dict
:param message: the message payload
:type message: dict
:raises: NoSuchMethod, UnsupportedVersion
"""
method = message.get('method')
args = message.get('args', {})
namespace = message.get('namespace')
@ -94,6 +114,6 @@ class RPCDispatcher(object):
found_compatible = True
if found_compatible:
raise NoSuchMethodError(method)
raise NoSuchMethod(method)
else:
raise UnsupportedVersion(version)

View File

@ -13,9 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from openstack.common.messaging._executors import impl_blocking
from openstack.common.messaging import _server
from openstack.common.messaging.rpc import _dispatcher
from openstack.common.messaging.rpc import dispatcher as rpc_dispatcher
from openstack.common.messaging import server as msg_server
"""
An RPC server exposes a number of endpoints, each of which contain a set of
@ -74,7 +73,7 @@ A simple example of an RPC server with multiple endpoints might be:
ServerControlEndpoint(self),
TestEndpoint(),
]
server = messaging.BlockingRPCServer(transport, target, endpoints)
server = messaging.get_rpc_server(transport, target, endpoints)
server.start()
server.wait()
@ -94,32 +93,13 @@ deserialize arguments from - serialize return values to - primitive types.
"""
class _RPCServer(_server.MessageHandlingServer):
def get_rpc_server(transport, target, endpoints,
executor='blocking', serializer=None):
"""Construct an RPC server.
def __init__(self, transport, target, endpoints, serializer, executor_cls):
dispatcher = _dispatcher.RPCDispatcher(endpoints, serializer)
super(_RPCServer, self).__init__(transport,
target,
dispatcher,
executor_cls)
class BlockingRPCServer(_RPCServer):
"""An RPC server which blocks and dispatches in the current thread.
The blocking RPC server is a very simple RPC server whose start() method
functions as a request processing loop - i.e. it blocks, processes messages
and only returns when stop() is called from a dispatched method.
Method calls are dispatched in the current thread, so only a single method
call can be executing at once.
This class is likely to only be useful for simple demo programs.
"""
def __init__(self, transport, target, endpoints, serializer=None):
"""Construct a new blocking RPC server.
The executor parameter controls how incoming messages will be received and
dispatched. By default, the most simple executor is used - the blocking
executor.
:param transport: the messaging transport
:type transport: Transport
@ -127,12 +107,11 @@ class BlockingRPCServer(_RPCServer):
:type target: Target
:param endpoints: a list of endpoint objects
:type endpoints: list
:param executor: name of a message executor - e.g. 'eventlet', 'blocking'
:type executor: str
:param serializer: an optional entity serializer
:type serializer: Serializer
"""
executor_cls = impl_blocking.BlockingExecutor
super(BlockingRPCServer, self).__init__(transport,
target,
endpoints,
serializer,
executor_cls)
dispatcher = rpc_dispatcher.RPCDispatcher(endpoints, serializer)
return msg_server.MessageHandlingServer(transport, target,
dispatcher, executor)

View File

@ -16,13 +16,26 @@
# License for the specific language governing permissions and limitations
# under the License.
from stevedore import driver
from openstack.common import log as logging
from openstack.common.messaging import exceptions
_LOG = logging.getLogger(__name__)
class MessagingServerError(Exception):
pass
class MessagingServerError(exceptions.MessagingException):
"""Base class for all MessageHandlingServer exceptions."""
class ExecutorLoadFailure(MessagingServerError):
"""Raised if an executor can't be loaded."""
def __init__(self, executor, ex):
msg = 'Failed to load executor "%s": %s' % (executor, ex)
super(ExecutorLoadFailure, self).__init__(msg)
self.executor = executor
self.ex = ex
class MessageHandlingServer(object):
@ -33,14 +46,39 @@ class MessageHandlingServer(object):
new tasks.
"""
def __init__(self, transport, target, dispatcher, executor_cls):
def __init__(self, transport, target, dispatcher, executor='blocking'):
"""Construct a message handling server.
The dispatcher parameter is a callable which is invoked with context
and message dictionaries each time a message is received.
The executor parameter controls how incoming messages will be received
and dispatched. By default, the most simple executor is used - the
blocking executor.
:param transport: the messaging transport
:type transport: Transport
:param target: the exchange, topic and server to listen on
:type target: Target
:param dispatcher: a callable which is invoked for each method
:type dispatcher: callable
:param executor: name of message executor - e.g. 'eventlet', 'blocking'
:type executor: str
"""
self.conf = transport.conf
self.transport = transport
self.target = target
self.dispatcher = dispatcher
self.executor = executor
self._executor_cls = executor_cls
try:
mgr = driver.DriverManager('openstack.common.messaging.executors',
self.executor)
except RuntimeError as ex:
raise ExecutorLoadFailure(self.executor, ex)
else:
self._executor_cls = mgr.driver
self._executor = None
super(MessageHandlingServer, self).__init__()

View File

@ -14,7 +14,6 @@
# under the License.
from openstack.common import messaging
from openstack.common.messaging.rpc import _dispatcher as rpc_dispatcher
from openstack.common.messaging import serializer as msg_serializer
from tests import utils as test_utils
@ -53,7 +52,7 @@ class TestDispatcher(test_utils.BaseTestCase):
dict(endpoints=[],
dispatch_to=None,
ctxt={}, msg=dict(method='foo'),
success=False, ex=rpc_dispatcher.UnsupportedVersion)),
success=False, ex=messaging.UnsupportedVersion)),
('default_target',
dict(endpoints=[{}],
dispatch_to=dict(endpoint=0, method='foo'),
@ -79,7 +78,7 @@ class TestDispatcher(test_utils.BaseTestCase):
dict(endpoints=[{}],
dispatch_to=None,
ctxt={}, msg=dict(method='foobar'),
success=False, ex=rpc_dispatcher.NoSuchMethodError)),
success=False, ex=messaging.NoSuchMethod)),
('namespace',
dict(endpoints=[{}, dict(namespace='testns')],
dispatch_to=dict(endpoint=1, method='foo'),
@ -89,7 +88,7 @@ class TestDispatcher(test_utils.BaseTestCase):
dict(endpoints=[{}, dict(namespace='testns')],
dispatch_to=None,
ctxt={}, msg=dict(method='foo', namespace='nstest'),
success=False, ex=rpc_dispatcher.UnsupportedVersion)),
success=False, ex=messaging.UnsupportedVersion)),
('version',
dict(endpoints=[dict(version='1.5'), dict(version='3.4')],
dispatch_to=dict(endpoint=1, method='foo'),
@ -99,7 +98,7 @@ class TestDispatcher(test_utils.BaseTestCase):
dict(endpoints=[dict(version='1.5'), dict(version='3.0')],
dispatch_to=None,
ctxt={}, msg=dict(method='foo', version='3.2'),
success=False, ex=rpc_dispatcher.UnsupportedVersion)),
success=False, ex=messaging.UnsupportedVersion)),
]
def _test_dispatcher(self):
@ -109,7 +108,7 @@ class TestDispatcher(test_utils.BaseTestCase):
endpoints.append(_FakeEndpoint(target))
serializer = None
dispatcher = rpc_dispatcher.RPCDispatcher(endpoints, serializer)
dispatcher = messaging.RPCDispatcher(endpoints, serializer)
if self.dispatch_to is not None:
endpoint = endpoints[self.dispatch_to['endpoint']]
@ -128,9 +127,9 @@ class TestDispatcher(test_utils.BaseTestCase):
self.assertFalse(self.success, ex)
self.assertTrue(self.ex is not None, ex)
self.assertTrue(isinstance(ex, self.ex), ex)
if isinstance(ex, rpc_dispatcher.NoSuchMethodError):
if isinstance(ex, messaging.NoSuchMethod):
self.assertEquals(ex.method, self.msg.get('method'))
elif isinstance(ex, rpc_dispatcher.UnsupportedVersion):
elif isinstance(ex, messaging.UnsupportedVersion):
self.assertEquals(ex.version, self.msg.get('version', '1.0'))
else:
self.assertTrue(self.success)
@ -183,7 +182,7 @@ class TestSerializer(test_utils.BaseTestCase):
def _test_serializer(self):
endpoint = _FakeEndpoint()
serializer = msg_serializer.NoOpSerializer
dispatcher = rpc_dispatcher.RPCDispatcher([endpoint], serializer)
dispatcher = messaging.RPCDispatcher([endpoint], serializer)
self.mox.StubOutWithMock(endpoint, 'foo')
args = dict([(k, 'd' + v) for k, v in self.args.items()])