vitrage processor code review and documentation
Implements: blueprint vitrage-resource-processor Change-Id: If017664f342227260b39dc284e5c7b3131c7472a
This commit is contained in:
parent
5d807bf05c
commit
e91651e5e5
@ -34,6 +34,12 @@ class EntityGraphManager(object):
|
||||
self.transformer = transformer_manager.TransformerManager()
|
||||
|
||||
def is_partial_data_vertex(self, vertex):
|
||||
"""Check if a vertex is a partial data vertex
|
||||
|
||||
Checks if the vertex was updated only because it's a neighbor of a
|
||||
full data vertex
|
||||
"""
|
||||
|
||||
# check that vertex has no neighbors
|
||||
neighbor_edges = self.graph.get_edges(vertex.vertex_id,
|
||||
direction=Direction.BOTH)
|
||||
@ -51,6 +57,8 @@ class EntityGraphManager(object):
|
||||
if prop not in key_properties)
|
||||
|
||||
def delete_partial_data_vertex(self, suspected_vertex):
|
||||
"""Checks if it is a partial data vertex, and if so deletes it """
|
||||
|
||||
if self.is_partial_data_vertex(suspected_vertex):
|
||||
LOG.debug("Delete partial data vertex: %s", suspected_vertex)
|
||||
self.graph.remove_vertex(suspected_vertex)
|
||||
@ -64,18 +72,24 @@ class EntityGraphManager(object):
|
||||
EdgeProperties.IS_EDGE_DELETED, False)
|
||||
|
||||
def mark_vertex_as_deleted(self, vertex):
|
||||
"""Marks the vertex as is deleted, and updates deletion timestamp"""
|
||||
|
||||
vertex.properties[VertexProperties.IS_VERTEX_DELETED] = True
|
||||
vertex.properties[VertexProperties.VERTEX_DELETION_TIMESTAMP] = \
|
||||
datetime.datetime.now()
|
||||
self.graph.update_vertex(vertex)
|
||||
|
||||
def mark_edge_as_deleted(self, edge):
|
||||
"""Marks the edge as is deleted, and updates delete timestamp"""
|
||||
|
||||
edge.properties[EdgeProperties.IS_EDGE_DELETED] = True
|
||||
edge.properties[EdgeProperties.EDGE_DELETION_TIMESTAMP] = \
|
||||
datetime.datetime.now()
|
||||
self.graph.update_edge(edge)
|
||||
|
||||
def find_neighbor_types(self, neighbors):
|
||||
"""Finds all the types (TYPE, SUB_TYPE) of the neighbors """
|
||||
|
||||
neighbor_types = set()
|
||||
for (vertex, edge) in neighbors:
|
||||
neighbor_types.add(self.get_vertex_type(vertex))
|
||||
@ -87,7 +101,14 @@ class EntityGraphManager(object):
|
||||
return (type, sub_type)
|
||||
|
||||
def check_update_validation(self, curr_vertex, updated_vertex):
|
||||
return not self.is_vertex_deleted(curr_vertex) and \
|
||||
"""Checks current and updated validation
|
||||
|
||||
Check 2 conditions:
|
||||
1. is the vertex not deleted
|
||||
2. is updated timestamp bigger then current timestamp
|
||||
"""
|
||||
|
||||
return (not self.is_vertex_deleted(curr_vertex)) and \
|
||||
self.check_timestamp(curr_vertex, updated_vertex)
|
||||
|
||||
def is_edge_exist_in_list(self, edge, edges_list):
|
||||
|
@ -38,30 +38,58 @@ class Processor(processor.ProcessorBase):
|
||||
self._initialize_events_actions()
|
||||
|
||||
def process_event(self, event):
|
||||
"""Decides which action to run on given event
|
||||
|
||||
Transforms the event into a tupple (vertex, neighbors,action).
|
||||
After transforming, it runs the correct action according to the
|
||||
action received from the transformer.
|
||||
|
||||
:param event: The event to be processed
|
||||
:type event: Dictionary
|
||||
"""
|
||||
|
||||
entity = self.transform_entity(event)
|
||||
if entity.action not in self.events:
|
||||
if entity.action not in self.actions:
|
||||
LOG.info("error event: %s", event)
|
||||
return None
|
||||
|
||||
return self.events[entity.action](entity.vertex, entity.neighbors)
|
||||
return self.actions[entity.action](entity.vertex, entity.neighbors)
|
||||
|
||||
def create_entity(self, new_vertex, neighbors):
|
||||
"""Adds new vertex to the entity graph
|
||||
|
||||
Adds the entity to the entity graph, and connects it's neighbors
|
||||
|
||||
:param new_vertex: The new vertex to add to graph
|
||||
:type new_vertex: Vertex
|
||||
|
||||
:param neighbors: The neighbors of the new vertex
|
||||
:type neighbors: List
|
||||
"""
|
||||
|
||||
LOG.debug("Add entity to entity graph: %s", new_vertex)
|
||||
|
||||
# add the entity
|
||||
self.e_g_manager.graph.add_vertex(new_vertex)
|
||||
|
||||
# add the connecting entities
|
||||
self._connect_neighbors(new_vertex, neighbors, None)
|
||||
self._connect_neighbors(neighbors, None)
|
||||
|
||||
def update_entity(self, updated_vertex, neighbors):
|
||||
"""Updates the vertex in the entity graph
|
||||
|
||||
Updates the in entity in the entity graph. In addition it removes old
|
||||
neighbor connections, and connects the new neighbors.
|
||||
|
||||
:param updated_vertex: The vertex to be updated in the graph
|
||||
:type updated_vertex: Vertex
|
||||
|
||||
:param neighbors: The neighbors of the updated vertex
|
||||
:type neighbors: List
|
||||
"""
|
||||
|
||||
LOG.debug("Update entity in entity graph: %s", updated_vertex)
|
||||
|
||||
# update the entity
|
||||
curr_vertex = \
|
||||
self.e_g_manager.graph.get_vertex(updated_vertex.vertex_id)
|
||||
|
||||
if not curr_vertex or self.e_g_manager.check_update_validation(
|
||||
if (not curr_vertex) or self.e_g_manager.check_update_validation(
|
||||
curr_vertex, updated_vertex):
|
||||
self.e_g_manager.graph.update_vertex(updated_vertex)
|
||||
# add the connecting entities
|
||||
@ -71,25 +99,35 @@ class Processor(processor.ProcessorBase):
|
||||
updated_vertex)
|
||||
|
||||
def delete_entity(self, deleted_vertex, neighbors):
|
||||
"""Deletes the vertex from the entity graph
|
||||
|
||||
Marks the corresponding vertex and its edges as deleted
|
||||
|
||||
:param deleted_vertex: The vertex to be deleted from the graph
|
||||
:type deleted_vertex: Vertex
|
||||
|
||||
:param neighbors: The neighbors of the deleted vertex
|
||||
:type neighbors: List
|
||||
"""
|
||||
|
||||
LOG.debug("Delete entity from entity graph: %s", deleted_vertex)
|
||||
|
||||
# update the entity
|
||||
curr_vertex = \
|
||||
self.e_g_manager.graph.get_vertex(deleted_vertex.vertex_id)
|
||||
|
||||
if self.e_g_manager.check_update_validation(
|
||||
if (not curr_vertex) or self.e_g_manager.check_update_validation(
|
||||
curr_vertex, deleted_vertex):
|
||||
n_vertices = self.e_g_manager.graph.neighbors(
|
||||
neighbor_vertices = self.e_g_manager.graph.neighbors(
|
||||
deleted_vertex.vertex_id, direction=Direction.BOTH)
|
||||
n_edges = self.e_g_manager.graph.get_edges(
|
||||
neighbor_edges = self.e_g_manager.graph.get_edges(
|
||||
deleted_vertex.vertex_id, direction=Direction.BOTH)
|
||||
|
||||
# delete connected edges
|
||||
for edge in n_edges:
|
||||
for edge in neighbor_edges:
|
||||
self.e_g_manager.mark_edge_as_deleted(edge)
|
||||
|
||||
# delete partial data vertices that connected only to this vertex
|
||||
for vertex in n_vertices:
|
||||
for vertex in neighbor_vertices:
|
||||
self.e_g_manager.delete_partial_data_vertex(vertex)
|
||||
|
||||
# delete vertex
|
||||
@ -137,17 +175,23 @@ class Processor(processor.ProcessorBase):
|
||||
|
||||
return base.EntityWrapper(vertex, neighbors, event_type)
|
||||
|
||||
def _update_neighbors(self, updated_vertex, neighbors):
|
||||
def _update_neighbors(self, vertex, neighbors):
|
||||
"""Updates vertices neighbor connections
|
||||
|
||||
1. Removes old neighbor connections
|
||||
2. connects the new neighbors.
|
||||
"""
|
||||
|
||||
(valid_edges, old_edges) = self._find_edges_status(
|
||||
updated_vertex, neighbors)
|
||||
vertex, neighbors)
|
||||
self._delete_old_connections(vertex, old_edges)
|
||||
self._connect_neighbors(neighbors, valid_edges)
|
||||
|
||||
# delete old unnecessary neighbors
|
||||
self._delete_old_connections(updated_vertex, old_edges)
|
||||
def _connect_neighbors(self, neighbors, valid_edges):
|
||||
"""Updates the neighbor vertex and adds the connection edges """
|
||||
|
||||
# connect new neighbors
|
||||
self._connect_neighbors(updated_vertex, neighbors, valid_edges)
|
||||
|
||||
def _connect_neighbors(self, updated_vertex, neighbors, valid_edges):
|
||||
LOG.debug("Connect neighbors. Neighbors: %s, valid_edges: %s",
|
||||
neighbors, valid_edges)
|
||||
for (vertex, edge) in neighbors:
|
||||
if not valid_edges or not \
|
||||
self.e_g_manager.is_edge_exist_in_list(edge, valid_edges):
|
||||
@ -155,15 +199,30 @@ class Processor(processor.ProcessorBase):
|
||||
self.e_g_manager.graph.update_vertex(vertex)
|
||||
self.e_g_manager.graph.update_edge(edge)
|
||||
|
||||
def _delete_old_connections(self, updated_vertex, old_edges):
|
||||
def _delete_old_connections(self, vertex, old_edges):
|
||||
"""Deletes the "vertex" old connections
|
||||
|
||||
Finds the old connections that are connected to updated_vertex,
|
||||
and marks them as deleted
|
||||
"""
|
||||
|
||||
LOG.debug("Delete old connections. Vertex: %s, old edges: %s",
|
||||
vertex, old_edges)
|
||||
# remove old edges and partial data vertices if exist
|
||||
for edge in old_edges:
|
||||
self.e_g_manager.mark_edge_as_deleted(edge)
|
||||
curr_ver = graph_utils.get_neighbor_vertex(
|
||||
edge, updated_vertex, self.e_g_manager.graph)
|
||||
edge, vertex, self.e_g_manager.graph)
|
||||
self.e_g_manager.delete_partial_data_vertex(curr_ver)
|
||||
|
||||
def _find_edges_status(self, updated_vertex, neighbors):
|
||||
def _find_edges_status(self, vertex, neighbors):
|
||||
"""Finds "vertex" valid and old connections
|
||||
|
||||
Checks all the edges that are connected to the vertex in the entity
|
||||
graph, and finds which of them are old connections (edges that are no
|
||||
longer connected to those entities), and which are valid connections.
|
||||
"""
|
||||
|
||||
valid_edges = []
|
||||
old_edges = []
|
||||
|
||||
@ -173,11 +232,11 @@ class Processor(processor.ProcessorBase):
|
||||
|
||||
# iterate over current neighbor edges and check existence in new list
|
||||
for curr_edge in self.e_g_manager.graph.get_edges(
|
||||
updated_vertex.vertex_id, direction=Direction.BOTH):
|
||||
vertex.vertex_id, direction=Direction.BOTH):
|
||||
# check if the edge in the graph has a a connection to the
|
||||
# same type of resources in the new neighbors list
|
||||
neighbor_vertex = graph_utils.get_neighbor_vertex(
|
||||
curr_edge, updated_vertex, self.e_g_manager.graph)
|
||||
curr_edge, vertex, self.e_g_manager.graph)
|
||||
is_connection_type_exist = self.e_g_manager.get_vertex_type(
|
||||
neighbor_vertex) in graph_neighbor_types
|
||||
|
||||
@ -197,7 +256,7 @@ class Processor(processor.ProcessorBase):
|
||||
return (valid_edges, old_edges)
|
||||
|
||||
def _initialize_events_actions(self):
|
||||
self.events = {}
|
||||
self.events[EventAction.CREATE] = self.create_entity
|
||||
self.events[EventAction.UPDATE] = self.update_entity
|
||||
self.events[EventAction.DELETE] = self.delete_entity
|
||||
self.actions = {}
|
||||
self.actions[EventAction.CREATE] = self.create_entity
|
||||
self.actions[EventAction.UPDATE] = self.update_entity
|
||||
self.actions[EventAction.DELETE] = self.delete_entity
|
||||
|
Loading…
x
Reference in New Issue
Block a user