On rpc service stop, wait for node reservation release
Instead of clearing existing reservations at the beginning of del_host, wait for the tasks holding them to go to completion. This check continues indefinitely until the conductor process exits due to one of: - All reservations for this conductor are released - CONF.graceful_shutdown_timeout has elapsed - The process manager (systemd, kubernetes) sends SIGKILL after the configured graceful period Because the default values of [DEFAULT]graceful_shutdown_timeout and [conductor]heartbeat_timeout are the same (60s) no other conductor will claim a node as an orphan until this conductor exits. Change-Id: Ib8db915746228cd87272740825aaaea1fdf953c7
This commit is contained in:
parent
e54ee2ba4c
commit
6a9e319fbe
@ -100,7 +100,8 @@ class RPCService(service.Service):
|
|||||||
seconds=CONF.hash_ring_reset_interval)
|
seconds=CONF.hash_ring_reset_interval)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.manager.del_host(deregister=self.deregister)
|
self.manager.del_host(deregister=self.deregister,
|
||||||
|
clear_node_reservations=False)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.exception('Service error occurred when cleaning up '
|
LOG.exception('Service error occurred when cleaning up '
|
||||||
'the RPC manager. Error: %s', e)
|
'the RPC manager. Error: %s', e)
|
||||||
@ -127,6 +128,21 @@ class RPCService(service.Service):
|
|||||||
LOG.info('Stopped RPC server for service %(service)s on host '
|
LOG.info('Stopped RPC server for service %(service)s on host '
|
||||||
'%(host)s.',
|
'%(host)s.',
|
||||||
{'service': self.topic, 'host': self.host})
|
{'service': self.topic, 'host': self.host})
|
||||||
|
|
||||||
|
# Wait for reservation locks held by this conductor.
|
||||||
|
# The conductor process will end when:
|
||||||
|
# - All reservations for this conductor are released
|
||||||
|
# - CONF.graceful_shutdown_timeout has elapsed
|
||||||
|
# - The process manager (systemd, kubernetes) sends SIGKILL after the
|
||||||
|
# configured graceful period
|
||||||
|
graceful_time = initial_time + datetime.timedelta(
|
||||||
|
seconds=CONF.graceful_shutdown_timeout)
|
||||||
|
while (self.manager.has_reserved()
|
||||||
|
and graceful_time > timeutils.utcnow()):
|
||||||
|
LOG.info('Waiting for reserved nodes to clear on host %(host)s',
|
||||||
|
{'host': self.host})
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
rpc.set_global_manager(None)
|
rpc.set_global_manager(None)
|
||||||
|
|
||||||
def _handle_signal(self, signo, frame):
|
def _handle_signal(self, signo, frame):
|
||||||
|
@ -298,13 +298,15 @@ class BaseConductorManager(object):
|
|||||||
# This is only used in tests currently. Delete it?
|
# This is only used in tests currently. Delete it?
|
||||||
self._periodic_task_callables = periodic_task_callables
|
self._periodic_task_callables = periodic_task_callables
|
||||||
|
|
||||||
def del_host(self, deregister=True):
|
def del_host(self, deregister=True, clear_node_reservations=True):
|
||||||
# Conductor deregistration fails if called on non-initialized
|
# Conductor deregistration fails if called on non-initialized
|
||||||
# conductor (e.g. when rpc server is unreachable).
|
# conductor (e.g. when rpc server is unreachable).
|
||||||
if not hasattr(self, 'conductor'):
|
if not hasattr(self, 'conductor'):
|
||||||
return
|
return
|
||||||
self._shutdown = True
|
self._shutdown = True
|
||||||
self._keepalive_evt.set()
|
self._keepalive_evt.set()
|
||||||
|
|
||||||
|
if clear_node_reservations:
|
||||||
# clear all locks held by this conductor before deregistering
|
# clear all locks held by this conductor before deregistering
|
||||||
self.dbapi.clear_node_reservations_for_conductor(self.host)
|
self.dbapi.clear_node_reservations_for_conductor(self.host)
|
||||||
if deregister:
|
if deregister:
|
||||||
@ -338,6 +340,15 @@ class BaseConductorManager(object):
|
|||||||
"""Return a count of currently online conductors"""
|
"""Return a count of currently online conductors"""
|
||||||
return len(self.dbapi.get_online_conductors())
|
return len(self.dbapi.get_online_conductors())
|
||||||
|
|
||||||
|
def has_reserved(self):
|
||||||
|
"""Determines if this host currently has any reserved nodes
|
||||||
|
|
||||||
|
:returns: True if this host has reserved nodes
|
||||||
|
"""
|
||||||
|
return bool(self.dbapi.get_nodeinfo_list(
|
||||||
|
filters={'reserved_by_any_of': [self.host]},
|
||||||
|
limit=1))
|
||||||
|
|
||||||
def _register_and_validate_hardware_interfaces(self, hardware_types):
|
def _register_and_validate_hardware_interfaces(self, hardware_types):
|
||||||
"""Register and validate hardware interfaces for this conductor.
|
"""Register and validate hardware interfaces for this conductor.
|
||||||
|
|
||||||
|
@ -22,6 +22,7 @@ from oslo_utils import timeutils
|
|||||||
from ironic.common import context
|
from ironic.common import context
|
||||||
from ironic.common import rpc
|
from ironic.common import rpc
|
||||||
from ironic.common import rpc_service
|
from ironic.common import rpc_service
|
||||||
|
from ironic.common import service as ironic_service
|
||||||
from ironic.conductor import manager
|
from ironic.conductor import manager
|
||||||
from ironic.objects import base as objects_base
|
from ironic.objects import base as objects_base
|
||||||
from ironic.tests.unit.db import base as db_base
|
from ironic.tests.unit.db import base as db_base
|
||||||
@ -39,6 +40,8 @@ class TestRPCService(db_base.DbTestCase):
|
|||||||
mgr_module = "ironic.conductor.manager"
|
mgr_module = "ironic.conductor.manager"
|
||||||
mgr_class = "ConductorManager"
|
mgr_class = "ConductorManager"
|
||||||
self.rpc_svc = rpc_service.RPCService(host, mgr_module, mgr_class)
|
self.rpc_svc = rpc_service.RPCService(host, mgr_module, mgr_class)
|
||||||
|
# register oslo_service DEFAULT config options
|
||||||
|
ironic_service.process_launcher()
|
||||||
self.rpc_svc.manager.dbapi = self.dbapi
|
self.rpc_svc.manager.dbapi = self.dbapi
|
||||||
|
|
||||||
@mock.patch.object(manager.ConductorManager, 'prepare_host', autospec=True)
|
@mock.patch.object(manager.ConductorManager, 'prepare_host', autospec=True)
|
||||||
@ -123,6 +126,9 @@ class TestRPCService(db_base.DbTestCase):
|
|||||||
with mock.patch.object(self.dbapi, 'get_online_conductors',
|
with mock.patch.object(self.dbapi, 'get_online_conductors',
|
||||||
autospec=True) as mock_cond_list:
|
autospec=True) as mock_cond_list:
|
||||||
mock_cond_list.return_value = [conductor1]
|
mock_cond_list.return_value = [conductor1]
|
||||||
|
with mock.patch.object(self.dbapi, 'get_nodeinfo_list',
|
||||||
|
autospec=True) as mock_nodeinfo_list:
|
||||||
|
mock_nodeinfo_list.return_value = []
|
||||||
self.rpc_svc.stop()
|
self.rpc_svc.stop()
|
||||||
|
|
||||||
# single conductor so exit immediately without waiting
|
# single conductor so exit immediately without waiting
|
||||||
@ -139,7 +145,11 @@ class TestRPCService(db_base.DbTestCase):
|
|||||||
autospec=True) as mock_cond_list:
|
autospec=True) as mock_cond_list:
|
||||||
# multiple conductors, so wait for hash_ring_reset_interval
|
# multiple conductors, so wait for hash_ring_reset_interval
|
||||||
mock_cond_list.return_value = [conductor1, conductor2]
|
mock_cond_list.return_value = [conductor1, conductor2]
|
||||||
|
with mock.patch.object(self.dbapi, 'get_nodeinfo_list',
|
||||||
|
autospec=True) as mock_nodeinfo_list:
|
||||||
|
mock_nodeinfo_list.return_value = []
|
||||||
self.rpc_svc.stop()
|
self.rpc_svc.stop()
|
||||||
|
mock_nodeinfo_list.assert_called_once()
|
||||||
|
|
||||||
# wait the total CONF.hash_ring_reset_interval 15 seconds
|
# wait the total CONF.hash_ring_reset_interval 15 seconds
|
||||||
mock_sleep.assert_has_calls([mock.call(15)])
|
mock_sleep.assert_has_calls([mock.call(15)])
|
||||||
@ -160,7 +170,11 @@ class TestRPCService(db_base.DbTestCase):
|
|||||||
autospec=True) as mock_cond_list:
|
autospec=True) as mock_cond_list:
|
||||||
# multiple conductors, so wait for hash_ring_reset_interval
|
# multiple conductors, so wait for hash_ring_reset_interval
|
||||||
mock_cond_list.return_value = [conductor1, conductor2]
|
mock_cond_list.return_value = [conductor1, conductor2]
|
||||||
|
with mock.patch.object(self.dbapi, 'get_nodeinfo_list',
|
||||||
|
autospec=True) as mock_nodeinfo_list:
|
||||||
|
mock_nodeinfo_list.return_value = []
|
||||||
self.rpc_svc.stop()
|
self.rpc_svc.stop()
|
||||||
|
mock_nodeinfo_list.assert_called_once()
|
||||||
|
|
||||||
# wait the remaining 10 seconds
|
# wait the remaining 10 seconds
|
||||||
mock_sleep.assert_has_calls([mock.call(10)])
|
mock_sleep.assert_has_calls([mock.call(10)])
|
||||||
@ -181,7 +195,35 @@ class TestRPCService(db_base.DbTestCase):
|
|||||||
autospec=True) as mock_cond_list:
|
autospec=True) as mock_cond_list:
|
||||||
# multiple conductors, so wait for hash_ring_reset_interval
|
# multiple conductors, so wait for hash_ring_reset_interval
|
||||||
mock_cond_list.return_value = [conductor1, conductor2]
|
mock_cond_list.return_value = [conductor1, conductor2]
|
||||||
|
with mock.patch.object(self.dbapi, 'get_nodeinfo_list',
|
||||||
|
autospec=True) as mock_nodeinfo_list:
|
||||||
|
mock_nodeinfo_list.return_value = []
|
||||||
self.rpc_svc.stop()
|
self.rpc_svc.stop()
|
||||||
|
mock_nodeinfo_list.assert_called_once()
|
||||||
|
|
||||||
# no wait required, CONF.hash_ring_reset_interval already exceeded
|
# no wait required, CONF.hash_ring_reset_interval already exceeded
|
||||||
mock_sleep.assert_not_called()
|
mock_sleep.assert_not_called()
|
||||||
|
|
||||||
|
@mock.patch.object(timeutils, 'utcnow', autospec=True)
|
||||||
|
@mock.patch.object(time, 'sleep', autospec=True)
|
||||||
|
def test_stop_has_reserved(self, mock_sleep, mock_utcnow):
|
||||||
|
mock_utcnow.return_value = datetime.datetime(2023, 2, 2, 21, 10, 0)
|
||||||
|
conductor1 = db_utils.get_test_conductor(hostname='fake_host')
|
||||||
|
conductor2 = db_utils.get_test_conductor(hostname='other_fake_host')
|
||||||
|
|
||||||
|
with mock.patch.object(self.dbapi, 'get_online_conductors',
|
||||||
|
autospec=True) as mock_cond_list:
|
||||||
|
# multiple conductors, so wait for hash_ring_reset_interval
|
||||||
|
mock_cond_list.return_value = [conductor1, conductor2]
|
||||||
|
with mock.patch.object(self.dbapi, 'get_nodeinfo_list',
|
||||||
|
autospec=True) as mock_nodeinfo_list:
|
||||||
|
# 3 calls to manager has_reserved until all reservation locks
|
||||||
|
# are released
|
||||||
|
mock_nodeinfo_list.side_effect = [['a', 'b'], ['a'], []]
|
||||||
|
self.rpc_svc.stop()
|
||||||
|
self.assertEqual(3, mock_nodeinfo_list.call_count)
|
||||||
|
|
||||||
|
# wait the remaining 15 seconds, then wait until has_reserved
|
||||||
|
# returns False
|
||||||
|
mock_sleep.assert_has_calls(
|
||||||
|
[mock.call(15), mock.call(1), mock.call(1)])
|
||||||
|
@ -0,0 +1,15 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
On shutdown the conductor will wait for at most
|
||||||
|
``[DEFAULT]graceful_shutdown_timeout`` seconds for existing lock node
|
||||||
|
reservations to clear. Previously lock reservations were cleared
|
||||||
|
immediately, which in some cases would result in nodes going into a failed
|
||||||
|
state.
|
||||||
|
upgrade:
|
||||||
|
- |
|
||||||
|
``[DEFAULT]graceful_shutdown_timeout`` defaults to 60s. Systemd
|
||||||
|
``TimeoutStopSec`` defaults to 30s. Kubernetes
|
||||||
|
``terminationGracePeriodSeconds`` defaults to 90s. It is recommended to
|
||||||
|
align the value of ``[DEFAULT]graceful_shutdown_timeout`` with the graceful
|
||||||
|
timeout of the process manager of the conductor process.
|
Loading…
Reference in New Issue
Block a user