Merge "Switch to Futurist library for asynchronous execution and periodic tasks"
This commit is contained in:
commit
59c4960417
@ -77,12 +77,14 @@ Driver-Specific Periodic Tasks
|
||||
|
||||
Drivers may run their own periodic tasks, i.e. actions run repeatedly after
|
||||
a certain amount of time. Such task is created by decorating a method on the
|
||||
driver itself or on any interface with driver_periodic_task_ decorator, e.g.
|
||||
driver itself or on any interface with periodic_ decorator, e.g.
|
||||
|
||||
::
|
||||
|
||||
from futurist import periodics
|
||||
|
||||
class FakePower(base.PowerInterface):
|
||||
@base.driver_periodic_task(spacing=42)
|
||||
@periodics.periodic(spacing=42)
|
||||
def task(self, manager, context):
|
||||
pass # do something
|
||||
|
||||
@ -90,7 +92,7 @@ driver itself or on any interface with driver_periodic_task_ decorator, e.g.
|
||||
def __init__(self):
|
||||
self.power = FakePower()
|
||||
|
||||
@base.driver_periodic_task(spacing=42)
|
||||
@periodics.periodic(spacing=42)
|
||||
def task2(self, manager, context):
|
||||
pass # do something
|
||||
|
||||
@ -98,21 +100,6 @@ driver itself or on any interface with driver_periodic_task_ decorator, e.g.
|
||||
Here the ``spacing`` argument is a period in seconds for a given periodic task.
|
||||
For example 'spacing=5' means every 5 seconds.
|
||||
|
||||
.. note::
|
||||
The ``parallel`` argument may be passed to driver_periodic_task_.
|
||||
If it's set to False, this task will be run in the periodic task loop,
|
||||
rather than a separate greenthread.
|
||||
|
||||
This is deprecated as of Liberty release, and the parallel argument will be
|
||||
ignored starting in the Mitaka cycle, as such task would prevent all other
|
||||
periodic tasks from starting while it is running.
|
||||
|
||||
.. note::
|
||||
By default periodic task names are derived from method names,
|
||||
so they should be unique within a Python module.
|
||||
Use ``name`` argument to driver_periodic_task_ to override
|
||||
automatically generated name.
|
||||
|
||||
|
||||
Message Routing
|
||||
===============
|
||||
@ -137,4 +124,4 @@ driver actions such as take-over or clean-up.
|
||||
.. _DB API: ../api/ironic.db.api.html
|
||||
.. _diskimage-builder: https://github.com/openstack/diskimage-builder
|
||||
.. _consistent hashing algorithm: ../api/ironic.common.hash_ring.html
|
||||
.. _driver_periodic_task: ../api/ironic.drivers.base.html#ironic.drivers.base.driver_periodic_task
|
||||
.. _periodic: http://docs.openstack.org/developer/futurist/api.html#futurist.periodics.periodic
|
||||
|
@ -117,7 +117,8 @@
|
||||
# Options defined in ironic.common.service
|
||||
#
|
||||
|
||||
# Seconds between running periodic tasks. (integer value)
|
||||
# Default interval for running driver periodic tasks. (integer
|
||||
# value)
|
||||
#periodic_interval=60
|
||||
|
||||
# Name of this node. This can be an opaque identifier. It is
|
||||
@ -596,7 +597,9 @@
|
||||
# Options defined in ironic.conductor.base_manager
|
||||
#
|
||||
|
||||
# The size of the workers greenthread pool. (integer value)
|
||||
# The size of the workers greenthread pool. Note that 2
|
||||
# threads will be reserved by the conductor itself for
|
||||
# handling heart beats and periodic tasks. (integer value)
|
||||
#workers_pool_size=100
|
||||
|
||||
# Seconds between conductor heart beats. (integer value)
|
||||
|
@ -40,7 +40,8 @@ from ironic.objects import base as objects_base
|
||||
service_opts = [
|
||||
cfg.IntOpt('periodic_interval',
|
||||
default=60,
|
||||
help=_('Seconds between running periodic tasks.')),
|
||||
help=_('Default interval for running driver periodic tasks.'),
|
||||
deprecated_for_removal=True),
|
||||
cfg.StrOpt('host',
|
||||
default=socket.getfqdn(),
|
||||
help=_('Name of this node. This can be an opaque identifier. '
|
||||
@ -79,11 +80,7 @@ class RPCService(service.Service):
|
||||
self.rpcserver.start()
|
||||
|
||||
self.handle_signal()
|
||||
self.manager.init_host()
|
||||
self.tg.add_dynamic_timer(
|
||||
self.manager.periodic_tasks,
|
||||
periodic_interval_max=CONF.periodic_interval,
|
||||
context=admin_context)
|
||||
self.manager.init_host(admin_context)
|
||||
|
||||
LOG.info(_LI('Created RPC server for service %(service)s on host '
|
||||
'%(host)s.'),
|
||||
|
@ -15,13 +15,13 @@
|
||||
import inspect
|
||||
import threading
|
||||
|
||||
from eventlet import greenpool
|
||||
from oslo_concurrency import lockutils
|
||||
import futurist
|
||||
from futurist import periodics
|
||||
from futurist import rejection
|
||||
from oslo_config import cfg
|
||||
from oslo_context import context as ironic_context
|
||||
from oslo_db import exception as db_exception
|
||||
from oslo_log import log
|
||||
from oslo_service import periodic_task
|
||||
from oslo_utils import excutils
|
||||
|
||||
from ironic.common import driver_factory
|
||||
@ -40,8 +40,10 @@ from ironic.db import api as dbapi
|
||||
|
||||
conductor_opts = [
|
||||
cfg.IntOpt('workers_pool_size',
|
||||
default=100,
|
||||
help=_('The size of the workers greenthread pool.')),
|
||||
default=100, min=3,
|
||||
help=_('The size of the workers greenthread pool. '
|
||||
'Note that 2 threads will be reserved by the conductor '
|
||||
'itself for handling heart beats and periodic tasks.')),
|
||||
cfg.IntOpt('heartbeat_interval',
|
||||
default=10,
|
||||
help=_('Seconds between conductor heart beats.')),
|
||||
@ -51,18 +53,18 @@ conductor_opts = [
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(conductor_opts, 'conductor')
|
||||
LOG = log.getLogger(__name__)
|
||||
WORKER_SPAWN_lOCK = "conductor_worker_spawn"
|
||||
|
||||
|
||||
class BaseConductorManager(periodic_task.PeriodicTasks):
|
||||
class BaseConductorManager(object):
|
||||
|
||||
def __init__(self, host, topic):
|
||||
super(BaseConductorManager, self).__init__(CONF)
|
||||
super(BaseConductorManager, self).__init__()
|
||||
if not host:
|
||||
host = CONF.host
|
||||
self.host = host
|
||||
self.topic = topic
|
||||
self.notifier = rpc.get_notifier()
|
||||
self._started = False
|
||||
|
||||
def _get_driver(self, driver_name):
|
||||
"""Get the driver.
|
||||
@ -78,15 +80,29 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
|
||||
except KeyError:
|
||||
raise exception.DriverNotFound(driver_name=driver_name)
|
||||
|
||||
def init_host(self):
|
||||
def init_host(self, admin_context=None):
|
||||
"""Initialize the conductor host.
|
||||
|
||||
:param admin_context: the admin context to pass to periodic tasks.
|
||||
:raises: RuntimeError when conductor is already running
|
||||
:raises: NoDriversLoaded when no drivers are enabled on the conductor
|
||||
"""
|
||||
if self._started:
|
||||
raise RuntimeError(_('Attempt to start an already running '
|
||||
'conductor manager'))
|
||||
|
||||
self.dbapi = dbapi.get_instance()
|
||||
|
||||
self._keepalive_evt = threading.Event()
|
||||
"""Event for the keepalive thread."""
|
||||
|
||||
self._worker_pool = greenpool.GreenPool(
|
||||
size=CONF.conductor.workers_pool_size)
|
||||
"""GreenPool of background workers for performing tasks async."""
|
||||
# TODO(dtantsur): make the threshold configurable?
|
||||
rejection_func = rejection.reject_when_reached(
|
||||
CONF.conductor.workers_pool_size)
|
||||
self._executor = futurist.GreenThreadPoolExecutor(
|
||||
max_workers=CONF.conductor.workers_pool_size,
|
||||
check_and_reject=rejection_func)
|
||||
"""Executor for performing tasks async."""
|
||||
|
||||
self.ring_manager = hash.HashRingManager()
|
||||
"""Consistent hash ring which maps drivers to conductors."""
|
||||
@ -106,15 +122,36 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
|
||||
LOG.error(msg, self.host)
|
||||
raise exception.NoDriversLoaded(conductor=self.host)
|
||||
|
||||
# Collect driver-specific periodic tasks
|
||||
# Collect driver-specific periodic tasks.
|
||||
# Conductor periodic tasks accept context argument, driver periodic
|
||||
# tasks accept this manager and context. We have to ensure that the
|
||||
# same driver interface class is not traversed twice, otherwise
|
||||
# we'll have several instances of the same task.
|
||||
LOG.debug('Collecting periodic tasks')
|
||||
self._periodic_task_callables = []
|
||||
periodic_task_classes = set()
|
||||
self._collect_periodic_tasks(self, (admin_context,))
|
||||
for driver_obj in driver_factory.drivers().values():
|
||||
self._collect_periodic_tasks(driver_obj)
|
||||
self._collect_periodic_tasks(driver_obj, (self, admin_context))
|
||||
for iface_name in (driver_obj.core_interfaces +
|
||||
driver_obj.standard_interfaces +
|
||||
['vendor']):
|
||||
iface = getattr(driver_obj, iface_name, None)
|
||||
if iface:
|
||||
self._collect_periodic_tasks(iface)
|
||||
if iface and iface.__class__ not in periodic_task_classes:
|
||||
self._collect_periodic_tasks(iface, (self, admin_context))
|
||||
periodic_task_classes.add(iface.__class__)
|
||||
|
||||
if (len(self._periodic_task_callables) >
|
||||
CONF.conductor.workers_pool_size):
|
||||
LOG.warning(_LW('This conductor has %(tasks)d periodic tasks '
|
||||
'enabled, but only %(workers)d task workers '
|
||||
'allowed by [conductor]workers_pool_size option'),
|
||||
{'tasks': len(self._periodic_task_callables),
|
||||
'workers': CONF.conductor.workers_pool_size})
|
||||
|
||||
self._periodic_tasks = periodics.PeriodicWorker(
|
||||
self._periodic_task_callables,
|
||||
executor_factory=periodics.ExistingExecutor(self._executor))
|
||||
|
||||
# clear all locks held by this conductor before registering
|
||||
self.dbapi.clear_node_reservations_for_conductor(self.host)
|
||||
@ -134,6 +171,12 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
|
||||
update_existing=True)
|
||||
self.conductor = cdr
|
||||
|
||||
# Start periodic tasks
|
||||
self._periodic_tasks_worker = self._executor.submit(
|
||||
self._periodic_tasks.start, allow_empty=True)
|
||||
self._periodic_tasks_worker.add_done_callback(
|
||||
self._on_periodic_tasks_stop)
|
||||
|
||||
# NOTE(lucasagomes): If the conductor server dies abruptly
|
||||
# mid deployment (OMM Killer, power outage, etc...) we
|
||||
# can not resume the deployment even if the conductor
|
||||
@ -161,10 +204,7 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
|
||||
LOG.critical(_LC('Failed to start keepalive'))
|
||||
self.del_host()
|
||||
|
||||
def _collect_periodic_tasks(self, obj):
|
||||
for n, method in inspect.getmembers(obj, inspect.ismethod):
|
||||
if getattr(method, '_periodic_enabled', False):
|
||||
self.add_periodic_task(method)
|
||||
self._started = True
|
||||
|
||||
def del_host(self, deregister=True):
|
||||
# Conductor deregistration fails if called on non-initialized
|
||||
@ -190,11 +230,34 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
|
||||
# Waiting here to give workers the chance to finish. This has the
|
||||
# benefit of releasing locks workers placed on nodes, as well as
|
||||
# having work complete normally.
|
||||
self._worker_pool.waitall()
|
||||
self._periodic_tasks.stop()
|
||||
self._periodic_tasks.wait()
|
||||
self._executor.shutdown(wait=True)
|
||||
self._started = False
|
||||
|
||||
def periodic_tasks(self, context, raise_on_error=False):
|
||||
"""Periodic tasks are run at pre-specified interval."""
|
||||
return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
|
||||
def _collect_periodic_tasks(self, obj, args):
|
||||
"""Collect periodic tasks from a given object.
|
||||
|
||||
Populates self._periodic_task_callables with tuples
|
||||
(callable, args, kwargs).
|
||||
|
||||
:param obj: object containing periodic tasks as methods
|
||||
:param args: tuple with arguments to pass to every task
|
||||
"""
|
||||
for name, member in inspect.getmembers(obj):
|
||||
if periodics.is_periodic(member):
|
||||
LOG.debug('Found periodic task %(owner)s.%(member)s',
|
||||
{'owner': obj.__class__.__name__,
|
||||
'member': name})
|
||||
self._periodic_task_callables.append((member, args, {}))
|
||||
|
||||
def _on_periodic_tasks_stop(self, fut):
|
||||
try:
|
||||
fut.result()
|
||||
except Exception as exc:
|
||||
LOG.critical(_LC('Periodic tasks worker has failed: %s'), exc)
|
||||
else:
|
||||
LOG.info(_LI('Successfully shut down periodic tasks'))
|
||||
|
||||
def iter_nodes(self, fields=None, **kwargs):
|
||||
"""Iterate over nodes mapped to this conductor.
|
||||
@ -217,7 +280,6 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
|
||||
if self._mapped_to_this_conductor(*result[:2]):
|
||||
yield result
|
||||
|
||||
@lockutils.synchronized(WORKER_SPAWN_lOCK, 'ironic-')
|
||||
def _spawn_worker(self, func, *args, **kwargs):
|
||||
|
||||
"""Create a greenthread to run func(*args, **kwargs).
|
||||
@ -225,13 +287,13 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
|
||||
Spawns a greenthread if there are free slots in pool, otherwise raises
|
||||
exception. Execution control returns immediately to the caller.
|
||||
|
||||
:returns: GreenThread object.
|
||||
:returns: Future object.
|
||||
:raises: NoFreeConductorWorker if worker pool is currently full.
|
||||
|
||||
"""
|
||||
if self._worker_pool.free():
|
||||
return self._worker_pool.spawn(func, *args, **kwargs)
|
||||
else:
|
||||
try:
|
||||
return self._executor.submit(func, *args, **kwargs)
|
||||
except futurist.RejectedSubmission:
|
||||
raise exception.NoFreeConductorWorker()
|
||||
|
||||
def _conductor_service_record_keepalive(self):
|
||||
|
@ -46,10 +46,10 @@ import datetime
|
||||
import tempfile
|
||||
|
||||
import eventlet
|
||||
from futurist import periodics
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import oslo_messaging as messaging
|
||||
from oslo_service import periodic_task
|
||||
from oslo_utils import excutils
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
@ -1200,8 +1200,7 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
action=action, node=node.uuid,
|
||||
state=node.provision_state)
|
||||
|
||||
@periodic_task.periodic_task(
|
||||
spacing=CONF.conductor.sync_power_state_interval)
|
||||
@periodics.periodic(spacing=CONF.conductor.sync_power_state_interval)
|
||||
def _sync_power_states(self, context):
|
||||
"""Periodic task to sync power states for the nodes.
|
||||
|
||||
@ -1269,8 +1268,7 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
# Yield on every iteration
|
||||
eventlet.sleep(0)
|
||||
|
||||
@periodic_task.periodic_task(
|
||||
spacing=CONF.conductor.check_provision_state_interval)
|
||||
@periodics.periodic(spacing=CONF.conductor.check_provision_state_interval)
|
||||
def _check_deploy_timeouts(self, context):
|
||||
"""Periodically checks whether a deploy RPC call has timed out.
|
||||
|
||||
@ -1292,8 +1290,7 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
self._fail_if_in_state(context, filters, states.DEPLOYWAIT,
|
||||
sort_key, callback_method, err_handler)
|
||||
|
||||
@periodic_task.periodic_task(
|
||||
spacing=CONF.conductor.check_provision_state_interval)
|
||||
@periodics.periodic(spacing=CONF.conductor.check_provision_state_interval)
|
||||
def _check_deploying_status(self, context):
|
||||
"""Periodically checks the status of nodes in DEPLOYING state.
|
||||
|
||||
@ -1376,8 +1373,7 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
task.node.conductor_affinity = self.conductor.id
|
||||
task.node.save()
|
||||
|
||||
@periodic_task.periodic_task(
|
||||
spacing=CONF.conductor.check_provision_state_interval)
|
||||
@periodics.periodic(spacing=CONF.conductor.check_provision_state_interval)
|
||||
def _check_cleanwait_timeouts(self, context):
|
||||
"""Periodically checks for nodes being cleaned.
|
||||
|
||||
@ -1402,8 +1398,7 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
last_error=last_error,
|
||||
keep_target_state=True)
|
||||
|
||||
@periodic_task.periodic_task(
|
||||
spacing=CONF.conductor.sync_local_state_interval)
|
||||
@periodics.periodic(spacing=CONF.conductor.sync_local_state_interval)
|
||||
def _sync_local_state(self, context):
|
||||
"""Perform any actions necessary to sync local state.
|
||||
|
||||
@ -1826,8 +1821,7 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
driver = self._get_driver(driver_name)
|
||||
return driver.get_properties()
|
||||
|
||||
@periodic_task.periodic_task(
|
||||
spacing=CONF.conductor.send_sensor_data_interval)
|
||||
@periodics.periodic(spacing=CONF.conductor.send_sensor_data_interval)
|
||||
def _send_sensor_data(self, context):
|
||||
"""Periodically sends sensor data to Ceilometer."""
|
||||
# do nothing if send_sensor_data option is False
|
||||
@ -2061,8 +2055,7 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
action='inspect', node=task.node.uuid,
|
||||
state=task.node.provision_state)
|
||||
|
||||
@periodic_task.periodic_task(
|
||||
spacing=CONF.conductor.check_provision_state_interval)
|
||||
@periodics.periodic(spacing=CONF.conductor.check_provision_state_interval)
|
||||
def _check_inspect_timeouts(self, context):
|
||||
"""Periodically checks inspect_timeout and fails upon reaching it.
|
||||
|
||||
|
@ -383,15 +383,15 @@ class TaskManager(object):
|
||||
# for some reason, this is true.
|
||||
# All of the above are asserted in tests such that we'll
|
||||
# catch if eventlet ever changes this behavior.
|
||||
thread = None
|
||||
fut = None
|
||||
try:
|
||||
thread = self._spawn_method(*self._spawn_args,
|
||||
**self._spawn_kwargs)
|
||||
fut = self._spawn_method(*self._spawn_args,
|
||||
**self._spawn_kwargs)
|
||||
|
||||
# NOTE(comstud): Trying to use a lambda here causes
|
||||
# the callback to not occur for some reason. This
|
||||
# also makes it easier to test.
|
||||
thread.link(self._thread_release_resources)
|
||||
fut.add_done_callback(self._thread_release_resources)
|
||||
# Don't unlock! The unlock will occur when the
|
||||
# thread finshes.
|
||||
return
|
||||
@ -408,9 +408,9 @@ class TaskManager(object):
|
||||
{'method': self._on_error_method.__name__,
|
||||
'node': self.node.uuid})
|
||||
|
||||
if thread is not None:
|
||||
# This means the link() failed for some
|
||||
if fut is not None:
|
||||
# This means the add_done_callback() failed for some
|
||||
# reason. Nuke the thread.
|
||||
thread.cancel()
|
||||
fut.cancel()
|
||||
self.release_resources()
|
||||
self.release_resources()
|
||||
|
@ -24,9 +24,9 @@ import inspect
|
||||
import json
|
||||
import os
|
||||
|
||||
import eventlet
|
||||
from futurist import periodics
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import periodic_task
|
||||
from oslo_utils import excutils
|
||||
import six
|
||||
|
||||
@ -40,6 +40,10 @@ RAID_CONFIG_SCHEMA = os.path.join(os.path.dirname(__file__),
|
||||
'raid_config_schema.json')
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.import_opt('periodic_interval', 'ironic.common.service')
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class BaseDriver(object):
|
||||
"""Base class for all drivers.
|
||||
@ -1116,45 +1120,36 @@ def clean_step(priority, abortable=False, argsinfo=None):
|
||||
return decorator
|
||||
|
||||
|
||||
def driver_periodic_task(parallel=True, **other):
|
||||
def driver_periodic_task(**kwargs):
|
||||
"""Decorator for a driver-specific periodic task.
|
||||
|
||||
Deprecated, please use futurist directly.
|
||||
Example::
|
||||
|
||||
from futurist import periodics
|
||||
|
||||
class MyDriver(base.BaseDriver):
|
||||
@base.driver_periodic_task(spacing=42)
|
||||
@periodics.periodic(spacing=42)
|
||||
def task(self, manager, context):
|
||||
# do some job
|
||||
|
||||
:param parallel: If True (default), this task is run in a separate thread.
|
||||
If False, this task will be run in the conductor's periodic task
|
||||
loop, rather than a separate greenthread. This parameter is
|
||||
deprecated and will be ignored starting with Mitaka cycle.
|
||||
:param other: arguments to pass to @periodic_task.periodic_task
|
||||
:param kwargs: arguments to pass to @periodics.periodic
|
||||
"""
|
||||
# TODO(dtantsur): drop all this magic once
|
||||
# https://review.openstack.org/#/c/134303/ lands
|
||||
semaphore = eventlet.semaphore.BoundedSemaphore()
|
||||
LOG.warning(_LW('driver_periodic_task decorator is deprecated, please '
|
||||
'use futurist.periodics.periodic directly'))
|
||||
# Previously we accepted more arguments, make a backward compatibility
|
||||
# layer for out-of-tree drivers.
|
||||
new_kwargs = {}
|
||||
for arg in ('spacing', 'enabled', 'run_immediately'):
|
||||
try:
|
||||
new_kwargs[arg] = kwargs.pop(arg)
|
||||
except KeyError:
|
||||
pass
|
||||
new_kwargs.setdefault('spacing', CONF.periodic_interval)
|
||||
|
||||
def decorator2(func):
|
||||
@six.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
if parallel:
|
||||
def _internal():
|
||||
with semaphore:
|
||||
func(*args, **kwargs)
|
||||
if kwargs:
|
||||
LOG.warning(_LW('The following arguments are not supported by '
|
||||
'futurist.periodics.periodic and are ignored: %s'),
|
||||
', '.join(kwargs))
|
||||
|
||||
eventlet.greenthread.spawn_n(_internal)
|
||||
else:
|
||||
LOG.warning(_LW(
|
||||
'Using periodic tasks with parallel=False is deprecated, '
|
||||
'"parallel" argument will be ignored starting with '
|
||||
'the Mitaka release'))
|
||||
func(*args, **kwargs)
|
||||
|
||||
# NOTE(dtantsur): name should be unique
|
||||
other.setdefault('name', '%s.%s' % (func.__module__, func.__name__))
|
||||
decorator = periodic_task.periodic_task(**other)
|
||||
return decorator(wrapper)
|
||||
|
||||
return decorator2
|
||||
return periodics.periodic(**new_kwargs)
|
||||
|
@ -16,6 +16,7 @@ Modules required to work with ironic_inspector:
|
||||
"""
|
||||
|
||||
import eventlet
|
||||
from futurist import periodics
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import importutils
|
||||
@ -121,8 +122,8 @@ class Inspector(base.InspectInterface):
|
||||
eventlet.spawn_n(_start_inspection, task.node.uuid, task.context)
|
||||
return states.INSPECTING
|
||||
|
||||
@base.driver_periodic_task(spacing=CONF.inspector.status_check_period,
|
||||
enabled=CONF.inspector.enabled)
|
||||
@periodics.periodic(spacing=CONF.inspector.status_check_period,
|
||||
enabled=CONF.inspector.enabled)
|
||||
def _periodic_check_result(self, manager, context):
|
||||
"""Periodic task checking results of inspection."""
|
||||
filters = {'provision_state': states.INSPECTING}
|
||||
|
@ -17,6 +17,7 @@
|
||||
|
||||
"""Test utils for Ironic Managers."""
|
||||
|
||||
from futurist import periodics
|
||||
import mock
|
||||
from oslo_utils import strutils
|
||||
from oslo_utils import uuidutils
|
||||
@ -175,8 +176,12 @@ class ServiceSetUpMixin(object):
|
||||
return
|
||||
self.service.del_host()
|
||||
|
||||
def _start_service(self):
|
||||
self.service.init_host()
|
||||
def _start_service(self, start_periodic_tasks=False):
|
||||
if start_periodic_tasks:
|
||||
self.service.init_host()
|
||||
else:
|
||||
with mock.patch.object(periodics, 'PeriodicWorker', autospec=True):
|
||||
self.service.init_host()
|
||||
self.addCleanup(self._stop_service)
|
||||
|
||||
|
||||
|
@ -13,6 +13,8 @@
|
||||
"""Test class for Ironic BaseConductorManager."""
|
||||
|
||||
import eventlet
|
||||
import futurist
|
||||
from futurist import periodics
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_db import exception as db_exception
|
||||
@ -23,6 +25,7 @@ from ironic.conductor import base_manager
|
||||
from ironic.conductor import manager
|
||||
from ironic.drivers import base as drivers_base
|
||||
from ironic import objects
|
||||
from ironic.tests import base as tests_base
|
||||
from ironic.tests.unit.conductor import mgr_utils
|
||||
from ironic.tests.unit.db import base as tests_db_base
|
||||
from ironic.tests.unit.objects import utils as obj_utils
|
||||
@ -86,6 +89,7 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
|
||||
res = objects.Conductor.get_by_hostname(self.context,
|
||||
self.hostname)
|
||||
self.assertEqual(init_names, res['drivers'])
|
||||
self._stop_service()
|
||||
|
||||
# verify that restart registers new driver names
|
||||
self.config(enabled_drivers=restart_names)
|
||||
@ -98,12 +102,10 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
|
||||
@mock.patch.object(driver_factory.DriverFactory, '__getitem__')
|
||||
def test_start_registers_driver_specific_tasks(self, get_mock):
|
||||
init_names = ['fake1']
|
||||
expected_name = 'ironic.tests.unit.conductor.test_base_manager.task'
|
||||
expected_name2 = 'ironic.tests.unit.conductor.test_base_manager.iface'
|
||||
self.config(enabled_drivers=init_names)
|
||||
|
||||
class TestInterface(object):
|
||||
@drivers_base.driver_periodic_task(spacing=100500)
|
||||
@periodics.periodic(spacing=100500)
|
||||
def iface(self):
|
||||
pass
|
||||
|
||||
@ -113,28 +115,27 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
|
||||
|
||||
iface = TestInterface()
|
||||
|
||||
@drivers_base.driver_periodic_task(spacing=42)
|
||||
@periodics.periodic(spacing=42)
|
||||
def task(self, context):
|
||||
pass
|
||||
|
||||
@drivers_base.driver_periodic_task()
|
||||
def deprecated_task(self, context):
|
||||
pass
|
||||
|
||||
obj = Driver()
|
||||
self.assertTrue(obj.task._periodic_enabled)
|
||||
get_mock.return_value = mock.Mock(obj=obj)
|
||||
|
||||
with mock.patch.object(
|
||||
driver_factory.DriverFactory()._extension_manager,
|
||||
'names') as mock_names:
|
||||
mock_names.return_value = init_names
|
||||
self._start_service()
|
||||
tasks = dict(self.service._periodic_tasks)
|
||||
self.assertEqual(obj.task, tasks[expected_name])
|
||||
self.assertEqual(obj.iface.iface, tasks[expected_name2])
|
||||
self.assertEqual(42,
|
||||
self.service._periodic_spacing[expected_name])
|
||||
self.assertEqual(100500,
|
||||
self.service._periodic_spacing[expected_name2])
|
||||
self.assertIn(expected_name, self.service._periodic_last_run)
|
||||
self.assertIn(expected_name2, self.service._periodic_last_run)
|
||||
self._start_service(start_periodic_tasks=True)
|
||||
|
||||
tasks = {c[0] for c in self.service._periodic_task_callables}
|
||||
for t in (obj.task, obj.iface.iface, obj.deprecated_task):
|
||||
self.assertTrue(periodics.is_periodic(t))
|
||||
self.assertIn(t, tasks)
|
||||
|
||||
@mock.patch.object(driver_factory.DriverFactory, '__init__')
|
||||
def test_start_fails_on_missing_driver(self, mock_df):
|
||||
@ -154,6 +155,17 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
|
||||
self.service.init_host)
|
||||
self.assertTrue(log_mock.error.called)
|
||||
|
||||
def test_prevent_double_start(self):
|
||||
self._start_service()
|
||||
self.assertRaisesRegexp(RuntimeError, 'already running',
|
||||
self.service.init_host)
|
||||
|
||||
@mock.patch.object(base_manager, 'LOG')
|
||||
def test_warning_on_low_workers_pool(self, log_mock):
|
||||
CONF.set_override('workers_pool_size', 3, 'conductor')
|
||||
self._start_service()
|
||||
self.assertTrue(log_mock.warning.called)
|
||||
|
||||
@mock.patch.object(eventlet.greenpool.GreenPool, 'waitall')
|
||||
def test_del_host_waits_on_workerpool(self, wait_mock):
|
||||
self._start_service()
|
||||
@ -185,3 +197,23 @@ class KeepAliveTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
|
||||
mock_is_set.side_effect = [False, False, False, True]
|
||||
self.service._conductor_service_record_keepalive()
|
||||
self.assertEqual(3, mock_touch.call_count)
|
||||
|
||||
|
||||
class ManagerSpawnWorkerTestCase(tests_base.TestCase):
|
||||
def setUp(self):
|
||||
super(ManagerSpawnWorkerTestCase, self).setUp()
|
||||
self.service = manager.ConductorManager('hostname', 'test-topic')
|
||||
self.executor = mock.Mock(spec=futurist.GreenThreadPoolExecutor)
|
||||
self.service._executor = self.executor
|
||||
|
||||
def test__spawn_worker(self):
|
||||
self.service._spawn_worker('fake', 1, 2, foo='bar', cat='meow')
|
||||
|
||||
self.executor.submit.assert_called_once_with(
|
||||
'fake', 1, 2, foo='bar', cat='meow')
|
||||
|
||||
def test__spawn_worker_none_free(self):
|
||||
self.executor.submit.side_effect = futurist.RejectedSubmission()
|
||||
|
||||
self.assertRaises(exception.NoFreeConductorWorker,
|
||||
self.service._spawn_worker, 'fake')
|
||||
|
@ -70,7 +70,7 @@ class ChangeNodePowerStateTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.service.change_node_power_state(self.context,
|
||||
node.uuid,
|
||||
states.POWER_ON)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
|
||||
get_power_mock.assert_called_once_with(mock.ANY)
|
||||
node.refresh()
|
||||
@ -103,7 +103,7 @@ class ChangeNodePowerStateTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
|
||||
# In this test worker should not be spawned, but waiting to make sure
|
||||
# the below perform_mock assertion is valid.
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
self.assertFalse(pwr_act_mock.called, 'node_power_action has been '
|
||||
'unexpectedly called.')
|
||||
# Verify existing reservation wasn't broken.
|
||||
@ -162,7 +162,7 @@ class ChangeNodePowerStateTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.service.change_node_power_state(self.context,
|
||||
node.uuid,
|
||||
new_state)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
|
||||
get_power_mock.assert_called_once_with(mock.ANY)
|
||||
set_power_mock.assert_called_once_with(mock.ANY, new_state)
|
||||
@ -298,7 +298,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
'first_method', 'POST',
|
||||
info)
|
||||
# Waiting to make sure the below assertions are valid.
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
|
||||
# Assert spawn_after was called
|
||||
self.assertTrue(mock_spawn.called)
|
||||
@ -320,7 +320,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
'third_method_sync',
|
||||
'POST', info)
|
||||
# Waiting to make sure the below assertions are valid.
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
|
||||
# Assert no workers were used
|
||||
self.assertFalse(mock_spawn.called)
|
||||
@ -438,7 +438,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
|
||||
|
||||
# Waiting to make sure the below assertions are valid.
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
|
||||
node.refresh()
|
||||
self.assertIsNone(node.last_error)
|
||||
@ -715,7 +715,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
provision_state=states.AVAILABLE)
|
||||
|
||||
self.service.do_node_deploy(self.context, node.uuid)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
self.assertEqual(states.DEPLOYING, node.provision_state)
|
||||
self.assertEqual(states.ACTIVE, node.target_provision_state)
|
||||
@ -745,7 +745,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
driver_internal_info={'is_whole_disk_image': False})
|
||||
|
||||
self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
self.assertEqual(states.DEPLOYING, node.provision_state)
|
||||
self.assertEqual(states.ACTIVE, node.target_provision_state)
|
||||
@ -774,7 +774,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
instance_info={'image_source': uuidutils.generate_uuid()})
|
||||
|
||||
self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
self.assertEqual(states.DEPLOYWAIT, node.provision_state)
|
||||
self.assertEqual(states.ACTIVE, node.target_provision_state)
|
||||
@ -798,7 +798,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
target_provision_state=states.NOSTATE)
|
||||
|
||||
self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
self.assertEqual(states.ACTIVE, node.provision_state)
|
||||
self.assertEqual(states.NOSTATE, node.target_provision_state)
|
||||
@ -822,7 +822,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
target_provision_state=states.NOSTATE)
|
||||
|
||||
self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
self.assertEqual(states.ACTIVE, node.provision_state)
|
||||
self.assertEqual(states.NOSTATE, node.target_provision_state)
|
||||
@ -845,7 +845,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
target_provision_state=states.NOSTATE)
|
||||
|
||||
self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
self.assertEqual(states.ACTIVE, node.provision_state)
|
||||
self.assertEqual(states.NOSTATE, node.target_provision_state)
|
||||
@ -893,7 +893,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.context, node.uuid)
|
||||
# Compare true exception hidden by @messaging.expected_exceptions
|
||||
self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
# Make sure things were rolled back
|
||||
self.assertEqual(prv_state, node.provision_state)
|
||||
@ -1049,7 +1049,7 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
provision_updated_at=datetime.datetime(2000, 1, 1, 0, 0))
|
||||
|
||||
self.service._check_deploy_timeouts(self.context)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
self.assertEqual(states.DEPLOYFAIL, node.provision_state)
|
||||
self.assertEqual(states.ACTIVE, node.target_provision_state)
|
||||
@ -1067,7 +1067,7 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
provision_updated_at=datetime.datetime(2000, 1, 1, 0, 0))
|
||||
|
||||
self.service._check_cleanwait_timeouts(self.context)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
self.assertEqual(states.CLEANFAIL, node.provision_state)
|
||||
self.assertEqual(tgt_prov_state, node.target_provision_state)
|
||||
@ -1162,8 +1162,9 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
target_provision_state=states.AVAILABLE,
|
||||
driver_internal_info={'is_whole_disk_image': False})
|
||||
|
||||
self._start_service()
|
||||
self.service.do_node_tear_down(self.context, node.uuid)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
# Node will be moved to AVAILABLE after cleaning, not tested here
|
||||
self.assertEqual(states.CLEANING, node.provision_state)
|
||||
@ -1176,7 +1177,6 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
def test__do_node_tear_down_from_valid_states(self):
|
||||
valid_states = [states.ACTIVE, states.DEPLOYWAIT, states.DEPLOYFAIL,
|
||||
states.ERROR]
|
||||
self._start_service()
|
||||
for state in valid_states:
|
||||
self._test_do_node_tear_down_from_state(state)
|
||||
|
||||
@ -1207,7 +1207,7 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.context, node.uuid)
|
||||
# Compare true exception hidden by @messaging.expected_exceptions
|
||||
self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
# Assert instance_info/driver_internal_info was not touched
|
||||
self.assertEqual(fake_instance_info, node.instance_info)
|
||||
@ -1236,7 +1236,7 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.context, node.uuid, 'provide')
|
||||
# Compare true exception hidden by @messaging.expected_exceptions
|
||||
self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
# Make sure things were rolled back
|
||||
self.assertEqual(prv_state, node.provision_state)
|
||||
@ -1463,7 +1463,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.context, node.uuid, clean_steps)
|
||||
# Compare true exception hidden by @messaging.expected_exceptions
|
||||
self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
mock_validate.assert_called_once_with(mock.ANY)
|
||||
mock_spawn.assert_called_with(self.service._do_node_clean, mock.ANY,
|
||||
clean_steps)
|
||||
@ -1492,9 +1492,6 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.service.continue_node_clean,
|
||||
self.context, node.uuid)
|
||||
|
||||
self.service._worker_pool.waitall()
|
||||
node.refresh()
|
||||
|
||||
@mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker')
|
||||
def test_continue_node_clean_wrong_state(self, mock_spawn):
|
||||
# Test the appropriate exception is raised if node isn't already
|
||||
@ -1511,7 +1508,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.service.continue_node_clean,
|
||||
self.context, node.uuid)
|
||||
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
# Make sure things were rolled back
|
||||
self.assertEqual(prv_state, node.provision_state)
|
||||
@ -1533,7 +1530,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
clean_step=self.clean_steps[0])
|
||||
self._start_service()
|
||||
self.service.continue_node_clean(self.context, node.uuid)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
self.assertEqual(states.CLEANING, node.provision_state)
|
||||
self.assertEqual(tgt_prv_state, node.target_provision_state)
|
||||
@ -1561,7 +1558,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
driver_internal_info=driver_info, clean_step=self.clean_steps[0])
|
||||
self._start_service()
|
||||
self.service.continue_node_clean(self.context, node.uuid)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
if skip:
|
||||
expected_step_index = 1
|
||||
@ -1591,7 +1588,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
|
||||
self._start_service()
|
||||
self.service.continue_node_clean(self.context, node.uuid)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
self.assertEqual(states.CLEANFAIL, node.provision_state)
|
||||
self.assertEqual(tgt_prov_state, node.target_provision_state)
|
||||
@ -1619,7 +1616,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
|
||||
self._start_service()
|
||||
self.service.continue_node_clean(self.context, node.uuid)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
self.assertEqual(tgt_prov_state, node.provision_state)
|
||||
self.assertIsNone(node.target_provision_state)
|
||||
@ -1667,7 +1664,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
with task_manager.acquire(
|
||||
self.context, node.uuid, shared=False) as task:
|
||||
self.service._do_node_clean(task)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
|
||||
# Assert that the node was moved to available without cleaning
|
||||
@ -1779,7 +1776,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.context, node.uuid, shared=False) as task:
|
||||
self.service._do_node_clean(task, clean_steps=clean_steps)
|
||||
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
|
||||
mock_validate.assert_called_once_with(task)
|
||||
@ -1827,7 +1824,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.context, node.uuid, shared=False) as task:
|
||||
self.service._do_next_clean_step(task, 0)
|
||||
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
|
||||
self.assertEqual(states.CLEANWAIT, node.provision_state)
|
||||
@ -1868,7 +1865,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.context, node.uuid, shared=False) as task:
|
||||
self.service._do_next_clean_step(task, self.next_clean_step_index)
|
||||
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
|
||||
self.assertEqual(states.CLEANWAIT, node.provision_state)
|
||||
@ -1907,7 +1904,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.context, node.uuid, shared=False) as task:
|
||||
self.service._do_next_clean_step(task, None)
|
||||
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
|
||||
# Cleaning should be complete without calling additional steps
|
||||
@ -1947,7 +1944,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.context, node.uuid, shared=False) as task:
|
||||
self.service._do_next_clean_step(task, 0)
|
||||
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
|
||||
# Cleaning should be complete
|
||||
@ -1992,7 +1989,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.service._do_next_clean_step(task, 0)
|
||||
tear_mock.assert_called_once_with(task.driver.deploy, task)
|
||||
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
|
||||
# Make sure we go to CLEANFAIL, clear clean_steps
|
||||
@ -2034,7 +2031,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.context, node.uuid, shared=False) as task:
|
||||
self.service._do_next_clean_step(task, 0)
|
||||
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
|
||||
# Make sure we go to CLEANFAIL, clear clean_steps
|
||||
@ -2075,7 +2072,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.context, node.uuid, shared=False) as task:
|
||||
self.service._do_next_clean_step(task, None)
|
||||
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
|
||||
# Cleaning should be complete without calling additional steps
|
||||
@ -2114,7 +2111,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.context, node.uuid, shared=False) as task:
|
||||
self.service._do_next_clean_step(task, 0)
|
||||
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
|
||||
# Make sure we go to CLEANFAIL, clear clean_steps
|
||||
@ -2232,7 +2229,7 @@ class DoNodeVerifyTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.context, node['id'], shared=False) as task:
|
||||
self.service._do_node_verify(task)
|
||||
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
|
||||
mock_validate.assert_called_once_with(task)
|
||||
@ -2261,7 +2258,7 @@ class DoNodeVerifyTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.context, node['id'], shared=False) as task:
|
||||
self.service._do_node_verify(task)
|
||||
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
|
||||
mock_validate.assert_called_once_with(task)
|
||||
@ -2289,7 +2286,7 @@ class DoNodeVerifyTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
self.context, node['id'], shared=False) as task:
|
||||
self.service._do_node_verify(task)
|
||||
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
|
||||
mock_get_power_state.assert_called_once_with(task)
|
||||
@ -2394,14 +2391,14 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
|
||||
self.context, node.uuid, True)
|
||||
# Compare true exception hidden by @messaging.expected_exceptions
|
||||
self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
spawn_mock.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY)
|
||||
|
||||
def test_set_console_mode_enabled(self):
|
||||
node = obj_utils.create_test_node(self.context, driver='fake')
|
||||
self._start_service()
|
||||
self.service.set_console_mode(self.context, node.uuid, True)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
self.assertTrue(node.console_enabled)
|
||||
|
||||
@ -2409,7 +2406,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
|
||||
node = obj_utils.create_test_node(self.context, driver='fake')
|
||||
self._start_service()
|
||||
self.service.set_console_mode(self.context, node.uuid, False)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
self.assertFalse(node.console_enabled)
|
||||
|
||||
@ -2425,7 +2422,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
|
||||
# Compare true exception hidden by @messaging.expected_exceptions
|
||||
self.assertEqual(exception.UnsupportedDriverExtension,
|
||||
exc.exc_info[0])
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
|
||||
def test_set_console_mode_validation_fail(self):
|
||||
@ -2449,7 +2446,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
|
||||
'start_console') as mock_sc:
|
||||
mock_sc.side_effect = exception.IronicException('test-error')
|
||||
self.service.set_console_mode(self.context, node.uuid, True)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
mock_sc.assert_called_once_with(mock.ANY)
|
||||
node.refresh()
|
||||
self.assertIsNotNone(node.last_error)
|
||||
@ -2463,7 +2460,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
|
||||
'stop_console') as mock_sc:
|
||||
mock_sc.side_effect = exception.IronicException('test-error')
|
||||
self.service.set_console_mode(self.context, node.uuid, False)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
mock_sc.assert_called_once_with(mock.ANY)
|
||||
node.refresh()
|
||||
self.assertIsNotNone(node.last_error)
|
||||
@ -2475,7 +2472,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
|
||||
with mock.patch.object(self.driver.console,
|
||||
'start_console') as mock_sc:
|
||||
self.service.set_console_mode(self.context, node.uuid, True)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
self.assertFalse(mock_sc.called)
|
||||
|
||||
def test_disable_console_already_disabled(self):
|
||||
@ -2485,7 +2482,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
|
||||
with mock.patch.object(self.driver.console,
|
||||
'stop_console') as mock_sc:
|
||||
self.service.set_console_mode(self.context, node.uuid, False)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
self.assertFalse(mock_sc.called)
|
||||
|
||||
def test_get_console(self):
|
||||
@ -3065,32 +3062,6 @@ class RaidTestCases(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
|
||||
self.assertEqual(exception.InvalidParameterValue, exc.exc_info[0])
|
||||
|
||||
|
||||
class ManagerSpawnWorkerTestCase(tests_base.TestCase):
|
||||
def setUp(self):
|
||||
super(ManagerSpawnWorkerTestCase, self).setUp()
|
||||
self.service = manager.ConductorManager('hostname', 'test-topic')
|
||||
|
||||
def test__spawn_worker(self):
|
||||
worker_pool = mock.Mock(spec_set=['free', 'spawn'])
|
||||
worker_pool.free.return_value = True
|
||||
self.service._worker_pool = worker_pool
|
||||
|
||||
self.service._spawn_worker('fake', 1, 2, foo='bar', cat='meow')
|
||||
|
||||
worker_pool.spawn.assert_called_once_with(
|
||||
'fake', 1, 2, foo='bar', cat='meow')
|
||||
|
||||
def test__spawn_worker_none_free(self):
|
||||
worker_pool = mock.Mock(spec_set=['free', 'spawn'])
|
||||
worker_pool.free.return_value = False
|
||||
self.service._worker_pool = worker_pool
|
||||
|
||||
self.assertRaises(exception.NoFreeConductorWorker,
|
||||
self.service._spawn_worker, 'fake')
|
||||
|
||||
self.assertFalse(worker_pool.spawn.called)
|
||||
|
||||
|
||||
@mock.patch.object(conductor_utils, 'node_power_action')
|
||||
class ManagerDoSyncPowerStateTestCase(tests_db_base.DbTestCase):
|
||||
def setUp(self):
|
||||
@ -4184,7 +4155,7 @@ class NodeInspectHardware(mgr_utils.ServiceSetUpMixin,
|
||||
inspection_started_at=datetime.datetime(2000, 1, 1, 0, 0))
|
||||
|
||||
self.service._check_inspect_timeouts(self.context)
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
self.assertEqual(states.INSPECTFAIL, node.provision_state)
|
||||
self.assertEqual(states.MANAGEABLE, node.target_provision_state)
|
||||
@ -4207,7 +4178,7 @@ class NodeInspectHardware(mgr_utils.ServiceSetUpMixin,
|
||||
self.context, node.uuid)
|
||||
# Compare true exception hidden by @messaging.expected_exceptions
|
||||
self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
|
||||
self.service._worker_pool.waitall()
|
||||
self._stop_service()
|
||||
node.refresh()
|
||||
# Make sure things were rolled back
|
||||
self.assertEqual(prv_state, node.provision_state)
|
||||
|
@ -17,8 +17,6 @@
|
||||
|
||||
"""Tests for :class:`ironic.conductor.task_manager`."""
|
||||
|
||||
import eventlet
|
||||
from eventlet import greenpool
|
||||
import mock
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
@ -47,6 +45,7 @@ class TaskManagerTestCase(tests_db_base.DbTestCase):
|
||||
self.config(node_locked_retry_attempts=1, group='conductor')
|
||||
self.config(node_locked_retry_interval=0, group='conductor')
|
||||
self.node = obj_utils.create_test_node(self.context)
|
||||
self.future_mock = mock.Mock(spec=['cancel', 'add_done_callback'])
|
||||
|
||||
def test_excl_lock(self, get_portgroups_mock, get_ports_mock,
|
||||
get_driver_mock, reserve_mock, release_mock,
|
||||
@ -389,8 +388,7 @@ class TaskManagerTestCase(tests_db_base.DbTestCase):
|
||||
def test_spawn_after(
|
||||
self, get_portgroups_mock, get_ports_mock, get_driver_mock,
|
||||
reserve_mock, release_mock, node_get_mock):
|
||||
thread_mock = mock.Mock(spec_set=['link', 'cancel'])
|
||||
spawn_mock = mock.Mock(return_value=thread_mock)
|
||||
spawn_mock = mock.Mock(return_value=self.future_mock)
|
||||
task_release_mock = mock.Mock()
|
||||
reserve_mock.return_value = self.node
|
||||
|
||||
@ -399,9 +397,9 @@ class TaskManagerTestCase(tests_db_base.DbTestCase):
|
||||
task.release_resources = task_release_mock
|
||||
|
||||
spawn_mock.assert_called_once_with(1, 2, foo='bar', cat='meow')
|
||||
thread_mock.link.assert_called_once_with(
|
||||
self.future_mock.add_done_callback.assert_called_once_with(
|
||||
task._thread_release_resources)
|
||||
self.assertFalse(thread_mock.cancel.called)
|
||||
self.assertFalse(self.future_mock.cancel.called)
|
||||
# Since we mocked link(), we're testing that __exit__ didn't
|
||||
# release resources pending the finishing of the background
|
||||
# thread
|
||||
@ -444,9 +442,9 @@ class TaskManagerTestCase(tests_db_base.DbTestCase):
|
||||
def test_spawn_after_link_fails(
|
||||
self, get_portgroups_mock, get_ports_mock, get_driver_mock,
|
||||
reserve_mock, release_mock, node_get_mock):
|
||||
thread_mock = mock.Mock(spec_set=['link', 'cancel'])
|
||||
thread_mock.link.side_effect = exception.IronicException('foo')
|
||||
spawn_mock = mock.Mock(return_value=thread_mock)
|
||||
self.future_mock.add_done_callback.side_effect = (
|
||||
exception.IronicException('foo'))
|
||||
spawn_mock = mock.Mock(return_value=self.future_mock)
|
||||
task_release_mock = mock.Mock()
|
||||
thr_release_mock = mock.Mock(spec_set=[])
|
||||
reserve_mock.return_value = self.node
|
||||
@ -459,8 +457,9 @@ class TaskManagerTestCase(tests_db_base.DbTestCase):
|
||||
self.assertRaises(exception.IronicException, _test_it)
|
||||
|
||||
spawn_mock.assert_called_once_with(1, 2, foo='bar', cat='meow')
|
||||
thread_mock.link.assert_called_once_with(thr_release_mock)
|
||||
thread_mock.cancel.assert_called_once_with()
|
||||
self.future_mock.add_done_callback.assert_called_once_with(
|
||||
thr_release_mock)
|
||||
self.future_mock.cancel.assert_called_once_with()
|
||||
task_release_mock.assert_called_once_with()
|
||||
|
||||
def test_spawn_after_on_error_hook(
|
||||
@ -659,75 +658,3 @@ class ExclusiveLockDecoratorTestCase(tests_base.TestCase):
|
||||
_req_excl_lock_method,
|
||||
*self.args_task_second,
|
||||
**self.kwargs)
|
||||
|
||||
|
||||
class TaskManagerGreenThreadTestCase(tests_base.TestCase):
|
||||
"""Class to assert our assumptions about greenthread behavior."""
|
||||
def test_gt_link_callback_added_during_execution(self):
|
||||
pool = greenpool.GreenPool()
|
||||
q1 = eventlet.Queue()
|
||||
q2 = eventlet.Queue()
|
||||
|
||||
def func():
|
||||
q1.put(None)
|
||||
q2.get()
|
||||
|
||||
link_callback = mock.Mock()
|
||||
|
||||
thread = pool.spawn(func)
|
||||
q1.get()
|
||||
thread.link(link_callback)
|
||||
q2.put(None)
|
||||
pool.waitall()
|
||||
link_callback.assert_called_once_with(thread)
|
||||
|
||||
def test_gt_link_callback_added_after_execution(self):
|
||||
pool = greenpool.GreenPool()
|
||||
link_callback = mock.Mock()
|
||||
|
||||
thread = pool.spawn(lambda: None)
|
||||
pool.waitall()
|
||||
thread.link(link_callback)
|
||||
link_callback.assert_called_once_with(thread)
|
||||
|
||||
def test_gt_link_callback_exception_inside_thread(self):
|
||||
pool = greenpool.GreenPool()
|
||||
q1 = eventlet.Queue()
|
||||
q2 = eventlet.Queue()
|
||||
|
||||
def func():
|
||||
q1.put(None)
|
||||
q2.get()
|
||||
raise Exception()
|
||||
|
||||
link_callback = mock.Mock()
|
||||
|
||||
thread = pool.spawn(func)
|
||||
q1.get()
|
||||
thread.link(link_callback)
|
||||
q2.put(None)
|
||||
pool.waitall()
|
||||
link_callback.assert_called_once_with(thread)
|
||||
|
||||
def test_gt_link_callback_added_after_exception_inside_thread(self):
|
||||
pool = greenpool.GreenPool()
|
||||
|
||||
def func():
|
||||
raise Exception()
|
||||
|
||||
link_callback = mock.Mock()
|
||||
|
||||
thread = pool.spawn(func)
|
||||
pool.waitall()
|
||||
thread.link(link_callback)
|
||||
|
||||
link_callback.assert_called_once_with(thread)
|
||||
|
||||
def test_gt_cancel_doesnt_run_thread(self):
|
||||
pool = greenpool.GreenPool()
|
||||
func = mock.Mock()
|
||||
thread = pool.spawn(func)
|
||||
thread.link(lambda t: None)
|
||||
thread.cancel()
|
||||
pool.waitall()
|
||||
self.assertFalse(func.called)
|
||||
|
@ -15,7 +15,7 @@
|
||||
|
||||
import json
|
||||
|
||||
import eventlet
|
||||
from futurist import periodics
|
||||
import mock
|
||||
|
||||
from ironic.common import exception
|
||||
@ -85,36 +85,21 @@ class PassthruDecoratorTestCase(base.TestCase):
|
||||
inst2.driver_routes['driver_noexception']['func'])
|
||||
|
||||
|
||||
@mock.patch.object(eventlet.greenthread, 'spawn_n', autospec=True,
|
||||
side_effect=lambda func, *args, **kw: func(*args, **kw))
|
||||
class DriverPeriodicTaskTestCase(base.TestCase):
|
||||
def test(self, spawn_mock):
|
||||
def test(self):
|
||||
method_mock = mock.MagicMock(spec_set=[])
|
||||
function_mock = mock.MagicMock(spec_set=[])
|
||||
|
||||
class TestClass(object):
|
||||
@driver_base.driver_periodic_task(spacing=42)
|
||||
def method(self, foo, bar=None):
|
||||
method_mock(foo, bar=bar)
|
||||
|
||||
@driver_base.driver_periodic_task(spacing=100, parallel=False)
|
||||
def function():
|
||||
function_mock()
|
||||
|
||||
obj = TestClass()
|
||||
self.assertEqual(42, obj.method._periodic_spacing)
|
||||
self.assertTrue(obj.method._periodic_task)
|
||||
self.assertEqual('ironic.tests.unit.drivers.test_base.method',
|
||||
obj.method._periodic_name)
|
||||
self.assertEqual('ironic.tests.unit.drivers.test_base.function',
|
||||
function._periodic_name)
|
||||
self.assertTrue(periodics.is_periodic(obj.method))
|
||||
|
||||
obj.method(1, bar=2)
|
||||
method_mock.assert_called_once_with(1, bar=2)
|
||||
self.assertEqual(1, spawn_mock.call_count)
|
||||
function()
|
||||
function_mock.assert_called_once_with()
|
||||
self.assertEqual(1, spawn_mock.call_count)
|
||||
|
||||
|
||||
class CleanStepDecoratorTestCase(base.TestCase):
|
||||
|
16
releasenotes/notes/futurist-e9c55699f479f97a.yaml
Normal file
16
releasenotes/notes/futurist-e9c55699f479f97a.yaml
Normal file
@ -0,0 +1,16 @@
|
||||
---
|
||||
prelude: >
|
||||
This release features switch to Oslo Futurist library for asynchronous
|
||||
thread execution and periodic tasks. Main benefit is that periodic tasks
|
||||
are now executed truly in parallel, and not sequentially in one
|
||||
green thread.
|
||||
upgrade:
|
||||
- Configuration option "workers_pool_size" can no longer be less or equal
|
||||
to 2. Please set it to greater value (the default is 100) before update.
|
||||
deprecations:
|
||||
- Configuration option "periodic_interval" is deprecated.
|
||||
- Using "driver_periodic_task" decorator is deprecated. Please update your
|
||||
out-of-tree drivers to use "periodics.periodic" decorator from Futurist
|
||||
library.
|
||||
fixes:
|
||||
- Periodic tasks are no longer executed all in one thread.
|
@ -43,3 +43,4 @@ retrying!=1.3.0,>=1.2.3 # Apache-2.0
|
||||
oslo.versionedobjects>=1.5.0 # Apache-2.0
|
||||
jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT
|
||||
psutil<2.0.0,>=1.1.1 # BSD
|
||||
futurist>=0.11.0 # Apache-2.0
|
||||
|
Loading…
x
Reference in New Issue
Block a user