From fb4088c32c54979f9be2b8efa83f3140458a3982 Mon Sep 17 00:00:00 2001 From: Idan Hefetz Date: Tue, 10 Jul 2018 15:04:48 +0000 Subject: [PATCH] Graph fast fail-over * Initialize quickly upon fail-over without requesting updates. * In case of downtime, Vitrage-graph startup will request collector updates * vitrage-persistor has an expirer timer to remove old db events Story: 2002663 Task: 22473 Change-Id: Icccf230e69c41a2f115c0797e60df774db637594 Depends-On: I042665e0d642ba36a97af84a6dc0581888025207 Depends-On: Id5dbd165a1e0220e4e24207e8d237f94415fc490 --- devstack/gate_hook_mock.sh | 4 +- setup.cfg | 1 + vitrage/cli/storage.py | 9 ++ vitrage/common/constants.py | 14 +- vitrage/common/utils.py | 10 ++ vitrage/datasources/collector_notifier.py | 58 -------- vitrage/datasources/listener_service.py | 4 - vitrage/datasources/rpc_service.py | 29 +--- vitrage/datasources/transformer_base.py | 4 +- vitrage/entity_graph/datasource_rpc.py | 15 +- vitrage/entity_graph/graph_init.py | 68 +++++++-- vitrage/entity_graph/graph_persistency.py | 129 +++++++++++++++++ vitrage/entity_graph/processor/processor.py | 17 +-- vitrage/entity_graph/scheduler.py | 24 +-- vitrage/entity_graph/workers.py | 12 ++ vitrage/evaluator/scenario_evaluator.py | 10 +- .../graph/algo_driver/networkx_algorithm.py | 6 - vitrage/graph/driver/graph.py | 21 ++- vitrage/graph/driver/notifier.py | 17 ++- vitrage/persistency/__init__.py | 3 - vitrage/persistency/graph_persistor.py | 60 -------- vitrage/persistency/service.py | 61 +++++--- vitrage/storage/base.py | 8 +- vitrage/storage/impl_sqlalchemy.py | 81 ++++++----- vitrage/storage/sqlalchemy/models.py | 22 +-- .../graph_persistor/test_graph_persistor.py | 137 ++++++++++-------- .../vertices/nova.host.json | 4 +- .../nova/test_nova_instance_transformer.py | 2 +- vitrage/tests/unit/entity_graph/base.py | 7 + vitrage/tests/unit/graph/test_graph.py | 40 +++-- 30 files changed, 496 insertions(+), 381 deletions(-) delete mode 100644 vitrage/datasources/collector_notifier.py create mode 100644 vitrage/entity_graph/graph_persistency.py delete mode 100644 vitrage/persistency/graph_persistor.py diff --git a/devstack/gate_hook_mock.sh b/devstack/gate_hook_mock.sh index b7b3f509d..731068f8c 100644 --- a/devstack/gate_hook_mock.sh +++ b/devstack/gate_hook_mock.sh @@ -45,8 +45,8 @@ debug = false notifiers = nova,webhook [datasources] -types=mock_graph_datasource -path=vitrage.tests.mocks +types=doctor,mock_graph_datasource +path=vitrage.datasources,vitrage.tests.mocks snapshots_interval=60 [mock_graph_datasource] diff --git a/setup.cfg b/setup.cfg index 208248347..d8d9e3bc1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,6 +32,7 @@ console_scripts = vitrage-persistor = vitrage.cli.persistor:main vitrage-ml = vitrage.cli.machine_learning:main vitrage-dbsync = vitrage.cli.storage:dbsync + vitrage-purge-data = vitrage.cli.storage:purge_data vitrage-snmp-parsing = vitrage.cli.snmp_parsing:main vitrage.entity_graph = diff --git a/vitrage/cli/storage.py b/vitrage/cli/storage.py index ca5bf91eb..697246ff2 100644 --- a/vitrage/cli/storage.py +++ b/vitrage/cli/storage.py @@ -21,3 +21,12 @@ def dbsync(): print(VITRAGE_TITLE) conf = service.prepare_service() storage.get_connection_from_config(conf).upgrade() + + +def purge_data(): + print(VITRAGE_TITLE) + conf = service.prepare_service() + db = storage.get_connection_from_config(conf) + db.active_actions.delete() + db.events.delete() + db.graph_snapshots.delete() diff --git a/vitrage/common/constants.py b/vitrage/common/constants.py index
229b8272d..676802500 100644 --- a/vitrage/common/constants.py +++ b/vitrage/common/constants.py @@ -14,12 +14,16 @@ # under the License. -class VertexProperties(object): +class ElementProperties(object): + VITRAGE_IS_DELETED = 'vitrage_is_deleted' + UPDATE_TIMESTAMP = 'update_timestamp' + + +class VertexProperties(ElementProperties): VITRAGE_CATEGORY = 'vitrage_category' VITRAGE_TYPE = 'vitrage_type' VITRAGE_ID = 'vitrage_id' VITRAGE_STATE = 'vitrage_state' - VITRAGE_IS_DELETED = 'vitrage_is_deleted' VITRAGE_IS_PLACEHOLDER = 'vitrage_is_placeholder' VITRAGE_SAMPLE_TIMESTAMP = 'vitrage_sample_timestamp' VITRAGE_AGGREGATED_STATE = 'vitrage_aggregated_state' @@ -27,10 +31,10 @@ class VertexProperties(object): VITRAGE_AGGREGATED_SEVERITY = 'vitrage_aggregated_severity' VITRAGE_OPERATIONAL_SEVERITY = 'vitrage_operational_severity' VITRAGE_RESOURCE_ID = 'vitrage_resource_id' + VITRAGE_CACHED_ID = 'vitrage_cached_id' ID = 'id' STATE = 'state' PROJECT_ID = 'project_id' - UPDATE_TIMESTAMP = 'update_timestamp' NAME = 'name' SEVERITY = 'severity' IS_MARKED_DOWN = 'is_marked_down' @@ -44,10 +48,8 @@ class VertexProperties(object): IS_REAL_VITRAGE_ID = 'is_real_vitrage_id' -class EdgeProperties(object): +class EdgeProperties(ElementProperties): RELATIONSHIP_TYPE = 'relationship_type' - VITRAGE_IS_DELETED = 'vitrage_is_deleted' - UPDATE_TIMESTAMP = 'update_timestamp' class EdgeLabel(object): diff --git a/vitrage/common/utils.py b/vitrage/common/utils.py index 9d57aa7e9..e041104e1 100644 --- a/vitrage/common/utils.py +++ b/vitrage/common/utils.py @@ -18,8 +18,10 @@ # under the License. from collections import defaultdict import copy +import hashlib import itertools import random +import six import threading from oslo_config import cfg @@ -87,3 +89,11 @@ def spawn(target, *args, **kwargs): t.daemon = True t.start() return t + + +def md5(obj): + if isinstance(obj, tuple): + obj = str(obj) + if isinstance(obj, six.string_types): + return hashlib.md5(six.b(obj)).hexdigest() + raise Exception('Unknown object for md5 %s', str(obj)) diff --git a/vitrage/datasources/collector_notifier.py b/vitrage/datasources/collector_notifier.py deleted file mode 100644 index 4296593f7..000000000 --- a/vitrage/datasources/collector_notifier.py +++ /dev/null @@ -1,58 +0,0 @@ -# Copyright 2017 - Nokia -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
- -from oslo_log import log -import oslo_messaging - -from vitrage.messaging import get_transport - - -LOG = log.getLogger(__name__) - - -class CollectorNotifier(object): - """Allows writing to message bus""" - def __init__(self, conf): - self.oslo_notifier = None - try: - topics = [conf.datasources.notification_topic_collector] - if conf.persistency.enable_persistency: - topics.append(conf.persistency.persistor_topic) - else: - LOG.warning("Not persisting events") - - self.oslo_notifier = oslo_messaging.Notifier( - get_transport(conf), - driver='messagingv2', - publisher_id='datasources.events', - topics=topics) - except Exception as e: - LOG.info('Collector notifier - missing configuration %s' - % str(e)) - - @property - def enabled(self): - return self.oslo_notifier is not None - - def notify_when_applicable(self, enriched_event): - """Callback subscribed to driver.graph updates - - :param enriched_event: the event with enriched data added by the driver - """ - - try: - self.oslo_notifier.info({}, '', enriched_event) - except Exception: - LOG.exception('Datasource event cannot be notified - %s.', - enriched_event) diff --git a/vitrage/datasources/listener_service.py b/vitrage/datasources/listener_service.py index e8f528813..798f4f0ea 100644 --- a/vitrage/datasources/listener_service.py +++ b/vitrage/datasources/listener_service.py @@ -32,10 +32,6 @@ class ListenerService(object): self._create_callbacks_by_events_dict(conf) topics = [conf.datasources.notification_topic_collector] - if conf.persistency.enable_persistency: - topics.append(conf.persistency.persistor_topic) - else: - LOG.warning("Not persisting events") notifier = VitrageNotifier(conf, 'driver.events', topics) self.listener = self._get_topics_listener(conf, notifier.notify) diff --git a/vitrage/datasources/rpc_service.py b/vitrage/datasources/rpc_service.py index 8655c36cc..4a3f8f0d9 100644 --- a/vitrage/datasources/rpc_service.py +++ b/vitrage/datasources/rpc_service.py @@ -19,7 +19,6 @@ from oslo_log import log from vitrage.common.constants import DatasourceAction from vitrage.datasources import utils -from vitrage.messaging import VitrageNotifier from vitrage import rpc as vitrage_rpc LOG = log.getLogger(__name__) @@ -28,13 +27,11 @@ LOG = log.getLogger(__name__) class CollectorRpcHandlerService(object): def __init__(self, conf): - super(CollectorRpcHandlerService, self).__init__() self.conf = conf - async_persistor = self.create_async_persistor(conf) self.server = vitrage_rpc.get_default_server( conf, conf.rpc_topic_collector, - [DriversEndpoint(conf, async_persistor)]) + [DriversEndpoint(conf)]) def start(self): LOG.info("Collector Rpc Handler Service - Starting...") @@ -46,29 +43,11 @@ class CollectorRpcHandlerService(object): self.server.stop() LOG.info("Collector Rpc Handler Service - Stopped!") - @staticmethod - def create_async_persistor(conf): - if not conf.persistency.enable_persistency: - return None - topics = [conf.persistency.persistor_topic] - notifier = VitrageNotifier(conf, 'driver.events', topics) - persist_worker = futures.ThreadPoolExecutor(max_workers=1) - - def do_persist(events): - for e in events: - notifier.notify('', e) - - def do_work(events): - persist_worker.submit(do_persist, events) - - return do_work - class DriversEndpoint(object): - def __init__(self, conf, async_persistor): + def __init__(self, conf): self.conf = conf - self.async_persistor = async_persistor self.pool = futures.ThreadPoolExecutor( max_workers=len(self.conf.datasources.types)) @@ -93,8 +72,6 @@ class 
DriversEndpoint(object): result.extend(list(self.pool.map(run_driver, failed_drivers))) events = [e for success, events in result if success for e in events] - if self.async_persistor: - self.async_persistor(events) LOG.debug("run drivers get_all done.") return events @@ -103,7 +80,5 @@ class DriversEndpoint(object): LOG.debug("run driver get_changes: %s", driver_name) drivers = utils.get_drivers_by_name(self.conf, [driver_name]) events = drivers[0].get_changes(DatasourceAction.UPDATE) - if self.async_persistor: - self.async_persistor(events) LOG.debug("run driver get_changes: %s done.", driver_name) return events diff --git a/vitrage/datasources/transformer_base.py b/vitrage/datasources/transformer_base.py index 75b6e1951..ac4337ca1 100644 --- a/vitrage/datasources/transformer_base.py +++ b/vitrage/datasources/transformer_base.py @@ -29,6 +29,7 @@ from vitrage.common.constants import GraphAction from vitrage.common.constants import UpdateMethod from vitrage.common.constants import VertexProperties as VProps from vitrage.common.exception import VitrageTransformerError +from vitrage.common.utils import md5 from vitrage.datasources import OPENSTACK_CLUSTER import vitrage.graph.utils as graph_utils from vitrage.utils import datetime as datetime_utils @@ -169,6 +170,7 @@ class TransformerBase(object): if vertex.get(VProps.IS_REAL_VITRAGE_ID): return vertex new_uuid = self.uuid_from_deprecated_vitrage_id(vertex.vertex_id) + vertex.properties[VProps.VITRAGE_CACHED_ID] = md5(vertex.vertex_id) vertex.vertex_id = new_uuid vertex.properties[VProps.VITRAGE_ID] = new_uuid vertex.properties[VProps.IS_REAL_VITRAGE_ID] = True @@ -176,7 +178,7 @@ class TransformerBase(object): @classmethod def uuid_from_deprecated_vitrage_id(cls, vitrage_id): - old_vitrage_id = hash(vitrage_id) + old_vitrage_id = md5(vitrage_id) new_uuid = cls.key_to_uuid_cache.get(old_vitrage_id) if not new_uuid: new_uuid = uuidutils.generate_uuid() diff --git a/vitrage/entity_graph/datasource_rpc.py b/vitrage/entity_graph/datasource_rpc.py index 3796c4ff3..2a80d11ce 100644 --- a/vitrage/entity_graph/datasource_rpc.py +++ b/vitrage/entity_graph/datasource_rpc.py @@ -30,12 +30,12 @@ def create_rpc_client_instance(conf): def get_all(rpc_client, events_coordination, driver_names, action, - retry_on_fault=False, first_call_timeout=None): + retry_on_fault=False): LOG.info('get_all starting for %s', driver_names) t1 = time.time() - def _call(_client): - return _client.call( + def _call(): + return rpc_client.call( {}, 'driver_get_all', driver_names=driver_names, @@ -43,15 +43,10 @@ def get_all(rpc_client, events_coordination, driver_names, action, retry_on_fault=retry_on_fault) try: - if first_call_timeout: - # create a temporary client instance with a timeout - client = rpc_client.prepare(timeout=first_call_timeout) - events = _call(client) - else: - events = _call(rpc_client) + events = _call() except oslo_messaging.MessagingTimeout: LOG.exception('Got MessagingTimeout') - events = _call(rpc_client) if retry_on_fault else [] + events = _call() if retry_on_fault else [] t2 = time.time() events_coordination.handle_multiple_low_priority(events) t3 = time.time() diff --git a/vitrage/entity_graph/graph_init.py b/vitrage/entity_graph/graph_init.py index 83e1e20f3..5694e8e2d 100644 --- a/vitrage/entity_graph/graph_init.py +++ b/vitrage/entity_graph/graph_init.py @@ -18,14 +18,20 @@ from oslo_log import log import oslo_messaging from vitrage.common.constants import DatasourceAction +from vitrage.common.constants import VertexProperties as VProps 
from vitrage.common.utils import spawn +from vitrage.datasources.transformer_base import TransformerBase from vitrage.entity_graph import datasource_rpc as ds_rpc from vitrage.entity_graph import EVALUATOR_TOPIC +from vitrage.entity_graph.graph_persistency import GraphPersistency +from vitrage.entity_graph.processor.notifier import GraphNotifier from vitrage.entity_graph.processor.processor import Processor from vitrage.entity_graph.scheduler import Scheduler from vitrage.entity_graph.workers import GraphWorkersManager +from vitrage.graph.driver.networkx_graph import NXGraph from vitrage import messaging + LOG = log.getLogger(__name__) @@ -40,27 +46,49 @@ class VitrageGraphInit(object): self.process_event, conf.datasources.notification_topic_collector, EVALUATOR_TOPIC) - self.scheduler = Scheduler(conf, graph, self.events_coordination) - self.processor = Processor(conf, graph, self.scheduler.graph_persistor) + self.persist = GraphPersistency(conf, db_connection, graph) + self.scheduler = Scheduler(conf, graph, self.events_coordination, + self.persist) + self.processor = Processor(conf, graph) def run(self): LOG.info('Init Started') - LOG.info('clearing database active_actions') + graph_snapshot = self.persist.query_recent_snapshot() + if graph_snapshot: + self._restart_from_stored_graph(graph_snapshot) + else: + self._start_from_scratch() + self.workers.run() + + def _restart_from_stored_graph(self, graph_snapshot): + LOG.info('Initializing graph from database snapshot (%sKb)', + len(graph_snapshot.graph_snapshot) / 1024) + NXGraph.read_gpickle(graph_snapshot.graph_snapshot, self.graph) + self.persist.replay_events(self.graph, graph_snapshot.event_id) + self._recreate_transformers_id_cache() + LOG.info("%s vertices loaded", self.graph.num_vertices()) + spawn(self._start_all_workers, is_snapshot=True) + + def _start_from_scratch(self): + LOG.info('Starting for the first time') + LOG.info('Clearing database active_actions') self.db.active_actions.delete() ds_rpc.get_all( ds_rpc.create_rpc_client_instance(self.conf), self.events_coordination, self.conf.datasources.types, action=DatasourceAction.INIT_SNAPSHOT, - retry_on_fault=True, - first_call_timeout=10) - self.processor.start_notifier() - spawn(self.start_all_workers) - self.workers.run() + retry_on_fault=True) + LOG.info("%s vertices loaded", self.graph.num_vertices()) + self.persist.store_graph() + spawn(self._start_all_workers, is_snapshot=False) - def start_all_workers(self): - self.workers.submit_start_evaluations() # evaluate entire graph - self.graph.subscribe(self.workers.submit_graph_update) + def _start_all_workers(self, is_snapshot): + if is_snapshot: + self.workers.submit_enable_evaluations() + else: + self.workers.submit_start_evaluations() + self._add_graph_subscriptions() self.scheduler.start_periodic_tasks() LOG.info('Init Finished') self.events_coordination.start() @@ -72,6 +100,24 @@ class VitrageGraphInit(object): else: self.processor.process_event(event) + def _recreate_transformers_id_cache(self): + for v in self.graph.get_vertices(): + if not v.get(VProps.VITRAGE_CACHED_ID): + LOG.warning("Missing vitrage_cached_id in the vertex. 
" + "Vertex is not added to the ID cache %s", str(v)) + else: + TransformerBase.key_to_uuid_cache[v[VProps.VITRAGE_CACHED_ID]]\ + = v.vertex_id + + def _add_graph_subscriptions(self): + self.graph.subscribe(self.workers.submit_graph_update) + vitrage_notifier = GraphNotifier(self.conf) + if vitrage_notifier.enabled: + self.graph.subscribe(vitrage_notifier.notify_when_applicable) + LOG.info('Subscribed vitrage notifier to graph changes') + self.graph.subscribe(self.persist.persist_event, + finalization=True) + PRIORITY_DELAY = 0.05 diff --git a/vitrage/entity_graph/graph_persistency.py b/vitrage/entity_graph/graph_persistency.py new file mode 100644 index 000000000..156caeb71 --- /dev/null +++ b/vitrage/entity_graph/graph_persistency.py @@ -0,0 +1,129 @@ +# Copyright 2018 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from datetime import timedelta + +from oslo_log import log + +from vitrage.common.constants import VertexProperties as VProps +from vitrage.graph import Edge +from vitrage.graph import Vertex + +from vitrage.storage.sqlalchemy import models +from vitrage.utils.datetime import utcnow + + +LOG = log.getLogger(__name__) + +EPSILON = 30 + + +class GraphPersistency(object): + def __init__(self, conf, db, graph): + self.conf = conf + self.db = db + self.graph = graph + self.enabled = conf.persistency.enable_persistency + + def store_graph(self): + if not self.enabled: + return + + LOG.info('Persisting graph...') + try: + last_event_id = self.db.events.get_last_event_id() + last_event_id = last_event_id.event_id if last_event_id else 0 + graph_snapshot = self.graph.write_gpickle() + self.db.graph_snapshots.update(models.GraphSnapshot( + snapshot_id=1, + event_id=last_event_id, + graph_snapshot=graph_snapshot)) + LOG.info('Persisting graph - done') + except Exception: + LOG.exception("Graph is not stored") + + def _recent_snapshot_time(self): + t = utcnow(with_timezone=False) + t = t - timedelta(seconds=3 * self.conf.datasources.snapshots_interval) + t = t - timedelta(seconds=EPSILON) + return t + + def query_recent_snapshot(self): + if not self.enabled: + return + timestamp = self._recent_snapshot_time() + return self.db.graph_snapshots.query(timestamp=timestamp) + + def replay_events(self, graph, event_id): + LOG.info('Getting events from database') + events = self.db.events.get_replay_events( + event_id=event_id) + LOG.info('Applying %s database events', len(events)) + + for event in events: + if event.is_vertex: + v_id = event.payload['vertex_id'] + del event.payload['vertex_id'] + v = Vertex(v_id, event.payload) + graph.update_vertex(v) + else: + source_id = event.payload['source_id'] + target_id = event.payload['target_id'] + label = event.payload['label'] + del event.payload['source_id'] + del event.payload['target_id'] + del event.payload['label'] + e = Edge(source_id, target_id, label, event.payload) + graph.update_edge(e) + + def persist_event(self, before, current, is_vertex, graph, event_id=None): + """Callback subscribed to driver.graph updates""" + if not 
self.enabled or\ + not self.is_important_change(before, + current, + VProps.UPDATE_TIMESTAMP, + VProps.VITRAGE_SAMPLE_TIMESTAMP): + return + + if is_vertex: + curr = current.properties.copy() + curr['vertex_id'] = current.vertex_id + else: + curr = current.properties.copy() + curr['source_id'] = current.source_id + curr['target_id'] = current.target_id + curr['label'] = current.label + + event_row = models.Event(payload=curr, is_vertex=is_vertex, + event_id=event_id) + self.db.events.create(event_row) + + @staticmethod + def is_important_change(before, curr, *args): + """Non-important changes such as update_timestamp shouldn't be stored + + :param args: list of keys that should be ignored + :return: True if this change should be stored + """ + if not curr: + return False + if curr and not before: + return True + for key, content in curr.properties.items(): + if key in args: + continue + elif isinstance(content, dict) or isinstance(content, list): + return True # TODO(ihefetz): can be improved + elif before.properties.get(key) != content: + return True + return False diff --git a/vitrage/entity_graph/processor/processor.py b/vitrage/entity_graph/processor/processor.py index 38209307c..3b7aeab1f 100644 --- a/vitrage/entity_graph/processor/processor.py +++ b/vitrage/entity_graph/processor/processor.py @@ -21,7 +21,6 @@ from vitrage.datasources.transformer_base import TransformerBase from vitrage.entity_graph.mappings.datasource_info_mapper import \ DatasourceInfoMapper from vitrage.entity_graph.processor import base as processor -from vitrage.entity_graph.processor.notifier import GraphNotifier from vitrage.entity_graph.processor import processor_utils as PUtils from vitrage.entity_graph.processor.transformer_manager import \ TransformerManager @@ -32,15 +31,13 @@ LOG = log.getLogger(__name__) class Processor(processor.ProcessorBase): - def __init__(self, conf, e_graph=None, graph_persistor=None): + def __init__(self, conf, e_graph=None): super(Processor, self).__init__() self.conf = conf self.transformer_manager = TransformerManager(self.conf) self.info_mapper = DatasourceInfoMapper(self.conf) self._initialize_events_actions() self.entity_graph = e_graph - self._notifier = GraphNotifier(conf) - self._graph_persistor = graph_persistor def process_event(self, event): """Decides which action to run on given event @@ -60,10 +57,9 @@ class Processor(processor.ProcessorBase): if entity.action not in self.actions.keys(): LOG.debug('deprecated or unknown entity %s ignored', str(entity)) return + self._calculate_vitrage_aggregated_values(entity.vertex, entity.action) self.actions[entity.action](entity.vertex, entity.neighbors) - if self._graph_persistor: - self._graph_persistor.update_last_event_timestamp(event) def create_entity(self, new_vertex, neighbors): """Adds new vertex to the entity graph @@ -141,7 +137,7 @@ class Processor(processor.ProcessorBase): PUtils.mark_deleted(self.entity_graph, deleted_vertex) else: - LOG.warning("Delete event arrived on invalid resource: " + LOG.warning("Delete entity arrived on invalid resource: " "deleted_vertex - %s, graph_vertex - %s", deleted_vertex, graph_vertex) @@ -185,15 +181,10 @@ class Processor(processor.ProcessorBase): PUtils.is_newer_vertex(graph_vertex, vertex): self.entity_graph.remove_vertex(vertex) else: - LOG.warning("Delete event arrived on invalid resource: " + LOG.warning("Remove deleted entity arrived on invalid resource: " "deleted_vertex - %s, graph_vertex - %s", vertex, graph_vertex) - def start_notifier(self): - if self._notifier and
self._notifier.enabled: - self.entity_graph.subscribe(self._notifier.notify_when_applicable) - LOG.info('Graph notifications subscription added') - def _update_neighbors(self, vertex, neighbors): """Updates vertices neighbor connections diff --git a/vitrage/entity_graph/scheduler.py b/vitrage/entity_graph/scheduler.py index c2052bb5f..cfd839ffa 100644 --- a/vitrage/entity_graph/scheduler.py +++ b/vitrage/entity_graph/scheduler.py @@ -23,20 +23,18 @@ from vitrage.common.utils import spawn from vitrage.entity_graph.consistency.consistency_enforcer import\ ConsistencyEnforcer from vitrage.entity_graph import datasource_rpc as ds_rpc -from vitrage.persistency.graph_persistor import GraphPersistor LOG = log.getLogger(__name__) class Scheduler(object): - def __init__(self, conf, graph, events_coordination): + def __init__(self, conf, graph, events_coordination, persist): super(Scheduler, self).__init__() self.conf = conf self.graph = graph self.events_coordination = events_coordination - self.graph_persistor = GraphPersistor(conf) if \ - self.conf.persistency.enable_persistency else None + self.persist = persist self.consistency = ConsistencyEnforcer(conf, graph) self.periodic = None @@ -44,27 +42,10 @@ class Scheduler(object): self.periodic = periodics.PeriodicWorker.create( [], executor_factory=lambda: ThreadPoolExecutor(max_workers=10)) - self.add_persist_timer() self.add_consistency_timer() self.add_rpc_datasources_timers() spawn(self.periodic.start) - def add_persist_timer(self): - if not self.graph_persistor: - return - spacing = self.conf.persistency.graph_persistency_interval - - @periodics.periodic(spacing=spacing) - def persist_periodic(): - if self.graph_persistor: - try: - self.graph_persistor.store_graph(graph=self.graph) - except Exception: - LOG.exception('Persist failed.') - - self.periodic.add(persist_periodic) - LOG.info("added persist_periodic (spacing=%s)", spacing) - def add_consistency_timer(self): spacing = self.conf.datasources.snapshots_interval @@ -89,6 +70,7 @@ class Scheduler(object): self.events_coordination, self.conf.datasources.types, DatasourceAction.SNAPSHOT) + self.persist.store_graph() except Exception: LOG.exception('get_all_periodic failed.') diff --git a/vitrage/entity_graph/workers.py b/vitrage/entity_graph/workers.py index ca71b1c33..5e2817f6e 100644 --- a/vitrage/entity_graph/workers.py +++ b/vitrage/entity_graph/workers.py @@ -42,6 +42,7 @@ LOG = log.getLogger(__name__) # Supported message types GRAPH_UPDATE = 'graph_update' +ENABLE_EVALUATION = 'enable_evaluation' START_EVALUATION = 'start_evaluation' RELOAD_TEMPLATES = 'reload_templates' TEMPLATE_ACTION = 'template_action' @@ -151,6 +152,13 @@ class GraphWorkersManager(cotyledon.ServiceManager): """ self._submit_and_wait(self._evaluator_queues, (START_EVALUATION,)) + def submit_enable_evaluations(self): + """Enable scenario-evaluator in all evaluator workers + + Only enables the worker's scenario-evaluator, without traversing + """ + self._submit_and_wait(self._evaluator_queues, (ENABLE_EVALUATION,)) + def submit_evaluators_reload_templates(self): """Recreate the scenario-repository in all evaluator workers @@ -288,7 +296,11 @@ class EvaluatorWorker(GraphCloneWorkerBase): super(EvaluatorWorker, self).do_task(task) action = task[0] if action == START_EVALUATION: + # fresh init (without snapshot) requires iterating the graph self._evaluator.run_evaluator() + elif action == ENABLE_EVALUATION: + # init with a snapshot does not require iterating the graph + self._evaluator.enabled = True elif action == 
RELOAD_TEMPLATES: self._reload_templates() diff --git a/vitrage/evaluator/scenario_evaluator.py b/vitrage/evaluator/scenario_evaluator.py index 151ec566c..03c91296f 100644 --- a/vitrage/evaluator/scenario_evaluator.py +++ b/vitrage/evaluator/scenario_evaluator.py @@ -21,6 +21,7 @@ from oslo_log import log from vitrage.common.constants import EdgeProperties as EProps from vitrage.common.constants import VertexProperties as VProps +from vitrage.common.utils import md5 from vitrage.common.utils import recursive_keypairs from vitrage.datasources.listener_service import defaultdict from vitrage.entity_graph.mappings.datasource_info_mapper \ @@ -232,7 +233,7 @@ class ScenarioEvaluator(object): match_action_spec = self._get_action_spec(action_spec, match) items_ids = \ [match_item[1].vertex_id for match_item in match.items()] - match_hash = hash(tuple(sorted(items_ids))) + match_hash = md5(tuple(sorted(items_ids))) self._evaluate_property_functions(template_schema, match, match_action_spec.properties) @@ -301,11 +302,8 @@ class ScenarioEvaluator(object): def _generate_action_id(action_spec): """Generate a unique action id for the action - BEWARE: The implementation of this function MUST NOT BE CHANGED!! - - The created hash is used for storing the active actions in the - database. If changed, existing active actions can no longer be - retrieved. + BEWARE: The value created here should not be stored in database, + as in python3, the hash function seed changes after program restart """ targets = [(k, v.vertex_id) for k, v in action_spec.targets.items()] return hash( diff --git a/vitrage/graph/algo_driver/networkx_algorithm.py b/vitrage/graph/algo_driver/networkx_algorithm.py index 5ecc6cbdb..f331c4ee1 100644 --- a/vitrage/graph/algo_driver/networkx_algorithm.py +++ b/vitrage/graph/algo_driver/networkx_algorithm.py @@ -89,12 +89,6 @@ class NXAlgorithm(GraphAlgorithm): vertices=self._vertex_result_to_list(n_result), edges=self._edge_result_to_list(e_result)) - LOG.debug('graph_query_vertices: find graph: nodes %s, edges %s', - str(list(graph._g.nodes(data=True))), - str(list(graph._g.edges(data=True)))) - LOG.debug('graph_query_vertices: real graph: nodes %s, edges %s', - str(list(self.graph._g.nodes(data=True))), - str(list(self.graph._g.edges(data=True)))) return graph def sub_graph_matching(self, diff --git a/vitrage/graph/driver/graph.py b/vitrage/graph/driver/graph.py index d8bf0a691..e1335125e 100644 --- a/vitrage/graph/driver/graph.py +++ b/vitrage/graph/driver/graph.py @@ -49,8 +49,25 @@ class Graph(object): self.graph_type = graph_type self.notifier = Notifier() - def subscribe(self, function): - self.notifier.subscribe(function) + def subscribe(self, function, finalization=False): + """Subscribe to graph changes + + :param function: function will be called after each graph change + :param finalization: function will be called after all non-finalization functions + + Usage Example: + graph = NXGraph() + graph.subscribe(foo1, finalization=True) + graph.subscribe(foo2, finalization=False) + graph.subscribe(foo3, finalization=False) + + The order of the calls in this example will be: + 1. foo2 + 2. foo3 + 3.
foo1 + foo1 is called last because it subscribed as a finalization function + """ + self.notifier.subscribe(function, finalization) def is_subscribed(self): return self.notifier.is_subscribed() diff --git a/vitrage/graph/driver/notifier.py b/vitrage/graph/driver/notifier.py index bbf93b04f..966222989 100644 --- a/vitrage/graph/driver/notifier.py +++ b/vitrage/graph/driver/notifier.py @@ -11,9 +11,10 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - import functools +import itertools + from vitrage.graph.driver.elements import Vertex @@ -34,15 +35,21 @@ def _after_func(graph, item, data_before=None): class Notifier(object): def __init__(self): self._subscriptions = [] + self._finalization_subscriptions = [] - def subscribe(self, function): - self._subscriptions.append(function) + def subscribe(self, function, finalization=False): + if finalization: + self._finalization_subscriptions.append(function) + else: + self._subscriptions.append(function) def is_subscribed(self): - return len(self._subscriptions) != 0 + size = len(self._subscriptions) + len(self._finalization_subscriptions) + return size != 0 def notify(self, *args, **kwargs): - for func in self._subscriptions: + for func in itertools.chain(self._subscriptions, + self._finalization_subscriptions): func(*args, **kwargs) @staticmethod diff --git a/vitrage/persistency/__init__.py b/vitrage/persistency/__init__.py index a7950a197..db8157379 100644 --- a/vitrage/persistency/__init__.py +++ b/vitrage/persistency/__init__.py @@ -22,7 +22,4 @@ OPTS = [ cfg.BoolOpt('enable_persistency', default=False, help='Periodically store entity graph snapshot to database'), - cfg.IntOpt('graph_persistency_interval', - default=3600, - help='Store graph to database every X seconds'), ] diff --git a/vitrage/persistency/graph_persistor.py b/vitrage/persistency/graph_persistor.py deleted file mode 100644 index 779188398..000000000 --- a/vitrage/persistency/graph_persistor.py +++ /dev/null @@ -1,60 +0,0 @@ -# Copyright 2018 - Nokia -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
- -from __future__ import print_function - -from oslo_log import log - -from dateutil import parser -from vitrage.common.constants import DatasourceProperties as DSProps -from vitrage.graph.driver.networkx_graph import NXGraph -from vitrage import storage -from vitrage.storage.sqlalchemy import models -from vitrage.utils import datetime -from vitrage.utils.datetime import utcnow - - -LOG = log.getLogger(__name__) - - -class GraphPersistor(object): - def __init__(self, conf): - super(GraphPersistor, self).__init__() - self.db_connection = storage.get_connection_from_config(conf) - self.last_event_timestamp = datetime.datetime.utcnow() - - def store_graph(self, graph): - LOG.info('Graph persistency running..') - try: - graph_snapshot = graph.write_gpickle() - db_row = models.GraphSnapshot( - last_event_timestamp=self.last_event_timestamp, - graph_snapshot=graph_snapshot) - self.db_connection.graph_snapshots.create(db_row) - except Exception as e: - LOG.exception("Graph is not stored: %s", e) - - def load_graph(self, timestamp=None): - db_row = self.db_connection.graph_snapshots.query(timestamp) if \ - timestamp else self.db_connection.graph_snapshots.query(utcnow()) - return NXGraph.read_gpickle(db_row.graph_snapshot) if db_row else None - - def delete_graph_snapshots(self, timestamp): - """Deletes all graph snapshots until timestamp""" - self.db_connection.graph_snapshots.delete(timestamp) - - def update_last_event_timestamp(self, event): - timestamp = event.get(DSProps.SAMPLE_DATE) - self.last_event_timestamp = parser.parse(timestamp) if timestamp \ - else None diff --git a/vitrage/persistency/service.py b/vitrage/persistency/service.py index 4b096b747..bc7d42ad8 100644 --- a/vitrage/persistency/service.py +++ b/vitrage/persistency/service.py @@ -13,16 +13,15 @@ # under the License. 
from __future__ import print_function - +from concurrent.futures import ThreadPoolExecutor import cotyledon -import dateutil.parser -import oslo_messaging as oslo_m +from futurist import periodics from oslo_log import log -from vitrage.common.constants import DatasourceProperties as DSProps -from vitrage.common.constants import GraphAction +import oslo_messaging as oslo_m + +from vitrage.common.utils import spawn from vitrage import messaging -from vitrage.storage.sqlalchemy import models LOG = log.getLogger(__name__) @@ -39,11 +38,13 @@ class PersistorService(cotyledon.Service): self.listener = messaging.get_notification_listener( transport, [target], [VitragePersistorEndpoint(self.db_connection)]) + self.scheduler = Scheduler(conf, db_connection) def run(self): LOG.info("Vitrage Persistor Service - Starting...") self.listener.start() + self.scheduler.start_periodic_tasks() LOG.info("Vitrage Persistor Service - Started!") @@ -57,19 +58,45 @@ class PersistorService(cotyledon.Service): class VitragePersistorEndpoint(object): + + funcs = {} + def __init__(self, db_connection): self.db_connection = db_connection def info(self, ctxt, publisher_id, event_type, payload, metadata): - LOG.debug('Vitrage Event Info: payload %s', payload) - self.process_event(payload) + LOG.debug('Event_type: %s Payload %s', event_type, payload) + if event_type and event_type in self.funcs.keys(): + self.funcs[event_type](self.db_connection, event_type, payload) - def process_event(self, data): - """:param data: Serialized to a JSON formatted ``str`` """ - if data.get(DSProps.EVENT_TYPE) == GraphAction.END_MESSAGE: - return - collector_timestamp = \ - dateutil.parser.parse(data.get(DSProps.SAMPLE_DATE)) - event_row = models.Event(payload=data, - collector_timestamp=collector_timestamp) - self.db_connection.events.create(event_row) + +class Scheduler(object): + + def __init__(self, conf, db): + self.conf = conf + self.db = db + self.periodic = None + + def start_periodic_tasks(self): + self.periodic = periodics.PeriodicWorker.create( + [], executor_factory=lambda: ThreadPoolExecutor(max_workers=10)) + + self.add_expirer_timer() + spawn(self.periodic.start) + + def add_expirer_timer(self): + spacing = 60 + + @periodics.periodic(spacing=spacing) + def expirer_periodic(): + try: + event_id = self.db.graph_snapshots.query_snapshot_event_id() + if event_id: + LOG.debug('Expirer deleting event - id=%s', event_id) + self.db.events.delete(event_id) + + except Exception: + LOG.exception('DB periodic cleanup run failed.') + + self.periodic.add(expirer_periodic) + LOG.info("Database periodic cleanup starting (spacing=%ss)", spacing) diff --git a/vitrage/storage/base.py b/vitrage/storage/base.py index d0e74cd16..bfc2ee286 100644 --- a/vitrage/storage/base.py +++ b/vitrage/storage/base.py @@ -203,12 +203,8 @@ class EventsConnection(object): """ raise NotImplementedError('query events is not implemented') - def delete(self, - event_id=None, - collector_timestamp=None, - gt_collector_timestamp=None, - lt_collector_timestamp=None): - """Delete all events that match the filters.""" + def delete(self, event_id=None): + """Delete all events older than event_id""" raise NotImplementedError('delete events is not implemented') diff --git a/vitrage/storage/impl_sqlalchemy.py b/vitrage/storage/impl_sqlalchemy.py index d7a5a464f..c48a17e7d 100644 --- a/vitrage/storage/impl_sqlalchemy.py +++ b/vitrage/storage/impl_sqlalchemy.py @@ -18,6 +18,7 @@ from __future__ import absolute_import from oslo_db.sqlalchemy import session as db_session from 
oslo_log import log from sqlalchemy.engine import url as sqlalchemy_url +from sqlalchemy import or_ from vitrage.common.exception import VitrageInputError from vitrage import storage @@ -79,6 +80,18 @@ class Connection(base.Connection): def upgrade(self, nocreate=False): engine = self._engine_facade.get_engine() engine.connect() + + # As the following tables were changed in Rocky, they are removed and + # created. This is fine for an upgrade from Queens, since data in these + # was anyway deleted in each restart. + # starting From Rocky, data in these tables should not be removed. + + models.Base.metadata.drop_all( + engine, tables=[ + models.ActiveAction.__table__, + models.Event.__table__, + models.GraphSnapshot.__table__]) + models.Base.metadata.create_all( engine, tables=[models.ActiveAction.__table__, models.Template.__table__, @@ -247,6 +260,21 @@ class EventsConnection(base.EventsConnection, BaseTableConn): with session.begin(): session.merge(event) + def get_last_event_id(self): + session = self._engine_facade.get_session() + query = session.query(models.Event.event_id) + return query.order_by(models.Event.event_id.desc()).first() + + def get_replay_events(self, event_id): + """Get all events that occurred after the specified event_id + + :rtype: list of vitrage.storage.sqlalchemy.models.Event + """ + session = self._engine_facade.get_session() + query = session.query(models.Event) + query = query.filter(models.Event.event_id > event_id) + return query.order_by(models.Event.event_id.asc()).all() + def query(self, event_id=None, collector_timestamp=None, @@ -290,31 +318,12 @@ class EventsConnection(base.EventsConnection, BaseTableConn): lt_collector_timestamp) return query - def delete(self, - event_id=None, - collector_timestamp=None, - gt_collector_timestamp=None, - lt_collector_timestamp=None): - """Delete all events that match the filters. - - :raises: vitrage.common.exception.VitrageInputError. 
- """ - if (event_id or collector_timestamp) and \ - (gt_collector_timestamp or lt_collector_timestamp): - msg = "Calling function with both specific event and range of " \ - "events parameters at the same time " - LOG.debug(msg) - raise VitrageInputError(msg) - - query = self.query_filter( - models.Event, - event_id=event_id, - collector_timestamp=collector_timestamp) - - query = self._update_query_gt_lt(gt_collector_timestamp, - lt_collector_timestamp, - query) - + def delete(self, event_id=None): + """Delete all events older than event_id""" + session = self._engine_facade.get_session() + query = session.query(models.Event) + if event_id: + query = query.filter(models.Event.event_id < event_id) query.delete() @@ -334,15 +343,19 @@ class GraphSnapshotsConnection(base.GraphSnapshotsConnection, BaseTableConn): def query(self, timestamp=None): query = self.query_filter(models.GraphSnapshot) - query = query.filter(models.GraphSnapshot.last_event_timestamp <= - timestamp) - return query.order_by( - models.GraphSnapshot.last_event_timestamp.desc()).first() + query = query.filter( + or_(models.GraphSnapshot.updated_at >= timestamp, + models.GraphSnapshot.created_at >= timestamp)) + return query.first() - def delete(self, timestamp=None): - """Delete all graph snapshots taken until timestamp.""" + def query_snapshot_event_id(self): + """Select the event_id of the stored snapshot""" + session = self._engine_facade.get_session() + query = session.query(models.GraphSnapshot.event_id) + result = query.first() + return result[0] if result else None + + def delete(self): + """Delete all graph snapshots""" query = self.query_filter(models.GraphSnapshot) - - query = query.filter(models.GraphSnapshot.last_event_timestamp <= - timestamp) query.delete() diff --git a/vitrage/storage/sqlalchemy/models.py b/vitrage/storage/sqlalchemy/models.py index 06bd0c339..e1dc64850 100644 --- a/vitrage/storage/sqlalchemy/models.py +++ b/vitrage/storage/sqlalchemy/models.py @@ -16,7 +16,7 @@ import zlib from oslo_db.sqlalchemy import models -from sqlalchemy import Column, DateTime, INTEGER, String, \ +from sqlalchemy import Column, INTEGER, String, \ SmallInteger, BigInteger, Index, Boolean from sqlalchemy.ext.declarative import declarative_base @@ -73,20 +73,19 @@ class Event(Base): __tablename__ = 'events' - event_id = Column("id", INTEGER, primary_key=True, nullable=False, - autoincrement=True) - collector_timestamp = Column(DateTime, index=True, nullable=False) + event_id = Column("id", BigInteger(), primary_key=True, autoincrement=True) payload = Column(JSONEncodedDict(), nullable=False) + is_vertex = Column(Boolean, nullable=False) def __repr__(self): return \ "" % \ ( self.event_id, - self.collector_timestamp, + self.is_vertex, self.payload ) @@ -106,7 +105,7 @@ class ActiveAction(Base, models.TimestampMixin): target_vertex_id = Column(String(128)) action_id = Column(String(128), primary_key=True) score = Column(SmallInteger()) - trigger = Column(BigInteger(), primary_key=True) + trigger = Column(String(128), primary_key=True) def __repr__(self): return \ @@ -134,16 +133,19 @@ class ActiveAction(Base, models.TimestampMixin): class GraphSnapshot(Base): __tablename__ = 'graph_snapshots' - last_event_timestamp = Column(DateTime, primary_key=True, nullable=False) + snapshot_id = Column("id", INTEGER, primary_key=True) + event_id = Column(BigInteger, nullable=False) graph_snapshot = Column(CompressedBinary((2 ** 32) - 1), nullable=False) def __repr__(self): return \ "" %\ ( - self.last_event_timestamp, + 
self.snapshot_id, + self.event_id, self.graph_snapshot ) diff --git a/vitrage/tests/functional/entity_graph/graph_persistor/test_graph_persistor.py b/vitrage/tests/functional/entity_graph/graph_persistor/test_graph_persistor.py index 6d0edab7b..0366c9959 100644 --- a/vitrage/tests/functional/entity_graph/graph_persistor/test_graph_persistor.py +++ b/vitrage/tests/functional/entity_graph/graph_persistor/test_graph_persistor.py @@ -11,21 +11,21 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -import sys import time from oslo_config import cfg -from oslo_db.options import database_opts -from vitrage.persistency.graph_persistor import GraphPersistor -from vitrage import storage -from vitrage.storage.sqlalchemy import models +from vitrage.common.constants import EdgeProperties +from vitrage.common.constants import VertexProperties +from vitrage.graph.driver.networkx_graph import NXGraph + +from vitrage.entity_graph import graph_persistency from vitrage.tests.functional.base import TestFunctionalBase +from vitrage.tests.functional.test_configuration import TestConfiguration from vitrage.tests.mocks.graph_generator import GraphGenerator -from vitrage.utils.datetime import utcnow -class TestGraphPersistor(TestFunctionalBase): +class TestGraphPersistor(TestFunctionalBase, TestConfiguration): # noinspection PyAttributeOutsideInit,PyPep8Naming @classmethod @@ -34,62 +34,79 @@ class TestGraphPersistor(TestFunctionalBase): cls.conf = cfg.ConfigOpts() cls.conf.register_opts(cls.PROCESSOR_OPTS, group='entity_graph') cls.conf.register_opts(cls.DATASOURCES_OPTS, group='datasources') - cls.conf.register_opts(database_opts, group='database') - cls.conf.set_override('connection', 'sqlite:///test-%s.db' - % sys.version_info[0], group='database') - cls._db = storage.get_connection_from_config(cls.conf) - engine = cls._db._engine_facade.get_engine() - models.Base.metadata.create_all(engine) + cls.conf.register_opts(cls.PERSISTENCY_OPTS, group='persistency') + cls.add_db(cls.conf) cls.load_datasources(cls.conf) - cls.graph_persistor = GraphPersistor(cls.conf) + graph_persistency.EPSILON = 0.1 - def test_persist_graph(self): + def test_graph_store_and_query_recent_snapshot(self): g = GraphGenerator().create_graph() - current_time = utcnow() - self.graph_persistor.last_event_timestamp = current_time - self.graph_persistor.store_graph(g) - graph_snapshot = self.graph_persistor.load_graph(current_time) - self.assert_graph_equal(g, graph_snapshot) - self.graph_persistor.delete_graph_snapshots(utcnow()) + graph_persistor = graph_persistency.GraphPersistency(self.conf, + self._db, g) + graph_persistor.store_graph() + recovered_data = graph_persistor.query_recent_snapshot() + recovered_graph = self.load_snapshot(recovered_data) + self.assert_graph_equal(g, recovered_graph) - def test_persist_two_graphs(self): - g1 = GraphGenerator().create_graph() - current_time1 = utcnow() - self.graph_persistor.last_event_timestamp = current_time1 - self.graph_persistor.store_graph(g1) - graph_snapshot1 = self.graph_persistor.load_graph(current_time1) + time.sleep(graph_persistency.EPSILON + 0.1 + + 3 * self.conf.datasources.snapshots_interval) + recovered_data = graph_persistor.query_recent_snapshot() + self.assertIsNone(recovered_data, 'Should not be a recent snapshot') - g2 = GraphGenerator(5).create_graph() - current_time2 = utcnow() - self.graph_persistor.last_event_timestamp = current_time2 - 
self.graph_persistor.store_graph(g2) - graph_snapshot2 = self.graph_persistor.load_graph(current_time2) - - self.assert_graph_equal(g1, graph_snapshot1) - self.assert_graph_equal(g2, graph_snapshot2) - self.graph_persistor.delete_graph_snapshots(utcnow()) - - def test_load_last_graph_snapshot_until_timestamp(self): - g1 = GraphGenerator().create_graph() - self.graph_persistor.last_event_timestamp = utcnow() - self.graph_persistor.store_graph(g1) - - time.sleep(1) - time_in_between = utcnow() - time.sleep(1) - - g2 = GraphGenerator(5).create_graph() - self.graph_persistor.last_event_timestamp = utcnow() - self.graph_persistor.store_graph(g2) - - graph_snapshot = self.graph_persistor.load_graph(time_in_between) - self.assert_graph_equal(g1, graph_snapshot) - self.graph_persistor.delete_graph_snapshots(utcnow()) - - def test_delete_graph_snapshots(self): + def test_event_store_and_replay_events(self): g = GraphGenerator().create_graph() - self.graph_persistor.last_event_timestamp = utcnow() - self.graph_persistor.store_graph(g) - self.graph_persistor.delete_graph_snapshots(utcnow()) - graph_snapshot = self.graph_persistor.load_graph(utcnow()) - self.assertIsNone(graph_snapshot) + vertices = g.get_vertices() + graph_persistor = graph_persistency.GraphPersistency(self.conf, + self._db, g) + self.fail_msg = None + self.event_id = 1 + + def callback(pre_item, + current_item, + is_vertex, + graph): + try: + graph_persistor.persist_event( + pre_item, current_item, is_vertex, graph, self.event_id) + except Exception as e: + self.fail_msg = 'persist_event failed with exception ' + str(e) + self.event_id = self.event_id + 1 + + # Subscribe graph changes to callback, so events are written to db + # after each update_vertex and update_edge callback will be called + g.subscribe(callback) + vertices[0][VertexProperties.VITRAGE_IS_DELETED] = True + g.update_vertex(vertices[0]) + vertices[1][VertexProperties.VITRAGE_IS_DELETED] = True + g.update_vertex(vertices[1]) + edge = g.get_edges(vertices[0].vertex_id).pop() + edge[EdgeProperties.VITRAGE_IS_DELETED] = True + g.update_edge(edge) + + # Store graph: + graph_persistor.store_graph() + + # Create more events: + vertices[2][VertexProperties.VITRAGE_IS_DELETED] = True + g.update_vertex(vertices[2]) + vertices[3][VertexProperties.VITRAGE_IS_DELETED] = True + g.update_vertex(vertices[3]) + edge = g.get_edges(vertices[2].vertex_id).pop() + edge[EdgeProperties.RELATIONSHIP_TYPE] = 'kuku' + g.update_edge(edge) + + self.assertIsNone(self.fail_msg, 'callback failed') + + # Reload snapshot + recovered_data = graph_persistor.query_recent_snapshot() + recovered_graph = self.load_snapshot(recovered_data) + + # Replay events: + self.assertEqual(3, recovered_data.event_id, 'graph snapshot event_id') + graph_persistor.replay_events(recovered_graph, recovered_data.event_id) + + self.assert_graph_equal(g, recovered_graph) + + @staticmethod + def load_snapshot(data): + return NXGraph.read_gpickle(data.graph_snapshot) if data else None diff --git a/vitrage/tests/resources/mock_configurations/vertices/nova.host.json b/vitrage/tests/resources/mock_configurations/vertices/nova.host.json index cee3e4995..e0fe14ee4 100644 --- a/vitrage/tests/resources/mock_configurations/vertices/nova.host.json +++ b/vitrage/tests/resources/mock_configurations/vertices/nova.host.json @@ -3,11 +3,11 @@ "id": "111", "vitrage_is_deleted": false, "vitrage_category": "RESOURCE", - "vitrage_operational_state": "N/A", + "vitrage_operational_state": "OK", "state": "AVAILABLE", "vitrage_type": "nova.host", 
"vitrage_sample_timestamp": "2017-12-24 10:32:41.389676+00:00", - "vitrage_aggregated_state": null, + "vitrage_aggregated_state": "AVAILABLE", "vitrage_is_placeholder": false, "is_real_vitrage_id": true } diff --git a/vitrage/tests/unit/datasources/nova/test_nova_instance_transformer.py b/vitrage/tests/unit/datasources/nova/test_nova_instance_transformer.py index dfa23b5a1..276d2373a 100644 --- a/vitrage/tests/unit/datasources/nova/test_nova_instance_transformer.py +++ b/vitrage/tests/unit/datasources/nova/test_nova_instance_transformer.py @@ -163,7 +163,7 @@ class NovaInstanceTransformerTest(base.BaseTest): def _validate_vertex_props(self, vertex, event): - self.assertThat(vertex.properties, matchers.HasLength(13)) + self.assertThat(vertex.properties, matchers.HasLength(14)) is_update_event = tbase.is_update_event(event) diff --git a/vitrage/tests/unit/entity_graph/base.py b/vitrage/tests/unit/entity_graph/base.py index f146e7ff1..321a9b31a 100644 --- a/vitrage/tests/unit/entity_graph/base.py +++ b/vitrage/tests/unit/entity_graph/base.py @@ -60,6 +60,13 @@ class TestEntityGraphUnitBase(base.BaseTest): min=1) ] + PERSISTENCY_OPTS = [ + cfg.StrOpt('persistor_topic', + default=None), + cfg.BoolOpt('enable_persistency', + default=True), + ] + NUM_CLUSTERS = 1 NUM_ZONES = 2 NUM_HOSTS = 4 diff --git a/vitrage/tests/unit/graph/test_graph.py b/vitrage/tests/unit/graph/test_graph.py index d961f82d8..3dbeb1fa6 100644 --- a/vitrage/tests/unit/graph/test_graph.py +++ b/vitrage/tests/unit/graph/test_graph.py @@ -427,7 +427,7 @@ class TestGraph(GraphTestBase): self.assertEqual(OPENSTACK_CLUSTER, found_vertex[VProps.VITRAGE_TYPE], 'get_vertices check node vertex') - def _check_callback_result(self, result, msg, exp_prev, exp_curr): + def _check_callbacks_result(self, msg, exp_prev, exp_curr): def assert_none_or_equals(exp, act, message): if exp: @@ -435,12 +435,17 @@ class TestGraph(GraphTestBase): else: self.assertIsNone(act, message) - self.assertIsNotNone(result, msg + ' Callback was not called') - assert_none_or_equals(exp_prev, result[0], + self.assertIsNotNone(self.result, msg + ' Callback was not called') + assert_none_or_equals(exp_prev, self.result[0], msg + ' prev_item unexpected') - assert_none_or_equals(exp_curr, result[1], + assert_none_or_equals(exp_curr, self.result[1], msg + ' curr_item unexpected') + + self.assertEqual(self.result, self.final_result, + 'callback order is incorrect') + self.result = None + self.final_result = None def _assert_none_or_equals(self, exp, act, msg): if exp: @@ -453,11 +458,15 @@ class TestGraph(GraphTestBase): g = NXGraph('test_graph_callbacks') self.result = None + self.final_result = None - def callback(pre_item, - current_item, - is_vertex, - graph): + def callback_2(pre_item, current_item, is_vertex, graph): + # We want to make sure this callback was called ^after^ the other + # And expect that the later callback copies the result from the + # prior call, hence these should be equal after both were called + self.final_result = self.result + + def callback(pre_item, current_item, is_vertex, graph): LOG.info('called with: pre_event_item ' + str(pre_item) + ' current_item ' + str(current_item)) self.assertIsNotNone(current_item) @@ -470,32 +479,31 @@ class TestGraph(GraphTestBase): 'Got notification, but add_vertex notification is not registered') # subscribe + g.subscribe(callback_2, finalization=True) g.subscribe(callback) # These actions will trigger callbacks: g.add_vertex(v_node) - self._check_callback_result(self.result, 'add vertex', None, v_node) + 
self._check_callbacks_result('add vertex', None, v_node) g.add_vertex(v_host) - self._check_callback_result(self.result, 'add vertex', None, v_host) + self._check_callbacks_result('add vertex', None, v_host) g.add_edge(e_node_to_host) - self._check_callback_result(self.result, 'add edge', None, - e_node_to_host) + self._check_callbacks_result('add edge', None, e_node_to_host) updated_vertex = g.get_vertex(v_host.vertex_id) updated_vertex[VProps.VITRAGE_CATEGORY] = ALARM g.update_vertex(updated_vertex) - self._check_callback_result(self.result, 'update vertex', - v_host, updated_vertex) + self._check_callbacks_result('update vertex', v_host, updated_vertex) updated_edge = g.get_edge(e_node_to_host.source_id, e_node_to_host.target_id, e_node_to_host.label) updated_edge['ZIG'] = 'ZAG' g.update_edge(updated_edge) - self._check_callback_result(self.result, 'update edge', e_node_to_host, - updated_edge) + self._check_callbacks_result('update edge', e_node_to_host, + updated_edge) def test_union(self): v1 = v_node