Merge "Add a thread + futures executor based executor"
This commit is contained in:
commit
39e4a0bcc4
15
doc/source/executors.rst
Normal file
15
doc/source/executors.rst
Normal file
@ -0,0 +1,15 @@
|
||||
---------
|
||||
Executors
|
||||
---------
|
||||
|
||||
.. automodule:: oslo.messaging._executors
|
||||
|
||||
.. currentmodule:: oslo.messaging
|
||||
|
||||
==============
|
||||
Executor types
|
||||
==============
|
||||
|
||||
Executors are providing the way an incoming message will be dispatched so that
|
||||
the message can be used for meaningful work. Different types of executors are
|
||||
supported, each with its own set of restrictions and capabilities.
|
@ -11,6 +11,7 @@ Contents
|
||||
:maxdepth: 1
|
||||
|
||||
transport
|
||||
executors
|
||||
target
|
||||
server
|
||||
rpcclient
|
||||
|
@ -23,11 +23,11 @@ import sys
|
||||
|
||||
from oslo.config import cfg
|
||||
from oslo.messaging._drivers import impl_zmq
|
||||
from oslo.messaging._executors import impl_eventlet # FIXME(markmc)
|
||||
from oslo.messaging._executors import base # FIXME(markmc)
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(impl_zmq.zmq_opts)
|
||||
CONF.register_opts(impl_eventlet._eventlet_opts)
|
||||
CONF.register_opts(base._pool_opts)
|
||||
|
||||
|
||||
def main():
|
||||
|
@ -31,7 +31,7 @@ from six import moves
|
||||
from oslo.config import cfg
|
||||
from oslo.messaging._drivers import base
|
||||
from oslo.messaging._drivers import common as rpc_common
|
||||
from oslo.messaging._executors import impl_eventlet # FIXME(markmc)
|
||||
from oslo.messaging._executors import base as executor_base # FIXME(markmc)
|
||||
from oslo.messaging._i18n import _, _LE
|
||||
from oslo.serialization import jsonutils
|
||||
from oslo.utils import excutils
|
||||
@ -857,7 +857,7 @@ class ZmqDriver(base.BaseDriver):
|
||||
if not zmq:
|
||||
raise ImportError("Failed to import eventlet.green.zmq")
|
||||
conf.register_opts(zmq_opts)
|
||||
conf.register_opts(impl_eventlet._eventlet_opts)
|
||||
conf.register_opts(executor_base._pool_opts)
|
||||
|
||||
super(ZmqDriver, self).__init__(conf, url, default_exchange,
|
||||
allowed_remote_exmods)
|
||||
|
@ -16,6 +16,14 @@ import abc
|
||||
|
||||
import six
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
_pool_opts = [
|
||||
cfg.IntOpt('rpc_thread_pool_size',
|
||||
default=64,
|
||||
help='Size of RPC thread pool.'),
|
||||
]
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class ExecutorBase(object):
|
||||
@ -36,3 +44,11 @@ class ExecutorBase(object):
|
||||
@abc.abstractmethod
|
||||
def wait(self):
|
||||
"Wait until the executor has stopped polling."
|
||||
|
||||
|
||||
class PooledExecutorBase(ExecutorBase):
|
||||
"""An executor that uses a rpc thread pool of a given size."""
|
||||
|
||||
def __init__(self, conf, listener, callback):
|
||||
super(PooledExecutorBase, self).__init__(conf, listener, callback)
|
||||
self.conf.register_opts(_pool_opts)
|
||||
|
@ -21,19 +21,12 @@ from eventlet.green import threading as greenthreading
|
||||
from eventlet import greenpool
|
||||
import greenlet
|
||||
|
||||
from oslo.config import cfg
|
||||
from oslo.messaging._executors import base
|
||||
from oslo.messaging import localcontext
|
||||
from oslo.utils import excutils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
_eventlet_opts = [
|
||||
cfg.IntOpt('rpc_thread_pool_size',
|
||||
default=64,
|
||||
help='Size of RPC greenthread pool.'),
|
||||
]
|
||||
|
||||
|
||||
def spawn_with(ctxt, pool):
|
||||
"""This is the equivalent of a with statement
|
||||
@ -64,7 +57,7 @@ def spawn_with(ctxt, pool):
|
||||
return thread
|
||||
|
||||
|
||||
class EventletExecutor(base.ExecutorBase):
|
||||
class EventletExecutor(base.PooledExecutorBase):
|
||||
|
||||
"""A message executor which integrates with eventlet.
|
||||
|
||||
@ -77,7 +70,6 @@ class EventletExecutor(base.ExecutorBase):
|
||||
|
||||
def __init__(self, conf, listener, dispatcher):
|
||||
super(EventletExecutor, self).__init__(conf, listener, dispatcher)
|
||||
self.conf.register_opts(_eventlet_opts)
|
||||
self._thread = None
|
||||
self._greenpool = greenpool.GreenPool(self.conf.rpc_thread_pool_size)
|
||||
self._running = False
|
||||
|
131
oslo/messaging/_executors/impl_thread.py
Normal file
131
oslo/messaging/_executors/impl_thread.py
Normal file
@ -0,0 +1,131 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import functools
|
||||
import sys
|
||||
import threading
|
||||
|
||||
from concurrent import futures
|
||||
import six
|
||||
|
||||
from oslo.messaging._executors import base
|
||||
from oslo.utils import excutils
|
||||
|
||||
|
||||
class ThreadExecutor(base.PooledExecutorBase):
|
||||
"""A message executor which integrates with threads.
|
||||
|
||||
A message process that polls for messages from a dispatching thread and
|
||||
on reception of an incoming message places the message to be processed in
|
||||
a thread pool to be executed at a later time.
|
||||
"""
|
||||
|
||||
# NOTE(harlowja): if eventlet is being used and the thread module is monkey
|
||||
# patched this should/is supposed to work the same as the eventlet based
|
||||
# executor.
|
||||
|
||||
# NOTE(harlowja): Make it somewhat easy to change this via
|
||||
# inheritance (since there does exist other executor types that could be
|
||||
# used/tried here).
|
||||
_executor_cls = futures.ThreadPoolExecutor
|
||||
|
||||
def __init__(self, conf, listener, dispatcher):
|
||||
super(ThreadExecutor, self).__init__(conf, listener, dispatcher)
|
||||
self._poller = None
|
||||
self._executor = None
|
||||
self._tombstone = threading.Event()
|
||||
self._incomplete = collections.deque()
|
||||
self._mutator = threading.Lock()
|
||||
|
||||
def _completer(self, exit_method, fut):
|
||||
"""Completes futures."""
|
||||
try:
|
||||
exc = fut.exception()
|
||||
if exc is not None:
|
||||
exc_type = type(exc)
|
||||
# Not available on < 3.x due to this being an added feature
|
||||
# of pep-3134 (exception chaining and embedded tracebacks).
|
||||
if six.PY3:
|
||||
exc_tb = exc.__traceback__
|
||||
else:
|
||||
exc_tb = None
|
||||
if not exit_method(exc_type, exc, exc_tb):
|
||||
six.reraise(exc_type, exc, tb=exc_tb)
|
||||
else:
|
||||
exit_method(None, None, None)
|
||||
finally:
|
||||
with self._mutator:
|
||||
try:
|
||||
self._incomplete.remove(fut)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
@excutils.forever_retry_uncaught_exceptions
|
||||
def _runner(self):
|
||||
while not self._tombstone.is_set():
|
||||
incoming = self.listener.poll()
|
||||
if incoming is None:
|
||||
continue
|
||||
# This is hacky, needs to be fixed....
|
||||
context = self.dispatcher(incoming)
|
||||
enter_method = context.__enter__()
|
||||
exit_method = context.__exit__
|
||||
try:
|
||||
fut = self._executor.submit(enter_method)
|
||||
except RuntimeError:
|
||||
# This is triggered when the executor has been shutdown...
|
||||
#
|
||||
# TODO(harlowja): should we put whatever we pulled off back
|
||||
# since when this is thrown it means the executor has been
|
||||
# shutdown already??
|
||||
exit_method(*sys.exc_info())
|
||||
return
|
||||
else:
|
||||
with self._mutator:
|
||||
self._incomplete.append(fut)
|
||||
# Run the other half (__exit__) when done...
|
||||
fut.add_done_callback(functools.partial(self._completer,
|
||||
exit_method))
|
||||
|
||||
def start(self):
|
||||
if self._executor is None:
|
||||
self._executor = self._executor_cls(self.conf.rpc_thread_pool_size)
|
||||
self._tombstone.clear()
|
||||
if self._poller is None or not self._poller.is_alive():
|
||||
self._poller = threading.Thread(target=self._runner)
|
||||
self._poller.daemon = True
|
||||
self._poller.start()
|
||||
|
||||
def stop(self):
|
||||
if self._executor is not None:
|
||||
self._executor.shutdown(wait=False)
|
||||
self._tombstone.set()
|
||||
self.listener.stop()
|
||||
|
||||
def wait(self):
|
||||
# TODO(harlowja): this method really needs a timeout.
|
||||
if self._poller is not None:
|
||||
self._tombstone.wait()
|
||||
self._poller.join()
|
||||
self._poller = None
|
||||
if self._executor is not None:
|
||||
with self._mutator:
|
||||
incomplete_fs = list(self._incomplete)
|
||||
self._incomplete.clear()
|
||||
if incomplete_fs:
|
||||
futures.wait(incomplete_fs, return_when=futures.ALL_COMPLETED)
|
||||
self._executor = None
|
@ -28,7 +28,7 @@ from oslo.messaging._drivers import matchmaker
|
||||
from oslo.messaging._drivers import matchmaker_redis
|
||||
from oslo.messaging._drivers import matchmaker_ring
|
||||
from oslo.messaging._drivers.protocols.amqp import opts as amqp_opts
|
||||
from oslo.messaging._executors import impl_eventlet
|
||||
from oslo.messaging._executors import base
|
||||
from oslo.messaging.notify import notifier
|
||||
from oslo.messaging.rpc import client
|
||||
from oslo.messaging import transport
|
||||
@ -39,7 +39,7 @@ _global_opt_lists = [
|
||||
impl_rabbit.rabbit_opts,
|
||||
impl_zmq.zmq_opts,
|
||||
matchmaker.matchmaker_opts,
|
||||
impl_eventlet._eventlet_opts,
|
||||
base._pool_opts,
|
||||
notifier._notifier_opts,
|
||||
client._client_opts,
|
||||
transport._transport_opts,
|
||||
|
@ -26,3 +26,6 @@ kombu>=2.5.0
|
||||
|
||||
# middleware
|
||||
oslo.middleware>=0.1.0 # Apache-2.0
|
||||
|
||||
# for the futures based executor
|
||||
futures>=2.1.6
|
||||
|
@ -30,16 +30,19 @@ try:
|
||||
from oslo.messaging._executors import impl_eventlet
|
||||
except ImportError:
|
||||
impl_eventlet = None
|
||||
from oslo.messaging._executors import impl_thread
|
||||
from tests import utils as test_utils
|
||||
|
||||
load_tests = testscenarios.load_tests_apply_scenarios
|
||||
|
||||
|
||||
class TestExecutor(test_utils.BaseTestCase):
|
||||
|
||||
@classmethod
|
||||
def generate_scenarios(cls):
|
||||
impl = [('blocking', dict(executor=impl_blocking.BlockingExecutor))]
|
||||
impl = [
|
||||
('blocking', dict(executor=impl_blocking.BlockingExecutor)),
|
||||
('threaded', dict(executor=impl_thread.ThreadExecutor)),
|
||||
]
|
||||
if impl_eventlet is not None:
|
||||
impl.append(
|
||||
('eventlet', dict(executor=impl_eventlet.EventletExecutor)))
|
||||
|
Loading…
x
Reference in New Issue
Block a user