diff --git a/aodh/cmd/alarm.py b/aodh/cmd/alarm.py index 13383889b..6310ca063 100644 --- a/aodh/cmd/alarm.py +++ b/aodh/cmd/alarm.py @@ -25,14 +25,17 @@ from aodh import service def notifier(): conf = service.prepare_service() - os_service.launch(conf, notifier_svc.AlarmNotifierService(conf)).wait() + os_service.launch(conf, notifier_svc.AlarmNotifierService(conf), + workers=conf.notifier.workers).wait() def evaluator(): conf = service.prepare_service() - os_service.launch(conf, evaluator_svc.AlarmEvaluationService(conf)).wait() + os_service.launch(conf, evaluator_svc.AlarmEvaluationService(conf), + workers=conf.evaluator.workers).wait() def listener(): conf = service.prepare_service() - os_service.launch(conf, event_svc.EventAlarmEvaluationService(conf)).wait() + os_service.launch(conf, event_svc.EventAlarmEvaluationService(conf), + workers=conf.listener.workers).wait() diff --git a/aodh/evaluator/__init__.py b/aodh/evaluator/__init__.py index 67a31133d..1990aaa71 100644 --- a/aodh/evaluator/__init__.py +++ b/aodh/evaluator/__init__.py @@ -187,10 +187,6 @@ class AlarmEvaluationService(os_service.Service): def __init__(self, conf): super(AlarmEvaluationService, self).__init__() self.conf = conf - self.storage_conn = None - self._load_evaluators() - self.partition_coordinator = coordination.PartitionCoordinator( - conf.coordination.backend_url) @property def _storage_conn(self): @@ -229,6 +225,12 @@ class AlarmEvaluationService(os_service.Service): def start(self): super(AlarmEvaluationService, self).start() + + self.storage_conn = None + self._load_evaluators() + self.partition_coordinator = coordination.PartitionCoordinator( + self.conf.coordination.backend_url) + self.partition_coordinator.start() self.partition_coordinator.join_group(self.PARTITIONING_GROUP_NAME) @@ -268,9 +270,11 @@ class AlarmEvaluationService(os_service.Service): self.tg.add_timer(604800, lambda: None) def stop(self): - self.periodic.stop() - self.periodic.wait() - 
self.partition_coordinator.stop() + if getattr(self, 'periodic', None): + self.periodic.stop() + self.periodic.wait() + if getattr(self, 'partition_coordinator', None): + self.partition_coordinator.stop() super(AlarmEvaluationService, self).stop() def _assigned_alarms(self): diff --git a/aodh/event.py b/aodh/event.py index f17696862..2840aa88d 100644 --- a/aodh/event.py +++ b/aodh/event.py @@ -44,11 +44,11 @@ class EventAlarmEvaluationService(service.Service): def __init__(self, conf): super(EventAlarmEvaluationService, self).__init__() self.conf = conf - self.storage_conn = storage.get_connection_from_config(self.conf) - self.evaluator = event.EventAlarmEvaluator(self.conf) def start(self): super(EventAlarmEvaluationService, self).start() + self.storage_conn = storage.get_connection_from_config(self.conf) + self.evaluator = event.EventAlarmEvaluator(self.conf) self.listener = messaging.get_notification_listener( messaging.get_transport(self.conf), [oslo_messaging.Target(topic=self.conf.event_alarm_topic)], @@ -58,6 +58,7 @@ class EventAlarmEvaluationService(service.Service): self.tg.add_timer(604800, lambda: None) def stop(self): - self.listener.stop() - self.listener.wait() + if getattr(self, 'listener', None): + self.listener.stop() + self.listener.wait() super(EventAlarmEvaluationService, self).stop() diff --git a/aodh/notifier/__init__.py b/aodh/notifier/__init__.py index c7875b126..5b6c92c3b 100644 --- a/aodh/notifier/__init__.py +++ b/aodh/notifier/__init__.py @@ -58,26 +58,28 @@ class AlarmNotifierService(os_service.Service): def __init__(self, conf): super(AlarmNotifierService, self).__init__() - transport = messaging.get_transport(conf) - self.notifiers = extension.ExtensionManager( - self.NOTIFIER_EXTENSIONS_NAMESPACE, - invoke_on_load=True, - invoke_args=(conf,)) - - target = oslo_messaging.Target(topic=conf.notifier_topic) - self.listener = messaging.get_notification_listener( - transport, [target], - [AlarmEndpoint(self.notifiers)]) + self.conf = conf 
def start(self): super(AlarmNotifierService, self).start() + transport = messaging.get_transport(self.conf) + self.notifiers = extension.ExtensionManager( + self.NOTIFIER_EXTENSIONS_NAMESPACE, + invoke_on_load=True, + invoke_args=(self.conf,)) + + target = oslo_messaging.Target(topic=self.conf.notifier_topic) + self.listener = messaging.get_notification_listener( + transport, [target], + [AlarmEndpoint(self.notifiers)]) self.listener.start() # Add a dummy thread to have wait() working self.tg.add_timer(604800, lambda: None) def stop(self): - self.listener.stop() - self.listener.wait() + if getattr(self, 'listener', None): + self.listener.stop() + self.listener.wait() super(AlarmNotifierService, self).stop() diff --git a/aodh/opts.py b/aodh/opts.py index bc985e57f..a7ce77cdb 100644 --- a/aodh/opts.py +++ b/aodh/opts.py @@ -62,6 +62,9 @@ def list_opts(): ])), ('coordination', aodh.coordination.OPTS), ('database', aodh.storage.OPTS), + ('evaluator', aodh.service.EVALUATOR_OPTS), + ('listener', aodh.service.LISTENER_OPTS), + ('notifier', aodh.service.NOTIFIER_OPTS), ('service_credentials', aodh.keystone_client.OPTS), ] diff --git a/aodh/service.py b/aodh/service.py index 914ed0092..ba4b4af32 100644 --- a/aodh/service.py +++ b/aodh/service.py @@ -44,6 +44,30 @@ OPTS = [ ' collection of underlying meters.'), ] +EVALUATOR_OPTS = [ + cfg.IntOpt('workers', + default=1, + min=1, + help='Number of workers for evaluator service. ' + 'default value is 1.') +] + +NOTIFIER_OPTS = [ + cfg.IntOpt('workers', + default=1, + min=1, + help='Number of workers for notifier service. ' + 'default value is 1.') +] + +LISTENER_OPTS = [ + cfg.IntOpt('workers', + default=1, + min=1, + help='Number of workers for evaluator service. 
' + 'default value is 1.') +] + def prepare_service(argv=None, config_files=None): conf = cfg.ConfigOpts() diff --git a/aodh/tests/unit/test_evaluator.py b/aodh/tests/unit/test_evaluator.py index ce27fdc2f..3ea7d9877 100644 --- a/aodh/tests/unit/test_evaluator.py +++ b/aodh/tests/unit/test_evaluator.py @@ -14,6 +14,8 @@ # under the License. """Tests for aodh.evaluator.AlarmEvaluationService. """ +import time + import mock from oslo_config import fixture as fixture_config from stevedore import extension @@ -28,9 +30,10 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase): super(TestAlarmEvaluationService, self).setUp() conf = service.prepare_service(argv=[], config_files=[]) self.CONF = self.useFixture(fixture_config.Config(conf)).conf + self.CONF.set_override('workers', 1, 'evaluator') self.setup_messaging(self.CONF) - self.threshold_eval = mock.Mock() + self.threshold_eval = mock.MagicMock() self.evaluators = extension.ExtensionManager.make_test_instance( [ extension.Extension( @@ -41,15 +44,11 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase): ] ) - self.storage_conn = mock.MagicMock() self.svc = evaluator.AlarmEvaluationService(self.CONF) self.svc.tg = mock.Mock() - self.svc.partition_coordinator = mock.MagicMock() - p_coord = self.svc.partition_coordinator - p_coord.extract_my_subset.side_effect = lambda _, x: x - self.svc.evaluators = self.evaluators - self.svc.supported_evaluators = ['threshold'] + @mock.patch('aodh.storage.get_connection_from_config', + mock.MagicMock()) def _do_test_start(self, test_interval=120, coordination_heartbeat=1.0, coordination_active=False): @@ -59,18 +58,19 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase): coordination_heartbeat, group='coordination', enforce_type=True) - with mock.patch('aodh.storage.get_connection_from_config', - return_value=self.storage_conn): - p_coord_mock = self.svc.partition_coordinator - p_coord_mock.is_active.return_value = coordination_active + with 
mock.patch('aodh.coordination.PartitionCoordinator') as m_pc: + m_pc.return_value.is_active.return_value = coordination_active + + self.addCleanup(self.svc.stop) self.svc.start() - self.svc.partition_coordinator.start.assert_called_once_with() - self.svc.partition_coordinator.join_group.assert_called_once_with( - self.svc.PARTITIONING_GROUP_NAME) - actual = self.svc.tg.add_timer.call_args_list - self.assertEqual([mock.call(604800, mock.ANY)], actual) + self.svc.partition_coordinator.start.assert_called_once_with() + self.svc.partition_coordinator.join_group.assert_called_once_with( + self.svc.PARTITIONING_GROUP_NAME) + + actual = self.svc.tg.add_timer.call_args_list + self.assertEqual([mock.call(604800, mock.ANY)], actual) def test_start_singleton(self): self._do_test_start(coordination_active=False) @@ -82,55 +82,80 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase): self._do_test_start(coordination_active=True, test_interval=10, coordination_heartbeat=5) - def test_evaluation_cycle(self): + @mock.patch('stevedore.extension.ExtensionManager') + @mock.patch('aodh.storage.get_connection_from_config') + @mock.patch('aodh.coordination.PartitionCoordinator') + def test_evaluation_cycle(self, m_pc, m_conn, m_em): alarm = mock.Mock(type='threshold') - self.storage_conn.get_alarms.return_value = [alarm] - with mock.patch('aodh.storage.get_connection_from_config', - return_value=self.storage_conn): - p_coord_mock = self.svc.partition_coordinator - p_coord_mock.extract_my_subset.return_value = [alarm] + m_pc.return_value.extract_my_subset.return_value = [alarm] + m_pc.return_value.is_active.return_value = False + m_conn.return_value.get_alarms.return_value = [alarm] + m_em.return_value = self.evaluators + self.threshold_eval.evaluate.side_effect = [Exception('Boom!'), None] - self.svc._evaluate_assigned_alarms() + self.addCleanup(self.svc.stop) + self.svc.start() - p_coord_mock.extract_my_subset.assert_called_once_with( - self.svc.PARTITIONING_GROUP_NAME, [alarm]) 
- self.threshold_eval.evaluate.assert_called_once_with(alarm) + time.sleep(1) + + target = self.svc.partition_coordinator.extract_my_subset + target.assert_called_once_with(self.svc.PARTITIONING_GROUP_NAME, + [alarm]) + self.threshold_eval.evaluate.assert_called_once_with(alarm) + + @mock.patch('stevedore.extension.ExtensionManager') + @mock.patch('aodh.coordination.PartitionCoordinator') + def test_evaluation_cycle_with_bad_alarm(self, m_pc, m_em): + m_pc.return_value.is_active.return_value = False + m_em.return_value = self.evaluators - def test_evaluation_cycle_with_bad_alarm(self): alarms = [ mock.Mock(type='threshold', name='bad'), mock.Mock(type='threshold', name='good'), ] - self.storage_conn.get_alarms.return_value = alarms self.threshold_eval.evaluate.side_effect = [Exception('Boom!'), None] - with mock.patch('aodh.storage.get_connection_from_config', - return_value=self.storage_conn): - p_coord_mock = self.svc.partition_coordinator - p_coord_mock.extract_my_subset.return_value = alarms - self.svc._evaluate_assigned_alarms() - self.assertEqual([mock.call(alarms[0]), mock.call(alarms[1])], - self.threshold_eval.evaluate.call_args_list) + with mock.patch.object(self.svc, '_assigned_alarms', + return_value=alarms): + self.addCleanup(self.svc.stop) + self.svc.start() - def test_unknown_extension_skipped(self): + time.sleep(1) + + self.assertEqual([mock.call(alarms[0]), mock.call(alarms[1])], + self.threshold_eval.evaluate.call_args_list) + + @mock.patch('stevedore.extension.ExtensionManager') + def test_unknown_extension_skipped(self, m_em): + m_em.return_value = self.evaluators alarms = [ mock.Mock(type='not_existing_type'), mock.Mock(type='threshold') ] - self.storage_conn.get_alarms.return_value = alarms - with mock.patch('aodh.storage.get_connection_from_config', - return_value=self.storage_conn): + with mock.patch.object(self.svc, '_assigned_alarms', + return_value=alarms): + self.addCleanup(self.svc.stop) self.svc.start() - 
self.svc._evaluate_assigned_alarms() + + time.sleep(1) + self.threshold_eval.evaluate.assert_called_once_with(alarms[1]) - def test_check_alarm_query_constraints(self): - self.storage_conn.get_alarms.return_value = [] - with mock.patch('aodh.storage.get_connection_from_config', - return_value=self.storage_conn): - self.svc.start() - self.svc._evaluate_assigned_alarms() - expected = [({'enabled': True, 'exclude': {'type': 'event'}},)] - self.assertEqual(expected, - self.storage_conn.get_alarms.call_args_list) + @mock.patch('stevedore.extension.ExtensionManager') + @mock.patch('aodh.coordination.PartitionCoordinator') + @mock.patch('aodh.storage.get_connection_from_config') + def test_check_alarm_query_constraints(self, m_conn, m_pc, m_em): + m_conn.return_value.get_alarms.return_value = [] + m_pc.return_value.extract_my_subset.return_value = [] + m_pc.return_value.is_active.return_value = False + m_em.return_value = self.evaluators + + self.addCleanup(self.svc.start) + self.svc.start() + + time.sleep(1) + + expected = [({'enabled': True, 'exclude': {'type': 'event'}},)] + self.assertEqual(expected, + self.svc.storage_conn.get_alarms.call_args_list) diff --git a/aodh/tests/unit/test_event.py b/aodh/tests/unit/test_event.py index 4ce2a46ed..19de42175 100644 --- a/aodh/tests/unit/test_event.py +++ b/aodh/tests/unit/test_event.py @@ -30,21 +30,23 @@ class TestEventAlarmEvaluationService(tests_base.BaseTestCase): conf = service.prepare_service(argv=[], config_files=[]) self.CONF = self.useFixture(fixture_config.Config(conf)).conf - self.storage_conn = mock.MagicMock() self.setup_messaging(self.CONF) - with mock.patch('aodh.storage.get_connection_from_config', - return_value=self.storage_conn): - self.service = event.EventAlarmEvaluationService(self.CONF) + self.service = event.EventAlarmEvaluationService(self.CONF) + @mock.patch('aodh.storage.get_connection_from_config', + mock.MagicMock()) def test_start_and_stop_service(self): + self.addCleanup(self.service.stop) 
self.service.start() self.assertIsInstance(self.service.listener, server.MessageHandlingServer) - self.service.stop() + @mock.patch('aodh.storage.get_connection_from_config', + mock.MagicMock()) def test_listener_start_called(self): listener = mock.Mock() with mock.patch('aodh.messaging.get_notification_listener', return_value=listener): + self.addCleanup(self.service.stop) self.service.start() self.assertTrue(listener.start.called) diff --git a/releasenotes/notes/enable-aodh-service-multi-processes-67ed9a0b7fac69aa.yaml b/releasenotes/notes/enable-aodh-service-multi-processes-67ed9a0b7fac69aa.yaml new file mode 100644 index 000000000..e24abad4a --- /dev/null +++ b/releasenotes/notes/enable-aodh-service-multi-processes-67ed9a0b7fac69aa.yaml @@ -0,0 +1,7 @@ +--- +features: + - Enable aodh services, including aodh-evaluator, aodh-listener and + aodh-notifier to run in multiple worker mode. + + New options are introduced correspondingly as [evaluator]workers, + [listener]workers and [notifier]workers. They all default to 1.