Merge "Collect periodic tasks from all enabled hardware interfaces"
This commit is contained in:
commit
ffefb805f3
@ -396,6 +396,15 @@ def interfaces(interface_type):
|
||||
return _get_all_drivers(_INTERFACE_LOADERS[interface_type]())
|
||||
|
||||
|
||||
def all_interfaces():
|
||||
"""Get all interfaces for all interface types.
|
||||
|
||||
:returns: Dictionary mapping interface type to dictionary mapping
|
||||
interface name to interface object.
|
||||
"""
|
||||
return {iface: interfaces(iface) for iface in _INTERFACE_LOADERS}
|
||||
|
||||
|
||||
def enabled_supported_interfaces(hardware_type):
|
||||
"""Get usable interfaces for a given hardware type.
|
||||
|
||||
|
@ -149,33 +149,7 @@ class BaseConductorManager(object):
|
||||
LOG.error(msg, {'host': self.host, 'names': name_clashes})
|
||||
raise exception.DriverNameConflict(names=name_clashes)
|
||||
|
||||
# Collect driver-specific periodic tasks.
|
||||
# Conductor periodic tasks accept context argument, driver periodic
|
||||
# tasks accept this manager and context. We have to ensure that the
|
||||
# same driver interface class is not traversed twice, otherwise
|
||||
# we'll have several instances of the same task.
|
||||
LOG.debug('Collecting periodic tasks')
|
||||
self._periodic_task_callables = []
|
||||
periodic_task_classes = set()
|
||||
self._collect_periodic_tasks(self, (admin_context,))
|
||||
for driver_obj in drivers.values():
|
||||
for iface_name in driver_obj.all_interfaces:
|
||||
iface = getattr(driver_obj, iface_name, None)
|
||||
if iface and iface.__class__ not in periodic_task_classes:
|
||||
self._collect_periodic_tasks(iface, (self, admin_context))
|
||||
periodic_task_classes.add(iface.__class__)
|
||||
|
||||
if (len(self._periodic_task_callables) >
|
||||
CONF.conductor.workers_pool_size):
|
||||
LOG.warning('This conductor has %(tasks)d periodic tasks '
|
||||
'enabled, but only %(workers)d task workers '
|
||||
'allowed by [conductor]workers_pool_size option',
|
||||
{'tasks': len(self._periodic_task_callables),
|
||||
'workers': CONF.conductor.workers_pool_size})
|
||||
|
||||
self._periodic_tasks = periodics.PeriodicWorker(
|
||||
self._periodic_task_callables,
|
||||
executor_factory=periodics.ExistingExecutor(self._executor))
|
||||
self._collect_periodic_tasks(admin_context)
|
||||
|
||||
# Check for required config options if object_store_endpoint_type is
|
||||
# radosgw
|
||||
@ -267,6 +241,66 @@ class BaseConductorManager(object):
|
||||
state, 'provision_updated_at',
|
||||
last_error=last_error)
|
||||
|
||||
def _collect_periodic_tasks(self, admin_context):
|
||||
"""Collect driver-specific periodic tasks.
|
||||
|
||||
Conductor periodic tasks accept context argument, driver periodic
|
||||
tasks accept this manager and context. We have to ensure that the
|
||||
same driver interface class is not traversed twice, otherwise
|
||||
we'll have several instances of the same task.
|
||||
|
||||
:param admin_context: Administrator context to pass to tasks.
|
||||
"""
|
||||
LOG.debug('Collecting periodic tasks')
|
||||
# collected callables
|
||||
periodic_task_callables = []
|
||||
# list of visited classes to avoid adding the same tasks twice
|
||||
periodic_task_classes = set()
|
||||
|
||||
def _collect_from(obj, args):
|
||||
"""Collect tasks from the given object.
|
||||
|
||||
:param obj: the object to collect tasks from.
|
||||
:param args: a tuple of arguments to pass to tasks.
|
||||
"""
|
||||
if obj and obj.__class__ not in periodic_task_classes:
|
||||
for name, member in inspect.getmembers(obj):
|
||||
if periodics.is_periodic(member):
|
||||
LOG.debug('Found periodic task %(owner)s.%(member)s',
|
||||
{'owner': obj.__class__.__name__,
|
||||
'member': name})
|
||||
periodic_task_callables.append((member, args, {}))
|
||||
periodic_task_classes.add(obj.__class__)
|
||||
|
||||
# First, collect tasks from the conductor itself
|
||||
_collect_from(self, (admin_context,))
|
||||
|
||||
# Second, collect tasks from hardware interfaces
|
||||
for ifaces in driver_factory.all_interfaces().values():
|
||||
for iface in ifaces.values():
|
||||
_collect_from(iface, args=(self, admin_context))
|
||||
# TODO(dtantsur): allow periodics on hardware types themselves?
|
||||
|
||||
# Finally, collect tasks from interfaces of classic drivers, since they
|
||||
# are not necessary registered as new-style hardware interfaces.
|
||||
for driver_obj in driver_factory.drivers().values():
|
||||
for iface_name in driver_obj.all_interfaces:
|
||||
iface = getattr(driver_obj, iface_name, None)
|
||||
_collect_from(iface, args=(self, admin_context))
|
||||
|
||||
if len(periodic_task_callables) > CONF.conductor.workers_pool_size:
|
||||
LOG.warning('This conductor has %(tasks)d periodic tasks '
|
||||
'enabled, but only %(workers)d task workers '
|
||||
'allowed by [conductor]workers_pool_size option',
|
||||
{'tasks': len(periodic_task_callables),
|
||||
'workers': CONF.conductor.workers_pool_size})
|
||||
|
||||
self._periodic_tasks = periodics.PeriodicWorker(
|
||||
periodic_task_callables,
|
||||
executor_factory=periodics.ExistingExecutor(self._executor))
|
||||
# This is only used in tests currently. Delete it?
|
||||
self._periodic_task_callables = periodic_task_callables
|
||||
|
||||
def del_host(self, deregister=True):
|
||||
# Conductor deregistration fails if called on non-initialized
|
||||
# conductor (e.g. when rpc server is unreachable).
|
||||
@ -331,22 +365,6 @@ class BaseConductorManager(object):
|
||||
# TODO(jroll) validate against other conductor, warn if different
|
||||
# how do we do this performantly? :|
|
||||
|
||||
def _collect_periodic_tasks(self, obj, args):
|
||||
"""Collect periodic tasks from a given object.
|
||||
|
||||
Populates self._periodic_task_callables with tuples
|
||||
(callable, args, kwargs).
|
||||
|
||||
:param obj: object containing periodic tasks as methods
|
||||
:param args: tuple with arguments to pass to every task
|
||||
"""
|
||||
for name, member in inspect.getmembers(obj):
|
||||
if periodics.is_periodic(member):
|
||||
LOG.debug('Found periodic task %(owner)s.%(member)s',
|
||||
{'owner': obj.__class__.__name__,
|
||||
'member': name})
|
||||
self._periodic_task_callables.append((member, args, {}))
|
||||
|
||||
def _on_periodic_tasks_stop(self, fut):
|
||||
try:
|
||||
fut.result()
|
||||
|
@ -111,16 +111,21 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
|
||||
self.assertEqual(2, net_factory.call_count)
|
||||
self.assertEqual(2, storage_factory.call_count)
|
||||
|
||||
@mock.patch.object(driver_factory.DriverFactory, '__getitem__')
|
||||
def test_start_registers_driver_specific_tasks(self, get_mock):
|
||||
init_names = ['fake1']
|
||||
self.config(enabled_drivers=init_names)
|
||||
|
||||
@mock.patch.object(driver_factory, 'all_interfaces', autospec=True)
|
||||
@mock.patch.object(driver_factory, 'hardware_types', autospec=True)
|
||||
@mock.patch.object(driver_factory, 'drivers', autospec=True)
|
||||
def test_start_registers_driver_specific_tasks(self, mock_drivers,
|
||||
mock_hw_types, mock_ifaces):
|
||||
class TestInterface(object):
|
||||
@periodics.periodic(spacing=100500)
|
||||
def iface(self):
|
||||
pass
|
||||
|
||||
class TestInterface2(object):
|
||||
@periodics.periodic(spacing=100500)
|
||||
def iface(self):
|
||||
pass
|
||||
|
||||
class Driver(object):
|
||||
core_interfaces = []
|
||||
standard_interfaces = ['iface']
|
||||
@ -132,22 +137,27 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
|
||||
def task(self, context):
|
||||
pass
|
||||
|
||||
obj = Driver()
|
||||
get_mock.return_value = mock.Mock(obj=obj)
|
||||
driver = Driver()
|
||||
iface1 = TestInterface()
|
||||
iface2 = TestInterface2()
|
||||
expected = [iface1.iface, iface2.iface]
|
||||
|
||||
mock_drivers.return_value = {'fake1': driver}
|
||||
mock_ifaces.return_value = {
|
||||
'management': {'fake1': iface1},
|
||||
'power': {'fake2': iface2}
|
||||
}
|
||||
|
||||
with mock.patch.object(
|
||||
driver_factory.DriverFactory()._extension_manager,
|
||||
'names') as mock_names:
|
||||
mock_names.return_value = init_names
|
||||
self._start_service(start_periodic_tasks=True)
|
||||
|
||||
tasks = {c[0] for c in self.service._periodic_task_callables}
|
||||
self.assertTrue(periodics.is_periodic(obj.iface.iface))
|
||||
self.assertIn(obj.iface.iface, tasks)
|
||||
for item in expected:
|
||||
self.assertTrue(periodics.is_periodic(item))
|
||||
self.assertIn(item, tasks)
|
||||
|
||||
# no periodic tasks from the Driver object
|
||||
self.assertTrue(periodics.is_periodic(obj.task))
|
||||
self.assertNotIn(obj.task, tasks)
|
||||
self.assertTrue(periodics.is_periodic(driver.task))
|
||||
self.assertNotIn(driver.task, tasks)
|
||||
|
||||
@mock.patch.object(driver_factory.DriverFactory, '__init__')
|
||||
def test_start_fails_on_missing_driver(self, mock_df):
|
||||
@ -194,7 +204,7 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
|
||||
df_mock.return_value = driver_factory_mock
|
||||
self.service.init_host()
|
||||
self.assertFalse(log_mock.error.called)
|
||||
df_mock.assert_called_once_with()
|
||||
df_mock.assert_called_with()
|
||||
self.assertFalse(del_mock.called)
|
||||
|
||||
@mock.patch.object(base_manager, 'LOG')
|
||||
@ -208,7 +218,7 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
|
||||
ht_mock.return_value = driver_factory_mock
|
||||
self.service.init_host()
|
||||
self.assertFalse(log_mock.error.called)
|
||||
ht_mock.assert_called_once_with()
|
||||
ht_mock.assert_called_with()
|
||||
self.assertFalse(del_mock.called)
|
||||
|
||||
@mock.patch.object(base_manager, 'LOG')
|
||||
|
@ -0,0 +1,6 @@
|
||||
---
|
||||
fixes:
|
||||
- |
|
||||
Fixes collection of periodic tasks from hardware interfaces that are not
|
||||
used in any enabled classic drivers. See `bug 2001884
|
||||
<https://storyboard.openstack.org/#!/story/2001884>`_ for details.
|
Loading…
Reference in New Issue
Block a user