From 26a20ded95caea52b588753348610a7d16fee20c Mon Sep 17 00:00:00 2001 From: Alexey Weyl Date: Tue, 29 Dec 2015 11:21:04 +0200 Subject: [PATCH] Integration of Synchronizer + Processor + Transformer Implements: blueprint vitrage-resource-processor Change-Id: Ice972f21eb903ef741f1969ae2bc7c42c1581542 --- requirements.txt | 2 + setup.cfg | 1 + test-requirements.txt | 2 + .../synchronizer_client.py => cmd/graph.py} | 22 ++++------ vitrage/entity_graph/processor/__init__.py | 26 ++++++++++- .../processor/entity_graph_manager.py | 32 +++++++------- vitrage/entity_graph/processor/processor.py | 43 +------------------ vitrage/service.py | 23 ---------- .../transformer_inst_snapshot_dynamic.json | 8 ++-- vitrage/tests/unit/processor/base.py | 6 ++- .../processor/test_entity_graph_manager.py | 39 +++++++++-------- .../tests/unit/processor/test_processor.py | 13 ++++-- 12 files changed, 95 insertions(+), 122 deletions(-) rename vitrage/{entity_graph/processor/synchronizer_client.py => cmd/graph.py} (66%) diff --git a/requirements.txt b/requirements.txt index 6890c80ea..d9e541598 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,10 +4,12 @@ pbr>=1.6 Babel>=1.3 +python-dateutil>=2.4.2 python-novaclient>=2.26.0 networkx>=1.10 oslo.log>=1.12.0 # Apache-2.0 oslo.policy>=0.3.0 +oslo.service>=0.1.0 # Apache-2.0 pecan>=0.8.0 PasteDeploy>=1.5.0 Werkzeug>=0.7 diff --git a/setup.cfg b/setup.cfg index dd47a01db..3bbdff811 100644 --- a/setup.cfg +++ b/setup.cfg @@ -24,6 +24,7 @@ setup-hooks = [entry_points] console_scripts = vitrage-api = vitrage.cmd.api:main + vitrage-entity-graph = vitrage.cmd.graph:entity_graph oslo.config.opts = vitrage = vitrage.opts:list_opts diff --git a/test-requirements.txt b/test-requirements.txt index 6a476837d..e9d06dd6a 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -3,6 +3,7 @@ # process, which may cause wedges in the gate later. hacking<0.11,>=0.10.0 +python-dateutil>=2.4.2 coverage>=3.6 discover networkx>=1.10 @@ -12,6 +13,7 @@ sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2 oslo.log>=1.12.0 # Apache-2.0 oslosphinx>=2.5.0 # Apache-2.0 oslotest>=1.10.0 # Apache-2.0 +oslo.service>=0.1.0 # Apache-2.0 testrepository>=0.0.18 testscenarios>=0.4 testtools>=1.4.0 diff --git a/vitrage/entity_graph/processor/synchronizer_client.py b/vitrage/cmd/graph.py similarity index 66% rename from vitrage/entity_graph/processor/synchronizer_client.py rename to vitrage/cmd/graph.py index be7651da6..dafae15a1 100644 --- a/vitrage/entity_graph/processor/synchronizer_client.py +++ b/vitrage/cmd/graph.py @@ -12,19 +12,13 @@ # License for the specific language governing permissions and limitations # under the License. -from oslo_log import log +from oslo_service import service as os_service + +from vitrage import entity_graph as entity_graph_svc +from vitrage import service -LOG = log.getLogger(__name__) - - -class SynchronizerClient(object): - - def __init__(self): - pass - - def get_all(self): - pass - - def get_all_entity(self, entity_name): - pass +def entity_graph(): + conf = service.prepare_service() + os_service.launch(conf, + entity_graph_svc.VitrageEntityGraphService(conf)).wait() diff --git a/vitrage/entity_graph/processor/__init__.py b/vitrage/entity_graph/processor/__init__.py index 7a05fb1f4..76ddf9828 100644 --- a/vitrage/entity_graph/processor/__init__.py +++ b/vitrage/entity_graph/processor/__init__.py @@ -12,4 +12,28 @@ # License for the specific language governing permissions and limitations # under the License. -__author__ = 'stack' +from oslo_log import log + +# from oslo_service import service as os_service + + +LOG = log.getLogger(__name__) + + +# class VitrageEntityGraphService(os_service.Service): +class VitrageEntityGraphService(object): + + def __init__(self): + super(VitrageEntityGraphService, self).__init__() + + def start(self): + LOG.info("Start ProcessorService") + super(VitrageEntityGraphService, self).start() + + LOG.info("Finish start ProcessorService") + + def stop(self): + LOG.info("Stop ProcessorService") + + super(VitrageEntityGraphService, self).stop() + LOG.info("Finish stop ProcessorService") diff --git a/vitrage/entity_graph/processor/entity_graph_manager.py b/vitrage/entity_graph/processor/entity_graph_manager.py index 96bfceb9f..a4e491f0c 100644 --- a/vitrage/entity_graph/processor/entity_graph_manager.py +++ b/vitrage/entity_graph/processor/entity_graph_manager.py @@ -13,6 +13,7 @@ # under the License. import datetime +from dateutil import parser from oslo_log import log @@ -36,25 +37,19 @@ class EntityGraphManager(object): def is_partial_data_vertex(self, vertex): """Check if a vertex is a partial data vertex - Checks if the vertex was updated only because it's a neighbor of a - full data vertex + Vertex is a partial data vertex if it's IS_PARTIAL_DATA property is + True and if it has no neighbors """ + if not vertex.properties[VertexProperties.IS_PARTIAL_DATA]: + return False + # check that vertex has no neighbors neighbor_edges = self.graph.get_edges(vertex.vertex_id, direction=Direction.BOTH) - for neighbor_edge in neighbor_edges: - if not self.is_edge_deleted(neighbor_edge): - return False - # check properties - # TODO(Alexey): implement get_vertex_essential_properties - # key_properties = self.transformer.key_fields(vertex) - key_properties = [VertexProperties.TYPE, VertexProperties.SUB_TYPE, - VertexProperties.ID] - - return not any(True for prop in vertex.properties - if prop not in key_properties) + return not any(True for neighbor_edge in neighbor_edges + if not self.is_edge_deleted(neighbor_edge)) def delete_partial_data_vertex(self, suspected_vertex): """Checks if it is a partial data vertex, and if so deletes it """ @@ -118,6 +113,11 @@ class EntityGraphManager(object): def check_timestamp(self, curr_vertex, new_vertex): is_timestamp_property_exist = VertexProperties.UPDATE_TIMESTAMP \ not in curr_vertex.properties.keys() - is_old = curr_vertex.properties[VertexProperties.UPDATE_TIMESTAMP] <= \ - new_vertex.properties[VertexProperties.UPDATE_TIMESTAMP] - return is_timestamp_property_exist or is_old + if is_timestamp_property_exist: + return True + + current_time = parser.parse(curr_vertex.properties[ + VertexProperties.UPDATE_TIMESTAMP]) + new_time = parser.parse(new_vertex.properties[ + VertexProperties.UPDATE_TIMESTAMP]) + return current_time <= new_time diff --git a/vitrage/entity_graph/processor/processor.py b/vitrage/entity_graph/processor/processor.py index f77652307..82ecb5d8f 100644 --- a/vitrage/entity_graph/processor/processor.py +++ b/vitrage/entity_graph/processor/processor.py @@ -12,16 +12,11 @@ # License for the specific language governing permissions and limitations # under the License. -import datetime - from oslo_log import log from vitrage.common.constants import EventAction -from vitrage.common.constants import SyncMode -from vitrage.common.constants import VertexProperties from vitrage.entity_graph.processor import base as processor from vitrage.entity_graph.processor import entity_graph_manager -from vitrage.entity_graph.transformer import base from vitrage.entity_graph.transformer import transformer_manager from vitrage.graph import Direction from vitrage.graph import utils as graph_utils @@ -137,43 +132,7 @@ class Processor(processor.ProcessorBase): deleted_vertex) def transform_entity(self, event): - # TODO(Alexey): change back to the original call - # return self.transformer.transform(event) - - # create vertex - vertex = graph_utils.create_vertex( - 'RESOURCE_INSTANCE_' + event['id'], - entity_id=event['id'], - entity_type='RESOURCE', - entity_subtype='INSTANCE', - entity_state=event[VertexProperties.STATE.lower()], - update_timestamp=datetime.datetime.now().time(), - is_deleted=False) - - # create neighbors - neighbor_vertex = graph_utils.create_vertex( - 'RESOURCE_HOST_' + event['hostname'], - entity_id=event['hostname'], - entity_type='RESOURCE', - entity_subtype='HOST', - is_deleted=None) - neighbor_edge = graph_utils.create_edge( - neighbor_vertex.vertex_id, - vertex.vertex_id, - 'contains', - is_deleted=False) - neighbors = [base.Neighbor(neighbor_vertex, neighbor_edge)] - - # decide event type - if event['sync_mode'] == SyncMode.INIT_SNAPSHOT: - event_type = EventAction.CREATE - elif event['sync_mode'] == SyncMode.UPDATE: - if event['event_type'] == 'compute.instance.volume.attach': - event_type = EventAction.UPDATE - elif event['event_type'] == 'compute.instance.delete.end': - event_type = EventAction.DELETE - - return base.EntityWrapper(vertex, neighbors, event_type) + return self.transformer.transform(event) def _update_neighbors(self, vertex, neighbors): """Updates vertices neighbor connections diff --git a/vitrage/service.py b/vitrage/service.py index 48faaa630..f1e1be3b1 100644 --- a/vitrage/service.py +++ b/vitrage/service.py @@ -16,9 +16,7 @@ import logging from oslo_config import cfg from oslo_log import log from oslo_policy import opts as policy_opts -from oslo_service import service as os_service -from vitrage.entity_graph.processor import synchronizer_client from vitrage import opts LOG = log.getLogger(__name__) @@ -42,24 +40,3 @@ def prepare_service(args=None, default_opts=None, conf=None): conf.log_opt_values(LOG, logging.DEBUG) return conf - - -class ProcessorService(os_service.Service): - - def __init__(self): - super(ProcessorService, self).__init__() - - def start(self): - LOG.info("Start ProcessorService") - super(ProcessorService, self).start() - - # ThreadPool.start() - synchronizer_client.SynchronizerActions.get_all() - - LOG.info("Finish start ProcessorService") - - def stop(self): - LOG.info("Stop ProcessorService") - # ThreadPool.stop() - super(ProcessorService, self).stop() - LOG.info("Finish stop ProcessorService") diff --git a/vitrage/tests/resources/transformer_inst_snapshot_dynamic.json b/vitrage/tests/resources/transformer_inst_snapshot_dynamic.json index d6966f564..87a098f9f 100644 --- a/vitrage/tests/resources/transformer_inst_snapshot_dynamic.json +++ b/vitrage/tests/resources/transformer_inst_snapshot_dynamic.json @@ -1,13 +1,15 @@ { "sync_mode": "init_snapshot", "image": "cirros-[a-z]+", - "state": "ACTIVE|INACTIVE", + "status": "ACTIVE|INACTIVE", "tenant_id": "[0-9a-f]{32}", "user_id": "[0-9a-f]{32}", "flavor": "m1.nano", - "hostname": "host[0-4]", + "OS-EXT-SRV-ATTR:host": "host[0-4]", "id": "vm[0-1][0-9]", "name": "vm[0-9]{3}", - "event_type": "update" + "event_type": "update", + "sync_type": "nova.instance", + "updated": "2015-12-01T12:46:41Z" } diff --git a/vitrage/tests/unit/processor/base.py b/vitrage/tests/unit/processor/base.py index 28fbdd4d7..a6097e33d 100644 --- a/vitrage/tests/unit/processor/base.py +++ b/vitrage/tests/unit/processor/base.py @@ -24,13 +24,15 @@ class BaseProcessor(base.BaseTest): super(BaseProcessor, self).setUp() self.transform = transformer_manager.TransformerManager() - def _update_vertex_to_graph(self, e_g_manager, type, - sub_type, id, additional_prop): + def _update_vertex_to_graph(self, e_g_manager, type, sub_type, id, + is_deleted, is_partial_data, additional_prop): # create vertex properties prop = {key: value for key, value in additional_prop.iteritems()} prop[VertexProperties.TYPE] = type prop[VertexProperties.SUB_TYPE] = sub_type prop[VertexProperties.ID] = id + prop[VertexProperties.IS_VERTEX_DELETED] = is_deleted + prop[VertexProperties.IS_PARTIAL_DATA] = is_partial_data # TODO(Alexey): change back to original method # vertex_id = self.transform.get_key(prop) diff --git a/vitrage/tests/unit/processor/test_entity_graph_manager.py b/vitrage/tests/unit/processor/test_entity_graph_manager.py index dc2816258..67498383e 100644 --- a/vitrage/tests/unit/processor/test_entity_graph_manager.py +++ b/vitrage/tests/unit/processor/test_entity_graph_manager.py @@ -12,7 +12,6 @@ # License for the specific language governing permissions and limitations # under the License. -from vitrage.common.constants import EdgeProperties from vitrage.common.constants import VertexProperties from vitrage.entity_graph.processor import entity_graph_manager from vitrage.tests.unit.processor import base @@ -29,7 +28,7 @@ class TestEntityGraphManager(base.BaseProcessor): # create vertex properties instance_vertex = self._update_vertex_to_graph(e_g_manager, 'RESOURCE', 'INSTANCE', - '12345', {}) + '123', False, True, {}) # check is partial data vertex is_partial_data_vertex = \ @@ -38,9 +37,8 @@ class TestEntityGraphManager(base.BaseProcessor): # add host vertex host_vertex = self._update_vertex_to_graph(e_g_manager, 'RESOURCE', - 'HOST', '54321', {}) - prop = {} - prop[EdgeProperties.RELATION_NAME] = 'contains' + 'HOST', '321', + False, True, {}) edge = self._update_edge_to_graph(e_g_manager, host_vertex.vertex_id, instance_vertex.vertex_id, 'contains') @@ -63,10 +61,10 @@ class TestEntityGraphManager(base.BaseProcessor): e_g_manager = entity_graph_manager.EntityGraphManager() # create vertex properties - prop = {} - prop[VertexProperties.STATE] = 'ACTIVE' + prop = {VertexProperties.STATE: 'ACTIVE'} vertex = self._update_vertex_to_graph(e_g_manager, 'RESOURCE', - 'INSTANCE', '12345', prop) + 'INSTANCE', '12345', + False, False, prop) # check is not partial data vertex is_partial_data_vertex = e_g_manager.is_partial_data_vertex(vertex) @@ -77,7 +75,8 @@ class TestEntityGraphManager(base.BaseProcessor): # create vertex properties vertex = self._update_vertex_to_graph(e_g_manager, 'RESOURCE', - 'INSTANCE', '12345', {}) + 'INSTANCE', '12345', + False, True, {}) # check is partial data vertex is_partial_data_vertex = e_g_manager.is_partial_data_vertex(vertex) @@ -93,7 +92,8 @@ class TestEntityGraphManager(base.BaseProcessor): # create vertex properties vertex = self._update_vertex_to_graph(e_g_manager, 'RESOURCE', - 'INSTANCE', '12345', {}) + 'INSTANCE', '12345', + False, True, {}) # check vitrage deleted self.assertFalse(e_g_manager.is_vertex_deleted(vertex)) @@ -105,9 +105,11 @@ class TestEntityGraphManager(base.BaseProcessor): # create vertex properties vertex1 = self._update_vertex_to_graph(e_g_manager, 'RESOURCE', - 'INSTANCE', '12345', {}) + 'INSTANCE', '12345', + False, True, {}) vertex2 = self._update_vertex_to_graph(e_g_manager, 'RESOURCE', - 'HOST', '54321', {}) + 'HOST', '54321', + False, True, {}) edge = self._update_edge_to_graph(e_g_manager, vertex1.vertex_id, vertex2.vertex_id, 'contains') @@ -119,17 +121,18 @@ class TestEntityGraphManager(base.BaseProcessor): def test_find_neighbor_types(self): neighbors = [] e_g_manager = entity_graph_manager.EntityGraphManager() - entities_details = [('RESOURCE', 'HOST', '1'), - ('RESOURCE', 'STORAGE', '2'), - ('RESOURCE', 'APPLICATION', '3'), - ('RESOURCE', 'STORAGE', '4'), - ('ALARM', 'INSTANCE_AT_RISK', '5')] + entities_details = [('RESOURCE', 'HOST', '1', False, True), + ('RESOURCE', 'STORAGE', '2', False, True), + ('RESOURCE', 'APPLICATION', '3', False, True), + ('RESOURCE', 'STORAGE', '4', False, True), + ('ALARM', 'INSTANCE_AT_RISK', '5', False, True)] # add neighbors for details in entities_details: # neighbor vertex = self._update_vertex_to_graph(e_g_manager, details[0], - details[1], details[2], {}) + details[1], details[2], + details[3], details[4], {}) neighbors.append((vertex, None)) # get neighbors types diff --git a/vitrage/tests/unit/processor/test_processor.py b/vitrage/tests/unit/processor/test_processor.py index 956196abf..2a3934ef9 100644 --- a/vitrage/tests/unit/processor/test_processor.py +++ b/vitrage/tests/unit/processor/test_processor.py @@ -12,7 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. -import datetime +from datetime import datetime +import pytz import unittest from vitrage.common.constants import SyncMode @@ -40,6 +41,7 @@ class TestProcessor(base.BaseTest): super(TestProcessor, self).setUp() self.spec_list = self._get_spec_list() + # TODO(Alexey): un skip this test when host and zone transformers are ready @unittest.skip('Not ready yet') def test_create_entity_graph(self): processor = self._create_processor_with_graph() @@ -54,6 +56,8 @@ class TestProcessor(base.BaseTest): # bfs_list = graph.algo.bfs(graph) # self.assertEqual(num_vertices, len(bfs_list)) + # TODO(Alexey): un skip this test when instance transformer update is ready + @unittest.skip('Not ready yet') def test_process_event(self): # check create instance event processor = proc.Processor() @@ -84,7 +88,7 @@ class TestProcessor(base.BaseTest): def test_update_entity_state(self): # create instance event with host neighbor and check validity - prop = {VertexProperties.STATE.lower(): 'STARTING'} + prop = {'status': 'STARTING'} (vertex, neighbors, processor) =\ self._create_and_check_entity(properties=prop) @@ -95,7 +99,7 @@ class TestProcessor(base.BaseTest): # update instance event with state running vertex.properties[VertexProperties.STATE] = 'RUNNING' vertex.properties[VertexProperties.UPDATE_TIMESTAMP] = \ - datetime.datetime.now().time() + datetime.utcnow().replace(tzinfo=pytz.utc).__str__() processor.update_entity(vertex, neighbors) # check state @@ -257,6 +261,9 @@ class TestProcessor(base.BaseTest): for key, value in properties.iteritems(): events_list[0][key] = value + # TODO(Alexey): remove this and fix it in mock + events_list[0]['sync_type'] = "nova.instance" + return events_list[0] def _get_instance_entity_spec_list(self, config_file_path,