From c96197cf18bf237e0ac21dd05a10a0a31c3d5535 Mon Sep 17 00:00:00 2001 From: Galyna Zholtkevych Date: Tue, 6 Dec 2016 11:23:06 +0200 Subject: [PATCH] Make _send_sensor_data concurrent The method _send_sensor_data is executed in a single thread for all nodes sequentially, which slows down the sensor data collection process. This makes the task concurrent by distributing nodes among a configurable number of conductor workers using a synchronized queue. Change-Id: Ic97bd798f2aa74998d185b6f50635f5862a677f5 Closes-Bug: 1642285 --- etc/ironic/ironic.conf.sample | 12 ++ ironic/conductor/manager.py | 59 ++++++-- ironic/conf/conductor.py | 15 +- ironic/tests/unit/conductor/test_manager.py | 128 ++++++++++-------- ...for-send-sensor-data-89d29c12da30ec54.yaml | 11 ++ 5 files changed, 156 insertions(+), 69 deletions(-) create mode 100644 releasenotes/notes/multiple-workers-for-send-sensor-data-89d29c12da30ec54.yaml diff --git a/etc/ironic/ironic.conf.sample b/etc/ironic/ironic.conf.sample index b6570e2d90..7be00fd88a 100644 --- a/etc/ironic/ironic.conf.sample +++ b/etc/ironic/ironic.conf.sample @@ -930,6 +930,18 @@ # ceilometer via the notification bus. (integer value) #send_sensor_data_interval = 600 +# The maximum number of workers that can be started +# simultaneously for send data from sensors periodic task. +# (integer value) +# Minimum value: 1 +#send_sensor_data_workers = 4 + +# The time in seconds to wait for send sensors data periodic +# task to be finished before allowing periodic call to happen +# again. Should be less than send_sensor_data_interval value. +# (integer value) +#send_sensor_data_wait_timeout = 300 + # List of comma separated meter types which need to be sent to # Ceilometer. The default value, "ALL", is a special value # meaning send all the sensor data. 
(list value) diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py index cfc438e462..bb394852c3 100644 --- a/ironic/conductor/manager.py +++ b/ironic/conductor/manager.py @@ -44,10 +44,12 @@ notifying Neutron of a change, etc. import collections import datetime +from six.moves import queue import tempfile import eventlet from futurist import periodics +from futurist import waiters from ironic_lib import metrics_utils from oslo_log import log import oslo_messaging as messaging @@ -2021,19 +2023,14 @@ class ConductorManager(base_manager.BaseConductorManager): driver = driver_factory.get_driver(driver_name) return driver.get_properties() - @METRICS.timer('ConductorManager._send_sensor_data') - @periodics.periodic(spacing=CONF.conductor.send_sensor_data_interval) - def _send_sensor_data(self, context): - """Periodically sends sensor data to Ceilometer.""" - # do nothing if send_sensor_data option is False - if not CONF.conductor.send_sensor_data: - return - - filters = {'associated': True} - node_iter = self.iter_nodes(fields=['instance_uuid'], - filters=filters) - - for (node_uuid, driver, instance_uuid) in node_iter: + @METRICS.timer('ConductorManager._sensors_nodes_task') + def _sensors_nodes_task(self, context, nodes): + """Sends sensors data for nodes from synchronized queue.""" + while True: + try: + node_uuid, driver, instance_uuid = nodes.get_nowait() + except queue.Empty: + break # populate the message which will be sent to ceilometer message = {'message_id': uuidutils.generate_uuid(), 'instance_uuid': instance_uuid, @@ -2086,6 +2083,42 @@ class ConductorManager(base_manager.BaseConductorManager): # Yield on every iteration eventlet.sleep(0) + @METRICS.timer('ConductorManager._send_sensor_data') + @periodics.periodic(spacing=CONF.conductor.send_sensor_data_interval) + def _send_sensor_data(self, context): + """Periodically sends sensor data to Ceilometer.""" + + # do nothing if send_sensor_data option is False + if not 
CONF.conductor.send_sensor_data: + return + + filters = {'associated': True} + nodes = queue.Queue() + for node_info in self.iter_nodes(fields=['instance_uuid'], + filters=filters): + nodes.put_nowait(node_info) + + number_of_threads = min(CONF.conductor.send_sensor_data_workers, + nodes.qsize()) + futures = [] + for thread_number in range(number_of_threads): + try: + futures.append( + self._spawn_worker(self._sensors_nodes_task, + context, nodes)) + except exception.NoFreeConductorWorker: + LOG.warning(_LW("There is no more conductor workers for " + "task of sending sensors data. %(workers)d " + "workers has been already spawned."), + {'workers': thread_number}) + break + + done, not_done = waiters.wait_for_all( + futures, timeout=CONF.conductor.send_sensor_data_wait_timeout) + if not_done: + LOG.warning(_LW("%d workers for send sensors data did not " + "complete"), len(not_done)) + def _filter_out_unsupported_types(self, sensors_data): """Filters out sensor data types that aren't specified in the config. diff --git a/ironic/conf/conductor.py b/ironic/conf/conductor.py index 932b82c48e..01b6c37a1e 100644 --- a/ironic/conf/conductor.py +++ b/ironic/conf/conductor.py @@ -80,8 +80,19 @@ opts = [ 'notification bus')), cfg.IntOpt('send_sensor_data_interval', default=600, - help=_('Seconds between conductor sending sensor data message' - ' to ceilometer via the notification bus.')), + help=_('Seconds between conductor sending sensor data message ' + 'to ceilometer via the notification bus.')), + cfg.IntOpt('send_sensor_data_workers', + default=4, min=1, + help=_('The maximum number of workers that can be started ' + 'simultaneously for send data from sensors periodic ' + 'task.')), + cfg.IntOpt('send_sensor_data_wait_timeout', + default=300, + help=_('The time in seconds to wait for send sensors data ' + 'periodic task to be finished before allowing periodic ' + 'call to happen again. 
Should be less than ' + 'send_sensor_data_interval value.')), cfg.ListOpt('send_sensor_data_types', default=['ALL'], help=_('List of comma separated meter types which need to be' diff --git a/ironic/tests/unit/conductor/test_manager.py b/ironic/tests/unit/conductor/test_manager.py index 1b65fdc037..84d8c5686f 100644 --- a/ironic/tests/unit/conductor/test_manager.py +++ b/ironic/tests/unit/conductor/test_manager.py @@ -28,6 +28,7 @@ from oslo_utils import uuidutils from oslo_versionedobjects import base as ovo_base from oslo_versionedobjects import fields import six +from six.moves import queue from ironic.common import boot_devices from ironic.common import driver_factory @@ -3225,70 +3226,34 @@ class UpdatePortTestCase(mgr_utils.ServiceSetUpMixin, expected_result = {} self.assertEqual(expected_result, actual_result) - @mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor') - @mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list') @mock.patch.object(task_manager, 'acquire') - def test___send_sensor_data(self, acquire_mock, get_nodeinfo_list_mock, - _mapped_to_this_conductor_mock): - node = obj_utils.create_test_node(self.context, - driver='fake') + def test_send_sensor_task(self, acquire_mock): + nodes = queue.Queue() + for i in range(5): + nodes.put_nowait(('fake_uuid-%d' % i, 'fake', None)) self._start_service() CONF.set_override('send_sensor_data', True, group='conductor') + acquire_mock.return_value.__enter__.return_value.driver = self.driver with mock.patch.object(self.driver.management, 'get_sensors_data') as get_sensors_data_mock: with mock.patch.object(self.driver.management, 'validate') as validate_mock: get_sensors_data_mock.return_value = 'fake-sensor-data' - _mapped_to_this_conductor_mock.return_value = True - get_nodeinfo_list_mock.return_value = [(node.uuid, node.driver, - node.instance_uuid)] - self.service._send_sensor_data(self.context) - self.assertTrue(get_nodeinfo_list_mock.called) - 
self.assertTrue(_mapped_to_this_conductor_mock.called) - self.assertTrue(acquire_mock.called) - self.assertTrue(get_sensors_data_mock.called) - self.assertTrue(validate_mock.called) + self.service._sensors_nodes_task(self.context, nodes) + self.assertEqual(5, acquire_mock.call_count) + self.assertEqual(5, validate_mock.call_count) + self.assertEqual(5, get_sensors_data_mock.call_count) - @mock.patch.object(manager.ConductorManager, '_fail_if_in_state', - autospec=True) - @mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor') - @mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list') - @mock.patch.object(task_manager, 'acquire') - def test___send_sensor_data_disabled(self, acquire_mock, - get_nodeinfo_list_mock, - _mapped_to_this_conductor_mock, - mock_fail_if_state): - node = obj_utils.create_test_node(self.context, - driver='fake') - self._start_service() - acquire_mock.return_value.__enter__.return_value.driver = self.driver - with mock.patch.object(self.driver.management, - 'get_sensors_data') as get_sensors_data_mock: - with mock.patch.object(self.driver.management, - 'validate') as validate_mock: - get_sensors_data_mock.return_value = 'fake-sensor-data' - _mapped_to_this_conductor_mock.return_value = True - get_nodeinfo_list_mock.return_value = [(node.uuid, node.driver, - node.instance_uuid)] - self.service._send_sensor_data(self.context) - self.assertFalse(get_nodeinfo_list_mock.called) - self.assertFalse(_mapped_to_this_conductor_mock.called) - self.assertFalse(acquire_mock.called) - self.assertFalse(get_sensors_data_mock.called) - self.assertFalse(validate_mock.called) - mock_fail_if_state.assert_called_once_with( - mock.ANY, mock.ANY, - {'provision_state': 'deploying', 'reserved': False}, - 'deploying', 'provision_updated_at', - last_error=mock.ANY) - - @mock.patch.object(manager.ConductorManager, 'iter_nodes', autospec=True) @mock.patch.object(task_manager, 'acquire', autospec=True) - def test___send_sensor_data_no_management(self, 
acquire_mock, - iter_nodes_mock): + def test_send_sensor_task_no_management(self, acquire_mock): + nodes = queue.Queue() + nodes.put_nowait(('fake_uuid', 'fake', None)) + CONF.set_override('send_sensor_data', True, group='conductor') - iter_nodes_mock.return_value = [('fake_uuid1', 'fake', 'fake_uuid2')] + + self._start_service() + self.driver.management = None acquire_mock.return_value.__enter__.return_value.driver = self.driver @@ -3296,13 +3261,68 @@ class UpdatePortTestCase(mgr_utils.ServiceSetUpMixin, autospec=True) as get_sensors_data_mock: with mock.patch.object(fake.FakeManagement, 'validate', autospec=True) as validate_mock: - self.service._send_sensor_data(self.context) + self.service._sensors_nodes_task(self.context, nodes) - self.assertTrue(iter_nodes_mock.called) self.assertTrue(acquire_mock.called) self.assertFalse(get_sensors_data_mock.called) self.assertFalse(validate_mock.called) + @mock.patch.object(manager.ConductorManager, '_spawn_worker') + @mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor') + @mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list') + def test___send_sensor_data(self, get_nodeinfo_list_mock, + _mapped_to_this_conductor_mock, + mock_spawn): + self._start_service() + + CONF.set_override('send_sensor_data', True, group='conductor') + # NOTE(galyna): do not wait for threads to be finished in unittests + CONF.set_override('send_sensor_data_wait_timeout', 0, + group='conductor') + _mapped_to_this_conductor_mock.return_value = True + get_nodeinfo_list_mock.return_value = [('fake_uuid', 'fake', None)] + self.service._send_sensor_data(self.context) + mock_spawn.assert_called_with(self.service._sensors_nodes_task, + self.context, + mock.ANY) + + @mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker') + @mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor') + @mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list') + def test___send_sensor_data_multiple_workers( + self, get_nodeinfo_list_mock, 
_mapped_to_this_conductor_mock, + mock_spawn): + self._start_service() + mock_spawn.reset_mock() + + number_of_workers = 8 + CONF.set_override('send_sensor_data', True, group='conductor') + CONF.set_override('send_sensor_data_workers', number_of_workers, + group='conductor') + # NOTE(galyna): do not wait for threads to be finished in unittests + CONF.set_override('send_sensor_data_wait_timeout', 0, + group='conductor') + + _mapped_to_this_conductor_mock.return_value = True + get_nodeinfo_list_mock.return_value = [('fake_uuid', 'fake', + None)] * 20 + self.service._send_sensor_data(self.context) + self.assertEqual(number_of_workers, + mock_spawn.call_count) + + @mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor') + @mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list') + def test___send_sensor_data_disabled(self, get_nodeinfo_list_mock, + _mapped_to_this_conductor_mock): + self._start_service() + get_nodeinfo_list_mock.reset_mock() + with mock.patch.object(manager.ConductorManager, + '_spawn_worker') as _spawn_mock: + self.service._send_sensor_data(self.context) + self.assertFalse(get_nodeinfo_list_mock.called) + self.assertFalse(_mapped_to_this_conductor_mock.called) + self.assertFalse(_spawn_mock.called) + def test_set_boot_device(self): node = obj_utils.create_test_node(self.context, driver='fake') with mock.patch.object(self.driver.management, 'validate') as mock_val: diff --git a/releasenotes/notes/multiple-workers-for-send-sensor-data-89d29c12da30ec54.yaml b/releasenotes/notes/multiple-workers-for-send-sensor-data-89d29c12da30ec54.yaml new file mode 100644 index 0000000000..4a025a1144 --- /dev/null +++ b/releasenotes/notes/multiple-workers-for-send-sensor-data-89d29c12da30ec54.yaml @@ -0,0 +1,11 @@ +--- +features: + - Adds new option ``[conductor]/send_sensor_data_workers`` + to allow concurrently sending sensor data using the specified + number of green threads. 
+ ``[conductor]/send_sensor_data_wait_timeout`` option allows + to specify the time to wait for all spawned green threads before + running the periodic task again. +upgrade: + - Increases number of workers from 1 to 4 for the ``send_sensor_data`` + periodic task.