# diff --git a/aodh/evaluator/event.py b/aodh/evaluator/event.py
# new file mode 100644
# index 000000000..0ed9cec5d
# --- /dev/null
# +++ b/aodh/evaluator/event.py
# @@ -0,0 +1,171 @@
#
# Copyright 2015 NEC Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import copy
import fnmatch
import operator

from oslo_log import log

from aodh import evaluator
from aodh.i18n import _, _LE

LOG = log.getLogger(__name__)

# Maps the 'op' name used in an alarm query condition to the binary
# comparison applied as op(<value found in the event>, <condition value>).
COMPARATORS = {
    'gt': operator.gt,
    'lt': operator.lt,
    'ge': operator.ge,
    'le': operator.le,
    'eq': operator.eq,
    'ne': operator.ne,
}


class EventAlarmEvaluator(evaluator.Evaluator):
    """Evaluator that fires 'event' alarms when a received event matches.

    Unlike evaluate(), which takes an alarm, the entry point here is
    evaluate_events(): it is driven by incoming events and looks up the
    alarms to check for each of them.
    """

    def evaluate_events(self, events):
        """Evaluate the events by referring related alarms.

        :param events: one event (dict) or a list of events. Anything that
                       is not a list is treated as a single event.

        Events failing _validate() are skipped. An exception raised while
        evaluating one alarm is logged and does not prevent the remaining
        alarms from being evaluated.
        """

        if not isinstance(events, list):
            events = [events]

        LOG.debug('Starting event alarm evaluation: #events = %d',
                  len(events))
        for event in events:
            LOG.debug('Evaluating event: event = %s', event)

            if not self._validate(event):
                LOG.debug('Aborting evaluation of the event.')
                continue

            project = self._get_project(event)
            # TODO(r-mibu): cache alarms to reduce DB accesses
            # Materialize the result so that len() below also works when
            # the storage driver hands back a lazy iterable (e.g. a
            # generator) rather than a list.
            alarms = list(self._storage_conn.get_alarms(enabled=True,
                                                        alarm_type='event',
                                                        project=project))
            LOG.debug('Found %(num)d alarms related to the event '
                      '(message_id=%(id)s)',
                      {'num': len(alarms), 'id': event['message_id']})

            for alarm in alarms:
                try:
                    self._evaluate_alarm(alarm, event)
                except Exception:
                    # Deliberately broad: one broken alarm must not stop the
                    # evaluation of the other alarms for this event.
                    LOG.exception(_LE('Failed to evaluate alarm (id=%(a)s) '
                                      'triggered by event = %(e)s.'),
                                  {'a': alarm.alarm_id, 'e': event})

        LOG.debug('Finished event alarm evaluation.')

    @staticmethod
    def _validate(event):
        """Validate received event has mandatory parameters.

        :returns: True when the event is non-empty and carries truthy
                  'event_type' and 'message_id' values, False otherwise
                  (the reason is logged as an error).
        """

        if not event:
            LOG.error(_LE('Received invalid event (empty or None)'))
            return False

        if not event.get('event_type'):
            LOG.error(_LE('Failed to extract event_type from event = %s'),
                      event)
            return False

        if not event.get('message_id'):
            LOG.error(_LE('Failed to extract message_id from event = %s'),
                      event)
            return False

        return True

    @staticmethod
    def _get_project(event):
        """Extract project ID from the event.

        Each trait is indexed as a sequence: element 0 is the trait name and
        element 2 is its value. The value of the first trait named
        'tenant_id' or 'project_id' is returned, or '' when there is none.
        """
        for trait in event.get('traits') or []:
            if trait[0] in (u'tenant_id', u'project_id'):
                return trait[2]
        return ''

    @staticmethod
    def _sanitize(event):
        """Change traits format to dict.

        :returns: a deep copy of the event whose 'traits' entry is a
                  {name: value} dict built from the trait sequences
                  (element 0 -> element 2). The input is left untouched.
        """
        e = copy.deepcopy(event)
        # 'or []' (as in _get_project) also covers an event that carries
        # the 'traits' key with a None value, not only a missing key.
        e['traits'] = {t[0]: t[2] for t in event.get('traits') or []}
        return e

    @staticmethod
    def _get_value(data, field):
        """Resolve a dotted field path (e.g. 'traits.state') in data.

        :returns: the value found by calling get() for every path element,
                  or None when the path cannot be followed to its end.
        """
        v = data
        for f in field.split('.'):
            if not hasattr(v, 'get'):
                # The path goes deeper than the data does. Report "no value"
                # instead of handing back the intermediate object, which
                # would then be compared as if it were the requested field.
                return None
            v = v.get(f)
        return v

    def _evaluate_alarm(self, alarm, event):
        """Evaluate the alarm by referring the received event.

        This function compares each condition of the alarm on the assumption
        that all conditions are combined by AND operator.
        When the received event met conditions defined in alarm 'event_type'
        and 'query', the alarm will be fired and updated to state='alarm'
        (alarmed).
        Note: by this evaluator, the alarm won't be changed to state='ok'
        nor state='insufficient data'.

        :param alarm: alarm object; alarm_id, state, repeat_actions and
                      rule ('event_type' and 'query') are read from it.
        :param event: validated event dict in its original (list of traits)
                      format.
        :raises KeyError: if a condition names an 'op' that is not a key of
                          COMPARATORS, or lacks 'field' / 'value'.
        """

        LOG.debug('Evaluating alarm (id=%(a)s) triggered by event '
                  '(message_id=%(e)s).',
                  {'a': alarm.alarm_id, 'e': event['message_id']})

        if not alarm.repeat_actions and alarm.state == evaluator.ALARM:
            LOG.debug('Skip evaluation of the alarm id=%s which have already '
                      'fired.', alarm.alarm_id)
            return

        # The alarm's event_type is a shell-style wildcard pattern.
        event_pattern = alarm.rule['event_type']
        if not fnmatch.fnmatch(event['event_type'], event_pattern):
            LOG.debug('Aborting evaluation of the alarm (id=%s) due to '
                      'uninterested event_type.', alarm.alarm_id)
            return

        # Conditions address traits by name ('traits.<name>'), so convert
        # the traits list into a dict before looking values up.
        value = self._sanitize(event)

        def _compare(condition):
            # 'op' is optional in a condition and defaults to equality.
            op = COMPARATORS[condition.get('op', 'eq')]
            v = self._get_value(value, condition['field'])
            LOG.debug('Comparing value=%(v)s against condition=%(c)s .',
                      {'v': v, 'c': condition})
            return op(v, condition['value'])

        for condition in alarm.rule['query']:
            if not _compare(condition):
                LOG.debug('Aborting evaluation of the alarm due to '
                          'unmet condition=%s .', condition)
                return

        self._fire_alarm(alarm, event)

    def _fire_alarm(self, alarm, event):
        """Update alarm state and fire alarm via alarm notifier.

        Delegates to self._refresh() with state evaluator.ALARM, a
        human-readable reason and reason data embedding the original event.
        """

        state = evaluator.ALARM
        reason = (_('Event (message_id=%(message)s) hit the query of alarm '
                    '(id=%(alarm)s)') %
                  {'message': event['message_id'], 'alarm': alarm.alarm_id})
        reason_data = {'type': 'event', 'event': event}
        self._refresh(alarm, state, reason, reason_data)

    # NOTE(r-mibu): This method won't be used, but we have to define here in
    # order to overwrite the abstract method in the super class.
    # TODO(r-mibu): Change the base (common) class design for evaluators.
    def evaluate(self, alarm):
        pass

# diff --git a/aodh/tests/evaluator/test_event.py b/aodh/tests/evaluator/test_event.py
# new file mode 100644
# index 000000000..b14bae8a4
# --- /dev/null
# +++ b/aodh/tests/evaluator/test_event.py
# @@ -0,0 +1,216 @@
#
# Copyright 2015 NEC Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid

import mock

from aodh import evaluator
from aodh.evaluator import event as event_evaluator
from aodh.storage import models
from aodh.tests import constants
from aodh.tests.evaluator import base


class TestEventAlarmEvaluate(base.TestEvaluatorBase):
    """Tests for EventAlarmEvaluator.evaluate_events()."""

    EVALUATOR = event_evaluator.EventAlarmEvaluator

    @staticmethod
    def _alarm(**kwargs):
        """Build an enabled 'event' alarm model.

        Optional keys: id, name, state, repeat, project, event_type, query.
        A random UUID is used as the id (and as the default name) when no
        id is given.
        """
        alarm_id = kwargs.get('id') or str(uuid.uuid4())
        rule = {
            'event_type': kwargs.get('event_type', '*'),
            'query': kwargs.get('query', []),
        }
        return models.Alarm(
            alarm_id=alarm_id,
            name=kwargs.get('name', alarm_id),
            type='event',
            enabled=True,
            description='desc',
            severity='critical',
            state=kwargs.get('state', 'insufficient data'),
            state_timestamp=constants.MIN_DATETIME,
            timestamp=constants.MIN_DATETIME,
            ok_actions=[],
            insufficient_data_actions=[],
            alarm_actions=[],
            repeat_actions=kwargs.get('repeat', False),
            user_id='user',
            project_id=kwargs.get('project', ''),
            time_constraints=[],
            rule=rule,
        )

    @staticmethod
    def _event(**kwargs):
        """Build an event dict; optional keys: id, event_type, traits."""
        message_id = kwargs.get('id') or str(uuid.uuid4())
        return {
            'message_id': message_id,
            'event_type': kwargs.get('event_type', 'type0'),
            'traits': kwargs.get('traits', []),
        }

    @staticmethod
    def _notify_call(notification):
        """Translate one expected-notification dict into a mock.call.

        The dict holds 'alarm' and 'event', plus an optional 'previous'
        state which defaults to evaluator.UNKNOWN.
        """
        fired_alarm = notification['alarm']
        fired_event = notification['event']
        previous_state = notification.get('previous', evaluator.UNKNOWN)
        reason = ('Event (message_id=%(e)s) hit the query of alarm '
                  '(id=%(a)s)' %
                  {'e': fired_event['message_id'],
                   'a': fired_alarm.alarm_id})
        reason_data = {'type': 'event', 'event': fired_event}
        return mock.call(fired_alarm, previous_state, reason, reason_data)

    def _do_test_event_alarm(self, alarms, events,
                             expect_project_in_query=None,
                             expect_alarm_states=None,
                             expect_alarm_updates=None,
                             expect_notifications=None):
        """Run the evaluator on events and verify the given expectations.

        The mocked storage returns `alarms` for every get_alarms() call.
        Each expect_* argument left as None is simply not checked.
        """
        self.storage_conn.get_alarms.return_value = alarms

        self.evaluator.evaluate_events(events)

        if expect_project_in_query is not None:
            query_call = mock.call(enabled=True,
                                   alarm_type='event',
                                   project=expect_project_in_query)
            self.assertEqual([query_call],
                             self.storage_conn.get_alarms.call_args_list)

        if expect_alarm_states is not None:
            # Pairs states with alarms positionally.
            for wanted_state, alarm in zip(expect_alarm_states, alarms):
                self.assertEqual(wanted_state, alarm.state)

        if expect_alarm_updates is not None:
            update_calls = [mock.call(a) for a in expect_alarm_updates]
            self.assertEqual(update_calls,
                             self.storage_conn.update_alarm.call_args_list)

        if expect_notifications is not None:
            notify_calls = [self._notify_call(n)
                            for n in expect_notifications]
            self.assertEqual(notify_calls,
                             self.notifier.notify.call_args_list)

    def test_fire_alarm_in_the_same_project_id(self):
        alarm = self._alarm(project='project1')
        event = self._event(traits=[['project_id', 1, 'project1']])
        self._do_test_event_alarm(
            [alarm], [event],
            expect_project_in_query='project1',
            expect_alarm_states=[evaluator.ALARM],
            expect_alarm_updates=[alarm],
            expect_notifications=[{'alarm': alarm, 'event': event}])

    def test_fire_alarm_in_the_same_tenant_id(self):
        alarm = self._alarm(project='project1')
        event = self._event(traits=[['tenant_id', 1, 'project1']])
        self._do_test_event_alarm(
            [alarm], [event],
            expect_project_in_query='project1',
            expect_alarm_states=[evaluator.ALARM],
            expect_alarm_updates=[alarm],
            expect_notifications=[{'alarm': alarm, 'event': event}])

    def test_fire_alarm_in_project_none(self):
        # No project trait in the event: the alarm query uses project=''.
        alarm = self._alarm(project='')
        event = self._event()
        self._do_test_event_alarm(
            [alarm], [event],
            expect_project_in_query='',
            expect_alarm_states=[evaluator.ALARM],
            expect_alarm_updates=[alarm],
            expect_notifications=[{'alarm': alarm, 'event': event}])

    def test_continue_following_evaluation_after_exception(self):
        first_alarm = self._alarm()
        second_alarm = self._alarm()
        alarms = [first_alarm, second_alarm]
        event = self._event()
        # Compute the genuine result before patching, so the second call to
        # the patched _sanitize can return it.
        original = event_evaluator.EventAlarmEvaluator._sanitize(event)
        patcher = mock.patch.object(event_evaluator.EventAlarmEvaluator,
                                    '_sanitize',
                                    side_effect=[Exception('boom'),
                                                 original])
        with patcher:
            self._do_test_event_alarm(
                alarms, [event],
                expect_alarm_states=[evaluator.UNKNOWN, evaluator.ALARM],
                expect_alarm_updates=[second_alarm],
                expect_notifications=[{'alarm': second_alarm,
                                       'event': event}])

    def test_skip_event_missing_event_type(self):
        alarm = self._alarm()
        event = {'message_id': str(uuid.uuid4()), 'traits': []}
        self._do_test_event_alarm(
            [alarm], [event],
            expect_alarm_states=[evaluator.UNKNOWN],
            expect_alarm_updates=[],
            expect_notifications=[])

    def test_skip_event_missing_message_id(self):
        alarm = self._alarm()
        event = {'event_type': 'type1', 'traits': []}
        self._do_test_event_alarm(
            [alarm], [event],
            expect_alarm_states=[evaluator.UNKNOWN],
            expect_alarm_updates=[],
            expect_notifications=[])

    def test_continue_alarming_when_repeat_actions_enabled(self):
        # Already in 'alarm' state: a notification is expected, but no
        # storage update.
        alarm = self._alarm(repeat=True, state=evaluator.ALARM)
        event = self._event()
        self._do_test_event_alarm(
            [alarm], [event],
            expect_alarm_states=[evaluator.ALARM],
            expect_alarm_updates=[],
            expect_notifications=[{'alarm': alarm,
                                   'event': event,
                                   'previous': evaluator.ALARM}])

    def test_do_not_continue_alarming_when_repeat_actions_disabled(self):
        alarm = self._alarm(repeat=False, state=evaluator.ALARM)
        event = self._event()
        self._do_test_event_alarm(
            [alarm], [event],
            expect_alarm_states=[evaluator.ALARM],
            expect_alarm_updates=[],
            expect_notifications=[])

    def test_skip_uninterested_event_type(self):
        alarm = self._alarm(event_type='compute.instance.exists')
        event = self._event(event_type='compute.instance.update')
        self._do_test_event_alarm(
            [alarm], [event],
            expect_alarm_states=[evaluator.UNKNOWN],
            expect_alarm_updates=[],
            expect_notifications=[])

    def test_fire_alarm_event_type_pattern_matched(self):
        alarm = self._alarm(event_type='compute.instance.*')
        event = self._event(event_type='compute.instance.update')
        self._do_test_event_alarm(
            [alarm], [event],
            expect_alarm_states=[evaluator.ALARM],
            expect_alarm_updates=[alarm],
            expect_notifications=[{'alarm': alarm, 'event': event}])

    def test_skip_event_type_pattern_unmatched(self):
        alarm = self._alarm(event_type='compute.instance.*')
        event = self._event(event_type='dummy.compute.instance')
        self._do_test_event_alarm(
            [alarm], [event],
            expect_alarm_states=[evaluator.UNKNOWN],
            expect_alarm_updates=[],
            expect_notifications=[])

    def test_fire_alarm_query_matched(self):
        condition = {'field': "traits.state", 'value': "stopped", 'op': "eq"}
        alarm = self._alarm(query=[condition])
        event = self._event(traits=[['state', 1, 'stopped']])
        self._do_test_event_alarm(
            [alarm], [event],
            expect_alarm_states=[evaluator.ALARM],
            expect_alarm_updates=[alarm],
            expect_notifications=[{'alarm': alarm, 'event': event}])

    def test_skip_query_unmatched(self):
        condition = {'field': "traits.state", 'value': "stopped", 'op': "eq"}
        alarm = self._alarm(query=[condition])
        event = self._event(traits=[['state', 1, 'active']])
        self._do_test_event_alarm(
            [alarm], [event],
            expect_alarm_states=[evaluator.UNKNOWN],
            expect_alarm_updates=[],
            expect_notifications=[])