From c68266b36b5d8e94fef2cca23ea459a01e05a0d2 Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Tue, 13 Oct 2015 18:30:50 +0200 Subject: [PATCH] Use a condition (and/or a dummy one) instead of a lock Instead of having to spin in the wait method, just use a condition and block until stopping has actually happened, when stop happens, it will use the notify_all method to let any blockers release. Closes-Bug: #1505730 Change-Id: I3cfbe1bf02d451e379b1dcc23dacb0139c03be76 --- oslo_messaging/_utils.py | 23 ++++++++++ oslo_messaging/server.py | 60 +++++++++++++++++-------- oslo_messaging/tests/rpc/test_server.py | 20 --------- 3 files changed, 64 insertions(+), 39 deletions(-) diff --git a/oslo_messaging/_utils.py b/oslo_messaging/_utils.py index cec94bb48..1bb20b089 100644 --- a/oslo_messaging/_utils.py +++ b/oslo_messaging/_utils.py @@ -116,6 +116,29 @@ def fetch_current_thread_functor(): return lambda: threading.current_thread() +class DummyCondition(object): + def acquire(self): + pass + + def notify(self): + pass + + def notify_all(self): + pass + + def wait(self, timeout=None): + pass + + def release(self): + pass + + def __enter__(self): + self.acquire() + + def __exit__(self, type, value, traceback): + self.release() + + class DummyLock(object): def acquire(self): pass diff --git a/oslo_messaging/server.py b/oslo_messaging/server.py index 7af4c3d8a..f8083bd02 100644 --- a/oslo_messaging/server.py +++ b/oslo_messaging/server.py @@ -27,6 +27,7 @@ import logging import threading from oslo_service import service +from oslo_utils import timeutils from stevedore import driver from oslo_messaging._drivers import base as driver_base @@ -98,9 +99,11 @@ class MessageHandlingServer(service.ServiceBase): # is fully started. Except for the blocking executor that have # start() that doesn't return if self.executor != "blocking": - self._state_lock = threading.Lock() + self._state_cond = threading.Condition() + self._dummy_cond = False else: - self._state_lock = _utils.DummyLock() + self._state_cond = _utils.DummyCondition() + self._dummy_cond = True try: mgr = driver.DriverManager('oslo.messaging.executors', @@ -130,16 +133,18 @@ class MessageHandlingServer(service.ServiceBase): """ if self._executor is not None: return - try: - listener = self.dispatcher._listen(self.transport) - except driver_base.TransportDriverError as ex: - raise ServerListenError(self.target, ex) - - with self._state_lock: + with self._state_cond: + if self._executor is not None: + return + try: + listener = self.dispatcher._listen(self.transport) + except driver_base.TransportDriverError as ex: + raise ServerListenError(self.target, ex) self._running = True self._executor = self._executor_cls(self.conf, listener, self.dispatcher) self._executor.start() + self._state_cond.notify_all() def stop(self): """Stop handling incoming messages. @@ -149,10 +154,11 @@ class MessageHandlingServer(service.ServiceBase): some messages, and underlying driver resources associated to this server are still in use. See 'wait' for more details. """ - with self._state_lock: + with self._state_cond: if self._executor is not None: self._running = False self._executor.stop() + self._state_cond.notify_all() def wait(self): """Wait for message processing to complete. @@ -164,21 +170,37 @@ class MessageHandlingServer(service.ServiceBase): Once it's finished, the underlying driver resources associated to this server are released (like closing useless network connections). """ - with self._state_lock: + with self._state_cond: if self._running: - # NOTE(dims): Need to change this to raise RuntimeError after - # verifying/fixing other openstack projects (like Neutron) - # work ok with this change LOG.warn(_LW("wait() should be called after stop() as it " "waits for existing messages to finish " "processing")) - - if self._executor is not None: - self._executor.wait() - # Close listener connection after processing all messages - self._executor.listener.cleanup() - + w = timeutils.StopWatch() + w.start() + while self._running: + # NOTE(harlowja): 1.0 seconds was mostly chosen at + # random, but it seems like a reasonable value to + # use to avoid spamming the logs with to much + # information. + self._state_cond.wait(1.0) + if self._running and not self._dummy_cond: + LOG.warn( + _LW("wait() should be have been called" + " after stop() as wait() waits for existing" + " messages to finish processing, it has" + " been %0.2f seconds and stop() still has" + " not been called"), w.elapsed()) + executor = self._executor self._executor = None + if executor is not None: + # We are the lucky calling thread to wait on the executor to + # actually finish. + try: + executor.wait() + finally: + # Close listener connection after processing all messages + executor.listener.cleanup() + executor = None def reset(self): """Reset service. diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py index 0c222ae47..b1f8961c5 100644 --- a/oslo_messaging/tests/rpc/test_server.py +++ b/oslo_messaging/tests/rpc/test_server.py @@ -130,26 +130,6 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): self.assertIsNone(server._executor) self.assertEqual(1, listener.cleanup.call_count) - @mock.patch('oslo_messaging._executors.impl_pooledexecutor.' - 'PooledExecutor.wait') - def test_server_invalid_wait_running_server(self, mock_wait): - transport = oslo_messaging.get_transport(self.conf, url='fake:') - target = oslo_messaging.Target(topic='foo', server='bar') - endpoints = [object()] - serializer = object() - - server = oslo_messaging.get_rpc_server(transport, target, endpoints, - serializer=serializer, - executor='eventlet') - self.addCleanup(server.wait) - self.addCleanup(server.stop) - server.start() - with mock.patch('logging.Logger.warn') as warn: - server.wait() - warn.assert_called_with('wait() should be called after ' - 'stop() as it waits for existing ' - 'messages to finish processing') - def test_no_target_server(self): transport = oslo_messaging.get_transport(self.conf, url='fake:')