diff --git a/ironic/common/rpc_service.py b/ironic/common/rpc_service.py index b0eec7758b..cb0f23c986 100644 --- a/ironic/common/rpc_service.py +++ b/ironic/common/rpc_service.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime import signal import sys import time @@ -24,6 +25,7 @@ from oslo_log import log import oslo_messaging as messaging from oslo_service import service from oslo_utils import importutils +from oslo_utils import timeutils from ironic.common import context from ironic.common import rpc @@ -93,6 +95,26 @@ class RPCService(service.Service): 'transport': CONF.rpc_transport}) def stop(self): + initial_time = timeutils.utcnow() + extend_time = initial_time + datetime.timedelta( + seconds=CONF.hash_ring_reset_interval) + + try: + self.manager.del_host(deregister=self.deregister) + except Exception as e: + LOG.exception('Service error occurred when cleaning up ' + 'the RPC manager. Error: %s', e) + + if self.manager.get_online_conductor_count() > 1: + # Delay stopping the server until the hash ring has been + # reset on the cluster + stop_time = timeutils.utcnow() + if stop_time < extend_time: + stop_wait = max(0, (extend_time - stop_time).seconds) + LOG.info('Waiting %(stop_wait)s seconds for hash ring reset.', + {'stop_wait': stop_wait}) + time.sleep(stop_wait) + try: if self.rpcserver is not None: self.rpcserver.stop() @@ -100,11 +122,6 @@ class RPCService(service.Service): except Exception as e: LOG.exception('Service error occurred when stopping the ' 'RPC server. Error: %s', e) - try: - self.manager.del_host(deregister=self.deregister) - except Exception as e: - LOG.exception('Service error occurred when cleaning up ' - 'the RPC manager. 
Error: %s', e) super(RPCService, self).stop(graceful=True) LOG.info('Stopped RPC server for service %(service)s on host ' diff --git a/ironic/conductor/base_manager.py b/ironic/conductor/base_manager.py index 22ebd57f5a..5c2e4ea959 100644 --- a/ironic/conductor/base_manager.py +++ b/ironic/conductor/base_manager.py @@ -334,6 +334,10 @@ class BaseConductorManager(object): self._started = False + def get_online_conductor_count(self): + """Return the number of conductors that are currently online.""" + return len(self.dbapi.get_online_conductors()) + def _register_and_validate_hardware_interfaces(self, hardware_types): """Register and validate hardware interfaces for this conductor. diff --git a/ironic/tests/unit/common/test_rpc_service.py b/ironic/tests/unit/common/test_rpc_service.py index 8483bfb224..09446ecf81 100644 --- a/ironic/tests/unit/common/test_rpc_service.py +++ b/ironic/tests/unit/common/test_rpc_service.py @@ -10,24 +10,28 @@ # License for the specific language governing permissions and limitations # under the License. 
+import datetime +import time from unittest import mock from oslo_config import cfg import oslo_messaging from oslo_service import service as base_service +from oslo_utils import timeutils from ironic.common import context from ironic.common import rpc from ironic.common import rpc_service from ironic.conductor import manager from ironic.objects import base as objects_base -from ironic.tests import base +from ironic.tests.unit.db import base as db_base +from ironic.tests.unit.db import utils as db_utils CONF = cfg.CONF @mock.patch.object(base_service.Service, '__init__', lambda *_, **__: None) -class TestRPCService(base.TestCase): +class TestRPCService(db_base.DbTestCase): def setUp(self): super(TestRPCService, self).setUp() @@ -35,6 +39,7 @@ class TestRPCService(base.TestCase): mgr_module = "ironic.conductor.manager" mgr_class = "ConductorManager" self.rpc_svc = rpc_service.RPCService(host, mgr_module, mgr_class) + self.rpc_svc.manager.dbapi = self.dbapi @mock.patch.object(manager.ConductorManager, 'prepare_host', autospec=True) @mock.patch.object(oslo_messaging, 'Target', autospec=True) @@ -108,3 +113,75 @@ class TestRPCService(base.TestCase): self.assertFalse(self.rpc_svc._started) self.assertIn("boom", self.rpc_svc._failure) self.assertRaises(SystemExit, self.rpc_svc.wait_for_start) + + @mock.patch.object(timeutils, 'utcnow', autospec=True) + @mock.patch.object(time, 'sleep', autospec=True) + def test_stop_instant(self, mock_sleep, mock_utcnow): + # del_host returns instantly + mock_utcnow.return_value = datetime.datetime(2023, 2, 2, 21, 10, 0) + conductor1 = db_utils.get_test_conductor(hostname='fake_host') + with mock.patch.object(self.dbapi, 'get_online_conductors', + autospec=True) as mock_cond_list: + mock_cond_list.return_value = [conductor1] + self.rpc_svc.stop() + + # single conductor so exit immediately without waiting + mock_sleep.assert_not_called() + + @mock.patch.object(timeutils, 'utcnow', autospec=True) + @mock.patch.object(time, 'sleep', 
autospec=True) + def test_stop_after_full_reset_interval(self, mock_sleep, mock_utcnow): + # del_host returns instantly + mock_utcnow.return_value = datetime.datetime(2023, 2, 2, 21, 10, 0) + conductor1 = db_utils.get_test_conductor(hostname='fake_host') + conductor2 = db_utils.get_test_conductor(hostname='other_fake_host') + with mock.patch.object(self.dbapi, 'get_online_conductors', + autospec=True) as mock_cond_list: + # multiple conductors, so wait for hash_ring_reset_interval + mock_cond_list.return_value = [conductor1, conductor2] + self.rpc_svc.stop() + + # wait the total CONF.hash_ring_reset_interval 15 seconds + mock_sleep.assert_has_calls([mock.call(15)]) + + @mock.patch.object(timeutils, 'utcnow', autospec=True) + @mock.patch.object(time, 'sleep', autospec=True) + def test_stop_after_remaining_interval(self, mock_sleep, mock_utcnow): + mock_utcnow.return_value = datetime.datetime(2023, 2, 2, 21, 10, 0) + conductor1 = db_utils.get_test_conductor(hostname='fake_host') + conductor2 = db_utils.get_test_conductor(hostname='other_fake_host') + + # del_host returns after 5 seconds + mock_utcnow.side_effect = [ + datetime.datetime(2023, 2, 2, 21, 10, 0), + datetime.datetime(2023, 2, 2, 21, 10, 5), + ] + with mock.patch.object(self.dbapi, 'get_online_conductors', + autospec=True) as mock_cond_list: + # multiple conductors, so wait for hash_ring_reset_interval + mock_cond_list.return_value = [conductor1, conductor2] + self.rpc_svc.stop() + + # wait the remaining 10 seconds + mock_sleep.assert_has_calls([mock.call(10)]) + + @mock.patch.object(timeutils, 'utcnow', autospec=True) + @mock.patch.object(time, 'sleep', autospec=True) + def test_stop_slow(self, mock_sleep, mock_utcnow): + mock_utcnow.return_value = datetime.datetime(2023, 2, 2, 21, 10, 0) + conductor1 = db_utils.get_test_conductor(hostname='fake_host') + conductor2 = db_utils.get_test_conductor(hostname='other_fake_host') + + # del_host returns after 16 seconds + mock_utcnow.side_effect = [ + 
datetime.datetime(2023, 2, 2, 21, 10, 0), + datetime.datetime(2023, 2, 2, 21, 10, 16), + ] + with mock.patch.object(self.dbapi, 'get_online_conductors', + autospec=True) as mock_cond_list: + # multiple conductors, so wait for hash_ring_reset_interval + mock_cond_list.return_value = [conductor1, conductor2] + self.rpc_svc.stop() + + # no wait required, CONF.hash_ring_reset_interval already exceeded + mock_sleep.assert_not_called() diff --git a/releasenotes/notes/wait_hash_ring_reset-ef8bd548659e9906.yaml b/releasenotes/notes/wait_hash_ring_reset-ef8bd548659e9906.yaml new file mode 100644 index 0000000000..cea3e28f35 --- /dev/null +++ b/releasenotes/notes/wait_hash_ring_reset-ef8bd548659e9906.yaml @@ -0,0 +1,13 @@ +--- +fixes: + - | + When a conductor service is stopped, it will now continue to respond to RPC + requests until ``[DEFAULT]hash_ring_reset_interval`` has elapsed, allowing + a hash ring reset to complete on the cluster after the conductor is + unregistered. This will improve the reliability of the cluster when scaling + down or rolling out updates. + + This delay only occurs when there is more than one online conductor, + to allow fast restarts on single-node ironic installs (bifrost, + metal3). +