creates a dispatcher abstraction
This change creates a dispatcher abstraction to document the interface of a dispatcher. And also allows in the futur to have attributes with default values. Change-Id: I9a7e5e03f89635a3790b3851f492a1a7aab58feb
This commit is contained in:
parent
52ccff7cbc
commit
bdf287e847
@ -46,57 +46,6 @@ def version_is_compatible(imp_version, version):
|
||||
return True
|
||||
|
||||
|
||||
class DispatcherExecutorContext(object):
|
||||
"""Dispatcher executor context helper
|
||||
|
||||
A dispatcher can have work to do before and after the dispatch of the
|
||||
request in the main server thread while the dispatcher itself can be
|
||||
done in its own thread.
|
||||
|
||||
The executor can use the helper like this:
|
||||
|
||||
callback = dispatcher(incoming)
|
||||
callback.prepare()
|
||||
thread = MyWhateverThread()
|
||||
thread.on_done(callback.done)
|
||||
thread.run(callback.run)
|
||||
|
||||
"""
|
||||
def __init__(self, incoming, dispatch, executor_callback=None,
|
||||
post=None):
|
||||
self._result = None
|
||||
self._incoming = incoming
|
||||
self._dispatch = dispatch
|
||||
self._post = post
|
||||
self._executor_callback = executor_callback
|
||||
|
||||
def run(self):
|
||||
"""The incoming message dispath itself
|
||||
|
||||
Can be run in an other thread/greenlet/corotine if the executor is
|
||||
able to do it.
|
||||
"""
|
||||
try:
|
||||
self._result = self._dispatch(self._incoming,
|
||||
self._executor_callback)
|
||||
except Exception:
|
||||
msg = 'The dispatcher method must catches all exceptions'
|
||||
LOG.exception(msg)
|
||||
raise RuntimeError(msg)
|
||||
|
||||
def done(self):
|
||||
"""Callback after the incoming message have been dispathed
|
||||
|
||||
Should be ran in the main executor thread/greenlet/corotine
|
||||
"""
|
||||
# FIXME(sileht): this is not currently true, this works only because
|
||||
# the driver connection used for polling write on the wire only to
|
||||
# ack/requeue message, but what if one day, the driver do something
|
||||
# else
|
||||
if self._post is not None:
|
||||
self._post(self._incoming, self._result)
|
||||
|
||||
|
||||
def fetch_current_thread_functor():
|
||||
# Until https://github.com/eventlet/eventlet/issues/172 is resolved
|
||||
# or addressed we have to use complicated workaround to get a object
|
||||
|
105
oslo_messaging/dispatcher.py
Normal file
105
oslo_messaging/dispatcher.py
Normal file
@ -0,0 +1,105 @@
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
import logging
|
||||
|
||||
import six
|
||||
|
||||
__all__ = [
|
||||
"DispatcherBase",
|
||||
"DispatcherExecutorContext"
|
||||
]
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DispatcherExecutorContext(object):
|
||||
"""Dispatcher executor context helper
|
||||
|
||||
A dispatcher can have work to do before and after the dispatch of the
|
||||
request in the main server thread while the dispatcher itself can be
|
||||
done in its own thread.
|
||||
|
||||
The executor can use the helper like this:
|
||||
|
||||
callback = dispatcher(incoming)
|
||||
callback.prepare()
|
||||
thread = MyWhateverThread()
|
||||
thread.on_done(callback.done)
|
||||
thread.run(callback.run)
|
||||
|
||||
"""
|
||||
def __init__(self, incoming, dispatch, executor_callback=None,
|
||||
post=None):
|
||||
self._result = None
|
||||
self._incoming = incoming
|
||||
self._dispatch = dispatch
|
||||
self._post = post
|
||||
self._executor_callback = executor_callback
|
||||
|
||||
def run(self):
|
||||
"""The incoming message dispath itself
|
||||
|
||||
Can be run in an other thread/greenlet/corotine if the executor is
|
||||
able to do it.
|
||||
"""
|
||||
try:
|
||||
self._result = self._dispatch(self._incoming,
|
||||
self._executor_callback)
|
||||
except Exception:
|
||||
msg = 'The dispatcher method must catches all exceptions'
|
||||
LOG.exception(msg)
|
||||
raise RuntimeError(msg)
|
||||
|
||||
def done(self):
|
||||
"""Callback after the incoming message have been dispathed
|
||||
|
||||
Should be ran in the main executor thread/greenlet/corotine
|
||||
"""
|
||||
# FIXME(sileht): this is not currently true, this works only because
|
||||
# the driver connection used for polling write on the wire only to
|
||||
# ack/requeue message, but what if one day, the driver do something
|
||||
# else
|
||||
if self._post is not None:
|
||||
self._post(self._incoming, self._result)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class DispatcherBase(object):
|
||||
"Base class for dispatcher"
|
||||
|
||||
@abc.abstractmethod
|
||||
def _listen(self, transport):
|
||||
"""Initiate the driver Listener
|
||||
|
||||
Usualy the driver Listener is start with the transport helper methods:
|
||||
|
||||
* transport._listen()
|
||||
* transport._listen_for_notifications()
|
||||
|
||||
:param transport: the transport object
|
||||
:type transport: oslo_messaging.transport.Transport
|
||||
:returns: a driver Listener object
|
||||
:rtype: oslo_messaging._drivers.base.Listener
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def __call__(self, incoming, executor_callback=None):
|
||||
"""Called by the executor to get the DispatcherExecutorContext
|
||||
|
||||
:param incoming: message or list of messages
|
||||
:type incoming: oslo_messging._drivers.base.IncomingMessage
|
||||
:returns: DispatcherExecutorContext
|
||||
:rtype: DispatcherExecutorContext
|
||||
"""
|
@ -18,7 +18,7 @@ import itertools
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from oslo_messaging import _utils as utils
|
||||
from oslo_messaging import dispatcher
|
||||
from oslo_messaging import localcontext
|
||||
from oslo_messaging import serializer as msg_serializer
|
||||
|
||||
@ -33,7 +33,7 @@ class NotificationResult(object):
|
||||
REQUEUE = 'requeue'
|
||||
|
||||
|
||||
class NotificationDispatcher(object):
|
||||
class NotificationDispatcher(dispatcher.DispatcherBase):
|
||||
"""A message dispatcher which understands Notification messages.
|
||||
|
||||
A MessageHandlingServer is constructed by passing a callable dispatcher
|
||||
@ -69,7 +69,7 @@ class NotificationDispatcher(object):
|
||||
pool=self.pool)
|
||||
|
||||
def __call__(self, incoming, executor_callback=None):
|
||||
return utils.DispatcherExecutorContext(
|
||||
return dispatcher.DispatcherExecutorContext(
|
||||
incoming, self._dispatch_and_handle_error,
|
||||
executor_callback=executor_callback,
|
||||
post=self._post_dispatch)
|
||||
|
@ -31,6 +31,7 @@ import six
|
||||
|
||||
from oslo_messaging._i18n import _LE
|
||||
from oslo_messaging import _utils as utils
|
||||
from oslo_messaging import dispatcher
|
||||
from oslo_messaging import localcontext
|
||||
from oslo_messaging import serializer as msg_serializer
|
||||
from oslo_messaging import server as msg_server
|
||||
@ -75,7 +76,7 @@ class UnsupportedVersion(RPCDispatcherError):
|
||||
self.method = method
|
||||
|
||||
|
||||
class RPCDispatcher(object):
|
||||
class RPCDispatcher(dispatcher.DispatcherBase):
|
||||
"""A message dispatcher which understands RPC messages.
|
||||
|
||||
A MessageHandlingServer is constructed by passing a callable dispatcher
|
||||
@ -131,7 +132,7 @@ class RPCDispatcher(object):
|
||||
|
||||
def __call__(self, incoming, executor_callback=None):
|
||||
incoming.acknowledge()
|
||||
return utils.DispatcherExecutorContext(
|
||||
return dispatcher.DispatcherExecutorContext(
|
||||
incoming, self._dispatch_and_reply,
|
||||
executor_callback=executor_callback)
|
||||
|
||||
|
@ -44,7 +44,7 @@ try:
|
||||
except ImportError:
|
||||
impl_eventlet = None
|
||||
from oslo_messaging._executors import impl_thread
|
||||
from oslo_messaging import _utils as utils
|
||||
from oslo_messaging import dispatcher as dispatcher_base
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
from six.moves import mock
|
||||
|
||||
@ -151,9 +151,8 @@ class TestExecutor(test_utils.BaseTestCase):
|
||||
return result
|
||||
|
||||
def __call__(self, incoming, executor_callback=None):
|
||||
return utils.DispatcherExecutorContext(incoming,
|
||||
self.callback,
|
||||
executor_callback)
|
||||
return dispatcher_base.DispatcherExecutorContext(
|
||||
incoming, self.callback, executor_callback)
|
||||
|
||||
return Dispatcher(endpoint), endpoint, event, run_executor
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user