Merge "Respond to rpc requests on stop until hash ring reset"
commit 63de82c3d3
@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.

import datetime
import signal
import sys
import time
@@ -24,6 +25,7 @@ from oslo_log import log
import oslo_messaging as messaging
from oslo_service import service
from oslo_utils import importutils
from oslo_utils import timeutils

from ironic.common import context
from ironic.common import rpc
@@ -93,6 +95,26 @@ class RPCService(service.Service):
                  'transport': CONF.rpc_transport})

    def stop(self):
        initial_time = timeutils.utcnow()
        extend_time = initial_time + datetime.timedelta(
            seconds=CONF.hash_ring_reset_interval)

        try:
            self.manager.del_host(deregister=self.deregister)
        except Exception as e:
            LOG.exception('Service error occurred when cleaning up '
                          'the RPC manager. Error: %s', e)

        if self.manager.get_online_conductor_count() > 1:
            # Delay stopping the server until the hash ring has been
            # reset on the cluster
            stop_time = timeutils.utcnow()
            if stop_time < extend_time:
                stop_wait = max(0, (extend_time - stop_time).seconds)
                LOG.info('Waiting %(stop_wait)s seconds for hash ring reset.',
                         {'stop_wait': stop_wait})
                time.sleep(stop_wait)

        try:
            if self.rpcserver is not None:
                self.rpcserver.stop()
@@ -100,11 +122,6 @@ class RPCService(service.Service):
        except Exception as e:
            LOG.exception('Service error occurred when stopping the '
                          'RPC server. Error: %s', e)
        try:
            self.manager.del_host(deregister=self.deregister)
        except Exception as e:
            LOG.exception('Service error occurred when cleaning up '
                          'the RPC manager. Error: %s', e)

        super(RPCService, self).stop(graceful=True)
        LOG.info('Stopped RPC server for service %(service)s on host '
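The heart of the change is the arithmetic in stop() above: the service notes when shutdown began, extends that timestamp by CONF.hash_ring_reset_interval, and then sleeps only for whatever is left of that window once del_host has returned. A minimal standalone sketch of the same calculation, assuming a hypothetical remaining_wait() helper and the 15-second interval exercised by the tests below:

import datetime


def remaining_wait(initial_time, now, reset_interval=15):
    """Seconds left in the hash ring reset window, never negative."""
    extend_time = initial_time + datetime.timedelta(seconds=reset_interval)
    if now >= extend_time:
        # The window has already elapsed; stop immediately.
        return 0
    return max(0, (extend_time - now).seconds)


# Shutdown began at 21:10:00; del_host finished 5 seconds later, so the
# remaining wait is 10 seconds. After 16 seconds nothing is left to wait.
start = datetime.datetime(2023, 2, 2, 21, 10, 0)
print(remaining_wait(start, start + datetime.timedelta(seconds=5)))   # 10
print(remaining_wait(start, start + datetime.timedelta(seconds=16)))  # 0

The max(0, ...) guard mirrors the production code and keeps the sleep non-negative even if the two clock readings land right at the window boundary.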
@@ -334,6 +334,10 @@ class BaseConductorManager(object):

        self._started = False

    def get_online_conductor_count(self):
        """Return a count of currently online conductors"""
        return len(self.dbapi.get_online_conductors())

    def _register_and_validate_hardware_interfaces(self, hardware_types):
        """Register and validate hardware interfaces for this conductor.

@@ -10,24 +10,28 @@
# License for the specific language governing permissions and limitations
# under the License.

import datetime
import time
from unittest import mock

from oslo_config import cfg
import oslo_messaging
from oslo_service import service as base_service
from oslo_utils import timeutils

from ironic.common import context
from ironic.common import rpc
from ironic.common import rpc_service
from ironic.conductor import manager
from ironic.objects import base as objects_base
from ironic.tests import base
from ironic.tests.unit.db import base as db_base
from ironic.tests.unit.db import utils as db_utils

CONF = cfg.CONF


@mock.patch.object(base_service.Service, '__init__', lambda *_, **__: None)
class TestRPCService(base.TestCase):
class TestRPCService(db_base.DbTestCase):

    def setUp(self):
        super(TestRPCService, self).setUp()
@@ -35,6 +39,7 @@ class TestRPCService(base.TestCase):
        mgr_module = "ironic.conductor.manager"
        mgr_class = "ConductorManager"
        self.rpc_svc = rpc_service.RPCService(host, mgr_module, mgr_class)
        self.rpc_svc.manager.dbapi = self.dbapi

    @mock.patch.object(manager.ConductorManager, 'prepare_host', autospec=True)
    @mock.patch.object(oslo_messaging, 'Target', autospec=True)
@@ -108,3 +113,75 @@ class TestRPCService(base.TestCase):
        self.assertFalse(self.rpc_svc._started)
        self.assertIn("boom", self.rpc_svc._failure)
        self.assertRaises(SystemExit, self.rpc_svc.wait_for_start)

    @mock.patch.object(timeutils, 'utcnow', autospec=True)
    @mock.patch.object(time, 'sleep', autospec=True)
    def test_stop_instant(self, mock_sleep, mock_utcnow):
        # del_host returns instantly
        mock_utcnow.return_value = datetime.datetime(2023, 2, 2, 21, 10, 0)
        conductor1 = db_utils.get_test_conductor(hostname='fake_host')
        with mock.patch.object(self.dbapi, 'get_online_conductors',
                               autospec=True) as mock_cond_list:
            mock_cond_list.return_value = [conductor1]
            self.rpc_svc.stop()

        # single conductor, so exit immediately without waiting
        mock_sleep.assert_not_called()

    @mock.patch.object(timeutils, 'utcnow', autospec=True)
    @mock.patch.object(time, 'sleep', autospec=True)
    def test_stop_after_full_reset_interval(self, mock_sleep, mock_utcnow):
        # del_host returns instantly
        mock_utcnow.return_value = datetime.datetime(2023, 2, 2, 21, 10, 0)
        conductor1 = db_utils.get_test_conductor(hostname='fake_host')
        conductor2 = db_utils.get_test_conductor(hostname='other_fake_host')
        with mock.patch.object(self.dbapi, 'get_online_conductors',
                               autospec=True) as mock_cond_list:
            # multiple conductors, so wait for hash_ring_reset_interval
            mock_cond_list.return_value = [conductor1, conductor2]
            self.rpc_svc.stop()

        # wait the full CONF.hash_ring_reset_interval of 15 seconds
        mock_sleep.assert_has_calls([mock.call(15)])

    @mock.patch.object(timeutils, 'utcnow', autospec=True)
    @mock.patch.object(time, 'sleep', autospec=True)
    def test_stop_after_remaining_interval(self, mock_sleep, mock_utcnow):
        mock_utcnow.return_value = datetime.datetime(2023, 2, 2, 21, 10, 0)
        conductor1 = db_utils.get_test_conductor(hostname='fake_host')
        conductor2 = db_utils.get_test_conductor(hostname='other_fake_host')

        # del_host returns after 5 seconds
        mock_utcnow.side_effect = [
            datetime.datetime(2023, 2, 2, 21, 10, 0),
            datetime.datetime(2023, 2, 2, 21, 10, 5),
        ]
        with mock.patch.object(self.dbapi, 'get_online_conductors',
                               autospec=True) as mock_cond_list:
            # multiple conductors, so wait for hash_ring_reset_interval
            mock_cond_list.return_value = [conductor1, conductor2]
            self.rpc_svc.stop()

        # wait the remaining 10 seconds
        mock_sleep.assert_has_calls([mock.call(10)])

    @mock.patch.object(timeutils, 'utcnow', autospec=True)
    @mock.patch.object(time, 'sleep', autospec=True)
    def test_stop_slow(self, mock_sleep, mock_utcnow):
        mock_utcnow.return_value = datetime.datetime(2023, 2, 2, 21, 10, 0)
        conductor1 = db_utils.get_test_conductor(hostname='fake_host')
        conductor2 = db_utils.get_test_conductor(hostname='other_fake_host')

        # del_host returns after 16 seconds
        mock_utcnow.side_effect = [
            datetime.datetime(2023, 2, 2, 21, 10, 0),
            datetime.datetime(2023, 2, 2, 21, 10, 16),
        ]
        with mock.patch.object(self.dbapi, 'get_online_conductors',
                               autospec=True) as mock_cond_list:
            # multiple conductors, so wait for hash_ring_reset_interval
            mock_cond_list.return_value = [conductor1, conductor2]
            self.rpc_svc.stop()

        # no wait required, CONF.hash_ring_reset_interval already exceeded
        mock_sleep.assert_not_called()
@@ -0,0 +1,13 @@
---
fixes:
  - |
    When a conductor service is stopped it will now continue to respond to
    RPC requests until ``[DEFAULT]hash_ring_reset_interval`` has elapsed,
    allowing a hash ring reset to complete on the cluster after the conductor
    is unregistered. This will improve the reliability of the cluster when
    scaling down or rolling out updates.

    This delay only occurs when there is more than one online conductor,
    to allow fast restarts on single-node ironic installs (bifrost,
    metal3).
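As a quick illustration of the behaviour this note describes, here is a hedged sketch (the shutdown_wait_needed() helper and the 15-second default are assumptions for illustration; the real logic lives in RPCService.stop() above): a lone conductor stops immediately, while a clustered conductor waits out the remainder of the reset window.

def shutdown_wait_needed(online_conductor_count, seconds_since_stop_began,
                         reset_interval=15):
    # Hypothetical helper mirroring the release-note behaviour.
    if online_conductor_count <= 1:
        # Single-node installs (bifrost, metal3) stop immediately.
        return 0
    # Otherwise wait out whatever is left of the reset window.
    return max(0, reset_interval - seconds_since_stop_began)


assert shutdown_wait_needed(1, 0) == 0    # lone conductor: no delay
assert shutdown_wait_needed(2, 5) == 10   # clustered: wait the remainder
assert shutdown_wait_needed(2, 16) == 0   # window already elapsed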