Integration of Synchronizer + Processor + Transformer

Implements: blueprint vitrage-resource-processor

Change-Id: Ice972f21eb903ef741f1969ae2bc7c42c1581542
This commit is contained in:
Alexey Weyl 2015-12-29 11:21:04 +02:00
parent dc35ae3e68
commit 26a20ded95
12 changed files with 95 additions and 122 deletions

View File

@ -4,10 +4,12 @@
pbr>=1.6
Babel>=1.3
python-dateutil>=2.4.2
python-novaclient>=2.26.0
networkx>=1.10
oslo.log>=1.12.0 # Apache-2.0
oslo.policy>=0.3.0
oslo.service>=0.1.0 # Apache-2.0
pecan>=0.8.0
PasteDeploy>=1.5.0
Werkzeug>=0.7

View File

@ -24,6 +24,7 @@ setup-hooks =
[entry_points]
console_scripts =
vitrage-api = vitrage.cmd.api:main
vitrage-entity-graph = vitrage.cmd.graph:entity_graph
oslo.config.opts =
vitrage = vitrage.opts:list_opts

View File

@ -3,6 +3,7 @@
# process, which may cause wedges in the gate later.
hacking<0.11,>=0.10.0
python-dateutil>=2.4.2
coverage>=3.6
discover
networkx>=1.10
@ -12,6 +13,7 @@ sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2
oslo.log>=1.12.0 # Apache-2.0
oslosphinx>=2.5.0 # Apache-2.0
oslotest>=1.10.0 # Apache-2.0
oslo.service>=0.1.0 # Apache-2.0
testrepository>=0.0.18
testscenarios>=0.4
testtools>=1.4.0

View File

@ -12,19 +12,13 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from oslo_service import service as os_service
from vitrage import entity_graph as entity_graph_svc
from vitrage import service
LOG = log.getLogger(__name__)
class SynchronizerClient(object):
def __init__(self):
pass
def get_all(self):
pass
def get_all_entity(self, entity_name):
pass
def entity_graph():
conf = service.prepare_service()
os_service.launch(conf,
entity_graph_svc.VitrageEntityGraphService(conf)).wait()

View File

@ -12,4 +12,28 @@
# License for the specific language governing permissions and limitations
# under the License.
__author__ = 'stack'
from oslo_log import log
# from oslo_service import service as os_service
LOG = log.getLogger(__name__)
# class VitrageEntityGraphService(os_service.Service):
class VitrageEntityGraphService(object):
def __init__(self):
super(VitrageEntityGraphService, self).__init__()
def start(self):
LOG.info("Start ProcessorService")
super(VitrageEntityGraphService, self).start()
LOG.info("Finish start ProcessorService")
def stop(self):
LOG.info("Stop ProcessorService")
super(VitrageEntityGraphService, self).stop()
LOG.info("Finish stop ProcessorService")

View File

@ -13,6 +13,7 @@
# under the License.
import datetime
from dateutil import parser
from oslo_log import log
@ -36,25 +37,19 @@ class EntityGraphManager(object):
def is_partial_data_vertex(self, vertex):
"""Check if a vertex is a partial data vertex
Checks if the vertex was updated only because it's a neighbor of a
full data vertex
Vertex is a partial data vertex if it's IS_PARTIAL_DATA property is
True and if it has no neighbors
"""
if not vertex.properties[VertexProperties.IS_PARTIAL_DATA]:
return False
# check that vertex has no neighbors
neighbor_edges = self.graph.get_edges(vertex.vertex_id,
direction=Direction.BOTH)
for neighbor_edge in neighbor_edges:
if not self.is_edge_deleted(neighbor_edge):
return False
# check properties
# TODO(Alexey): implement get_vertex_essential_properties
# key_properties = self.transformer.key_fields(vertex)
key_properties = [VertexProperties.TYPE, VertexProperties.SUB_TYPE,
VertexProperties.ID]
return not any(True for prop in vertex.properties
if prop not in key_properties)
return not any(True for neighbor_edge in neighbor_edges
if not self.is_edge_deleted(neighbor_edge))
def delete_partial_data_vertex(self, suspected_vertex):
"""Checks if it is a partial data vertex, and if so deletes it """
@ -118,6 +113,11 @@ class EntityGraphManager(object):
def check_timestamp(self, curr_vertex, new_vertex):
is_timestamp_property_exist = VertexProperties.UPDATE_TIMESTAMP \
not in curr_vertex.properties.keys()
is_old = curr_vertex.properties[VertexProperties.UPDATE_TIMESTAMP] <= \
new_vertex.properties[VertexProperties.UPDATE_TIMESTAMP]
return is_timestamp_property_exist or is_old
if is_timestamp_property_exist:
return True
current_time = parser.parse(curr_vertex.properties[
VertexProperties.UPDATE_TIMESTAMP])
new_time = parser.parse(new_vertex.properties[
VertexProperties.UPDATE_TIMESTAMP])
return current_time <= new_time

View File

@ -12,16 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from oslo_log import log
from vitrage.common.constants import EventAction
from vitrage.common.constants import SyncMode
from vitrage.common.constants import VertexProperties
from vitrage.entity_graph.processor import base as processor
from vitrage.entity_graph.processor import entity_graph_manager
from vitrage.entity_graph.transformer import base
from vitrage.entity_graph.transformer import transformer_manager
from vitrage.graph import Direction
from vitrage.graph import utils as graph_utils
@ -137,43 +132,7 @@ class Processor(processor.ProcessorBase):
deleted_vertex)
def transform_entity(self, event):
# TODO(Alexey): change back to the original call
# return self.transformer.transform(event)
# create vertex
vertex = graph_utils.create_vertex(
'RESOURCE_INSTANCE_' + event['id'],
entity_id=event['id'],
entity_type='RESOURCE',
entity_subtype='INSTANCE',
entity_state=event[VertexProperties.STATE.lower()],
update_timestamp=datetime.datetime.now().time(),
is_deleted=False)
# create neighbors
neighbor_vertex = graph_utils.create_vertex(
'RESOURCE_HOST_' + event['hostname'],
entity_id=event['hostname'],
entity_type='RESOURCE',
entity_subtype='HOST',
is_deleted=None)
neighbor_edge = graph_utils.create_edge(
neighbor_vertex.vertex_id,
vertex.vertex_id,
'contains',
is_deleted=False)
neighbors = [base.Neighbor(neighbor_vertex, neighbor_edge)]
# decide event type
if event['sync_mode'] == SyncMode.INIT_SNAPSHOT:
event_type = EventAction.CREATE
elif event['sync_mode'] == SyncMode.UPDATE:
if event['event_type'] == 'compute.instance.volume.attach':
event_type = EventAction.UPDATE
elif event['event_type'] == 'compute.instance.delete.end':
event_type = EventAction.DELETE
return base.EntityWrapper(vertex, neighbors, event_type)
return self.transformer.transform(event)
def _update_neighbors(self, vertex, neighbors):
"""Updates vertices neighbor connections

View File

@ -16,9 +16,7 @@ import logging
from oslo_config import cfg
from oslo_log import log
from oslo_policy import opts as policy_opts
from oslo_service import service as os_service
from vitrage.entity_graph.processor import synchronizer_client
from vitrage import opts
LOG = log.getLogger(__name__)
@ -42,24 +40,3 @@ def prepare_service(args=None, default_opts=None, conf=None):
conf.log_opt_values(LOG, logging.DEBUG)
return conf
class ProcessorService(os_service.Service):
def __init__(self):
super(ProcessorService, self).__init__()
def start(self):
LOG.info("Start ProcessorService")
super(ProcessorService, self).start()
# ThreadPool.start()
synchronizer_client.SynchronizerActions.get_all()
LOG.info("Finish start ProcessorService")
def stop(self):
LOG.info("Stop ProcessorService")
# ThreadPool.stop()
super(ProcessorService, self).stop()
LOG.info("Finish stop ProcessorService")

View File

@ -1,13 +1,15 @@
{
"sync_mode": "init_snapshot",
"image": "cirros-[a-z]+",
"state": "ACTIVE|INACTIVE",
"status": "ACTIVE|INACTIVE",
"tenant_id": "[0-9a-f]{32}",
"user_id": "[0-9a-f]{32}",
"flavor": "m1.nano",
"hostname": "host[0-4]",
"OS-EXT-SRV-ATTR:host": "host[0-4]",
"id": "vm[0-1][0-9]",
"name": "vm[0-9]{3}",
"event_type": "update"
"event_type": "update",
"sync_type": "nova.instance",
"updated": "2015-12-01T12:46:41Z"
}

View File

@ -24,13 +24,15 @@ class BaseProcessor(base.BaseTest):
super(BaseProcessor, self).setUp()
self.transform = transformer_manager.TransformerManager()
def _update_vertex_to_graph(self, e_g_manager, type,
sub_type, id, additional_prop):
def _update_vertex_to_graph(self, e_g_manager, type, sub_type, id,
is_deleted, is_partial_data, additional_prop):
# create vertex properties
prop = {key: value for key, value in additional_prop.iteritems()}
prop[VertexProperties.TYPE] = type
prop[VertexProperties.SUB_TYPE] = sub_type
prop[VertexProperties.ID] = id
prop[VertexProperties.IS_VERTEX_DELETED] = is_deleted
prop[VertexProperties.IS_PARTIAL_DATA] = is_partial_data
# TODO(Alexey): change back to original method
# vertex_id = self.transform.get_key(prop)

View File

@ -12,7 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from vitrage.common.constants import EdgeProperties
from vitrage.common.constants import VertexProperties
from vitrage.entity_graph.processor import entity_graph_manager
from vitrage.tests.unit.processor import base
@ -29,7 +28,7 @@ class TestEntityGraphManager(base.BaseProcessor):
# create vertex properties
instance_vertex = self._update_vertex_to_graph(e_g_manager,
'RESOURCE', 'INSTANCE',
'12345', {})
'123', False, True, {})
# check is partial data vertex
is_partial_data_vertex = \
@ -38,9 +37,8 @@ class TestEntityGraphManager(base.BaseProcessor):
# add host vertex
host_vertex = self._update_vertex_to_graph(e_g_manager, 'RESOURCE',
'HOST', '54321', {})
prop = {}
prop[EdgeProperties.RELATION_NAME] = 'contains'
'HOST', '321',
False, True, {})
edge = self._update_edge_to_graph(e_g_manager, host_vertex.vertex_id,
instance_vertex.vertex_id,
'contains')
@ -63,10 +61,10 @@ class TestEntityGraphManager(base.BaseProcessor):
e_g_manager = entity_graph_manager.EntityGraphManager()
# create vertex properties
prop = {}
prop[VertexProperties.STATE] = 'ACTIVE'
prop = {VertexProperties.STATE: 'ACTIVE'}
vertex = self._update_vertex_to_graph(e_g_manager, 'RESOURCE',
'INSTANCE', '12345', prop)
'INSTANCE', '12345',
False, False, prop)
# check is not partial data vertex
is_partial_data_vertex = e_g_manager.is_partial_data_vertex(vertex)
@ -77,7 +75,8 @@ class TestEntityGraphManager(base.BaseProcessor):
# create vertex properties
vertex = self._update_vertex_to_graph(e_g_manager, 'RESOURCE',
'INSTANCE', '12345', {})
'INSTANCE', '12345',
False, True, {})
# check is partial data vertex
is_partial_data_vertex = e_g_manager.is_partial_data_vertex(vertex)
@ -93,7 +92,8 @@ class TestEntityGraphManager(base.BaseProcessor):
# create vertex properties
vertex = self._update_vertex_to_graph(e_g_manager, 'RESOURCE',
'INSTANCE', '12345', {})
'INSTANCE', '12345',
False, True, {})
# check vitrage deleted
self.assertFalse(e_g_manager.is_vertex_deleted(vertex))
@ -105,9 +105,11 @@ class TestEntityGraphManager(base.BaseProcessor):
# create vertex properties
vertex1 = self._update_vertex_to_graph(e_g_manager, 'RESOURCE',
'INSTANCE', '12345', {})
'INSTANCE', '12345',
False, True, {})
vertex2 = self._update_vertex_to_graph(e_g_manager, 'RESOURCE',
'HOST', '54321', {})
'HOST', '54321',
False, True, {})
edge = self._update_edge_to_graph(e_g_manager, vertex1.vertex_id,
vertex2.vertex_id, 'contains')
@ -119,17 +121,18 @@ class TestEntityGraphManager(base.BaseProcessor):
def test_find_neighbor_types(self):
neighbors = []
e_g_manager = entity_graph_manager.EntityGraphManager()
entities_details = [('RESOURCE', 'HOST', '1'),
('RESOURCE', 'STORAGE', '2'),
('RESOURCE', 'APPLICATION', '3'),
('RESOURCE', 'STORAGE', '4'),
('ALARM', 'INSTANCE_AT_RISK', '5')]
entities_details = [('RESOURCE', 'HOST', '1', False, True),
('RESOURCE', 'STORAGE', '2', False, True),
('RESOURCE', 'APPLICATION', '3', False, True),
('RESOURCE', 'STORAGE', '4', False, True),
('ALARM', 'INSTANCE_AT_RISK', '5', False, True)]
# add neighbors
for details in entities_details:
# neighbor
vertex = self._update_vertex_to_graph(e_g_manager, details[0],
details[1], details[2], {})
details[1], details[2],
details[3], details[4], {})
neighbors.append((vertex, None))
# get neighbors types

View File

@ -12,7 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from datetime import datetime
import pytz
import unittest
from vitrage.common.constants import SyncMode
@ -40,6 +41,7 @@ class TestProcessor(base.BaseTest):
super(TestProcessor, self).setUp()
self.spec_list = self._get_spec_list()
# TODO(Alexey): un skip this test when host and zone transformers are ready
@unittest.skip('Not ready yet')
def test_create_entity_graph(self):
processor = self._create_processor_with_graph()
@ -54,6 +56,8 @@ class TestProcessor(base.BaseTest):
# bfs_list = graph.algo.bfs(graph)
# self.assertEqual(num_vertices, len(bfs_list))
# TODO(Alexey): un skip this test when instance transformer update is ready
@unittest.skip('Not ready yet')
def test_process_event(self):
# check create instance event
processor = proc.Processor()
@ -84,7 +88,7 @@ class TestProcessor(base.BaseTest):
def test_update_entity_state(self):
# create instance event with host neighbor and check validity
prop = {VertexProperties.STATE.lower(): 'STARTING'}
prop = {'status': 'STARTING'}
(vertex, neighbors, processor) =\
self._create_and_check_entity(properties=prop)
@ -95,7 +99,7 @@ class TestProcessor(base.BaseTest):
# update instance event with state running
vertex.properties[VertexProperties.STATE] = 'RUNNING'
vertex.properties[VertexProperties.UPDATE_TIMESTAMP] = \
datetime.datetime.now().time()
datetime.utcnow().replace(tzinfo=pytz.utc).__str__()
processor.update_entity(vertex, neighbors)
# check state
@ -257,6 +261,9 @@ class TestProcessor(base.BaseTest):
for key, value in properties.iteritems():
events_list[0][key] = value
# TODO(Alexey): remove this and fix it in mock
events_list[0]['sync_type'] = "nova.instance"
return events_list[0]
def _get_instance_entity_spec_list(self, config_file_path,