Make _send_sensors_data concurrent

The method _send_sensor data is executed in a single
thread for all nodes consequently and slow down
collecting sensor data process.
This makes this task concurrent by node distribution
among configured in conductor number of workers using
synchronized queue.

Change-Id: Ic97bd798f2aa74998d185b6f50635f5862a677f5
Closes-Bug: 1642285
This commit is contained in:
Galyna Zholtkevych 2016-12-06 11:23:06 +02:00
parent 44e6e49052
commit c96197cf18
5 changed files with 156 additions and 69 deletions

View File

@ -930,6 +930,18 @@
# ceilometer via the notification bus. (integer value)
#send_sensor_data_interval = 600
# The maximum number of workers that can be started
# simultaneously for send data from sensors periodic task.
# (integer value)
# Minimum value: 1
#send_sensor_data_workers = 4
# The time in seconds to wait for send sensors data periodic
# task to be finished before allowing periodic call to happen
# again. Should be less than send_sensor_data_interval value.
# (integer value)
#send_sensor_data_wait_timeout = 300
# List of comma separated meter types which need to be sent to
# Ceilometer. The default value, "ALL", is a special value
# meaning send all the sensor data. (list value)

View File

@ -44,10 +44,12 @@ notifying Neutron of a change, etc.
import collections
import datetime
from six.moves import queue
import tempfile
import eventlet
from futurist import periodics
from futurist import waiters
from ironic_lib import metrics_utils
from oslo_log import log
import oslo_messaging as messaging
@ -2021,19 +2023,14 @@ class ConductorManager(base_manager.BaseConductorManager):
driver = driver_factory.get_driver(driver_name)
return driver.get_properties()
@METRICS.timer('ConductorManager._send_sensor_data')
@periodics.periodic(spacing=CONF.conductor.send_sensor_data_interval)
def _send_sensor_data(self, context):
"""Periodically sends sensor data to Ceilometer."""
# do nothing if send_sensor_data option is False
if not CONF.conductor.send_sensor_data:
return
filters = {'associated': True}
node_iter = self.iter_nodes(fields=['instance_uuid'],
filters=filters)
for (node_uuid, driver, instance_uuid) in node_iter:
@METRICS.timer('ConductorManager._sensors_nodes_task')
def _sensors_nodes_task(self, context, nodes):
"""Sends sensors data for nodes from synchronized queue."""
while True:
try:
node_uuid, driver, instance_uuid = nodes.get_nowait()
except queue.Empty:
break
# populate the message which will be sent to ceilometer
message = {'message_id': uuidutils.generate_uuid(),
'instance_uuid': instance_uuid,
@ -2086,6 +2083,42 @@ class ConductorManager(base_manager.BaseConductorManager):
# Yield on every iteration
eventlet.sleep(0)
@METRICS.timer('ConductorManager._send_sensor_data')
@periodics.periodic(spacing=CONF.conductor.send_sensor_data_interval)
def _send_sensor_data(self, context):
"""Periodically sends sensor data to Ceilometer."""
# do nothing if send_sensor_data option is False
if not CONF.conductor.send_sensor_data:
return
filters = {'associated': True}
nodes = queue.Queue()
for node_info in self.iter_nodes(fields=['instance_uuid'],
filters=filters):
nodes.put_nowait(node_info)
number_of_threads = min(CONF.conductor.send_sensor_data_workers,
nodes.qsize())
futures = []
for thread_number in range(number_of_threads):
try:
futures.append(
self._spawn_worker(self._sensors_nodes_task,
context, nodes))
except exception.NoFreeConductorWorker:
LOG.warning(_LW("There is no more conductor workers for "
"task of sending sensors data. %(workers)d "
"workers has been already spawned."),
{'workers': thread_number})
break
done, not_done = waiters.wait_for_all(
futures, timeout=CONF.conductor.send_sensor_data_wait_timeout)
if not_done:
LOG.warning(_LW("%d workers for send sensors data did not "
"complete"), len(not_done))
def _filter_out_unsupported_types(self, sensors_data):
"""Filters out sensor data types that aren't specified in the config.

View File

@ -80,8 +80,19 @@ opts = [
'notification bus')),
cfg.IntOpt('send_sensor_data_interval',
default=600,
help=_('Seconds between conductor sending sensor data message'
' to ceilometer via the notification bus.')),
help=_('Seconds between conductor sending sensor data message '
'to ceilometer via the notification bus.')),
cfg.IntOpt('send_sensor_data_workers',
default=4, min=1,
help=_('The maximum number of workers that can be started '
'simultaneously for send data from sensors periodic '
'task.')),
cfg.IntOpt('send_sensor_data_wait_timeout',
default=300,
help=_('The time in seconds to wait for send sensors data '
'periodic task to be finished before allowing periodic '
'call to happen again. Should be less than '
'send_sensor_data_interval value.')),
cfg.ListOpt('send_sensor_data_types',
default=['ALL'],
help=_('List of comma separated meter types which need to be'

View File

@ -28,6 +28,7 @@ from oslo_utils import uuidutils
from oslo_versionedobjects import base as ovo_base
from oslo_versionedobjects import fields
import six
from six.moves import queue
from ironic.common import boot_devices
from ironic.common import driver_factory
@ -3225,70 +3226,34 @@ class UpdatePortTestCase(mgr_utils.ServiceSetUpMixin,
expected_result = {}
self.assertEqual(expected_result, actual_result)
@mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
@mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list')
@mock.patch.object(task_manager, 'acquire')
def test___send_sensor_data(self, acquire_mock, get_nodeinfo_list_mock,
_mapped_to_this_conductor_mock):
node = obj_utils.create_test_node(self.context,
driver='fake')
def test_send_sensor_task(self, acquire_mock):
nodes = queue.Queue()
for i in range(5):
nodes.put_nowait(('fake_uuid-%d' % i, 'fake', None))
self._start_service()
CONF.set_override('send_sensor_data', True, group='conductor')
acquire_mock.return_value.__enter__.return_value.driver = self.driver
with mock.patch.object(self.driver.management,
'get_sensors_data') as get_sensors_data_mock:
with mock.patch.object(self.driver.management,
'validate') as validate_mock:
get_sensors_data_mock.return_value = 'fake-sensor-data'
_mapped_to_this_conductor_mock.return_value = True
get_nodeinfo_list_mock.return_value = [(node.uuid, node.driver,
node.instance_uuid)]
self.service._send_sensor_data(self.context)
self.assertTrue(get_nodeinfo_list_mock.called)
self.assertTrue(_mapped_to_this_conductor_mock.called)
self.assertTrue(acquire_mock.called)
self.assertTrue(get_sensors_data_mock.called)
self.assertTrue(validate_mock.called)
self.service._sensors_nodes_task(self.context, nodes)
self.assertEqual(5, acquire_mock.call_count)
self.assertEqual(5, validate_mock.call_count)
self.assertEqual(5, get_sensors_data_mock.call_count)
@mock.patch.object(manager.ConductorManager, '_fail_if_in_state',
autospec=True)
@mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
@mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list')
@mock.patch.object(task_manager, 'acquire')
def test___send_sensor_data_disabled(self, acquire_mock,
get_nodeinfo_list_mock,
_mapped_to_this_conductor_mock,
mock_fail_if_state):
node = obj_utils.create_test_node(self.context,
driver='fake')
self._start_service()
acquire_mock.return_value.__enter__.return_value.driver = self.driver
with mock.patch.object(self.driver.management,
'get_sensors_data') as get_sensors_data_mock:
with mock.patch.object(self.driver.management,
'validate') as validate_mock:
get_sensors_data_mock.return_value = 'fake-sensor-data'
_mapped_to_this_conductor_mock.return_value = True
get_nodeinfo_list_mock.return_value = [(node.uuid, node.driver,
node.instance_uuid)]
self.service._send_sensor_data(self.context)
self.assertFalse(get_nodeinfo_list_mock.called)
self.assertFalse(_mapped_to_this_conductor_mock.called)
self.assertFalse(acquire_mock.called)
self.assertFalse(get_sensors_data_mock.called)
self.assertFalse(validate_mock.called)
mock_fail_if_state.assert_called_once_with(
mock.ANY, mock.ANY,
{'provision_state': 'deploying', 'reserved': False},
'deploying', 'provision_updated_at',
last_error=mock.ANY)
@mock.patch.object(manager.ConductorManager, 'iter_nodes', autospec=True)
@mock.patch.object(task_manager, 'acquire', autospec=True)
def test___send_sensor_data_no_management(self, acquire_mock,
iter_nodes_mock):
def test_send_sensor_task_no_management(self, acquire_mock):
nodes = queue.Queue()
nodes.put_nowait(('fake_uuid', 'fake', None))
CONF.set_override('send_sensor_data', True, group='conductor')
iter_nodes_mock.return_value = [('fake_uuid1', 'fake', 'fake_uuid2')]
self._start_service()
self.driver.management = None
acquire_mock.return_value.__enter__.return_value.driver = self.driver
@ -3296,13 +3261,68 @@ class UpdatePortTestCase(mgr_utils.ServiceSetUpMixin,
autospec=True) as get_sensors_data_mock:
with mock.patch.object(fake.FakeManagement, 'validate',
autospec=True) as validate_mock:
self.service._send_sensor_data(self.context)
self.service._sensors_nodes_task(self.context, nodes)
self.assertTrue(iter_nodes_mock.called)
self.assertTrue(acquire_mock.called)
self.assertFalse(get_sensors_data_mock.called)
self.assertFalse(validate_mock.called)
@mock.patch.object(manager.ConductorManager, '_spawn_worker')
@mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
@mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list')
def test___send_sensor_data(self, get_nodeinfo_list_mock,
_mapped_to_this_conductor_mock,
mock_spawn):
self._start_service()
CONF.set_override('send_sensor_data', True, group='conductor')
# NOTE(galyna): do not wait for threads to be finished in unittests
CONF.set_override('send_sensor_data_wait_timeout', 0,
group='conductor')
_mapped_to_this_conductor_mock.return_value = True
get_nodeinfo_list_mock.return_value = [('fake_uuid', 'fake', None)]
self.service._send_sensor_data(self.context)
mock_spawn.assert_called_with(self.service._sensors_nodes_task,
self.context,
mock.ANY)
@mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker')
@mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
@mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list')
def test___send_sensor_data_multiple_workers(
self, get_nodeinfo_list_mock, _mapped_to_this_conductor_mock,
mock_spawn):
self._start_service()
mock_spawn.reset_mock()
number_of_workers = 8
CONF.set_override('send_sensor_data', True, group='conductor')
CONF.set_override('send_sensor_data_workers', number_of_workers,
group='conductor')
# NOTE(galyna): do not wait for threads to be finished in unittests
CONF.set_override('send_sensor_data_wait_timeout', 0,
group='conductor')
_mapped_to_this_conductor_mock.return_value = True
get_nodeinfo_list_mock.return_value = [('fake_uuid', 'fake',
None)] * 20
self.service._send_sensor_data(self.context)
self.assertEqual(number_of_workers,
mock_spawn.call_count)
@mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
@mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list')
def test___send_sensor_data_disabled(self, get_nodeinfo_list_mock,
_mapped_to_this_conductor_mock):
self._start_service()
get_nodeinfo_list_mock.reset_mock()
with mock.patch.object(manager.ConductorManager,
'_spawn_worker') as _spawn_mock:
self.service._send_sensor_data(self.context)
self.assertFalse(get_nodeinfo_list_mock.called)
self.assertFalse(_mapped_to_this_conductor_mock.called)
self.assertFalse(_spawn_mock.called)
def test_set_boot_device(self):
node = obj_utils.create_test_node(self.context, driver='fake')
with mock.patch.object(self.driver.management, 'validate') as mock_val:

View File

@ -0,0 +1,11 @@
---
features:
- Adds new option ``[conductor]/send_sensor_data_workers``
to allow concurrently sending sensor data using the specified
number of green threads.
``[conductor]/wait_timeout_for_send_sensor_data`` option allows
to specify the time to wait for all spawned green threads before
running the periodic task again.
upgrade:
- Increases number of workers from 1 to 4 for the ``send_sensor_data``
periodic task.