Add RPC calls and handlers needed to support async service steps

This change adds missing RPC calls and handlers to bring service steps
to parity with deploy and clean steps, allowing service steps to run
asynchronously.

Change-Id: I82f95236797f24b84798be92b53deb7ec4f46dce
This commit is contained in:
Jacob Anders 2024-06-26 22:25:01 +10:00
parent 6ed746cf5e
commit b9d1ace728
7 changed files with 350 additions and 49 deletions

View File

@ -710,7 +710,7 @@ RELEASE_MAPPING = {
# the release as a separate block of text, like above.
'master': {
'api': '1.91',
'rpc': '1.59',
'rpc': '1.60',
'objects': {
'Allocation': ['1.1'],
'BIOSSetting': ['1.1'],

View File

@ -97,7 +97,7 @@ class ConductorManager(base_manager.BaseConductorManager):
# NOTE(rloo): This must be in sync with rpcapi.ConductorAPI's.
# NOTE(pas-ha): This also must be in sync with
# ironic.common.release_mappings.RELEASE_MAPPING['master']
RPC_API_VERSION = '1.59'
RPC_API_VERSION = '1.60'
target = messaging.Target(version=RPC_API_VERSION)
@ -990,6 +990,46 @@ class ConductorManager(base_manager.BaseConductorManager):
self._spawn_worker,
deployments.continue_node_deploy, task)
    @METRICS.timer('ConductorManager.continue_node_service')
    def continue_node_service(self, context, node_id):
        """RPC method to continue servicing a node.

        This is useful for servicing tasks that are async. When they complete,
        they call back via RPC, a new worker and lock are set up, and servicing
        continues. This can also be used to resume servicing on take_over.

        :param context: an admin context.
        :param node_id: the ID or UUID of a node.
        :raises: InvalidStateRequested if the node is not in SERVICEWAIT or
                 SERVICING state
        :raises: NoFreeConductorWorker when there is no free worker to start
                 async task
        :raises: NodeLocked if node is locked by another conductor.
        :raises: NodeNotFound if the node no longer appears in the database
        """
        LOG.debug("RPC continue_node_service called for node %s.", node_id)
        # patient=True: this cast comes from the ramdisk finishing an async
        # step, so retry on lock contention instead of failing the service.
        with task_manager.acquire(context, node_id, shared=False, patient=True,
                                  purpose='continue node servicing') as task:
            node = task.node
            # SERVICING is accepted as well as SERVICEWAIT (e.g. fast-track
            # style continuation where the state has already advanced).
            expected_states = [states.SERVICEWAIT, states.SERVICING]
            if node.provision_state not in expected_states:
                raise exception.InvalidStateRequested(_(
                    'Cannot continue servicing on %(node)s. Node is in '
                    '%(state)s state; should be in one of %(service_state)s') %
                    {'node': node.uuid,
                     'state': node.provision_state,
                     'service_state': ', '.join(expected_states)})
            else:
                task.process_event('resume')
            # If the worker cannot be spawned, record the error on the node
            # via the servicing-specific handler.
            task.set_spawn_error_hook(utils.spawn_servicing_error_handler,
                                      task.node)
            # Spawn only after the lock is released so the new worker can
            # re-acquire it.
            task.spawn_after(
                self._spawn_worker,
                servicing.continue_node_service, task)
@METRICS.timer('ConductorManager.do_node_tear_down')
@messaging.expected_exceptions(exception.NoFreeConductorWorker,
exception.NodeLocked,

View File

@ -158,12 +158,13 @@ class ConductorAPI(object):
| 1.57 - Added do_node_service
| 1.58 - Added support for json-rpc port usage
| 1.59 - Added support for attaching/detaching virtual media
| 1.60 - Added continue_node_service
"""
# NOTE(rloo): This must be in sync with manager.ConductorManager's.
# NOTE(pas-ha): This also must be in sync with
# ironic.common.release_mappings.RELEASE_MAPPING['master']
RPC_API_VERSION = '1.59'
RPC_API_VERSION = '1.60'
def __init__(self, topic=None):
super(ConductorAPI, self).__init__()
@ -624,6 +625,20 @@ class ConductorAPI(object):
return cctxt.cast(context, 'continue_node_deploy',
node_id=node_id)
def continue_node_service(self, context, node_id, topic=None):
"""Signal to conductor service to start the next service action.
NOTE(janders): this is an RPC cast, there will be no response or
exception raised by the conductor for this RPC.
:param context: request context.
:param node_id: node id or uuid.
:param topic: RPC topic. Defaults to self.topic.
"""
cctxt = self._prepare_call(topic=topic, version='1.60')
return cctxt.cast(context, 'continue_node_service',
node_id=node_id)
def validate_driver_interfaces(self, context, node_id, topic=None):
"""Validate the `core` and `standardized` interfaces for drivers.

View File

@ -518,6 +518,25 @@ def cleaning_error_handler(task, logmsg, errmsg=None, traceback=False,
task.process_event('fail', target_state=target_state)
def cleanup_servicewait_timeout(task):
    """Fail a servicing task whose SERVICEWAIT period has timed out.

    :param task: a TaskManager instance.
    """
    node = task.node
    error = (_("Timeout reached while servicing the node. Please "
               "check if the ramdisk responsible for the servicing is "
               "running on the node. Failed on step %(step)s.")
             % {'step': node.service_step})
    message = ("Servicing for node %(node)s failed. %(error)s"
               % {'node': node.uuid, 'error': error})
    # This runs from the servicewait-timeout periodic task, which fires via
    # the task manager's process_event(); the node is already in SERVICEFAIL
    # at that point, so the error handler must not set the fail state again.
    servicing_error_handler(task, message, errmsg=error,
                            set_fail_state=False)
def wipe_internal_info_on_power_off(node):
"""Wipe information that should not survive reboot/power off."""
# DHCP may result in a new IP next time.
@ -789,6 +808,11 @@ def spawn_deploying_error_handler(e, node):
_spawn_error_handler(e, node, states.DEPLOYING)
def spawn_servicing_error_handler(e, node):
    """Handle spawning error for node servicing.

    :param e: the exception raised while spawning the worker.
    :param node: an Ironic node object for the node being serviced.
    """
    _spawn_error_handler(e, node, states.SERVICING)
def spawn_rescue_error_handler(e, node):
"""Handle spawning error for node rescue."""
if isinstance(e, exception.NoFreeConductorWorker):
@ -966,6 +990,10 @@ def notify_conductor_resume_deploy(task):
notify_conductor_resume_operation(task, 'deploy')
def notify_conductor_resume_service(task):
    """Notify the conductor to resume servicing of the task's node."""
    notify_conductor_resume_operation(task, 'service')
def skip_automated_cleaning(node):
"""Checks if node cleaning needs to be skipped for a specific node.

View File

@ -3196,6 +3196,210 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
self._stop_service()
@mgr_utils.mock_record_keepalive
class DoNodeServiceTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
    """Tests for do_node_service and the continue_node_service RPC handler."""

    def setUp(self):
        super(DoNodeServiceTestCase, self).setUp()
        # NOTE(review): no option is overridden here — confirm whether a
        # conductor option was meant to be set, or drop this call.
        self.config(group='conductor')
        self.power_update = {
            'step': 'update_firmware', 'priority': 10, 'interface': 'power'}
        self.deploy_update = {
            'step': 'update_firmware', 'priority': 10, 'interface': 'deploy'}
        self.deploy_magic = {
            'step': 'magic_firmware', 'priority': 20, 'interface': 'deploy'}
        # NOTE(review): named "clean" but this class exercises service steps;
        # presumably the next *service* step index — confirm before renaming.
        self.next_clean_step_index = 1
        self.deploy_raid = {
            'step': 'build_raid', 'priority': 0, 'interface': 'deploy'}
        self.service_steps = [self.deploy_update,
                              self.power_update,
                              self.deploy_magic]

    @mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker',
                autospec=True)
    def test_continue_node_service_worker_pool_full(self, mock_spawn):
        # Test the appropriate exception is raised if the worker pool is full
        prv_state = states.SERVICEWAIT
        tgt_prv_state = states.AVAILABLE
        node = obj_utils.create_test_node(self.context, driver='fake-hardware',
                                          provision_state=prv_state,
                                          target_provision_state=tgt_prv_state,
                                          last_error=None)
        self._start_service()
        mock_spawn.side_effect = exception.NoFreeConductorWorker()
        self.assertRaises(exception.NoFreeConductorWorker,
                          self.service.continue_node_service,
                          self.context, node.uuid)

    @mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker',
                autospec=True)
    def test_continue_node_service_wrong_state(self, mock_spawn):
        # Test the appropriate exception is raised if node isn't already
        # in SERVICEWAIT state
        prv_state = states.ACTIVE
        tgt_prv_state = states.AVAILABLE
        node = obj_utils.create_test_node(self.context, driver='fake-hardware',
                                          provision_state=prv_state,
                                          target_provision_state=tgt_prv_state,
                                          last_error=None)
        self._start_service()
        self.assertRaises(exception.InvalidStateRequested,
                          self.service.continue_node_service,
                          self.context, node.uuid)
        self._stop_service()
        node.refresh()
        # Make sure things were rolled back
        self.assertEqual(prv_state, node.provision_state)
        self.assertEqual(tgt_prv_state, node.target_provision_state)
        # Verify reservation has been cleared.
        self.assertIsNone(node.reservation)

    @mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker',
                autospec=True)
    def _continue_node_service(self, return_state, mock_spawn):
        # test a node can continue servicing via RPC
        prv_state = return_state
        tgt_prv_state = states.ACTIVE
        node = obj_utils.create_test_node(self.context, driver='fake-hardware',
                                          provision_state=prv_state,
                                          target_provision_state=tgt_prv_state,
                                          last_error=None,
                                          service_step=self.service_steps[0])
        self._start_service()
        self.service.continue_node_service(self.context, node.uuid)
        self._stop_service()
        node.refresh()
        self.assertEqual(states.SERVICING, node.provision_state)
        self.assertEqual(tgt_prv_state, node.target_provision_state)
        mock_spawn.assert_called_with(
            self.service, servicing.continue_node_service, mock.ANY)

    def test_continue_node_service(self):
        self._continue_node_service(states.SERVICEWAIT)

    def _continue_node_service_abort(self):
        # A non-abortable step with abort_after set fails the service and
        # records the step name in last_error.
        last_service_step = self.service_steps[0]
        last_service_step['abortable'] = False
        last_service_step['abort_after'] = True
        driver_info = {'service_steps': self.service_steps,
                       'service_step_index': 0}
        tgt_prov_state = states.ACTIVE
        node = obj_utils.create_test_node(
            self.context, driver='fake-hardware',
            provision_state=states.SERVICEWAIT,
            target_provision_state=tgt_prov_state, last_error=None,
            driver_internal_info=driver_info,
            service_step=self.service_steps[0])
        self._start_service()
        self.service.continue_node_service(self.context, node.uuid)
        self._stop_service()
        node.refresh()
        self.assertEqual(states.SERVICEFAIL, node.provision_state)
        self.assertEqual(tgt_prov_state, node.target_provision_state)
        self.assertIsNotNone(node.last_error)
        # assert the service step name is in the last error message
        self.assertIn(self.service_steps[0]['step'], node.last_error)

    def test_continue_node_service_abort(self):
        self._continue_node_service_abort()

    def _continue_node_service_abort_last_service_step(self):
        # Aborting after the only (= last) step completes the service
        # cleanly: target state reached, no error recorded.
        last_service_step = self.service_steps[0]
        last_service_step['abortable'] = False
        last_service_step['abort_after'] = True
        driver_info = {'service_steps': [self.service_steps[0]],
                       'service_step_index': 0}
        tgt_prov_state = states.ACTIVE
        node = obj_utils.create_test_node(
            self.context, driver='fake-hardware',
            provision_state=states.SERVICEWAIT,
            target_provision_state=tgt_prov_state, last_error=None,
            driver_internal_info=driver_info,
            service_step=self.service_steps[0])
        self._start_service()
        self.service.continue_node_service(self.context, node.uuid)
        self._stop_service()
        node.refresh()
        self.assertEqual(tgt_prov_state, node.provision_state)
        self.assertIsNone(node.target_provision_state)
        self.assertIsNone(node.last_error)

    def test_continue_node_service_abort_last_service_step(self):
        self._continue_node_service_abort_last_service_step()

    @mock.patch.object(tenacity, 'stop_after_attempt',
                       return_value=tenacity.stop_after_attempt(4),
                       autospec=True)
    @mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker',
                autospec=True)
    def test_continue_node_service_locked(self, mock_spawn, mock_stop):
        """Test that continuing a service via RPC cannot fail due to locks."""
        max_attempts = 3
        self.config(node_locked_retry_attempts=max_attempts, group='conductor')
        driver_info = {'service_steps': [self.service_steps[0]],
                       'service_step_index': 0}
        tgt_prov_state = states.ACTIVE
        node = obj_utils.create_test_node(
            self.context, driver='fake-hardware',
            provision_state=states.SERVICEWAIT,
            target_provision_state=tgt_prov_state, last_error=None,
            driver_internal_info=driver_info,
            service_step=self.service_steps[0])
        self._start_service()
        # Fail reservation max_attempts times, then succeed: the RPC handler
        # must retry through the lock contention rather than error out.
        with mock.patch.object(objects.Node, 'reserve', autospec=True) as mck:
            mck.side_effect = (
                ([exception.NodeLocked(node='foo', host='foo')] * max_attempts)
                + [node])
            self.service.continue_node_service(self.context, node.uuid)
        self._stop_service()

    @mock.patch('ironic.drivers.modules.fake.FakePower.validate',
                autospec=True)
    def test_do_node_service_maintenance(self, mock_validate):
        # Servicing must be refused while the node is in maintenance mode,
        # before any driver validation happens.
        node = obj_utils.create_test_node(
            self.context, driver='fake-hardware',
            provision_state=states.ACTIVE,
            target_provision_state=states.NOSTATE,
            maintenance=True, maintenance_reason='reason')
        self._start_service()
        exc = self.assertRaises(messaging.rpc.ExpectedException,
                                self.service.do_node_service,
                                self.context, node.uuid, {'foo': 'bar'})
        # Compare true exception hidden by @messaging.expected_exceptions
        self.assertEqual(exception.NodeInMaintenance, exc.exc_info[0])
        self.assertFalse(mock_validate.called)

    @mock.patch.object(task_manager.TaskManager, 'process_event',
                       autospec=True)
    @mock.patch('ironic.drivers.modules.network.flat.FlatNetwork.validate',
                autospec=True)
    @mock.patch('ironic.drivers.modules.fake.FakePower.validate',
                autospec=True)
    def test_do_node_service(self, mock_pv, mock_nv, mock_event):
        # Happy path: validation runs and the 'service' event is processed
        # with the do_node_service callback.
        node = obj_utils.create_test_node(
            self.context, driver='fake-hardware',
            provision_state=states.ACTIVE,
            target_provision_state=states.NOSTATE)
        self._start_service()
        self.service.do_node_service(self.context,
                                     node.uuid, {'foo': 'bar'})
        self.assertTrue(mock_pv.called)
        self.assertTrue(mock_nv.called)
        mock_event.assert_called_once_with(
            mock.ANY,
            'service',
            callback=mock.ANY,
            call_args=(servicing.do_node_service, mock.ANY,
                       {'foo': 'bar'}, False),
            err_handler=mock.ANY, target_state='active')
# end legacy
class DoNodeRescueTestCase(mgr_utils.CommonMixIn, mgr_utils.ServiceSetUpMixin,
db_base.DbTestCase):
@mock.patch('ironic.conductor.task_manager.acquire', autospec=True)
@ -8644,52 +8848,6 @@ class ContinueInspectionTestCase(mgr_utils.ServiceSetUpMixin,
self.assertEqual(state, node.provision_state)
@mgr_utils.mock_record_keepalive
class DoNodeServiceTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
    """Legacy do_node_service tests.

    NOTE(review): this hunk appears in a deletion context of the diff — this
    copy of the tests is being removed in favor of the consolidated
    DoNodeServiceTestCase earlier in the file; confirm against the applied
    change.
    """

    def setUp(self):
        super(DoNodeServiceTestCase, self).setUp()

    @mock.patch('ironic.drivers.modules.fake.FakePower.validate',
                autospec=True)
    def test_do_node_service_maintenance(self, mock_validate):
        # Servicing must be refused while the node is in maintenance mode.
        node = obj_utils.create_test_node(
            self.context, driver='fake-hardware',
            provision_state=states.ACTIVE,
            target_provision_state=states.NOSTATE,
            maintenance=True, maintenance_reason='reason')
        self._start_service()
        exc = self.assertRaises(messaging.rpc.ExpectedException,
                                self.service.do_node_service,
                                self.context, node.uuid, {'foo': 'bar'})
        # Compare true exception hidden by @messaging.expected_exceptions
        self.assertEqual(exception.NodeInMaintenance, exc.exc_info[0])
        self.assertFalse(mock_validate.called)

    @mock.patch.object(task_manager.TaskManager, 'process_event',
                       autospec=True)
    @mock.patch('ironic.drivers.modules.network.flat.FlatNetwork.validate',
                autospec=True)
    @mock.patch('ironic.drivers.modules.fake.FakePower.validate',
                autospec=True)
    def test_do_node_service(self, mock_pv, mock_nv, mock_event):
        # Happy path: validation runs and the 'service' event is processed.
        node = obj_utils.create_test_node(
            self.context, driver='fake-hardware',
            provision_state=states.ACTIVE,
            target_provision_state=states.NOSTATE)
        self._start_service()
        self.service.do_node_service(self.context,
                                     node.uuid, {'foo': 'bar'})
        self.assertTrue(mock_pv.called)
        self.assertTrue(mock_nv.called)
        mock_event.assert_called_once_with(
            mock.ANY,
            'service',
            callback=mock.ANY,
            call_args=(servicing.do_node_service, mock.ANY,
                       {'foo': 'bar'}, False),
            err_handler=mock.ANY, target_state='active')
@mock.patch.object(
task_manager.TaskManager, 'spawn_after',
lambda self, _spawn, func, *args, **kwargs: func(*args, **kwargs))

View File

@ -487,6 +487,12 @@ class RPCAPITestCase(db_base.DbTestCase):
version='1.45',
node_id=self.fake_node['uuid'])
    def test_continue_node_service(self):
        # continue_node_service is an async cast pinned to RPC version 1.60.
        self._test_rpcapi('continue_node_service',
                          'cast',
                          version='1.60',
                          node_id=self.fake_node['uuid'])
def test_get_raid_logical_disk_properties(self):
self._test_rpcapi('get_raid_logical_disk_properties',
'call',

View File

@ -1186,6 +1186,45 @@ class ErrorHandlersTestCase(db_base.DbTestCase):
self.assertEqual(clean_error, self.node.maintenance_reason)
self.assertEqual('clean failure', self.node.fault)
    @mock.patch.object(conductor_utils, 'servicing_error_handler',
                       autospec=True)
    def test_cleanup_servicewait_timeout_handler_call(self,
                                                      mock_error_handler):
        # The timeout cleanup must delegate to servicing_error_handler with
        # the canned timeout message and must not set the fail state itself.
        self.task.node.uuid = '18c95393-b775-4887-a274-c45be47509d5'
        self.node.service_step = {}
        conductor_utils.cleanup_servicewait_timeout(self.task)
        mock_error_handler.assert_called_once_with(
            self.task,
            logmsg="Servicing for node 18c95393-b775-4887-a274-c45be47509d5 "
                   "failed. Timeout reached while servicing the node. Please "
                   "check if the ramdisk responsible for the servicing is "
                   "running on the node. Failed on step {}.",
            errmsg="Timeout reached while servicing the node. Please "
                   "check if the ramdisk responsible for the servicing is "
                   "running on the node. Failed on step {}.",
            set_fail_state=False)
    def test_cleanup_servicewait_timeout(self):
        # End-to-end through the real error handler: with set_fail_state
        # implied False, process_event is not called again, the service step
        # bookkeeping is wiped and the node is faulted into maintenance.
        self.node.provision_state = states.SERVICEFAIL
        target = 'baz'
        self.node.target_provision_state = target
        self.node.driver_internal_info = {}
        self.node.service_step = {'key': 'val'}
        service_error = ("Timeout reached while servicing the node. Please "
                         "check if the ramdisk responsible for the servicing "
                         "is running on the node. Failed on step "
                         "{'key': 'val'}.")
        self.node.set_driver_internal_info('servicing_reboot', True)
        self.node.set_driver_internal_info('service_step_index', 0)
        conductor_utils.cleanup_servicewait_timeout(self.task)
        self.assertEqual({}, self.node.service_step)
        self.assertNotIn('service_step_index', self.node.driver_internal_info)
        self.assertFalse(self.task.process_event.called)
        self.assertTrue(self.node.maintenance)
        self.assertEqual(service_error, self.node.maintenance_reason)
        self.assertEqual('service failure', self.node.fault)
@mock.patch.object(conductor_utils.LOG, 'error', autospec=True)
def _test_cleaning_error_handler(self, mock_log_error,
prov_state=states.CLEANING):
@ -1327,6 +1366,21 @@ class ErrorHandlersTestCase(db_base.DbTestCase):
self.assertFalse(self.node.save.called)
self.assertFalse(log_mock.warning.called)
    @mock.patch.object(conductor_utils, 'LOG', autospec=True)
    def test_spawn_servicing_error_handler_no_worker(self, log_mock):
        # NoFreeConductorWorker during spawn records last_error on the node
        # and logs a warning.
        exc = exception.NoFreeConductorWorker()
        conductor_utils.spawn_servicing_error_handler(exc, self.node)
        self.node.save.assert_called_once_with()
        self.assertIn('No free conductor workers', self.node.last_error)
        self.assertTrue(log_mock.warning.called)
    @mock.patch.object(conductor_utils, 'LOG', autospec=True)
    def test_spawn_servicing_error_handler_other_error(self, log_mock):
        # Exceptions other than NoFreeConductorWorker are ignored by the
        # handler: no save, no warning.
        exc = Exception('foo')
        conductor_utils.spawn_servicing_error_handler(exc, self.node)
        self.assertFalse(self.node.save.called)
        self.assertFalse(log_mock.warning.called)
@mock.patch.object(conductor_utils, 'LOG', autospec=True)
def test_spawn_rescue_error_handler_no_worker(self, log_mock):
exc = exception.NoFreeConductorWorker()