Enrich aodh sync event so it can be connected to the graph
Change-Id: Ie0de4f265ede2024e27ffde2cd8ed11b8bc43380
This commit is contained in:
parent
e29d4f30e2
commit
6c679d8c5f
@ -50,7 +50,8 @@ class Processor(processor.ProcessorBase):
|
||||
"""
|
||||
|
||||
LOG.debug('Processor event received')
|
||||
entity = self.transform_entity(event)
|
||||
self.transformer_manager.enrich_event(event, self.entity_graph)
|
||||
entity = self.transformer_manager.transform(event)
|
||||
self._calculate_aggregated_state(entity.vertex, entity.action)
|
||||
return self.actions[entity.action](entity.vertex, entity.neighbors)
|
||||
|
||||
@ -156,10 +157,6 @@ class Processor(processor.ProcessorBase):
|
||||
self.initialization_status.status = \
|
||||
self.initialization_status.RECEIVED_ALL_END_MESSAGES
|
||||
|
||||
def transform_entity(self, event):
|
||||
entity = self.transformer_manager.transform(event)
|
||||
return entity
|
||||
|
||||
def _update_neighbors(self, vertex, neighbors):
|
||||
"""Updates vertices neighbor connections
|
||||
|
||||
|
@ -62,22 +62,24 @@ class TransformerManager(object):
|
||||
return transformer
|
||||
|
||||
def transform(self, entity_event):
|
||||
try:
|
||||
sync_type = entity_event[SyncProps.SYNC_TYPE]
|
||||
LOG.debug('TRANSFORMER EVENT: %s', sync_type)
|
||||
LOG.debug('Event:\n%s', entity_event)
|
||||
except KeyError:
|
||||
raise VitrageTransformerError(
|
||||
'Entity Event must contains sync_type field.')
|
||||
|
||||
sync_type = self._get_sync_type(entity_event)
|
||||
LOG.debug('TRANSFORMER EVENT: %s', sync_type)
|
||||
LOG.debug('Event:\n%s', entity_event)
|
||||
return self.get_transformer(sync_type).transform(entity_event)
|
||||
|
||||
def extract_key(self, entity_event):
|
||||
def enrich_event(self, entity_event, graph):
|
||||
sync_type = self._get_sync_type(entity_event)
|
||||
return self.get_transformer(sync_type).enrich_event(entity_event,
|
||||
graph)
|
||||
|
||||
def extract_key(self, entity_event):
|
||||
sync_type = self._get_sync_type(entity_event)
|
||||
return self.get_transformer(sync_type)._create_entity_key()
|
||||
|
||||
@staticmethod
|
||||
def _get_sync_type(entity_event):
|
||||
try:
|
||||
sync_type = entity_event[SyncProps.SYNC_TYPE]
|
||||
return entity_event[SyncProps.SYNC_TYPE]
|
||||
except KeyError:
|
||||
raise VitrageTransformerError(
|
||||
'Entity Event must contains sync_type field.')
|
||||
|
||||
return self.get_transformer(sync_type)._create_entity_key()
|
||||
|
@ -36,3 +36,9 @@ class AodhState(object):
|
||||
OK = 'ok'
|
||||
ALARM = 'alarm'
|
||||
INSUFFICIENT_DATA = 'insufficient_data'
|
||||
|
||||
|
||||
class EventProps(object):
|
||||
AFFECTED_TYPE = 'affected_resource_type'
|
||||
AFFECTED_CATEGORY = 'affected_resource_category'
|
||||
RESOURCE_VERTEX_ID = 'resource_vertex_id'
|
||||
|
@ -13,6 +13,7 @@
|
||||
# under the License.
|
||||
from oslo_log import log as logging
|
||||
|
||||
from vitrage.common.constants import EdgeLabels
|
||||
from vitrage.common.constants import EntityCategory
|
||||
from vitrage.common.constants import SynchronizerProperties as SyncProps
|
||||
from vitrage.common.constants import VertexProperties as VProps
|
||||
@ -20,11 +21,13 @@ from vitrage.common import datetime_utils
|
||||
import vitrage.graph.utils as graph_utils
|
||||
from vitrage.synchronizer.plugins.aodh.properties import AodhProperties \
|
||||
as AodhProps
|
||||
from vitrage.synchronizer.plugins.aodh.properties import EventProps
|
||||
from vitrage.synchronizer.plugins.base.alarm.properties \
|
||||
import AlarmProperties as AlarmProps
|
||||
from vitrage.synchronizer.plugins.base.alarm.transformer \
|
||||
import BaseAlarmTransformer
|
||||
from vitrage.synchronizer.plugins import transformer_base as tbase
|
||||
from vitrage.synchronizer.plugins.transformer_base import Neighbor
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -71,8 +74,23 @@ class AodhTransformer(BaseAlarmTransformer):
|
||||
|
||||
# noinspection PyMethodMayBeStatic
|
||||
def _create_neighbors(self, entity_event):
|
||||
# TODO(iafek): get neighbour resource by its id
|
||||
return []
|
||||
resource_id = entity_event[AodhProps.RESOURCE_ID]
|
||||
resource_type = entity_event['affected_resource_type']
|
||||
resource_category = entity_event['affected_resource_category']
|
||||
resource_vertex_id = entity_event['resource_vertex_id']
|
||||
vertex = graph_utils.create_vertex(
|
||||
resource_vertex_id,
|
||||
entity_id=resource_id,
|
||||
entity_category=resource_category,
|
||||
entity_type=resource_type,
|
||||
sample_timestamp=entity_event[SyncProps.SAMPLE_DATE],
|
||||
is_placeholder=True)
|
||||
|
||||
edge = graph_utils.create_edge(
|
||||
source_id=self.extract_key(entity_event),
|
||||
target_id=resource_vertex_id,
|
||||
relationship_type=EdgeLabels.ON)
|
||||
return [Neighbor(vertex, edge)]
|
||||
|
||||
def _ok_status(self, entity_event):
|
||||
return entity_event[AodhProps.STATE] == self.STATUS_OK
|
||||
@ -92,3 +110,19 @@ class AodhTransformer(BaseAlarmTransformer):
|
||||
entity_event[AodhProps.TIMESTAMP],
|
||||
'%Y-%m-%dT%H:%M:%S.%f',
|
||||
tbase.TIMESTAMP_FORMAT)
|
||||
|
||||
@staticmethod
|
||||
def enrich_event(event, graph):
|
||||
affected_resource_id = event.get(AodhProps.RESOURCE_ID, None)
|
||||
if not affected_resource_id:
|
||||
return
|
||||
|
||||
vertices = graph.get_vertices({VProps.ID: affected_resource_id})
|
||||
LOG.debug('affected resource id %s found %s items',
|
||||
affected_resource_id, str(len(vertices)))
|
||||
if len(vertices) != 1:
|
||||
LOG.error('Unknown affected resource id %s', affected_resource_id)
|
||||
return
|
||||
event[EventProps.AFFECTED_TYPE] = vertices[0][VProps.TYPE]
|
||||
event[EventProps.AFFECTED_CATEGORY] = vertices[0][VProps.CATEGORY]
|
||||
event[EventProps.RESOURCE_VERTEX_ID] = vertices[0].vertex_id
|
||||
|
@ -203,3 +203,7 @@ class TransformerBase(object):
|
||||
@staticmethod
|
||||
def _format_update_timestamp(update_timestamp, sample_timestamp):
|
||||
return update_timestamp if update_timestamp else sample_timestamp
|
||||
|
||||
@staticmethod
|
||||
def enrich_event(event, graph):
|
||||
pass
|
||||
|
@ -98,7 +98,8 @@ class TestEntityGraphUnitBase(base.BaseTest):
|
||||
if processor is None:
|
||||
processor = proc.Processor(self.conf, InitializationStatus())
|
||||
|
||||
vertex, neighbors, event_type = processor.transform_entity(event)
|
||||
vertex, neighbors, event_type = processor.transformer_manager\
|
||||
.transform(event)
|
||||
processor.create_entity(vertex, neighbors)
|
||||
|
||||
return vertex, neighbors, processor
|
||||
|
Loading…
x
Reference in New Issue
Block a user