Move server logic out of executors

OpenStack projects such as Mistral now need extra oslo.messaging
functionality, but it is too complicated to implement something new and
integrate it with the current code because responsibilities are mixed
up:

1) The executor should be responsible for how jobs are run
   (but it currently also contains server logic).
2) The dispatcher should be responsible for routing a message to the
   target endpoint for processing (but it also does serialization,
   sends replies, executes some executor callbacks, etc.).
3) The server should hold all server-specific logic; we need different
   server implementations for RPC and notifications, not different
   dispatcher implementations.

This patch fixes the first point.

Change-Id: Ib6408f408889bb7b7056722be636a5547b1a780d
dukhlov 2016-02-20 11:31:15 -05:00 committed by Dmitriy Ukhlov
parent 92c4f76c79
commit 1482687ff7
18 changed files with 190 additions and 623 deletions
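To make the intended split concrete, here is a minimal sketch of the target layout. The names (SimpleServer, listener, dispatcher, pool_size) are illustrative only and not code from this change:

```python
# Minimal sketch: the server owns the poll loop, executors are plain pools.
import futurist


class SimpleServer(object):
    """Illustrative only: mirrors the intended division of duties."""

    def __init__(self, listener, dispatcher, pool_size=64):
        self.listener = listener        # transport-level message source
        self.dispatcher = dispatcher    # routes a message to an endpoint
        self._work_pool = futurist.ThreadPoolExecutor(max_workers=pool_size)
        self._poll_pool = futurist.ThreadPoolExecutor(max_workers=1)
        self._running = False

    def _submit_work(self, callback):
        fut = self._work_pool.submit(callback.run)
        fut.add_done_callback(lambda f: callback.done())

    def _poll_loop(self):
        while self._running:
            incoming = self.listener.poll(timeout=1)
            if incoming:
                self._submit_work(self.dispatcher(incoming))

    def start(self):
        self._running = True
        self._poll_pool.submit(self._poll_loop)

    def stop(self):
        self._running = False
        self.listener.stop()

    def wait(self):
        self._poll_pool.shutdown(wait=True)
        self._work_pool.shutdown(wait=True)
        self.listener.cleanup()
```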

View File

@ -20,11 +20,11 @@ from oslo_config import cfg
from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers.zmq_driver.broker import zmq_broker
from oslo_messaging._executors import impl_pooledexecutor
from oslo_messaging import server
CONF = cfg.CONF
CONF.register_opts(impl_zmq.zmq_opts)
CONF.register_opts(impl_pooledexecutor._pool_opts)
CONF.register_opts(server._pool_opts)
CONF.rpc_zmq_native = True

View File

@ -24,8 +24,8 @@ from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client import zmq_client
from oslo_messaging._drivers.zmq_driver.server import zmq_server
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._executors import impl_pooledexecutor
from oslo_messaging._i18n import _LE
from oslo_messaging import server
RPCException = rpc_common.RPCException
@ -160,7 +160,7 @@ class ZmqDriver(base.BaseDriver):
raise ImportError(_LE("ZeroMQ is not available!"))
conf.register_opts(zmq_opts)
conf.register_opts(impl_pooledexecutor._pool_opts)
conf.register_opts(server._pool_opts)
conf.register_opts(base.base_opts)
self.conf = conf
self.allowed_remote_exmods = allowed_remote_exmods

View File

@ -20,6 +20,7 @@ messaging protocol. The driver sends messages and creates subscriptions via
'tasks' that are performed on its behalf via the controller module.
"""
import collections
import logging
import os
import threading
@ -27,7 +28,7 @@ import time
from oslo_serialization import jsonutils
from oslo_utils import importutils
from six import moves
from oslo_utils import timeutils
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common
@ -114,18 +115,49 @@ class ProtonIncomingMessage(base.RpcIncomingMessage):
pass
class Queue(object):
def __init__(self):
self._queue = collections.deque()
self._lock = threading.Lock()
self._pop_wake_condition = threading.Condition(self._lock)
self._started = True
def put(self, item):
with self._lock:
self._queue.appendleft(item)
self._pop_wake_condition.notify()
def pop(self, timeout):
with timeutils.StopWatch(timeout) as stop_watcher:
with self._lock:
while len(self._queue) == 0:
if stop_watcher.expired() or not self._started:
return None
self._pop_wake_condition.wait(
stop_watcher.leftover(return_none=True)
)
return self._queue.pop()
def stop(self):
with self._lock:
self._started = False
self._pop_wake_condition.notify_all()
class ProtonListener(base.Listener):
def __init__(self, driver):
super(ProtonListener, self).__init__(driver.prefetch_size)
self.driver = driver
self.incoming = moves.queue.Queue()
self.incoming = Queue()
def stop(self):
self.incoming.stop()
@base.batch_poll_helper
def poll(self, timeout=None):
try:
message = self.incoming.get(True, timeout)
except moves.queue.Empty:
return
message = self.incoming.pop(timeout)
if message is None:
return None
request, ctxt = unmarshal_request(message)
LOG.debug("Returning incoming message")
return ProtonIncomingMessage(self, ctxt, request, message)
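A hedged usage sketch of the new driver-local Queue above. Unlike six.moves.queue.Queue, pop() returns None on timeout or after stop() instead of raising Empty, which is what lets ProtonListener.poll() simply check for None. The import path below is an assumption (the AMQP 1.0 driver module) and is not shown in this diff:

```python
import threading
import time

# Assumed import path for the Queue class added above.
from oslo_messaging._drivers.protocols.amqp.driver import Queue

q = Queue()
results = []


def consume():
    while True:
        item = q.pop(timeout=0.5)
        if item is None:          # timeout expired or stop() was called
            break
        results.append(item)


worker = threading.Thread(target=consume)
worker.start()

q.put('message-1')
q.put('message-2')
time.sleep(0.2)                   # let the consumer drain both items
q.stop()                          # wakes the consumer, which then exits
worker.join()
print(results)                    # ['message-1', 'message-2']
```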

View File

@ -1,44 +0,0 @@
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class ExecutorBase(object):
def __init__(self, conf, listener, dispatcher):
self.conf = conf
self.listener = listener
self.dispatcher = dispatcher
@abc.abstractmethod
def start(self, override_pool_size=None):
"""Start polling for incoming messages."""
@abc.abstractmethod
def stop(self):
"""Stop polling for messages."""
@abc.abstractmethod
def wait(self, timeout=None):
"""Wait until the executor has stopped polling.
If a timeout is provided and it is not ``None``, this method will
wait up to that amount of time for its components to finish. If not
all components finish in the allotted time, False is returned;
otherwise True is returned.
"""

View File

@ -1,102 +0,0 @@
# Copyright 2013 Red Hat, Inc.
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import futurist
import threading
from oslo_messaging._executors import impl_pooledexecutor
from oslo_utils import timeutils
class FakeBlockingThread(object):
'''A minimal implementation of threading.Thread which does not create a
thread or start executing the target when start() is called. Instead, the
caller must explicitly execute the non-blocking thread.execute() method
after start() has been called.
'''
def __init__(self, target):
self._target = target
self._running = False
self._running_cond = threading.Condition()
def start(self):
if self._running:
# Not a user error. No need to translate.
raise RuntimeError('FakeBlockingThread already started')
with self._running_cond:
self._running = True
self._running_cond.notify_all()
def join(self, timeout=None):
with timeutils.StopWatch(duration=timeout) as w, self._running_cond:
while self._running:
self._running_cond.wait(w.leftover(return_none=True))
# Thread.join() does not raise an exception on timeout. It is
# the caller's responsibility to check is_alive().
if w.expired():
return
def is_alive(self):
return self._running
def execute(self):
if not self._running:
# Not a user error. No need to translate.
raise RuntimeError('FakeBlockingThread not started')
try:
self._target()
finally:
with self._running_cond:
self._running = False
self._running_cond.notify_all()
class BlockingExecutor(impl_pooledexecutor.PooledExecutor):
"""A message executor which blocks the current thread.
The blocking executor's start() method functions as a request processing
loop - i.e. it blocks, processes messages and only returns when stop() is
called from a dispatched method.
Method calls are dispatched in the current thread, so only a single method
call can be executing at once. This executor is likely to only be useful
for simple demo programs.
"""
_executor_cls = lambda __, ___: futurist.SynchronousExecutor()
_thread_cls = FakeBlockingThread
def __init__(self, *args, **kwargs):
super(BlockingExecutor, self).__init__(*args, **kwargs)
def execute(self):
'''Explicitly run the executor in the current context.'''
# NOTE(mdbooth): Splitting start into start and execute for the
# blocking executor closes a potential race. On a non-blocking
# executor, calling start performs some initialisation synchronously
# before starting the executor and returning control to the caller. In
# the non-blocking caller there was no externally visible boundary
# between the completion of initialisation and the start of execution,
# meaning the caller cannot indicate to another thread that
# initialisation is complete. With the split, the start call for the
# blocking executor becomes analogous to the non-blocking case,
# indicating that initialisation is complete. The caller can then
# synchronously call execute.
if self._poller is not None:
self._poller.execute()
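FakeBlockingThread.join() above, like the new Queue.pop() earlier in this change, spreads a single timeout across repeated condition waits with oslo.utils' StopWatch. A small standalone illustration of that pattern (not code from this change):

```python
import threading

from oslo_utils import timeutils


def wait_until(predicate, cond, timeout=None):
    """Wait on `cond` until predicate() is true or the timeout expires."""
    with timeutils.StopWatch(duration=timeout) as w, cond:
        while not predicate():
            if w.expired():
                return False
            # leftover(return_none=True) yields None when no duration was
            # given, which makes Condition.wait() block indefinitely.
            cond.wait(w.leftover(return_none=True))
        return True


done = []
cond = threading.Condition()


def finish():
    with cond:
        done.append(True)
        cond.notify_all()


threading.Timer(0.2, finish).start()
print(wait_until(lambda: done, cond, timeout=1.0))   # True
```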

View File

@ -1,43 +0,0 @@
# Copyright 2013 Red Hat, Inc.
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from eventlet.green import threading as greenthreading
import futurist
from oslo_messaging._executors import impl_pooledexecutor
from oslo_utils import eventletutils
class EventletExecutor(impl_pooledexecutor.PooledExecutor):
"""A message executor which integrates with eventlet.
This is an executor which polls for incoming messages from a greenthread
and dispatches each message in its own greenthread powered async
executor.
The stop() method kills the message polling greenthread and the wait()
method waits for all executor maintained greenthreads to complete.
"""
def __init__(self, conf, listener, dispatcher):
super(EventletExecutor, self).__init__(conf, listener, dispatcher)
eventletutils.warn_eventlet_not_patched(
expected_patched_modules=['thread'],
what="the 'oslo.messaging eventlet executor'")
_executor_cls = futurist.GreenThreadPoolExecutor
_lock_cls = greenthreading.Lock
_event_cls = greenthreading.Event
_thread_cls = greenthreading.Thread

View File

@ -1,155 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import threading
from futurist import waiters
from oslo_config import cfg
from oslo_utils import excutils
from oslo_utils import timeutils
from oslo_messaging._executors import base
_pool_opts = [
cfg.IntOpt('executor_thread_pool_size',
default=64,
deprecated_name="rpc_thread_pool_size",
help='Size of executor thread pool.'),
]
class PooledExecutor(base.ExecutorBase):
"""A message executor which integrates with some executor.
This will create a message thread that polls for messages from a
dispatching thread and on reception of an incoming message places the
message to be processed into an async executor to be executed at a later
time.
"""
# These may be overridden by subclasses (and implemented using whatever
# objects make most sense for the provided async execution model).
_event_cls = threading.Event
_lock_cls = threading.Lock
# Pooling and dispatching (executor submission) will happen from a
# thread created from this class/function.
_thread_cls = threading.Thread
# This one **must** be overridden by a subclass.
_executor_cls = None
# Blocking function that should wait for all provided futures to finish.
_wait_for_all = staticmethod(waiters.wait_for_all)
def __init__(self, conf, listener, dispatcher):
super(PooledExecutor, self).__init__(conf, listener, dispatcher)
self.conf.register_opts(_pool_opts)
self._poller = None
self._executor = None
self._tombstone = self._event_cls()
self._incomplete = collections.deque()
self._mutator = self._lock_cls()
def _do_submit(self, callback):
def _on_done(fut):
with self._mutator:
try:
self._incomplete.remove(fut)
except ValueError:
pass
callback.done()
try:
fut = self._executor.submit(callback.run)
except RuntimeError:
# This is triggered when the executor has been shutdown...
#
# TODO(harlowja): should we put whatever we pulled off back
# since when this is thrown it means the executor has been
# shutdown already??
callback.done()
return False
else:
with self._mutator:
self._incomplete.append(fut)
# Run the other post processing of the callback when done...
fut.add_done_callback(_on_done)
return True
@excutils.forever_retry_uncaught_exceptions
def _runner(self):
while not self._tombstone.is_set():
incoming = self.listener.poll(
timeout=self.dispatcher.batch_timeout,
prefetch_size=self.dispatcher.batch_size)
if not incoming:
continue
callback = self.dispatcher(incoming)
was_submitted = self._do_submit(callback)
if not was_submitted:
break
def start(self, override_pool_size=None):
if self._executor is None:
if override_pool_size is not None and int(override_pool_size) < 1:
raise ValueError('The thread pool size should be a positive '
'value.')
self._executor = self._executor_cls(
override_pool_size if override_pool_size else
self.conf.executor_thread_pool_size)
self._tombstone.clear()
if self._poller is None or not self._poller.is_alive():
self._poller = self._thread_cls(target=self._runner)
self._poller.daemon = True
self._poller.start()
def stop(self):
if self._executor is not None:
self._executor.shutdown(wait=False)
self._tombstone.set()
self.listener.stop()
def wait(self, timeout=None):
with timeutils.StopWatch(duration=timeout) as w:
poller = self._poller
if poller is not None:
self._tombstone.wait(w.leftover(return_none=True))
if not self._tombstone.is_set():
return False
poller.join(w.leftover(return_none=True))
if poller.is_alive():
return False
self._poller = None
executor = self._executor
if executor is not None:
with self._mutator:
incomplete_fs = list(self._incomplete)
if incomplete_fs:
(done, not_done) = self._wait_for_all(
incomplete_fs,
timeout=w.leftover(return_none=True))
with self._mutator:
for fut in done:
try:
self._incomplete.remove(fut)
except ValueError:
pass
if not_done:
return False
self._executor = None
return True
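The TODO in _do_submit above is about futurist's contract once a pool has been shut down: submit() raises RuntimeError and the work is never executed, which is why the callback is marked done in that branch. A tiny standalone demonstration (not code from this change):

```python
import futurist

results = []

pool = futurist.ThreadPoolExecutor(max_workers=2)
fut = pool.submit(lambda: 'handled')
fut.add_done_callback(lambda f: results.append(f.result()))
pool.shutdown(wait=True)
print(results)                      # ['handled']

try:
    pool.submit(lambda: 'too late')
except RuntimeError:
    # The pool rejects new work after shutdown; _do_submit guarded
    # against exactly this by marking the callback done.
    print('pool already shut down; submission rejected')
```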

View File

@ -1,30 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import futurist
from oslo_messaging._executors import impl_pooledexecutor
class ThreadExecutor(impl_pooledexecutor.PooledExecutor):
"""A message executor which integrates with threads.
A message process that polls for messages from a dispatching thread and
on reception of an incoming message places the message to be processed in
a thread pool to be executed at a later time.
"""
_executor_cls = futurist.ThreadPoolExecutor

View File

@ -22,19 +22,21 @@ import itertools
from oslo_messaging._drivers import amqp
from oslo_messaging._drivers import base as drivers_base
from oslo_messaging._drivers import impl_pika
from oslo_messaging._drivers import impl_rabbit
from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers.protocols.amqp import opts as amqp_opts
from oslo_messaging._drivers.zmq_driver.matchmaker import matchmaker_redis
from oslo_messaging._executors import impl_pooledexecutor
from oslo_messaging.notify import notifier
from oslo_messaging.rpc import client
from oslo_messaging import server
from oslo_messaging import transport
_global_opt_lists = [
drivers_base.base_opts,
impl_zmq.zmq_opts,
impl_pooledexecutor._pool_opts,
server._pool_opts,
client._client_opts,
transport._transport_opts,
]
@ -44,8 +46,10 @@ _opts = [
('matchmaker_redis', matchmaker_redis.matchmaker_redis_opts),
('oslo_messaging_amqp', amqp_opts.amqp1_opts),
('oslo_messaging_notifications', notifier._notifier_opts),
('oslo_messaging_rabbit', list(itertools.chain(amqp.amqp_opts,
impl_rabbit.rabbit_opts))),
('oslo_messaging_rabbit', list(
itertools.chain(amqp.amqp_opts, impl_rabbit.rabbit_opts,
impl_pika.pika_opts, impl_pika.pika_pool_opts,
impl_pika.notification_opts, impl_pika.rpc_opts))),
]

View File

@ -29,7 +29,10 @@ import logging
import threading
import traceback
from oslo_config import cfg
from oslo_service import service
from oslo_utils import eventletutils
from oslo_utils import excutils
from oslo_utils import timeutils
from stevedore import driver
@ -44,6 +47,14 @@ LOG = logging.getLogger(__name__)
DEFAULT_LOG_AFTER = 30
_pool_opts = [
cfg.IntOpt('executor_thread_pool_size',
default=64,
deprecated_name="rpc_thread_pool_size",
help='Size of executor thread pool.'),
]
class MessagingServerError(exceptions.MessagingException):
"""Base class for all MessageHandlingServer exceptions."""
@ -311,24 +322,33 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
:type executor: str
"""
self.conf = transport.conf
self.conf.register_opts(_pool_opts)
self.transport = transport
self.dispatcher = dispatcher
self.executor = executor
self.executor_type = executor
self.listener = None
try:
mgr = driver.DriverManager('oslo.messaging.executors',
self.executor)
self.executor_type)
except RuntimeError as ex:
raise ExecutorLoadFailure(self.executor, ex)
raise ExecutorLoadFailure(self.executor_type, ex)
self._executor_cls = mgr.driver
self._executor_obj = None
self._work_executor = None
self._poll_executor = None
self._started = False
super(MessageHandlingServer, self).__init__()
def _submit_work(self, callback):
fut = self._work_executor.submit(callback.run)
fut.add_done_callback(lambda f: callback.done())
@ordered(reset_after='stop')
def start(self, override_pool_size=None):
"""Start handling incoming messages.
@ -354,18 +374,28 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
self._started = True
try:
listener = self.dispatcher._listen(self.transport)
self.listener = self.dispatcher._listen(self.transport)
except driver_base.TransportDriverError as ex:
raise ServerListenError(self.target, ex)
executor = self._executor_cls(self.conf, listener, self.dispatcher)
executor.start(override_pool_size=override_pool_size)
self._executor_obj = executor
if self.executor == 'blocking':
# N.B. This will be executed unlocked and unordered, so
# we can't rely on the value of self._executor_obj when this runs.
# We explicitly pass the local variable.
return lambda: executor.execute()
executor_opts = {}
if self.executor_type == "threading":
executor_opts["max_workers"] = (
override_pool_size or self.conf.executor_thread_pool_size
)
elif self.executor_type == "eventlet":
eventletutils.warn_eventlet_not_patched(
expected_patched_modules=['thread'],
what="the 'oslo.messaging eventlet executor'")
executor_opts["max_workers"] = (
override_pool_size or self.conf.executor_thread_pool_size
)
self._work_executor = self._executor_cls(**executor_opts)
self._poll_executor = self._executor_cls(**executor_opts)
return lambda: self._poll_executor.submit(self._runner)
@ordered(after='start')
def stop(self):
@ -376,7 +406,30 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
some messages, and underlying driver resources associated to this
server are still in use. See 'wait' for more details.
"""
self._executor_obj.stop()
self.listener.stop()
self._started = False
@excutils.forever_retry_uncaught_exceptions
def _runner(self):
while self._started:
incoming = self.listener.poll(
timeout=self.dispatcher.batch_timeout,
prefetch_size=self.dispatcher.batch_size)
if incoming:
self._submit_work(self.dispatcher(incoming))
# listener is stopped but we need to process all already consumed
# messages
while True:
incoming = self.listener.poll(
timeout=self.dispatcher.batch_timeout,
prefetch_size=self.dispatcher.batch_size)
if incoming:
self._submit_work(self.dispatcher(incoming))
else:
return
@ordered(after='stop')
def wait(self):
@ -389,12 +442,11 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
Once it's finished, the underlying driver resources associated to this
server are released (like closing useless network connections).
"""
try:
self._executor_obj.wait()
finally:
# Close listener connection after processing all messages
self._executor_obj.listener.cleanup()
self._executor_obj = None
self._poll_executor.shutdown(wait=True)
self._work_executor.shutdown(wait=True)
# Close listener connection after processing all messages
self.listener.cleanup()
def reset(self):
"""Reset service.

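With this restructuring the lifecycle seen by callers stays the same: start() now submits _runner to a poll executor, stop() stops the listener, and wait() shuts the executors down and cleans the listener up. A hedged end-to-end sketch using the in-memory 'fake' transport (the endpoint and names are illustrative):

```python
from oslo_config import cfg

import oslo_messaging


class EchoEndpoint(object):
    def echo(self, ctxt, value):
        return value


conf = cfg.ConfigOpts()
transport = oslo_messaging.get_transport(conf, url='fake:')
target = oslo_messaging.Target(topic='demo', server='node-1')
server = oslo_messaging.get_rpc_server(transport, target, [EchoEndpoint()],
                                       executor='threading')

server.start()   # returns after the poll loop is submitted to an executor
# ... RPC requests are polled here and dispatched on the work executor ...
server.stop()    # stops the listener; _runner drains already-consumed messages
server.wait()    # shuts down both executors and cleans up the listener
```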
View File

@ -1,145 +0,0 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
# Copyright 2013 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
import time
from six.moves import mock
import testscenarios
from oslo_messaging._executors import impl_blocking
try:
from oslo_messaging._executors import impl_eventlet
except ImportError:
impl_eventlet = None
from oslo_messaging._executors import impl_thread
from oslo_messaging import dispatcher as dispatcher_base
from oslo_messaging.tests import utils as test_utils
load_tests = testscenarios.load_tests_apply_scenarios
class TestExecutor(test_utils.BaseTestCase):
@classmethod
def generate_scenarios(cls):
impl = [
('blocking', dict(executor=impl_blocking.BlockingExecutor)),
('threaded', dict(executor=impl_thread.ThreadExecutor)),
]
if impl_eventlet is not None:
impl.append(
('eventlet', dict(executor=impl_eventlet.EventletExecutor)))
cls.scenarios = testscenarios.multiply_scenarios(impl)
@staticmethod
def _run_in_thread(target, executor):
thread = threading.Thread(target=target, args=(executor,))
thread.daemon = True
thread.start()
return thread
def _create_dispatcher(self):
if impl_blocking is not None:
blocking_class = impl_blocking.BlockingExecutor
else:
blocking_class = None
is_blocking = (self.executor == blocking_class)
if is_blocking:
def run_executor(executor):
executor.start()
executor.execute()
executor.wait()
endpoint = mock.MagicMock(return_value='result')
event = None
else:
def run_executor(executor):
executor.start()
executor.wait()
endpoint = mock.MagicMock(return_value='result')
event = None
class Dispatcher(dispatcher_base.DispatcherBase):
def __init__(self, endpoint):
self.endpoint = endpoint
self.result = "not set"
def _listen(self, transport):
pass
def callback(self, incoming):
result = self.endpoint(incoming.ctxt,
incoming.message)
self.result = result
return result
def __call__(self, incoming):
return dispatcher_base.DispatcherExecutorContext(
incoming[0], self.callback)
return Dispatcher(endpoint), endpoint, event, run_executor
def test_slow_wait(self):
dispatcher, endpoint, event, run_executor = self._create_dispatcher()
listener = mock.Mock(spec=['poll', 'stop'])
executor = self.executor(self.conf, listener, dispatcher)
incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'})
def fake_poll(timeout=None, prefetch_size=1):
time.sleep(0.1)
if listener.poll.call_count == 10:
if event is not None:
event.wait()
executor.stop()
else:
return incoming_message
listener.poll.side_effect = fake_poll
thread = self._run_in_thread(run_executor, executor)
self.assertFalse(executor.wait(timeout=0.1))
thread.join()
self.assertTrue(executor.wait())
def test_dead_wait(self):
(dispatcher, _endpoint, _event,
_run_executor) = self._create_dispatcher()
listener = mock.Mock(spec=['poll', 'stop'])
executor = self.executor(self.conf, listener, dispatcher)
executor.stop()
self.assertTrue(executor.wait())
def test_executor_dispatch(self):
dispatcher, endpoint, event, run_executor = self._create_dispatcher()
listener = mock.Mock(spec=['poll', 'stop'])
executor = self.executor(self.conf, listener, dispatcher)
incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'})
def fake_poll(timeout=None, prefetch_size=1):
if listener.poll.call_count == 1:
return [incoming_message]
if event is not None:
event.wait()
executor.stop()
listener.poll.side_effect = fake_poll
thread = self._run_in_thread(run_executor, executor)
thread.join()
endpoint.assert_called_once_with({}, {'payload': 'data'})
self.assertEqual(dispatcher.result, 'result')
TestExecutor.generate_scenarios()

View File

@ -320,7 +320,7 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
# order between events with different categories is not guaranteed
received = {}
for expected in events:
e = listener.events.get(timeout=0.5)
e = listener.events.get(timeout=1)
received[e[0]] = e
for expected in events:

View File

@ -146,7 +146,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
self.assertIsInstance(listener.dispatcher,
dispatcher.NotificationDispatcher)
self.assertIs(listener.dispatcher.endpoints, endpoints)
self.assertEqual('blocking', listener.executor)
self.assertEqual('blocking', listener.executor_type)
def test_no_target_topic(self):
transport = msg_notifier.get_notification_transport(

View File

@ -127,7 +127,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertIsInstance(server.dispatcher, oslo_messaging.RPCDispatcher)
self.assertIs(server.dispatcher.endpoints, endpoints)
self.assertIs(server.dispatcher.serializer, serializer)
self.assertEqual('blocking', server.executor)
self.assertEqual('blocking', server.executor_type)
def test_server_wait_method(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
@ -136,11 +136,11 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
serializer = object()
class MagicMockIgnoreArgs(mock.MagicMock):
'''MagicMock ignores arguments.
"""MagicMock ignores arguments.
A MagicMock which can never misinterpret the arguments passed to
it during construction.
'''
"""
def __init__(self, *args, **kwargs):
super(MagicMockIgnoreArgs, self).__init__()
@ -149,15 +149,16 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
serializer=serializer)
# Mocking executor
server._executor_cls = MagicMockIgnoreArgs
server.listener = MagicMockIgnoreArgs()
server.dispatcher = MagicMockIgnoreArgs()
# Grab the listener before stop/wait so that we can later assert that
# cleanup() was called on it.
server.start()
listener = server._executor_obj.listener
listener = server.listener
server.stop()
# call server wait method
server.wait()
self.assertIsNone(server._executor_obj)
self.assertEqual(1, listener.cleanup.call_count)
def test_no_target_server(self):
@ -539,7 +540,7 @@ class TestServerLocking(test_utils.BaseTestCase):
super(TestServerLocking, self).setUp(conf=cfg.ConfigOpts())
def _logmethod(name):
def method(self):
def method(self, *args, **kwargs):
with self._lock:
self._calls.append(name)
return method
@ -553,13 +554,8 @@ class TestServerLocking(test_utils.BaseTestCase):
self.listener = mock.MagicMock()
executors.append(self)
def start(self, override_pool_size=None):
with self._lock:
self._calls.append('start')
stop = _logmethod('stop')
wait = _logmethod('wait')
execute = _logmethod('execute')
submit = _logmethod('submit')
shutdown = _logmethod('shutdown')
self.executors = executors
@ -574,11 +570,10 @@ class TestServerLocking(test_utils.BaseTestCase):
self.server.stop()
self.server.wait()
self.assertEqual(len(self.executors), 1)
executor = self.executors[0]
self.assertEqual(executor._calls,
['start', 'execute', 'stop', 'wait'])
self.assertTrue(executor.listener.cleanup.called)
self.assertEqual(len(self.executors), 2)
self.assertEqual(self.executors[0]._calls, ['shutdown'])
self.assertEqual(self.executors[1]._calls, ['submit', 'shutdown'])
self.assertTrue(self.server.listener.cleanup.called)
def test_reversed_order(self):
# Test that if we call wait, stop, start, these will be correctly
@ -596,10 +591,9 @@ class TestServerLocking(test_utils.BaseTestCase):
self.server.wait()
self.assertEqual(len(self.executors), 1)
executor = self.executors[0]
self.assertEqual(executor._calls,
['start', 'execute', 'stop', 'wait'])
self.assertEqual(len(self.executors), 2)
self.assertEqual(self.executors[0]._calls, ['shutdown'])
self.assertEqual(self.executors[1]._calls, ['submit', 'shutdown'])
def test_wait_for_running_task(self):
# Test that if 2 threads call a method simultaneously, both will wait,
@ -611,16 +605,16 @@ class TestServerLocking(test_utils.BaseTestCase):
running_event = threading.Event()
done_event = threading.Event()
runner = [None]
_runner = [None]
class SteppingFakeExecutor(self.server._executor_cls):
def start(self, override_pool_size=None):
def __init__(self, *args, **kwargs):
# Tell the test which thread won the race
runner[0] = eventlet.getcurrent()
_runner[0] = eventlet.getcurrent()
running_event.set()
start_event.wait()
super(SteppingFakeExecutor, self).start()
super(SteppingFakeExecutor, self).__init__(*args, **kwargs)
done_event.set()
finish_event.wait()
@ -632,7 +626,7 @@ class TestServerLocking(test_utils.BaseTestCase):
# Wait until one of the threads starts running
running_event.wait()
runner = runner[0]
runner = _runner[0]
waiter = start2 if runner == start1 else start1
waiter_finished = threading.Event()
@ -640,18 +634,16 @@ class TestServerLocking(test_utils.BaseTestCase):
# At this point, runner is running start(), and waiter() is waiting for
# it to complete. runner has not yet logged anything.
self.assertEqual(1, len(self.executors))
executor = self.executors[0]
self.assertEqual(executor._calls, [])
self.assertEqual(0, len(self.executors))
self.assertFalse(waiter_finished.is_set())
# Let the runner log the call
start_event.set()
done_event.wait()
# We haven't signalled completion yet, so execute shouldn't have run
self.assertEqual(executor._calls, ['start'])
# We haven't signalled completion yet, so submit shouldn't have run
self.assertEqual(1, len(self.executors))
self.assertEqual(self.executors[0]._calls, [])
self.assertFalse(waiter_finished.is_set())
# Let the runner complete
@ -662,7 +654,9 @@ class TestServerLocking(test_utils.BaseTestCase):
# Check that both threads have finished, the executors were created
# only once, and the work was submitted
self.assertTrue(waiter_finished.is_set())
self.assertEqual(executor._calls, ['start', 'execute'])
self.assertEqual(2, len(self.executors))
self.assertEqual(self.executors[0]._calls, [])
self.assertEqual(self.executors[1]._calls, ['submit'])
def test_start_stop_wait_stop_wait(self):
# Test that we behave correctly when calling stop/wait more than once.
@ -674,11 +668,10 @@ class TestServerLocking(test_utils.BaseTestCase):
self.server.stop()
self.server.wait()
self.assertEqual(len(self.executors), 1)
executor = self.executors[0]
self.assertEqual(executor._calls,
['start', 'execute', 'stop', 'wait'])
self.assertTrue(executor.listener.cleanup.called)
self.assertEqual(len(self.executors), 2)
self.assertEqual(self.executors[0]._calls, ['shutdown'])
self.assertEqual(self.executors[1]._calls, ['submit', 'shutdown'])
self.assertTrue(self.server.listener.cleanup.called)
def test_state_wrapping(self):
# Test that we behave correctly if a thread waits, and the server state
@ -712,8 +705,9 @@ class TestServerLocking(test_utils.BaseTestCase):
complete_waiting_callback.wait()
# The server should have started, but stop should not have been called
self.assertEqual(1, len(self.executors))
self.assertEqual(self.executors[0]._calls, ['start', 'execute'])
self.assertEqual(2, len(self.executors))
self.assertEqual(self.executors[0]._calls, [])
self.assertEqual(self.executors[1]._calls, ['submit'])
self.assertFalse(thread1_finished.is_set())
self.server.stop()
@ -721,19 +715,20 @@ class TestServerLocking(test_utils.BaseTestCase):
# We should have gone through all the states, and thread1 should still
# be waiting
self.assertEqual(1, len(self.executors))
self.assertEqual(self.executors[0]._calls, ['start', 'execute',
'stop', 'wait'])
self.assertEqual(2, len(self.executors))
self.assertEqual(self.executors[0]._calls, ['shutdown'])
self.assertEqual(self.executors[1]._calls, ['submit', 'shutdown'])
self.assertFalse(thread1_finished.is_set())
# Start again
self.server.start()
# We should now record 2 executors
self.assertEqual(2, len(self.executors))
self.assertEqual(self.executors[0]._calls, ['start', 'execute',
'stop', 'wait'])
self.assertEqual(self.executors[1]._calls, ['start', 'execute'])
# We should now record 4 executors (2 for each start call)
self.assertEqual(4, len(self.executors))
self.assertEqual(self.executors[0]._calls, ['shutdown'])
self.assertEqual(self.executors[1]._calls, ['submit', 'shutdown'])
self.assertEqual(self.executors[2]._calls, [])
self.assertEqual(self.executors[3]._calls, ['submit'])
self.assertFalse(thread1_finished.is_set())
# Allow thread1 to complete
@ -742,10 +737,11 @@ class TestServerLocking(test_utils.BaseTestCase):
# thread1 should now have finished, and shutdown should not have been
# called again on the earlier executors
self.assertEqual(2, len(self.executors))
self.assertEqual(self.executors[0]._calls, ['start', 'execute',
'stop', 'wait'])
self.assertEqual(self.executors[1]._calls, ['start', 'execute'])
self.assertEqual(4, len(self.executors))
self.assertEqual(self.executors[0]._calls, ['shutdown'])
self.assertEqual(self.executors[1]._calls, ['submit', 'shutdown'])
self.assertEqual(self.executors[2]._calls, [])
self.assertEqual(self.executors[3]._calls, ['submit'])
self.assertTrue(thread1_finished.is_set())
@mock.patch.object(server_module, 'DEFAULT_LOG_AFTER', 1)
@ -810,24 +806,24 @@ class TestServerLocking(test_utils.BaseTestCase):
# Start the server, which will also instantiate an executor
self.server.start()
stop_called = threading.Event()
self.server.stop()
shutdown_called = threading.Event()
# Patch the executor's shutdown method to be very slow
def slow_stop():
stop_called.set()
def slow_shutdown(wait):
shutdown_called.set()
eventlet.sleep(10)
self.executors[0].stop = slow_stop
self.executors[0].shutdown = slow_shutdown
# Call stop in a new thread
thread = eventlet.spawn(self.server.stop)
# Call wait in a new thread
thread = eventlet.spawn(self.server.wait)
# Wait until the thread is in the slow shutdown method
stop_called.wait()
shutdown_called.wait()
# Call stop again in the main thread with a timeout
# Call wait again in the main thread with a timeout
self.assertRaises(server_module.TaskTimeout,
self.server.stop, timeout=1)
self.server.wait, timeout=1)
thread.kill()
@mock.patch.object(server_module, 'LOG')

View File

@ -17,7 +17,7 @@ import testtools
import mock
from oslo_messaging._executors import impl_thread
from oslo_messaging import server
try:
from oslo_messaging import opts
except ImportError:
@ -59,6 +59,8 @@ class OptsTestCase(test_utils.BaseTestCase):
self._test_list_opts(result)
def test_defaults(self):
impl_thread.ThreadExecutor(self.conf, mock.Mock(), mock.Mock())
transport = mock.Mock()
transport.conf = self.conf
server.MessageHandlingServer(transport, mock.Mock())
opts.set_defaults(self.conf, executor_thread_pool_size=100)
self.assertEqual(100, self.conf.executor_thread_pool_size)

View File

@ -40,9 +40,9 @@ oslo.messaging.drivers =
pika = oslo_messaging._drivers.impl_pika:PikaDriver
oslo.messaging.executors =
blocking = oslo_messaging._executors.impl_blocking:BlockingExecutor
eventlet = oslo_messaging._executors.impl_eventlet:EventletExecutor
threading = oslo_messaging._executors.impl_thread:ThreadExecutor
blocking = futurist:SynchronousExecutor
eventlet = futurist:GreenThreadPoolExecutor
threading = futurist:ThreadPoolExecutor
oslo.messaging.notify.drivers =
messagingv2 = oslo_messaging.notify.messaging:MessagingV2Driver
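Because the executor entry points now point straight at futurist classes, the server's DriverManager lookup yields a plain executor class rather than an oslo.messaging-specific wrapper. A hedged sketch of the equivalent lookup (assumes oslo.messaging is installed so these entry points are registered):

```python
from stevedore import driver

# Same namespace and name the server passes to DriverManager.
mgr = driver.DriverManager('oslo.messaging.executors', 'threading')
executor_cls = mgr.driver                # futurist.ThreadPoolExecutor

pool = executor_cls(max_workers=8)
future = pool.submit(sum, [1, 2, 3])
print(future.result())                   # 6
pool.shutdown(wait=True)
```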