deduced alarm notifier, is subscribed to graph changes, upon a deduced alarm added an activate notification is sent to the notifers service
Change-Id: I4b51ea0a1f0db8c42ab8d2c9497ebecebe6d07a0
This commit is contained in:
parent
199262a27e
commit
4ec0e3a8f8
@ -35,16 +35,12 @@ class AodhDriver(AlarmDriverBase):
|
|||||||
return alarm[AodhProps.NAME]
|
return alarm[AodhProps.NAME]
|
||||||
|
|
||||||
def _get_alarms(self):
|
def _get_alarms(self):
|
||||||
|
try:
|
||||||
|
aodh_alarms = self.client.alarms.list()
|
||||||
|
return [self._convert_alarm(alarm) for alarm in aodh_alarms]
|
||||||
|
except Exception as e:
|
||||||
|
LOG.exception("Exception: %s", e)
|
||||||
return []
|
return []
|
||||||
# TODO(iafek): enable the code below
|
|
||||||
|
|
||||||
# try:
|
|
||||||
# aodh_alarms = self.client.alarms.list()
|
|
||||||
# return [_convert_alarm(alarm)
|
|
||||||
# for alarm in aodh_alarms]
|
|
||||||
# except Exception:
|
|
||||||
# LOG.error("Exception: %s", traceback.print_exc())
|
|
||||||
# return []
|
|
||||||
|
|
||||||
def _is_erroneous(self, alarm):
|
def _is_erroneous(self, alarm):
|
||||||
return alarm and alarm[AodhProps.STATE] != AodhState.OK
|
return alarm and alarm[AodhProps.STATE] != AodhState.OK
|
||||||
@ -56,22 +52,30 @@ class AodhDriver(AlarmDriverBase):
|
|||||||
def _is_valid(self, alarm):
|
def _is_valid(self, alarm):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@staticmethod
|
@classmethod
|
||||||
def _convert_event_alarm(alarm):
|
def _convert_event_alarm(cls, alarm):
|
||||||
converted_alarm = AodhDriver._convert_base_alarm(alarm)
|
res = cls._convert_base_alarm(alarm)
|
||||||
event_type, resource_id = \
|
res[AodhProps.EVENT_TYPE] = alarm.event_rule[AodhProps.EVENT_TYPE],
|
||||||
AodhDriver._parse_event_rule(alarm.event_rule)
|
res[AodhProps.RESOURCE_ID] = _parse_query(alarm.event_rule,
|
||||||
converted_alarm[AodhProps.EVENT_TYPE] = event_type
|
AodhProps.RESOURCE_ID)
|
||||||
converted_alarm[AodhProps.RESOURCE_ID] = resource_id
|
return res
|
||||||
return converted_alarm
|
|
||||||
|
|
||||||
@staticmethod
|
@classmethod
|
||||||
def _convert_threshold_alarm(alarm):
|
def _convert_threshold_alarm(cls, alarm):
|
||||||
converted_alarm = AodhDriver._convert_base_alarm(alarm)
|
res = cls._convert_base_alarm(alarm)
|
||||||
converted_alarm[AodhProps.STATE_TIMESTAMP] = alarm.state_timestamp
|
res[AodhProps.STATE_TIMESTAMP] = alarm.state_timestamp
|
||||||
converted_alarm[AodhProps.RESOURCE_ID] = \
|
res[AodhProps.RESOURCE_ID] = _parse_query(alarm.threshold_rule,
|
||||||
AodhDriver._parse_threshold_rule(alarm.threshold_rule)
|
AodhProps.RESOURCE_ID)
|
||||||
return converted_alarm
|
return res
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _convert_vitrage_alarm(cls, alarm):
|
||||||
|
res = cls._convert_base_alarm(alarm)
|
||||||
|
res[AodhProps.VITRAGE_ID] = _parse_query(alarm.event_rule,
|
||||||
|
AodhProps.VITRAGE_ID)
|
||||||
|
res[AodhProps.RESOURCE_ID] = _parse_query(alarm.event_rule,
|
||||||
|
AodhProps.RESOURCE_ID)
|
||||||
|
return res
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _convert_base_alarm(alarm):
|
def _convert_base_alarm(alarm):
|
||||||
@ -90,32 +94,27 @@ class AodhDriver(AlarmDriverBase):
|
|||||||
AodhProps.TYPE: alarm.type
|
AodhProps.TYPE: alarm.type
|
||||||
}
|
}
|
||||||
|
|
||||||
@staticmethod
|
@classmethod
|
||||||
def _parse_event_rule(rule):
|
def _convert_alarm(cls, alarm):
|
||||||
event_type = rule[AodhProps.EVENT_TYPE]
|
alarm_type = alarm.type
|
||||||
resource_id = \
|
if alarm_type == AodhProps.EVENT and _is_vitrage_alarm(alarm):
|
||||||
AodhDriver._parse_resource_id(rule[AodhProps.QUERY])
|
return cls._convert_vitrage_alarm(alarm)
|
||||||
return event_type, resource_id
|
elif alarm_type == AodhProps.EVENT:
|
||||||
|
return cls._convert_event_alarm(alarm)
|
||||||
@staticmethod
|
elif alarm_type == AodhProps.THRESHOLD:
|
||||||
def _parse_threshold_rule(rule):
|
return cls._convert_threshold_alarm(alarm)
|
||||||
return AodhDriver._parse_resource_id(rule[AodhProps.QUERY])
|
else:
|
||||||
|
LOG.warning('Unsupported Aodh alarm of type %s' % alarm_type)
|
||||||
@staticmethod
|
|
||||||
def _parse_resource_id(query_fields):
|
|
||||||
for query in query_fields:
|
|
||||||
field = query['field']
|
|
||||||
if field == AodhProps.RESOURCE_ID:
|
|
||||||
return query['value']
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def _convert_alarm(alarm):
|
def _parse_query(data, key):
|
||||||
alarm_type = alarm.type
|
query_fields = data.get(AodhProps.QUERY, {})
|
||||||
if alarm_type == AodhProps.EVENT:
|
for query in query_fields:
|
||||||
return AodhDriver._convert_event_alarm(alarm)
|
field = query['field']
|
||||||
elif alarm_type == AodhProps.THRESHOLD:
|
if field == key:
|
||||||
return AodhDriver._convert_threshold_alarm(alarm)
|
return query['value']
|
||||||
else:
|
return None
|
||||||
LOG.info('Unsupported Aodh alarm of type %s' % alarm_type)
|
|
||||||
|
|
||||||
|
def _is_vitrage_alarm(alarm):
|
||||||
|
return _parse_query(alarm.event_rule, AodhProps.VITRAGE_ID) is not None
|
||||||
|
@ -30,6 +30,7 @@ class AodhProperties(object):
|
|||||||
THRESHOLD = 'threshold'
|
THRESHOLD = 'threshold'
|
||||||
TIMESTAMP = 'timestamp'
|
TIMESTAMP = 'timestamp'
|
||||||
TYPE = 'type'
|
TYPE = 'type'
|
||||||
|
VITRAGE_ID = 'vitrage_id'
|
||||||
|
|
||||||
|
|
||||||
class AodhState(object):
|
class AodhState(object):
|
||||||
|
@ -36,10 +36,14 @@ class AodhTransformer(AlarmTransformerBase):
|
|||||||
super(AodhTransformer, self).__init__(transformers)
|
super(AodhTransformer, self).__init__(transformers)
|
||||||
|
|
||||||
def _create_snapshot_entity_vertex(self, entity_event):
|
def _create_snapshot_entity_vertex(self, entity_event):
|
||||||
self._create_vertex(entity_event)
|
if _is_vitrage_alarm(entity_event):
|
||||||
|
return self._create_merge_alarm_vertex(entity_event)
|
||||||
|
return self._create_vertex(entity_event)
|
||||||
|
|
||||||
def _create_update_entity_vertex(self, entity_event):
|
def _create_update_entity_vertex(self, entity_event):
|
||||||
self._create_vertex(entity_event)
|
if _is_vitrage_alarm(entity_event):
|
||||||
|
return self._create_merge_alarm_vertex(entity_event)
|
||||||
|
return self._create_vertex(entity_event)
|
||||||
|
|
||||||
def _create_vertex(self, entity_event):
|
def _create_vertex(self, entity_event):
|
||||||
metadata = {
|
metadata = {
|
||||||
@ -74,6 +78,30 @@ class AodhTransformer(AlarmTransformerBase):
|
|||||||
update_timestamp=update_timestamp,
|
update_timestamp=update_timestamp,
|
||||||
metadata=metadata)
|
metadata=metadata)
|
||||||
|
|
||||||
|
def _create_merge_alarm_vertex(self, entity_event):
|
||||||
|
"""Handle an alarm that already has a vitrage_id
|
||||||
|
|
||||||
|
This is a deduced alarm created in aodh by vitrage, so it already
|
||||||
|
exists in the graph.
|
||||||
|
This function will update the exiting vertex (and not create a new one)
|
||||||
|
"""
|
||||||
|
metadata = {
|
||||||
|
AodhProps.DESCRIPTION: entity_event[AodhProps.DESCRIPTION],
|
||||||
|
VProps.PROJECT_ID: entity_event[AodhProps.PROJECT_ID],
|
||||||
|
}
|
||||||
|
sample_timestamp = entity_event[DSProps.SAMPLE_DATE]
|
||||||
|
update_timestamp = self._format_update_timestamp(
|
||||||
|
AodhTransformer._timestamp(entity_event), sample_timestamp)
|
||||||
|
|
||||||
|
return graph_utils.create_vertex(
|
||||||
|
self._create_entity_key(entity_event),
|
||||||
|
entity_id=entity_event.get(AodhProps.ALARM_ID),
|
||||||
|
entity_category=EntityCategory.ALARM,
|
||||||
|
entity_type='vitrage',
|
||||||
|
sample_timestamp=sample_timestamp,
|
||||||
|
update_timestamp=update_timestamp,
|
||||||
|
metadata=metadata)
|
||||||
|
|
||||||
def _create_neighbors(self, entity_event):
|
def _create_neighbors(self, entity_event):
|
||||||
graph_neighbors = entity_event.get(self.QUERY_RESULT, [])
|
graph_neighbors = entity_event.get(self.QUERY_RESULT, [])
|
||||||
result = []
|
result = []
|
||||||
@ -89,6 +117,9 @@ class AodhTransformer(AlarmTransformerBase):
|
|||||||
return entity_event[AodhProps.STATE] == self.STATUS_OK
|
return entity_event[AodhProps.STATE] == self.STATUS_OK
|
||||||
|
|
||||||
def _create_entity_key(self, entity_event):
|
def _create_entity_key(self, entity_event):
|
||||||
|
if _is_vitrage_alarm(entity_event):
|
||||||
|
return entity_event.get(AodhProps.VITRAGE_ID)
|
||||||
|
|
||||||
sync_type = entity_event[DSProps.SYNC_TYPE]
|
sync_type = entity_event[DSProps.SYNC_TYPE]
|
||||||
alarm_name = entity_event[AodhProps.NAME]
|
alarm_name = entity_event[AodhProps.NAME]
|
||||||
resource_id = entity_event[AodhProps.RESOURCE_ID]
|
resource_id = entity_event[AodhProps.RESOURCE_ID]
|
||||||
@ -110,3 +141,7 @@ class AodhTransformer(AlarmTransformerBase):
|
|||||||
if not affected_resource_id:
|
if not affected_resource_id:
|
||||||
return None
|
return None
|
||||||
return {VProps.ID: affected_resource_id}
|
return {VProps.ID: affected_resource_id}
|
||||||
|
|
||||||
|
|
||||||
|
def _is_vitrage_alarm(entity_event):
|
||||||
|
return entity_event.get(AodhProps.VITRAGE_ID) is not None
|
||||||
|
@ -22,4 +22,8 @@ OPTS = [
|
|||||||
help='A path for the configuration files of the data sources'
|
help='A path for the configuration files of the data sources'
|
||||||
' values'
|
' values'
|
||||||
),
|
),
|
||||||
|
cfg.StrOpt('notifier_topic',
|
||||||
|
default='vitrage.graph',
|
||||||
|
help='The topic that vitrage-graph uses for alarm '
|
||||||
|
'notification messages.'),
|
||||||
]
|
]
|
||||||
|
95
vitrage/entity_graph/processor/notifier.py
Normal file
95
vitrage/entity_graph/processor/notifier.py
Normal file
@ -0,0 +1,95 @@
|
|||||||
|
# Copyright 2016 - Nokia
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
from oslo_log import log
|
||||||
|
import oslo_messaging
|
||||||
|
|
||||||
|
from vitrage.common.constants import EntityCategory
|
||||||
|
from vitrage.common.constants import NotifierEventTypes
|
||||||
|
from vitrage.common.constants import VertexProperties as VProps
|
||||||
|
from vitrage.evaluator.actions import evaluator_event_transformer as evaluator
|
||||||
|
from vitrage.messaging import get_transport
|
||||||
|
|
||||||
|
|
||||||
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class DeducedAlarmNotifier(object):
|
||||||
|
"""Allows writing to message bus"""
|
||||||
|
def __init__(self, conf):
|
||||||
|
self.oslo_notifier = None
|
||||||
|
try:
|
||||||
|
topic = conf.entity_graph.notifier_topic
|
||||||
|
notifier_plugins = conf.notifiers
|
||||||
|
if not topic or not notifier_plugins:
|
||||||
|
LOG.info('DeducedAlarmNotifier is disabled')
|
||||||
|
return
|
||||||
|
|
||||||
|
self.oslo_notifier = oslo_messaging.Notifier(
|
||||||
|
get_transport(conf),
|
||||||
|
driver='messagingv2',
|
||||||
|
publisher_id='vitrage.deduced',
|
||||||
|
topic=topic)
|
||||||
|
except Exception:
|
||||||
|
LOG.info('DeducedAlarmNotifier missing configuration')
|
||||||
|
|
||||||
|
@property
|
||||||
|
def enabled(self):
|
||||||
|
return self.oslo_notifier is not None
|
||||||
|
|
||||||
|
def notify_when_applicable(self, before, current, is_vertex):
|
||||||
|
"""Callback subscribed to driver.graph updates
|
||||||
|
|
||||||
|
:param is_vertex:
|
||||||
|
:param before: The graph element (vertex or edge) prior to the
|
||||||
|
change that happened. None if the element was just created.
|
||||||
|
:param current: The graph element (vertex or edge) after the
|
||||||
|
change that happened. Deleted elements should arrive with the
|
||||||
|
is_deleted property set to True
|
||||||
|
"""
|
||||||
|
notification_type = _get_notification_type(before, current, is_vertex)
|
||||||
|
if not notification_type:
|
||||||
|
return
|
||||||
|
|
||||||
|
LOG.debug('DeducedAlarmNotifier : %s', notification_type)
|
||||||
|
LOG.debug('DeducedAlarmNotifier : %s', current.properties)
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.oslo_notifier.info({}, notification_type, current.properties)
|
||||||
|
except Exception as e:
|
||||||
|
LOG.exception('DeducedAlarmNotifier cannot notify - %s', e)
|
||||||
|
|
||||||
|
|
||||||
|
def _get_notification_type(before, current, is_vertex):
|
||||||
|
if not is_vertex:
|
||||||
|
return None
|
||||||
|
if not _is_active_deduced_alarm(before) and \
|
||||||
|
_is_active_deduced_alarm(current):
|
||||||
|
return NotifierEventTypes.ACTIVATE_DEDUCED_ALARM_EVENT
|
||||||
|
if _is_active_deduced_alarm(before) and \
|
||||||
|
not _is_active_deduced_alarm(current):
|
||||||
|
return NotifierEventTypes.DEACTIVATE_DEDUCED_ALARM_EVENT
|
||||||
|
|
||||||
|
|
||||||
|
def _is_active_deduced_alarm(vertex):
|
||||||
|
if not vertex:
|
||||||
|
return False
|
||||||
|
|
||||||
|
if not (vertex.get(VProps.CATEGORY) == EntityCategory.ALARM and
|
||||||
|
vertex.get(VProps.TYPE) == evaluator.VITRAGE_TYPE):
|
||||||
|
return False
|
||||||
|
|
||||||
|
if vertex.get(VProps.IS_DELETED, False) or \
|
||||||
|
vertex.get(VProps.IS_PLACEHOLDER, False):
|
||||||
|
return False
|
||||||
|
return True
|
@ -20,6 +20,7 @@ from vitrage.common.constants import VertexProperties as VProps
|
|||||||
from vitrage.datasources.transformer_base import TransformerBase
|
from vitrage.datasources.transformer_base import TransformerBase
|
||||||
from vitrage.entity_graph.processor import base as processor
|
from vitrage.entity_graph.processor import base as processor
|
||||||
from vitrage.entity_graph.processor import entity_graph
|
from vitrage.entity_graph.processor import entity_graph
|
||||||
|
from vitrage.entity_graph.processor.notifier import DeducedAlarmNotifier
|
||||||
from vitrage.entity_graph.states.state_manager import StateManager
|
from vitrage.entity_graph.states.state_manager import StateManager
|
||||||
from vitrage.entity_graph.transformer_manager import TransformerManager
|
from vitrage.entity_graph.transformer_manager import TransformerManager
|
||||||
from vitrage.graph import Direction
|
from vitrage.graph import Direction
|
||||||
@ -38,6 +39,7 @@ class Processor(processor.ProcessorBase):
|
|||||||
self.initialization_status = initialization_status
|
self.initialization_status = initialization_status
|
||||||
self.entity_graph = entity_graph.EntityGraph("Entity Graph") if \
|
self.entity_graph = entity_graph.EntityGraph("Entity Graph") if \
|
||||||
e_graph is None else e_graph
|
e_graph is None else e_graph
|
||||||
|
self._notifier = DeducedAlarmNotifier(conf)
|
||||||
|
|
||||||
def process_event(self, event):
|
def process_event(self, event):
|
||||||
"""Decides which action to run on given event
|
"""Decides which action to run on given event
|
||||||
@ -55,7 +57,7 @@ class Processor(processor.ProcessorBase):
|
|||||||
self._enrich_event(event)
|
self._enrich_event(event)
|
||||||
entity = self.transformer_manager.transform(event)
|
entity = self.transformer_manager.transform(event)
|
||||||
self._calculate_aggregated_state(entity.vertex, entity.action)
|
self._calculate_aggregated_state(entity.vertex, entity.action)
|
||||||
return self.actions[entity.action](entity.vertex, entity.neighbors)
|
self.actions[entity.action](entity.vertex, entity.neighbors)
|
||||||
|
|
||||||
def create_entity(self, new_vertex, neighbors):
|
def create_entity(self, new_vertex, neighbors):
|
||||||
"""Adds new vertex to the entity graph
|
"""Adds new vertex to the entity graph
|
||||||
@ -97,8 +99,8 @@ class Processor(processor.ProcessorBase):
|
|||||||
updated_vertex)
|
updated_vertex)
|
||||||
self._update_neighbors(updated_vertex, neighbors)
|
self._update_neighbors(updated_vertex, neighbors)
|
||||||
else:
|
else:
|
||||||
LOG.info("Update event arrived on invalid resource: %s",
|
LOG.warning("Update event arrived on invalid resource: %s",
|
||||||
updated_vertex)
|
updated_vertex)
|
||||||
|
|
||||||
def delete_entity(self, deleted_vertex, neighbors):
|
def delete_entity(self, deleted_vertex, neighbors):
|
||||||
"""Deletes the vertex from the entity graph
|
"""Deletes the vertex from the entity graph
|
||||||
@ -132,8 +134,8 @@ class Processor(processor.ProcessorBase):
|
|||||||
|
|
||||||
self.entity_graph.mark_vertex_as_deleted(deleted_vertex)
|
self.entity_graph.mark_vertex_as_deleted(deleted_vertex)
|
||||||
else:
|
else:
|
||||||
LOG.info("Delete event arrived on invalid resource: %s",
|
LOG.warning("Delete event arrived on invalid resource: %s",
|
||||||
deleted_vertex)
|
deleted_vertex)
|
||||||
|
|
||||||
def update_relationship(self, entity_vertex, neighbors):
|
def update_relationship(self, entity_vertex, neighbors):
|
||||||
LOG.debug('Update relationship in entity graph:\n%s', neighbors)
|
LOG.debug('Update relationship in entity graph:\n%s', neighbors)
|
||||||
@ -159,6 +161,12 @@ class Processor(processor.ProcessorBase):
|
|||||||
len(self.conf.datasources.types):
|
len(self.conf.datasources.types):
|
||||||
self.initialization_status.status = \
|
self.initialization_status.status = \
|
||||||
self.initialization_status.RECEIVED_ALL_END_MESSAGES
|
self.initialization_status.RECEIVED_ALL_END_MESSAGES
|
||||||
|
self.do_on_initialization_end()
|
||||||
|
|
||||||
|
def do_on_initialization_end(self):
|
||||||
|
if self._notifier.enabled:
|
||||||
|
self.entity_graph.subscribe(self._notifier.notify_when_applicable)
|
||||||
|
LOG.info('Graph notifications subscription added')
|
||||||
|
|
||||||
def _update_neighbors(self, vertex, neighbors):
|
def _update_neighbors(self, vertex, neighbors):
|
||||||
"""Updates vertices neighbor connections
|
"""Updates vertices neighbor connections
|
||||||
|
@ -58,7 +58,7 @@ class NXGraph(Graph):
|
|||||||
self_copy._g = self._g.copy()
|
self_copy._g = self._g.copy()
|
||||||
return self_copy
|
return self_copy
|
||||||
|
|
||||||
@Notifier.add_notify
|
@Notifier.update_notify
|
||||||
def add_vertex(self, v):
|
def add_vertex(self, v):
|
||||||
"""Add a vertex to the graph
|
"""Add a vertex to the graph
|
||||||
|
|
||||||
@ -71,7 +71,7 @@ class NXGraph(Graph):
|
|||||||
properties_copy = copy.copy(v.properties)
|
properties_copy = copy.copy(v.properties)
|
||||||
self._g.add_node(n=v.vertex_id, attr_dict=properties_copy)
|
self._g.add_node(n=v.vertex_id, attr_dict=properties_copy)
|
||||||
|
|
||||||
@Notifier.add_notify
|
@Notifier.update_notify
|
||||||
def add_edge(self, e):
|
def add_edge(self, e):
|
||||||
"""Add an edge to the graph
|
"""Add an edge to the graph
|
||||||
|
|
||||||
|
@ -12,4 +12,9 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
pass
|
from oslo_config import cfg
|
||||||
|
|
||||||
|
OPTS = [
|
||||||
|
cfg.ListOpt('notifiers',
|
||||||
|
help='Names of enabled notifiers (example aodh)'),
|
||||||
|
]
|
||||||
|
@ -11,87 +11,92 @@
|
|||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
import random
|
|
||||||
import string
|
|
||||||
|
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
|
||||||
from vitrage import clients
|
from vitrage import clients
|
||||||
from vitrage.common.constants import NotifierEventTypes
|
from vitrage.common.constants import NotifierEventTypes
|
||||||
|
from vitrage.common.constants import VertexProperties as VProps
|
||||||
|
from vitrage.datasources.aodh.properties import AodhState
|
||||||
|
from vitrage.entity_graph.states.normalized_alarm_severity import \
|
||||||
|
NormalizedAlarmSeverity
|
||||||
from vitrage.notifier.plugins.base import NotifierBase
|
from vitrage.notifier.plugins.base import NotifierBase
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def aodh_alarm_name_generator(name, unique=None, size=6,
|
severity_translation = {
|
||||||
chars=string.ascii_uppercase + string.digits):
|
NormalizedAlarmSeverity.CRITICAL: 'critical',
|
||||||
if unique:
|
NormalizedAlarmSeverity.SEVERE: 'moderate',
|
||||||
return name.join(['_', unique])
|
NormalizedAlarmSeverity.WARNING: 'low',
|
||||||
else:
|
}
|
||||||
unique = ''.join(random.choice(chars) for _ in range(size))
|
|
||||||
return name.join(['_', unique])
|
|
||||||
|
|
||||||
|
|
||||||
class AodhNotifier(NotifierBase):
|
class AodhNotifier(NotifierBase):
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_notifier_name():
|
||||||
|
return 'aodh'
|
||||||
|
|
||||||
def __init__(self, conf):
|
def __init__(self, conf):
|
||||||
super(AodhNotifier, self).__init__(conf)
|
super(AodhNotifier, self).__init__(conf)
|
||||||
self.client = clients.ceilometer_client(conf)
|
self.client = clients.ceilometer_client(conf)
|
||||||
|
|
||||||
def process_event(self, data, event_type):
|
def process_event(self, data, event_type):
|
||||||
if event_type == NotifierEventTypes.DEACTIVATE_DEDUCED_ALARM_EVENT:
|
response = None
|
||||||
self._deactivate_aodh_alarm(data)
|
if event_type == NotifierEventTypes.ACTIVATE_DEDUCED_ALARM_EVENT:
|
||||||
elif event_type == NotifierEventTypes.ACTIVATE_DEDUCED_ALARM_EVENT:
|
if not data.get(VProps.ID):
|
||||||
self._activate_aodh_alarm(data)
|
response = self._create_aodh_alarm(data, AodhState.ALARM)
|
||||||
|
else:
|
||||||
|
response = self._update_aodh_alarm(data, AodhState.ALARM)
|
||||||
|
elif event_type == NotifierEventTypes.DEACTIVATE_DEDUCED_ALARM_EVENT:
|
||||||
|
response = self._update_aodh_alarm(data, AodhState.OK)
|
||||||
|
|
||||||
# noinspection PyMethodMayBeStatic
|
if response and response.alarm_id:
|
||||||
def _activate_aodh_alarm(self, data):
|
LOG.info('Aodh Alarm id %s: ', response.alarm_id)
|
||||||
LOG.info('### Activate aodh alarm')
|
else:
|
||||||
# alarm_name = aodh_alarm_name_generator(
|
LOG.error('Failed to %s Aodh Alarm \n%s', event_type, str(data))
|
||||||
# data.get(VProps.NAME),
|
|
||||||
# data.get('affected_resource_id'))
|
|
||||||
# query = [dict(
|
|
||||||
# field='resource_id',
|
|
||||||
# type='string',
|
|
||||||
# op='eq',
|
|
||||||
# value=data.get('affected_resource_id'))]
|
|
||||||
# severity = data.get(VProps.SEVERITY)
|
|
||||||
# try:
|
|
||||||
# alarm = self.client.alarms.create(
|
|
||||||
# name=alarm_name,
|
|
||||||
# description='Vitrage deduced alarm',
|
|
||||||
# query=query,
|
|
||||||
# severity=severity,
|
|
||||||
# state='alarm',
|
|
||||||
# type='event',
|
|
||||||
# event_rule={"event_type": '*'})
|
|
||||||
# LOG.info('Aodh Alarm created: ' + str(alarm))
|
|
||||||
# except Exception as e:
|
|
||||||
# LOG.exception('Failed to create Aodh Alarm, Got Exception: %s',e)
|
|
||||||
# name
|
|
||||||
# description
|
|
||||||
# type' : event or threshold
|
|
||||||
# threshold_rule
|
|
||||||
# event_rule
|
|
||||||
# state': ok, alarm, insufficient data
|
|
||||||
# severity': moderate, critical, low
|
|
||||||
# enabled
|
|
||||||
# alarm_actions
|
|
||||||
# ok_actions
|
|
||||||
# insufficient_data_actions
|
|
||||||
# repeat_actions
|
|
||||||
# project_id
|
|
||||||
# user_id
|
|
||||||
# time_constraints
|
|
||||||
|
|
||||||
# noinspection PyMethodMayBeStatic
|
def _create_aodh_alarm(self, alarm, state):
|
||||||
def _deactivate_aodh_alarm(self, data):
|
alarm_request = _alarm_request(alarm, state)
|
||||||
LOG.info('### Deactivate aodh alarm')
|
try:
|
||||||
# try:
|
LOG.info('Aodh Alarm - Activate: ' + str(alarm_request))
|
||||||
# alarm = self.client.alarms.update(
|
return self.client.alarms.create(**alarm_request)
|
||||||
# alarm_id=data.get(VProps.ID),
|
except Exception as e:
|
||||||
# state='ok')
|
LOG.exception('Failed to activate Aodh Alarm Got Exception: %s', e)
|
||||||
# LOG.info('Aodh Alarm deactivated ' + str(alarm))
|
return
|
||||||
# except Exception as e:
|
|
||||||
# LOG.exception('Failed to update Aodh Alarm, Got Exception: %s',e)
|
def _update_aodh_alarm(self, alarm, state):
|
||||||
|
aodh_id = alarm.get(VProps.ID)
|
||||||
|
try:
|
||||||
|
LOG.info('Aodh Alarm $%s update state %s', aodh_id, state)
|
||||||
|
return self.client.alarms.update(alarm_id=aodh_id, state=state)
|
||||||
|
except Exception as e:
|
||||||
|
LOG.exception('Failed to update Aodh Alarm Got Exception: %s', e)
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
def _alarm_request(data, state):
|
||||||
|
# TODO(ihefetz) resource id should come from the alarm
|
||||||
|
affected_resource_id = data.get(VProps.VITRAGE_ID).replace(
|
||||||
|
'ALARM:vitrage:deduced_vm_alarm:RESOURCE:nova.instance:', '')
|
||||||
|
alarm_name = data.get(VProps.NAME)
|
||||||
|
aodh_alarm_name = '_'.join([alarm_name, affected_resource_id])
|
||||||
|
severity = severity_translation.get(data.get(VProps.SEVERITY), 'low')
|
||||||
|
return dict(
|
||||||
|
name=aodh_alarm_name,
|
||||||
|
description=u'Vitrage deduced alarm',
|
||||||
|
event_rule=dict(query=[
|
||||||
|
dict(
|
||||||
|
field=u'resource_id',
|
||||||
|
type='',
|
||||||
|
op=u'eq',
|
||||||
|
value=affected_resource_id),
|
||||||
|
dict(
|
||||||
|
field=u'vitrage_id',
|
||||||
|
type='',
|
||||||
|
op=u'eq',
|
||||||
|
value=data.get(VProps.VITRAGE_ID))]),
|
||||||
|
severity=severity,
|
||||||
|
state=state,
|
||||||
|
type=u'event')
|
||||||
|
@ -25,3 +25,8 @@ class NotifierBase(object):
|
|||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def process_event(self, data, event_type):
|
def process_event(self, data, event_type):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
@abc.abstractmethod
|
||||||
|
def get_notifier_name():
|
||||||
|
pass
|
||||||
|
@ -28,9 +28,9 @@ class VitrageNotifierService(os_service.Service):
|
|||||||
def __init__(self, conf):
|
def __init__(self, conf):
|
||||||
super(VitrageNotifierService, self).__init__()
|
super(VitrageNotifierService, self).__init__()
|
||||||
self.conf = conf
|
self.conf = conf
|
||||||
self.notifiers = [AodhNotifier(conf)]
|
self.notifiers = self.get_notifier_plugins(conf)
|
||||||
transport = messaging.get_transport(conf)
|
transport = messaging.get_transport(conf)
|
||||||
target = oslo_messaging.Target(topic='stam')
|
target = oslo_messaging.Target(topic=conf.entity_graph.notifier_topic)
|
||||||
self.listener = messaging.get_notification_listener(
|
self.listener = messaging.get_notification_listener(
|
||||||
transport, [target],
|
transport, [target],
|
||||||
[VitrageEventEndpoint(self.notifiers)])
|
[VitrageEventEndpoint(self.notifiers)])
|
||||||
@ -52,6 +52,20 @@ class VitrageNotifierService(os_service.Service):
|
|||||||
|
|
||||||
LOG.info("Vitrage Notifier Service - Stopped!")
|
LOG.info("Vitrage Notifier Service - Stopped!")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_notifier_plugins(conf):
|
||||||
|
notifiers = []
|
||||||
|
conf_notifier_names = conf.notifiers
|
||||||
|
if not conf_notifier_names:
|
||||||
|
LOG.info('There are no notifier plugins in configuration')
|
||||||
|
return []
|
||||||
|
for plugin in [AodhNotifier]:
|
||||||
|
plugin_name = plugin.get_notifier_name()
|
||||||
|
if plugin_name in conf_notifier_names:
|
||||||
|
LOG.info('Notifier plugin %s started', plugin_name)
|
||||||
|
notifiers.append(plugin(conf))
|
||||||
|
return notifiers
|
||||||
|
|
||||||
|
|
||||||
class VitrageEventEndpoint(object):
|
class VitrageEventEndpoint(object):
|
||||||
|
|
||||||
|
@ -23,6 +23,7 @@ import vitrage.datasources
|
|||||||
import vitrage.entity_graph.consistency
|
import vitrage.entity_graph.consistency
|
||||||
import vitrage.evaluator
|
import vitrage.evaluator
|
||||||
import vitrage.keystone_client
|
import vitrage.keystone_client
|
||||||
|
import vitrage.notifier
|
||||||
import vitrage.rpc
|
import vitrage.rpc
|
||||||
|
|
||||||
DATASOURCES_PATH = 'vitrage.datasources.'
|
DATASOURCES_PATH = 'vitrage.datasources.'
|
||||||
@ -39,7 +40,10 @@ def list_opts():
|
|||||||
('consistency', vitrage.entity_graph.consistency.OPTS),
|
('consistency', vitrage.entity_graph.consistency.OPTS),
|
||||||
('entity_graph', vitrage.entity_graph.OPTS),
|
('entity_graph', vitrage.entity_graph.OPTS),
|
||||||
('service_credentials', vitrage.keystone_client.OPTS),
|
('service_credentials', vitrage.keystone_client.OPTS),
|
||||||
('DEFAULT', itertools.chain(vitrage.clients.OPTS, vitrage.rpc.OPTS))
|
('DEFAULT', itertools.chain(
|
||||||
|
vitrage.clients.OPTS,
|
||||||
|
vitrage.rpc.OPTS,
|
||||||
|
vitrage.notifier.OPTS))
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
15
vitrage/tests/unit/notifier/__init__.py
Normal file
15
vitrage/tests/unit/notifier/__init__.py
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
# Copyright 2015 - Alcatel-Lucent
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
__author__ = 'stack'
|
105
vitrage/tests/unit/notifier/test_notifier.py
Normal file
105
vitrage/tests/unit/notifier/test_notifier.py
Normal file
@ -0,0 +1,105 @@
|
|||||||
|
# Copyright 2016 - Alcatel-Lucent
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
test_vitrage graph
|
||||||
|
----------------------------------
|
||||||
|
|
||||||
|
Tests for `vitrage` graph driver
|
||||||
|
"""
|
||||||
|
|
||||||
|
from vitrage.common.constants import EntityCategory
|
||||||
|
from vitrage.common.constants import NotifierEventTypes as NType
|
||||||
|
from vitrage.common.constants import VertexProperties as VProps
|
||||||
|
from vitrage.entity_graph.processor.notifier import _get_notification_type
|
||||||
|
from vitrage.evaluator.actions import evaluator_event_transformer as evaluator
|
||||||
|
from vitrage.graph import Vertex
|
||||||
|
from vitrage.tests import base
|
||||||
|
|
||||||
|
|
||||||
|
resource = Vertex('123', {
|
||||||
|
VProps.CATEGORY: EntityCategory.RESOURCE,
|
||||||
|
VProps.TYPE: 'some_resource_type',
|
||||||
|
VProps.IS_DELETED: False,
|
||||||
|
VProps.IS_PLACEHOLDER: False,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
deduced_alarm = Vertex('123', {
|
||||||
|
VProps.CATEGORY: EntityCategory.ALARM,
|
||||||
|
VProps.TYPE: evaluator.VITRAGE_TYPE,
|
||||||
|
VProps.IS_DELETED: False,
|
||||||
|
VProps.IS_PLACEHOLDER: False,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
non_deduced_alarm = Vertex('123', {
|
||||||
|
VProps.CATEGORY: EntityCategory.ALARM,
|
||||||
|
VProps.TYPE: 'TEST_ALARM',
|
||||||
|
VProps.IS_DELETED: False,
|
||||||
|
VProps.IS_PLACEHOLDER: True,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
deleted_alarm = Vertex('123', {
|
||||||
|
VProps.CATEGORY: EntityCategory.ALARM,
|
||||||
|
VProps.TYPE: evaluator.VITRAGE_TYPE,
|
||||||
|
VProps.IS_DELETED: True,
|
||||||
|
VProps.IS_PLACEHOLDER: False,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
placeholder_alarm = Vertex('123', {
|
||||||
|
VProps.CATEGORY: EntityCategory.ALARM,
|
||||||
|
VProps.TYPE: evaluator.VITRAGE_TYPE,
|
||||||
|
VProps.IS_DELETED: False,
|
||||||
|
VProps.IS_PLACEHOLDER: True,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class GraphTest(base.BaseTest):
|
||||||
|
def test_notification_type_new_alarm(self):
|
||||||
|
ret = _get_notification_type(None, deduced_alarm, True)
|
||||||
|
self.assertEqual(NType.ACTIVATE_DEDUCED_ALARM_EVENT, ret,
|
||||||
|
'new alarm should notify activate')
|
||||||
|
|
||||||
|
ret = _get_notification_type(None, non_deduced_alarm, True)
|
||||||
|
self.assertIsNone(ret, 'alarm that is not a deduced alarm')
|
||||||
|
|
||||||
|
def test_notification_type_deleted_alarm(self):
|
||||||
|
ret = _get_notification_type(deduced_alarm, deleted_alarm, True)
|
||||||
|
self.assertEqual(NType.DEACTIVATE_DEDUCED_ALARM_EVENT, ret,
|
||||||
|
'deleted alarm should notify deactivate')
|
||||||
|
|
||||||
|
def test_notification_type_resource_vertex(self):
|
||||||
|
ret = _get_notification_type(None, resource, True)
|
||||||
|
self.assertIsNone(ret, 'any non alarm vertex should be ignored')
|
||||||
|
|
||||||
|
def test_notification_type_updated_alarm(self):
|
||||||
|
ret = _get_notification_type(deduced_alarm, deduced_alarm, True)
|
||||||
|
self.assertIsNone(ret, 'A not new alarm vertex should be ignored')
|
||||||
|
|
||||||
|
ret = _get_notification_type(deleted_alarm, deduced_alarm, True)
|
||||||
|
self.assertEqual(NType.ACTIVATE_DEDUCED_ALARM_EVENT, ret,
|
||||||
|
'old alarm become not deleted should notify activate')
|
||||||
|
|
||||||
|
ret = _get_notification_type(placeholder_alarm, deduced_alarm, True)
|
||||||
|
self.assertEqual(NType.ACTIVATE_DEDUCED_ALARM_EVENT, ret,
|
||||||
|
'placeholder become active should notify activate')
|
||||||
|
|
||||||
|
def test_notification_type_placeholder_alarm(self):
|
||||||
|
ret = _get_notification_type(None, placeholder_alarm, True)
|
||||||
|
self.assertIsNone(ret, 'A not new alarm vertex should be ignored')
|
Loading…
x
Reference in New Issue
Block a user