From d473bdabfdaf8fa2538de1056e804355a40979dc Mon Sep 17 00:00:00 2001
From: Yuriy Zveryanskyy
Date: Thu, 6 Mar 2014 18:00:41 +0200
Subject: [PATCH] Add worker threads limit to _check_deploy_timeouts task

New node filter parameters have been added to db.sqlalchemy.api. These
parameters are used in the _check_deploy_timeouts periodic task to limit
the number of worker threads that can be started simultaneously. A new
config option, 'periodic_max_workers', has been added to the conductor
group.

Closes-Bug: #1285793
Change-Id: I646540e334dec05682640c05cedb315e0ee355bc
---
 etc/ironic/ironic.conf.sample          |  5 +++
 ironic/conductor/manager.py            | 57 +++++++++++++-------------
 ironic/conductor/utils.py              |  8 ++++
 ironic/db/api.py                       |  6 +++
 ironic/db/sqlalchemy/api.py            |  6 +++
 ironic/tests/conductor/test_manager.py | 39 ++++++++++++++++++
 ironic/tests/db/test_nodes.py          | 29 +++++++++++++
 7 files changed, 121 insertions(+), 29 deletions(-)

diff --git a/etc/ironic/ironic.conf.sample b/etc/ironic/ironic.conf.sample
index dd7f7c35e3..6ecead896a 100644
--- a/etc/ironic/ironic.conf.sample
+++ b/etc/ironic/ironic.conf.sample
@@ -508,6 +508,11 @@
 # the node power state in DB (integer value)
 #power_state_sync_max_retries=3
 
+# Maximum number of worker threads that can be started
+# simultaneously by a periodic task. Should be less than RPC
+# thread pool size. (integer value)
+#periodic_max_workers=8
+
 
 [database]
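The sample file above only documents the option; the manager.py hunk below registers it. As a quick illustration of the oslo.config behaviour involved, here is a minimal standalone sketch (not part of the patch; the `opts` list name is illustrative):

    from oslo.config import cfg

    opts = [
        cfg.IntOpt('periodic_max_workers',
                   default=8,
                   help='Maximum number of worker threads that can be '
                        'started simultaneously by a periodic task.'),
    ]

    CONF = cfg.CONF
    CONF.register_opts(opts, group='conductor')

    # Without an override in ironic.conf, the default applies.
    assert CONF.conductor.periodic_max_workers == 8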
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index 3bbd08bbe8..a69e4ee013 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -42,7 +42,6 @@ a change, etc.
 """
 
 import collections
-import datetime
 
 import eventlet
 from eventlet import greenpool
@@ -64,7 +63,6 @@ from ironic.openstack.common import lockutils
 from ironic.openstack.common import log
 from ironic.openstack.common import periodic_task
 from ironic.openstack.common.rpc import common as messaging
-from ironic.openstack.common import timeutils
 
 MANAGER_TOPIC = 'ironic.conductor_manager'
 
 WORKER_SPAWN_lOCK = "conductor_worker_spawn"
@@ -108,6 +106,11 @@ conductor_opts = [
                        'number of times Ironic should try syncing the '
                        'hardware node power state with the node power state '
                        'in DB'),
+        cfg.IntOpt('periodic_max_workers',
+                   default=8,
+                   help='Maximum number of worker threads that can be started '
+                        'simultaneously by a periodic task. Should be less '
+                        'than RPC thread pool size.'),
 ]
 
 CONF = cfg.CONF
@@ -628,39 +631,35 @@ class ConductorManager(service.PeriodicService):
         if not CONF.conductor.deploy_callback_timeout:
             return
 
-        filters = {'reserved': False, 'maintenance': False}
-        columns = ['uuid', 'driver', 'provision_state', 'provision_updated_at']
-        node_list = self.dbapi.get_nodeinfo_list(columns=columns,
-                                                 filters=filters)
+        filters = {'reserved': False, 'provision_state': states.DEPLOYWAIT,
+                   'provisioned_before': CONF.conductor.deploy_callback_timeout}
+        columns = ['uuid', 'driver']
+        node_list = self.dbapi.get_nodeinfo_list(
+            columns=columns,
+            filters=filters,
+            sort_key='provision_updated_at',
+            sort_dir='asc')
 
-        for (node_uuid, driver, state, update_time) in node_list:
+        workers_count = 0
+        for node_uuid, driver in node_list:
             if not self._mapped_to_this_conductor(node_uuid, driver):
                 continue
-            if state == states.DEPLOYWAIT:
-                limit = (timeutils.utcnow() - datetime.timedelta(
-                         seconds=CONF.conductor.deploy_callback_timeout))
-                if timeutils.normalize_time(update_time) <= limit:
-                    try:
-                        task = task_manager.TaskManager(context, node_uuid)
-                    except (exception.NodeLocked, exception.NodeNotFound):
-                        continue
+            try:
+                task = task_manager.TaskManager(context, node_uuid)
+            except (exception.NodeLocked, exception.NodeNotFound):
+                continue
 
-                    node = task.node
-                    node.provision_state = states.DEPLOYFAIL
-                    node.target_provision_state = states.NOSTATE
-                    msg = (_('Timeout reached when waiting callback for '
-                             'node %s') % node_uuid)
-                    node.last_error = msg
-                    LOG.error(msg)
-                    node.save(task.context)
+            try:
+                thread = self._spawn_worker(utils.cleanup_after_timeout, task)
+                thread.link(lambda t: task.release_resources())
+            except exception.NoFreeConductorWorker:
+                task.release_resources()
+                break
 
-                    try:
-                        thread = self._spawn_worker(
-                            utils.cleanup_after_timeout, task)
-                        thread.link(lambda t: task.release_resources())
-                    except exception.NoFreeConductorWorker:
-                        task.release_resources()
+            workers_count += 1
+            if workers_count == CONF.conductor.periodic_max_workers:
+                break
 
     def rebalance_node_ring(self):
         """Perform any actions necessary when rebalancing the consistent hash.
diff --git a/ironic/conductor/utils.py b/ironic/conductor/utils.py
index e2690fedb3..b7c2b9896a 100644
--- a/ironic/conductor/utils.py
+++ b/ironic/conductor/utils.py
@@ -126,6 +126,14 @@ def cleanup_after_timeout(task):
     """
     node = task.node
     context = task.context
+    node.provision_state = states.DEPLOYFAIL
+    node.target_provision_state = states.NOSTATE
+    msg = (_('Timeout reached while waiting for callback for node %s')
+           % node.uuid)
+    node.last_error = msg
+    LOG.error(msg)
+    node.save(context)
+
     error_msg = _('Cleanup failed for node %(node)s after deploy timeout: '
                   ' %(error)s')
    try:
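The reworked loop above is the core of the change: the DB query now returns only timed-out DEPLOYWAIT nodes, already sorted, and the loop stops after periodic_max_workers spawns or when the pool is exhausted. A simplified, self-contained sketch of that throttling pattern (illustrative only; handle_timeout and the node names are hypothetical stand-ins for utils.cleanup_after_timeout and real node UUIDs):

    from eventlet import greenpool

    MAX_WORKERS = 2  # stands in for CONF.conductor.periodic_max_workers

    # Stands in for the conductor's shared worker pool.
    pool = greenpool.GreenPool(size=10)

    def handle_timeout(node):
        # Stands in for utils.cleanup_after_timeout(task).
        print('cleaning up %s' % node)

    def check_timeouts(nodes):
        workers_count = 0
        for node in nodes:
            if pool.free() <= 0:
                # Mirrors the NoFreeConductorWorker case: no capacity left,
                # so give up for this periodic run.
                break
            pool.spawn(handle_timeout, node)
            workers_count += 1
            if workers_count == MAX_WORKERS:
                break
        pool.waitall()

    check_timeouts(['node-1', 'node-2', 'node-3'])  # only the first two run

Capping the spawns per run keeps one periodic task from monopolizing the worker pool that RPC requests also depend on, which is why the help text advises keeping the option below the RPC thread pool size.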
diff --git a/ironic/db/api.py b/ironic/db/api.py
index e880ef343c..bc92d21c78 100644
--- a/ironic/db/api.py
+++ b/ironic/db/api.py
@@ -60,6 +60,9 @@ class Connection(object):
                         'maintenance': True | False
                         'chassis_uuid': uuid of chassis
                         'driver': driver's name
+                        'provision_state': provision state of node
+                        'provisioned_before': nodes with provision_updated_at
+                            field before this interval in seconds
         :param limit: Maximum number of nodes to return.
         :param marker: the last item of the previous page; we return the next
                        result set.
@@ -80,6 +83,9 @@
                         'maintenance': True | False
                         'chassis_uuid': uuid of chassis
                         'driver': driver's name
+                        'provision_state': provision state of node
+                        'provisioned_before': nodes with provision_updated_at
+                            field before this interval in seconds
         :param limit: Maximum number of nodes to return.
         :param marker: the last item of the previous page; we return the next
                        result set.
diff --git a/ironic/db/sqlalchemy/api.py b/ironic/db/sqlalchemy/api.py
index 64636513e1..41ab6d0113 100644
--- a/ironic/db/sqlalchemy/api.py
+++ b/ironic/db/sqlalchemy/api.py
@@ -229,6 +229,12 @@ class Connection(api.Connection):
             query = query.filter_by(maintenance=filters['maintenance'])
         if 'driver' in filters:
             query = query.filter_by(driver=filters['driver'])
+        if 'provision_state' in filters:
+            query = query.filter_by(provision_state=filters['provision_state'])
+        if 'provisioned_before' in filters:
+            limit = timeutils.utcnow() - datetime.timedelta(
+                seconds=filters['provisioned_before'])
+            query = query.filter(models.Node.provision_updated_at < limit)
         return query
diff --git a/ironic/tests/conductor/test_manager.py b/ironic/tests/conductor/test_manager.py
index e8ac3dab53..e126c2c7a1 100644
--- a/ironic/tests/conductor/test_manager.py
+++ b/ironic/tests/conductor/test_manager.py
@@ -953,6 +953,45 @@ class ManagerTestCase(tests_db_base.DbTestCase):
         self.service._worker_pool.waitall()
         spawn_mock.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY)
 
+    @mock.patch.object(timeutils, 'utcnow')
+    def test__check_deploy_timeouts_limit(self, mock_utcnow):
+        self.config(deploy_callback_timeout=60, group='conductor')
+        self.config(periodic_max_workers=2, group='conductor')
+        past = datetime.datetime(2000, 1, 1, 0, 0)
+        present = past + datetime.timedelta(minutes=10)
+        mock_utcnow.return_value = past
+        self._start_service()
+
+        test_nodes = []
+        for i in range(3):
+            next = past + datetime.timedelta(minutes=i)
+            n = utils.get_test_node(provision_state=states.DEPLOYWAIT,
+                                    target_provision_state=states.DEPLOYDONE,
+                                    provision_updated_at=next,
+                                    uuid=ironic_utils.generate_uuid())
+            del n['id']
+            node = self.dbapi.create_node(n)
+            test_nodes.append(node)
+
+        mock_utcnow.return_value = present
+        self.service._conductor_service_record_keepalive(self.context)
+        with mock.patch.object(self.driver.deploy, 'clean_up') as clean_mock:
+            self.service._check_deploy_timeouts(self.context)
+            self.service._worker_pool.waitall()
+            for node in test_nodes[:-1]:
+                node.refresh(self.context)
+                self.assertEqual(states.DEPLOYFAIL, node.provision_state)
+                self.assertEqual(states.NOSTATE, node.target_provision_state)
+                self.assertIsNotNone(node.last_error)
+
+            last_node = test_nodes[2]
+            last_node.refresh(self.context)
+            self.assertEqual(states.DEPLOYWAIT, last_node.provision_state)
+            self.assertEqual(states.DEPLOYDONE,
+                             last_node.target_provision_state)
+            self.assertIsNone(last_node.last_error)
+            self.assertEqual(2, clean_mock.call_count)
+
     def test_set_console_mode_enabled(self):
         ndict = utils.get_test_node(driver='fake')
         node = self.dbapi.create_node(ndict)
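The 'provisioned_before' filter in the sqlalchemy/api.py hunk above converts an interval in seconds into a cutoff timestamp and matches nodes whose provision_updated_at is older than that cutoff. A small sketch of that conversion (the helper name is illustrative, not from the patch):

    import datetime

    def provisioned_before_cutoff(seconds, now=None):
        # 'provisioned_before' carries an interval in seconds; the query
        # matches nodes whose provision_updated_at is older than the cutoff.
        now = now or datetime.datetime.utcnow()
        return now - datetime.timedelta(seconds=seconds)

    now = datetime.datetime(2014, 3, 6, 18, 0, 0)
    assert (provisioned_before_cutoff(300, now=now) ==
            datetime.datetime(2014, 3, 6, 17, 55, 0))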
diff --git a/ironic/tests/db/test_nodes.py b/ironic/tests/db/test_nodes.py
index 4f631e9d74..6a32ffff94 100644
--- a/ironic/tests/db/test_nodes.py
+++ b/ironic/tests/db/test_nodes.py
@@ -21,6 +21,7 @@ import mock
 import six
 
 from ironic.common import exception
+from ironic.common import states
 from ironic.common import utils as ironic_utils
 from ironic.db import api as dbapi
 from ironic.openstack.common import timeutils
@@ -152,6 +153,34 @@ class DbNodeTestCase(base.DbTestCase):
         res = self.dbapi.get_node_list(filters={'maintenance': False})
         self.assertEqual([1], [r.id for r in res])
 
+    @mock.patch.object(timeutils, 'utcnow')
+    def test_get_nodeinfo_list_provision(self, mock_utcnow):
+        past = datetime.datetime(2000, 1, 1, 0, 0)
+        next = past + datetime.timedelta(minutes=8)
+        present = past + datetime.timedelta(minutes=10)
+        mock_utcnow.return_value = past
+
+        # node with provision_updated timeout
+        n1 = utils.get_test_node(id=1, uuid=ironic_utils.generate_uuid(),
+                                 provision_updated_at=past)
+        # node with None in provision_updated_at
+        n2 = utils.get_test_node(id=2, uuid=ironic_utils.generate_uuid(),
+                                 provision_state=states.DEPLOYWAIT)
+        # node without timeout
+        n3 = utils.get_test_node(id=3, uuid=ironic_utils.generate_uuid(),
+                                 provision_updated_at=next)
+        self.dbapi.create_node(n1)
+        self.dbapi.create_node(n2)
+        self.dbapi.create_node(n3)
+
+        mock_utcnow.return_value = present
+        res = self.dbapi.get_nodeinfo_list(filters={'provisioned_before': 300})
+        self.assertEqual([1], [r[0] for r in res])
+
+        res = self.dbapi.get_nodeinfo_list(filters={'provision_state':
+                                                    states.DEPLOYWAIT})
+        self.assertEqual([2], [r[0] for r in res])
+
     def test_get_node_list(self):
         uuids = []
         for i in range(1, 6):
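For completeness, a hedged call-site sketch showing how the new filters and sort options combine, mirroring what the reworked _check_deploy_timeouts passes to the DB API. It assumes ironic.db.api.get_instance() returns the configured backend, as used elsewhere in Ironic, and the 1800-second value is illustrative:

    from ironic.common import states
    from ironic.db import api as dbapi

    conn = dbapi.get_instance()  # assumed accessor for the configured backend
    node_list = conn.get_nodeinfo_list(
        columns=['uuid', 'driver'],
        filters={'reserved': False,
                 'provision_state': states.DEPLOYWAIT,
                 'provisioned_before': 1800},  # seconds, e.g. the timeout
        sort_key='provision_updated_at',
        sort_dir='asc')
    # Oldest timed-out nodes come back first, so the per-run worker cap
    # always spends its budget on the nodes that have waited longest.
    for node_uuid, driver in node_list:
        print(node_uuid, driver)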