Merge "Add evaluator for event alarm"
This commit is contained in:
commit
7aceda2c53
171
aodh/evaluator/event.py
Normal file
171
aodh/evaluator/event.py
Normal file
@ -0,0 +1,171 @@
|
||||
#
|
||||
# Copyright 2015 NEC Corporation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import fnmatch
|
||||
import operator
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
from aodh import evaluator
|
||||
from aodh.i18n import _, _LE
|
||||
|
||||
# Module-level logger, named after this module via __name__.
LOG = log.getLogger(__name__)
|
||||
|
||||
# Comparison functions keyed by their short operator name. Every key is also
# the name of the matching function in the standard ``operator`` module, so
# the table is derived from the names instead of being spelled out twice.
COMPARATORS = {
    op_name: getattr(operator, op_name)
    for op_name in ('gt', 'lt', 'ge', 'le', 'eq', 'ne')
}
|
||||
|
||||
|
||||
class EventAlarmEvaluator(evaluator.Evaluator):
    """Evaluate 'event' type alarms against received events.

    For every valid event, the enabled event alarms of the event's project
    are fetched and each one is checked against the event. An alarm whose
    'event_type' pattern and every 'query' condition match is fired.
    """

    def evaluate_events(self, events):
        """Evaluate the events by referring related alarms.

        :param events: a single event or a list of events. Any value that is
                       not a list is treated as one event.

        Events rejected by _validate() are skipped. An exception raised
        while evaluating one alarm is logged and does not prevent the
        remaining alarms from being evaluated.
        """

        if not isinstance(events, list):
            events = [events]

        LOG.debug('Starting event alarm evaluation: #events = %d',
                  len(events))
        for event in events:
            LOG.debug('Evaluating event: event = %s', event)

            if not self._validate(event):
                LOG.debug('Aborting evaluation of the event.')
                continue

            project = self._get_project(event)
            # TODO(r-mibu): cache alarms to reduce DB accesses
            alarms = self._storage_conn.get_alarms(enabled=True,
                                                   alarm_type='event',
                                                   project=project)
            LOG.debug('Found %(num)d alarms related to the event '
                      '(message_id=%(id)s)',
                      {'num': len(alarms), 'id': event['message_id']})

            for alarm in alarms:
                # Isolate failures per alarm so that one broken alarm or
                # rule cannot block the evaluation of the others.
                try:
                    self._evaluate_alarm(alarm, event)
                except Exception:
                    LOG.exception(_LE('Failed to evaluate alarm (id=%(a)s) '
                                      'triggered by event = %(e)s.'),
                                  {'a': alarm.alarm_id, 'e': event})

        LOG.debug('Finished event alarm evaluation.')

    @staticmethod
    def _validate(event):
        """Validate received event has mandatory parameters.

        :param event: the received event (accessed through ``.get()``).
        :returns: False when the event is empty/None or lacks a truthy
                  'event_type' or 'message_id'; True otherwise.
        """

        if not event:
            LOG.error(_LE('Received invalid event (empty or None)'))
            return False

        if not event.get('event_type'):
            LOG.error(_LE('Failed to extract event_type from event = %s'),
                      event)
            return False

        if not event.get('message_id'):
            LOG.error(_LE('Failed to extract message_id from event = %s'),
                      event)
            return False

        return True

    @staticmethod
    def _get_project(event):
        """Extract project ID from the event.

        Each trait is read by index: ``trait[0]`` is the trait name and
        ``trait[2]`` its value. The value of the first trait named
        'tenant_id' or 'project_id' is returned, or '' when there is none
        (including when 'traits' is missing or None).
        """
        for trait in event.get('traits') or []:
            if trait[0] in (u'tenant_id', u'project_id'):
                return trait[2]
        return ''

    @staticmethod
    def _sanitize(event):
        """Change traits format to dict.

        :param event: the received event; it is not modified.
        :returns: a deep copy of the event whose 'traits' entry is a dict
                  mapping ``trait[0]`` to ``trait[2]``.
        """
        e = copy.deepcopy(event)
        # 'traits' may be absent or explicitly None; treat both as "no
        # traits", the same way _get_project() does, instead of failing
        # while iterating None.
        e['traits'] = {t[0]: t[2] for t in event.get('traits') or []}
        return e

    def _evaluate_alarm(self, alarm, event):
        """Evaluate the alarm by referring the received event.

        This function compares each condition of the alarm on the assumption
        that all conditions are combined by AND operator.
        When the received event meets the conditions defined in the alarm's
        'event_type' and 'query', the alarm will be fired and updated to
        state='alarm' (alarmed).
        Note: by this evaluator, the alarm won't be changed to state='ok'
        nor state='insufficient data'.

        :param alarm: the alarm to evaluate; its ``rule`` provides the
                      'event_type' pattern and the 'query' conditions.
        :param event: the validated event that triggered the evaluation.
        :raises KeyError: when a condition names an operator that is not in
                          COMPARATORS, or lacks 'field' or 'value'.
        """

        LOG.debug('Evaluating alarm (id=%(a)s) triggered by event '
                  '(message_id=%(e)s).',
                  {'a': alarm.alarm_id, 'e': event['message_id']})

        if not alarm.repeat_actions and alarm.state == evaluator.ALARM:
            LOG.debug('Skip evaluation of the alarm id=%s which have already '
                      'fired.', alarm.alarm_id)
            return

        # The alarm's event_type is a shell-style pattern (e.g. 'a.b.*').
        event_pattern = alarm.rule['event_type']
        if not fnmatch.fnmatch(event['event_type'], event_pattern):
            LOG.debug('Aborting evaluation of the alarm (id=%s) due to '
                      'uninterested event_type.', alarm.alarm_id)
            return

        value = self._sanitize(event)

        def _resolve(field):
            """Look up a dotted field path in the sanitized event.

            Returns None as soon as a path segment cannot be resolved, so
            a partially resolved path never yields an unrelated value.
            """
            v = value
            for f in field.split('.'):
                if not hasattr(v, 'get'):
                    return None
                v = v.get(f)
            return v

        def _compare(condition):
            """Return whether the event satisfies a single condition."""
            op = COMPARATORS[condition.get('op', 'eq')]
            v = _resolve(condition['field'])
            LOG.debug('Comparing value=%(v)s against condition=%(c)s .',
                      {'v': v, 'c': condition})
            try:
                return op(v, condition['value'])
            except TypeError:
                # Ordering a missing (None) or otherwise incompatible value
                # is not an evaluation failure; the condition is just unmet.
                LOG.debug('Value=%(v)s is not comparable with '
                          'condition=%(c)s .', {'v': v, 'c': condition})
                return False

        for condition in alarm.rule['query']:
            if not _compare(condition):
                LOG.debug('Aborting evaluation of the alarm due to '
                          'unmet condition=%s .', condition)
                return

        self._fire_alarm(alarm, event)

    def _fire_alarm(self, alarm, event):
        """Update alarm state and fire alarm via alarm notifier.

        The state change is delegated to ``self._refresh()``, which is not
        defined in this class (presumably inherited from the base
        evaluator), together with a reason string and the triggering event.
        """

        state = evaluator.ALARM
        reason = (_('Event (message_id=%(message)s) hit the query of alarm '
                    '(id=%(alarm)s)') %
                  {'message': event['message_id'], 'alarm': alarm.alarm_id})
        reason_data = {'type': 'event', 'event': event}
        self._refresh(alarm, state, reason, reason_data)

    # NOTE(r-mibu): This method won't be used, but we have to define here in
    # order to overwrite the abstract method in the super class.
    # TODO(r-mibu): Change the base (common) class design for evaluators.
    def evaluate(self, alarm):
        pass
|
216
aodh/tests/evaluator/test_event.py
Normal file
216
aodh/tests/evaluator/test_event.py
Normal file
@ -0,0 +1,216 @@
|
||||
#
|
||||
# Copyright 2015 NEC Corporation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
|
||||
from aodh import evaluator
|
||||
from aodh.evaluator import event as event_evaluator
|
||||
from aodh.storage import models
|
||||
from aodh.tests import constants
|
||||
from aodh.tests.evaluator import base
|
||||
|
||||
|
||||
class TestEventAlarmEvaluate(base.TestEvaluatorBase):
    """Tests for EventAlarmEvaluator.evaluate_events().

    The ``storage_conn``, ``evaluator`` and ``notifier`` attributes used
    below are not set in this class; presumably the base test class
    provides them.
    """

    EVALUATOR = event_evaluator.EventAlarmEvaluator

    @staticmethod
    def _alarm(**kwargs):
        """Build an enabled 'event' alarm model.

        Recognised overrides: id, name, state, repeat, project, event_type
        and query. A random UUID is used when no id is given.
        """
        identifier = kwargs.get('id') or str(uuid.uuid4())
        rule = {
            'event_type': kwargs.get('event_type', '*'),
            'query': kwargs.get('query', []),
        }
        return models.Alarm(
            name=kwargs.get('name', identifier),
            type='event',
            enabled=True,
            alarm_id=identifier,
            description='desc',
            state=kwargs.get('state', 'insufficient data'),
            severity='critical',
            state_timestamp=constants.MIN_DATETIME,
            timestamp=constants.MIN_DATETIME,
            ok_actions=[],
            insufficient_data_actions=[],
            alarm_actions=[],
            repeat_actions=kwargs.get('repeat', False),
            user_id='user',
            project_id=kwargs.get('project', ''),
            time_constraints=[],
            rule=rule,
        )

    @staticmethod
    def _event(**kwargs):
        """Build an event dict; id, event_type and traits may be overridden."""
        message_id = kwargs.get('id') or str(uuid.uuid4())
        return {
            'message_id': message_id,
            'event_type': kwargs.get('event_type', 'type0'),
            'traits': kwargs.get('traits', []),
        }

    def _do_test_event_alarm(self, alarms, events,
                             expect_project_in_query=None,
                             expect_alarm_states=None,
                             expect_alarm_updates=None,
                             expect_notifications=None):
        """Run the evaluator on the events and verify the expectations.

        ``alarms`` is what the mocked storage returns from get_alarms().
        Every ``expect_*`` argument left as None is not checked.
        """
        storage = self.storage_conn
        storage.get_alarms.return_value = alarms

        self.evaluator.evaluate_events(events)

        def _notify_call(notification):
            # Mirror the reason/reason_data the evaluator hands over when
            # an alarm is fired.
            fired = notification['alarm']
            trigger = notification['event']
            before = notification.get('previous', evaluator.UNKNOWN)
            text = ('Event (message_id=%(e)s) hit the query of alarm '
                    '(id=%(a)s)' %
                    {'e': trigger['message_id'], 'a': fired.alarm_id})
            payload = {'type': 'event', 'event': trigger}
            return mock.call(fired, before, text, payload)

        if expect_project_in_query is not None:
            query_call = mock.call(enabled=True,
                                   alarm_type='event',
                                   project=expect_project_in_query)
            self.assertEqual([query_call],
                             storage.get_alarms.call_args_list)

        if expect_alarm_states is not None:
            for wanted_state, alarm in zip(expect_alarm_states, alarms):
                self.assertEqual(wanted_state, alarm.state)

        if expect_alarm_updates is not None:
            update_calls = [mock.call(updated)
                            for updated in expect_alarm_updates]
            self.assertEqual(update_calls,
                             storage.update_alarm.call_args_list)

        if expect_notifications is not None:
            notify_calls = [_notify_call(n) for n in expect_notifications]
            self.assertEqual(notify_calls,
                             self.notifier.notify.call_args_list)

    def test_fire_alarm_in_the_same_project_id(self):
        """A 'project_id' trait selects the project used in the query."""
        alarm = self._alarm(project='project1')
        event = self._event(traits=[['project_id', 1, 'project1']])
        self._do_test_event_alarm(
            [alarm], [event],
            expect_project_in_query='project1',
            expect_alarm_states=[evaluator.ALARM],
            expect_alarm_updates=[alarm],
            expect_notifications=[{'alarm': alarm, 'event': event}])

    def test_fire_alarm_in_the_same_tenant_id(self):
        """A 'tenant_id' trait selects the project used in the query."""
        alarm = self._alarm(project='project1')
        event = self._event(traits=[['tenant_id', 1, 'project1']])
        self._do_test_event_alarm(
            [alarm], [event],
            expect_project_in_query='project1',
            expect_alarm_states=[evaluator.ALARM],
            expect_alarm_updates=[alarm],
            expect_notifications=[{'alarm': alarm, 'event': event}])

    def test_fire_alarm_in_project_none(self):
        """An event without project traits queries with project ''."""
        alarm = self._alarm(project='')
        event = self._event()
        self._do_test_event_alarm(
            [alarm], [event],
            expect_project_in_query='',
            expect_alarm_states=[evaluator.ALARM],
            expect_alarm_updates=[alarm],
            expect_notifications=[{'alarm': alarm, 'event': event}])

    def test_continue_following_evaluation_after_exception(self):
        """A failure on the first alarm must not block the second one."""
        first, second = self._alarm(), self._alarm()
        alarms = [first, second]
        event = self._event()
        evaluator_cls = event_evaluator.EventAlarmEvaluator
        sanitized = evaluator_cls._sanitize(event)
        # First call raises, second call returns the real sanitized event.
        patcher = mock.patch.object(evaluator_cls,
                                    '_sanitize',
                                    side_effect=[Exception('boom'),
                                                 sanitized])
        with patcher:
            self._do_test_event_alarm(
                alarms, [event],
                expect_alarm_states=[evaluator.UNKNOWN, evaluator.ALARM],
                expect_alarm_updates=[second],
                expect_notifications=[{'alarm': second, 'event': event}])

    def test_skip_event_missing_event_type(self):
        """An event without 'event_type' is ignored."""
        alarm = self._alarm()
        event = {'message_id': str(uuid.uuid4()), 'traits': []}
        self._do_test_event_alarm(
            [alarm], [event],
            expect_alarm_states=[evaluator.UNKNOWN],
            expect_alarm_updates=[],
            expect_notifications=[])

    def test_skip_event_missing_message_id(self):
        """An event without 'message_id' is ignored."""
        alarm = self._alarm()
        event = {'event_type': 'type1', 'traits': []}
        self._do_test_event_alarm(
            [alarm], [event],
            expect_alarm_states=[evaluator.UNKNOWN],
            expect_alarm_updates=[],
            expect_notifications=[])

    def test_continue_alarming_when_repeat_actions_enabled(self):
        """An already fired alarm notifies again when repeat is enabled."""
        alarm = self._alarm(repeat=True, state=evaluator.ALARM)
        event = self._event()
        repeated = {'alarm': alarm,
                    'event': event,
                    'previous': evaluator.ALARM}
        self._do_test_event_alarm(
            [alarm], [event],
            expect_alarm_states=[evaluator.ALARM],
            expect_alarm_updates=[],
            expect_notifications=[repeated])

    def test_do_not_continue_alarming_when_repeat_actions_disabled(self):
        """An already fired alarm stays silent when repeat is disabled."""
        alarm = self._alarm(repeat=False, state=evaluator.ALARM)
        event = self._event()
        self._do_test_event_alarm(
            [alarm], [event],
            expect_alarm_states=[evaluator.ALARM],
            expect_alarm_updates=[],
            expect_notifications=[])

    def test_skip_uninterested_event_type(self):
        """A different literal event_type does not fire the alarm."""
        alarm = self._alarm(event_type='compute.instance.exists')
        event = self._event(event_type='compute.instance.update')
        self._do_test_event_alarm(
            [alarm], [event],
            expect_alarm_states=[evaluator.UNKNOWN],
            expect_alarm_updates=[],
            expect_notifications=[])

    def test_fire_alarm_event_type_pattern_matched(self):
        """A wildcard event_type pattern matching the event fires."""
        alarm = self._alarm(event_type='compute.instance.*')
        event = self._event(event_type='compute.instance.update')
        self._do_test_event_alarm(
            [alarm], [event],
            expect_alarm_states=[evaluator.ALARM],
            expect_alarm_updates=[alarm],
            expect_notifications=[{'alarm': alarm, 'event': event}])

    def test_skip_event_type_pattern_unmatched(self):
        """A wildcard pattern not matching the event does not fire."""
        alarm = self._alarm(event_type='compute.instance.*')
        event = self._event(event_type='dummy.compute.instance')
        self._do_test_event_alarm(
            [alarm], [event],
            expect_alarm_states=[evaluator.UNKNOWN],
            expect_alarm_updates=[],
            expect_notifications=[])

    def test_fire_alarm_query_matched(self):
        """A query condition satisfied by a trait fires the alarm."""
        condition = {'field': "traits.state", 'value': "stopped", 'op': "eq"}
        alarm = self._alarm(query=[condition])
        event = self._event(traits=[['state', 1, 'stopped']])
        self._do_test_event_alarm(
            [alarm], [event],
            expect_alarm_states=[evaluator.ALARM],
            expect_alarm_updates=[alarm],
            expect_notifications=[{'alarm': alarm, 'event': event}])

    def test_skip_query_unmatched(self):
        """A query condition not satisfied by the trait does not fire."""
        condition = {'field': "traits.state", 'value': "stopped", 'op': "eq"}
        alarm = self._alarm(query=[condition])
        event = self._event(traits=[['state', 1, 'active']])
        self._do_test_event_alarm(
            [alarm], [event],
            expect_alarm_states=[evaluator.UNKNOWN],
            expect_alarm_updates=[],
            expect_notifications=[])
|
Loading…
x
Reference in New Issue
Block a user