diff --git a/oslo_messaging/_utils.py b/oslo_messaging/_utils.py index 362c4f26f..969bdbbcd 100644 --- a/oslo_messaging/_utils.py +++ b/oslo_messaging/_utils.py @@ -13,6 +13,12 @@ # License for the specific language governing permissions and limitations # under the License. +import logging + +from oslo_utils import eventletutils + +LOG = logging.getLogger(__name__) + def version_is_compatible(imp_version, version): """Determine whether versions are compatible. @@ -59,3 +65,11 @@ class DummyLock(object): def __exit__(self, type, value, traceback): self.release() + + +def get_executor_with_context(): + if eventletutils.is_monkey_patched('thread'): + LOG.debug("Threading is patched, using an eventlet executor") + return 'eventlet' + LOG.debug("Using a threading executor") + return 'threading' diff --git a/oslo_messaging/notify/listener.py b/oslo_messaging/notify/listener.py index ae6ac5118..de9a26aa7 100644 --- a/oslo_messaging/notify/listener.py +++ b/oslo_messaging/notify/listener.py @@ -143,7 +143,7 @@ LOG = logging.getLogger(__name__) class NotificationServerBase(msg_server.MessageHandlingServer): - def __init__(self, transport, targets, dispatcher, executor='blocking', + def __init__(self, transport, targets, dispatcher, executor=None, allow_requeue=True, pool=None, batch_size=1, batch_timeout=None): super(NotificationServerBase, self).__init__(transport, dispatcher, @@ -167,7 +167,7 @@ class NotificationServerBase(msg_server.MessageHandlingServer): class NotificationServer(NotificationServerBase): - def __init__(self, transport, targets, dispatcher, executor='blocking', + def __init__(self, transport, targets, dispatcher, executor=None, allow_requeue=True, pool=None): if not isinstance(transport, msg_transport.NotificationTransport): LOG.warning("Using RPC transport for notifications. Please use " @@ -216,7 +216,7 @@ class BatchNotificationServer(NotificationServerBase): def get_notification_listener(transport, targets, endpoints, - executor='blocking', serializer=None, + executor=None, serializer=None, allow_requeue=False, pool=None): """Construct a notification listener @@ -250,7 +250,7 @@ def get_notification_listener(transport, targets, endpoints, def get_batch_notification_listener(transport, targets, endpoints, - executor='blocking', serializer=None, + executor=None, serializer=None, allow_requeue=False, pool=None, batch_size=None, batch_timeout=None): """Construct a batch notification listener diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py index d981b88ed..03517c350 100644 --- a/oslo_messaging/rpc/server.py +++ b/oslo_messaging/rpc/server.py @@ -138,7 +138,7 @@ LOG = logging.getLogger(__name__) class RPCServer(msg_server.MessageHandlingServer): - def __init__(self, transport, target, dispatcher, executor='blocking'): + def __init__(self, transport, target, dispatcher, executor=None): super(RPCServer, self).__init__(transport, dispatcher, executor) if not isinstance(transport, msg_transport.RPCTransport): LOG.warning("Using notification transport for RPC. Please use " @@ -200,7 +200,7 @@ class RPCServer(msg_server.MessageHandlingServer): def get_rpc_server(transport, target, endpoints, - executor='blocking', serializer=None, access_policy=None): + executor=None, serializer=None, access_policy=None): """Construct an RPC server. :param transport: the messaging transport diff --git a/oslo_messaging/server.py b/oslo_messaging/server.py index 983ad7230..4df1512b5 100644 --- a/oslo_messaging/server.py +++ b/oslo_messaging/server.py @@ -23,7 +23,6 @@ import logging import threading import traceback -import debtcollector from oslo_config import cfg from oslo_service import service from oslo_utils import eventletutils @@ -32,6 +31,7 @@ import six from stevedore import driver from oslo_messaging._drivers import base as driver_base +from oslo_messaging import _utils as utils from oslo_messaging import exceptions __all__ = [ @@ -306,16 +306,17 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): new tasks. """ - def __init__(self, transport, dispatcher, executor='blocking'): + def __init__(self, transport, dispatcher, executor=None): """Construct a message handling server. The dispatcher parameter is a DispatcherBase instance which is used for routing request to endpoint for processing. The executor parameter controls how incoming messages will be received - and dispatched. By default, the most simple executor is used - the - blocking executor. It handles only one message at once. It's - recommended to use threading or eventlet. + and dispatched. Executor is automatically detected from + execution environment. + It handles many message in parallel. If your application need + asynchronism then you need to consider to use the eventlet executor. :param transport: the messaging transport :type transport: Transport @@ -326,19 +327,20 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): 'eventlet' and 'threading' :type executor: str """ + if executor and executor not in ("threading", "eventlet"): + raise ExecutorLoadFailure( + executor, + "Executor should be None or 'eventlet' and 'threading'") + if not executor: + executor = utils.get_executor_with_context() + self.conf = transport.conf self.conf.register_opts(_pool_opts) self.transport = transport self.dispatcher = dispatcher self.executor_type = executor - if self.executor_type == 'blocking': - debtcollector.deprecate( - 'blocking executor is deprecated. Executor default will be ' - 'removed. Use explicitly threading or eventlet instead', - version="pike", removal_version="rocky", - category=FutureWarning) - elif self.executor_type == "eventlet": + if self.executor_type == "eventlet": eventletutils.warn_eventlet_not_patched( expected_patched_modules=['thread'], what="the 'oslo.messaging eventlet executor'") @@ -403,10 +405,9 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): executor_opts = {} - if self.executor_type in ("threading", "eventlet"): - executor_opts["max_workers"] = ( - override_pool_size or self.conf.executor_thread_pool_size - ) + executor_opts["max_workers"] = ( + override_pool_size or self.conf.executor_thread_pool_size + ) self._work_executor = self._executor_cls(**executor_opts) try: diff --git a/oslo_messaging/tests/notify/test_listener.py b/oslo_messaging/tests/notify/test_listener.py index 44a48e03e..1125b1781 100644 --- a/oslo_messaging/tests/notify/test_listener.py +++ b/oslo_messaging/tests/notify/test_listener.py @@ -135,27 +135,21 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): 'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges', new_value={})) - @mock.patch('debtcollector.deprecate') - def test_constructor(self, deprecate): + def test_constructor(self): transport = msg_notifier.get_notification_transport( self.conf, url='fake:') target = oslo_messaging.Target(topic='foo') endpoints = [object()] listener = oslo_messaging.get_notification_listener( - transport, [target], endpoints) + transport, [target], endpoints, executor='threading') self.assertIs(listener.conf, self.conf) self.assertIs(listener.transport, transport) self.assertIsInstance(listener.dispatcher, dispatcher.NotificationDispatcher) self.assertIs(listener.dispatcher.endpoints, endpoints) - self.assertEqual('blocking', listener.executor_type) - deprecate.assert_called_once_with( - 'blocking executor is deprecated. Executor default will be ' - 'removed. Use explicitly threading or eventlet instead', - removal_version='rocky', version='pike', - category=FutureWarning) + self.assertEqual('threading', listener.executor_type) def test_no_target_topic(self): transport = msg_notifier.get_notification_transport( diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py index b4ec519b6..c993a871a 100644 --- a/oslo_messaging/tests/rpc/test_server.py +++ b/oslo_messaging/tests/rpc/test_server.py @@ -14,7 +14,6 @@ # under the License. import threading -import warnings import eventlet import fixtures @@ -120,51 +119,62 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): 'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges', new_value={})) - @mock.patch('warnings.warn') - def test_constructor(self, warn): + def test_constructor(self): transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') target = oslo_messaging.Target(topic='foo', server='bar') endpoints = [object()] serializer = object() access_policy = dispatcher.DefaultRPCAccessPolicy - warnings.simplefilter("always", FutureWarning) server = oslo_messaging.get_rpc_server(transport, target, endpoints, serializer=serializer, - access_policy=access_policy) + access_policy=access_policy, + executor='threading') self.assertIs(server.conf, self.conf) self.assertIs(server.transport, transport) self.assertIsInstance(server.dispatcher, oslo_messaging.RPCDispatcher) self.assertIs(server.dispatcher.endpoints, endpoints) self.assertIs(server.dispatcher.serializer, serializer) - self.assertEqual('blocking', server.executor_type) - self.assertEqual([ - mock.call("blocking executor is deprecated. Executor default will " - "be removed. Use explicitly threading or eventlet " - "instead in version 'pike' and will be removed in " - "version 'rocky'", - category=FutureWarning, stacklevel=3) - ], warn.mock_calls) + self.assertEqual('threading', server.executor_type) - @mock.patch('warnings.warn') - def test_constructor_without_explicit_RPCAccessPolicy(self, warn): + def test_constructor_with_eventlet_executor(self): transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') target = oslo_messaging.Target(topic='foo', server='bar') endpoints = [object()] serializer = object() + access_policy = dispatcher.DefaultRPCAccessPolicy - warnings.simplefilter("always", FutureWarning) - oslo_messaging.get_rpc_server(transport, target, - endpoints, serializer=serializer) - self.assertEqual([ - mock.call("blocking executor is deprecated. Executor default will " - "be removed. Use explicitly threading or eventlet " - "instead in version 'pike' and will be removed in " - "version 'rocky'", - category=FutureWarning, stacklevel=3) - ], warn.mock_calls) + server = oslo_messaging.get_rpc_server(transport, + target, + endpoints, + serializer=serializer, + access_policy=access_policy, + executor='eventlet') + self.assertIs(server.conf, self.conf) + self.assertIs(server.transport, transport) + self.assertIsInstance(server.dispatcher, oslo_messaging.RPCDispatcher) + self.assertIs(server.dispatcher.endpoints, endpoints) + self.assertIs(server.dispatcher.serializer, serializer) + self.assertEqual('eventlet', server.executor_type) + + def test_constructor_with_unrecognized_executor(self): + transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') + target = oslo_messaging.Target(topic='foo', server='bar') + endpoints = [object()] + serializer = object() + access_policy = dispatcher.DefaultRPCAccessPolicy + + self.assertRaises( + server_module.ExecutorLoadFailure, + oslo_messaging.get_rpc_server, + transport=transport, + target=target, + endpoints=endpoints, + serializer=serializer, + access_policy=access_policy, + executor='boom') def test_server_wait_method(self): transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') diff --git a/releasenotes/notes/blocking-executor-support-dropped-a3bc74c6825863f0.yaml b/releasenotes/notes/blocking-executor-support-dropped-a3bc74c6825863f0.yaml new file mode 100644 index 000000000..878c5fb0d --- /dev/null +++ b/releasenotes/notes/blocking-executor-support-dropped-a3bc74c6825863f0.yaml @@ -0,0 +1,8 @@ +--- +upgrade: + - | + The blocking executor has been deprecated for removal in Rocky and support + is now dropped in Ussuri. Its usage was never recommended for applications, + and it has no test coverage. + Applications should choose the appropriate threading model that maps to + their usage instead. diff --git a/setup.cfg b/setup.cfg index ecacb87d9..116ebff71 100644 --- a/setup.cfg +++ b/setup.cfg @@ -51,7 +51,6 @@ oslo.messaging.drivers = fake = oslo_messaging._drivers.impl_fake:FakeDriver oslo.messaging.executors = - blocking = futurist:SynchronousExecutor eventlet = futurist:GreenThreadPoolExecutor threading = futurist:ThreadPoolExecutor