Graph fast fail-over

* Initialize quickly upon fail-over, without requesting collector updates (startup flow sketched below)
* In case of downtime, vitrage-graph requests collector updates on startup
* vitrage-persistor runs an expirer timer that removes old DB events
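
The startup decision introduced by this change is roughly the following. This is a minimal illustrative sketch, not part of the commit itself: the persistence and RPC layers are passed in as stand-ins, while the real logic lives in VitrageGraphInit.run() and GraphPersistency in the diffs below.

# Illustrative sketch only - names here are stand-ins, not the actual Vitrage API.
def start_graph_service(persist, collectors_get_all):
    """Prefer restoring from a stored snapshot; fall back to a full resync."""
    snapshot = persist.query_recent_snapshot()
    if snapshot is not None:
        # Fast fail-over: load the pickled graph and replay only the DB
        # events that were persisted after the snapshot was taken.
        graph = snapshot.load_graph()
        persist.replay_events(graph, snapshot.event_id)
        full_evaluation = False
    else:
        # Cold start (first boot, or the stored snapshot is too old):
        # request full snapshots from all datasource collectors over RPC.
        graph = collectors_get_all()
        persist.store_graph(graph)
        full_evaluation = True
    return graph, full_evaluation

In the diffs below the same distinction is carried by the is_snapshot flag handed to _start_all_workers: after a restore the evaluator workers are only enabled (ENABLE_EVALUATION), whereas a cold start triggers a full evaluation of the graph (START_EVALUATION).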

Story: 2002663
Task: 22473
Change-Id: Icccf230e69c41a2f115c0797e60df774db637594
Depends-On: I042665e0d642ba36a97af84a6dc0581888025207
Depends-On: Id5dbd165a1e0220e4e24207e8d237f94415fc490
Idan Hefetz 2018-07-10 15:04:48 +00:00
parent d5c742f460
commit fb4088c32c
30 changed files with 496 additions and 381 deletions

View File

@ -45,8 +45,8 @@ debug = false
notifiers = nova,webhook notifiers = nova,webhook
[datasources] [datasources]
types=mock_graph_datasource types=doctor,mock_graph_datasource
path=vitrage.tests.mocks path=vitrage.datasources,vitrage.tests.mocks
snapshots_interval=60 snapshots_interval=60
[mock_graph_datasource] [mock_graph_datasource]

View File

@ -32,6 +32,7 @@ console_scripts =
vitrage-persistor = vitrage.cli.persistor:main vitrage-persistor = vitrage.cli.persistor:main
vitrage-ml = vitrage.cli.machine_learning:main vitrage-ml = vitrage.cli.machine_learning:main
vitrage-dbsync = vitrage.cli.storage:dbsync vitrage-dbsync = vitrage.cli.storage:dbsync
vitrage-purge-data = vitrage.cli.storage:purge_data
vitrage-snmp-parsing = vitrage.cli.snmp_parsing:main vitrage-snmp-parsing = vitrage.cli.snmp_parsing:main
vitrage.entity_graph = vitrage.entity_graph =

View File

@ -21,3 +21,12 @@ def dbsync():
print(VITRAGE_TITLE) print(VITRAGE_TITLE)
conf = service.prepare_service() conf = service.prepare_service()
storage.get_connection_from_config(conf).upgrade() storage.get_connection_from_config(conf).upgrade()
def purge_data():
print(VITRAGE_TITLE)
conf = service.prepare_service()
db = storage.get_connection_from_config(conf)
db.active_actions.delete()
db.events.delete()
db.graph_snapshots.delete()

View File

@ -14,12 +14,16 @@
# under the License. # under the License.
class VertexProperties(object): class ElementProperties(object):
VITRAGE_IS_DELETED = 'vitrage_is_deleted'
UPDATE_TIMESTAMP = 'update_timestamp'
class VertexProperties(ElementProperties):
VITRAGE_CATEGORY = 'vitrage_category' VITRAGE_CATEGORY = 'vitrage_category'
VITRAGE_TYPE = 'vitrage_type' VITRAGE_TYPE = 'vitrage_type'
VITRAGE_ID = 'vitrage_id' VITRAGE_ID = 'vitrage_id'
VITRAGE_STATE = 'vitrage_state' VITRAGE_STATE = 'vitrage_state'
VITRAGE_IS_DELETED = 'vitrage_is_deleted'
VITRAGE_IS_PLACEHOLDER = 'vitrage_is_placeholder' VITRAGE_IS_PLACEHOLDER = 'vitrage_is_placeholder'
VITRAGE_SAMPLE_TIMESTAMP = 'vitrage_sample_timestamp' VITRAGE_SAMPLE_TIMESTAMP = 'vitrage_sample_timestamp'
VITRAGE_AGGREGATED_STATE = 'vitrage_aggregated_state' VITRAGE_AGGREGATED_STATE = 'vitrage_aggregated_state'
@ -27,10 +31,10 @@ class VertexProperties(object):
VITRAGE_AGGREGATED_SEVERITY = 'vitrage_aggregated_severity' VITRAGE_AGGREGATED_SEVERITY = 'vitrage_aggregated_severity'
VITRAGE_OPERATIONAL_SEVERITY = 'vitrage_operational_severity' VITRAGE_OPERATIONAL_SEVERITY = 'vitrage_operational_severity'
VITRAGE_RESOURCE_ID = 'vitrage_resource_id' VITRAGE_RESOURCE_ID = 'vitrage_resource_id'
VITRAGE_CACHED_ID = 'vitrage_cached_id'
ID = 'id' ID = 'id'
STATE = 'state' STATE = 'state'
PROJECT_ID = 'project_id' PROJECT_ID = 'project_id'
UPDATE_TIMESTAMP = 'update_timestamp'
NAME = 'name' NAME = 'name'
SEVERITY = 'severity' SEVERITY = 'severity'
IS_MARKED_DOWN = 'is_marked_down' IS_MARKED_DOWN = 'is_marked_down'
@ -44,10 +48,8 @@ class VertexProperties(object):
IS_REAL_VITRAGE_ID = 'is_real_vitrage_id' IS_REAL_VITRAGE_ID = 'is_real_vitrage_id'
class EdgeProperties(object): class EdgeProperties(ElementProperties):
RELATIONSHIP_TYPE = 'relationship_type' RELATIONSHIP_TYPE = 'relationship_type'
VITRAGE_IS_DELETED = 'vitrage_is_deleted'
UPDATE_TIMESTAMP = 'update_timestamp'
class EdgeLabel(object): class EdgeLabel(object):

View File

@ -18,8 +18,10 @@
# under the License. # under the License.
from collections import defaultdict from collections import defaultdict
import copy import copy
import hashlib
import itertools import itertools
import random import random
import six
import threading import threading
from oslo_config import cfg from oslo_config import cfg
@ -87,3 +89,11 @@ def spawn(target, *args, **kwargs):
t.daemon = True t.daemon = True
t.start() t.start()
return t return t
def md5(obj):
if isinstance(obj, tuple):
obj = str(obj)
if isinstance(obj, six.string_types):
return hashlib.md5(six.b(obj)).hexdigest()
raise Exception('Unknown object for md5 %s', str(obj))

View File

@ -1,58 +0,0 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
import oslo_messaging
from vitrage.messaging import get_transport
LOG = log.getLogger(__name__)
class CollectorNotifier(object):
"""Allows writing to message bus"""
def __init__(self, conf):
self.oslo_notifier = None
try:
topics = [conf.datasources.notification_topic_collector]
if conf.persistency.enable_persistency:
topics.append(conf.persistency.persistor_topic)
else:
LOG.warning("Not persisting events")
self.oslo_notifier = oslo_messaging.Notifier(
get_transport(conf),
driver='messagingv2',
publisher_id='datasources.events',
topics=topics)
except Exception as e:
LOG.info('Collector notifier - missing configuration %s'
% str(e))
@property
def enabled(self):
return self.oslo_notifier is not None
def notify_when_applicable(self, enriched_event):
"""Callback subscribed to driver.graph updates
:param enriched_event: the event with enriched data added by the driver
"""
try:
self.oslo_notifier.info({}, '', enriched_event)
except Exception:
LOG.exception('Datasource event cannot be notified - %s.',
enriched_event)

View File

@ -32,10 +32,6 @@ class ListenerService(object):
self._create_callbacks_by_events_dict(conf) self._create_callbacks_by_events_dict(conf)
topics = [conf.datasources.notification_topic_collector] topics = [conf.datasources.notification_topic_collector]
if conf.persistency.enable_persistency:
topics.append(conf.persistency.persistor_topic)
else:
LOG.warning("Not persisting events")
notifier = VitrageNotifier(conf, 'driver.events', topics) notifier = VitrageNotifier(conf, 'driver.events', topics)
self.listener = self._get_topics_listener(conf, notifier.notify) self.listener = self._get_topics_listener(conf, notifier.notify)

View File

@ -19,7 +19,6 @@ from oslo_log import log
from vitrage.common.constants import DatasourceAction from vitrage.common.constants import DatasourceAction
from vitrage.datasources import utils from vitrage.datasources import utils
from vitrage.messaging import VitrageNotifier
from vitrage import rpc as vitrage_rpc from vitrage import rpc as vitrage_rpc
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -28,13 +27,11 @@ LOG = log.getLogger(__name__)
class CollectorRpcHandlerService(object): class CollectorRpcHandlerService(object):
def __init__(self, conf): def __init__(self, conf):
super(CollectorRpcHandlerService, self).__init__()
self.conf = conf self.conf = conf
async_persistor = self.create_async_persistor(conf)
self.server = vitrage_rpc.get_default_server( self.server = vitrage_rpc.get_default_server(
conf, conf,
conf.rpc_topic_collector, conf.rpc_topic_collector,
[DriversEndpoint(conf, async_persistor)]) [DriversEndpoint(conf)])
def start(self): def start(self):
LOG.info("Collector Rpc Handler Service - Starting...") LOG.info("Collector Rpc Handler Service - Starting...")
@ -46,29 +43,11 @@ class CollectorRpcHandlerService(object):
self.server.stop() self.server.stop()
LOG.info("Collector Rpc Handler Service - Stopped!") LOG.info("Collector Rpc Handler Service - Stopped!")
@staticmethod
def create_async_persistor(conf):
if not conf.persistency.enable_persistency:
return None
topics = [conf.persistency.persistor_topic]
notifier = VitrageNotifier(conf, 'driver.events', topics)
persist_worker = futures.ThreadPoolExecutor(max_workers=1)
def do_persist(events):
for e in events:
notifier.notify('', e)
def do_work(events):
persist_worker.submit(do_persist, events)
return do_work
class DriversEndpoint(object): class DriversEndpoint(object):
def __init__(self, conf, async_persistor): def __init__(self, conf):
self.conf = conf self.conf = conf
self.async_persistor = async_persistor
self.pool = futures.ThreadPoolExecutor( self.pool = futures.ThreadPoolExecutor(
max_workers=len(self.conf.datasources.types)) max_workers=len(self.conf.datasources.types))
@ -93,8 +72,6 @@ class DriversEndpoint(object):
result.extend(list(self.pool.map(run_driver, failed_drivers))) result.extend(list(self.pool.map(run_driver, failed_drivers)))
events = [e for success, events in result if success for e in events] events = [e for success, events in result if success for e in events]
if self.async_persistor:
self.async_persistor(events)
LOG.debug("run drivers get_all done.") LOG.debug("run drivers get_all done.")
return events return events
@ -103,7 +80,5 @@ class DriversEndpoint(object):
LOG.debug("run driver get_changes: %s", driver_name) LOG.debug("run driver get_changes: %s", driver_name)
drivers = utils.get_drivers_by_name(self.conf, [driver_name]) drivers = utils.get_drivers_by_name(self.conf, [driver_name])
events = drivers[0].get_changes(DatasourceAction.UPDATE) events = drivers[0].get_changes(DatasourceAction.UPDATE)
if self.async_persistor:
self.async_persistor(events)
LOG.debug("run driver get_changes: %s done.", driver_name) LOG.debug("run driver get_changes: %s done.", driver_name)
return events return events

View File

@ -29,6 +29,7 @@ from vitrage.common.constants import GraphAction
from vitrage.common.constants import UpdateMethod from vitrage.common.constants import UpdateMethod
from vitrage.common.constants import VertexProperties as VProps from vitrage.common.constants import VertexProperties as VProps
from vitrage.common.exception import VitrageTransformerError from vitrage.common.exception import VitrageTransformerError
from vitrage.common.utils import md5
from vitrage.datasources import OPENSTACK_CLUSTER from vitrage.datasources import OPENSTACK_CLUSTER
import vitrage.graph.utils as graph_utils import vitrage.graph.utils as graph_utils
from vitrage.utils import datetime as datetime_utils from vitrage.utils import datetime as datetime_utils
@ -169,6 +170,7 @@ class TransformerBase(object):
if vertex.get(VProps.IS_REAL_VITRAGE_ID): if vertex.get(VProps.IS_REAL_VITRAGE_ID):
return vertex return vertex
new_uuid = self.uuid_from_deprecated_vitrage_id(vertex.vertex_id) new_uuid = self.uuid_from_deprecated_vitrage_id(vertex.vertex_id)
vertex.properties[VProps.VITRAGE_CACHED_ID] = md5(vertex.vertex_id)
vertex.vertex_id = new_uuid vertex.vertex_id = new_uuid
vertex.properties[VProps.VITRAGE_ID] = new_uuid vertex.properties[VProps.VITRAGE_ID] = new_uuid
vertex.properties[VProps.IS_REAL_VITRAGE_ID] = True vertex.properties[VProps.IS_REAL_VITRAGE_ID] = True
@ -176,7 +178,7 @@ class TransformerBase(object):
@classmethod @classmethod
def uuid_from_deprecated_vitrage_id(cls, vitrage_id): def uuid_from_deprecated_vitrage_id(cls, vitrage_id):
old_vitrage_id = hash(vitrage_id) old_vitrage_id = md5(vitrage_id)
new_uuid = cls.key_to_uuid_cache.get(old_vitrage_id) new_uuid = cls.key_to_uuid_cache.get(old_vitrage_id)
if not new_uuid: if not new_uuid:
new_uuid = uuidutils.generate_uuid() new_uuid = uuidutils.generate_uuid()

View File

@ -30,12 +30,12 @@ def create_rpc_client_instance(conf):
def get_all(rpc_client, events_coordination, driver_names, action, def get_all(rpc_client, events_coordination, driver_names, action,
retry_on_fault=False, first_call_timeout=None): retry_on_fault=False):
LOG.info('get_all starting for %s', driver_names) LOG.info('get_all starting for %s', driver_names)
t1 = time.time() t1 = time.time()
def _call(_client): def _call():
return _client.call( return rpc_client.call(
{}, {},
'driver_get_all', 'driver_get_all',
driver_names=driver_names, driver_names=driver_names,
@ -43,15 +43,10 @@ def get_all(rpc_client, events_coordination, driver_names, action,
retry_on_fault=retry_on_fault) retry_on_fault=retry_on_fault)
try: try:
if first_call_timeout: events = _call()
# create a temporary client instance with a timeout
client = rpc_client.prepare(timeout=first_call_timeout)
events = _call(client)
else:
events = _call(rpc_client)
except oslo_messaging.MessagingTimeout: except oslo_messaging.MessagingTimeout:
LOG.exception('Got MessagingTimeout') LOG.exception('Got MessagingTimeout')
events = _call(rpc_client) if retry_on_fault else [] events = _call() if retry_on_fault else []
t2 = time.time() t2 = time.time()
events_coordination.handle_multiple_low_priority(events) events_coordination.handle_multiple_low_priority(events)
t3 = time.time() t3 = time.time()

View File

@ -18,14 +18,20 @@ from oslo_log import log
import oslo_messaging import oslo_messaging
from vitrage.common.constants import DatasourceAction from vitrage.common.constants import DatasourceAction
from vitrage.common.constants import VertexProperties as VProps
from vitrage.common.utils import spawn from vitrage.common.utils import spawn
from vitrage.datasources.transformer_base import TransformerBase
from vitrage.entity_graph import datasource_rpc as ds_rpc from vitrage.entity_graph import datasource_rpc as ds_rpc
from vitrage.entity_graph import EVALUATOR_TOPIC from vitrage.entity_graph import EVALUATOR_TOPIC
from vitrage.entity_graph.graph_persistency import GraphPersistency
from vitrage.entity_graph.processor.notifier import GraphNotifier
from vitrage.entity_graph.processor.processor import Processor from vitrage.entity_graph.processor.processor import Processor
from vitrage.entity_graph.scheduler import Scheduler from vitrage.entity_graph.scheduler import Scheduler
from vitrage.entity_graph.workers import GraphWorkersManager from vitrage.entity_graph.workers import GraphWorkersManager
from vitrage.graph.driver.networkx_graph import NXGraph
from vitrage import messaging from vitrage import messaging
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -40,27 +46,49 @@ class VitrageGraphInit(object):
self.process_event, self.process_event,
conf.datasources.notification_topic_collector, conf.datasources.notification_topic_collector,
EVALUATOR_TOPIC) EVALUATOR_TOPIC)
self.scheduler = Scheduler(conf, graph, self.events_coordination) self.persist = GraphPersistency(conf, db_connection, graph)
self.processor = Processor(conf, graph, self.scheduler.graph_persistor) self.scheduler = Scheduler(conf, graph, self.events_coordination,
self.persist)
self.processor = Processor(conf, graph)
def run(self): def run(self):
LOG.info('Init Started') LOG.info('Init Started')
LOG.info('clearing database active_actions') graph_snapshot = self.persist.query_recent_snapshot()
if graph_snapshot:
self._restart_from_stored_graph(graph_snapshot)
else:
self._start_from_scratch()
self.workers.run()
def _restart_from_stored_graph(self, graph_snapshot):
LOG.info('Initializing graph from database snapshot (%sKb)',
len(graph_snapshot.graph_snapshot) / 1024)
NXGraph.read_gpickle(graph_snapshot.graph_snapshot, self.graph)
self.persist.replay_events(self.graph, graph_snapshot.event_id)
self._recreate_transformers_id_cache()
LOG.info("%s vertices loaded", self.graph.num_vertices())
spawn(self._start_all_workers, is_snapshot=True)
def _start_from_scratch(self):
LOG.info('Starting for the first time')
LOG.info('Clearing database active_actions')
self.db.active_actions.delete() self.db.active_actions.delete()
ds_rpc.get_all( ds_rpc.get_all(
ds_rpc.create_rpc_client_instance(self.conf), ds_rpc.create_rpc_client_instance(self.conf),
self.events_coordination, self.events_coordination,
self.conf.datasources.types, self.conf.datasources.types,
action=DatasourceAction.INIT_SNAPSHOT, action=DatasourceAction.INIT_SNAPSHOT,
retry_on_fault=True, retry_on_fault=True)
first_call_timeout=10) LOG.info("%s vertices loaded", self.graph.num_vertices())
self.processor.start_notifier() self.persist.store_graph()
spawn(self.start_all_workers) spawn(self._start_all_workers, is_snapshot=False)
self.workers.run()
def start_all_workers(self): def _start_all_workers(self, is_snapshot):
self.workers.submit_start_evaluations() # evaluate entire graph if is_snapshot:
self.graph.subscribe(self.workers.submit_graph_update) self.workers.submit_enable_evaluations()
else:
self.workers.submit_start_evaluations()
self._add_graph_subscriptions()
self.scheduler.start_periodic_tasks() self.scheduler.start_periodic_tasks()
LOG.info('Init Finished') LOG.info('Init Finished')
self.events_coordination.start() self.events_coordination.start()
@ -72,6 +100,24 @@ class VitrageGraphInit(object):
else: else:
self.processor.process_event(event) self.processor.process_event(event)
def _recreate_transformers_id_cache(self):
for v in self.graph.get_vertices():
if not v.get(VProps.VITRAGE_CACHED_ID):
LOG.warning("Missing vitrage_cached_id in the vertex. "
"Vertex is not added to the ID cache %s", str(v))
else:
TransformerBase.key_to_uuid_cache[v[VProps.VITRAGE_CACHED_ID]]\
= v.vertex_id
def _add_graph_subscriptions(self):
self.graph.subscribe(self.workers.submit_graph_update)
vitrage_notifier = GraphNotifier(self.conf)
if vitrage_notifier.enabled:
self.graph.subscribe(vitrage_notifier.notify_when_applicable)
LOG.info('Subscribed vitrage notifier to graph changes')
self.graph.subscribe(self.persist.persist_event,
finalization=True)
PRIORITY_DELAY = 0.05 PRIORITY_DELAY = 0.05

View File

@ -0,0 +1,129 @@
# Copyright 2018 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import timedelta
from oslo_log import log
from vitrage.common.constants import VertexProperties as VProps
from vitrage.graph import Edge
from vitrage.graph import Vertex
from vitrage.storage.sqlalchemy import models
from vitrage.utils.datetime import utcnow
LOG = log.getLogger(__name__)
EPSILON = 30
class GraphPersistency(object):
def __init__(self, conf, db, graph):
self.conf = conf
self.db = db
self.graph = graph
self.enabled = conf.persistency.enable_persistency
def store_graph(self):
if not self.enabled:
return
LOG.info('Persisting graph...')
try:
last_event_id = self.db.events.get_last_event_id()
last_event_id = last_event_id.event_id if last_event_id else 0
graph_snapshot = self.graph.write_gpickle()
self.db.graph_snapshots.update(models.GraphSnapshot(
snapshot_id=1,
event_id=last_event_id,
graph_snapshot=graph_snapshot))
LOG.info('Persisting graph - done')
except Exception:
LOG.exception("Graph is not stored")
def _recent_snapshot_time(self):
t = utcnow(with_timezone=False)
t = t - timedelta(seconds=3 * self.conf.datasources.snapshots_interval)
t = t - timedelta(seconds=EPSILON)
return t
def query_recent_snapshot(self):
if not self.enabled:
return
timestamp = self._recent_snapshot_time()
return self.db.graph_snapshots.query(timestamp=timestamp)
def replay_events(self, graph, event_id):
LOG.info('Getting events from database')
events = self.db.events.get_replay_events(
event_id=event_id)
LOG.info('Applying %s database events', len(events))
for event in events:
if event.is_vertex:
v_id = event.payload['vertex_id']
del event.payload['vertex_id']
v = Vertex(v_id, event.payload)
graph.update_vertex(v)
else:
source_id = event.payload['source_id']
target_id = event.payload['target_id']
label = event.payload['label']
del event.payload['source_id']
del event.payload['target_id']
del event.payload['label']
e = Edge(source_id, target_id, label, event.payload)
graph.update_edge(e)
def persist_event(self, before, current, is_vertex, graph, event_id=None):
"""Callback subscribed to driver.graph updates"""
if not self.enabled or\
not self.is_important_change(before,
current,
VProps.UPDATE_TIMESTAMP,
VProps.VITRAGE_SAMPLE_TIMESTAMP):
return
if is_vertex:
curr = current.properties.copy()
curr['vertex_id'] = current.vertex_id
else:
curr = current.properties.copy()
curr['source_id'] = current.source_id
curr['target_id'] = current.target_id
curr['label'] = current.label
event_row = models.Event(payload=curr, is_vertex=is_vertex,
event_id=event_id)
self.db.events.create(event_row)
@staticmethod
def is_important_change(before, curr, *args):
"""Non important changes such as update_timestamp shouldn't be stored
:param args: list of keys that should be ignored
:return: True if this change should be stored
"""
if not curr:
return False
if curr and not before:
return True
for key, content in curr.properties.items():
if key in args:
continue
elif isinstance(content, dict) or isinstance(content, list):
return True # TODO(ihefetz): can be improved
elif before.properties.get(key) != content:
return True
return False

View File

@ -21,7 +21,6 @@ from vitrage.datasources.transformer_base import TransformerBase
from vitrage.entity_graph.mappings.datasource_info_mapper import \ from vitrage.entity_graph.mappings.datasource_info_mapper import \
DatasourceInfoMapper DatasourceInfoMapper
from vitrage.entity_graph.processor import base as processor from vitrage.entity_graph.processor import base as processor
from vitrage.entity_graph.processor.notifier import GraphNotifier
from vitrage.entity_graph.processor import processor_utils as PUtils from vitrage.entity_graph.processor import processor_utils as PUtils
from vitrage.entity_graph.processor.transformer_manager import \ from vitrage.entity_graph.processor.transformer_manager import \
TransformerManager TransformerManager
@ -32,15 +31,13 @@ LOG = log.getLogger(__name__)
class Processor(processor.ProcessorBase): class Processor(processor.ProcessorBase):
def __init__(self, conf, e_graph=None, graph_persistor=None): def __init__(self, conf, e_graph=None):
super(Processor, self).__init__() super(Processor, self).__init__()
self.conf = conf self.conf = conf
self.transformer_manager = TransformerManager(self.conf) self.transformer_manager = TransformerManager(self.conf)
self.info_mapper = DatasourceInfoMapper(self.conf) self.info_mapper = DatasourceInfoMapper(self.conf)
self._initialize_events_actions() self._initialize_events_actions()
self.entity_graph = e_graph self.entity_graph = e_graph
self._notifier = GraphNotifier(conf)
self._graph_persistor = graph_persistor
def process_event(self, event): def process_event(self, event):
"""Decides which action to run on given event """Decides which action to run on given event
@ -60,10 +57,9 @@ class Processor(processor.ProcessorBase):
if entity.action not in self.actions.keys(): if entity.action not in self.actions.keys():
LOG.debug('deprecated or unknown entity %s ignored', str(entity)) LOG.debug('deprecated or unknown entity %s ignored', str(entity))
return return
self._calculate_vitrage_aggregated_values(entity.vertex, entity.action) self._calculate_vitrage_aggregated_values(entity.vertex, entity.action)
self.actions[entity.action](entity.vertex, entity.neighbors) self.actions[entity.action](entity.vertex, entity.neighbors)
if self._graph_persistor:
self._graph_persistor.update_last_event_timestamp(event)
def create_entity(self, new_vertex, neighbors): def create_entity(self, new_vertex, neighbors):
"""Adds new vertex to the entity graph """Adds new vertex to the entity graph
@ -141,7 +137,7 @@ class Processor(processor.ProcessorBase):
PUtils.mark_deleted(self.entity_graph, deleted_vertex) PUtils.mark_deleted(self.entity_graph, deleted_vertex)
else: else:
LOG.warning("Delete event arrived on invalid resource: " LOG.warning("Delete entity arrived on invalid resource: "
"deleted_vertex - %s, graph_vertex - %s", "deleted_vertex - %s, graph_vertex - %s",
deleted_vertex, graph_vertex) deleted_vertex, graph_vertex)
@ -185,15 +181,10 @@ class Processor(processor.ProcessorBase):
PUtils.is_newer_vertex(graph_vertex, vertex): PUtils.is_newer_vertex(graph_vertex, vertex):
self.entity_graph.remove_vertex(vertex) self.entity_graph.remove_vertex(vertex)
else: else:
LOG.warning("Delete event arrived on invalid resource: " LOG.warning("Remove deleted entity arrived on invalid resource: "
"deleted_vertex - %s, graph_vertex - %s", "deleted_vertex - %s, graph_vertex - %s",
vertex, graph_vertex) vertex, graph_vertex)
def start_notifier(self):
if self._notifier and self._notifier.enabled:
self.entity_graph.subscribe(self._notifier.notify_when_applicable)
LOG.info('Graph notifications subscription added')
def _update_neighbors(self, vertex, neighbors): def _update_neighbors(self, vertex, neighbors):
"""Updates vertices neighbor connections """Updates vertices neighbor connections

View File

@ -23,20 +23,18 @@ from vitrage.common.utils import spawn
from vitrage.entity_graph.consistency.consistency_enforcer import\ from vitrage.entity_graph.consistency.consistency_enforcer import\
ConsistencyEnforcer ConsistencyEnforcer
from vitrage.entity_graph import datasource_rpc as ds_rpc from vitrage.entity_graph import datasource_rpc as ds_rpc
from vitrage.persistency.graph_persistor import GraphPersistor
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
class Scheduler(object): class Scheduler(object):
def __init__(self, conf, graph, events_coordination): def __init__(self, conf, graph, events_coordination, persist):
super(Scheduler, self).__init__() super(Scheduler, self).__init__()
self.conf = conf self.conf = conf
self.graph = graph self.graph = graph
self.events_coordination = events_coordination self.events_coordination = events_coordination
self.graph_persistor = GraphPersistor(conf) if \ self.persist = persist
self.conf.persistency.enable_persistency else None
self.consistency = ConsistencyEnforcer(conf, graph) self.consistency = ConsistencyEnforcer(conf, graph)
self.periodic = None self.periodic = None
@ -44,27 +42,10 @@ class Scheduler(object):
self.periodic = periodics.PeriodicWorker.create( self.periodic = periodics.PeriodicWorker.create(
[], executor_factory=lambda: ThreadPoolExecutor(max_workers=10)) [], executor_factory=lambda: ThreadPoolExecutor(max_workers=10))
self.add_persist_timer()
self.add_consistency_timer() self.add_consistency_timer()
self.add_rpc_datasources_timers() self.add_rpc_datasources_timers()
spawn(self.periodic.start) spawn(self.periodic.start)
def add_persist_timer(self):
if not self.graph_persistor:
return
spacing = self.conf.persistency.graph_persistency_interval
@periodics.periodic(spacing=spacing)
def persist_periodic():
if self.graph_persistor:
try:
self.graph_persistor.store_graph(graph=self.graph)
except Exception:
LOG.exception('Persist failed.')
self.periodic.add(persist_periodic)
LOG.info("added persist_periodic (spacing=%s)", spacing)
def add_consistency_timer(self): def add_consistency_timer(self):
spacing = self.conf.datasources.snapshots_interval spacing = self.conf.datasources.snapshots_interval
@ -89,6 +70,7 @@ class Scheduler(object):
self.events_coordination, self.events_coordination,
self.conf.datasources.types, self.conf.datasources.types,
DatasourceAction.SNAPSHOT) DatasourceAction.SNAPSHOT)
self.persist.store_graph()
except Exception: except Exception:
LOG.exception('get_all_periodic failed.') LOG.exception('get_all_periodic failed.')

View File

@ -42,6 +42,7 @@ LOG = log.getLogger(__name__)
# Supported message types # Supported message types
GRAPH_UPDATE = 'graph_update' GRAPH_UPDATE = 'graph_update'
ENABLE_EVALUATION = 'enable_evaluation'
START_EVALUATION = 'start_evaluation' START_EVALUATION = 'start_evaluation'
RELOAD_TEMPLATES = 'reload_templates' RELOAD_TEMPLATES = 'reload_templates'
TEMPLATE_ACTION = 'template_action' TEMPLATE_ACTION = 'template_action'
@ -151,6 +152,13 @@ class GraphWorkersManager(cotyledon.ServiceManager):
""" """
self._submit_and_wait(self._evaluator_queues, (START_EVALUATION,)) self._submit_and_wait(self._evaluator_queues, (START_EVALUATION,))
def submit_enable_evaluations(self):
"""Enable scenario-evaluator in all evaluator workers
Only enables the worker's scenario-evaluator, without traversing the graph
"""
self._submit_and_wait(self._evaluator_queues, (ENABLE_EVALUATION,))
def submit_evaluators_reload_templates(self): def submit_evaluators_reload_templates(self):
"""Recreate the scenario-repository in all evaluator workers """Recreate the scenario-repository in all evaluator workers
@ -288,7 +296,11 @@ class EvaluatorWorker(GraphCloneWorkerBase):
super(EvaluatorWorker, self).do_task(task) super(EvaluatorWorker, self).do_task(task)
action = task[0] action = task[0]
if action == START_EVALUATION: if action == START_EVALUATION:
# fresh init (without snapshot) requires iterating the graph
self._evaluator.run_evaluator() self._evaluator.run_evaluator()
elif action == ENABLE_EVALUATION:
# init with a snapshot does not require iterating the graph
self._evaluator.enabled = True
elif action == RELOAD_TEMPLATES: elif action == RELOAD_TEMPLATES:
self._reload_templates() self._reload_templates()

View File

@ -21,6 +21,7 @@ from oslo_log import log
from vitrage.common.constants import EdgeProperties as EProps from vitrage.common.constants import EdgeProperties as EProps
from vitrage.common.constants import VertexProperties as VProps from vitrage.common.constants import VertexProperties as VProps
from vitrage.common.utils import md5
from vitrage.common.utils import recursive_keypairs from vitrage.common.utils import recursive_keypairs
from vitrage.datasources.listener_service import defaultdict from vitrage.datasources.listener_service import defaultdict
from vitrage.entity_graph.mappings.datasource_info_mapper \ from vitrage.entity_graph.mappings.datasource_info_mapper \
@ -232,7 +233,7 @@ class ScenarioEvaluator(object):
match_action_spec = self._get_action_spec(action_spec, match) match_action_spec = self._get_action_spec(action_spec, match)
items_ids = \ items_ids = \
[match_item[1].vertex_id for match_item in match.items()] [match_item[1].vertex_id for match_item in match.items()]
match_hash = hash(tuple(sorted(items_ids))) match_hash = md5(tuple(sorted(items_ids)))
self._evaluate_property_functions(template_schema, match, self._evaluate_property_functions(template_schema, match,
match_action_spec.properties) match_action_spec.properties)
@ -301,11 +302,8 @@ class ScenarioEvaluator(object):
def _generate_action_id(action_spec): def _generate_action_id(action_spec):
"""Generate a unique action id for the action """Generate a unique action id for the action
BEWARE: The implementation of this function MUST NOT BE CHANGED!! BEWARE: The value created here should not be stored in database,
as in python3, the hash function seed changes after program restart
The created hash is used for storing the active actions in the
database. If changed, existing active actions can no longer be
retrieved.
""" """
targets = [(k, v.vertex_id) for k, v in action_spec.targets.items()] targets = [(k, v.vertex_id) for k, v in action_spec.targets.items()]
return hash( return hash(

View File

@ -89,12 +89,6 @@ class NXAlgorithm(GraphAlgorithm):
vertices=self._vertex_result_to_list(n_result), vertices=self._vertex_result_to_list(n_result),
edges=self._edge_result_to_list(e_result)) edges=self._edge_result_to_list(e_result))
LOG.debug('graph_query_vertices: find graph: nodes %s, edges %s',
str(list(graph._g.nodes(data=True))),
str(list(graph._g.edges(data=True))))
LOG.debug('graph_query_vertices: real graph: nodes %s, edges %s',
str(list(self.graph._g.nodes(data=True))),
str(list(self.graph._g.edges(data=True))))
return graph return graph
def sub_graph_matching(self, def sub_graph_matching(self,

View File

@ -49,8 +49,25 @@ class Graph(object):
self.graph_type = graph_type self.graph_type = graph_type
self.notifier = Notifier() self.notifier = Notifier()
def subscribe(self, function): def subscribe(self, function, finalization=False):
self.notifier.subscribe(function) """Subscribe to graph changes
:param function: function will be called after each graph change
:param finalization: function will be called after all non-finalization subscribers
Usage Example:
graph = NXGraph()
graph.subscribe(foo1, finalization=True)
graph.subscribe(foo2, finalization=False)
graph.subscribe(foo3, finalization=False)
The order of the calls in this example will be:
1. foo2
2. foo3
3. foo1
foo1 is called last because it subscribed as a finalization function
"""
self.notifier.subscribe(function, finalization)
def is_subscribed(self): def is_subscribed(self):
return self.notifier.is_subscribed() return self.notifier.is_subscribed()

View File

@ -11,9 +11,10 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import functools import functools
import itertools
from vitrage.graph.driver.elements import Vertex from vitrage.graph.driver.elements import Vertex
@ -34,15 +35,21 @@ def _after_func(graph, item, data_before=None):
class Notifier(object): class Notifier(object):
def __init__(self): def __init__(self):
self._subscriptions = [] self._subscriptions = []
self._finalization_subscriptions = []
def subscribe(self, function): def subscribe(self, function, finalization=False):
self._subscriptions.append(function) if finalization:
self._finalization_subscriptions.append(function)
else:
self._subscriptions.append(function)
def is_subscribed(self): def is_subscribed(self):
return len(self._subscriptions) != 0 size = len(self._subscriptions) + len(self._finalization_subscriptions)
return size != 0
def notify(self, *args, **kwargs): def notify(self, *args, **kwargs):
for func in self._subscriptions: for func in itertools.chain(self._subscriptions,
self._finalization_subscriptions):
func(*args, **kwargs) func(*args, **kwargs)
@staticmethod @staticmethod

View File

@ -22,7 +22,4 @@ OPTS = [
cfg.BoolOpt('enable_persistency', cfg.BoolOpt('enable_persistency',
default=False, default=False,
help='Periodically store entity graph snapshot to database'), help='Periodically store entity graph snapshot to database'),
cfg.IntOpt('graph_persistency_interval',
default=3600,
help='Store graph to database every X seconds'),
] ]

View File

@ -1,60 +0,0 @@
# Copyright 2018 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
from oslo_log import log
from dateutil import parser
from vitrage.common.constants import DatasourceProperties as DSProps
from vitrage.graph.driver.networkx_graph import NXGraph
from vitrage import storage
from vitrage.storage.sqlalchemy import models
from vitrage.utils import datetime
from vitrage.utils.datetime import utcnow
LOG = log.getLogger(__name__)
class GraphPersistor(object):
def __init__(self, conf):
super(GraphPersistor, self).__init__()
self.db_connection = storage.get_connection_from_config(conf)
self.last_event_timestamp = datetime.datetime.utcnow()
def store_graph(self, graph):
LOG.info('Graph persistency running..')
try:
graph_snapshot = graph.write_gpickle()
db_row = models.GraphSnapshot(
last_event_timestamp=self.last_event_timestamp,
graph_snapshot=graph_snapshot)
self.db_connection.graph_snapshots.create(db_row)
except Exception as e:
LOG.exception("Graph is not stored: %s", e)
def load_graph(self, timestamp=None):
db_row = self.db_connection.graph_snapshots.query(timestamp) if \
timestamp else self.db_connection.graph_snapshots.query(utcnow())
return NXGraph.read_gpickle(db_row.graph_snapshot) if db_row else None
def delete_graph_snapshots(self, timestamp):
"""Deletes all graph snapshots until timestamp"""
self.db_connection.graph_snapshots.delete(timestamp)
def update_last_event_timestamp(self, event):
timestamp = event.get(DSProps.SAMPLE_DATE)
self.last_event_timestamp = parser.parse(timestamp) if timestamp \
else None

View File

@ -13,16 +13,15 @@
# under the License. # under the License.
from __future__ import print_function from __future__ import print_function
from concurrent.futures import ThreadPoolExecutor
import cotyledon import cotyledon
import dateutil.parser from futurist import periodics
import oslo_messaging as oslo_m
from oslo_log import log from oslo_log import log
from vitrage.common.constants import DatasourceProperties as DSProps import oslo_messaging as oslo_m
from vitrage.common.constants import GraphAction
from vitrage.common.utils import spawn
from vitrage import messaging from vitrage import messaging
from vitrage.storage.sqlalchemy import models
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -39,11 +38,13 @@ class PersistorService(cotyledon.Service):
self.listener = messaging.get_notification_listener( self.listener = messaging.get_notification_listener(
transport, [target], transport, [target],
[VitragePersistorEndpoint(self.db_connection)]) [VitragePersistorEndpoint(self.db_connection)])
self.scheduler = Scheduler(conf, db_connection)
def run(self): def run(self):
LOG.info("Vitrage Persistor Service - Starting...") LOG.info("Vitrage Persistor Service - Starting...")
self.listener.start() self.listener.start()
self.scheduler.start_periodic_tasks()
LOG.info("Vitrage Persistor Service - Started!") LOG.info("Vitrage Persistor Service - Started!")
@ -57,19 +58,45 @@ class PersistorService(cotyledon.Service):
class VitragePersistorEndpoint(object): class VitragePersistorEndpoint(object):
funcs = {}
def __init__(self, db_connection): def __init__(self, db_connection):
self.db_connection = db_connection self.db_connection = db_connection
def info(self, ctxt, publisher_id, event_type, payload, metadata): def info(self, ctxt, publisher_id, event_type, payload, metadata):
LOG.debug('Vitrage Event Info: payload %s', payload) LOG.debug('Event_type: %s Payload %s', event_type, payload)
self.process_event(payload) if event_type and event_type in self.funcs.keys():
self.funcs[event_type](self.db_connection, event_type, payload)
def process_event(self, data):
""":param data: Serialized to a JSON formatted ``str`` """ class Scheduler(object):
if data.get(DSProps.EVENT_TYPE) == GraphAction.END_MESSAGE:
return def __init__(self, conf, db):
collector_timestamp = \ self.conf = conf
dateutil.parser.parse(data.get(DSProps.SAMPLE_DATE)) self.db = db
event_row = models.Event(payload=data, self.periodic = None
collector_timestamp=collector_timestamp)
self.db_connection.events.create(event_row) def start_periodic_tasks(self):
self.periodic = periodics.PeriodicWorker.create(
[], executor_factory=lambda: ThreadPoolExecutor(max_workers=10))
self.add_expirer_timer()
spawn(self.periodic.start)
def add_expirer_timer(self):
spacing = 60
@periodics.periodic(spacing=spacing)
def expirer_periodic():
try:
event_id = self.db.graph_snapshots.query_snapshot_event_id()
if event_id:
LOG.debug('Expirer deleting event - id=%s', event_id)
self.db.events.delete(event_id)
except Exception:
LOG.exception('DB periodic cleanup run failed.')
self.periodic.add(expirer_periodic)
LOG.info("Database periodic cleanup starting (spacing=%ss)", spacing)

View File

@ -203,12 +203,8 @@ class EventsConnection(object):
""" """
raise NotImplementedError('query events is not implemented') raise NotImplementedError('query events is not implemented')
def delete(self, def delete(self, event_id=None):
event_id=None, """Delete all events older than event_id"""
collector_timestamp=None,
gt_collector_timestamp=None,
lt_collector_timestamp=None):
"""Delete all events that match the filters."""
raise NotImplementedError('delete events is not implemented') raise NotImplementedError('delete events is not implemented')

View File

@ -18,6 +18,7 @@ from __future__ import absolute_import
from oslo_db.sqlalchemy import session as db_session from oslo_db.sqlalchemy import session as db_session
from oslo_log import log from oslo_log import log
from sqlalchemy.engine import url as sqlalchemy_url from sqlalchemy.engine import url as sqlalchemy_url
from sqlalchemy import or_
from vitrage.common.exception import VitrageInputError from vitrage.common.exception import VitrageInputError
from vitrage import storage from vitrage import storage
@ -79,6 +80,18 @@ class Connection(base.Connection):
def upgrade(self, nocreate=False): def upgrade(self, nocreate=False):
engine = self._engine_facade.get_engine() engine = self._engine_facade.get_engine()
engine.connect() engine.connect()
# As the following tables were changed in Rocky, they are dropped and
# re-created. This is fine for an upgrade from Queens, since the data in
# these tables was anyway deleted on each restart.
# Starting from Rocky, data in these tables should not be removed.
models.Base.metadata.drop_all(
engine, tables=[
models.ActiveAction.__table__,
models.Event.__table__,
models.GraphSnapshot.__table__])
models.Base.metadata.create_all( models.Base.metadata.create_all(
engine, tables=[models.ActiveAction.__table__, engine, tables=[models.ActiveAction.__table__,
models.Template.__table__, models.Template.__table__,
@ -247,6 +260,21 @@ class EventsConnection(base.EventsConnection, BaseTableConn):
with session.begin(): with session.begin():
session.merge(event) session.merge(event)
def get_last_event_id(self):
session = self._engine_facade.get_session()
query = session.query(models.Event.event_id)
return query.order_by(models.Event.event_id.desc()).first()
def get_replay_events(self, event_id):
"""Get all events that occurred after the specified event_id
:rtype: list of vitrage.storage.sqlalchemy.models.Event
"""
session = self._engine_facade.get_session()
query = session.query(models.Event)
query = query.filter(models.Event.event_id > event_id)
return query.order_by(models.Event.event_id.asc()).all()
def query(self, def query(self,
event_id=None, event_id=None,
collector_timestamp=None, collector_timestamp=None,
@ -290,31 +318,12 @@ class EventsConnection(base.EventsConnection, BaseTableConn):
lt_collector_timestamp) lt_collector_timestamp)
return query return query
def delete(self, def delete(self, event_id=None):
event_id=None, """Delete all events older than event_id"""
collector_timestamp=None, session = self._engine_facade.get_session()
gt_collector_timestamp=None, query = session.query(models.Event)
lt_collector_timestamp=None): if event_id:
"""Delete all events that match the filters. query = query.filter(models.Event.event_id < event_id)
:raises: vitrage.common.exception.VitrageInputError.
"""
if (event_id or collector_timestamp) and \
(gt_collector_timestamp or lt_collector_timestamp):
msg = "Calling function with both specific event and range of " \
"events parameters at the same time "
LOG.debug(msg)
raise VitrageInputError(msg)
query = self.query_filter(
models.Event,
event_id=event_id,
collector_timestamp=collector_timestamp)
query = self._update_query_gt_lt(gt_collector_timestamp,
lt_collector_timestamp,
query)
query.delete() query.delete()
@ -334,15 +343,19 @@ class GraphSnapshotsConnection(base.GraphSnapshotsConnection, BaseTableConn):
def query(self, timestamp=None): def query(self, timestamp=None):
query = self.query_filter(models.GraphSnapshot) query = self.query_filter(models.GraphSnapshot)
query = query.filter(models.GraphSnapshot.last_event_timestamp <= query = query.filter(
timestamp) or_(models.GraphSnapshot.updated_at >= timestamp,
return query.order_by( models.GraphSnapshot.created_at >= timestamp))
models.GraphSnapshot.last_event_timestamp.desc()).first() return query.first()
def delete(self, timestamp=None): def query_snapshot_event_id(self):
"""Delete all graph snapshots taken until timestamp.""" """Select the event_id of the stored snapshot"""
session = self._engine_facade.get_session()
query = session.query(models.GraphSnapshot.event_id)
result = query.first()
return result[0] if result else None
def delete(self):
"""Delete all graph snapshots"""
query = self.query_filter(models.GraphSnapshot) query = self.query_filter(models.GraphSnapshot)
query = query.filter(models.GraphSnapshot.last_event_timestamp <=
timestamp)
query.delete() query.delete()

View File

@ -16,7 +16,7 @@ import zlib
from oslo_db.sqlalchemy import models from oslo_db.sqlalchemy import models
from sqlalchemy import Column, DateTime, INTEGER, String, \ from sqlalchemy import Column, INTEGER, String, \
SmallInteger, BigInteger, Index, Boolean SmallInteger, BigInteger, Index, Boolean
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
@ -73,20 +73,19 @@ class Event(Base):
__tablename__ = 'events' __tablename__ = 'events'
event_id = Column("id", INTEGER, primary_key=True, nullable=False, event_id = Column("id", BigInteger(), primary_key=True, autoincrement=True)
autoincrement=True)
collector_timestamp = Column(DateTime, index=True, nullable=False)
payload = Column(JSONEncodedDict(), nullable=False) payload = Column(JSONEncodedDict(), nullable=False)
is_vertex = Column(Boolean, nullable=False)
def __repr__(self): def __repr__(self):
return \ return \
"<Event(" \ "<Event(" \
"id='%s', " \ "id='%s', " \
"collector_timestamp='%s', " \ "is_vertex='%s', " \
"payload='%s')>" % \ "payload='%s')>" % \
( (
self.event_id, self.event_id,
self.collector_timestamp, self.is_vertex,
self.payload self.payload
) )
@ -106,7 +105,7 @@ class ActiveAction(Base, models.TimestampMixin):
target_vertex_id = Column(String(128)) target_vertex_id = Column(String(128))
action_id = Column(String(128), primary_key=True) action_id = Column(String(128), primary_key=True)
score = Column(SmallInteger()) score = Column(SmallInteger())
trigger = Column(BigInteger(), primary_key=True) trigger = Column(String(128), primary_key=True)
def __repr__(self): def __repr__(self):
return \ return \
@ -134,16 +133,19 @@ class ActiveAction(Base, models.TimestampMixin):
class GraphSnapshot(Base): class GraphSnapshot(Base):
__tablename__ = 'graph_snapshots' __tablename__ = 'graph_snapshots'
last_event_timestamp = Column(DateTime, primary_key=True, nullable=False) snapshot_id = Column("id", INTEGER, primary_key=True)
event_id = Column(BigInteger, nullable=False)
graph_snapshot = Column(CompressedBinary((2 ** 32) - 1), nullable=False) graph_snapshot = Column(CompressedBinary((2 ** 32) - 1), nullable=False)
def __repr__(self): def __repr__(self):
return \ return \
"<GraphSnapshot(" \ "<GraphSnapshot(" \
"last_event_timestamp='%s', " \ "id=%s," \
"event_id='%s', " \
"graph_snapshot='%s')>" %\ "graph_snapshot='%s')>" %\
( (
self.last_event_timestamp, self.snapshot_id,
self.event_id,
self.graph_snapshot self.graph_snapshot
) )

View File

@ -11,21 +11,21 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import sys
import time import time
from oslo_config import cfg from oslo_config import cfg
from oslo_db.options import database_opts
from vitrage.persistency.graph_persistor import GraphPersistor from vitrage.common.constants import EdgeProperties
from vitrage import storage from vitrage.common.constants import VertexProperties
from vitrage.storage.sqlalchemy import models from vitrage.graph.driver.networkx_graph import NXGraph
from vitrage.entity_graph import graph_persistency
from vitrage.tests.functional.base import TestFunctionalBase from vitrage.tests.functional.base import TestFunctionalBase
from vitrage.tests.functional.test_configuration import TestConfiguration
from vitrage.tests.mocks.graph_generator import GraphGenerator from vitrage.tests.mocks.graph_generator import GraphGenerator
from vitrage.utils.datetime import utcnow
class TestGraphPersistor(TestFunctionalBase): class TestGraphPersistor(TestFunctionalBase, TestConfiguration):
# noinspection PyAttributeOutsideInit,PyPep8Naming # noinspection PyAttributeOutsideInit,PyPep8Naming
@classmethod @classmethod
@ -34,62 +34,79 @@ class TestGraphPersistor(TestFunctionalBase):
cls.conf = cfg.ConfigOpts() cls.conf = cfg.ConfigOpts()
cls.conf.register_opts(cls.PROCESSOR_OPTS, group='entity_graph') cls.conf.register_opts(cls.PROCESSOR_OPTS, group='entity_graph')
cls.conf.register_opts(cls.DATASOURCES_OPTS, group='datasources') cls.conf.register_opts(cls.DATASOURCES_OPTS, group='datasources')
cls.conf.register_opts(database_opts, group='database') cls.conf.register_opts(cls.PERSISTENCY_OPTS, group='persistency')
cls.conf.set_override('connection', 'sqlite:///test-%s.db' cls.add_db(cls.conf)
% sys.version_info[0], group='database')
cls._db = storage.get_connection_from_config(cls.conf)
engine = cls._db._engine_facade.get_engine()
models.Base.metadata.create_all(engine)
cls.load_datasources(cls.conf) cls.load_datasources(cls.conf)
cls.graph_persistor = GraphPersistor(cls.conf) graph_persistency.EPSILON = 0.1
def test_persist_graph(self): def test_graph_store_and_query_recent_snapshot(self):
g = GraphGenerator().create_graph() g = GraphGenerator().create_graph()
current_time = utcnow() graph_persistor = graph_persistency.GraphPersistency(self.conf,
self.graph_persistor.last_event_timestamp = current_time self._db, g)
self.graph_persistor.store_graph(g) graph_persistor.store_graph()
graph_snapshot = self.graph_persistor.load_graph(current_time) recovered_data = graph_persistor.query_recent_snapshot()
self.assert_graph_equal(g, graph_snapshot) recovered_graph = self.load_snapshot(recovered_data)
self.graph_persistor.delete_graph_snapshots(utcnow()) self.assert_graph_equal(g, recovered_graph)
def test_persist_two_graphs(self): time.sleep(graph_persistency.EPSILON + 0.1 +
g1 = GraphGenerator().create_graph() 3 * self.conf.datasources.snapshots_interval)
current_time1 = utcnow() recovered_data = graph_persistor.query_recent_snapshot()
self.graph_persistor.last_event_timestamp = current_time1 self.assertIsNone(recovered_data, 'Should not be a recent snapshot')
self.graph_persistor.store_graph(g1)
graph_snapshot1 = self.graph_persistor.load_graph(current_time1)
g2 = GraphGenerator(5).create_graph() def test_event_store_and_replay_events(self):
current_time2 = utcnow()
self.graph_persistor.last_event_timestamp = current_time2
self.graph_persistor.store_graph(g2)
graph_snapshot2 = self.graph_persistor.load_graph(current_time2)
self.assert_graph_equal(g1, graph_snapshot1)
self.assert_graph_equal(g2, graph_snapshot2)
self.graph_persistor.delete_graph_snapshots(utcnow())
def test_load_last_graph_snapshot_until_timestamp(self):
g1 = GraphGenerator().create_graph()
self.graph_persistor.last_event_timestamp = utcnow()
self.graph_persistor.store_graph(g1)
time.sleep(1)
time_in_between = utcnow()
time.sleep(1)
g2 = GraphGenerator(5).create_graph()
self.graph_persistor.last_event_timestamp = utcnow()
self.graph_persistor.store_graph(g2)
graph_snapshot = self.graph_persistor.load_graph(time_in_between)
self.assert_graph_equal(g1, graph_snapshot)
self.graph_persistor.delete_graph_snapshots(utcnow())
def test_delete_graph_snapshots(self):
g = GraphGenerator().create_graph() g = GraphGenerator().create_graph()
self.graph_persistor.last_event_timestamp = utcnow() vertices = g.get_vertices()
self.graph_persistor.store_graph(g) graph_persistor = graph_persistency.GraphPersistency(self.conf,
self.graph_persistor.delete_graph_snapshots(utcnow()) self._db, g)
graph_snapshot = self.graph_persistor.load_graph(utcnow()) self.fail_msg = None
self.assertIsNone(graph_snapshot) self.event_id = 1
def callback(pre_item,
current_item,
is_vertex,
graph):
try:
graph_persistor.persist_event(
pre_item, current_item, is_vertex, graph, self.event_id)
except Exception as e:
self.fail_msg = 'persist_event failed with exception ' + str(e)
self.event_id = self.event_id + 1
# Subscribe the callback to graph changes, so that it is called after each
# update_vertex and update_edge, and the events are written to the db
g.subscribe(callback)
vertices[0][VertexProperties.VITRAGE_IS_DELETED] = True
g.update_vertex(vertices[0])
vertices[1][VertexProperties.VITRAGE_IS_DELETED] = True
g.update_vertex(vertices[1])
edge = g.get_edges(vertices[0].vertex_id).pop()
edge[EdgeProperties.VITRAGE_IS_DELETED] = True
g.update_edge(edge)
# Store graph:
graph_persistor.store_graph()
# Create more events:
vertices[2][VertexProperties.VITRAGE_IS_DELETED] = True
g.update_vertex(vertices[2])
vertices[3][VertexProperties.VITRAGE_IS_DELETED] = True
g.update_vertex(vertices[3])
edge = g.get_edges(vertices[2].vertex_id).pop()
edge[EdgeProperties.RELATIONSHIP_TYPE] = 'kuku'
g.update_edge(edge)
self.assertIsNone(self.fail_msg, 'callback failed')
# Reload snapshot
recovered_data = graph_persistor.query_recent_snapshot()
recovered_graph = self.load_snapshot(recovered_data)
# Replay events:
self.assertEqual(3, recovered_data.event_id, 'graph snapshot event_id')
graph_persistor.replay_events(recovered_graph, recovered_data.event_id)
self.assert_graph_equal(g, recovered_graph)
@staticmethod
def load_snapshot(data):
return NXGraph.read_gpickle(data.graph_snapshot) if data else None

View File

@ -3,11 +3,11 @@
"id": "111", "id": "111",
"vitrage_is_deleted": false, "vitrage_is_deleted": false,
"vitrage_category": "RESOURCE", "vitrage_category": "RESOURCE",
"vitrage_operational_state": "N/A", "vitrage_operational_state": "OK",
"state": "AVAILABLE", "state": "AVAILABLE",
"vitrage_type": "nova.host", "vitrage_type": "nova.host",
"vitrage_sample_timestamp": "2017-12-24 10:32:41.389676+00:00", "vitrage_sample_timestamp": "2017-12-24 10:32:41.389676+00:00",
"vitrage_aggregated_state": null, "vitrage_aggregated_state": "AVAILABLE",
"vitrage_is_placeholder": false, "vitrage_is_placeholder": false,
"is_real_vitrage_id": true "is_real_vitrage_id": true
} }

View File

@ -163,7 +163,7 @@ class NovaInstanceTransformerTest(base.BaseTest):
def _validate_vertex_props(self, vertex, event): def _validate_vertex_props(self, vertex, event):
self.assertThat(vertex.properties, matchers.HasLength(13)) self.assertThat(vertex.properties, matchers.HasLength(14))
is_update_event = tbase.is_update_event(event) is_update_event = tbase.is_update_event(event)

View File

@ -60,6 +60,13 @@ class TestEntityGraphUnitBase(base.BaseTest):
min=1) min=1)
] ]
PERSISTENCY_OPTS = [
cfg.StrOpt('persistor_topic',
default=None),
cfg.BoolOpt('enable_persistency',
default=True),
]
NUM_CLUSTERS = 1 NUM_CLUSTERS = 1
NUM_ZONES = 2 NUM_ZONES = 2
NUM_HOSTS = 4 NUM_HOSTS = 4

View File

@ -427,7 +427,7 @@ class TestGraph(GraphTestBase):
self.assertEqual(OPENSTACK_CLUSTER, found_vertex[VProps.VITRAGE_TYPE], self.assertEqual(OPENSTACK_CLUSTER, found_vertex[VProps.VITRAGE_TYPE],
'get_vertices check node vertex') 'get_vertices check node vertex')
def _check_callback_result(self, result, msg, exp_prev, exp_curr): def _check_callbacks_result(self, msg, exp_prev, exp_curr):
def assert_none_or_equals(exp, act, message): def assert_none_or_equals(exp, act, message):
if exp: if exp:
@ -435,12 +435,17 @@ class TestGraph(GraphTestBase):
else: else:
self.assertIsNone(act, message) self.assertIsNone(act, message)
self.assertIsNotNone(result, msg + ' Callback was not called') self.assertIsNotNone(self.result, msg + ' Callback was not called')
assert_none_or_equals(exp_prev, result[0], assert_none_or_equals(exp_prev, self.result[0],
msg + ' prev_item unexpected') msg + ' prev_item unexpected')
assert_none_or_equals(exp_curr, result[1], assert_none_or_equals(exp_curr, self.result[1],
msg + ' curr_item unexpected') msg + ' curr_item unexpected')
self.assertEqual(self.result, self.final_result,
'callback order is incorrect')
self.result = None self.result = None
self.final_result = None
def _assert_none_or_equals(self, exp, act, msg): def _assert_none_or_equals(self, exp, act, msg):
if exp: if exp:
@ -453,11 +458,15 @@ class TestGraph(GraphTestBase):
g = NXGraph('test_graph_callbacks') g = NXGraph('test_graph_callbacks')
self.result = None self.result = None
self.final_result = None
def callback(pre_item, def callback_2(pre_item, current_item, is_vertex, graph):
current_item, # We want to make sure this callback was called ^after^ the other
is_vertex, # And expect that the later callback copies the result from the
graph): # prior call, hence these should be equal after both were called
self.final_result = self.result
def callback(pre_item, current_item, is_vertex, graph):
LOG.info('called with: pre_event_item ' + str(pre_item) + LOG.info('called with: pre_event_item ' + str(pre_item) +
' current_item ' + str(current_item)) ' current_item ' + str(current_item))
self.assertIsNotNone(current_item) self.assertIsNotNone(current_item)
@ -470,32 +479,31 @@ class TestGraph(GraphTestBase):
'Got notification, but add_vertex notification is not registered') 'Got notification, but add_vertex notification is not registered')
# subscribe # subscribe
g.subscribe(callback_2, finalization=True)
g.subscribe(callback) g.subscribe(callback)
# These actions will trigger callbacks: # These actions will trigger callbacks:
g.add_vertex(v_node) g.add_vertex(v_node)
self._check_callback_result(self.result, 'add vertex', None, v_node) self._check_callbacks_result('add vertex', None, v_node)
g.add_vertex(v_host) g.add_vertex(v_host)
self._check_callback_result(self.result, 'add vertex', None, v_host) self._check_callbacks_result('add vertex', None, v_host)
g.add_edge(e_node_to_host) g.add_edge(e_node_to_host)
self._check_callback_result(self.result, 'add edge', None, self._check_callbacks_result('add edge', None, e_node_to_host)
e_node_to_host)
updated_vertex = g.get_vertex(v_host.vertex_id) updated_vertex = g.get_vertex(v_host.vertex_id)
updated_vertex[VProps.VITRAGE_CATEGORY] = ALARM updated_vertex[VProps.VITRAGE_CATEGORY] = ALARM
g.update_vertex(updated_vertex) g.update_vertex(updated_vertex)
self._check_callback_result(self.result, 'update vertex', self._check_callbacks_result('update vertex', v_host, updated_vertex)
v_host, updated_vertex)
updated_edge = g.get_edge(e_node_to_host.source_id, updated_edge = g.get_edge(e_node_to_host.source_id,
e_node_to_host.target_id, e_node_to_host.target_id,
e_node_to_host.label) e_node_to_host.label)
updated_edge['ZIG'] = 'ZAG' updated_edge['ZIG'] = 'ZAG'
g.update_edge(updated_edge) g.update_edge(updated_edge)
self._check_callback_result(self.result, 'update edge', e_node_to_host, self._check_callbacks_result('update edge', e_node_to_host,
updated_edge) updated_edge)
def test_union(self): def test_union(self):
v1 = v_node v1 = v_node