diff --git a/aodh/cmd/alarm.py b/aodh/cmd/alarm.py index 6310ca063..3914be13e 100644 --- a/aodh/cmd/alarm.py +++ b/aodh/cmd/alarm.py @@ -15,7 +15,7 @@ # License for the specific language governing permissions and limitations # under the License. -from oslo_service import service as os_service +import cotyledon from aodh import evaluator as evaluator_svc from aodh import event as event_svc @@ -25,17 +25,23 @@ from aodh import service def notifier(): conf = service.prepare_service() - os_service.launch(conf, notifier_svc.AlarmNotifierService(conf), - workers=conf.notifier.workers).wait() + sm = cotyledon.ServiceManager() + sm.add(notifier_svc.AlarmNotifierService, + workers=conf.notifier.workers, args=(conf,)) + sm.run() def evaluator(): conf = service.prepare_service() - os_service.launch(conf, evaluator_svc.AlarmEvaluationService(conf), - workers=conf.evaluator.workers).wait() + sm = cotyledon.ServiceManager() + sm.add(evaluator_svc.AlarmEvaluationService, + workers=conf.evaluator.workers, args=(conf,)) + sm.run() def listener(): conf = service.prepare_service() - os_service.launch(conf, event_svc.EventAlarmEvaluationService(conf), - workers=conf.listener.workers).wait() + sm = cotyledon.ServiceManager() + sm.add(event_svc.EventAlarmEvaluationService, + workers=conf.listener.workers, args=(conf,)) + sm.run() diff --git a/aodh/evaluator/__init__.py b/aodh/evaluator/__init__.py index 69519fe7e..f5939e400 100644 --- a/aodh/evaluator/__init__.py +++ b/aodh/evaluator/__init__.py @@ -20,11 +20,11 @@ import json import threading from concurrent import futures +import cotyledon import croniter from futurist import periodics from oslo_config import cfg from oslo_log import log -from oslo_service import service as os_service from oslo_utils import timeutils import pytz import six @@ -182,68 +182,34 @@ class Evaluator(object): """ -class AlarmEvaluationService(os_service.Service): +class AlarmEvaluationService(cotyledon.Service): PARTITIONING_GROUP_NAME = "alarm_evaluator" EVALUATOR_EXTENSIONS_NAMESPACE = "aodh.evaluator" - def __init__(self, conf): - super(AlarmEvaluationService, self).__init__() + def __init__(self, worker_id, conf): + super(AlarmEvaluationService, self).__init__(worker_id) self.conf = conf - @property - def _storage_conn(self): - if not self.storage_conn: - self.storage_conn = storage.get_connection_from_config(self.conf) - return self.storage_conn + ef = lambda: futures.ThreadPoolExecutor(max_workers=10) + self.periodic = periodics.PeriodicWorker.create( + [], executor_factory=ef) - def _load_evaluators(self): self.evaluators = extension.ExtensionManager( namespace=self.EVALUATOR_EXTENSIONS_NAMESPACE, invoke_on_load=True, invoke_args=(self.conf,) ) + self.storage_conn = storage.get_connection_from_config(self.conf) - def _evaluate_assigned_alarms(self): - try: - alarms = self._assigned_alarms() - LOG.info(_('initiating evaluation cycle on %d alarms') % - len(alarms)) - for alarm in alarms: - self._evaluate_alarm(alarm) - except Exception: - LOG.exception(_('alarm evaluation cycle failed')) - - def _evaluate_alarm(self, alarm): - """Evaluate the alarms assigned to this evaluator.""" - if alarm.type not in self.evaluators: - LOG.debug('skipping alarm %s: type unsupported', alarm.alarm_id) - return - - LOG.debug('evaluating alarm %s', alarm.alarm_id) - try: - self.evaluators[alarm.type].obj.evaluate(alarm) - except Exception: - LOG.exception(_('Failed to evaluate alarm %s'), alarm.alarm_id) - - def start(self): - super(AlarmEvaluationService, self).start() - - self.storage_conn = None - self._load_evaluators() self.partition_coordinator = coordination.PartitionCoordinator( self.conf) - self.partition_coordinator.start() self.partition_coordinator.join_group(self.PARTITIONING_GROUP_NAME) # allow time for coordination if necessary delay_start = self.partition_coordinator.is_active() - ef = lambda: futures.ThreadPoolExecutor(max_workers=10) - self.periodic = periodics.PeriodicWorker.create( - [], executor_factory=ef) - if self.evaluators: @periodics.periodic(spacing=self.conf.evaluation_interval, run_immediately=not delay_start) @@ -267,25 +233,39 @@ class AlarmEvaluationService(os_service.Service): t.daemon = True t.start() - # NOTE(sileht): We have to drop eventlet to drop this last eventlet - # thread - # Add a dummy thread to have wait() working - self.tg.add_timer(604800, lambda: None) + def terminate(self): + self.periodic.stop() + self.partition_coordinator.stop() + self.periodic.wait() - def stop(self): - if getattr(self, 'periodic', None): - self.periodic.stop() - self.periodic.wait() - if getattr(self, 'partition_coordinator', None): - self.partition_coordinator.stop() - super(AlarmEvaluationService, self).stop() + def _evaluate_assigned_alarms(self): + try: + alarms = self._assigned_alarms() + LOG.info(_('initiating evaluation cycle on %d alarms') % + len(alarms)) + for alarm in alarms: + self._evaluate_alarm(alarm) + except Exception: + LOG.exception(_('alarm evaluation cycle failed')) + + def _evaluate_alarm(self, alarm): + """Evaluate the alarms assigned to this evaluator.""" + if alarm.type not in self.evaluators: + LOG.debug('skipping alarm %s: type unsupported', alarm.alarm_id) + return + + LOG.debug('evaluating alarm %s', alarm.alarm_id) + try: + self.evaluators[alarm.type].obj.evaluate(alarm) + except Exception: + LOG.exception(_('Failed to evaluate alarm %s'), alarm.alarm_id) def _assigned_alarms(self): # NOTE(r-mibu): The 'event' type alarms will be evaluated by the # event-driven alarm evaluator, so this periodical evaluator skips # those alarms. - all_alarms = self._storage_conn.get_alarms(enabled=True, - exclude=dict(type='event')) + all_alarms = self.storage_conn.get_alarms(enabled=True, + exclude=dict(type='event')) all_alarms = list(all_alarms) all_alarm_ids = [a.alarm_id for a in all_alarms] selected = self.partition_coordinator.extract_my_subset( diff --git a/aodh/event.py b/aodh/event.py index f7117d2b4..a64e29a82 100644 --- a/aodh/event.py +++ b/aodh/event.py @@ -13,10 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. +import cotyledon from oslo_config import cfg from oslo_log import log import oslo_messaging -from oslo_service import service from aodh.evaluator import event from aodh import messaging @@ -51,14 +51,10 @@ class EventAlarmEndpoint(object): self.evaluator.evaluate_events(notification['payload']) -class EventAlarmEvaluationService(service.Service): - - def __init__(self, conf): - super(EventAlarmEvaluationService, self).__init__() +class EventAlarmEvaluationService(cotyledon.Service): + def __init__(self, worker_id, conf): + super(EventAlarmEvaluationService, self).__init__(worker_id) self.conf = conf - - def start(self): - super(EventAlarmEvaluationService, self).start() self.storage_conn = storage.get_connection_from_config(self.conf) self.evaluator = event.EventAlarmEvaluator(self.conf) self.listener = messaging.get_batch_notification_listener( @@ -69,11 +65,7 @@ class EventAlarmEvaluationService(service.Service): self.conf.listener.batch_size, self.conf.listener.batch_timeout) self.listener.start() - # Add a dummy thread to have wait() working - self.tg.add_timer(604800, lambda: None) - def stop(self): - if getattr(self, 'listener', None): - self.listener.stop() - self.listener.wait() - super(EventAlarmEvaluationService, self).stop() + def terminate(self): + self.listener.stop() + self.listener.wait() diff --git a/aodh/notifier/__init__.py b/aodh/notifier/__init__.py index bd38bf885..29afa3e50 100644 --- a/aodh/notifier/__init__.py +++ b/aodh/notifier/__init__.py @@ -15,10 +15,10 @@ import abc +import cotyledon from oslo_config import cfg from oslo_log import log import oslo_messaging -from oslo_service import service as os_service from oslo_utils import netutils import six from stevedore import extension @@ -66,15 +66,11 @@ class AlarmNotifier(object): """ -class AlarmNotifierService(os_service.Service): +class AlarmNotifierService(cotyledon.Service): NOTIFIER_EXTENSIONS_NAMESPACE = "aodh.notifier" - def __init__(self, conf): - super(AlarmNotifierService, self).__init__() + def __init__(self, worker_id, conf): self.conf = conf - - def start(self): - super(AlarmNotifierService, self).start() transport = messaging.get_transport(self.conf) self.notifiers = extension.ExtensionManager( self.NOTIFIER_EXTENSIONS_NAMESPACE, @@ -86,14 +82,10 @@ class AlarmNotifierService(os_service.Service): transport, [target], [AlarmEndpoint(self.notifiers)], False, self.conf.notifier.batch_size, self.conf.notifier.batch_timeout) self.listener.start() - # Add a dummy thread to have wait() working - self.tg.add_timer(604800, lambda: None) - def stop(self): - if getattr(self, 'listener', None): - self.listener.stop() - self.listener.wait() - super(AlarmNotifierService, self).stop() + def terminate(self): + self.listener.stop() + self.listener.wait() class AlarmEndpoint(object): diff --git a/aodh/tests/unit/test_evaluator.py b/aodh/tests/unit/test_evaluator.py index c52bec63c..f77002e12 100644 --- a/aodh/tests/unit/test_evaluator.py +++ b/aodh/tests/unit/test_evaluator.py @@ -18,6 +18,7 @@ import time import mock from oslo_config import fixture as fixture_config +from oslotest import mockpatch from stevedore import extension from aodh import evaluator @@ -34,7 +35,9 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase): self.setup_messaging(self.CONF) self.threshold_eval = mock.MagicMock() - self.evaluators = extension.ExtensionManager.make_test_instance( + self._fake_conn = mock.Mock() + self._fake_pc = mock.Mock() + self._fake_em = extension.ExtensionManager.make_test_instance( [ extension.Extension( 'threshold', @@ -44,14 +47,23 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase): ] ) - self.svc = evaluator.AlarmEvaluationService(self.CONF) - self.svc.tg = mock.Mock() + self.useFixture(mockpatch.Patch( + 'stevedore.extension.ExtensionManager', + return_value=self._fake_em + )) + self.useFixture(mockpatch.Patch( + 'aodh.coordination.PartitionCoordinator', + return_value=self._fake_pc + )) + self.useFixture(mockpatch.Patch( + 'aodh.storage.get_connection_from_config', + return_value=self._fake_conn + )) - @mock.patch('aodh.storage.get_connection_from_config', - mock.MagicMock()) def _do_test_start(self, test_interval=120, coordination_heartbeat=1.0, coordination_active=False): + self.CONF.set_override('evaluation_interval', test_interval) self.CONF.set_override('heartbeat', @@ -59,18 +71,14 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase): group='coordination', enforce_type=True) - with mock.patch('aodh.coordination.PartitionCoordinator') as m_pc: - m_pc.return_value.is_active.return_value = coordination_active + self._fake_pc.is_active.return_value = coordination_active - self.svc.start() - self.svc.stop() - - self.svc.partition_coordinator.start.assert_called_once_with() - self.svc.partition_coordinator.join_group.assert_called_once_with( - self.svc.PARTITIONING_GROUP_NAME) - - actual = self.svc.tg.add_timer.call_args_list - self.assertEqual([mock.call(604800, mock.ANY)], actual) + svc = evaluator.AlarmEvaluationService(0, self.CONF) + self.addCleanup(svc.terminate) + svc.terminate() + svc.partition_coordinator.start.assert_called_once_with() + svc.partition_coordinator.join_group.assert_called_once_with( + svc.PARTITIONING_GROUP_NAME) def test_start_singleton(self): self._do_test_start(coordination_active=False) @@ -82,78 +90,62 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase): self._do_test_start(coordination_active=True, test_interval=10, coordination_heartbeat=5) - @mock.patch('stevedore.extension.ExtensionManager') - @mock.patch('aodh.storage.get_connection_from_config') - @mock.patch('aodh.coordination.PartitionCoordinator') - def test_evaluation_cycle(self, m_pc, m_conn, m_em): + def test_evaluation_cycle(self): alarm = mock.Mock(type='threshold', alarm_id="alarm_id1") - m_pc.return_value.extract_my_subset.return_value = ["alarm_id1"] - m_pc.return_value.is_active.return_value = False - m_conn.return_value.get_alarms.return_value = [alarm] - m_em.return_value = self.evaluators + self._fake_pc.extract_my_subset.return_value = ["alarm_id1"] + self._fake_pc.is_active.return_value = False + self._fake_conn.get_alarms.return_value = [alarm] self.threshold_eval.evaluate.side_effect = [Exception('Boom!'), None] - self.addCleanup(self.svc.stop) - self.svc.start() - + svc = evaluator.AlarmEvaluationService(0, self.CONF) + self.addCleanup(svc.terminate) time.sleep(1) - - target = self.svc.partition_coordinator.extract_my_subset - target.assert_called_once_with(self.svc.PARTITIONING_GROUP_NAME, + target = svc.partition_coordinator.extract_my_subset + target.assert_called_once_with(svc.PARTITIONING_GROUP_NAME, ["alarm_id1"]) self.threshold_eval.evaluate.assert_called_once_with(alarm) - @mock.patch('stevedore.extension.ExtensionManager') - @mock.patch('aodh.coordination.PartitionCoordinator') - def test_evaluation_cycle_with_bad_alarm(self, m_pc, m_em): - m_pc.return_value.is_active.return_value = False - m_em.return_value = self.evaluators + def test_evaluation_cycle_with_bad_alarm(self): alarms = [ - mock.Mock(type='threshold', name='bad'), - mock.Mock(type='threshold', name='good'), + mock.Mock(type='threshold', name='bad', alarm_id='a'), + mock.Mock(type='threshold', name='good', alarm_id='b'), ] self.threshold_eval.evaluate.side_effect = [Exception('Boom!'), None] - with mock.patch.object(self.svc, '_assigned_alarms', - return_value=alarms): - self.addCleanup(self.svc.stop) - self.svc.start() - - time.sleep(1) + self._fake_pc.is_active.return_value = False + self._fake_pc.extract_my_subset.return_value = ['a', 'b'] + self._fake_conn.get_alarms.return_value = alarms + svc = evaluator.AlarmEvaluationService(0, self.CONF) + self.addCleanup(svc.terminate) + time.sleep(1) self.assertEqual([mock.call(alarms[0]), mock.call(alarms[1])], self.threshold_eval.evaluate.call_args_list) - @mock.patch('stevedore.extension.ExtensionManager') - def test_unknown_extension_skipped(self, m_em): - m_em.return_value = self.evaluators + def test_unknown_extension_skipped(self): alarms = [ - mock.Mock(type='not_existing_type'), - mock.Mock(type='threshold') + mock.Mock(type='not_existing_type', alarm_id='a'), + mock.Mock(type='threshold', alarm_id='b') ] - with mock.patch.object(self.svc, '_assigned_alarms', - return_value=alarms): - self.addCleanup(self.svc.stop) - self.svc.start() - time.sleep(1) - self.threshold_eval.evaluate.assert_called_once_with(alarms[1]) - - @mock.patch('stevedore.extension.ExtensionManager') - @mock.patch('aodh.coordination.PartitionCoordinator') - @mock.patch('aodh.storage.get_connection_from_config') - def test_check_alarm_query_constraints(self, m_conn, m_pc, m_em): - m_conn.return_value.get_alarms.return_value = [] - m_pc.return_value.extract_my_subset.return_value = [] - m_pc.return_value.is_active.return_value = False - m_em.return_value = self.evaluators - - self.addCleanup(self.svc.stop) - self.svc.start() + self._fake_pc.is_active.return_value = False + self._fake_pc.extract_my_subset.return_value = ['a', 'b'] + self._fake_conn.get_alarms.return_value = alarms + svc = evaluator.AlarmEvaluationService(0, self.CONF) + self.addCleanup(svc.terminate) time.sleep(1) + self.threshold_eval.evaluate.assert_called_once_with(alarms[1]) + def test_check_alarm_query_constraints(self): + self._fake_conn.get_alarms.return_value = [] + self._fake_pc.extract_my_subset.return_value = [] + self._fake_pc.is_active.return_value = False + + svc = evaluator.AlarmEvaluationService(0, self.CONF) + self.addCleanup(svc.terminate) + time.sleep(1) expected = [({'enabled': True, 'exclude': {'type': 'event'}},)] self.assertEqual(expected, - self.svc.storage_conn.get_alarms.call_args_list) + svc.storage_conn.get_alarms.call_args_list) diff --git a/aodh/tests/unit/test_event.py b/aodh/tests/unit/test_event.py index bea1062a0..2a85b9737 100644 --- a/aodh/tests/unit/test_event.py +++ b/aodh/tests/unit/test_event.py @@ -18,7 +18,6 @@ import time from oslo_config import fixture as fixture_config import oslo_messaging -from oslo_messaging import server from aodh import event from aodh import service @@ -29,41 +28,21 @@ class TestEventAlarmEvaluationService(tests_base.BaseTestCase): def setUp(self): super(TestEventAlarmEvaluationService, self).setUp() - conf = service.prepare_service(argv=[], config_files=[]) self.CONF = self.useFixture(fixture_config.Config(conf)).conf + self.CONF.set_override("batch_size", 2, 'listener') self.setup_messaging(self.CONF) - self.service = event.EventAlarmEvaluationService(self.CONF) - self._msg_notifier = oslo_messaging.Notifier( + + @mock.patch('aodh.storage.get_connection_from_config', + mock.MagicMock()) + @mock.patch('aodh.event.EventAlarmEndpoint.sample') + def test_batch_event_listener(self, mocked): + msg_notifier = oslo_messaging.Notifier( self.transport, topics=['alarm.all'], driver='messaging', publisher_id='test-publisher') - @mock.patch('aodh.storage.get_connection_from_config', - mock.MagicMock()) - def test_start_and_stop_service(self): - self.addCleanup(self.service.stop) - self.service.start() - self.assertIsInstance(self.service.listener, - server.MessageHandlingServer) - - @mock.patch('aodh.storage.get_connection_from_config', - mock.MagicMock()) - def test_listener_start_called(self): - listener = mock.Mock() - with mock.patch('aodh.messaging.get_batch_notification_listener', - return_value=listener): - self.addCleanup(self.service.stop) - self.service.start() - self.assertTrue(listener.start.called) - - @mock.patch('aodh.event.EventAlarmEndpoint.sample') - def test_batch_event_listener(self, mocked): received_events = [] mocked.side_effect = lambda msg: received_events.append(msg) - self.CONF.set_override("batch_size", 2, 'listener') - with mock.patch('aodh.storage.get_connection_from_config'): - self.svc = event.EventAlarmEvaluationService(self.CONF) - self.svc.start() event1 = {'event_type': 'compute.instance.update', 'traits': ['foo', 'bar'], 'message_id': '20d03d17-4aba-4900-a179-dba1281a3451', @@ -72,9 +51,12 @@ class TestEventAlarmEvaluationService(tests_base.BaseTestCase): 'traits': ['foo', 'bar'], 'message_id': '20d03d17-4aba-4900-a179-dba1281a3452', 'generated': '2016-04-23T06:50:23.622739'} - self._msg_notifier.sample({}, 'event', event1) - self._msg_notifier.sample({}, 'event', event2) + msg_notifier.sample({}, 'event', event1) + msg_notifier.sample({}, 'event', event2) + + svc = event.EventAlarmEvaluationService(0, self.CONF) + self.addCleanup(svc.terminate) + time.sleep(1) self.assertEqual(1, len(received_events)) self.assertEqual(2, len(received_events[0])) - self.svc.stop() diff --git a/aodh/tests/unit/test_notifier.py b/aodh/tests/unit/test_notifier.py index c37cde0b5..e842b3551 100644 --- a/aodh/tests/unit/test_notifier.py +++ b/aodh/tests/unit/test_notifier.py @@ -51,9 +51,8 @@ class TestAlarmNotifierService(tests_base.BaseTestCase): self.setup_messaging(self.CONF) def test_init_host_queue(self): - self.service = notifier.AlarmNotifierService(self.CONF) - self.service.start() - self.service.stop() + self.service = notifier.AlarmNotifierService(0, self.CONF) + self.service.terminate() class TestAlarmNotifier(tests_base.BaseTestCase): @@ -69,9 +68,8 @@ class TestAlarmNotifier(tests_base.BaseTestCase): self.useFixture(mockpatch.Patch( 'aodh.notifier.zaqar.ZaqarAlarmNotifier.get_zaqar_client', return_value=self.zaqar)) - self.service = notifier.AlarmNotifierService(self.CONF) - self.service.start() - self.addCleanup(self.service.stop) + self.service = notifier.AlarmNotifierService(0, self.CONF) + self.addCleanup(self.service.terminate) def test_notify_alarm(self): data = { @@ -120,11 +118,11 @@ class TestAlarmNotifier(tests_base.BaseTestCase): 'reason': 'Everything is fine', 'reason_data': {'fine': 'fine'} } - self.service.stop() + self.service.terminate() self.CONF.set_override("batch_size", 2, 'notifier') # Init a new service with new configuration - self.svc = notifier.AlarmNotifierService(self.CONF) - self.svc.start() + self.svc = notifier.AlarmNotifierService(0, self.CONF) + self.addCleanup(self.svc.terminate) self._msg_notifier.sample({}, 'alarm.update', data1) self._msg_notifier.sample({}, 'alarm.update', data2) time.sleep(1) @@ -150,7 +148,6 @@ class TestAlarmNotifier(tests_base.BaseTestCase): notifications[1]) self.assertEqual(mock.call('Received %s messages in batch.', 2), logger.call_args_list[0]) - self.svc.stop() @staticmethod def _notification(action): diff --git a/etc/aodh/aodh-config-generator.conf b/etc/aodh/aodh-config-generator.conf index c2f6f4557..576b45dd4 100644 --- a/etc/aodh/aodh-config-generator.conf +++ b/etc/aodh/aodh-config-generator.conf @@ -8,5 +8,4 @@ namespace = oslo.log namespace = oslo.messaging namespace = oslo.middleware.cors namespace = oslo.policy -namespace = oslo.service.service namespace = keystonemiddleware.auth_token diff --git a/requirements.txt b/requirements.txt index 437daafec..e8bdf7ec2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,7 +15,6 @@ oslo.config>=2.6.0 # Apache-2.0 oslo.i18n>=1.5.0 # Apache-2.0 oslo.log>=1.2.0 # Apache-2.0 oslo.policy>=0.5.0 # Apache-2.0 -oslo.service>=0.1.0 # Apache-2.0 PasteDeploy>=1.5.0 pbr<2.0,>=0.11 pecan>=0.8.0 @@ -33,3 +32,4 @@ tooz>=1.28.0 # Apache-2.0 Werkzeug>=0.7 # BSD License WebOb>=1.2.3 WSME>=0.8 +cotyledon