Remove usage of contentmanager for executors
The context manager in the executor fit only for the blocking executor. Even the dispatcher needs to run code before and after the application callback, eventlet and future executors have to run the pre/post code into the main thread and can run the callback into an other thread, and that force them to run __enter__ and __exit__ manually and deal the exception path. This change adds a helper object instead of the context manager. It is designed to be explicit on what must be executed before and after the callback and what can be done in a thread or not. All the executor code is now in the impl_pooledexecutor.py and use the futures "PoolExecutor" API. This use futurist to provide a eventlet and aioeventlet futures friendly object. Change-Id: I8cd7640f36beeda47560e3c82671bad3530e38d1
This commit is contained in:
parent
3b891fcfcb
commit
c49594a62f
@ -35,7 +35,7 @@ from stevedore import driver
|
||||
|
||||
from oslo_messaging._drivers import base
|
||||
from oslo_messaging._drivers import common as rpc_common
|
||||
from oslo_messaging._executors import base as executor_base # FIXME(markmc)
|
||||
from oslo_messaging._executors import impl_pooledexecutor # FIXME(markmc)
|
||||
from oslo_messaging._i18n import _, _LE, _LW
|
||||
from oslo_messaging._drivers import pool
|
||||
|
||||
@ -1001,7 +1001,7 @@ class ZmqDriver(base.BaseDriver):
|
||||
if not zmq:
|
||||
raise ImportError("Failed to import eventlet.green.zmq")
|
||||
conf.register_opts(zmq_opts)
|
||||
conf.register_opts(executor_base._pool_opts)
|
||||
conf.register_opts(impl_pooledexecutor._pool_opts)
|
||||
conf.register_opts(base.base_opts)
|
||||
|
||||
super(ZmqDriver, self).__init__(conf, url, default_exchange,
|
||||
|
@ -14,19 +14,15 @@
|
||||
|
||||
import abc
|
||||
|
||||
from oslo_config import cfg
|
||||
import six
|
||||
|
||||
_pool_opts = [
|
||||
cfg.IntOpt('rpc_thread_pool_size',
|
||||
default=64,
|
||||
help='Size of RPC thread pool.'),
|
||||
]
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class ExecutorBase(object):
|
||||
|
||||
# Executor can override how we run the application callback
|
||||
_executor_callback = None
|
||||
|
||||
def __init__(self, conf, listener, dispatcher):
|
||||
self.conf = conf
|
||||
self.listener = listener
|
||||
@ -43,11 +39,3 @@ class ExecutorBase(object):
|
||||
@abc.abstractmethod
|
||||
def wait(self):
|
||||
"Wait until the executor has stopped polling."
|
||||
|
||||
|
||||
class PooledExecutorBase(ExecutorBase):
|
||||
"""An executor that uses a rpc thread pool of a given size."""
|
||||
|
||||
def __init__(self, conf, listener, callback):
|
||||
super(PooledExecutorBase, self).__init__(conf, listener, callback)
|
||||
self.conf.register_opts(_pool_opts)
|
||||
|
@ -70,6 +70,4 @@ class AsyncioEventletExecutor(impl_eventlet.EventletExecutor):
|
||||
result = aioeventlet.yield_future(result, loop=self._loop)
|
||||
return result
|
||||
|
||||
def _dispatch(self, incoming):
|
||||
ctx = self.dispatcher(incoming, self._coroutine_wrapper)
|
||||
impl_eventlet.spawn_with(ctxt=ctx, pool=self._greenpool)
|
||||
_executor_callback = _coroutine_wrapper
|
||||
|
@ -13,15 +13,28 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import futurist
|
||||
|
||||
from oslo_messaging._executors import base
|
||||
from oslo_messaging._i18n import _
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
from oslo_messaging._executors import impl_pooledexecutor
|
||||
|
||||
|
||||
class BlockingExecutor(base.ExecutorBase):
|
||||
class FakeBlockingThread(object):
|
||||
def __init__(self, target):
|
||||
self._target = target
|
||||
|
||||
def start(self):
|
||||
self._target()
|
||||
|
||||
@staticmethod
|
||||
def join():
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def stop():
|
||||
pass
|
||||
|
||||
|
||||
class BlockingExecutor(impl_pooledexecutor.PooledExecutor):
|
||||
|
||||
"""A message executor which blocks the current thread.
|
||||
|
||||
@ -34,24 +47,5 @@ class BlockingExecutor(base.ExecutorBase):
|
||||
for simple demo programs.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, listener, dispatcher):
|
||||
super(BlockingExecutor, self).__init__(conf, listener, dispatcher)
|
||||
self._running = False
|
||||
|
||||
def start(self):
|
||||
self._running = True
|
||||
while self._running:
|
||||
try:
|
||||
incoming = self.listener.poll()
|
||||
if incoming is not None:
|
||||
with self.dispatcher(incoming) as callback:
|
||||
callback()
|
||||
except Exception:
|
||||
LOG.exception(_("Unexpected exception occurred."))
|
||||
|
||||
def stop(self):
|
||||
self._running = False
|
||||
self.listener.stop()
|
||||
|
||||
def wait(self):
|
||||
pass
|
||||
_executor_cls = lambda __, ___: futurist.SynchronousExecutor()
|
||||
_thread_cls = FakeBlockingThread
|
||||
|
@ -14,50 +14,17 @@
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import sys
|
||||
|
||||
import eventlet
|
||||
from eventlet.green import threading as greenthreading
|
||||
from eventlet import greenpool
|
||||
import greenlet
|
||||
from oslo_utils import excutils
|
||||
import futurist
|
||||
|
||||
from oslo_messaging._executors import base
|
||||
from oslo_messaging._executors import impl_pooledexecutor
|
||||
from oslo_messaging import localcontext
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def spawn_with(ctxt, pool):
|
||||
"""This is the equivalent of a with statement
|
||||
but with the content of the BLOCK statement executed
|
||||
into a greenthread
|
||||
|
||||
exception path grab from:
|
||||
http://www.python.org/dev/peps/pep-0343/
|
||||
"""
|
||||
|
||||
def complete(thread, exit):
|
||||
exc = True
|
||||
try:
|
||||
try:
|
||||
thread.wait()
|
||||
except Exception:
|
||||
exc = False
|
||||
if not exit(*sys.exc_info()):
|
||||
raise
|
||||
finally:
|
||||
if exc:
|
||||
exit(None, None, None)
|
||||
|
||||
callback = ctxt.__enter__()
|
||||
thread = pool.spawn(callback)
|
||||
thread.link(complete, ctxt.__exit__)
|
||||
|
||||
return thread
|
||||
|
||||
|
||||
class EventletExecutor(base.PooledExecutorBase):
|
||||
class EventletExecutor(impl_pooledexecutor.PooledExecutor):
|
||||
|
||||
"""A message executor which integrates with eventlet.
|
||||
|
||||
@ -70,10 +37,6 @@ class EventletExecutor(base.PooledExecutorBase):
|
||||
|
||||
def __init__(self, conf, listener, dispatcher):
|
||||
super(EventletExecutor, self).__init__(conf, listener, dispatcher)
|
||||
self._thread = None
|
||||
self._greenpool = greenpool.GreenPool(self.conf.rpc_thread_pool_size)
|
||||
self._running = False
|
||||
|
||||
if not isinstance(localcontext._STORE, greenthreading.local):
|
||||
LOG.debug('eventlet executor in use but the threading module '
|
||||
'has not been monkeypatched or has been '
|
||||
@ -82,39 +45,7 @@ class EventletExecutor(base.PooledExecutorBase):
|
||||
'behavior. In the future, we will raise a '
|
||||
'RuntimeException in this case.')
|
||||
|
||||
def _dispatch(self, incoming):
|
||||
spawn_with(ctxt=self.dispatcher(incoming), pool=self._greenpool)
|
||||
|
||||
def start(self):
|
||||
if self._thread is not None:
|
||||
return
|
||||
|
||||
@excutils.forever_retry_uncaught_exceptions
|
||||
def _executor_thread():
|
||||
try:
|
||||
while self._running:
|
||||
incoming = self.listener.poll()
|
||||
if incoming is not None:
|
||||
self._dispatch(incoming)
|
||||
except greenlet.GreenletExit:
|
||||
return
|
||||
|
||||
self._running = True
|
||||
self._thread = eventlet.spawn(_executor_thread)
|
||||
|
||||
def stop(self):
|
||||
if self._thread is None:
|
||||
return
|
||||
self._running = False
|
||||
self.listener.stop()
|
||||
self._thread.cancel()
|
||||
|
||||
def wait(self):
|
||||
if self._thread is None:
|
||||
return
|
||||
self._greenpool.waitall()
|
||||
try:
|
||||
self._thread.wait()
|
||||
except greenlet.GreenletExit:
|
||||
pass
|
||||
self._thread = None
|
||||
_executor_cls = futurist.GreenThreadPoolExecutor
|
||||
_lock_cls = greenthreading.Lock
|
||||
_event_cls = greenthreading.Event
|
||||
_thread_cls = greenthreading.Thread
|
||||
|
112
oslo_messaging/_executors/impl_pooledexecutor.py
Normal file
112
oslo_messaging/_executors/impl_pooledexecutor.py
Normal file
@ -0,0 +1,112 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import threading
|
||||
|
||||
from concurrent import futures
|
||||
from oslo.config import cfg
|
||||
from oslo.utils import excutils
|
||||
|
||||
from oslo_messaging._executors import base
|
||||
|
||||
_pool_opts = [
|
||||
cfg.IntOpt('rpc_thread_pool_size',
|
||||
default=64,
|
||||
help='Size of RPC thread pool.'),
|
||||
]
|
||||
|
||||
|
||||
class PooledExecutor(base.ExecutorBase):
|
||||
"""A message executor which integrates with threads.
|
||||
|
||||
A message process that polls for messages from a dispatching thread and
|
||||
on reception of an incoming message places the message to be processed in
|
||||
a thread pool to be executed at a later time.
|
||||
"""
|
||||
|
||||
# NOTE(harlowja): if eventlet is being used and the thread module is monkey
|
||||
# patched this should/is supposed to work the same as the eventlet based
|
||||
# executor.
|
||||
|
||||
# NOTE(harlowja): Make it somewhat easy to change this via
|
||||
# inheritance (since there does exist other executor types that could be
|
||||
# used/tried here).
|
||||
_executor_cls = futures.ThreadPoolExecutor
|
||||
_event_cls = threading.Event
|
||||
_lock_cls = threading.Lock
|
||||
_thread_cls = threading.Thread
|
||||
|
||||
def __init__(self, conf, listener, dispatcher):
|
||||
super(PooledExecutor, self).__init__(conf, listener, dispatcher)
|
||||
self.conf.register_opts(_pool_opts)
|
||||
self._poller = None
|
||||
self._executor = None
|
||||
self._tombstone = self._event_cls()
|
||||
self._incomplete = collections.deque()
|
||||
self._mutator = self._lock_cls()
|
||||
|
||||
@excutils.forever_retry_uncaught_exceptions
|
||||
def _runner(self):
|
||||
while not self._tombstone.is_set():
|
||||
incoming = self.listener.poll()
|
||||
if incoming is None:
|
||||
continue
|
||||
callback = self.dispatcher(incoming, self._executor_callback)
|
||||
try:
|
||||
fut = self._executor.submit(callback.run)
|
||||
except RuntimeError:
|
||||
# This is triggered when the executor has been shutdown...
|
||||
#
|
||||
# TODO(harlowja): should we put whatever we pulled off back
|
||||
# since when this is thrown it means the executor has been
|
||||
# shutdown already??
|
||||
callback.done()
|
||||
return
|
||||
else:
|
||||
with self._mutator:
|
||||
self._incomplete.append(fut)
|
||||
# Run the other post processing of the callback when done...
|
||||
fut.add_done_callback(lambda f: callback.done())
|
||||
|
||||
def start(self):
|
||||
if self._executor is None:
|
||||
self._executor = self._executor_cls(self.conf.rpc_thread_pool_size)
|
||||
self._tombstone.clear()
|
||||
if self._poller is None or not self._poller.is_alive():
|
||||
self._poller = self._thread_cls(target=self._runner)
|
||||
self._poller.daemon = True
|
||||
self._poller.start()
|
||||
|
||||
def stop(self):
|
||||
if self._executor is not None:
|
||||
self._executor.shutdown(wait=False)
|
||||
self._tombstone.set()
|
||||
self.listener.stop()
|
||||
|
||||
def wait(self):
|
||||
# TODO(harlowja): this method really needs a timeout.
|
||||
if self._poller is not None:
|
||||
self._tombstone.wait()
|
||||
self._poller.join()
|
||||
self._poller = None
|
||||
if self._executor is not None:
|
||||
with self._mutator:
|
||||
incomplete_fs = list(self._incomplete)
|
||||
self._incomplete.clear()
|
||||
if incomplete_fs:
|
||||
futures.wait(incomplete_fs, return_when=futures.ALL_COMPLETED)
|
||||
self._executor = None
|
@ -14,118 +14,16 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import functools
|
||||
import sys
|
||||
import threading
|
||||
|
||||
from concurrent import futures
|
||||
from oslo_utils import excutils
|
||||
import six
|
||||
|
||||
from oslo_messaging._executors import base
|
||||
from oslo_messaging._executors import impl_pooledexecutor
|
||||
|
||||
|
||||
class ThreadExecutor(base.PooledExecutorBase):
|
||||
class ThreadExecutor(impl_pooledexecutor.PooledExecutor):
|
||||
"""A message executor which integrates with threads.
|
||||
|
||||
A message process that polls for messages from a dispatching thread and
|
||||
on reception of an incoming message places the message to be processed in
|
||||
a thread pool to be executed at a later time.
|
||||
"""
|
||||
|
||||
# NOTE(harlowja): if eventlet is being used and the thread module is monkey
|
||||
# patched this should/is supposed to work the same as the eventlet based
|
||||
# executor.
|
||||
|
||||
# NOTE(harlowja): Make it somewhat easy to change this via
|
||||
# inheritance (since there does exist other executor types that could be
|
||||
# used/tried here).
|
||||
_executor_cls = futures.ThreadPoolExecutor
|
||||
|
||||
def __init__(self, conf, listener, dispatcher):
|
||||
super(ThreadExecutor, self).__init__(conf, listener, dispatcher)
|
||||
self._poller = None
|
||||
self._executor = None
|
||||
self._tombstone = threading.Event()
|
||||
self._incomplete = collections.deque()
|
||||
self._mutator = threading.Lock()
|
||||
|
||||
def _completer(self, exit_method, fut):
|
||||
"""Completes futures."""
|
||||
try:
|
||||
exc = fut.exception()
|
||||
if exc is not None:
|
||||
exc_type = type(exc)
|
||||
# Not available on < 3.x due to this being an added feature
|
||||
# of pep-3134 (exception chaining and embedded tracebacks).
|
||||
if six.PY3:
|
||||
exc_tb = exc.__traceback__
|
||||
else:
|
||||
exc_tb = None
|
||||
if not exit_method(exc_type, exc, exc_tb):
|
||||
six.reraise(exc_type, exc, tb=exc_tb)
|
||||
else:
|
||||
exit_method(None, None, None)
|
||||
finally:
|
||||
with self._mutator:
|
||||
try:
|
||||
self._incomplete.remove(fut)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
@excutils.forever_retry_uncaught_exceptions
|
||||
def _runner(self):
|
||||
while not self._tombstone.is_set():
|
||||
incoming = self.listener.poll()
|
||||
if incoming is None:
|
||||
continue
|
||||
# This is hacky, needs to be fixed....
|
||||
context = self.dispatcher(incoming)
|
||||
enter_method = context.__enter__()
|
||||
exit_method = context.__exit__
|
||||
try:
|
||||
fut = self._executor.submit(enter_method)
|
||||
except RuntimeError:
|
||||
# This is triggered when the executor has been shutdown...
|
||||
#
|
||||
# TODO(harlowja): should we put whatever we pulled off back
|
||||
# since when this is thrown it means the executor has been
|
||||
# shutdown already??
|
||||
exit_method(*sys.exc_info())
|
||||
return
|
||||
else:
|
||||
with self._mutator:
|
||||
self._incomplete.append(fut)
|
||||
# Run the other half (__exit__) when done...
|
||||
fut.add_done_callback(functools.partial(self._completer,
|
||||
exit_method))
|
||||
|
||||
def start(self):
|
||||
if self._executor is None:
|
||||
self._executor = self._executor_cls(self.conf.rpc_thread_pool_size)
|
||||
self._tombstone.clear()
|
||||
if self._poller is None or not self._poller.is_alive():
|
||||
self._poller = threading.Thread(target=self._runner)
|
||||
self._poller.daemon = True
|
||||
self._poller.start()
|
||||
|
||||
def stop(self):
|
||||
if self._executor is not None:
|
||||
self._executor.shutdown(wait=False)
|
||||
self._tombstone.set()
|
||||
self.listener.stop()
|
||||
|
||||
def wait(self):
|
||||
# TODO(harlowja): this method really needs a timeout.
|
||||
if self._poller is not None:
|
||||
self._tombstone.wait()
|
||||
self._poller.join()
|
||||
self._poller = None
|
||||
if self._executor is not None:
|
||||
with self._mutator:
|
||||
incomplete_fs = list(self._incomplete)
|
||||
self._incomplete.clear()
|
||||
if incomplete_fs:
|
||||
futures.wait(incomplete_fs, return_when=futures.ALL_COMPLETED)
|
||||
self._executor = None
|
||||
|
@ -13,6 +13,10 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def version_is_compatible(imp_version, version):
|
||||
"""Determine whether versions are compatible.
|
||||
@ -39,3 +43,54 @@ def version_is_compatible(imp_version, version):
|
||||
int(rev) > int(imp_rev)): # Revision
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class DispatcherExecutorContext(object):
|
||||
"""Dispatcher executor context helper
|
||||
|
||||
A dispatcher can have work to do before and after the dispatch of the
|
||||
request in the main server thread while the dispatcher itself can be
|
||||
done in its own thread.
|
||||
|
||||
The executor can use the helper like this:
|
||||
|
||||
callback = dispatcher(incoming)
|
||||
callback.prepare()
|
||||
thread = MyWhateverThread()
|
||||
thread.on_done(callback.done)
|
||||
thread.run(callback.run)
|
||||
|
||||
"""
|
||||
def __init__(self, incoming, dispatch, executor_callback=None,
|
||||
post=None):
|
||||
self._result = None
|
||||
self._incoming = incoming
|
||||
self._dispatch = dispatch
|
||||
self._post = post
|
||||
self._executor_callback = executor_callback
|
||||
|
||||
def run(self):
|
||||
"""The incoming message dispath itself
|
||||
|
||||
Can be run in an other thread/greenlet/corotine if the executor is
|
||||
able to do it.
|
||||
"""
|
||||
try:
|
||||
self._result = self._dispatch(self._incoming,
|
||||
self._executor_callback)
|
||||
except Exception:
|
||||
msg = 'The dispatcher method must catches all exceptions'
|
||||
LOG.exception(msg)
|
||||
raise RuntimeError(msg)
|
||||
|
||||
def done(self):
|
||||
"""Callback after the incoming message have been dispathed
|
||||
|
||||
Should be runned in the main executor thread/greenlet/corotine
|
||||
"""
|
||||
# FIXME(sileht): this is not currently true, this works only because
|
||||
# the driver connection used for polling write on the wire only to
|
||||
# ack/requeue message, but what if one day, the driver do something
|
||||
# else
|
||||
if self._post is not None:
|
||||
self._post(self._incoming, self._result)
|
||||
|
@ -14,11 +14,11 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import itertools
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from oslo_messaging import _utils as utils
|
||||
from oslo_messaging import localcontext
|
||||
from oslo_messaging import serializer as msg_serializer
|
||||
|
||||
@ -68,14 +68,15 @@ class NotificationDispatcher(object):
|
||||
return transport._listen_for_notifications(self._targets_priorities,
|
||||
pool=self.pool)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def __call__(self, incoming, executor_callback=None):
|
||||
result_wrapper = []
|
||||
return utils.DispatcherExecutorContext(
|
||||
incoming, self._dispatch_and_handle_error,
|
||||
executor_callback=executor_callback,
|
||||
post=self._post_dispatch)
|
||||
|
||||
yield lambda: result_wrapper.append(
|
||||
self._dispatch_and_handle_error(incoming, executor_callback))
|
||||
|
||||
if result_wrapper[0] == NotificationResult.HANDLED:
|
||||
@staticmethod
|
||||
def _post_dispatch(incoming, result):
|
||||
if result == NotificationResult.HANDLED:
|
||||
incoming.acknowledge()
|
||||
else:
|
||||
incoming.requeue()
|
||||
|
@ -29,7 +29,7 @@ from oslo_messaging._drivers import matchmaker
|
||||
from oslo_messaging._drivers import matchmaker_redis
|
||||
from oslo_messaging._drivers import matchmaker_ring
|
||||
from oslo_messaging._drivers.protocols.amqp import opts as amqp_opts
|
||||
from oslo_messaging._executors import base
|
||||
from oslo_messaging._executors import impl_pooledexecutor
|
||||
from oslo_messaging.notify import notifier
|
||||
from oslo_messaging.rpc import client
|
||||
from oslo_messaging import transport
|
||||
@ -38,7 +38,7 @@ _global_opt_lists = [
|
||||
drivers_base.base_opts,
|
||||
impl_zmq.zmq_opts,
|
||||
matchmaker.matchmaker_opts,
|
||||
base._pool_opts,
|
||||
impl_pooledexecutor._pool_opts,
|
||||
notifier._notifier_opts,
|
||||
client._client_opts,
|
||||
transport._transport_opts,
|
||||
|
@ -24,7 +24,6 @@ __all__ = [
|
||||
'ExpectedException',
|
||||
]
|
||||
|
||||
import contextlib
|
||||
import logging
|
||||
import sys
|
||||
|
||||
@ -130,10 +129,11 @@ class RPCDispatcher(object):
|
||||
result = func(ctxt, **new_args)
|
||||
return self.serializer.serialize_entity(ctxt, result)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def __call__(self, incoming, executor_callback=None):
|
||||
incoming.acknowledge()
|
||||
yield lambda: self._dispatch_and_reply(incoming, executor_callback)
|
||||
return utils.DispatcherExecutorContext(
|
||||
incoming, self._dispatch_and_reply,
|
||||
executor_callback=executor_callback)
|
||||
|
||||
def _dispatch_and_reply(self, incoming, executor_callback):
|
||||
try:
|
||||
|
@ -14,7 +14,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import threading
|
||||
|
||||
# eventlet 0.16 with monkey patching does not work yet on Python 3,
|
||||
@ -28,7 +27,6 @@ try:
|
||||
except ImportError:
|
||||
eventlet = None
|
||||
import testscenarios
|
||||
import testtools
|
||||
try:
|
||||
import trollius
|
||||
except ImportError:
|
||||
@ -45,6 +43,7 @@ try:
|
||||
except ImportError:
|
||||
impl_eventlet = None
|
||||
from oslo_messaging._executors import impl_thread
|
||||
from oslo_messaging import _utils as utils
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
from six.moves import mock
|
||||
|
||||
@ -106,7 +105,6 @@ class TestExecutor(test_utils.BaseTestCase):
|
||||
|
||||
@trollius.coroutine
|
||||
def simple_coroutine(value):
|
||||
yield None
|
||||
raise trollius.Return(value)
|
||||
|
||||
endpoint = mock.MagicMock(return_value=simple_coroutine('result'))
|
||||
@ -123,30 +121,29 @@ class TestExecutor(test_utils.BaseTestCase):
|
||||
self.endpoint = endpoint
|
||||
self.result = "not set"
|
||||
|
||||
@contextlib.contextmanager
|
||||
def __call__(self, incoming, executor_callback=None):
|
||||
if executor_callback is not None:
|
||||
def callback():
|
||||
result = executor_callback(self.endpoint,
|
||||
incoming.ctxt,
|
||||
incoming.message)
|
||||
self.result = result
|
||||
return result
|
||||
yield callback
|
||||
event.send()
|
||||
def callback(self, incoming, executor_callback):
|
||||
if executor_callback is None:
|
||||
result = self.endpoint(incoming.ctxt,
|
||||
incoming.message)
|
||||
else:
|
||||
def callback():
|
||||
result = self.endpoint(incoming.ctxt, incoming.message)
|
||||
self.result = result
|
||||
return result
|
||||
yield callback
|
||||
result = executor_callback(self.endpoint,
|
||||
incoming.ctxt,
|
||||
incoming.message)
|
||||
if is_aioeventlet:
|
||||
event.send()
|
||||
self.result = result
|
||||
return result
|
||||
|
||||
listener = mock.Mock(spec=['poll'])
|
||||
def __call__(self, incoming, executor_callback=None):
|
||||
return utils.DispatcherExecutorContext(incoming,
|
||||
self.callback,
|
||||
executor_callback)
|
||||
|
||||
listener = mock.Mock(spec=['poll', 'stop'])
|
||||
dispatcher = Dispatcher(endpoint)
|
||||
executor = self.executor(self.conf, listener, dispatcher)
|
||||
|
||||
incoming_message = mock.MagicMock(ctxt={},
|
||||
message={'payload': 'data'})
|
||||
incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'})
|
||||
|
||||
def fake_poll(timeout=None):
|
||||
if is_aioeventlet:
|
||||
@ -167,60 +164,3 @@ class TestExecutor(test_utils.BaseTestCase):
|
||||
self.assertEqual(dispatcher.result, 'result')
|
||||
|
||||
TestExecutor.generate_scenarios()
|
||||
|
||||
|
||||
class ExceptedException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class EventletContextManagerSpawnTest(test_utils.BaseTestCase):
|
||||
@testtools.skipIf(impl_eventlet is None, "Eventlet not available")
|
||||
def setUp(self):
|
||||
super(EventletContextManagerSpawnTest, self).setUp()
|
||||
self.before = mock.Mock()
|
||||
self.callback = mock.Mock()
|
||||
self.after = mock.Mock()
|
||||
self.exception_call = mock.Mock()
|
||||
|
||||
@contextlib.contextmanager
|
||||
def context_mgr():
|
||||
self.before()
|
||||
try:
|
||||
yield lambda: self.callback()
|
||||
except ExceptedException:
|
||||
self.exception_call()
|
||||
self.after()
|
||||
|
||||
self.mgr = context_mgr()
|
||||
|
||||
def test_normal_run(self):
|
||||
thread = impl_eventlet.spawn_with(self.mgr, pool=eventlet)
|
||||
thread.wait()
|
||||
self.assertEqual(1, self.before.call_count)
|
||||
self.assertEqual(1, self.callback.call_count)
|
||||
self.assertEqual(1, self.after.call_count)
|
||||
self.assertEqual(0, self.exception_call.call_count)
|
||||
|
||||
def test_excepted_exception(self):
|
||||
self.callback.side_effect = ExceptedException
|
||||
thread = impl_eventlet.spawn_with(self.mgr, pool=eventlet)
|
||||
try:
|
||||
thread.wait()
|
||||
except ExceptedException:
|
||||
pass
|
||||
self.assertEqual(1, self.before.call_count)
|
||||
self.assertEqual(1, self.callback.call_count)
|
||||
self.assertEqual(1, self.after.call_count)
|
||||
self.assertEqual(1, self.exception_call.call_count)
|
||||
|
||||
def test_unexcepted_exception(self):
|
||||
self.callback.side_effect = Exception
|
||||
thread = impl_eventlet.spawn_with(self.mgr, pool=eventlet)
|
||||
try:
|
||||
thread.wait()
|
||||
except Exception:
|
||||
pass
|
||||
self.assertEqual(1, self.before.call_count)
|
||||
self.assertEqual(1, self.callback.call_count)
|
||||
self.assertEqual(0, self.after.call_count)
|
||||
self.assertEqual(0, self.exception_call.call_count)
|
||||
|
@ -107,8 +107,9 @@ class TestDispatcher(test_utils.BaseTestCase):
|
||||
sorted(dispatcher._targets_priorities))
|
||||
|
||||
incoming = mock.Mock(ctxt={}, message=msg)
|
||||
with dispatcher(incoming) as callback:
|
||||
callback()
|
||||
callback = dispatcher(incoming)
|
||||
callback.run()
|
||||
callback.done()
|
||||
|
||||
# check endpoint callbacks are called or not
|
||||
for i, endpoint_methods in enumerate(self.endpoints):
|
||||
@ -143,8 +144,9 @@ class TestDispatcher(test_utils.BaseTestCase):
|
||||
msg['priority'] = 'what???'
|
||||
dispatcher = notify_dispatcher.NotificationDispatcher(
|
||||
[mock.Mock()], [mock.Mock()], None, allow_requeue=True, pool=None)
|
||||
with dispatcher(mock.Mock(ctxt={}, message=msg)) as callback:
|
||||
callback()
|
||||
callback = dispatcher(mock.Mock(ctxt={}, message=msg))
|
||||
callback.run()
|
||||
callback.done()
|
||||
mylog.warning.assert_called_once_with('Unknown priority "%s"',
|
||||
'what???')
|
||||
|
||||
@ -244,8 +246,9 @@ class TestDispatcherFilter(test_utils.BaseTestCase):
|
||||
'timestamp': '2014-03-03 18:21:04.369234',
|
||||
'message_id': '99863dda-97f0-443a-a0c1-6ed317b7fd45'}
|
||||
incoming = mock.Mock(ctxt=self.context, message=message)
|
||||
with dispatcher(incoming) as callback:
|
||||
callback()
|
||||
callback = dispatcher(incoming)
|
||||
callback.run()
|
||||
callback.done()
|
||||
|
||||
if self.match:
|
||||
self.assertEqual(1, endpoint.info.call_count)
|
||||
|
@ -133,8 +133,9 @@ class TestDispatcher(test_utils.BaseTestCase):
|
||||
incoming = mock.Mock(ctxt=self.ctxt, message=self.msg)
|
||||
incoming.reply.side_effect = check_reply
|
||||
|
||||
with dispatcher(incoming) as callback:
|
||||
callback()
|
||||
callback = dispatcher(incoming)
|
||||
callback.run()
|
||||
callback.done()
|
||||
|
||||
for n, endpoint in enumerate(endpoints):
|
||||
for method_name in ['foo', 'bar']:
|
||||
|
@ -4,6 +4,7 @@
|
||||
|
||||
pbr<2.0,>=0.11
|
||||
|
||||
futurist>=0.1.1 # Apache-2.0
|
||||
oslo.config>=1.11.0 # Apache-2.0
|
||||
oslo.context>=0.2.0 # Apache-2.0
|
||||
oslo.utils>=1.6.0 # Apache-2.0
|
||||
|
@ -107,8 +107,9 @@ class TestDispatcherScenario(test_utils.BaseTestCase):
|
||||
sorted(dispatcher._targets_priorities))
|
||||
|
||||
incoming = mock.Mock(ctxt={}, message=msg)
|
||||
with dispatcher(incoming) as callback:
|
||||
callback()
|
||||
callback = dispatcher(incoming)
|
||||
callback.run()
|
||||
callback.done()
|
||||
|
||||
# check endpoint callbacks are called or not
|
||||
for i, endpoint_methods in enumerate(self.endpoints):
|
||||
@ -146,8 +147,9 @@ class TestDispatcher(test_utils.BaseTestCase):
|
||||
msg['priority'] = 'what???'
|
||||
dispatcher = notify_dispatcher.NotificationDispatcher(
|
||||
[mock.Mock()], [mock.Mock()], None, allow_requeue=True, pool=None)
|
||||
with dispatcher(mock.Mock(ctxt={}, message=msg)) as callback:
|
||||
callback()
|
||||
callback = dispatcher(mock.Mock(ctxt={}, message=msg))
|
||||
callback.run()
|
||||
callback.done()
|
||||
mylog.warning.assert_called_once_with('Unknown priority "%s"',
|
||||
'what???')
|
||||
|
||||
@ -165,7 +167,8 @@ class TestDispatcher(test_utils.BaseTestCase):
|
||||
|
||||
incoming = mock.Mock(ctxt={}, message=msg)
|
||||
executor_callback = mock.Mock()
|
||||
with dispatcher(incoming, executor_callback) as callback:
|
||||
callback()
|
||||
callback = dispatcher(incoming, executor_callback)
|
||||
callback.run()
|
||||
callback.done()
|
||||
self.assertTrue(executor_callback.called)
|
||||
self.assertEqual(executor_callback.call_args[0][0], endpoint_method)
|
||||
|
@ -120,8 +120,9 @@ class TestDispatcher(test_utils.BaseTestCase):
|
||||
incoming = mock.Mock(ctxt=self.ctxt, message=self.msg)
|
||||
incoming.reply.side_effect = check_reply
|
||||
|
||||
with dispatcher(incoming) as callback:
|
||||
callback()
|
||||
callback = dispatcher(incoming)
|
||||
callback.run()
|
||||
callback.done()
|
||||
|
||||
for n, endpoint in enumerate(endpoints):
|
||||
for method_name in ['foo', 'bar']:
|
||||
|
Loading…
Reference in New Issue
Block a user