Merge "refactoring vitrage processes"

This commit is contained in:
Zuul 2018-03-27 15:12:53 +00:00 committed by Gerrit Code Review
commit e85af78a76
21 changed files with 488 additions and 724 deletions

View File

@ -16,6 +16,7 @@ from oslo_log import log
import oslo_messaging
from oslo_service import service as os_service
from vitrage.common.utils import spawn
from vitrage.entity_graph import EVALUATOR_TOPIC
from vitrage.messaging import VitrageNotifier
@ -44,6 +45,9 @@ class VitrageApiHandlerService(os_service.Service):
self.db = storage.get_connection_from_config(conf)
def start(self):
    """Launch the service start-up routine via ``spawn``.

    The actual start-up work lives in ``self._start``; this method only
    hands it to ``spawn`` as the target.
    """
    spawn(self._start)
def _start(self):
LOG.info("Vitrage Api Handler Service - Starting...")
super(VitrageApiHandlerService, self).start()

View File

@ -15,43 +15,25 @@
import sys
from oslo_service import service as os_service
from vitrage.api_handler.service import VitrageApiHandlerService
from vitrage.cli import VITRAGE_TITLE
from vitrage import entity_graph
from vitrage.entity_graph.consistency.service import VitrageConsistencyService
from vitrage.entity_graph import get_graph_driver
from vitrage.entity_graph.graph_init import VitrageGraphInit
from vitrage import service
from vitrage import storage
from vitrage.entity_graph.service import VitrageGraphService
from vitrage.evaluator.evaluator_service import EvaluatorManager
def main():
"""Starts all the Entity graph services
1. Starts the Entity graph service
2. Starts the api_handler service
3. Starts the Consistency service
"""
"""Main method of vitrage-graph"""
print(VITRAGE_TITLE)
conf = service.prepare_service()
e_graph = entity_graph.get_graph_driver(conf)('Entity Graph')
evaluator = EvaluatorManager(conf, e_graph)
launcher = os_service.ServiceLauncher(conf)
e_graph = get_graph_driver(conf)('Entity Graph')
db_connection = storage.get_connection_from_config(conf)
clear_active_actions_table(db_connection)
launcher.launch_service(VitrageGraphService(
conf, e_graph, evaluator, db_connection))
launcher.launch_service(VitrageApiHandlerService(conf, e_graph))
launcher.launch_service(VitrageConsistencyService(conf, e_graph))
launcher.wait()
VitrageApiHandlerService(conf, e_graph).start()
VitrageGraphInit(conf, e_graph, db_connection).run()
def clear_active_actions_table(db_connection):
@ -62,5 +44,6 @@ def clear_active_actions_table(db_connection):
"""
db_connection.active_actions.delete()
if __name__ == "__main__":
sys.exit(main())

View File

@ -20,6 +20,7 @@ from collections import defaultdict
import copy
import itertools
import random
import threading
from oslo_config import cfg
@ -79,3 +80,10 @@ def get_portion(lst, num_of_portions, portion_index):
curr_portion = next(g)
portions[curr_portion].append(curr_item)
return portions[portion_index]
def spawn(target, *args, **kwargs):
    """Run ``target`` on a new daemon thread and return that thread.

    :param target: callable executed on the new thread
    :param args: positional arguments forwarded to ``target``
    :param kwargs: keyword arguments forwarded to ``target``
    :returns: the ``threading.Thread`` object, already started
    """
    worker = threading.Thread(target=target, args=args, kwargs=kwargs)
    # Mark as daemon before starting; the flag cannot be changed afterwards.
    worker.daemon = True
    worker.start()
    return worker

View File

@ -24,8 +24,10 @@ from vitrage.common.constants import GraphAction
from vitrage.common.constants import VertexProperties as VProps
from vitrage.datasources.consistency import CONSISTENCY_DATASOURCE
from vitrage.datasources import OPENSTACK_CLUSTER
from vitrage.entity_graph import EVALUATOR_TOPIC
from vitrage.evaluator.actions.evaluator_event_transformer \
import VITRAGE_DATASOURCE
from vitrage.messaging import VitrageNotifier
from vitrage.utils.datetime import utcnow
LOG = log.getLogger(__name__)
@ -35,10 +37,11 @@ class ConsistencyEnforcer(object):
def __init__(self,
conf,
actions_callback,
entity_graph):
entity_graph,
actions_callback=None):
self.conf = conf
self.actions_callback = actions_callback
self.actions_callback = actions_callback or VitrageNotifier(
conf, 'vitrage_consistency', [EVALUATOR_TOPIC]).notify
self.graph = entity_graph
def periodic_process(self):

View File

@ -1,58 +0,0 @@
# Copyright 2016 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from oslo_service import service as os_service
from vitrage.entity_graph.consistency.consistency_enforcer \
import ConsistencyEnforcer
from vitrage.entity_graph import EVALUATOR_TOPIC
from vitrage.messaging import VitrageNotifier
LOG = log.getLogger(__name__)
class VitrageConsistencyService(os_service.Service):
    """oslo.service Service that schedules the periodic consistency check."""

    def __init__(self,
                 conf,
                 entity_graph):
        """Keep the configuration and graph, and create the actions notifier.

        :param conf: service configuration object
        :param entity_graph: graph handed to the ConsistencyEnforcer
        """
        super(VitrageConsistencyService, self).__init__()
        self.conf = conf
        self.entity_graph = entity_graph
        # Consistency actions are published on the evaluator topic.
        self.actions_notifier = VitrageNotifier(
            conf, 'vitrage_consistency', [EVALUATOR_TOPIC])

    def start(self):
        """Start the service and register the consistency timer."""
        LOG.info("Vitrage Consistency Service - Starting...")

        super(VitrageConsistencyService, self).start()

        consistency_enf = ConsistencyEnforcer(
            conf=self.conf,
            actions_callback=self.actions_notifier.notify,
            entity_graph=self.entity_graph)
        # The timer period is the snapshots interval; the first run is
        # delayed by 60 plus that same interval.
        self.tg.add_timer(self.conf.datasources.snapshots_interval,
                          consistency_enf.periodic_process,
                          initial_delay=60 +
                          self.conf.datasources.snapshots_interval)

        LOG.info("Vitrage Consistency Service - Started!")

    def stop(self, graceful=False):
        """Stop the service.

        :param graceful: forwarded unchanged to the base ``stop``
        """
        LOG.info("Vitrage Consistency Service - Stopping...")

        # Forward the flag: it used to be accepted and then silently dropped,
        # so a graceful stop request never reached the base class.
        super(VitrageConsistencyService, self).stop(graceful)

        LOG.info("Vitrage Consistency Service - Stopped!")

View File

@ -1,14 +0,0 @@
# Copyright 2018 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__author__ = 'stack'

View File

@ -1,137 +0,0 @@
# Copyright 2018 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import setproctitle
import time
from oslo_log import log
from oslo_service import service as os_service
LOG = log.getLogger(__name__)
GRAPH_UPDATE = 'graph_update'
POISON_PILL = None
class GraphCloneManagerBase(object):
    """Base manager for worker processes that each mirror the entity graph.

    Holds one task queue per worker and pushes every payload to all of
    them, waiting until each queue has been fully processed.
    """

    def __init__(self, conf, entity_graph, worker_num):
        self._conf = conf
        self._entity_graph = entity_graph
        self._workers_num = worker_num
        self._worker_queues = []
        self._p_launcher = os_service.ProcessLauncher(conf)

    def start(self):
        """Launch the workers, then subscribe to entity graph changes."""
        total = self._workers_num
        LOG.info('%s start %s processes', type(self).__name__, total)
        for index in range(total):
            self._worker_queues.append(self._run_worker(index, total))
        self.before_subscribe()
        self._entity_graph.subscribe(self.notify_graph_update)

    @abc.abstractmethod
    def _run_worker(self, worker_index, workers_num):
        """Create one worker; ``start`` stores the returned task queue."""
        raise NotImplementedError

    @abc.abstractmethod
    def before_subscribe(self):
        """Hook invoked by ``start`` right before subscribing to the graph."""
        pass

    def notify_graph_update(self, before, current, is_vertex, *args, **kwargs):
        """Forward a single graph change to every worker.

        Subscribed to entity graph changes: each change of the main entity
        graph is sent to all workers so that they update their own graph.
        """
        update = (GRAPH_UPDATE, before, current, is_vertex)
        self._notify_and_wait(update)

    def _notify_and_wait(self, payload):
        """Put ``payload`` on every worker queue, then join every queue."""
        pending = self._worker_queues
        for queue in pending:
            queue.put(payload)
        time.sleep(0)  # context switch before join
        for queue in pending:
            queue.join()

    def stop_all_workers(self):
        """Send the poison pill to all workers and close their queues."""
        self._notify_and_wait(POISON_PILL)
        for queue in self._worker_queues:
            queue.close()
        self._worker_queues = []
class GraphCloneWorkerBase(os_service.Service):
    """Service that applies queued tasks to its own entity graph object.

    Tasks are read from ``task_queue`` one at a time and passed to
    ``do_task``; this base class handles the ``GRAPH_UPDATE`` task.
    """

    def __init__(self,
                 conf,
                 task_queue,
                 entity_graph):
        super(GraphCloneWorkerBase, self).__init__()
        self._conf = conf
        self._task_queue = task_queue
        self._entity_graph = entity_graph

    def name(self):
        """Return the last component of the process title (empty here)."""
        return ''

    def start(self):
        """Set the process title, clear graph subscriptions, start reading."""
        super(GraphCloneWorkerBase, self).start()
        try:
            setproctitle.setproctitle('{} {} {}'.format(
                'vitrage-graph',
                self.__class__.__name__,
                self.name()))
        except Exception:
            # Best effort only: a failure here is logged and ignored.
            LOG.warning('failed to set process name')
        self._entity_graph.notifier._subscriptions = []  # Quick n dirty
        self.tg.add_thread(self._read_queue)
        LOG.info("%s - Started!", self.__class__.__name__)

    def _read_queue(self):
        """Process tasks until the poison pill is read.

        ``task_done`` is called once per task that was read, including when
        ``do_task`` raised and including the poison pill itself.
        """
        while True:
            try:
                next_task = self._task_queue.get()
                if next_task is POISON_PILL:
                    # Acknowledge the pill before leaving the loop.
                    self._task_queue.task_done()
                    break
                self.do_task(next_task)
            except Exception as e:
                # The failure is logged and the loop continues with the
                # next task.
                LOG.exception("Graph may not be in sync: exception %s", e)
            self._task_queue.task_done()
            # Evaluator queue may have been updated, thus the sleep:
            time.sleep(0)

    def do_task(self, task):
        """Dispatch ``task`` by its first element; handles GRAPH_UPDATE."""
        action = task[0]
        if action == GRAPH_UPDATE:
            (action, before, current, is_vertex) = task
            self._graph_update(before, current, is_vertex)

    def _graph_update(self, before, current, is_vertex):
        """Apply one change to the graph.

        A truthy ``current`` is added as a vertex or an edge; otherwise
        ``before`` is deleted as a vertex or an edge.
        """
        if current:
            if is_vertex:
                self._entity_graph.add_vertex(current)
            else:
                self._entity_graph.add_edge(current)
        else:
            if is_vertex:
                self._entity_graph.delete_vertex(before)
            else:
                self._entity_graph.delete_edge(before)

    def stop(self, graceful=False):
        """Stop the service, forwarding ``graceful`` to the base class."""
        super(GraphCloneWorkerBase, self).stop(graceful)
        LOG.info("%s - Stopped!", self.__class__.__name__)

View File

@ -1,4 +1,4 @@
# Copyright 2015 - Alcatel-Lucent
# Copyright 2018 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -11,89 +11,74 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import oslo_messaging
import threading
import time
from oslo_log import log
from oslo_service import service as os_service
import oslo_messaging
from vitrage.common.utils import spawn
from vitrage.entity_graph import EVALUATOR_TOPIC
from vitrage.entity_graph.processor.processor import Processor
from vitrage.entity_graph.vitrage_init import VitrageInit
from vitrage.evaluator.template_loader_service import TemplateLoaderManager
from vitrage.entity_graph.scheduler import Scheduler
from vitrage.entity_graph.workers import GraphWorkersManager
from vitrage import messaging
from vitrage.persistency.graph_persistor import GraphPersistor
LOG = log.getLogger(__name__)
class VitrageGraphService(os_service.Service):
def __init__(self,
conf,
graph,
evaluator,
db):
super(VitrageGraphService, self).__init__()
class VitrageGraphInit(object):
def __init__(self, conf, graph, db_connection):
self.conf = conf
self.graph = graph
self.evaluator = evaluator
self.templates_loader = TemplateLoaderManager(conf, graph, db)
self.init = VitrageInit(conf, graph, self.evaluator,
self.templates_loader)
self.graph_persistor = GraphPersistor(conf) if \
self.conf.persistency.enable_persistency else None
self.processor = Processor(self.conf, self.init, graph,
self.graph_persistor)
self.listener = self._init_listener()
def _init_listener(self):
collector_topic = self.conf.datasources.notification_topic_collector
evaluator_topic = EVALUATOR_TOPIC
return TwoPriorityListener(
self.conf,
self.workers = GraphWorkersManager(conf, graph, db_connection)
self.events_coordination = EventsCoordination(
conf,
self.process_event,
collector_topic,
evaluator_topic)
conf.datasources.notification_topic_collector,
EVALUATOR_TOPIC)
self.end_messages = {}
self.scheduler = Scheduler(conf, graph)
self.processor = Processor(conf,
self._handle_end_message,
graph,
self.scheduler.graph_persistor)
def run(self):
    """Run the graph start-up sequence and then the workers manager.

    Steps, in order: start the events coordination, wait for the end
    messages, start the scheduler's periodic tasks, start the processor's
    notifier, submit the start-evaluations request and run the workers.
    """
    LOG.info('Init Started')
    self.events_coordination.start()
    self._wait_for_all_end_messages()
    self.scheduler.start_periodic_tasks()
    self.processor.start_notifier()
    # NOTE(review): spawned on its own thread, presumably because
    # workers.run() below does not return until shutdown - confirm.
    spawn(self.workers.submit_start_evaluations)
    self.workers.run()
def process_event(self, event):
if event.get('template_action'):
self.templates_loader.handle_template_event(event)
self.evaluator.reload_evaluators_templates()
self.workers.submit_template_event(event)
self.workers.submit_evaluators_reload_templates()
else:
self.processor.process_event(event)
def start(self):
LOG.info("Vitrage Graph Service - Starting...")
super(VitrageGraphService, self).start()
if self.graph_persistor:
self.tg.add_timer(
self.conf.persistency.graph_persistency_interval,
self.graph_persistor.store_graph,
self.conf.persistency.graph_persistency_interval,
graph=self.graph)
self.tg.add_thread(
self.init.initializing_process,
on_end_messages_func=self.processor.on_recieved_all_end_messages)
self.listener.start()
LOG.info("Vitrage Graph Service - Started!")
def _handle_end_message(self, vitrage_type):
    """Record that an end message was received for ``vitrage_type``."""
    self.end_messages[vitrage_type] = True
def stop(self, graceful=False):
LOG.info("Vitrage Graph Service - Stopping...")
self.evaluator.stop_all_workers()
self.templates_loader.stop_all_workers()
self.listener.stop()
self.listener.wait()
super(VitrageGraphService, self).stop(graceful)
LOG.info("Vitrage Graph Service - Stopped!")
def _wait_for_all_end_messages(self):
    """Poll until an end message was recorded per datasource type.

    ``self.end_messages`` is checked every 0.2 seconds. The time budget is
    ``initialization_max_retries * initialization_interval`` from the
    ``consistency`` configuration group.

    :returns: True once the number of recorded end messages equals the
        number of configured datasource types, False on timeout
    """
    start = time.time()
    timeout = self.conf.consistency.initialization_max_retries * \
        self.conf.consistency.initialization_interval
    expected_types = self.conf.datasources.types
    while time.time() < start + timeout:
        if len(self.end_messages) == len(expected_types):
            LOG.info('end messages received')
            return True
        time.sleep(0.2)
    # Report the types that never arrived; the previous message printed the
    # received types under the "Missing" label.
    missing = [t for t in expected_types if t not in self.end_messages]
    LOG.warning('Missing end messages %s', missing)
    return False
PRIORITY_DELAY = 0.05
class TwoPriorityListener(object):
class EventsCoordination(object):
def __init__(self, conf, do_work_func, topic_low, topic_high):
self._conf = conf
self._lock = threading.Lock()
@ -103,7 +88,7 @@ class TwoPriorityListener(object):
try:
return do_work_func(event)
except Exception as e:
LOG.exception('Got Exception %s', e)
LOG.exception('Got Exception %s for event %s', e, str(event))
self._do_work_func = do_work
@ -114,7 +99,9 @@ class TwoPriorityListener(object):
def start(self):
self._high_pri_listener.start()
LOG.info('Listening on %s', self._high_pri_listener.targets[0].topic)
self._low_pri_listener.start()
LOG.info('Listening on %s', self._low_pri_listener.targets[0].topic)
def stop(self):
self._low_pri_listener.stop()

View File

@ -23,7 +23,8 @@ from vitrage.entity_graph.mappings.datasource_info_mapper import \
from vitrage.entity_graph.processor import base as processor
from vitrage.entity_graph.processor.notifier import GraphNotifier
from vitrage.entity_graph.processor import processor_utils as PUtils
from vitrage.entity_graph.transformer_manager import TransformerManager
from vitrage.entity_graph.processor.transformer_manager import \
TransformerManager
from vitrage.graph import Direction
LOG = log.getLogger(__name__)
@ -31,14 +32,14 @@ LOG = log.getLogger(__name__)
class Processor(processor.ProcessorBase):
def __init__(self, conf, initialization_status, e_graph,
def __init__(self, conf, end_messages_func=None, e_graph=None,
graph_persistor=None):
super(Processor, self).__init__()
self.conf = conf
self.transformer_manager = TransformerManager(self.conf)
self.info_mapper = DatasourceInfoMapper(self.conf)
self._initialize_events_actions()
self.initialization_status = initialization_status
self.end_messages_func = end_messages_func
self.entity_graph = e_graph
self._notifier = GraphNotifier(conf)
self._graph_persistor = graph_persistor
@ -189,9 +190,9 @@ class Processor(processor.ProcessorBase):
vertex, graph_vertex)
def handle_end_message(self, vertex, neighbors):
self.initialization_status.handle_end_message(vertex)
self.end_messages_func(vertex[VProps.VITRAGE_TYPE])
def on_recieved_all_end_messages(self):
def start_notifier(self):
if self._notifier and self._notifier.enabled:
self.entity_graph.subscribe(self._notifier.notify_when_applicable)
LOG.info('Graph notifications subscription added')

View File

@ -0,0 +1,73 @@
# Copyright 2018 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from concurrent.futures import ThreadPoolExecutor
from futurist import periodics
from oslo_log import log
from vitrage.common.utils import spawn
from vitrage.entity_graph.consistency.consistency_enforcer import\
ConsistencyEnforcer
from vitrage.persistency.graph_persistor import GraphPersistor
LOG = log.getLogger(__name__)
class Scheduler(object):
    """Registers and runs the periodic persistency and consistency tasks."""

    def __init__(self,
                 conf,
                 graph):
        super(Scheduler, self).__init__()
        self.conf = conf
        self.graph = graph
        # A persistor exists only when persistency is enabled in the config.
        if self.conf.persistency.enable_persistency:
            self.graph_persistor = GraphPersistor(conf)
        else:
            self.graph_persistor = None
        self.consistency = ConsistencyEnforcer(conf, graph)
        self.periodic = None

    def start_periodic_tasks(self):
        """Create the periodic worker, add both timers and start it."""

        def make_executor():
            return ThreadPoolExecutor(max_workers=10)

        self.periodic = periodics.PeriodicWorker.create(
            [], executor_factory=make_executor)

        self.add_persistor_timer()
        self.add_consistency_timer()
        # The periodic worker is started through spawn().
        spawn(self.periodic.start)

    def add_persistor_timer(self):
        """Add the task that stores the graph at the persistency interval."""
        spacing = self.conf.persistency.graph_persistency_interval

        @periodics.periodic(spacing=spacing)
        def persist():
            if not self.graph_persistor:
                return
            try:
                self.graph_persistor.store_graph(graph=self.graph)
            except Exception as e:
                LOG.exception('persist failed %s', e)

        self.periodic.add(persist)
        LOG.info("periodic task - persistor %s", spacing)

    def add_consistency_timer(self):
        """Add the task that runs the consistency check periodically."""
        spacing = self.conf.datasources.snapshots_interval

        @periodics.periodic(spacing=spacing)
        def run_consistency():
            try:
                self.consistency.periodic_process()
            except Exception as e:
                LOG.exception('run_consistency failed %s', e)

        self.periodic.add(run_consistency)
        LOG.info("periodic task - run_consistency %s", spacing)

View File

@ -1,112 +0,0 @@
# Copyright 2016 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
import time
from vitrage.common.constants import VertexProperties as VProps
LOG = log.getLogger(__name__)
class VitrageInit(object):
    """Tracks the initialization status while end messages are collected."""

    STARTED = 'started'
    RECEIVED_ALL_END_MESSAGES = 'received_all_end_messages'
    FINISHED = 'finished'

    def __init__(self, conf, graph=None, evaluator=None, template_loader=None):
        self.conf = conf
        self.graph = graph
        self.evaluator = evaluator
        self.template_loader = template_loader
        self.status = self.STARTED
        self.end_messages = {}

    def initializing_process(self, on_end_messages_func):
        """Wait for the end messages, then start evaluator and loader.

        :param on_end_messages_func: callable invoked after the wait ends,
            whether or not all end messages were received
        """
        try:
            LOG.info('Init Started')

            if self._wait_for_all_end_messages():
                LOG.info('Initialization - All end messages were received')
            else:
                LOG.warning('Initialization - max retries reached %s',
                            self.end_messages)

            on_end_messages_func()

            self.evaluator.start()
            if self.template_loader:
                self.template_loader.start()

            # TODO(idan_hefetz) As vitrage is not yet persistent, there aren't
            # TODO(idan_hefetz) any deduced alarms to be removed during init
            # if not self._wait_for_action(self.evaluator_queue.empty):
            #     LOG.error('Evaluator Queue Not Empty')
            # self._mark_old_deduced_alarms_as_deleted(timestamp, self.graph,
            #                                          self.evaluator_queue)
            self.status = self.FINISHED

            LOG.info('Init Finished')
        except Exception as e:
            LOG.exception('Init Failed: %s', e)

    def handle_end_message(self, vertex):
        """Record the end message of the vertex's type and update status."""
        vitrage_type = vertex[VProps.VITRAGE_TYPE]
        self.end_messages[vitrage_type] = True

        expected = len(self.conf.datasources.types)
        if len(self.end_messages) == expected:
            self.status = self.RECEIVED_ALL_END_MESSAGES

    def _wait_for_all_end_messages(self):
        """Wait until the status says that all end messages were received."""

        def all_received():
            return self.status == self.RECEIVED_ALL_END_MESSAGES

        return self._wait_for_action(all_received)

    def _wait_for_action(self, function):
        """Call ``function`` until it returns a truthy value or retries end.

        :returns: True when ``function`` returned a truthy value, False
            once ``initialization_max_retries`` attempts were made
        """
        attempts = 0
        while attempts < self.conf.consistency.initialization_max_retries:
            if function():
                return True
            attempts += 1
            time.sleep(self.conf.consistency.initialization_interval)
        return False
# def _mark_old_deduced_alarms_as_deleted(self, timestamp,graph,out_queue):
# query = {
# 'and': [
# {'==': {VProps.VITRAGE_CATEGORY: EntityCategory.ALARM}},
# {'==': {VProps.VITRAGE_TYPE: VProps.VITRAGE_TYPE}},
# {'<': {VProps.VITRAGE_SAMPLE_TIMESTAMP: timestamp}}
# ]
# }
# old_deduced_alarms = graph.get_vertices(query_dict=query)
# self._push_events_to_queue(old_deduced_alarms,
# GraphAction.DELETE_ENTITY,
# out_queue)
#
# def _push_events_to_queue(self, vertices, action, out_queue):
# for vertex in vertices:
# event = {
# DSProps.ENTITY_TYPE: CONSISTENCY_DATASOURCE,
# DSProps.DATASOURCE_ACTION: DatasourceAction.UPDATE,
# DSProps.SAMPLE_DATE: str(utcnow()),
# DSProps.EVENT_TYPE: action,
# VProps.VITRAGE_ID: vertex[VProps.VITRAGE_ID],
# VProps.ID: vertex.get(VProps.ID, None),
# VProps.VITRAGE_TYPE: vertex[VProps.VITRAGE_TYPE],
# VProps.VITRAGE_CATEGORY: vertex[VProps.VITRAGE_CATEGORY],
# VProps.IS_REAL_VITRAGE_ID: True
# }
# out_queue.put(event)

View File

@ -0,0 +1,325 @@
# Copyright 2018 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import cotyledon
import multiprocessing
from oslo_concurrency import processutils as ps
from oslo_log import log
from vitrage.common.constants import TemplateStatus as TStatus
from vitrage.common.constants import TemplateTypes as TType
from vitrage.common.exception import VitrageError
from vitrage.entity_graph import EVALUATOR_TOPIC
from vitrage.evaluator.actions.base import ActionMode
from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator
from vitrage.evaluator.scenario_repository import ScenarioRepository
from vitrage.messaging import VitrageNotifier
LOG = log.getLogger(__name__)
# Supported message types
GRAPH_UPDATE = 'graph_update'
START_EVALUATION = 'start_evaluation'
RELOAD_TEMPLATES = 'reload_templates'
TEMPLATE_ACTION = 'template_action'
ADD = 'add'
DELETE = 'delete'
class GraphWorkersManager(cotyledon.ServiceManager):
    """Owns the graph worker processes and the queues that feed them.

     - the worker processes
     - the queues used to communicate with these workers
     - the submit_* interface used to hand tasks to the workers
    """

    def __init__(self, conf, entity_graph, db):
        super(GraphWorkersManager, self).__init__()
        self._conf = conf
        self._entity_graph = entity_graph
        self._db = db
        self._evaluator_queues = []
        self._template_queues = []
        self._all_queues = []
        self.add_evaluator_workers()
        self.add_template_workers()

    def add_evaluator_workers(self):
        """Register the evaluator workers and create their queues

        Evaluator workers receive all graph updates, hence are updated.
        Every evaluator worker holds an enabled scenario-evaluator and
        processes every change; each worker's scenario-evaluator runs
        different template scenarios.
        Interface to these workers is:
        submit_graph_update(..)
        submit_start_evaluations(..)
        submit_evaluators_reload_templates(..)
        """
        if self._evaluator_queues:
            raise VitrageError('add_evaluator_workers called more than once')
        worker_count = self._conf.evaluator.workers or ps.get_worker_count()
        task_queues = []
        for _ in range(worker_count):
            task_queues.append(multiprocessing.JoinableQueue())
        worker_args = (
            self._conf, task_queues, self._entity_graph, worker_count)
        self.add(EvaluatorWorker, args=worker_args, workers=worker_count)
        self._evaluator_queues = task_queues
        self._all_queues.extend(task_queues)

    def add_template_workers(self):
        """Register the template worker and create its queue

        Template workers receive all graph updates, hence are updated.
        Every template worker holds a disabled scenario-evaluator that does
        not process changes. The scenario-evaluator is enabled when a
        template add/delete arrives, so that the worker runs the added
        template on the entire graph.
        Interface to these workers is:
        submit_graph_update(..)
        submit_template_event(..)
        """
        if self._template_queues:
            raise VitrageError('add_template_workers called more than once')
        worker_count = 1  # currently more than one worker is not supported
        task_queues = []
        for _ in range(worker_count):
            task_queues.append(multiprocessing.JoinableQueue())
        worker_args = (self._conf, task_queues, self._entity_graph)
        self.add(TemplateLoaderWorker, args=worker_args, workers=worker_count)
        self._template_queues = task_queues
        self._all_queues.extend(task_queues)

    def submit_graph_update(self, before, current, is_vertex, *args, **kwargs):
        """Send a graph update to all workers

        This method is subscribed to entity graph changes.
        For every change in the main entity graph it notifies each of the
        workers, causing them to update their own graph.
        """
        update = (GRAPH_UPDATE, before, current, is_vertex)
        self._submit_and_wait(self._all_queues, update)

    def submit_start_evaluations(self):
        """Enable scenario-evaluator in all evaluator workers

        Enables each worker's scenario-evaluator and runs it on the entire
        graph; afterwards this manager subscribes to graph changes.
        """
        self._submit_and_wait(self._evaluator_queues, (START_EVALUATION,))
        self._entity_graph.subscribe(self.submit_graph_update)
        LOG.info('Init Finished')

    def submit_evaluators_reload_templates(self):
        """Recreate the scenario-repository in all evaluator workers

        So that new/deleted templates are added/removed
        """
        self._submit_and_wait(self._evaluator_queues, (RELOAD_TEMPLATES,))

    def submit_template_event(self, event):
        """Have the template worker load the new/deleted template

        The template is loaded to the scenario-evaluator and run on the
        entire graph; then the status of the templates is updated.
        """
        template_action = event.get(TEMPLATE_ACTION)

        if template_action == ADD:
            query_status = TStatus.LOADING
            new_status = TStatus.ACTIVE
            action_mode = ActionMode.DO
        elif template_action == DELETE:
            query_status = TStatus.DELETING
            new_status = TStatus.DELETED
            action_mode = ActionMode.UNDO
        else:
            raise VitrageError('Invalid template_action %s' % template_action)

        templates = self._db.templates.query(status=query_status)
        # Only the names of standard templates are sent to the worker.
        standard_names = [t.name for t in templates
                          if t.template_type == TType.STANDARD]
        self._submit_and_wait(
            self._template_queues,
            (TEMPLATE_ACTION, standard_names, action_mode))

        for template in templates:
            self._db.templates.update(template.uuid, 'status', new_status)

    @staticmethod
    def _submit_and_wait(queues, payload):
        """Put ``payload`` on every queue, then join every queue."""
        for queue in queues:
            queue.put(payload)
        for queue in queues:
            queue.join()
class GraphCloneWorkerBase(cotyledon.Service):
    """Worker service that applies queued tasks to its own entity graph."""

    name = 'GraphCloneWorkerBase'

    def __init__(self,
                 worker_id,
                 conf,
                 task_queues,
                 entity_graph):
        super(GraphCloneWorkerBase, self).__init__(worker_id)
        self._conf = conf
        # Every worker reads only the queue found at its own index.
        self._task_queue = task_queues[worker_id]
        self._entity_graph = entity_graph
        self._running = False

    @abc.abstractmethod
    def _init_instance(self):
        """This method is executed in the newly created process"""
        raise NotImplementedError

    def run(self):
        """Prepare the worker and process queue tasks until terminated."""
        LOG.info("%s - Starting %s", type(self).__name__, self.worker_id)
        self._running = True
        self._entity_graph.notifier._subscriptions = []  # Quick n dirty
        self._init_instance()
        self._read_queue()

    def terminate(self):
        """Clear the running flag that the read loop checks."""
        self._running = False
        LOG.info("%s - Stopped!", type(self).__name__)

    def _read_queue(self):
        """Read tasks one by one; each task read is marked as done."""
        LOG.debug("%s - reading queue %s",
                  type(self).__name__, self.worker_id)
        task_queue = self._task_queue
        while self._running:
            try:
                self.do_task(task_queue.get())
            except Exception as e:
                LOG.exception("Graph may not be in sync: exception %s", e)
            task_queue.task_done()

    def do_task(self, task):
        """Handle ``task`` when its first element is GRAPH_UPDATE."""
        if task[0] == GRAPH_UPDATE:
            _, before, current, is_vertex = task
            self._graph_update(before, current, is_vertex)

    def _graph_update(self, before, current, is_vertex):
        """Add ``current`` when it is truthy, otherwise delete ``before``."""
        graph = self._entity_graph
        if current:
            add = graph.add_vertex if is_vertex else graph.add_edge
            add(current)
        else:
            remove = graph.delete_vertex if is_vertex else graph.delete_edge
            remove(before)
class EvaluatorWorker(GraphCloneWorkerBase):
    """Graph clone worker that runs a ScenarioEvaluator on its graph."""

    name = 'EvaluatorWorker'

    def __init__(self,
                 worker_id,
                 conf,
                 task_queues,
                 e_graph,
                 workers_num):
        super(EvaluatorWorker, self).__init__(
            worker_id, conf, task_queues, e_graph)
        self._workers_num = workers_num
        self._evaluator = None

    def _init_instance(self):
        """Create the worker's scenario evaluator, initially disabled."""
        repo = ScenarioRepository(
            self._conf, self.worker_id, self._workers_num)
        notifier = VitrageNotifier(
            conf=self._conf,
            publisher_id='vitrage_evaluator',
            topics=[EVALUATOR_TOPIC])
        self._evaluator = ScenarioEvaluator(
            self._conf,
            self._entity_graph,
            repo,
            notifier.notify,
            enabled=False)
        self._evaluator.scenario_repo.log_enabled_scenarios()

    def do_task(self, task):
        """Handle the base tasks plus start-evaluation and reload tasks."""
        super(EvaluatorWorker, self).do_task(task)
        requested = task[0]
        if requested == START_EVALUATION:
            self._evaluator.run_evaluator()
        elif requested == RELOAD_TEMPLATES:
            self._reload_templates()

    def _reload_templates(self):
        """Replace the evaluator's scenario repository with a new one."""
        LOG.info("reloading evaluator scenarios")
        self._evaluator.scenario_repo = ScenarioRepository(
            self._conf, self.worker_id, self._workers_num)
        self._evaluator.scenario_repo.log_enabled_scenarios()
class TemplateLoaderWorker(GraphCloneWorkerBase):
    """Graph clone worker that runs added/deleted templates on demand."""

    name = 'TemplateLoaderWorker'

    def __init__(self,
                 worker_id,
                 conf,
                 task_queues,
                 e_graph):
        super(TemplateLoaderWorker, self).__init__(
            worker_id, conf, task_queues, e_graph)
        self._evaluator = None

    def _init_instance(self):
        """Create a disabled scenario evaluator without a repository."""
        notifier = VitrageNotifier(
            conf=self._conf,
            publisher_id='vitrage_evaluator',
            topics=[EVALUATOR_TOPIC])
        self._evaluator = ScenarioEvaluator(
            self._conf,
            self._entity_graph,
            None,
            notifier.notify,
            enabled=False)

    def do_task(self, task):
        """Handle the base tasks plus the template action task."""
        super(TemplateLoaderWorker, self).do_task(task)
        if task[0] == TEMPLATE_ACTION:
            _, template_names, action_mode = task
            self._template_action(template_names, action_mode)

    def _template_action(self, template_names, action_mode):
        """Enable the given templates, run the evaluator, disable it."""
        self._enable_evaluator_templates(template_names)
        self._evaluator.run_evaluator(action_mode)
        self._disable_evaluator()

    def _enable_evaluator_templates(self, template_names):
        """Enable only scenarios whose id starts with a given name."""
        repo = ScenarioRepository(self._conf)
        for scenario in repo._all_scenarios:
            scenario.enabled = any(
                scenario.id.startswith(name) for name in template_names)
        self._evaluator.scenario_repo = repo
        self._evaluator.scenario_repo.log_enabled_scenarios()
        self._evaluator.enabled = True

    def _disable_evaluator(self):
        """Clear the graph subscriptions and disable the evaluator."""
        self._entity_graph.notifier._subscriptions = []  # Quick n dirty
        self._evaluator.enabled = False

View File

@ -1,109 +0,0 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import multiprocessing
from oslo_concurrency import processutils
from oslo_log import log
from vitrage.entity_graph import EVALUATOR_TOPIC
from vitrage.entity_graph.graph_clone import base
from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator
from vitrage.evaluator.scenario_repository import ScenarioRepository
from vitrage.messaging import VitrageNotifier
# Module-level logger.
LOG = log.getLogger(__name__)
# Task names: the first item of the task tuples that EvaluatorManager sends
# and that EvaluatorWorker.do_task dispatches on.
START_EVALUATION = 'start_evaluation'
RELOAD_TEMPLATES = 'reload_templates'
class EvaluatorManager(base.GraphCloneManagerBase):
    """Launches the evaluator workers and sends them evaluation tasks."""

    def __init__(self, conf, entity_graph):
        """Pick the number of workers and initialize the base manager.

        ``conf.evaluator.workers`` is used when it is set to a truthy
        value; otherwise the count comes from
        ``processutils.get_worker_count()``.
        """
        configured_workers = conf.evaluator.workers
        if configured_workers:
            workers_num = configured_workers
        else:
            workers_num = processutils.get_worker_count()
        super(EvaluatorManager, self).__init__(conf, entity_graph, workers_num)

    def before_subscribe(self):
        """Start the evaluations.

        NOTE(review): presumably a hook the base manager calls before it
        subscribes to graph changes - confirm against the base class.
        """
        self.start_evaluations()

    def _run_worker(self, worker_index, workers_num):
        """Create an EvaluatorWorker and its task queue

        The new worker is initialized with a scenario repository
        that only contains a portion of the templates

        :return: the JoinableQueue handed to the new worker
        """
        queue = multiprocessing.JoinableQueue()
        worker = EvaluatorWorker(
            conf=self._conf,
            task_queue=queue,
            e_graph=self._entity_graph,
            worker_index=worker_index,
            workers_num=workers_num)
        self._p_launcher.launch_service(worker)
        return queue

    def start_evaluations(self):
        """Send a START_EVALUATION task through ``_notify_and_wait``."""
        task = (START_EVALUATION,)
        self._notify_and_wait(task)

    def reload_evaluators_templates(self):
        """Send a RELOAD_TEMPLATES task through ``_notify_and_wait``."""
        task = (RELOAD_TEMPLATES,)
        self._notify_and_wait(task)
class EvaluatorWorker(base.GraphCloneWorkerBase):
    """Worker that runs a ScenarioEvaluator over part of the scenarios."""

    def __init__(self,
                 conf,
                 task_queue,
                 e_graph,
                 worker_index,
                 workers_num):
        """Store the worker's index and the total worker count.

        The evaluator itself is only created in ``start``.
        """
        super(EvaluatorWorker, self).__init__(conf, task_queue, e_graph)
        self._evaluator = None
        self._worker_index = worker_index
        self._workers_num = workers_num

    def name(self):
        """Return the worker index wrapped in parentheses, e.g. '(0)'."""
        index_text = str(self._worker_index)
        return "(%s)" % index_text

    def _new_scenario_repo(self):
        """Build a scenario repository for this worker's index and count."""
        return ScenarioRepository(self._conf,
                                  self._worker_index,
                                  self._workers_num)

    def start(self):
        """Start the base worker, then create the disabled evaluator.

        The evaluator gets a new scenario repository and, as its actions
        callback, the ``notify`` method of a ``VitrageNotifier`` created
        for ``EVALUATOR_TOPIC``. The enabled scenarios are logged.
        """
        super(EvaluatorWorker, self).start()
        scenario_repo = self._new_scenario_repo()
        notifier = VitrageNotifier(
            conf=self._conf,
            publisher_id='vitrage_evaluator',
            topics=[EVALUATOR_TOPIC])
        self._evaluator = ScenarioEvaluator(
            self._conf,
            self._entity_graph,
            scenario_repo,
            notifier.notify,
            enabled=False)
        self._evaluator.scenario_repo.log_enabled_scenarios()

    def do_task(self, task):
        """Let the base class handle the task, then dispatch on its name.

        RELOAD_TEMPLATES reloads the scenario repository and
        START_EVALUATION runs the evaluator; other names are ignored here.
        """
        super(EvaluatorWorker, self).do_task(task)
        action = task[0]
        if action == RELOAD_TEMPLATES:
            self._reload_templates()
        elif action == START_EVALUATION:
            self._evaluator.run_evaluator()

    def _reload_templates(self):
        """Replace the evaluator's scenario repository with a new one."""
        self._evaluator.scenario_repo = self._new_scenario_repo()
        LOG.info("reloading evaluator scenarios")
        self._evaluator.scenario_repo.log_enabled_scenarios()

View File

@ -1,122 +0,0 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import multiprocessing
from oslo_log import log
from vitrage.common.constants import TemplateStatus as TStatus
from vitrage.common.constants import TemplateTypes as TType
from vitrage.common.exception import VitrageError
from vitrage.entity_graph import EVALUATOR_TOPIC
from vitrage.entity_graph.graph_clone import base
from vitrage.evaluator.actions.base import ActionMode
from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator
from vitrage.evaluator.scenario_repository import ScenarioRepository
from vitrage.messaging import VitrageNotifier
# Module-level logger.
LOG = log.getLogger(__name__)
# Key read from template events by handle_template_event; also the task name
# placed first in the tuple sent to the template loader worker.
TEMPLATE_ACTION = 'template_action'
# The two event[TEMPLATE_ACTION] values that handle_template_event accepts.
ADD = 'add'
DELETE = 'delete'
class TemplateLoaderManager(base.GraphCloneManagerBase):
    """Runs one template loader worker and feeds it template events."""

    def __init__(self, conf, entity_graph, db):
        """Initialize the base manager with a single worker; keep ``db``."""
        super(TemplateLoaderManager, self).__init__(conf, entity_graph, 1)
        self._db = db

    def _run_worker(self, worker_index, workers_num):
        """Launch a TemplateLoaderWorker and return its task queue.

        ``worker_index`` and ``workers_num`` are not used here.
        """
        queue = multiprocessing.JoinableQueue()
        worker = TemplateLoaderWorker(
            conf=self._conf,
            task_queue=queue,
            e_graph=self._entity_graph)
        self._p_launcher.launch_service(worker)
        return queue

    def handle_template_event(self, event):
        """Apply an 'add' or 'delete' template event.

        For ADD, the templates in LOADING status are evaluated in DO mode
        and then marked ACTIVE. For DELETE, the templates in DELETING
        status are evaluated in UNDO mode and then marked DELETED. Only
        templates of STANDARD type are sent to the worker, but every
        queried template gets the new status.

        :param event: mapping holding the action under TEMPLATE_ACTION
        :raises VitrageError: if the action is neither ADD nor DELETE
        """
        template_action = event.get(TEMPLATE_ACTION)
        if template_action == ADD:
            current_status = TStatus.LOADING
            new_status = TStatus.ACTIVE
            action_mode = ActionMode.DO
        elif template_action == DELETE:
            current_status = TStatus.DELETING
            new_status = TStatus.DELETED
            action_mode = ActionMode.UNDO
        else:
            raise VitrageError('Invalid template_action %s' % template_action)

        templates = self._db.templates.query(status=current_status)
        standard_names = [template.name
                          for template in templates
                          if template.template_type == TType.STANDARD]
        self._template_worker_task(standard_names, action_mode)

        for template in templates:
            self._db.templates.update(template.uuid, 'status', new_status)

    def _template_worker_task(self, template_names, action_mode):
        """Send a TEMPLATE_ACTION task through ``_notify_and_wait``."""
        task = (TEMPLATE_ACTION, template_names, action_mode)
        self._notify_and_wait(task)
class TemplateLoaderWorker(base.GraphCloneWorkerBase):
    """Worker that evaluates the scenarios of specific templates on demand.

    Its evaluator stays disabled except while a TEMPLATE_ACTION task is
    being processed.
    """

    def __init__(self,
                 conf,
                 task_queue,
                 e_graph):
        """Initialize the base worker; the evaluator is set in ``start``."""
        super(TemplateLoaderWorker, self).__init__(conf, task_queue, e_graph)
        self._evaluator = None

    def start(self):
        """Start the base worker, then create the disabled evaluator.

        The evaluator has no scenario repository yet (``None``); its
        actions callback is the ``notify`` method of a ``VitrageNotifier``
        created for ``EVALUATOR_TOPIC``.
        """
        super(TemplateLoaderWorker, self).start()
        actions_callback = VitrageNotifier(
            conf=self._conf,
            publisher_id='vitrage_evaluator',
            topics=[EVALUATOR_TOPIC]).notify
        self._evaluator = ScenarioEvaluator(
            self._conf,
            self._entity_graph,
            None,
            actions_callback,
            enabled=False)

    def do_task(self, task):
        """Let the base class handle the task, then run template actions.

        A task whose first item is TEMPLATE_ACTION is unpacked as
        ``(action, template_names, action_mode)``.
        """
        super(TemplateLoaderWorker, self).do_task(task)
        action = task[0]
        if action == TEMPLATE_ACTION:
            (action, template_names, action_mode) = task
            self._template_action(template_names, action_mode)

    def _template_action(self, template_names, action_mode):
        """Run the evaluator once for the scenarios of the given templates.

        The disable step runs in a ``finally`` clause: if
        ``run_evaluator`` raises, the evaluator must not stay enabled with
        its graph subscriptions still registered. The exception itself
        still propagates to the caller.

        :param template_names: names of the templates to evaluate
        :param action_mode: mode passed through to ``run_evaluator``
        """
        self._enable_evaluator_templates(template_names)
        try:
            self._evaluator.run_evaluator(action_mode)
        finally:
            self._disable_evaluator()

    def _enable_evaluator_templates(self, template_names):
        """Give the evaluator a fresh repository limited to some templates.

        Each scenario of a new ``ScenarioRepository`` is enabled only when
        its id starts with one of ``template_names``. The repository is
        installed on the evaluator, the enabled scenarios are logged and
        the evaluator is switched on.
        """
        # NOTE(review): matching is by id prefix, so a template whose name
        # is a prefix of another template's name would also enable the
        # other template's scenarios - confirm this is intended.
        scenario_repo = ScenarioRepository(self._conf)
        for s in scenario_repo._all_scenarios:
            s.enabled = False
            for template_name in template_names:
                if s.id.startswith(template_name):
                    s.enabled = True
        self._evaluator.scenario_repo = scenario_repo
        self._evaluator.scenario_repo.log_enabled_scenarios()
        self._evaluator.enabled = True

    def _disable_evaluator(self):
        """Drop the graph notifier's subscriptions; disable the evaluator."""
        self._entity_graph.notifier._subscriptions = []  # Quick n dirty
        self._evaluator.enabled = False

View File

@ -35,6 +35,7 @@ class GraphPersistor(object):
self.last_event_timestamp = datetime.datetime.utcnow()
def store_graph(self, graph):
LOG.info('Graph persistency running..')
try:
graph_snapshot = graph.write_gpickle()
db_row = models.GraphSnapshot(

View File

@ -14,7 +14,6 @@
from datetime import timedelta
import time
import unittest
from oslo_config import cfg
from six.moves import queue
@ -30,9 +29,6 @@ from vitrage.datasources.nova.zone import NOVA_ZONE_DATASOURCE
from vitrage.entity_graph.consistency.consistency_enforcer \
import ConsistencyEnforcer
from vitrage.entity_graph.processor.processor import Processor
from vitrage.entity_graph.vitrage_init import VitrageInit
from vitrage.evaluator.actions.evaluator_event_transformer \
import VITRAGE_DATASOURCE
from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator
from vitrage.evaluator.scenario_repository import ScenarioRepository
from vitrage.graph.driver.networkx_graph import NXGraph
@ -81,9 +77,7 @@ class TestConsistencyFunctional(TestFunctionalBase, TestConfiguration):
cls.add_db(cls.conf)
cls.load_datasources(cls.conf)
cls.graph = NXGraph("Entity Graph")
cls.initialization_status = VitrageInit(cls.conf, cls.graph)
cls.processor = Processor(cls.conf, cls.initialization_status,
cls.graph)
cls.processor = Processor(cls.conf, lambda x: x, cls.graph)
cls.event_queue = queue.Queue()
@ -104,68 +98,8 @@ class TestConsistencyFunctional(TestFunctionalBase, TestConfiguration):
actions_callback)
cls.consistency_enforcer = ConsistencyEnforcer(
cls.conf,
actions_callback,
cls.processor.entity_graph)
@unittest.skip("test_initializing_process skipping")
def test_initializing_process(self):
    """Check the graph's vertices after the initializing process runs.

    Alarms are added and the end messages are set, the initializing
    process is run with the processor's end-messages callback, and the
    numbers of deleted and undeleted vertices in the entity graph are
    then compared with the expected counts.
    """
    # Setup
    num_of_host_alarms = self.NUM_HOSTS - 2
    num_instances_per_host = 4
    num_instance_alarms = num_of_host_alarms * num_instances_per_host
    self._create_processor_with_graph(self.conf, processor=self.processor)
    self._add_alarms()
    self._set_end_messages()
    self.assertThat(self.processor.entity_graph.get_vertices(),
                    matchers.HasLength(
                        self._num_total_expected_vertices() +
                        num_of_host_alarms + self.NUM_INSTANCES)
                    )

    # Action
    # eventlet.spawn(self._process_events)
    # processor_thread = threading.Thread(target=self._process_events)
    # processor_thread.start()
    self.initialization_status.initializing_process(
        self.processor.on_recieved_all_end_messages)
    self._process_events()

    # Test Assertions
    num_correct_alarms = num_of_host_alarms + num_instance_alarms
    num_undeleted_vertices_in_graph = \
        len(self.processor.entity_graph.get_vertices(vertex_attr_filter={
            VProps.VITRAGE_IS_DELETED: False
        }))
    self.assertEqual(self._num_total_expected_vertices() +
                     num_correct_alarms,
                     num_undeleted_vertices_in_graph)

    alarm_vertices_in_graph = self.processor.entity_graph.get_vertices({
        VProps.VITRAGE_CATEGORY: EntityCategory.ALARM,
        VProps.VITRAGE_IS_DELETED: False
    })
    self.assertThat(alarm_vertices_in_graph,
                    matchers.HasLength(num_correct_alarms))

    is_deleted_alarm_vertices_in_graph = \
        self.processor.entity_graph.get_vertices({
            VProps.VITRAGE_CATEGORY: EntityCategory.ALARM,
            VProps.VITRAGE_IS_DELETED: True
        })
    # A matcher has to go through assertThat: assertEqual would compare
    # the vertex list with the matcher object itself and never pass.
    self.assertThat(is_deleted_alarm_vertices_in_graph,
                    matchers.HasLength(num_instance_alarms))

    instance_vertices = self.processor.entity_graph.get_vertices({
        VProps.VITRAGE_CATEGORY: EntityCategory.ALARM,
        VProps.VITRAGE_TYPE: VITRAGE_DATASOURCE,
        VProps.VITRAGE_IS_DELETED: False
    })
    self.assertThat(instance_vertices,
                    matchers.HasLength(num_instance_alarms))
cls.processor.entity_graph,
actions_callback)
def test_periodic_process(self):
# Setup
@ -234,8 +168,6 @@ class TestConsistencyFunctional(TestFunctionalBase, TestConfiguration):
self.initialization_status.end_messages[NOVA_INSTANCE_DATASOURCE] = \
True
self.initialization_status.end_messages[NAGIOS_DATASOURCE] = True
self.initialization_status.status = \
self.initialization_status.RECEIVED_ALL_END_MESSAGES
def _add_alarms(self):
# find hosts and instances

View File

@ -25,7 +25,6 @@ from vitrage.datasources.nova.host import NOVA_HOST_DATASOURCE
from vitrage.datasources.nova.instance import NOVA_INSTANCE_DATASOURCE
from vitrage.datasources.nova.zone import NOVA_ZONE_DATASOURCE
from vitrage.entity_graph.processor import processor as proc
from vitrage.entity_graph.vitrage_init import VitrageInit
from vitrage.graph.driver.networkx_graph import NXGraph
import vitrage.graph.utils as graph_utils
from vitrage.opts import register_opts
@ -128,8 +127,7 @@ class TestEntityGraphUnitBase(base.BaseTest):
@staticmethod
def create_processor_and_graph(conf):
e_graph = NXGraph("Entity Graph")
init = VitrageInit(conf)
return proc.Processor(conf, init, e_graph)
return proc.Processor(conf, e_graph=e_graph)
@staticmethod
def _create_event(spec_type=None,

View File

@ -15,7 +15,7 @@
from oslo_config import cfg
from vitrage.common.constants import VertexProperties as VProps
from vitrage.entity_graph import transformer_manager
from vitrage.entity_graph.processor import transformer_manager
from vitrage.graph import driver as graph
from vitrage.tests.unit.entity_graph.base import TestEntityGraphUnitBase

View File

@ -13,15 +13,15 @@
# under the License.
import threading
from vitrage.entity_graph.service import TwoPriorityListener
from vitrage.entity_graph.graph_init import EventsCoordination
from vitrage.tests import base
class TwoPriorityListenerTest(base.BaseTest):
class EventsCoordinationTest(base.BaseTest):
@classmethod
def setUpClass(cls):
super(TwoPriorityListenerTest, cls).setUpClass()
super(EventsCoordinationTest, cls).setUpClass()
cls.calc_result = 0
def do_work(self, x):
@ -39,7 +39,7 @@ class TwoPriorityListenerTest(base.BaseTest):
the result should be the number of low priority calls.
0*(2^n) + 1*n
"""
priority_listener = TwoPriorityListener(None, self.do_work, None, None)
priority_listener = EventsCoordination(None, self.do_work, None, None)
def write_high():
for i in range(10000):

View File

@ -22,7 +22,8 @@ from vitrage.datasources.nova.instance import NOVA_INSTANCE_DATASOURCE
from vitrage.datasources.nova.instance.transformer import InstanceTransformer
from vitrage.datasources.nova.zone import NOVA_ZONE_DATASOURCE
from vitrage.datasources.nova.zone.transformer import ZoneTransformer
from vitrage.entity_graph.transformer_manager import TransformerManager
from vitrage.entity_graph.processor.transformer_manager import\
TransformerManager
from vitrage.opts import register_opts
from vitrage.tests import base