diff --git a/doc/source/contributor/architecture.rst b/doc/source/contributor/architecture.rst
index e246fd28b1..84e58ee1a0 100644
--- a/doc/source/contributor/architecture.rst
+++ b/doc/source/contributor/architecture.rst
@@ -42,7 +42,7 @@ Drivers may run their own periodic tasks, i.e. actions run repeatedly after
 a certain amount of time. Such a task is created by using the periodic_
 decorator on an interface method. For example
 
-::
+.. code-block:: python
 
     from futurist import periodics
 
@@ -55,6 +55,11 @@ decorator on an interface method. For example
 Here the ``spacing`` argument is a period in seconds for a given periodic
 task. For example 'spacing=5' means every 5 seconds.
 
+Starting with the Yoga cycle, there is also a new decorator
+:py:func:`ironic.conductor.periodics.node_periodic` to create periodic tasks
+that handle nodes. See the :ref:`deploy steps documentation
+<deploy-steps-polling>` for an example.
+
 Driver-Specific Steps
 ---------------------
 
diff --git a/doc/source/contributor/deploy-steps.rst b/doc/source/contributor/deploy-steps.rst
index e6407d41e3..a6cd6809d2 100644
--- a/doc/source/contributor/deploy-steps.rst
+++ b/doc/source/contributor/deploy-steps.rst
@@ -188,6 +188,95 @@ following pattern:
 
         return deploy_utils.reboot_to_finish_step(task)
 
+.. _deploy-steps-polling:
+
+Polling for completion
+~~~~~~~~~~~~~~~~~~~~~~
+
+Finally, you may want to poll the BMC until the operation is complete. Often
+enough, this also involves a reboot. In this case you can use the
+:py:func:`ironic.conductor.periodics.node_periodic` decorator to create a
+periodic task that operates on relevant nodes:
+
+.. code-block:: python
+
+    from ironic.common import states
+    from ironic.common import utils
+    from ironic.conductor import periodics
+    from ironic.drivers import base
+    from ironic.drivers.modules import deploy_utils
+
+    _STATUS_CHECK_INTERVAL = ...  # better to use a configuration option
+
+    class MyManagement(base.ManagementInterface):
+        ...
+
+        @base.clean_step(priority=0)
+        def my_action(self, task):
+            ...
+
+            reboot_required = ...  # your step may or may not need rebooting
+
+            # Mark this node as running my_action. Often enough you will
+            # store some useful data rather than a boolean flag.
+            utils.set_node_nested_field(task.node, 'driver_internal_info',
+                                        'in_my_action', True)
+
+            # Tell ironic that...
+            deploy_utils.set_async_step_flags(
+                task.node,
+                # ... we're waiting for IPA to come back after reboot
+                reboot=reboot_required,
+                # ... the current step shouldn't be entered again
+                skip_current_step=True,
+                # ... we'll be polling until the step is done
+                polling=True)
+
+            if reboot_required:
+                return deploy_utils.reboot_to_finish_step(task)
+
+        @periodics.node_periodic(
+            purpose='checking my action status',
+            spacing=_STATUS_CHECK_INTERVAL,
+            filters={
+                # Skip nodes that already have a lock
+                'reserved': False,
+                # Only consider nodes that are waiting for cleaning or failed
+                # on timeout.
+                'provision_state_in': [states.CLEANWAIT, states.CLEANFAIL],
+            },
+            # Load driver_internal_info from the database on listing
+            predicate_extra_fields=['driver_internal_info'],
+            # Only consider nodes with in_my_action
+            predicate=lambda n: n.driver_internal_info.get('in_my_action'),
+        )
+        def check_my_action(self, task, manager, context):
+            # Double-check that the node is managed by this interface
+            if not isinstance(task.driver.management, MyManagement):
+                return
+
+            if not needs_actions():  # insert your checks here
+                return
+
+            task.upgrade_lock()
+
+            ...  # do any required updates
+
+            # Drop the flag so that this node is no longer considered
+            utils.pop_node_nested_field(task.node, 'driver_internal_info',
+                                        'in_my_action')
+
+Note that creating a ``task`` involves an additional database query, so you
+want to avoid creating them for too many nodes in your periodic tasks.
+Instead:
+
+* Try to use precise ``filters`` to filter out nodes on the database level.
+  Using ``reserved`` and ``provision_state``/``provision_state_in`` is
+  recommended in most cases. See
+  :py:meth:`ironic.db.api.Connection.get_nodeinfo_list` for a list of possible
+  filters.
+* Use ``predicate`` to filter on complex fields such as
+  ``driver_internal_info``. Predicates are checked before tasks are created.
+
 Implementing RAID
 -----------------
 
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index ed54aa67b1..4c49bc7893 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -45,7 +45,6 @@ import datetime
 import queue
 
 import eventlet
-from futurist import periodics
 from futurist import waiters
 from ironic_lib import metrics_utils
 from oslo_log import log
@@ -66,6 +65,7 @@ from ironic.conductor import base_manager
 from ironic.conductor import cleaning
 from ironic.conductor import deployments
 from ironic.conductor import notification_utils as notify_utils
+from ironic.conductor import periodics
 from ironic.conductor import steps as conductor_steps
 from ironic.conductor import task_manager
 from ironic.conductor import utils
@@ -1497,10 +1497,15 @@ class ConductorManager(base_manager.BaseConductorManager):
             eventlet.sleep(0)
 
     @METRICS.timer('ConductorManager._power_failure_recovery')
-    @periodics.periodic(spacing=CONF.conductor.power_failure_recovery_interval,
-                        enabled=bool(
-                            CONF.conductor.power_failure_recovery_interval))
-    def _power_failure_recovery(self, context):
+    @periodics.node_periodic(
+        purpose='power failure recovery',
+        spacing=CONF.conductor.power_failure_recovery_interval,
+        # NOTE(kaifeng) To avoid conflicts with the regular power state
+        # checking periodic task, maintenance is still a required
+        # condition.
+        filters={'maintenance': True, 'fault': faults.POWER_FAILURE},
+    )
+    def _power_failure_recovery(self, task, context):
         """Periodic task to check power states for nodes in maintenance.
 
         Attempt to grab a lock and sync only if the following
@@ -1511,19 +1516,6 @@ class ConductorManager(base_manager.BaseConductorManager):
         3) Node is not reserved.
         4) Node is not in the ENROLL state.
         """
-        def should_sync_power_state_for_recovery(task):
-            """Check if ironic should sync power state for recovery."""
-
-            # NOTE(dtantsur): it's also pointless (and dangerous) to
-            # sync power state when a power action is in progress
-            if (task.node.provision_state == states.ENROLL
-                    or not task.node.maintenance
-                    or task.node.fault != faults.POWER_FAILURE
-                    or task.node.target_power_state
-                    or task.node.reservation):
-                return False
-            return True
-
         def handle_recovery(task, actual_power_state):
             """Handle recovery when power sync is succeeded."""
             task.upgrade_lock()
@@ -1546,48 +1538,33 @@ class ConductorManager(base_manager.BaseConductorManager):
             notify_utils.emit_power_state_corrected_notification(
                 task, old_power_state)
 
-        # NOTE(kaifeng) To avoid conflicts with periodic task of the
-        # regular power state checking, maintenance is still a required
-        # condition.
- filters = {'maintenance': True, - 'fault': faults.POWER_FAILURE} - node_iter = self.iter_nodes(fields=['id'], filters=filters) - for (node_uuid, driver, conductor_group, node_id) in node_iter: - try: - with task_manager.acquire(context, node_uuid, - purpose='power failure recovery', - shared=True) as task: - if not should_sync_power_state_for_recovery(task): - continue - try: - # Validate driver info in case of parameter changed - # in maintenance. - task.driver.power.validate(task) - # The driver may raise an exception, or may return - # ERROR. Handle both the same way. - power_state = task.driver.power.get_power_state(task) - if power_state == states.ERROR: - raise exception.PowerStateFailure( - _("Power driver returned ERROR state " - "while trying to get power state.")) - except Exception as e: - LOG.debug("During power_failure_recovery, could " - "not get power state for node %(node)s, " - "Error: %(err)s.", - {'node': task.node.uuid, 'err': e}) - else: - handle_recovery(task, power_state) - except exception.NodeNotFound: - LOG.info("During power_failure_recovery, node %(node)s was " - "not found and presumed deleted by another process.", - {'node': node_uuid}) - except exception.NodeLocked: - LOG.info("During power_failure_recovery, node %(node)s was " - "already locked by another process. Skip.", - {'node': node_uuid}) - finally: - # Yield on every iteration - eventlet.sleep(0) + # NOTE(dtantsur): it's also pointless (and dangerous) to + # sync power state when a power action is in progress + if (task.node.provision_state == states.ENROLL + or not task.node.maintenance + or task.node.fault != faults.POWER_FAILURE + or task.node.target_power_state + or task.node.reservation): + return + + try: + # Validate driver info in case of parameter changed + # in maintenance. + task.driver.power.validate(task) + # The driver may raise an exception, or may return + # ERROR. Handle both the same way. + power_state = task.driver.power.get_power_state(task) + if power_state == states.ERROR: + raise exception.PowerStateFailure( + _("Power driver returned ERROR state " + "while trying to get power state.")) + except Exception as e: + LOG.debug("During power_failure_recovery, could " + "not get power state for node %(node)s, " + "Error: %(err)s.", + {'node': task.node.uuid, 'err': e}) + else: + handle_recovery(task, power_state) @METRICS.timer('ConductorManager._check_deploy_timeouts') @periodics.periodic( @@ -1869,9 +1846,17 @@ class ConductorManager(base_manager.BaseConductorManager): ) @METRICS.timer('ConductorManager._sync_local_state') - @periodics.periodic(spacing=CONF.conductor.sync_local_state_interval, - enabled=CONF.conductor.sync_local_state_interval > 0) - def _sync_local_state(self, context): + @periodics.node_periodic( + purpose='node take over', + spacing=CONF.conductor.sync_local_state_interval, + filters={'reserved': False, 'maintenance': False, + 'provision_state': states.ACTIVE}, + predicate_extra_fields=['conductor_affinity'], + predicate=lambda n, m: n.conductor_affinity != m.conductor.id, + limit=lambda: CONF.conductor.periodic_max_workers, + shared_task=False, + ) + def _sync_local_state(self, task, context): """Perform any actions necessary to sync local state. This is called periodically to refresh the conductor's copy of the @@ -1880,40 +1865,20 @@ class ConductorManager(base_manager.BaseConductorManager): The ensuing actions could include preparing a PXE environment, updating the DHCP server, and so on. 
""" - filters = {'reserved': False, - 'maintenance': False, - 'provision_state': states.ACTIVE} - node_iter = self.iter_nodes(fields=['id', 'conductor_affinity'], - filters=filters) + # NOTE(tenbrae): now that we have the lock, check again to + # avoid racing with deletes and other state changes + node = task.node + if (node.maintenance + or node.conductor_affinity == self.conductor.id + or node.provision_state != states.ACTIVE): + return False - workers_count = 0 - for (node_uuid, driver, conductor_group, node_id, - conductor_affinity) in node_iter: - if conductor_affinity == self.conductor.id: - continue - - # Node is mapped here, but not updated by this conductor last - try: - with task_manager.acquire(context, node_uuid, - purpose='node take over') as task: - # NOTE(tenbrae): now that we have the lock, check again to - # avoid racing with deletes and other state changes - node = task.node - if (node.maintenance - or node.conductor_affinity == self.conductor.id - or node.provision_state != states.ACTIVE): - continue - - task.spawn_after(self._spawn_worker, - self._do_takeover, task) - - except exception.NoFreeConductorWorker: - break - except (exception.NodeLocked, exception.NodeNotFound): - continue - workers_count += 1 - if workers_count == CONF.conductor.periodic_max_workers: - break + try: + task.spawn_after(self._spawn_worker, self._do_takeover, task) + except exception.NoFreeConductorWorker: + raise periodics.Stop() + else: + return True @METRICS.timer('ConductorManager.validate_driver_interfaces') @messaging.expected_exceptions(exception.NodeLocked) diff --git a/ironic/conductor/periodics.py b/ironic/conductor/periodics.py new file mode 100644 index 0000000000..ead5cbf08a --- /dev/null +++ b/ironic/conductor/periodics.py @@ -0,0 +1,151 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Conductor periodics.""" + +import collections +import functools +import inspect + +import eventlet +from futurist import periodics +from oslo_log import log + +from ironic.common import exception +from ironic.conductor import base_manager +from ironic.conductor import task_manager + + +LOG = log.getLogger(__name__) + + +def periodic(spacing, enabled=True, **kwargs): + """A decorator to define a periodic task. + + :param spacing: how often (in seconds) to run the periodic task. + :param enabled: whether the task is enabled; defaults to ``spacing > 0``. + """ + return periodics.periodic(spacing=spacing, + enabled=enabled and spacing > 0, + **kwargs) + + +class Stop(Exception): + """A signal to stop the current iteration of a periodic task.""" + + +def node_periodic(purpose, spacing, enabled=True, filters=None, + predicate=None, predicate_extra_fields=(), limit=None, + shared_task=True): + """A decorator to define a periodic task to act on nodes. + + Defines a periodic task that fetches the list of nodes mapped to the + current conductor which satisfy the provided filters. + + The decorated function must be a method on either the conductor manager + or a hardware interface. 
The signature is:
+
+    * for the conductor manager: ``(self, task, context)``
+    * for hardware interfaces: ``(self, task, manager, context)``.
+
+    ``NodeNotFound`` and ``NodeLocked`` exceptions are ignored. Raise ``Stop``
+    to abort the current iteration of the task and reschedule it.
+
+    :param purpose: a human-readable description of the activity, e.g.
+        "verifying that the cat is purring".
+    :param spacing: how often (in seconds) to run the periodic task.
+    :param enabled: whether the task is enabled; defaults to ``spacing > 0``.
+    :param filters: database-level filters for the nodes.
+    :param predicate: a callable to run on the fetched nodes *before* creating
+        a task for them. The only parameter will be a named tuple with fields
+        ``uuid``, ``driver``, ``conductor_group`` plus everything from
+        ``predicate_extra_fields``. If the callable accepts a second
+        parameter, it will be the conductor manager instance.
+    :param predicate_extra_fields: extra fields to fetch on the initial
+        request and pass into the ``predicate``. Must not contain ``uuid``,
+        ``driver`` or ``conductor_group`` since they are always included.
+    :param limit: how many nodes to process before stopping the current
+        iteration. If ``predicate`` returns ``False``, the node is not counted.
+        If the decorated function returns ``False``, the node is not counted
+        either. Can be a callable, in which case it will be called on each
+        iteration to determine the limit.
+    :param shared_task: if ``True``, the task will have a shared lock. It is
+        recommended to start with a shared lock and upgrade it only if needed.
+    """
+    node_type = collections.namedtuple(
+        'Node',
+        ['uuid', 'driver', 'conductor_group'] + list(predicate_extra_fields)
+    )
+
+    # Accepting a conductor manager is a bit of an edge case; do a bit of
+    # signature magic to avoid passing it everywhere.
+    accepts_manager = (predicate is not None
+                       and len(inspect.signature(predicate).parameters) > 1)
+
+    def decorator(func):
+        @periodic(spacing=spacing, enabled=enabled)
+        @functools.wraps(func)
+        def wrapper(self, *args, **kwargs):
+            # Make it work with both drivers and the conductor manager
+            if isinstance(self, base_manager.BaseConductorManager):
+                manager = self
+                context = args[0]
+            else:
+                manager = args[0]
+                context = args[1]
+
+            # ``limit`` may be a callable, e.g. one looking up the current
+            # value of a configuration option.
+            local_limit = limit() if callable(limit) else limit
+            assert local_limit is None or local_limit > 0
+
+            nodes = manager.iter_nodes(filters=filters,
+                                       fields=predicate_extra_fields)
+            for (node_uuid, *other) in nodes:
+                if predicate is not None:
+                    node = node_type(node_uuid, *other)
+                    if accepts_manager:
+                        result = predicate(node, manager)
+                    else:
+                        result = predicate(node)
+                    if not result:
+                        continue
+
+                result = False  # nodes we fail to acquire don't hit the limit
+                try:
+                    with task_manager.acquire(context, node_uuid,
+                                              purpose=purpose,
+                                              shared=shared_task) as task:
+                        result = func(self, task, *args, **kwargs)
+                except exception.NodeNotFound:
+                    LOG.info("During %(action)s, node %(node)s was not found "
+                             "and presumed deleted by another process.",
+                             {'node': node_uuid, 'action': purpose})
+                except exception.NodeLocked:
+                    LOG.info("During %(action)s, node %(node)s was already "
+                             "locked by another process. 
Skip.", + {'node': node_uuid, 'action': purpose}) + except Stop: + break + finally: + # Yield on every iteration + eventlet.sleep(0) + + if (local_limit is not None + and (result is None or result)): + local_limit -= 1 + if not local_limit: + return + + return wrapper + + return decorator diff --git a/ironic/drivers/modules/drac/bios.py b/ironic/drivers/modules/drac/bios.py index e40089a4ff..795e4f1508 100644 --- a/ironic/drivers/modules/drac/bios.py +++ b/ironic/drivers/modules/drac/bios.py @@ -15,7 +15,6 @@ DRAC BIOS configuration specific methods """ -from futurist import periodics from ironic_lib import metrics_utils from oslo_log import log as logging from oslo_utils import importutils @@ -23,7 +22,7 @@ from oslo_utils import timeutils from ironic.common import exception from ironic.common.i18n import _ -from ironic.conductor import task_manager +from ironic.conductor import periodics from ironic.conductor import utils as manager_utils from ironic.conf import CONF from ironic.drivers import base @@ -151,9 +150,16 @@ class DracWSManBIOS(base.BIOSInterface): # spacing since BIOS jobs could be comparatively shorter in time than # RAID ones currently using the raid spacing to avoid errors # spacing parameter for periodic method - @periodics.periodic( - spacing=CONF.drac.query_raid_config_job_status_interval) - def _query_bios_config_job_status(self, manager, context): + @periodics.node_periodic( + purpose='checking async bios configuration jobs', + spacing=CONF.drac.query_raid_config_job_status_interval, + filters={'reserved': False, 'maintenance': False}, + predicate_extra_fields=['driver_internal_info'], + predicate=lambda n: ( + n.driver_internal_info.get('bios_config_job_ids') + or n.driver_internal_info.get('factory_reset_time_before_reboot')), + ) + def _query_bios_config_job_status(self, task, manager, context): """Periodic task to check the progress of running BIOS config jobs. :param manager: an instance of Ironic Conductor Manager with @@ -161,47 +167,17 @@ class DracWSManBIOS(base.BIOSInterface): :param context: context of the request, needed when acquiring a lock on a node. For access control. """ + # skip a node not being managed by idrac driver + if not isinstance(task.driver.bios, DracWSManBIOS): + return - filters = {'reserved': False, 'maintenance': False} - fields = ['driver_internal_info'] + # check bios_config_job_id exist & checks job is completed + if task.node.driver_internal_info.get("bios_config_job_ids"): + self._check_node_bios_jobs(task) - node_list = manager.iter_nodes(fields=fields, filters=filters) - for (node_uuid, driver, conductor_group, - driver_internal_info) in node_list: - try: - # NOTE(TheJulia) Evaluate if work is actually required before - # creating a task for every node in the deployment which does - # not have a lock and is not in maintenance mode. 
- if (not driver_internal_info.get("bios_config_job_ids") - and not driver_internal_info.get( - "factory_reset_time_before_reboot")): - continue - - lock_purpose = 'checking async bios configuration jobs' - # Performing read-only/non-destructive work with shared lock - with task_manager.acquire(context, node_uuid, - purpose=lock_purpose, - shared=True) as task: - # skip a node not being managed by idrac driver - if not isinstance(task.driver.bios, DracWSManBIOS): - continue - - # check bios_config_job_id exist & checks job is completed - if driver_internal_info.get("bios_config_job_ids"): - self._check_node_bios_jobs(task) - - if driver_internal_info.get( - "factory_reset_time_before_reboot"): - self._check_last_system_inventory_changed(task) - - except exception.NodeNotFound: - LOG.info("During query_bios_config_job_status, node " - "%(node)s was not found and presumed deleted by " - "another process.", {'node': node_uuid}) - except exception.NodeLocked: - LOG.info("During query_bios_config_job_status, node " - "%(node)s was already locked by another process. " - "Skip.", {'node': node_uuid}) + if task.node.driver_internal_info.get( + "factory_reset_time_before_reboot"): + self._check_last_system_inventory_changed(task) def _check_last_system_inventory_changed(self, task): """Check the progress of last system inventory time of a node. diff --git a/ironic/drivers/modules/drac/management.py b/ironic/drivers/modules/drac/management.py index dd614b42a9..9b2b534311 100644 --- a/ironic/drivers/modules/drac/management.py +++ b/ironic/drivers/modules/drac/management.py @@ -23,7 +23,6 @@ DRAC management interface import json import time -from futurist import periodics from ironic_lib import metrics_utils import jsonschema from jsonschema import exceptions as json_schema_exc @@ -34,6 +33,7 @@ from ironic.common import boot_devices from ironic.common import exception from ironic.common.i18n import _ from ironic.common import molds +from ironic.conductor import periodics from ironic.conductor import task_manager from ironic.conductor import utils as manager_utils from ironic.conf import CONF @@ -485,46 +485,23 @@ class DracRedfishManagement(redfish_management.RedfishManagement): # Export executed as part of Import async periodic task status check @METRICS.timer('DracRedfishManagement._query_import_configuration_status') - @periodics.periodic( + @periodics.node_periodic( + purpose='checking async import configuration task', spacing=CONF.drac.query_import_config_job_status_interval, - enabled=CONF.drac.query_import_config_job_status_interval > 0) - def _query_import_configuration_status(self, manager, context): + filters={'reserved': False, 'maintenance': False}, + predicate_extra_fields=['driver_internal_info'], + predicate=lambda n: ( + n.driver_internal_info.get('import_task_monitor_url') + ), + ) + def _query_import_configuration_status(self, task, manager, context): """Period job to check import configuration task.""" + if not isinstance(task.driver.management, DracRedfishManagement): + return - filters = {'reserved': False, 'maintenance': False} - fields = ['driver_internal_info'] - node_list = manager.iter_nodes(fields=fields, filters=filters) - for (node_uuid, driver, conductor_group, - driver_internal_info) in node_list: - try: - - task_monitor_url = driver_internal_info.get( - 'import_task_monitor_url') - # NOTE(TheJulia): Evaluate if a task montitor URL exists - # based upon our inital DB query before pulling a task for - # every node in the deployment which reduces the overall - # number of 
DB queries triggering in the background where - # no work is required. - if not task_monitor_url: - continue - - lock_purpose = 'checking async import configuration task' - with task_manager.acquire(context, node_uuid, - purpose=lock_purpose, - shared=True) as task: - if not isinstance(task.driver.management, - DracRedfishManagement): - continue - self._check_import_configuration_task( - task, task_monitor_url) - except exception.NodeNotFound: - LOG.info('During _query_import_configuration_status, node ' - '%(node)s was not found and presumed deleted by ' - 'another process.', {'node': node_uuid}) - except exception.NodeLocked: - LOG.info('During _query_import_configuration_status, node ' - '%(node)s was already locked by another process. ' - 'Skip.', {'node': node_uuid}) + self._check_import_configuration_task( + task, task.node.driver_internal_info.get( + 'import_task_monitor_url')) def _check_import_configuration_task(self, task, task_monitor_url): """Checks progress of running import configuration task""" diff --git a/ironic/drivers/modules/drac/raid.py b/ironic/drivers/modules/drac/raid.py index 1bdd36d85d..726f57d3af 100644 --- a/ironic/drivers/modules/drac/raid.py +++ b/ironic/drivers/modules/drac/raid.py @@ -18,7 +18,6 @@ DRAC RAID specific methods from collections import defaultdict import math -from futurist import periodics from ironic_lib import metrics_utils from oslo_log import log as logging from oslo_utils import importutils @@ -28,7 +27,7 @@ import tenacity from ironic.common import exception from ironic.common.i18n import _ from ironic.common import raid as raid_common -from ironic.conductor import task_manager +from ironic.conductor import periodics from ironic.conductor import utils as manager_utils from ironic.conf import CONF from ironic.drivers import base @@ -1487,38 +1486,22 @@ class DracRedfishRAID(redfish_raid.RedfishRAID): return False @METRICS.timer('DracRedfishRAID._query_raid_tasks_status') - @periodics.periodic( - spacing=CONF.drac.query_raid_config_job_status_interval) - def _query_raid_tasks_status(self, manager, context): + @periodics.node_periodic( + purpose='checking async RAID tasks', + spacing=CONF.drac.query_raid_config_job_status_interval, + filters={'reserved': False, 'maintenance': False}, + predicate_extra_fields=['driver_internal_info'], + predicate=lambda n: ( + n.driver_internal_info.get('raid_task_monitor_uris') + ), + ) + def _query_raid_tasks_status(self, task, manager, context): """Periodic task to check the progress of running RAID tasks""" + if not isinstance(task.driver.raid, DracRedfishRAID): + return - filters = {'reserved': False, 'maintenance': False} - fields = ['driver_internal_info'] - node_list = manager.iter_nodes(fields=fields, filters=filters) - for (node_uuid, driver, conductor_group, - driver_internal_info) in node_list: - task_monitor_uris = driver_internal_info.get( - 'raid_task_monitor_uris') - if not task_monitor_uris: - continue - try: - lock_purpose = 'checking async RAID tasks' - with task_manager.acquire(context, node_uuid, - purpose=lock_purpose, - shared=True) as task: - if not isinstance(task.driver.raid, - DracRedfishRAID): - continue - self._check_raid_tasks_status( - task, task_monitor_uris) - except exception.NodeNotFound: - LOG.info('During _query_raid_tasks_status, node ' - '%(node)s was not found and presumed deleted by ' - 'another process.', {'node': node_uuid}) - except exception.NodeLocked: - LOG.info('During _query_raid_tasks_status, node ' - '%(node)s was already locked by another process. 
' - 'Skip.', {'node': node_uuid}) + self._check_raid_tasks_status( + task, task.node.driver_internal_info.get('raid_task_monitor_uris')) def _check_raid_tasks_status(self, task, task_mon_uris): """Checks RAID tasks for completion @@ -1763,43 +1746,21 @@ class DracWSManRAID(base.RAIDInterface): return {'logical_disks': logical_disks} @METRICS.timer('DracRAID._query_raid_config_job_status') - @periodics.periodic( - spacing=CONF.drac.query_raid_config_job_status_interval) - def _query_raid_config_job_status(self, manager, context): + @periodics.node_periodic( + purpose='checking async raid configuration jobs', + spacing=CONF.drac.query_raid_config_job_status_interval, + filters={'reserved': False, 'maintenance': False}, + predicate_extra_fields=['driver_internal_info'], + predicate=lambda n: ( + n.driver_internal_info.get('raid_config_job_ids') + ), + ) + def _query_raid_config_job_status(self, task, manager, context): """Periodic task to check the progress of running RAID config jobs.""" + if not isinstance(task.driver.raid, DracWSManRAID): + return - filters = {'reserved': False, 'maintenance': False} - fields = ['driver_internal_info'] - - node_list = manager.iter_nodes(fields=fields, filters=filters) - for (node_uuid, driver, conductor_group, - driver_internal_info) in node_list: - try: - - job_ids = driver_internal_info.get('raid_config_job_ids') - # NOTE(TheJulia): Evaluate if there is work to be done - # based upon the original DB query's results so we don't - # proceed creating tasks for every node in the deployment. - if not job_ids: - continue - - lock_purpose = 'checking async raid configuration jobs' - with task_manager.acquire(context, node_uuid, - purpose=lock_purpose, - shared=True) as task: - if not isinstance(task.driver.raid, DracWSManRAID): - continue - - self._check_node_raid_jobs(task) - - except exception.NodeNotFound: - LOG.info("During query_raid_config_job_status, node " - "%(node)s was not found and presumed deleted by " - "another process.", {'node': node_uuid}) - except exception.NodeLocked: - LOG.info("During query_raid_config_job_status, node " - "%(node)s was already locked by another process. 
" - "Skip.", {'node': node_uuid}) + self._check_node_raid_jobs(task) @METRICS.timer('DracRAID._check_node_raid_jobs') def _check_node_raid_jobs(self, task): diff --git a/ironic/drivers/modules/inspector.py b/ironic/drivers/modules/inspector.py index b344abb74c..1b866d0d5c 100644 --- a/ironic/drivers/modules/inspector.py +++ b/ironic/drivers/modules/inspector.py @@ -20,7 +20,6 @@ import shlex from urllib import parse as urlparse import eventlet -from futurist import periodics import openstack from oslo_log import log as logging @@ -29,6 +28,7 @@ from ironic.common.i18n import _ from ironic.common import keystone from ironic.common import states from ironic.common import utils +from ironic.conductor import periodics from ironic.conductor import task_manager from ironic.conductor import utils as cond_utils from ironic.conf import CONF @@ -292,21 +292,14 @@ class Inspector(base.InspectInterface): 'ironic-inspector', {'uuid': node_uuid}) _get_client(task.context).abort_introspection(node_uuid) - @periodics.periodic(spacing=CONF.inspector.status_check_period) - def _periodic_check_result(self, manager, context): + @periodics.node_periodic( + purpose='checking hardware inspection status', + spacing=CONF.inspector.status_check_period, + filters={'provision_state': states.INSPECTWAIT}, + ) + def _periodic_check_result(self, task, manager, context): """Periodic task checking results of inspection.""" - filters = {'provision_state': states.INSPECTWAIT} - node_iter = manager.iter_nodes(filters=filters) - - for node_uuid, driver, conductor_group in node_iter: - try: - lock_purpose = 'checking hardware inspection status' - with task_manager.acquire(context, node_uuid, - shared=True, - purpose=lock_purpose) as task: - _check_status(task) - except (exception.NodeLocked, exception.NodeNotFound): - continue + _check_status(task) def _start_inspection(node_uuid, context): diff --git a/ironic/drivers/modules/irmc/raid.py b/ironic/drivers/modules/irmc/raid.py index 8f1bd172af..3368e887d3 100644 --- a/ironic/drivers/modules/irmc/raid.py +++ b/ironic/drivers/modules/irmc/raid.py @@ -15,7 +15,6 @@ """ Irmc RAID specific methods """ -from futurist import periodics from ironic_lib import metrics_utils from oslo_log import log as logging from oslo_utils import importutils @@ -23,7 +22,7 @@ from oslo_utils import importutils from ironic.common import exception from ironic.common import raid as raid_common from ironic.common import states -from ironic.conductor import task_manager +from ironic.conductor import periodics from ironic.conductor import utils as manager_utils from ironic import conf from ironic.drivers import base @@ -430,80 +429,63 @@ class IRMCRAID(base.RAIDInterface): {'node_id': node_uuid, 'cfg': node.raid_config}) @METRICS.timer('IRMCRAID._query_raid_config_fgi_status') - @periodics.periodic( - spacing=CONF.irmc.query_raid_config_fgi_status_interval) - def _query_raid_config_fgi_status(self, manager, context): + @periodics.node_periodic( + purpose='checking async RAID configuration tasks', + spacing=CONF.irmc.query_raid_config_fgi_status_interval, + filters={'reserved': False, 'provision_state': states.CLEANWAIT, + 'maintenance': False}, + predicate_extra_fields=['raid_config'], + predicate=lambda n: ( + n.raid_config and not n.raid_config.get('fgi_status') + ), + ) + def _query_raid_config_fgi_status(self, task, manager, context): """Periodic tasks to check the progress of running RAID config.""" + node = task.node + node_uuid = task.node.uuid + if not isinstance(task.driver.raid, IRMCRAID): + return 
+ if task.node.target_raid_config is None: + return + task.upgrade_lock() + if node.provision_state != states.CLEANWAIT: + return + # Avoid hitting clean_callback_timeout expiration + node.touch_provisioning() - filters = {'reserved': False, 'provision_state': states.CLEANWAIT, - 'maintenance': False} - fields = ['raid_config'] - node_list = manager.iter_nodes(fields=fields, filters=filters) - for (node_uuid, driver, conductor_group, raid_config) in node_list: - try: - # NOTE(TheJulia): Evaluate based upon presence of raid - # configuration before triggering a task, as opposed to after - # so we don't create excess node task objects with related - # DB queries. - if not raid_config or raid_config.get('fgi_status'): - continue + raid_config = node.raid_config - lock_purpose = 'checking async RAID configuration tasks' - with task_manager.acquire(context, node_uuid, - purpose=lock_purpose, - shared=True) as task: - node = task.node - node_uuid = task.node.uuid - if not isinstance(task.driver.raid, IRMCRAID): - continue - if task.node.target_raid_config is None: - continue - task.upgrade_lock() - if node.provision_state != states.CLEANWAIT: - continue - # Avoid hitting clean_callback_timeout expiration - node.touch_provisioning() + try: + report = irmc_common.get_irmc_report(node) + except client.scci.SCCIInvalidInputError: + raid_config.update({'fgi_status': RAID_FAILED}) + raid_common.update_raid_info(node, raid_config) + self._set_clean_failed(task, RAID_FAILED) + return + except client.scci.SCCIClientError: + raid_config.update({'fgi_status': RAID_FAILED}) + raid_common.update_raid_info(node, raid_config) + self._set_clean_failed(task, RAID_FAILED) + return - try: - report = irmc_common.get_irmc_report(node) - except client.scci.SCCIInvalidInputError: - raid_config.update({'fgi_status': RAID_FAILED}) - raid_common.update_raid_info(node, raid_config) - self._set_clean_failed(task, RAID_FAILED) - continue - except client.scci.SCCIClientError: - raid_config.update({'fgi_status': RAID_FAILED}) - raid_common.update_raid_info(node, raid_config) - self._set_clean_failed(task, RAID_FAILED) - continue - - fgi_status_dict = _get_fgi_status(report, node_uuid) - # Note(trungnv): Allow to check until RAID mechanism to be - # completed with RAID information in report. - if fgi_status_dict == 'completing': - continue - if not fgi_status_dict: - raid_config.update({'fgi_status': RAID_FAILED}) - raid_common.update_raid_info(node, raid_config) - self._set_clean_failed(task, fgi_status_dict) - continue - if all(fgi_status == 'Idle' for fgi_status in - fgi_status_dict.values()): - raid_config.update({'fgi_status': RAID_COMPLETED}) - raid_common.update_raid_info(node, raid_config) - LOG.info('RAID configuration has completed on ' - 'node %(node)s with fgi_status is %(fgi)s', - {'node': node_uuid, 'fgi': RAID_COMPLETED}) - self._resume_cleaning(task) - - except exception.NodeNotFound: - LOG.info('During query_raid_config_job_status, node ' - '%(node)s was not found raid_config and presumed ' - 'deleted by another process.', {'node': node_uuid}) - except exception.NodeLocked: - LOG.info('During query_raid_config_job_status, node ' - '%(node)s was already locked by another process. ' - 'Skip.', {'node': node_uuid}) + fgi_status_dict = _get_fgi_status(report, node_uuid) + # Note(trungnv): Allow to check until RAID mechanism to be + # completed with RAID information in report. 
+ if fgi_status_dict == 'completing': + return + if not fgi_status_dict: + raid_config.update({'fgi_status': RAID_FAILED}) + raid_common.update_raid_info(node, raid_config) + self._set_clean_failed(task, fgi_status_dict) + return + if all(fgi_status == 'Idle' for fgi_status in + fgi_status_dict.values()): + raid_config.update({'fgi_status': RAID_COMPLETED}) + raid_common.update_raid_info(node, raid_config) + LOG.info('RAID configuration has completed on ' + 'node %(node)s with fgi_status is %(fgi)s', + {'node': node_uuid, 'fgi': RAID_COMPLETED}) + self._resume_cleaning(task) def _set_clean_failed(self, task, fgi_status_dict): LOG.error('RAID configuration task failed for node %(node)s. ' diff --git a/ironic/drivers/modules/pxe_base.py b/ironic/drivers/modules/pxe_base.py index 5fff4ae51b..ab5b0d5357 100644 --- a/ironic/drivers/modules/pxe_base.py +++ b/ironic/drivers/modules/pxe_base.py @@ -13,7 +13,6 @@ Base PXE Interface Methods """ -from futurist import periodics from ironic_lib import metrics_utils from oslo_config import cfg from oslo_log import log as logging @@ -24,7 +23,7 @@ from ironic.common import exception from ironic.common.i18n import _ from ironic.common import pxe_utils from ironic.common import states -from ironic.conductor import task_manager +from ironic.conductor import periodics from ironic.conductor import utils as manager_utils from ironic.drivers.modules import boot_mode_utils from ironic.drivers.modules import deploy_utils @@ -452,29 +451,23 @@ class PXEBaseMixin(object): states.RESCUEWAIT} @METRICS.timer('PXEBaseMixin._check_boot_timeouts') - @periodics.periodic(spacing=CONF.pxe.boot_retry_check_interval, - enabled=bool(CONF.pxe.boot_retry_timeout)) - def _check_boot_timeouts(self, manager, context): + @periodics.node_periodic( + purpose='checking PXE boot status', + spacing=CONF.pxe.boot_retry_check_interval, + enabled=bool(CONF.pxe.boot_retry_timeout), + filters={'provision_state_in': _RETRY_ALLOWED_STATES, + 'reserved': False, + 'maintenance': False, + 'provisioned_before': CONF.pxe.boot_retry_timeout}, + ) + def _check_boot_timeouts(self, task, manager, context): """Periodically checks whether boot has timed out and retry it. + :param task: a task instance. :param manager: conductor manager. :param context: request context. 
""" - filters = {'provision_state_in': self._RETRY_ALLOWED_STATES, - 'reserved': False, - 'maintenance': False, - 'provisioned_before': CONF.pxe.boot_retry_timeout} - node_iter = manager.iter_nodes(filters=filters) - - for node_uuid, driver, conductor_group in node_iter: - try: - lock_purpose = 'checking PXE boot status' - with task_manager.acquire(context, node_uuid, - shared=True, - purpose=lock_purpose) as task: - self._check_boot_status(task) - except (exception.NodeLocked, exception.NodeNotFound): - continue + self._check_boot_status(task) def _check_boot_status(self, task): if not isinstance(task.driver.boot, PXEBaseMixin): diff --git a/ironic/drivers/modules/redfish/management.py b/ironic/drivers/modules/redfish/management.py index 9a68d99754..ab1a105efb 100644 --- a/ironic/drivers/modules/redfish/management.py +++ b/ironic/drivers/modules/redfish/management.py @@ -15,7 +15,6 @@ import collections -from futurist import periodics from ironic_lib import metrics_utils from oslo_log import log from oslo_utils import importutils @@ -29,6 +28,7 @@ from ironic.common.i18n import _ from ironic.common import indicator_states from ironic.common import states from ironic.common import utils +from ironic.conductor import periodics from ironic.conductor import task_manager from ironic.conductor import utils as manager_utils from ironic.conf import CONF @@ -853,100 +853,46 @@ class RedfishManagement(base.ManagementInterface): node.save() @METRICS.timer('RedfishManagement._query_firmware_update_failed') - @periodics.periodic( + @periodics.node_periodic( + purpose='checking if async firmware update failed', spacing=CONF.redfish.firmware_update_fail_interval, - enabled=CONF.redfish.firmware_update_fail_interval > 0) - def _query_firmware_update_failed(self, manager, context): + filters={'reserved': False, 'provision_state': states.CLEANFAIL, + 'maintenance': True}, + predicate_extra_fields=['driver_internal_info'], + predicate=lambda n: n.driver_internal_info.get('firmware_updates'), + ) + def _query_firmware_update_failed(self, task, manager, context): """Periodic job to check for failed firmware updates.""" + if not isinstance(task.driver.management, RedfishManagement): + return - filters = {'reserved': False, 'provision_state': states.CLEANFAIL, - 'maintenance': True} + node = task.node - fields = ['driver_internal_info'] + # A firmware update failed. Discard any remaining firmware + # updates so when the user takes the node out of + # maintenance mode, pending firmware updates do not + # automatically continue. + LOG.warning('Firmware update failed for node %(node)s. ' + 'Discarding remaining firmware updates.', + {'node': node.uuid}) - node_list = manager.iter_nodes(fields=fields, filters=filters) - for (node_uuid, driver, conductor_group, - driver_internal_info) in node_list: - try: - firmware_updates = driver_internal_info.get( - 'firmware_updates') - # NOTE(TheJulia): If we don't have a entry upfront, we can - # safely skip past the node as we know work here is not - # required, otherwise minimizing the number of potential - # nodes to visit. - if not firmware_updates: - continue - - lock_purpose = 'checking async firmware update failed.' - with task_manager.acquire(context, node_uuid, - purpose=lock_purpose, - shared=True) as task: - if not isinstance(task.driver.management, - RedfishManagement): - continue - - node = task.node - - # A firmware update failed. 
Discard any remaining firmware - # updates so when the user takes the node out of - # maintenance mode, pending firmware updates do not - # automatically continue. - LOG.warning('Firmware update failed for node %(node)s. ' - 'Discarding remaining firmware updates.', - {'node': node.uuid}) - - task.upgrade_lock() - self._clear_firmware_updates(node) - - except exception.NodeNotFound: - LOG.info('During _query_firmware_update_failed, node ' - '%(node)s was not found and presumed deleted by ' - 'another process.', {'node': node_uuid}) - except exception.NodeLocked: - LOG.info('During _query_firmware_update_failed, node ' - '%(node)s was already locked by another process. ' - 'Skip.', {'node': node_uuid}) + task.upgrade_lock() + self._clear_firmware_updates(node) @METRICS.timer('RedfishManagement._query_firmware_update_status') - @periodics.periodic( + @periodics.node_periodic( + purpose='checking async firmware update tasks', spacing=CONF.redfish.firmware_update_status_interval, - enabled=CONF.redfish.firmware_update_status_interval > 0) - def _query_firmware_update_status(self, manager, context): + filters={'reserved': False, 'provision_state': states.CLEANWAIT}, + predicate_extra_fields=['driver_internal_info'], + predicate=lambda n: n.driver_internal_info.get('firmware_updates'), + ) + def _query_firmware_update_status(self, task, manager, context): """Periodic job to check firmware update tasks.""" + if not isinstance(task.driver.management, RedfishManagement): + return - filters = {'reserved': False, 'provision_state': states.CLEANWAIT} - fields = ['driver_internal_info'] - - node_list = manager.iter_nodes(fields=fields, filters=filters) - for (node_uuid, driver, conductor_group, - driver_internal_info) in node_list: - try: - firmware_updates = driver_internal_info.get( - 'firmware_updates') - # NOTE(TheJulia): Check and skip upfront before creating a - # task so we don't generate additional tasks and db queries - # for every node in CLEANWAIT which is not locked. - if not firmware_updates: - continue - - lock_purpose = 'checking async firmware update tasks.' - with task_manager.acquire(context, node_uuid, - purpose=lock_purpose, - shared=True) as task: - if not isinstance(task.driver.management, - RedfishManagement): - continue - - self._check_node_firmware_update(task) - - except exception.NodeNotFound: - LOG.info('During _query_firmware_update_status, node ' - '%(node)s was not found and presumed deleted by ' - 'another process.', {'node': node_uuid}) - except exception.NodeLocked: - LOG.info('During _query_firmware_update_status, node ' - '%(node)s was already locked by another process. 
' - 'Skip.', {'node': node_uuid}) + self._check_node_firmware_update(task) @METRICS.timer('RedfishManagement._check_node_firmware_update') def _check_node_firmware_update(self, task): diff --git a/ironic/drivers/modules/redfish/raid.py b/ironic/drivers/modules/redfish/raid.py index c01d08a9cf..95052bb467 100644 --- a/ironic/drivers/modules/redfish/raid.py +++ b/ironic/drivers/modules/redfish/raid.py @@ -15,7 +15,6 @@ import math -from futurist import periodics from ironic_lib import metrics_utils from oslo_log import log from oslo_utils import importutils @@ -25,7 +24,7 @@ from ironic.common import exception from ironic.common.i18n import _ from ironic.common import raid from ironic.common import states -from ironic.conductor import task_manager +from ironic.conductor import periodics from ironic.conductor import utils as manager_utils from ironic.conf import CONF from ironic.drivers import base @@ -1014,98 +1013,46 @@ class RedfishRAID(base.RAIDInterface): node.save() @METRICS.timer('RedfishRAID._query_raid_config_failed') - @periodics.periodic( + @periodics.node_periodic( + purpose='checking async RAID config failed', spacing=CONF.redfish.raid_config_fail_interval, - enabled=CONF.redfish.raid_config_fail_interval > 0) - def _query_raid_config_failed(self, manager, context): + filters={'reserved': False, 'provision_state': states.CLEANFAIL, + 'maintenance': True}, + predicate_extra_fields=['driver_internal_info'], + predicate=lambda n: n.driver_internal_info.get('raid_configs'), + ) + def _query_raid_config_failed(self, task, manager, context): """Periodic job to check for failed RAID configuration.""" + if not isinstance(task.driver.raid, RedfishRAID): + return - filters = {'reserved': False, 'provision_state': states.CLEANFAIL, - 'maintenance': True} + node = task.node - fields = ['driver_internal_info'] + # A RAID config failed. Discard any remaining RAID + # configs so when the user takes the node out of + # maintenance mode, pending RAID configs do not + # automatically continue. + LOG.warning('RAID configuration failed for node %(node)s. ' + 'Discarding remaining RAID configurations.', + {'node': node.uuid}) - node_list = manager.iter_nodes(fields=fields, filters=filters) - for (node_uuid, driver, conductor_group, - driver_internal_info) in node_list: - try: - raid_configs = driver_internal_info.get( - 'raid_configs') - # NOTE(TheJulia): Evaluate the presence of raid configuration - # activity before pulling the task, so we don't needlessly - # create database queries with tasks which would be skipped - # anyhow. - if not raid_configs: - continue - - lock_purpose = 'checking async RAID config failed.' - with task_manager.acquire(context, node_uuid, - purpose=lock_purpose, - shared=True) as task: - if not isinstance(task.driver.raid, RedfishRAID): - continue - - node = task.node - - # A RAID config failed. Discard any remaining RAID - # configs so when the user takes the node out of - # maintenance mode, pending RAID configs do not - # automatically continue. - LOG.warning('RAID configuration failed for node %(node)s. ' - 'Discarding remaining RAID configurations.', - {'node': node.uuid}) - - task.upgrade_lock() - self._clear_raid_configs(node) - - except exception.NodeNotFound: - LOG.info('During _query_raid_config_failed, node ' - '%(node)s was not found and presumed deleted by ' - 'another process.', {'node': node_uuid}) - except exception.NodeLocked: - LOG.info('During _query_raid_config_failed, node ' - '%(node)s was already locked by another process. 
' - 'Skip.', {'node': node_uuid}) + task.upgrade_lock() + self._clear_raid_configs(node) @METRICS.timer('RedfishRAID._query_raid_config_status') - @periodics.periodic( + @periodics.node_periodic( + purpose='checking async RAID config tasks', spacing=CONF.redfish.raid_config_status_interval, - enabled=CONF.redfish.raid_config_status_interval > 0) - def _query_raid_config_status(self, manager, context): + filters={'reserved': False, 'provision_state': states.CLEANWAIT}, + predicate_extra_fields=['driver_internal_info'], + predicate=lambda n: n.driver_internal_info.get('raid_configs'), + ) + def _query_raid_config_status(self, task, manager, context): """Periodic job to check RAID config tasks.""" + if not isinstance(task.driver.raid, RedfishRAID): + return - filters = {'reserved': False, 'provision_state': states.CLEANWAIT} - fields = ['driver_internal_info'] - - node_list = manager.iter_nodes(fields=fields, filters=filters) - for (node_uuid, driver, conductor_group, - driver_internal_info) in node_list: - try: - raid_configs = driver_internal_info.get( - 'raid_configs') - # NOTE(TheJulia): Skip to next record if we do not - # have raid configuraiton tasks, so we don't pull tasks - # for every unrelated node in CLEANWAIT. - if not raid_configs: - continue - - lock_purpose = 'checking async RAID config tasks.' - with task_manager.acquire(context, node_uuid, - purpose=lock_purpose, - shared=True) as task: - if not isinstance(task.driver.raid, RedfishRAID): - continue - - self._check_node_raid_config(task) - - except exception.NodeNotFound: - LOG.info('During _query_raid_config_status, node ' - '%(node)s was not found and presumed deleted by ' - 'another process.', {'node': node_uuid}) - except exception.NodeLocked: - LOG.info('During _query_raid_config_status, node ' - '%(node)s was already locked by another process. ' - 'Skip.', {'node': node_uuid}) + self._check_node_raid_config(task) def _get_error_messages(self, response): try: diff --git a/ironic/tests/unit/conductor/test_manager.py b/ironic/tests/unit/conductor/test_manager.py index e93b3cb787..a00bb97f83 100644 --- a/ironic/tests/unit/conductor/test_manager.py +++ b/ironic/tests/unit/conductor/test_manager.py @@ -5567,7 +5567,7 @@ class ManagerPowerRecoveryTestCase(mgr_utils.CommonMixIn, self.task.driver = self.driver self.filters = {'maintenance': True, 'fault': 'power failure'} - self.columns = ['uuid', 'driver', 'conductor_group', 'id'] + self.columns = ['uuid', 'driver', 'conductor_group'] def test_node_not_mapped(self, get_nodeinfo_mock, mapped_mock, acquire_mock): @@ -6152,7 +6152,7 @@ class ManagerSyncLocalStateTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase): self.filters = {'reserved': False, 'maintenance': False, 'provision_state': states.ACTIVE} - self.columns = ['uuid', 'driver', 'conductor_group', 'id', + self.columns = ['uuid', 'driver', 'conductor_group', 'conductor_affinity'] def _assert_get_nodeinfo_args(self, get_nodeinfo_mock): @@ -6200,7 +6200,7 @@ class ManagerSyncLocalStateTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase): self.service, self.node.uuid, self.node.driver, self.node.conductor_group) acquire_mock.assert_called_once_with(self.context, self.node.uuid, - purpose=mock.ANY) + purpose=mock.ANY, shared=False) # assert spawn_after has been called self.task.spawn_after.assert_called_once_with( self.service._spawn_worker, @@ -6234,7 +6234,7 @@ class ManagerSyncLocalStateTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase): # assert acquire() gets called 2 times only instead of 3. 
When # NoFreeConductorWorker is raised the loop should be broken expected = [mock.call(self.context, self.node.uuid, - purpose=mock.ANY)] * 2 + purpose=mock.ANY, shared=False)] * 2 self.assertEqual(expected, acquire_mock.call_args_list) # assert spawn_after has been called twice @@ -6264,7 +6264,7 @@ class ManagerSyncLocalStateTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase): # assert acquire() gets called 3 times expected = [mock.call(self.context, self.node.uuid, - purpose=mock.ANY)] * 3 + purpose=mock.ANY, shared=False)] * 3 self.assertEqual(expected, acquire_mock.call_args_list) # assert spawn_after has been called only 2 times @@ -6296,7 +6296,7 @@ class ManagerSyncLocalStateTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase): # assert acquire() gets called only once because of the worker limit acquire_mock.assert_called_once_with(self.context, self.node.uuid, - purpose=mock.ANY) + purpose=mock.ANY, shared=False) # assert spawn_after has been called self.task.spawn_after.assert_called_once_with( diff --git a/ironic/tests/unit/conductor/test_periodics.py b/ironic/tests/unit/conductor/test_periodics.py new file mode 100644 index 0000000000..85868163a0 --- /dev/null +++ b/ironic/tests/unit/conductor/test_periodics.py @@ -0,0 +1,135 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +from unittest import mock + +from oslo_utils import uuidutils + +from ironic.common import context as ironic_context +from ironic.conductor import base_manager +from ironic.conductor import periodics +from ironic.conductor import task_manager +from ironic.tests.unit.db import base as db_base +from ironic.tests.unit.objects import utils as obj_utils + + +_FILTERS = {'maintenance': False} + + +class PeriodicTestService(base_manager.BaseConductorManager): + + def __init__(self, test): + self.test = test + self.nodes = [] + + @periodics.node_periodic(purpose="herding cats", spacing=42) + def simple(self, task, context): + self.test.assertIsInstance(context, ironic_context.RequestContext) + self.test.assertTrue(task.shared) + self.nodes.append(task.node.uuid) + + @periodics.node_periodic(purpose="herding cats", spacing=42, + shared_task=False, filters=_FILTERS) + def exclusive(self, task, context): + self.test.assertIsInstance(context, ironic_context.RequestContext) + self.test.assertFalse(task.shared) + self.nodes.append(task.node.uuid) + + @periodics.node_periodic(purpose="never running", spacing=42, + predicate=lambda n: n.cat != 'meow', + predicate_extra_fields=['cat']) + def never_run(self, task, context): + self.test.fail(f"Was not supposed to run, ran with {task.node}") + + @periodics.node_periodic(purpose="herding cats", spacing=42, limit=3) + def limit(self, task, context): + self.test.assertIsInstance(context, ironic_context.RequestContext) + self.test.assertTrue(task.shared) + self.nodes.append(task.node.uuid) + if task.node.uuid == 'stop': + raise periodics.Stop() + + +@mock.patch.object(PeriodicTestService, 'iter_nodes', autospec=True) +class NodePeriodicTestCase(db_base.DbTestCase): + + def setUp(self): + super().setUp() + self.service = PeriodicTestService(self) + self.ctx = ironic_context.get_admin_context() + self.uuid = uuidutils.generate_uuid() + self.node = obj_utils.create_test_node(self.context, uuid=self.uuid) + + def test_simple(self, mock_iter_nodes): + mock_iter_nodes.return_value = iter([ + (uuidutils.generate_uuid(), 'driver1', ''), + (self.uuid, 'driver2', 'group'), + ]) + + self.service.simple(self.ctx) + + mock_iter_nodes.assert_called_once_with(self.service, + filters=None, fields=()) + self.assertEqual([self.uuid], self.service.nodes) + + def test_exclusive(self, mock_iter_nodes): + mock_iter_nodes.return_value = iter([ + (uuidutils.generate_uuid(), 'driver1', ''), + (self.uuid, 'driver2', 'group'), + ]) + + self.service.exclusive(self.ctx) + + mock_iter_nodes.assert_called_once_with(self.service, + filters=_FILTERS, + fields=()) + self.assertEqual([self.uuid], self.service.nodes) + + @mock.patch.object(task_manager, 'acquire', autospec=True) + def test_never_run(self, mock_acquire, mock_iter_nodes): + mock_iter_nodes.return_value = iter([ + (self.uuid, 'driver2', 'group', 'meow'), + ]) + + self.service.never_run(self.ctx) + + mock_iter_nodes.assert_called_once_with(self.service, + filters=None, + fields=['cat']) + self.assertEqual([], self.service.nodes) + mock_acquire.assert_not_called() + + @mock.patch.object(task_manager, 'acquire', autospec=True) + def test_limit(self, mock_acquire, mock_iter_nodes): + mock_iter_nodes.return_value = iter([ + (self.uuid, 'driver1', ''), + ] * 10) + mock_acquire.return_value.__enter__.return_value.node.uuid = self.uuid + + self.service.limit(self.ctx) + + mock_iter_nodes.assert_called_once_with(self.service, + filters=None, fields=()) + self.assertEqual([self.uuid] * 3, self.service.nodes) + + 
@mock.patch.object(task_manager, 'acquire', autospec=True) + def test_stop(self, mock_acquire, mock_iter_nodes): + mock_iter_nodes.return_value = iter([ + (self.uuid, 'driver1', ''), + ] * 10) + mock_acquire.return_value.__enter__.return_value.node.uuid = 'stop' + + self.service.limit(self.ctx) + + mock_iter_nodes.assert_called_once_with(self.service, + filters=None, fields=()) + self.assertEqual(['stop'], self.service.nodes) diff --git a/ironic/tests/unit/drivers/modules/drac/test_management.py b/ironic/tests/unit/drivers/modules/drac/test_management.py index f3d23d9a83..9d5182e899 100644 --- a/ironic/tests/unit/drivers/modules/drac/test_management.py +++ b/ironic/tests/unit/drivers/modules/drac/test_management.py @@ -28,6 +28,7 @@ from oslo_utils import importutils import ironic.common.boot_devices from ironic.common import exception from ironic.common import molds +from ironic.conductor import periodics from ironic.conductor import task_manager from ironic.conductor import utils as manager_utils from ironic.drivers.modules import deploy_utils @@ -1021,7 +1022,7 @@ class DracRedfishManagementTestCase(test_utils.BaseDracTest): self.management._check_import_configuration_task.assert_not_called() - @mock.patch.object(drac_mgmt.LOG, 'info', autospec=True) + @mock.patch.object(periodics.LOG, 'info', autospec=True) @mock.patch.object(task_manager, 'acquire', autospec=True) def test__query_import_configuration_status_node_notfound( self, mock_acquire, mock_log): @@ -1044,7 +1045,7 @@ class DracRedfishManagementTestCase(test_utils.BaseDracTest): self.management._check_import_configuration_task.assert_not_called() self.assertTrue(mock_log.called) - @mock.patch.object(drac_mgmt.LOG, 'info', autospec=True) + @mock.patch.object(periodics.LOG, 'info', autospec=True) @mock.patch.object(task_manager, 'acquire', autospec=True) def test__query_import_configuration_status_node_locked( self, mock_acquire, mock_log): diff --git a/ironic/tests/unit/drivers/modules/drac/test_raid.py b/ironic/tests/unit/drivers/modules/drac/test_raid.py index 1a5928e431..01a5ca9d15 100644 --- a/ironic/tests/unit/drivers/modules/drac/test_raid.py +++ b/ironic/tests/unit/drivers/modules/drac/test_raid.py @@ -25,6 +25,7 @@ import tenacity from ironic.common import exception from ironic.common import states +from ironic.conductor import periodics from ironic.conductor import task_manager from ironic.conductor import utils as manager_utils from ironic.conf import CONF @@ -2592,7 +2593,7 @@ class DracRedfishRAIDTestCase(test_utils.BaseDracTest): self.raid._check_raid_tasks_status.assert_not_called() - @mock.patch.object(drac_raid.LOG, 'info', autospec=True) + @mock.patch.object(periodics.LOG, 'info', autospec=True) @mock.patch.object(task_manager, 'acquire', autospec=True) def test__query_raid_tasks_status_node_notfound( self, mock_acquire, mock_log): @@ -2610,7 +2611,7 @@ class DracRedfishRAIDTestCase(test_utils.BaseDracTest): self.raid._check_raid_tasks_status.assert_not_called() self.assertTrue(mock_log.called) - @mock.patch.object(drac_raid.LOG, 'info', autospec=True) + @mock.patch.object(periodics.LOG, 'info', autospec=True) @mock.patch.object(task_manager, 'acquire', autospec=True) def test__query_raid_tasks_status_node_locked( self, mock_acquire, mock_log): diff --git a/ironic/tests/unit/drivers/modules/irmc/test_periodic_task.py b/ironic/tests/unit/drivers/modules/irmc/test_periodic_task.py index 865f589626..57ba8263fd 100644 --- a/ironic/tests/unit/drivers/modules/irmc/test_periodic_task.py +++ 
b/ironic/tests/unit/drivers/modules/irmc/test_periodic_task.py @@ -49,6 +49,8 @@ class iRMCPeriodicTaskTestCase(test_common.BaseIRMCTest): { 'key': 'value' }]} + self.node.raid_config = self.raid_config + self.node.target_raid_config = self.target_raid_config @mock.patch.object(irmc_common, 'get_irmc_report', autospec=True) def test__query_raid_config_fgi_status_without_node( @@ -286,6 +288,7 @@ class iRMCPeriodicTaskTestCase(test_common.BaseIRMCTest): mock_manager = mock.Mock() raid_config = self.raid_config raid_config_2 = self.raid_config.copy() + self.node_2.raid_config = raid_config_2 fgi_status_dict = {} fgi_mock.side_effect = [{}, {'0': 'Idle', '1': 'Idle'}] node_list = [(self.node_2.uuid, 'fake-hardware', '', raid_config_2), diff --git a/ironic/tests/unit/drivers/modules/redfish/test_management.py b/ironic/tests/unit/drivers/modules/redfish/test_management.py index 99da1265b4..d5f23b93f0 100644 --- a/ironic/tests/unit/drivers/modules/redfish/test_management.py +++ b/ironic/tests/unit/drivers/modules/redfish/test_management.py @@ -25,6 +25,7 @@ from ironic.common import components from ironic.common import exception from ironic.common import indicator_states from ironic.common import states +from ironic.conductor import periodics from ironic.conductor import task_manager from ironic.conductor import utils as manager_utils from ironic.drivers.modules import deploy_utils @@ -905,7 +906,7 @@ class RedfishManagementTestCase(db_base.DbTestCase): management._clear_firmware_updates.assert_not_called() - @mock.patch.object(redfish_mgmt.LOG, 'info', autospec=True) + @mock.patch.object(periodics.LOG, 'info', autospec=True) @mock.patch.object(task_manager, 'acquire', autospec=True) def test__query_firmware_update_failed_node_notfound(self, mock_acquire, mock_log): @@ -928,7 +929,7 @@ class RedfishManagementTestCase(db_base.DbTestCase): management._clear_firmware_updates.assert_not_called() self.assertTrue(mock_log.called) - @mock.patch.object(redfish_mgmt.LOG, 'info', autospec=True) + @mock.patch.object(periodics.LOG, 'info', autospec=True) @mock.patch.object(task_manager, 'acquire', autospec=True) def test__query_firmware_update_failed_node_locked( self, mock_acquire, mock_log): @@ -1017,7 +1018,7 @@ class RedfishManagementTestCase(db_base.DbTestCase): management._check_node_firmware_update.assert_not_called() - @mock.patch.object(redfish_mgmt.LOG, 'info', autospec=True) + @mock.patch.object(periodics.LOG, 'info', autospec=True) @mock.patch.object(task_manager, 'acquire', autospec=True) def test__query_firmware_update_status_node_notfound(self, mock_acquire, mock_log): @@ -1040,7 +1041,7 @@ class RedfishManagementTestCase(db_base.DbTestCase): management._check_node_firmware_update.assert_not_called() self.assertTrue(mock_log.called) - @mock.patch.object(redfish_mgmt.LOG, 'info', autospec=True) + @mock.patch.object(periodics.LOG, 'info', autospec=True) @mock.patch.object(task_manager, 'acquire', autospec=True) def test__query_firmware_update_status_node_locked( self, mock_acquire, mock_log):
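
A minimal sketch of how a driver-side periodic task looks once converted to
the new decorator, following the same pattern as the conversions above. The
``AcmeManagement`` class, the ``acme_job_id`` field and the hard-coded spacing
are illustrative assumptions, not part of this change:

    from ironic.common import states
    from ironic.conductor import periodics
    from ironic.drivers import base


    class AcmeManagement(base.ManagementInterface):
        # get_properties, validate, etc. omitted for brevity.

        @periodics.node_periodic(
            # Used as the task lock purpose and in log messages
            purpose='checking acme async jobs',
            # A real driver would read this from a configuration option
            spacing=60,
            # Filter as much as possible on the database level first
            filters={'reserved': False, 'provision_state': states.CLEANWAIT},
            # Fetch driver_internal_info with the initial listing so the
            # predicate can skip nodes before a task (and thus an extra
            # per-node database query) is created.
            predicate_extra_fields=['driver_internal_info'],
            predicate=lambda n: n.driver_internal_info.get('acme_job_id'),
        )
        def _query_acme_jobs(self, task, manager, context):
            # The filters match nodes of all hardware types: double-check
            # that this node is really managed by this interface.
            if not isinstance(task.driver.management, AcmeManagement):
                return
            # Perform read-only checks under the shared lock here and call
            # task.upgrade_lock() before changing the node.

The ``isinstance`` double-check mirrors the converted drivers above, since
database-level filters cannot tell which hardware type a node uses.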