diff --git a/oslo_messaging/_executors/impl_aioeventlet.py b/oslo_messaging/_executors/impl_aioeventlet.py new file mode 100644 index 000000000..d0fc4aa31 --- /dev/null +++ b/oslo_messaging/_executors/impl_aioeventlet.py @@ -0,0 +1,75 @@ +# Copyright 2014 eNovance. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import aioeventlet +import trollius + +from oslo_messaging._executors import impl_eventlet + + +class AsyncioEventletExecutor(impl_eventlet.EventletExecutor): + + """A message executor which integrates with eventlet and trollius. + + The executor is based on eventlet executor and so is compatible with it. + The executor supports trollius coroutines, explicit asynchronous + programming, in addition to eventlet greenthreads, implicit asynchronous + programming. + + To use the executor, an aioeventlet event loop must the running in the + thread executing the executor (usually the main thread). Example of code to + setup and run an aioeventlet event loop for the executor (in the main + thread): + + import aioeventlet + import trollius + + policy = aioeventlet.EventLoopPolicy() + trollius.set_event_loop_policy(policy) + + def run_loop(loop): + loop.run_forever() + loop.close() + + # Get the aioeventlet event loop (create it if needed) + loop = trollius.get_event_loop() + + # run the event loop in a new greenthread, + # close it when it is done + eventlet.spawn(run_loop, loop) + """ + + def __init__(self, conf, listener, dispatcher): + super(AsyncioEventletExecutor, self).__init__(conf, listener, + dispatcher) + self._loop = None + + def start(self): + # check that the event loop is an aioeventlet event loop + loop = trollius.get_event_loop() + if not isinstance(loop, aioeventlet.EventLoop): + raise RuntimeError("need an aioeventlet event loop") + self._loop = loop + + super(AsyncioEventletExecutor, self).start() + + def _coroutine_wrapper(self, func, *args, **kw): + result = func(*args, **kw) + if trollius.iscoroutine(result): + result = aioeventlet.yield_future(result, loop=self._loop) + return result + + def _dispatch(self, incoming): + ctx = self.dispatcher(incoming, self._coroutine_wrapper) + impl_eventlet.spawn_with(ctxt=ctx, pool=self._greenpool) diff --git a/oslo_messaging/_executors/impl_eventlet.py b/oslo_messaging/_executors/impl_eventlet.py index 555ae0364..3333fe713 100644 --- a/oslo_messaging/_executors/impl_eventlet.py +++ b/oslo_messaging/_executors/impl_eventlet.py @@ -82,6 +82,9 @@ class EventletExecutor(base.PooledExecutorBase): 'behavior. In the future, we will raise a ' 'RuntimeException in this case.') + def _dispatch(self, incoming): + spawn_with(ctxt=self.dispatcher(incoming), pool=self._greenpool) + def start(self): if self._thread is not None: return @@ -92,8 +95,7 @@ class EventletExecutor(base.PooledExecutorBase): while self._running: incoming = self.listener.poll() if incoming is not None: - spawn_with(ctxt=self.dispatcher(incoming), - pool=self._greenpool) + self._dispatch(incoming) except greenlet.GreenletExit: return diff --git a/oslo_messaging/tests/executors/test_executor.py b/oslo_messaging/tests/executors/test_executor.py index cb321ddef..4e3ec517f 100644 --- a/oslo_messaging/tests/executors/test_executor.py +++ b/oslo_messaging/tests/executors/test_executor.py @@ -17,6 +17,12 @@ import contextlib import threading +# eventlet 0.16 with monkey patching does not work yet on Python 3, +# so make aioeventlet, eventlet and trollius import optional +try: + import aioeventlet +except ImportError: + aioeventlet = None try: import eventlet except ImportError: @@ -24,7 +30,16 @@ except ImportError: import mock import testscenarios import testtools +try: + import trollius +except ImportError: + pass + +try: + from oslo_messaging._executors import impl_aioeventlet +except ImportError: + impl_aioeventlet = None from oslo_messaging._executors import impl_blocking try: from oslo_messaging._executors import impl_eventlet @@ -46,42 +61,110 @@ class TestExecutor(test_utils.BaseTestCase): if impl_eventlet is not None: impl.append( ('eventlet', dict(executor=impl_eventlet.EventletExecutor))) + if impl_aioeventlet is not None: + impl.append( + ('aioeventlet', + dict(executor=impl_aioeventlet.AsyncioEventletExecutor))) cls.scenarios = testscenarios.multiply_scenarios(impl) @staticmethod - def _run_in_thread(executor): - def thread(): - executor.start() - executor.wait() - thread = threading.Thread(target=thread) + def _run_in_thread(target, executor): + thread = threading.Thread(target=target, args=(executor,)) thread.daemon = True thread.start() thread.join(timeout=30) def test_executor_dispatch(self): - callback = mock.MagicMock(return_value='result') + if impl_aioeventlet is not None: + aioeventlet_class = impl_aioeventlet.AsyncioEventletExecutor + else: + aioeventlet_class = None + is_aioeventlet = (self.executor == aioeventlet_class) + + if is_aioeventlet: + policy = aioeventlet.EventLoopPolicy() + trollius.set_event_loop_policy(policy) + self.addCleanup(trollius.set_event_loop_policy, None) + + def run_loop(loop): + loop.run_forever() + loop.close() + trollius.set_event_loop(None) + + def run_executor(executor): + # create an event loop in the executor thread + loop = trollius.new_event_loop() + trollius.set_event_loop(loop) + eventlet.spawn(run_loop, loop) + + # run the executor + executor.start() + executor.wait() + + # stop the event loop: run_loop() will close it + loop.stop() + + @trollius.coroutine + def simple_coroutine(value): + yield None + raise trollius.Return(value) + + endpoint = mock.MagicMock(return_value=simple_coroutine('result')) + event = eventlet.event.Event() + else: + def run_executor(executor): + executor.start() + executor.wait() + + endpoint = mock.MagicMock(return_value='result') class Dispatcher(object): + def __init__(self, endpoint): + self.endpoint = endpoint + self.result = "not set" + @contextlib.contextmanager - def __call__(self, incoming): - yield lambda: callback(incoming.ctxt, incoming.message) + def __call__(self, incoming, executor_callback=None): + if executor_callback is not None: + def callback(): + result = executor_callback(self.endpoint, + incoming.ctxt, + incoming.message) + self.result = result + return result + yield callback + event.send() + else: + def callback(): + result = self.endpoint(incoming.ctxt, incoming.message) + self.result = result + return result + yield callback listener = mock.Mock(spec=['poll']) - executor = self.executor(self.conf, listener, Dispatcher()) + dispatcher = Dispatcher(endpoint) + executor = self.executor(self.conf, listener, dispatcher) incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'}) def fake_poll(timeout=None): - if listener.poll.call_count == 1: - return incoming_message - executor.stop() + if is_aioeventlet: + if listener.poll.call_count == 1: + return incoming_message + event.wait() + executor.stop() + else: + if listener.poll.call_count == 1: + return incoming_message + executor.stop() listener.poll.side_effect = fake_poll - self._run_in_thread(executor) + self._run_in_thread(run_executor, executor) - callback.assert_called_once_with({}, {'payload': 'data'}) + endpoint.assert_called_once_with({}, {'payload': 'data'}) + self.assertEqual(dispatcher.result, 'result') TestExecutor.generate_scenarios() diff --git a/requirements.txt b/requirements.txt index 352b14aa8..e6747b09e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,3 +29,7 @@ oslo.middleware>=0.3.0 # Apache-2.0 # for the futures based executor futures>=2.1.6 + +# needed by the aioeventlet executor +aioeventlet>=0.4 +trollius>=1.0 diff --git a/setup.cfg b/setup.cfg index 3f94cb942..0e191c441 100644 --- a/setup.cfg +++ b/setup.cfg @@ -42,6 +42,7 @@ oslo.messaging.drivers = fake = oslo_messaging._drivers.impl_fake:FakeDriver oslo.messaging.executors = + aioeventlet = oslo_messaging._executors.impl_aioeventlet:AsyncioEventletExecutor blocking = oslo_messaging._executors.impl_blocking:BlockingExecutor eventlet = oslo_messaging._executors.impl_eventlet:EventletExecutor threading = oslo_messaging._executors.impl_thread:ThreadExecutor