diff --git a/vitrage/evaluator/evaluator_base.py b/vitrage/entity_graph/graph_clone/__init__.py similarity index 59% rename from vitrage/evaluator/evaluator_base.py rename to vitrage/entity_graph/graph_clone/__init__.py index 06281ed3e..77a9f1ece 100644 --- a/vitrage/evaluator/evaluator_base.py +++ b/vitrage/entity_graph/graph_clone/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2017 - Nokia +# Copyright 2018 - Nokia # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -11,19 +11,4 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -import abc -import six - - -@six.add_metaclass(abc.ABCMeta) -class EvaluatorBase(object): - - def __init__(self, conf, entity_graph): - super(EvaluatorBase, self).__init__() - self._conf = conf - self._entity_graph = entity_graph - - @abc.abstractmethod - def run_evaluator(self): - """Start evaluation """ - pass +__author__ = 'stack' diff --git a/vitrage/entity_graph/graph_clone/base.py b/vitrage/entity_graph/graph_clone/base.py new file mode 100644 index 000000000..50584258e --- /dev/null +++ b/vitrage/entity_graph/graph_clone/base.py @@ -0,0 +1,126 @@ +# Copyright 2018 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import abc +import time + +from oslo_log import log +from oslo_service import service as os_service + +LOG = log.getLogger(__name__) + + +GRAPH_UPDATE = 'graph_update' +POISON_PILL = None + + +class GraphCloneManagerBase(object): + + def __init__(self, conf, entity_graph, worker_num): + self._conf = conf + self._entity_graph = entity_graph + self._workers_num = worker_num + self._worker_queues = list() + self._p_launcher = os_service.ProcessLauncher(conf) + + def start(self): + LOG.info('%s start %s processes', self.__class__.__name__, + self._workers_num) + for i in range(self._workers_num): + worker_queue = self._run_worker(i, self._workers_num) + self._worker_queues.append(worker_queue) + self.before_subscribe() + self._entity_graph.subscribe(self.notify_graph_update) + + @abc.abstractmethod + def _run_worker(self, worker_index, workers_num): + raise NotImplementedError + + @abc.abstractmethod + def before_subscribe(self): + pass + + def notify_graph_update(self, before, current, is_vertex, *args, **kwargs): + """Notify all workers + + This method is subscribed to entity graph changes. + Per each change in the main entity graph, this method will notify + each of the evaluators, causing them to update their own graph. + """ + self._notify_and_wait((GRAPH_UPDATE, before, current, is_vertex)) + + def _notify_and_wait(self, payload): + for q in self._worker_queues: + q.put(payload) + time.sleep(0) # context switch before join + for q in self._worker_queues: + q.join() + + def stop_all_workers(self): + self._notify_and_wait(POISON_PILL) + for q in self._worker_queues: + q.close() + self._worker_queues = list() + + +class GraphCloneWorkerBase(os_service.Service): + def __init__(self, + conf, + task_queue, + entity_graph): + super(GraphCloneWorkerBase, self).__init__() + self._conf = conf + self._task_queue = task_queue + self._entity_graph = entity_graph + + def start(self): + super(GraphCloneWorkerBase, self).start() + self._entity_graph.notifier._subscriptions = [] # Quick n dirty + self.tg.add_thread(self._read_queue) + LOG.info("%s - Started!", self.__class__.__name__) + + def _read_queue(self): + while True: + next_task = self._task_queue.get() + if next_task is POISON_PILL: + self._task_queue.task_done() + break + try: + self.do_task(next_task) + except Exception as e: + LOG.exception("Graph may not be in sync: exception %s", e) + self._task_queue.task_done() + # Evaluator queue may have been updated, thus the sleep: + time.sleep(0) + + def do_task(self, task): + action = task[0] + if action == GRAPH_UPDATE: + (action, before, current, is_vertex) = task + self._graph_update(before, current, is_vertex) + + def _graph_update(self, before, current, is_vertex): + if current: + if is_vertex: + self._entity_graph.add_vertex(current) + else: + self._entity_graph.add_edge(current) + else: + if is_vertex: + self._entity_graph.delete_vertex(before) + else: + self._entity_graph.delete_edge(before) + + def stop(self, graceful=False): + super(GraphCloneWorkerBase, self).stop(graceful) + LOG.info("%s - Stopped!", self.__class__.__name__) diff --git a/vitrage/entity_graph/vitrage_init.py b/vitrage/entity_graph/vitrage_init.py index 809a202e2..385e14ba9 100644 --- a/vitrage/entity_graph/vitrage_init.py +++ b/vitrage/entity_graph/vitrage_init.py @@ -43,7 +43,7 @@ class VitrageInit(object): on_end_messages_func() - self.evaluator.run_evaluator() + self.evaluator.start() # TODO(idan_hefetz) As vitrage is not yet persistent, there aren't # TODO(idan_hefetz) any deduced alarms to be removed during init diff --git a/vitrage/evaluator/evaluator_service.py b/vitrage/evaluator/evaluator_service.py index 0ece2c95b..d4c6ef49d 100644 --- a/vitrage/evaluator/evaluator_service.py +++ b/vitrage/evaluator/evaluator_service.py @@ -13,157 +13,90 @@ # under the License. import multiprocessing -import time from oslo_concurrency import processutils from oslo_log import log -from oslo_service import service as os_service from vitrage.entity_graph import EVALUATOR_TOPIC -from vitrage.evaluator.evaluator_base import EvaluatorBase - +from vitrage.entity_graph.graph_clone import base from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator from vitrage.evaluator.scenario_repository import ScenarioRepository from vitrage.messaging import VitrageNotifier LOG = log.getLogger(__name__) - START_EVALUATION = 'start_evaluation' -POISON_PILL = None +RELOAD_TEMPLATES = 'reload_templates' -class EvaluatorManager(EvaluatorBase): +class EvaluatorManager(base.GraphCloneManagerBase): def __init__(self, conf, entity_graph): - super(EvaluatorManager, self).__init__(conf, entity_graph) - self._workers_num = conf.evaluator.workers or \ - processutils.get_worker_count() - self._worker_queues = list() - self._p_launcher = os_service.ProcessLauncher(conf) + workers_num = conf.evaluator.workers or processutils.get_worker_count() + super(EvaluatorManager, self).__init__(conf, entity_graph, workers_num) - def run_evaluator(self): - LOG.info('Starting %s Evaluator Processes', str(self._workers_num)) - for i in range(self._workers_num): - self._add_worker(enabled=False) - self._notify_all(None, None, None, evaluator_action=START_EVALUATION) - self._entity_graph.subscribe(self._notify_all) + def before_subscribe(self): + self.start_evaluations() - def _add_worker(self, enabled=False): + def _run_worker(self, worker_index, workers_num): """Create an EvaluatorWorker and it's task queue The new worker is initialized with a scenario repository that only contains a portion of the templates """ - scenario_repo = ScenarioRepository( - self._conf, - len(self._worker_queues), - self._workers_num) + tasks_queue = multiprocessing.JoinableQueue() w = EvaluatorWorker( self._conf, tasks_queue, self._entity_graph, - scenario_repo, - enabled) + worker_index, + workers_num) self._p_launcher.launch_service(w) - self._worker_queues.append(tasks_queue) + return tasks_queue - def _notify_all(self, before, current, is_vertex, *args, **kwargs): - """Notify all workers + def start_evaluations(self): + self._notify_and_wait((START_EVALUATION,)) - This method is subscribed to entity graph changes. - Per each change in the main entity graph, this method will notify - each of the evaluators, causing them to update their own graph. - """ - evaluator_action = kwargs.get('evaluator_action', None) - self._notify_and_wait((before, current, is_vertex, evaluator_action)) - - def _notify_and_wait(self, payload): - for q in self._worker_queues: - q.put(payload) - time.sleep(0) # context switch before join - for q in self._worker_queues: - q.join() - - def stop_all_workers(self): - self._notify_and_wait(POISON_PILL) - for q in self._worker_queues: - q.close() - self._worker_queues = list() - - def reload_all_workers(self, enabled=True): - self.stop_all_workers() - for i in range(self._workers_num): - self._add_worker(enabled=enabled) + def reload_evaluators_templates(self): + self._notify_and_wait((RELOAD_TEMPLATES,)) -class EvaluatorWorker(os_service.Service): +class EvaluatorWorker(base.GraphCloneWorkerBase): def __init__(self, conf, task_queue, - entity_graph, - scenario_repo, - enabled=False): - super(EvaluatorWorker, self).__init__() - self._conf = conf - self._task_queue = task_queue - self._entity_graph = entity_graph - self._scenario_repo = scenario_repo - self._enabled = enabled + e_graph, + worker_index, + workers_num): + super(EvaluatorWorker, self).__init__(conf, task_queue, e_graph) + self._worker_index = worker_index + self._workers_num = workers_num self._evaluator = None def start(self): super(EvaluatorWorker, self).start() + scenario_repo = ScenarioRepository(self._conf, self._worker_index, + self._workers_num) actions_callback = VitrageNotifier( conf=self._conf, publisher_id='vitrage_evaluator', topic=EVALUATOR_TOPIC).notify - self._entity_graph.notifier._subscriptions = [] # Quick n dirty self._evaluator = ScenarioEvaluator( self._conf, self._entity_graph, - self._scenario_repo, + scenario_repo, actions_callback, - self._enabled) - self.tg.add_thread(self._read_queue) - LOG.info("EvaluatorWorkerService - Started!") + enabled=False) self._evaluator.scenario_repo.log_enabled_scenarios() - def _read_queue(self): - while True: - next_task = self._task_queue.get() - if next_task is POISON_PILL: - self._task_queue.task_done() - break - try: - self._do_task(next_task) - except Exception as e: - LOG.exception("Graph may not be in sync: exception %s", e) - self._task_queue.task_done() - # Evaluator queue may have been updated, thus the sleep: - time.sleep(0) + def do_task(self, task): + super(EvaluatorWorker, self).do_task(task) + action = task[0] + if action == START_EVALUATION: + self._evaluator.run_evaluator() + elif action == RELOAD_TEMPLATES: + self._reload_templates() - def _do_task(self, task): - (before, current, is_vertex, action) = task - if not action: - self._graph_update(before, current, is_vertex) - elif action == START_EVALUATION: - self._evaluator.run_evaluator() - - def _graph_update(self, before, current, is_vertex): - if current: - if is_vertex: - self._entity_graph.add_vertex(current) - else: - self._entity_graph.add_edge(current) - else: - if is_vertex: - self._entity_graph.delete_vertex(before) - else: - self._entity_graph.delete_edge(before) - - def stop(self, graceful=False): - super(EvaluatorWorker, self).stop(graceful) - self.tg.stop() - LOG.info("EvaluatorWorkerService - Stopped!") + def _reload_templates(self): + raise NotImplementedError() diff --git a/vitrage/evaluator/scenario_evaluator.py b/vitrage/evaluator/scenario_evaluator.py index bde2b7356..408f6b367 100644 --- a/vitrage/evaluator/scenario_evaluator.py +++ b/vitrage/evaluator/scenario_evaluator.py @@ -28,7 +28,6 @@ from vitrage.evaluator.actions.action_executor import ActionExecutor from vitrage.evaluator.actions.base import ActionMode from vitrage.evaluator.actions.base import ActionType import vitrage.evaluator.actions.priority_tools as pt -from vitrage.evaluator.evaluator_base import EvaluatorBase from vitrage.evaluator.template_data import ActionSpecs from vitrage.evaluator.template_data import EdgeDescription from vitrage.graph.algo_driver.algorithm import Mapping @@ -52,7 +51,7 @@ TARGET = 'target' SOURCE = 'source' -class ScenarioEvaluator(EvaluatorBase): +class ScenarioEvaluator(object): def __init__(self, conf, @@ -60,7 +59,8 @@ class ScenarioEvaluator(EvaluatorBase): scenario_repo, actions_callback, enabled=False): - super(ScenarioEvaluator, self).__init__(conf, e_graph) + self._conf = conf + self._entity_graph = e_graph self._db_connection = storage.get_connection_from_config(self._conf) self._scenario_repo = scenario_repo self._action_executor = ActionExecutor(self._conf, actions_callback)