Integration of Synchronizer + Processor + Transformer

Implements: blueprint vitrage-resource-processor

Change-Id: I922e360133c43e2c2957fd3ed3547ca5cc8fb5a8
This commit is contained in:
Alexey Weyl 2015-12-30 08:04:39 +02:00
parent 26a20ded95
commit f70ca274b9
11 changed files with 100 additions and 92 deletions

View File

@ -17,18 +17,18 @@ class VertexProperties(object):
TYPE = 'TYPE'
SUB_TYPE = 'SUB_TYPE'
ID = 'ID'
IS_VERTEX_DELETED = 'IS_VERTEX_DELETED'
IS_DELETED = 'IS_DELETED'
VERTEX_DELETION_TIMESTAMP = 'VERTEX_DELETION_TIMESTAMP'
STATE = 'STATE'
PROJECT = 'PROJECT'
UPDATE_TIMESTAMP = 'UPDATE_TIMESTAMP'
NAME = 'NAME'
IS_PARTIAL_DATA = 'IS_PARTIAL_DATA'
IS_PLACEHOLDER = 'IS_PLACEHOLDER'
class EdgeProperties(object):
RELATION_NAME = 'RELATION_NAME'
IS_EDGE_DELETED = 'IS_EDGE_DELETED'
IS_DELETED = 'IS_DELETED'
EDGE_DELETION_TIMESTAMP = 'EDGE_DELETION_TIMESTAMP'

View File

@ -34,14 +34,14 @@ class EntityGraphManager(object):
self.graph = create_graph('entity graph')
self.transformer = transformer_manager.TransformerManager()
def is_partial_data_vertex(self, vertex):
"""Check if a vertex is a partial data vertex
def can_vertex_be_deleted(self, vertex):
"""Check if the vertex can be deleted
Vertex is a partial data vertex if it's IS_PARTIAL_DATA property is
True and if it has no neighbors
Vertex can be deleted if it's IS_PLACEHOLDER property is
True and if it has no neighbors that aren't marked deleted
"""
if not vertex.properties[VertexProperties.IS_PARTIAL_DATA]:
if not vertex.properties[VertexProperties.IS_PLACEHOLDER]:
return False
# check that vertex has no neighbors
@ -51,25 +51,25 @@ class EntityGraphManager(object):
return not any(True for neighbor_edge in neighbor_edges
if not self.is_edge_deleted(neighbor_edge))
def delete_partial_data_vertex(self, suspected_vertex):
"""Checks if it is a partial data vertex, and if so deletes it """
def delete_placeholder_vertex(self, suspected_vertex):
"""Checks if it is a placeholder vertex, and if so deletes it """
if self.is_partial_data_vertex(suspected_vertex):
LOG.debug("Delete partial data vertex: %s", suspected_vertex)
if self.can_vertex_be_deleted(suspected_vertex):
LOG.debug("Delete placeholder vertex: %s", suspected_vertex)
self.graph.remove_vertex(suspected_vertex)
def is_vertex_deleted(self, vertex):
return vertex.properties.get(
VertexProperties.IS_VERTEX_DELETED, False)
VertexProperties.IS_DELETED, False)
def is_edge_deleted(self, edge):
return edge.properties.get(
EdgeProperties.IS_EDGE_DELETED, False)
EdgeProperties.IS_DELETED, False)
def mark_vertex_as_deleted(self, vertex):
"""Marks the vertex as is deleted, and updates deletion timestamp"""
vertex.properties[VertexProperties.IS_VERTEX_DELETED] = True
vertex.properties[VertexProperties.IS_DELETED] = True
vertex.properties[VertexProperties.VERTEX_DELETION_TIMESTAMP] = \
datetime.datetime.now()
self.graph.update_vertex(vertex)
@ -77,7 +77,7 @@ class EntityGraphManager(object):
def mark_edge_as_deleted(self, edge):
"""Marks the edge as is deleted, and updates delete timestamp"""
edge.properties[EdgeProperties.IS_EDGE_DELETED] = True
edge.properties[EdgeProperties.IS_DELETED] = True
edge.properties[EdgeProperties.EDGE_DELETION_TIMESTAMP] = \
datetime.datetime.now()
self.graph.update_edge(edge)
@ -112,8 +112,8 @@ class EntityGraphManager(object):
def check_timestamp(self, curr_vertex, new_vertex):
is_timestamp_property_exist = VertexProperties.UPDATE_TIMESTAMP \
not in curr_vertex.properties.keys()
if is_timestamp_property_exist:
in curr_vertex.properties.keys()
if not is_timestamp_property_exist:
return True
current_time = parser.parse(curr_vertex.properties[
@ -121,3 +121,8 @@ class EntityGraphManager(object):
new_time = parser.parse(new_vertex.properties[
VertexProperties.UPDATE_TIMESTAMP])
return current_time <= new_time
def can_update_vertex(self, curr_vertex, new_vertex):
return (not curr_vertex) or \
(not (not curr_vertex.properties[VertexProperties.IS_PLACEHOLDER]
and new_vertex.properties[VertexProperties.IS_PLACEHOLDER]))

View File

@ -121,9 +121,9 @@ class Processor(processor.ProcessorBase):
for edge in neighbor_edges:
self.e_g_manager.mark_edge_as_deleted(edge)
# delete partial data vertices that connected only to this vertex
# delete placeholder vertices that connected only to this vertex
for vertex in neighbor_vertices:
self.e_g_manager.delete_partial_data_vertex(vertex)
self.e_g_manager.delete_placeholder_vertex(vertex)
# delete vertex
self.e_g_manager.mark_vertex_as_deleted(deleted_vertex)
@ -152,11 +152,15 @@ class Processor(processor.ProcessorBase):
LOG.debug("Connect neighbors. Neighbors: %s, valid_edges: %s",
neighbors, valid_edges)
for (vertex, edge) in neighbors:
if not valid_edges or not \
curr_vertex = self.e_g_manager.graph.get_vertex(vertex.vertex_id)
if (not curr_vertex) or self.e_g_manager.check_update_validation(
curr_vertex, vertex):
if self.e_g_manager.can_update_vertex(curr_vertex, vertex):
self.e_g_manager.graph.update_vertex(vertex)
if not valid_edges or not \
self.e_g_manager.is_edge_exist_in_list(edge, valid_edges):
# connect entity with neighbor
self.e_g_manager.graph.update_vertex(vertex)
self.e_g_manager.graph.update_edge(edge)
self.e_g_manager.graph.update_edge(edge)
def _delete_old_connections(self, vertex, old_edges):
"""Deletes the "vertex" old connections
@ -167,12 +171,12 @@ class Processor(processor.ProcessorBase):
LOG.debug("Delete old connections. Vertex: %s, old edges: %s",
vertex, old_edges)
# remove old edges and partial data vertices if exist
# remove old edges and placeholder vertices if exist
for edge in old_edges:
self.e_g_manager.mark_edge_as_deleted(edge)
curr_ver = graph_utils.get_neighbor_vertex(
edge, vertex, self.e_g_manager.graph)
self.e_g_manager.delete_partial_data_vertex(curr_ver)
self.e_g_manager.delete_placeholder_vertex(curr_ver)
def _find_edges_status(self, vertex, neighbors):
"""Finds "vertex" valid and old connections

View File

@ -54,7 +54,7 @@ class InstanceTransformer(base.Transformer):
Entity event is received from synchronizer it need to be
transformed into entity wrapper. The wrapper contains:
1. Entity Vertex - The vertex itself with all fields
2. Neighbor list - neighbor vertex with partial data and an edge
2. Neighbor list - neighbor placeholder vertex and an edge
3. Action type - CREATE/UPDATE/DELETE
:param entity_event: a general event from the synchronizer
@ -68,8 +68,7 @@ class InstanceTransformer(base.Transformer):
entity_key = self.extract_key(entity_event)
metadata = {
cons.VertexProperties.NAME: entity_event[self.INSTANCE_NAME],
cons.VertexProperties.IS_PARTIAL_DATA: False
cons.VertexProperties.NAME: entity_event[self.INSTANCE_NAME]
}
entity_vertex = graph_utils.create_vertex(
@ -80,6 +79,7 @@ class InstanceTransformer(base.Transformer):
entity_project=entity_event[self.PROJECT_ID],
entity_state=entity_event[self.SNAPSHOT_INSTANCE_STATE],
update_timestamp=entity_event[self.SNAPSHOT_TIMESTAMP],
is_placeholder=False,
metadata=metadata
)
@ -132,7 +132,7 @@ class InstanceTransformer(base.Transformer):
@staticmethod
def create_host_neighbor(vertex_id, host_name):
host_vertex = HostTransformer.create_partial_vertex(host_name)
host_vertex = HostTransformer.create_placeholder_vertex(host_name)
relation_edge = graph_utils.create_edge(
source_id=host_vertex.vertex_id,
@ -142,27 +142,23 @@ class InstanceTransformer(base.Transformer):
return base.Neighbor(host_vertex, relation_edge)
@staticmethod
def create_partial_vertex(instance_id):
def create_placeholder_vertex(instance_id):
"""Creates Vertex with partial data.
"""Creates placeholder vertex.
Vertex with partial data contains only mandatory fields
Placeholder vertex contains only mandatory fields
:param instance_id: The instance ID
:return: Vertex with partial data
:return: Placeholder vertex
:rtype: Vertex
"""
metadata = {
cons.VertexProperties.IS_PARTIAL_DATA: True
}
return graph_utils.create_vertex(
InstanceTransformer.build_instance_key(instance_id),
entity_id=instance_id,
entity_type=cons.EntityTypes.RESOURCE,
entity_subtype=INSTANCE_SUBTYPE,
metadata=metadata
is_placeholder=True
)
@ -192,16 +188,11 @@ class HostTransformer(base.Transformer):
host_name])
@staticmethod
def create_partial_vertex(host_name):
metadata = {
cons.VertexProperties.IS_PARTIAL_DATA: True
}
def create_placeholder_vertex(host_name):
return graph_utils.create_vertex(
HostTransformer.build_host_key(host_name),
entity_id=host_name,
entity_type=cons.EntityTypes.RESOURCE,
entity_subtype=HOST_SUBTYPE,
metadata=metadata
is_placeholder=True
)

View File

@ -15,7 +15,6 @@
from vitrage.common.constants import EdgeProperties as EConst
from vitrage.common.constants import VertexProperties as VConst
from vitrage.graph import Edge
from vitrage.graph import Vertex
def create_vertex(vertex_id,
@ -27,6 +26,7 @@ def create_vertex(vertex_id,
is_deleted=False,
deletion_timestamp=None,
update_timestamp=None,
is_placeholder=False,
metadata=None):
"""A builder to create a vertex
@ -43,13 +43,15 @@ def create_vertex(vertex_id,
:param entity_state:
:type entity_state: str
:param is_deleted:
:type is_deleted: str
:type is_deleted: boolean
:param deletion_timestamp:
:type deletion_timestamp: str
:param update_timestamp:
:type update_timestamp: str
:param metadata:
:type metadata: dict
:param is_placeholder:
:type is_placeholder: boolean
:return:
:rtype: Vertex
"""
@ -61,8 +63,9 @@ def create_vertex(vertex_id,
VConst.STATE: entity_state,
VConst.SUB_TYPE: entity_subtype,
VConst.TYPE: entity_type,
VConst.IS_VERTEX_DELETED: is_deleted,
VConst.UPDATE_TIMESTAMP: update_timestamp
VConst.IS_DELETED: is_deleted,
VConst.UPDATE_TIMESTAMP: update_timestamp,
VConst.IS_PLACEHOLDER: is_placeholder
}
if metadata:
properties.update(metadata)
@ -72,6 +75,9 @@ def create_vertex(vertex_id,
return vertex
from vitrage.graph import Vertex
def create_edge(source_id,
target_id,
relation_type,
@ -97,7 +103,7 @@ def create_edge(source_id,
"""
properties = {
EConst.EDGE_DELETION_TIMESTAMP: deletion_timestamp,
EConst.IS_EDGE_DELETED: is_deleted,
EConst.IS_DELETED: is_deleted,
EConst.RELATION_NAME: relation_type,
}
if metadata:

View File

@ -266,12 +266,12 @@ class GraphTest(base.BaseTest):
# Changing the referenced item
updated_e = e
updated_e[EConst.IS_EDGE_DELETED] = 'KUKU'
updated_e[EConst.IS_DELETED] = 'KUKU'
updated_e[EConst.EDGE_DELETION_TIMESTAMP] = 'CHANGED'
# Get it again
e = g.get_edge(v_node.vertex_id, v_host.vertex_id, label)
self.assertEqual(False, e.get(EConst.IS_EDGE_DELETED, None),
self.assertEqual(False, e.get(EConst.IS_DELETED, None),
'Change should not affect graph item')
self.assertEqual(e_node_to_host[EConst.EDGE_DELETION_TIMESTAMP],
e[EConst.EDGE_DELETION_TIMESTAMP],
@ -280,8 +280,8 @@ class GraphTest(base.BaseTest):
g.update_edge(updated_e)
# Get it again
e = g.get_edge(v_node.vertex_id, v_host.vertex_id, label)
self.assertEqual(updated_e[EConst.IS_EDGE_DELETED],
e[EConst.IS_EDGE_DELETED],
self.assertEqual(updated_e[EConst.IS_DELETED],
e[EConst.IS_DELETED],
'Graph item should change after update')
self.assertEqual(updated_e[EConst.EDGE_DELETION_TIMESTAMP],
e[EConst.EDGE_DELETION_TIMESTAMP],

View File

@ -25,14 +25,15 @@ class BaseProcessor(base.BaseTest):
self.transform = transformer_manager.TransformerManager()
def _update_vertex_to_graph(self, e_g_manager, type, sub_type, id,
is_deleted, is_partial_data, additional_prop):
is_deleted, is_placeholder_data,
additional_prop):
# create vertex properties
prop = {key: value for key, value in additional_prop.iteritems()}
prop[VertexProperties.TYPE] = type
prop[VertexProperties.SUB_TYPE] = sub_type
prop[VertexProperties.ID] = id
prop[VertexProperties.IS_VERTEX_DELETED] = is_deleted
prop[VertexProperties.IS_PARTIAL_DATA] = is_partial_data
prop[VertexProperties.IS_DELETED] = is_deleted
prop[VertexProperties.IS_PLACEHOLDER] = is_placeholder_data
# TODO(Alexey): change back to original method
# vertex_id = self.transform.get_key(prop)

View File

@ -22,7 +22,7 @@ class TestEntityGraphManager(base.BaseProcessor):
def setUp(self):
super(TestEntityGraphManager, self).setUp()
def test_is_partial_data_vertex(self):
def test_can_vertex_be_deleted(self):
e_g_manager = entity_graph_manager.EntityGraphManager()
# create vertex properties
@ -30,10 +30,10 @@ class TestEntityGraphManager(base.BaseProcessor):
'RESOURCE', 'INSTANCE',
'123', False, True, {})
# check is partial data vertex
is_partial_data_vertex = \
e_g_manager.is_partial_data_vertex(instance_vertex)
self.assertTrue(is_partial_data_vertex)
# check is placeholder vertex
is_placeholder_vertex = \
e_g_manager.can_vertex_be_deleted(instance_vertex)
self.assertTrue(is_placeholder_vertex)
# add host vertex
host_vertex = self._update_vertex_to_graph(e_g_manager, 'RESOURCE',
@ -43,21 +43,21 @@ class TestEntityGraphManager(base.BaseProcessor):
instance_vertex.vertex_id,
'contains')
# check is partial data vertex
is_partial_data_vertex = \
e_g_manager.is_partial_data_vertex(instance_vertex)
self.assertFalse(is_partial_data_vertex)
# check is placeholder vertex
is_placeholder_vertex = \
e_g_manager.can_vertex_be_deleted(instance_vertex)
self.assertFalse(is_placeholder_vertex)
# change host to is_deleted
e_g_manager.mark_vertex_as_deleted(host_vertex)
e_g_manager.mark_edge_as_deleted(edge)
# check is partial data vertex
is_partial_data_vertex = \
e_g_manager.is_partial_data_vertex(instance_vertex)
self.assertTrue(is_partial_data_vertex)
# check is placeholder vertex
is_placeholder_vertex = \
e_g_manager.can_vertex_be_deleted(instance_vertex)
self.assertTrue(is_placeholder_vertex)
def test_is_not_partial_data_vertex(self):
def test_is_not_can_vertex_be_deleted(self):
e_g_manager = entity_graph_manager.EntityGraphManager()
# create vertex properties
@ -66,11 +66,11 @@ class TestEntityGraphManager(base.BaseProcessor):
'INSTANCE', '12345',
False, False, prop)
# check is not partial data vertex
is_partial_data_vertex = e_g_manager.is_partial_data_vertex(vertex)
self.assertFalse(is_partial_data_vertex)
# check is not placeholder vertex
is_placeholder_vertex = e_g_manager.can_vertex_be_deleted(vertex)
self.assertFalse(is_placeholder_vertex)
def test_delete_partial_data_vertex(self):
def test_delete_placeholder_vertex(self):
e_g_manager = entity_graph_manager.EntityGraphManager()
# create vertex properties
@ -78,12 +78,12 @@ class TestEntityGraphManager(base.BaseProcessor):
'INSTANCE', '12345',
False, True, {})
# check is partial data vertex
is_partial_data_vertex = e_g_manager.is_partial_data_vertex(vertex)
self.assertTrue(is_partial_data_vertex)
# check is placeholder vertex
is_placeholder_vertex = e_g_manager.can_vertex_be_deleted(vertex)
self.assertTrue(is_placeholder_vertex)
# deal with partial data vertex - mark it as deleted
e_g_manager.delete_partial_data_vertex(vertex)
# deal with placeholder vertex - mark it as deleted
e_g_manager.delete_placeholder_vertex(vertex)
vertex = e_g_manager.graph.get_vertex(vertex.vertex_id)
self.assertTrue(not vertex)

View File

@ -82,7 +82,7 @@ class TestProcessor(base.BaseTest):
self._check_graph(processor, self.NUM_VERTICES_AFTER_DELETION,
self.NUM_EDGES_AFTER_DELETION)
def test_create_entity_with_partial_data_neighbor(self):
def test_create_entity_with_placeholder_neighbor(self):
# create instance event with host neighbor and check validity
self._create_and_check_entity()
@ -99,7 +99,7 @@ class TestProcessor(base.BaseTest):
# update instance event with state running
vertex.properties[VertexProperties.STATE] = 'RUNNING'
vertex.properties[VertexProperties.UPDATE_TIMESTAMP] = \
datetime.utcnow().replace(tzinfo=pytz.utc).__str__()
str(datetime.utcnow().replace(tzinfo=pytz.utc))
processor.update_entity(vertex, neighbors)
# check state

View File

@ -67,8 +67,8 @@ class NovaInstanceTransformerTest(base.BaseTest):
event[transformer.HOST_NAME])
def _validate_host_neighbor(self, h_neighbor, host_name):
expected_neighbor = nt.HostTransformer.create_partial_vertex(host_name)
expected_neighbor = nt.HostTransformer.\
create_placeholder_vertex(host_name)
self.assertEqual(expected_neighbor, h_neighbor.vertex)
def _validate_snapshot_vertex_props(self, vertex, event):
@ -107,8 +107,8 @@ class NovaInstanceTransformerTest(base.BaseTest):
observed_name = vertex.get(cons.VertexProperties.NAME)
self.assertEqual(expected_name, observed_name)
is_partial = vertex.get(cons.VertexProperties.IS_PARTIAL_DATA)
self.assertEqual(False, is_partial)
is_placeholder = vertex.get(cons.VertexProperties.IS_PLACEHOLDER)
self.assertEqual(False, is_placeholder)
def test_key_fields(self):
LOG.debug('Test get key fields from nova instance transformer')
@ -178,12 +178,13 @@ class NovaInstanceTransformerTest(base.BaseTest):
self.assertEqual(vertex_id, neighbor.edge.target_id)
self.assertEqual(cons.EdgeLabels.CONTAINS, neighbor.edge.label)
def test_create_partial_vertex(self):
LOG.debug('Test create partial vertex')
def test_create_placeholder_vertex(self):
LOG.debug('Test create placeholder vertex')
instance_id = '123456'
vertex_id = nt.InstanceTransformer.build_instance_key(instance_id)
p_vertex = nt.InstanceTransformer.create_partial_vertex(instance_id)
p_vertex = nt.InstanceTransformer.\
create_placeholder_vertex(instance_id)
self.assertEqual(vertex_id, p_vertex.vertex_id)
self.assertEqual(5, p_vertex.properties.keys().__len__())
@ -195,4 +196,4 @@ class NovaInstanceTransformerTest(base.BaseTest):
p_vertex.get(cons.VertexProperties.SUB_TYPE))
self.assertEqual(True,
p_vertex.get(
cons.VertexProperties.IS_PARTIAL_DATA))
cons.VertexProperties.IS_PLACEHOLDER))

View File

@ -25,7 +25,7 @@ LOG = logging.getLogger(__name__)
def create_vertex(entity_id, entity_type, entity_subtype=None):
"""returns vertex with partial data"""
"""returns placeholder vertex"""
vertex_id = base_transformer.Transformer.KEY_SEPARATOR.join(
[entity_type, entity_subtype, entity_id])