event-alarm: add event wrapper class
This patch adds a new wrapper class for event to hold sanitized traits and related functions. The parser and helper functions for events are moved to this new class. Also, this patch makes sure that each event will be sanitized only once when it loaded. Change-Id: I3b941ff531afea20b2403eb391ef49cf08cd65e7
This commit is contained in:
parent
5ad719a9fb
commit
cf4cc65be6
@ -13,7 +13,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import fnmatch
|
||||
import operator
|
||||
|
||||
@ -44,6 +43,75 @@ OPTS = [
|
||||
]
|
||||
|
||||
|
||||
def _sanitize_trait_value(value, trait_type):
|
||||
if trait_type in (2, 'integer'):
|
||||
return int(value)
|
||||
elif trait_type in (3, 'float'):
|
||||
return float(value)
|
||||
elif trait_type in (4, 'datetime'):
|
||||
return timeutils.normalize_time(timeutils.parse_isotime(value))
|
||||
else:
|
||||
return six.text_type(value)
|
||||
|
||||
|
||||
class InvalidEvent(Exception):
|
||||
"""Error raised when the received event is missing mandatory fields."""
|
||||
|
||||
|
||||
class Event(object):
|
||||
"""Wrapped event object to hold converted values for this evaluator."""
|
||||
|
||||
TRAIT_FIELD = 0
|
||||
TRAIT_TYPE = 1
|
||||
TRAIT_VALUE = 2
|
||||
|
||||
def __init__(self, event):
|
||||
self.obj = event
|
||||
self._validate()
|
||||
self.id = event.get('message_id')
|
||||
self._parse_traits()
|
||||
|
||||
def _validate(self):
|
||||
"""Validate received event has mandatory parameters."""
|
||||
|
||||
if not self.obj:
|
||||
LOG.error(_LE('Received invalid event (empty or None)'))
|
||||
raise InvalidEvent()
|
||||
|
||||
if not self.obj.get('event_type'):
|
||||
LOG.error(_LE('Failed to extract event_type from event = %s'),
|
||||
self.obj)
|
||||
raise InvalidEvent()
|
||||
|
||||
if not self.obj.get('message_id'):
|
||||
LOG.error(_LE('Failed to extract message_id from event = %s'),
|
||||
self.obj)
|
||||
raise InvalidEvent()
|
||||
|
||||
def _parse_traits(self):
|
||||
self.traits = {}
|
||||
self.project = ''
|
||||
for t in self.obj.get('traits', []):
|
||||
k = t[self.TRAIT_FIELD]
|
||||
v = _sanitize_trait_value(t[self.TRAIT_VALUE], t[self.TRAIT_TYPE])
|
||||
self.traits[k] = v
|
||||
if k in ('tenant_id', 'project_id'):
|
||||
self.project = v
|
||||
|
||||
def get_value(self, field):
|
||||
if field.startswith('traits.'):
|
||||
key = field.split('.', 1)[-1]
|
||||
return self.traits.get(key)
|
||||
|
||||
v = self.obj
|
||||
for f in field.split('.'):
|
||||
if hasattr(v, 'get'):
|
||||
v = v.get(f)
|
||||
else:
|
||||
return None
|
||||
return v
|
||||
|
||||
|
||||
class EventAlarmEvaluator(evaluator.Evaluator):
|
||||
|
||||
def __init__(self, conf, notifier):
|
||||
@ -58,52 +126,25 @@ class EventAlarmEvaluator(evaluator.Evaluator):
|
||||
|
||||
LOG.debug('Starting event alarm evaluation: #events = %d',
|
||||
len(events))
|
||||
for event in events:
|
||||
LOG.debug('Evaluating event: event = %s', event)
|
||||
|
||||
if not self._validate(event):
|
||||
for e in events:
|
||||
LOG.debug('Evaluating event: event = %s', e)
|
||||
try:
|
||||
event = Event(e)
|
||||
except InvalidEvent:
|
||||
LOG.debug('Aborting evaluation of the event.')
|
||||
continue
|
||||
|
||||
project = self._get_project(event)
|
||||
for id, alarm in six.iteritems(self._get_project_alarms(project)):
|
||||
for id, alarm in six.iteritems(
|
||||
self._get_project_alarms(event.project)):
|
||||
try:
|
||||
self._evaluate_alarm(alarm, event)
|
||||
except Exception:
|
||||
LOG.exception(_LE('Failed to evaluate alarm (id=%(a)s) '
|
||||
'triggered by event = %(e)s.'),
|
||||
{'a': id, 'e': event})
|
||||
{'a': id, 'e': e})
|
||||
|
||||
LOG.debug('Finished event alarm evaluation.')
|
||||
|
||||
@staticmethod
|
||||
def _validate(event):
|
||||
"""Validate received event has mandatory parameters."""
|
||||
|
||||
if not event:
|
||||
LOG.error(_LE('Received invalid event (empty or None)'))
|
||||
return False
|
||||
|
||||
if not event.get('event_type'):
|
||||
LOG.error(_LE('Failed to extract event_type from event = %s'),
|
||||
event)
|
||||
return False
|
||||
|
||||
if not event.get('message_id'):
|
||||
LOG.error(_LE('Failed to extract message_id from event = %s'),
|
||||
event)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def _get_project(event):
|
||||
"""Extract project ID from the event."""
|
||||
for trait in event.get('traits') or []:
|
||||
if trait[0] in (u'tenant_id', u'project_id'):
|
||||
return trait[2]
|
||||
return ''
|
||||
|
||||
def _get_project_alarms(self, project):
|
||||
if self.conf.event_alarm_cache_ttl and project in self.caches:
|
||||
if timeutils.is_older_than(self.caches[project]['updated'],
|
||||
@ -127,13 +168,6 @@ class EventAlarmEvaluator(evaluator.Evaluator):
|
||||
|
||||
return alarms
|
||||
|
||||
@staticmethod
|
||||
def _sanitize(event):
|
||||
"""Change traits format to dict."""
|
||||
e = copy.deepcopy(event)
|
||||
e['traits'] = {t[0]: t[2] for t in event.get('traits', [])}
|
||||
return e
|
||||
|
||||
def _evaluate_alarm(self, alarm, event):
|
||||
"""Evaluate the alarm by referring the received event.
|
||||
|
||||
@ -148,7 +182,7 @@ class EventAlarmEvaluator(evaluator.Evaluator):
|
||||
|
||||
LOG.debug('Evaluating alarm (id=%(a)s) triggered by event '
|
||||
'(message_id=%(e)s).',
|
||||
{'a': alarm.alarm_id, 'e': event['message_id']})
|
||||
{'a': alarm.alarm_id, 'e': event.id})
|
||||
|
||||
if not alarm.repeat_actions and alarm.state == evaluator.ALARM:
|
||||
LOG.debug('Skip evaluation of the alarm id=%s which have already '
|
||||
@ -156,21 +190,14 @@ class EventAlarmEvaluator(evaluator.Evaluator):
|
||||
return
|
||||
|
||||
event_pattern = alarm.rule['event_type']
|
||||
if not fnmatch.fnmatch(event['event_type'], event_pattern):
|
||||
if not fnmatch.fnmatch(event.obj['event_type'], event_pattern):
|
||||
LOG.debug('Aborting evaluation of the alarm (id=%s) due to '
|
||||
'uninterested event_type.', alarm.alarm_id)
|
||||
return
|
||||
|
||||
value = self._sanitize(event)
|
||||
|
||||
def _compare(condition):
|
||||
op = COMPARATORS[condition.get('op', 'eq')]
|
||||
v = value
|
||||
for f in condition['field'].split('.'):
|
||||
if hasattr(v, 'get'):
|
||||
v = v.get(f)
|
||||
else:
|
||||
break
|
||||
v = event.get_value(condition['field'])
|
||||
LOG.debug('Comparing value=%(v)s against condition=%(c)s .',
|
||||
{'v': v, 'c': condition})
|
||||
return op(v, condition['value'])
|
||||
@ -189,8 +216,8 @@ class EventAlarmEvaluator(evaluator.Evaluator):
|
||||
state = evaluator.ALARM
|
||||
reason = (_('Event (message_id=%(message)s) hit the query of alarm '
|
||||
'(id=%(alarm)s)') %
|
||||
{'message': event['message_id'], 'alarm': alarm.alarm_id})
|
||||
reason_data = {'type': 'event', 'event': event}
|
||||
{'message': event.id, 'alarm': alarm.alarm_id})
|
||||
reason_data = {'type': 'event', 'event': event.obj}
|
||||
self._refresh(alarm, state, reason, reason_data)
|
||||
|
||||
def _refresh(self, alarm, state, reason, reason_data):
|
||||
|
@ -169,10 +169,17 @@ class TestEventAlarmEvaluate(base.TestEvaluatorBase):
|
||||
self._alarm(id=2),
|
||||
]
|
||||
event = self._event()
|
||||
original = event_evaluator.EventAlarmEvaluator._sanitize(event)
|
||||
|
||||
original = self.evaluator._fire_alarm
|
||||
|
||||
with mock.patch.object(event_evaluator.EventAlarmEvaluator,
|
||||
'_sanitize',
|
||||
side_effect=[Exception('boom'), original]):
|
||||
'_fire_alarm') as _fire_alarm:
|
||||
def _side_effect(*args, **kwargs):
|
||||
_fire_alarm.side_effect = original
|
||||
return Exception('boom')
|
||||
|
||||
_fire_alarm.side_effect = _side_effect
|
||||
|
||||
self._do_test_event_alarm(
|
||||
alarms, [event],
|
||||
expect_alarm_states={alarms[0].alarm_id: evaluator.UNKNOWN,
|
||||
|
Loading…
x
Reference in New Issue
Block a user