Merge "Make _send_sensors_data concurrent"
This commit is contained in:
commit
fd1445acaa
@ -940,6 +940,18 @@
|
||||
# ceilometer via the notification bus. (integer value)
|
||||
#send_sensor_data_interval = 600
|
||||
|
||||
# The maximum number of workers that can be started
|
||||
# simultaneously for send data from sensors periodic task.
|
||||
# (integer value)
|
||||
# Minimum value: 1
|
||||
#send_sensor_data_workers = 4
|
||||
|
||||
# The time in seconds to wait for send sensors data periodic
|
||||
# task to be finished before allowing periodic call to happen
|
||||
# again. Should be less than send_sensor_data_interval value.
|
||||
# (integer value)
|
||||
#send_sensor_data_wait_timeout = 300
|
||||
|
||||
# List of comma separated meter types which need to be sent to
|
||||
# Ceilometer. The default value, "ALL", is a special value
|
||||
# meaning send all the sensor data. (list value)
|
||||
|
@ -44,10 +44,12 @@ notifying Neutron of a change, etc.
|
||||
|
||||
import collections
|
||||
import datetime
|
||||
from six.moves import queue
|
||||
import tempfile
|
||||
|
||||
import eventlet
|
||||
from futurist import periodics
|
||||
from futurist import waiters
|
||||
from ironic_lib import metrics_utils
|
||||
from oslo_log import log
|
||||
import oslo_messaging as messaging
|
||||
@ -2054,19 +2056,14 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
driver = driver_factory.get_driver_or_hardware_type(driver_name)
|
||||
return driver.get_properties()
|
||||
|
||||
@METRICS.timer('ConductorManager._send_sensor_data')
|
||||
@periodics.periodic(spacing=CONF.conductor.send_sensor_data_interval)
|
||||
def _send_sensor_data(self, context):
|
||||
"""Periodically sends sensor data to Ceilometer."""
|
||||
# do nothing if send_sensor_data option is False
|
||||
if not CONF.conductor.send_sensor_data:
|
||||
return
|
||||
|
||||
filters = {'associated': True}
|
||||
node_iter = self.iter_nodes(fields=['instance_uuid'],
|
||||
filters=filters)
|
||||
|
||||
for (node_uuid, driver, instance_uuid) in node_iter:
|
||||
@METRICS.timer('ConductorManager._sensors_nodes_task')
|
||||
def _sensors_nodes_task(self, context, nodes):
|
||||
"""Sends sensors data for nodes from synchronized queue."""
|
||||
while True:
|
||||
try:
|
||||
node_uuid, driver, instance_uuid = nodes.get_nowait()
|
||||
except queue.Empty:
|
||||
break
|
||||
# populate the message which will be sent to ceilometer
|
||||
message = {'message_id': uuidutils.generate_uuid(),
|
||||
'instance_uuid': instance_uuid,
|
||||
@ -2119,6 +2116,42 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
# Yield on every iteration
|
||||
eventlet.sleep(0)
|
||||
|
||||
@METRICS.timer('ConductorManager._send_sensor_data')
|
||||
@periodics.periodic(spacing=CONF.conductor.send_sensor_data_interval)
|
||||
def _send_sensor_data(self, context):
|
||||
"""Periodically sends sensor data to Ceilometer."""
|
||||
|
||||
# do nothing if send_sensor_data option is False
|
||||
if not CONF.conductor.send_sensor_data:
|
||||
return
|
||||
|
||||
filters = {'associated': True}
|
||||
nodes = queue.Queue()
|
||||
for node_info in self.iter_nodes(fields=['instance_uuid'],
|
||||
filters=filters):
|
||||
nodes.put_nowait(node_info)
|
||||
|
||||
number_of_threads = min(CONF.conductor.send_sensor_data_workers,
|
||||
nodes.qsize())
|
||||
futures = []
|
||||
for thread_number in range(number_of_threads):
|
||||
try:
|
||||
futures.append(
|
||||
self._spawn_worker(self._sensors_nodes_task,
|
||||
context, nodes))
|
||||
except exception.NoFreeConductorWorker:
|
||||
LOG.warning(_LW("There is no more conductor workers for "
|
||||
"task of sending sensors data. %(workers)d "
|
||||
"workers has been already spawned."),
|
||||
{'workers': thread_number})
|
||||
break
|
||||
|
||||
done, not_done = waiters.wait_for_all(
|
||||
futures, timeout=CONF.conductor.send_sensor_data_wait_timeout)
|
||||
if not_done:
|
||||
LOG.warning(_LW("%d workers for send sensors data did not "
|
||||
"complete"), len(not_done))
|
||||
|
||||
def _filter_out_unsupported_types(self, sensors_data):
|
||||
"""Filters out sensor data types that aren't specified in the config.
|
||||
|
||||
|
@ -80,8 +80,19 @@ opts = [
|
||||
'notification bus')),
|
||||
cfg.IntOpt('send_sensor_data_interval',
|
||||
default=600,
|
||||
help=_('Seconds between conductor sending sensor data message'
|
||||
' to ceilometer via the notification bus.')),
|
||||
help=_('Seconds between conductor sending sensor data message '
|
||||
'to ceilometer via the notification bus.')),
|
||||
cfg.IntOpt('send_sensor_data_workers',
|
||||
default=4, min=1,
|
||||
help=_('The maximum number of workers that can be started '
|
||||
'simultaneously for send data from sensors periodic '
|
||||
'task.')),
|
||||
cfg.IntOpt('send_sensor_data_wait_timeout',
|
||||
default=300,
|
||||
help=_('The time in seconds to wait for send sensors data '
|
||||
'periodic task to be finished before allowing periodic '
|
||||
'call to happen again. Should be less than '
|
||||
'send_sensor_data_interval value.')),
|
||||
cfg.ListOpt('send_sensor_data_types',
|
||||
default=['ALL'],
|
||||
help=_('List of comma separated meter types which need to be'
|
||||
|
@ -28,6 +28,7 @@ from oslo_utils import uuidutils
|
||||
from oslo_versionedobjects import base as ovo_base
|
||||
from oslo_versionedobjects import fields
|
||||
import six
|
||||
from six.moves import queue
|
||||
|
||||
from ironic.common import boot_devices
|
||||
from ironic.common import driver_factory
|
||||
@ -3273,70 +3274,34 @@ class UpdatePortTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
expected_result = {}
|
||||
self.assertEqual(expected_result, actual_result)
|
||||
|
||||
@mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
|
||||
@mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list')
|
||||
@mock.patch.object(task_manager, 'acquire')
|
||||
def test___send_sensor_data(self, acquire_mock, get_nodeinfo_list_mock,
|
||||
_mapped_to_this_conductor_mock):
|
||||
node = obj_utils.create_test_node(self.context,
|
||||
driver='fake')
|
||||
def test_send_sensor_task(self, acquire_mock):
|
||||
nodes = queue.Queue()
|
||||
for i in range(5):
|
||||
nodes.put_nowait(('fake_uuid-%d' % i, 'fake', None))
|
||||
self._start_service()
|
||||
CONF.set_override('send_sensor_data', True, group='conductor')
|
||||
|
||||
acquire_mock.return_value.__enter__.return_value.driver = self.driver
|
||||
with mock.patch.object(self.driver.management,
|
||||
'get_sensors_data') as get_sensors_data_mock:
|
||||
with mock.patch.object(self.driver.management,
|
||||
'validate') as validate_mock:
|
||||
get_sensors_data_mock.return_value = 'fake-sensor-data'
|
||||
_mapped_to_this_conductor_mock.return_value = True
|
||||
get_nodeinfo_list_mock.return_value = [(node.uuid, node.driver,
|
||||
node.instance_uuid)]
|
||||
self.service._send_sensor_data(self.context)
|
||||
self.assertTrue(get_nodeinfo_list_mock.called)
|
||||
self.assertTrue(_mapped_to_this_conductor_mock.called)
|
||||
self.assertTrue(acquire_mock.called)
|
||||
self.assertTrue(get_sensors_data_mock.called)
|
||||
self.assertTrue(validate_mock.called)
|
||||
self.service._sensors_nodes_task(self.context, nodes)
|
||||
self.assertEqual(5, acquire_mock.call_count)
|
||||
self.assertEqual(5, validate_mock.call_count)
|
||||
self.assertEqual(5, get_sensors_data_mock.call_count)
|
||||
|
||||
@mock.patch.object(manager.ConductorManager, '_fail_if_in_state',
|
||||
autospec=True)
|
||||
@mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
|
||||
@mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list')
|
||||
@mock.patch.object(task_manager, 'acquire')
|
||||
def test___send_sensor_data_disabled(self, acquire_mock,
|
||||
get_nodeinfo_list_mock,
|
||||
_mapped_to_this_conductor_mock,
|
||||
mock_fail_if_state):
|
||||
node = obj_utils.create_test_node(self.context,
|
||||
driver='fake')
|
||||
self._start_service()
|
||||
acquire_mock.return_value.__enter__.return_value.driver = self.driver
|
||||
with mock.patch.object(self.driver.management,
|
||||
'get_sensors_data') as get_sensors_data_mock:
|
||||
with mock.patch.object(self.driver.management,
|
||||
'validate') as validate_mock:
|
||||
get_sensors_data_mock.return_value = 'fake-sensor-data'
|
||||
_mapped_to_this_conductor_mock.return_value = True
|
||||
get_nodeinfo_list_mock.return_value = [(node.uuid, node.driver,
|
||||
node.instance_uuid)]
|
||||
self.service._send_sensor_data(self.context)
|
||||
self.assertFalse(get_nodeinfo_list_mock.called)
|
||||
self.assertFalse(_mapped_to_this_conductor_mock.called)
|
||||
self.assertFalse(acquire_mock.called)
|
||||
self.assertFalse(get_sensors_data_mock.called)
|
||||
self.assertFalse(validate_mock.called)
|
||||
mock_fail_if_state.assert_called_once_with(
|
||||
mock.ANY, mock.ANY,
|
||||
{'provision_state': 'deploying', 'reserved': False},
|
||||
'deploying', 'provision_updated_at',
|
||||
last_error=mock.ANY)
|
||||
|
||||
@mock.patch.object(manager.ConductorManager, 'iter_nodes', autospec=True)
|
||||
@mock.patch.object(task_manager, 'acquire', autospec=True)
|
||||
def test___send_sensor_data_no_management(self, acquire_mock,
|
||||
iter_nodes_mock):
|
||||
def test_send_sensor_task_no_management(self, acquire_mock):
|
||||
nodes = queue.Queue()
|
||||
nodes.put_nowait(('fake_uuid', 'fake', None))
|
||||
|
||||
CONF.set_override('send_sensor_data', True, group='conductor')
|
||||
iter_nodes_mock.return_value = [('fake_uuid1', 'fake', 'fake_uuid2')]
|
||||
|
||||
self._start_service()
|
||||
|
||||
self.driver.management = None
|
||||
acquire_mock.return_value.__enter__.return_value.driver = self.driver
|
||||
|
||||
@ -3344,13 +3309,68 @@ class UpdatePortTestCase(mgr_utils.ServiceSetUpMixin,
|
||||
autospec=True) as get_sensors_data_mock:
|
||||
with mock.patch.object(fake.FakeManagement, 'validate',
|
||||
autospec=True) as validate_mock:
|
||||
self.service._send_sensor_data(self.context)
|
||||
self.service._sensors_nodes_task(self.context, nodes)
|
||||
|
||||
self.assertTrue(iter_nodes_mock.called)
|
||||
self.assertTrue(acquire_mock.called)
|
||||
self.assertFalse(get_sensors_data_mock.called)
|
||||
self.assertFalse(validate_mock.called)
|
||||
|
||||
@mock.patch.object(manager.ConductorManager, '_spawn_worker')
|
||||
@mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
|
||||
@mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list')
|
||||
def test___send_sensor_data(self, get_nodeinfo_list_mock,
|
||||
_mapped_to_this_conductor_mock,
|
||||
mock_spawn):
|
||||
self._start_service()
|
||||
|
||||
CONF.set_override('send_sensor_data', True, group='conductor')
|
||||
# NOTE(galyna): do not wait for threads to be finished in unittests
|
||||
CONF.set_override('send_sensor_data_wait_timeout', 0,
|
||||
group='conductor')
|
||||
_mapped_to_this_conductor_mock.return_value = True
|
||||
get_nodeinfo_list_mock.return_value = [('fake_uuid', 'fake', None)]
|
||||
self.service._send_sensor_data(self.context)
|
||||
mock_spawn.assert_called_with(self.service._sensors_nodes_task,
|
||||
self.context,
|
||||
mock.ANY)
|
||||
|
||||
@mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker')
|
||||
@mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
|
||||
@mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list')
|
||||
def test___send_sensor_data_multiple_workers(
|
||||
self, get_nodeinfo_list_mock, _mapped_to_this_conductor_mock,
|
||||
mock_spawn):
|
||||
self._start_service()
|
||||
mock_spawn.reset_mock()
|
||||
|
||||
number_of_workers = 8
|
||||
CONF.set_override('send_sensor_data', True, group='conductor')
|
||||
CONF.set_override('send_sensor_data_workers', number_of_workers,
|
||||
group='conductor')
|
||||
# NOTE(galyna): do not wait for threads to be finished in unittests
|
||||
CONF.set_override('send_sensor_data_wait_timeout', 0,
|
||||
group='conductor')
|
||||
|
||||
_mapped_to_this_conductor_mock.return_value = True
|
||||
get_nodeinfo_list_mock.return_value = [('fake_uuid', 'fake',
|
||||
None)] * 20
|
||||
self.service._send_sensor_data(self.context)
|
||||
self.assertEqual(number_of_workers,
|
||||
mock_spawn.call_count)
|
||||
|
||||
@mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
|
||||
@mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list')
|
||||
def test___send_sensor_data_disabled(self, get_nodeinfo_list_mock,
|
||||
_mapped_to_this_conductor_mock):
|
||||
self._start_service()
|
||||
get_nodeinfo_list_mock.reset_mock()
|
||||
with mock.patch.object(manager.ConductorManager,
|
||||
'_spawn_worker') as _spawn_mock:
|
||||
self.service._send_sensor_data(self.context)
|
||||
self.assertFalse(get_nodeinfo_list_mock.called)
|
||||
self.assertFalse(_mapped_to_this_conductor_mock.called)
|
||||
self.assertFalse(_spawn_mock.called)
|
||||
|
||||
def test_set_boot_device(self):
|
||||
node = obj_utils.create_test_node(self.context, driver='fake')
|
||||
with mock.patch.object(self.driver.management, 'validate') as mock_val:
|
||||
|
@ -0,0 +1,11 @@
|
||||
---
|
||||
features:
|
||||
- Adds new option ``[conductor]/send_sensor_data_workers``
|
||||
to allow concurrently sending sensor data using the specified
|
||||
number of green threads.
|
||||
``[conductor]/wait_timeout_for_send_sensor_data`` option allows
|
||||
to specify the time to wait for all spawned green threads before
|
||||
running the periodic task again.
|
||||
upgrade:
|
||||
- Increases number of workers from 1 to 4 for the ``send_sensor_data``
|
||||
periodic task.
|
Loading…
x
Reference in New Issue
Block a user