diff --git a/vitrage/api_handler/service.py b/vitrage/api_handler/service.py index 79850892e..4eefdd0a2 100644 --- a/vitrage/api_handler/service.py +++ b/vitrage/api_handler/service.py @@ -16,6 +16,7 @@ from oslo_log import log import oslo_messaging from oslo_service import service as os_service +from vitrage.common.utils import spawn from vitrage.entity_graph import EVALUATOR_TOPIC from vitrage.messaging import VitrageNotifier @@ -44,6 +45,9 @@ class VitrageApiHandlerService(os_service.Service): self.db = storage.get_connection_from_config(conf) def start(self): + spawn(self._start) + + def _start(self): LOG.info("Vitrage Api Handler Service - Starting...") super(VitrageApiHandlerService, self).start() diff --git a/vitrage/cli/graph.py b/vitrage/cli/graph.py index 328c3401d..969302dae 100644 --- a/vitrage/cli/graph.py +++ b/vitrage/cli/graph.py @@ -15,43 +15,25 @@ import sys -from oslo_service import service as os_service - from vitrage.api_handler.service import VitrageApiHandlerService from vitrage.cli import VITRAGE_TITLE -from vitrage import entity_graph -from vitrage.entity_graph.consistency.service import VitrageConsistencyService +from vitrage.entity_graph import get_graph_driver +from vitrage.entity_graph.graph_init import VitrageGraphInit from vitrage import service from vitrage import storage -from vitrage.entity_graph.service import VitrageGraphService -from vitrage.evaluator.evaluator_service import EvaluatorManager - def main(): - """Starts all the Entity graph services - - 1. Starts the Entity graph service - 2. Starts the api_handler service - 3. Starts the Consistency service - """ + """Main method of vitrage-graph""" print(VITRAGE_TITLE) conf = service.prepare_service() - e_graph = entity_graph.get_graph_driver(conf)('Entity Graph') - evaluator = EvaluatorManager(conf, e_graph) - launcher = os_service.ServiceLauncher(conf) + e_graph = get_graph_driver(conf)('Entity Graph') db_connection = storage.get_connection_from_config(conf) clear_active_actions_table(db_connection) - launcher.launch_service(VitrageGraphService( - conf, e_graph, evaluator, db_connection)) - - launcher.launch_service(VitrageApiHandlerService(conf, e_graph)) - - launcher.launch_service(VitrageConsistencyService(conf, e_graph)) - - launcher.wait() + VitrageApiHandlerService(conf, e_graph).start() + VitrageGraphInit(conf, e_graph, db_connection).run() def clear_active_actions_table(db_connection): @@ -62,5 +44,6 @@ def clear_active_actions_table(db_connection): """ db_connection.active_actions.delete() + if __name__ == "__main__": sys.exit(main()) diff --git a/vitrage/common/utils.py b/vitrage/common/utils.py index 029510711..9d57aa7e9 100644 --- a/vitrage/common/utils.py +++ b/vitrage/common/utils.py @@ -20,6 +20,7 @@ from collections import defaultdict import copy import itertools import random +import threading from oslo_config import cfg @@ -79,3 +80,10 @@ def get_portion(lst, num_of_portions, portion_index): curr_portion = next(g) portions[curr_portion].append(curr_item) return portions[portion_index] + + +def spawn(target, *args, **kwargs): + t = threading.Thread(target=target, args=args, kwargs=kwargs) + t.daemon = True + t.start() + return t diff --git a/vitrage/entity_graph/consistency/consistency_enforcer.py b/vitrage/entity_graph/consistency/consistency_enforcer.py index a50401982..5a1e4edcd 100644 --- a/vitrage/entity_graph/consistency/consistency_enforcer.py +++ b/vitrage/entity_graph/consistency/consistency_enforcer.py @@ -24,8 +24,10 @@ from vitrage.common.constants import GraphAction from vitrage.common.constants import VertexProperties as VProps from vitrage.datasources.consistency import CONSISTENCY_DATASOURCE from vitrage.datasources import OPENSTACK_CLUSTER +from vitrage.entity_graph import EVALUATOR_TOPIC from vitrage.evaluator.actions.evaluator_event_transformer \ import VITRAGE_DATASOURCE +from vitrage.messaging import VitrageNotifier from vitrage.utils.datetime import utcnow LOG = log.getLogger(__name__) @@ -35,10 +37,11 @@ class ConsistencyEnforcer(object): def __init__(self, conf, - actions_callback, - entity_graph): + entity_graph, + actions_callback=None): self.conf = conf - self.actions_callback = actions_callback + self.actions_callback = actions_callback or VitrageNotifier( + conf, 'vitrage_consistency', [EVALUATOR_TOPIC]).notify self.graph = entity_graph def periodic_process(self): diff --git a/vitrage/entity_graph/consistency/service.py b/vitrage/entity_graph/consistency/service.py deleted file mode 100644 index 6e4b7ac3c..000000000 --- a/vitrage/entity_graph/consistency/service.py +++ /dev/null @@ -1,58 +0,0 @@ -# Copyright 2016 - Nokia -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_log import log -from oslo_service import service as os_service - -from vitrage.entity_graph.consistency.consistency_enforcer \ - import ConsistencyEnforcer -from vitrage.entity_graph import EVALUATOR_TOPIC -from vitrage.messaging import VitrageNotifier - -LOG = log.getLogger(__name__) - - -class VitrageConsistencyService(os_service.Service): - - def __init__(self, - conf, - entity_graph): - super(VitrageConsistencyService, self).__init__() - self.conf = conf - self.entity_graph = entity_graph - self.actions_notifier = VitrageNotifier( - conf, 'vitrage_consistency', [EVALUATOR_TOPIC]) - - def start(self): - LOG.info("Vitrage Consistency Service - Starting...") - - super(VitrageConsistencyService, self).start() - - consistency_enf = ConsistencyEnforcer( - conf=self.conf, - actions_callback=self.actions_notifier.notify, - entity_graph=self.entity_graph) - self.tg.add_timer(self.conf.datasources.snapshots_interval, - consistency_enf.periodic_process, - initial_delay=60 + - self.conf.datasources.snapshots_interval) - - LOG.info("Vitrage Consistency Service - Started!") - - def stop(self, graceful=False): - LOG.info("Vitrage Consistency Service - Stopping...") - - super(VitrageConsistencyService, self).stop() - - LOG.info("Vitrage Consistency Service - Stopped!") diff --git a/vitrage/entity_graph/graph_clone/__init__.py b/vitrage/entity_graph/graph_clone/__init__.py deleted file mode 100644 index 77a9f1ece..000000000 --- a/vitrage/entity_graph/graph_clone/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# Copyright 2018 - Nokia -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -__author__ = 'stack' diff --git a/vitrage/entity_graph/graph_clone/base.py b/vitrage/entity_graph/graph_clone/base.py deleted file mode 100644 index 8929a5f13..000000000 --- a/vitrage/entity_graph/graph_clone/base.py +++ /dev/null @@ -1,137 +0,0 @@ -# Copyright 2018 - Nokia -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -import abc -import setproctitle -import time - -from oslo_log import log -from oslo_service import service as os_service - -LOG = log.getLogger(__name__) - - -GRAPH_UPDATE = 'graph_update' -POISON_PILL = None - - -class GraphCloneManagerBase(object): - - def __init__(self, conf, entity_graph, worker_num): - self._conf = conf - self._entity_graph = entity_graph - self._workers_num = worker_num - self._worker_queues = list() - self._p_launcher = os_service.ProcessLauncher(conf) - - def start(self): - LOG.info('%s start %s processes', self.__class__.__name__, - self._workers_num) - for i in range(self._workers_num): - worker_queue = self._run_worker(i, self._workers_num) - self._worker_queues.append(worker_queue) - self.before_subscribe() - self._entity_graph.subscribe(self.notify_graph_update) - - @abc.abstractmethod - def _run_worker(self, worker_index, workers_num): - raise NotImplementedError - - @abc.abstractmethod - def before_subscribe(self): - pass - - def notify_graph_update(self, before, current, is_vertex, *args, **kwargs): - """Notify all workers - - This method is subscribed to entity graph changes. - Per each change in the main entity graph, this method will notify - each of the evaluators, causing them to update their own graph. - """ - self._notify_and_wait((GRAPH_UPDATE, before, current, is_vertex)) - - def _notify_and_wait(self, payload): - for q in self._worker_queues: - q.put(payload) - time.sleep(0) # context switch before join - for q in self._worker_queues: - q.join() - - def stop_all_workers(self): - self._notify_and_wait(POISON_PILL) - for q in self._worker_queues: - q.close() - self._worker_queues = list() - - -class GraphCloneWorkerBase(os_service.Service): - def __init__(self, - conf, - task_queue, - entity_graph): - super(GraphCloneWorkerBase, self).__init__() - self._conf = conf - self._task_queue = task_queue - self._entity_graph = entity_graph - - def name(self): - return '' - - def start(self): - super(GraphCloneWorkerBase, self).start() - try: - setproctitle.setproctitle('{} {} {}'.format( - 'vitrage-graph', - self.__class__.__name__, - self.name())) - except Exception: - LOG.warning('failed to set process name') - self._entity_graph.notifier._subscriptions = [] # Quick n dirty - self.tg.add_thread(self._read_queue) - LOG.info("%s - Started!", self.__class__.__name__) - - def _read_queue(self): - while True: - try: - next_task = self._task_queue.get() - if next_task is POISON_PILL: - self._task_queue.task_done() - break - self.do_task(next_task) - except Exception as e: - LOG.exception("Graph may not be in sync: exception %s", e) - self._task_queue.task_done() - # Evaluator queue may have been updated, thus the sleep: - time.sleep(0) - - def do_task(self, task): - action = task[0] - if action == GRAPH_UPDATE: - (action, before, current, is_vertex) = task - self._graph_update(before, current, is_vertex) - - def _graph_update(self, before, current, is_vertex): - if current: - if is_vertex: - self._entity_graph.add_vertex(current) - else: - self._entity_graph.add_edge(current) - else: - if is_vertex: - self._entity_graph.delete_vertex(before) - else: - self._entity_graph.delete_edge(before) - - def stop(self, graceful=False): - super(GraphCloneWorkerBase, self).stop(graceful) - LOG.info("%s - Stopped!", self.__class__.__name__) diff --git a/vitrage/entity_graph/service.py b/vitrage/entity_graph/graph_init.py similarity index 54% rename from vitrage/entity_graph/service.py rename to vitrage/entity_graph/graph_init.py index e5f26784a..cfeb0e5e8 100644 --- a/vitrage/entity_graph/service.py +++ b/vitrage/entity_graph/graph_init.py @@ -1,99 +1,84 @@ -# Copyright 2015 - Alcatel-Lucent +# Copyright 2018 - Nokia # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -import oslo_messaging import threading import time from oslo_log import log -from oslo_service import service as os_service +import oslo_messaging +from vitrage.common.utils import spawn from vitrage.entity_graph import EVALUATOR_TOPIC from vitrage.entity_graph.processor.processor import Processor -from vitrage.entity_graph.vitrage_init import VitrageInit -from vitrage.evaluator.template_loader_service import TemplateLoaderManager +from vitrage.entity_graph.scheduler import Scheduler +from vitrage.entity_graph.workers import GraphWorkersManager from vitrage import messaging -from vitrage.persistency.graph_persistor import GraphPersistor LOG = log.getLogger(__name__) -class VitrageGraphService(os_service.Service): - - def __init__(self, - conf, - graph, - evaluator, - db): - super(VitrageGraphService, self).__init__() +class VitrageGraphInit(object): + def __init__(self, conf, graph, db_connection): self.conf = conf - self.graph = graph - self.evaluator = evaluator - self.templates_loader = TemplateLoaderManager(conf, graph, db) - self.init = VitrageInit(conf, graph, self.evaluator, - self.templates_loader) - self.graph_persistor = GraphPersistor(conf) if \ - self.conf.persistency.enable_persistency else None - self.processor = Processor(self.conf, self.init, graph, - self.graph_persistor) - self.listener = self._init_listener() - - def _init_listener(self): - collector_topic = self.conf.datasources.notification_topic_collector - evaluator_topic = EVALUATOR_TOPIC - return TwoPriorityListener( - self.conf, + self.workers = GraphWorkersManager(conf, graph, db_connection) + self.events_coordination = EventsCoordination( + conf, self.process_event, - collector_topic, - evaluator_topic) + conf.datasources.notification_topic_collector, + EVALUATOR_TOPIC) + self.end_messages = {} + self.scheduler = Scheduler(conf, graph) + self.processor = Processor(conf, + self._handle_end_message, + graph, + self.scheduler.graph_persistor) + + def run(self): + LOG.info('Init Started') + self.events_coordination.start() + self._wait_for_all_end_messages() + self.scheduler.start_periodic_tasks() + self.processor.start_notifier() + spawn(self.workers.submit_start_evaluations) + self.workers.run() def process_event(self, event): if event.get('template_action'): - self.templates_loader.handle_template_event(event) - self.evaluator.reload_evaluators_templates() + self.workers.submit_template_event(event) + self.workers.submit_evaluators_reload_templates() else: self.processor.process_event(event) - def start(self): - LOG.info("Vitrage Graph Service - Starting...") - super(VitrageGraphService, self).start() - if self.graph_persistor: - self.tg.add_timer( - self.conf.persistency.graph_persistency_interval, - self.graph_persistor.store_graph, - self.conf.persistency.graph_persistency_interval, - graph=self.graph) - self.tg.add_thread( - self.init.initializing_process, - on_end_messages_func=self.processor.on_recieved_all_end_messages) - self.listener.start() - LOG.info("Vitrage Graph Service - Started!") + def _handle_end_message(self, vitrage_type): + self.end_messages[vitrage_type] = True - def stop(self, graceful=False): - LOG.info("Vitrage Graph Service - Stopping...") - self.evaluator.stop_all_workers() - self.templates_loader.stop_all_workers() - self.listener.stop() - self.listener.wait() - super(VitrageGraphService, self).stop(graceful) - - LOG.info("Vitrage Graph Service - Stopped!") + def _wait_for_all_end_messages(self): + start = time.time() + timeout = self.conf.consistency.initialization_max_retries * \ + self.conf.consistency.initialization_interval + while time.time() < start + timeout: + if len(self.end_messages) == len(self.conf.datasources.types): + LOG.info('end messages received') + return True + time.sleep(0.2) + LOG.warning('Missing end messages %s', self.end_messages.keys()) + return False PRIORITY_DELAY = 0.05 -class TwoPriorityListener(object): +class EventsCoordination(object): def __init__(self, conf, do_work_func, topic_low, topic_high): self._conf = conf self._lock = threading.Lock() @@ -103,7 +88,7 @@ class TwoPriorityListener(object): try: return do_work_func(event) except Exception as e: - LOG.exception('Got Exception %s', e) + LOG.exception('Got Exception %s for event %s', e, str(event)) self._do_work_func = do_work @@ -114,7 +99,9 @@ class TwoPriorityListener(object): def start(self): self._high_pri_listener.start() + LOG.info('Listening on %s', self._high_pri_listener.targets[0].topic) self._low_pri_listener.start() + LOG.info('Listening on %s', self._low_pri_listener.targets[0].topic) def stop(self): self._low_pri_listener.stop() diff --git a/vitrage/entity_graph/processor/processor.py b/vitrage/entity_graph/processor/processor.py index c3460e3e0..7ad1f8463 100644 --- a/vitrage/entity_graph/processor/processor.py +++ b/vitrage/entity_graph/processor/processor.py @@ -23,7 +23,8 @@ from vitrage.entity_graph.mappings.datasource_info_mapper import \ from vitrage.entity_graph.processor import base as processor from vitrage.entity_graph.processor.notifier import GraphNotifier from vitrage.entity_graph.processor import processor_utils as PUtils -from vitrage.entity_graph.transformer_manager import TransformerManager +from vitrage.entity_graph.processor.transformer_manager import \ + TransformerManager from vitrage.graph import Direction LOG = log.getLogger(__name__) @@ -31,14 +32,14 @@ LOG = log.getLogger(__name__) class Processor(processor.ProcessorBase): - def __init__(self, conf, initialization_status, e_graph, + def __init__(self, conf, end_messages_func=None, e_graph=None, graph_persistor=None): super(Processor, self).__init__() self.conf = conf self.transformer_manager = TransformerManager(self.conf) self.info_mapper = DatasourceInfoMapper(self.conf) self._initialize_events_actions() - self.initialization_status = initialization_status + self.end_messages_func = end_messages_func self.entity_graph = e_graph self._notifier = GraphNotifier(conf) self._graph_persistor = graph_persistor @@ -189,9 +190,9 @@ class Processor(processor.ProcessorBase): vertex, graph_vertex) def handle_end_message(self, vertex, neighbors): - self.initialization_status.handle_end_message(vertex) + self.end_messages_func(vertex[VProps.VITRAGE_TYPE]) - def on_recieved_all_end_messages(self): + def start_notifier(self): if self._notifier and self._notifier.enabled: self.entity_graph.subscribe(self._notifier.notify_when_applicable) LOG.info('Graph notifications subscription added') diff --git a/vitrage/entity_graph/transformer_manager.py b/vitrage/entity_graph/processor/transformer_manager.py similarity index 100% rename from vitrage/entity_graph/transformer_manager.py rename to vitrage/entity_graph/processor/transformer_manager.py diff --git a/vitrage/entity_graph/scheduler.py b/vitrage/entity_graph/scheduler.py new file mode 100644 index 000000000..ba9fa8df2 --- /dev/null +++ b/vitrage/entity_graph/scheduler.py @@ -0,0 +1,73 @@ +# Copyright 2018 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from concurrent.futures import ThreadPoolExecutor +from futurist import periodics + +from oslo_log import log +from vitrage.common.utils import spawn + +from vitrage.entity_graph.consistency.consistency_enforcer import\ + ConsistencyEnforcer +from vitrage.persistency.graph_persistor import GraphPersistor + +LOG = log.getLogger(__name__) + + +class Scheduler(object): + + def __init__(self, + conf, + graph): + super(Scheduler, self).__init__() + self.conf = conf + self.graph = graph + self.graph_persistor = GraphPersistor(conf) if \ + self.conf.persistency.enable_persistency else None + self.consistency = ConsistencyEnforcer(conf, graph) + self.periodic = None + + def start_periodic_tasks(self): + self.periodic = periodics.PeriodicWorker.create( + [], executor_factory=lambda: ThreadPoolExecutor(max_workers=10)) + + self.add_persistor_timer() + self.add_consistency_timer() + spawn(self.periodic.start) + + def add_persistor_timer(self): + spacing = self.conf.persistency.graph_persistency_interval + + @periodics.periodic(spacing=spacing) + def persist(): + if self.graph_persistor: + try: + self.graph_persistor.store_graph(graph=self.graph) + except Exception as e: + LOG.exception('persist failed %s', e) + + self.periodic.add(persist) + LOG.info("periodic task - persistor %s", spacing) + + def add_consistency_timer(self): + spacing = self.conf.datasources.snapshots_interval + + @periodics.periodic(spacing=spacing) + def run_consistency(): + try: + self.consistency.periodic_process() + except Exception as e: + LOG.exception('run_consistency failed %s', e) + + self.periodic.add(run_consistency) + LOG.info("periodic task - run_consistency %s", spacing) diff --git a/vitrage/entity_graph/vitrage_init.py b/vitrage/entity_graph/vitrage_init.py deleted file mode 100644 index f5ea386f2..000000000 --- a/vitrage/entity_graph/vitrage_init.py +++ /dev/null @@ -1,112 +0,0 @@ -# Copyright 2016 - Nokia -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -from oslo_log import log -import time - -from vitrage.common.constants import VertexProperties as VProps - -LOG = log.getLogger(__name__) - - -class VitrageInit(object): - STARTED = 'started' - RECEIVED_ALL_END_MESSAGES = 'received_all_end_messages' - FINISHED = 'finished' - - def __init__(self, conf, graph=None, evaluator=None, template_loader=None): - self.conf = conf - self.graph = graph - self.evaluator = evaluator - self.template_loader = template_loader - self.status = self.STARTED - self.end_messages = {} - - def initializing_process(self, on_end_messages_func): - try: - LOG.info('Init Started') - - if not self._wait_for_all_end_messages(): - LOG.warning('Initialization - max retries reached %s', - self.end_messages) - else: - LOG.info('Initialization - All end messages were received') - - on_end_messages_func() - - self.evaluator.start() - if self.template_loader: - self.template_loader.start() - - # TODO(idan_hefetz) As vitrage is not yet persistent, there aren't - # TODO(idan_hefetz) any deduced alarms to be removed during init - # if not self._wait_for_action(self.evaluator_queue.empty): - # LOG.error('Evaluator Queue Not Empty') - # self._mark_old_deduced_alarms_as_deleted(timestamp, self.graph, - # self.evaluator_queue) - self.status = self.FINISHED - - LOG.info('Init Finished') - except Exception as e: - LOG.exception('Init Failed: %s', e) - - def handle_end_message(self, vertex): - self.end_messages[vertex[VProps.VITRAGE_TYPE]] = True - - if len(self.end_messages) == len(self.conf.datasources.types): - self.status = self.RECEIVED_ALL_END_MESSAGES - - def _wait_for_all_end_messages(self): - return self._wait_for_action( - lambda: self.status == self.RECEIVED_ALL_END_MESSAGES) - - def _wait_for_action(self, function): - count_retries = 0 - while True: - if count_retries >= \ - self.conf.consistency.initialization_max_retries: - return False - - if function(): - return True - - count_retries += 1 - time.sleep(self.conf.consistency.initialization_interval) - - # def _mark_old_deduced_alarms_as_deleted(self, timestamp,graph,out_queue): - # query = { - # 'and': [ - # {'==': {VProps.VITRAGE_CATEGORY: EntityCategory.ALARM}}, - # {'==': {VProps.VITRAGE_TYPE: VProps.VITRAGE_TYPE}}, - # {'<': {VProps.VITRAGE_SAMPLE_TIMESTAMP: timestamp}} - # ] - # } - # old_deduced_alarms = graph.get_vertices(query_dict=query) - # self._push_events_to_queue(old_deduced_alarms, - # GraphAction.DELETE_ENTITY, - # out_queue) - # - # def _push_events_to_queue(self, vertices, action, out_queue): - # for vertex in vertices: - # event = { - # DSProps.ENTITY_TYPE: CONSISTENCY_DATASOURCE, - # DSProps.DATASOURCE_ACTION: DatasourceAction.UPDATE, - # DSProps.SAMPLE_DATE: str(utcnow()), - # DSProps.EVENT_TYPE: action, - # VProps.VITRAGE_ID: vertex[VProps.VITRAGE_ID], - # VProps.ID: vertex.get(VProps.ID, None), - # VProps.VITRAGE_TYPE: vertex[VProps.VITRAGE_TYPE], - # VProps.VITRAGE_CATEGORY: vertex[VProps.VITRAGE_CATEGORY], - # VProps.IS_REAL_VITRAGE_ID: True - # } - # out_queue.put(event) diff --git a/vitrage/entity_graph/workers.py b/vitrage/entity_graph/workers.py new file mode 100644 index 000000000..b826ac203 --- /dev/null +++ b/vitrage/entity_graph/workers.py @@ -0,0 +1,325 @@ +# Copyright 2018 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import abc +import cotyledon +import multiprocessing + +from oslo_concurrency import processutils as ps +from oslo_log import log + +from vitrage.common.constants import TemplateStatus as TStatus +from vitrage.common.constants import TemplateTypes as TType +from vitrage.common.exception import VitrageError +from vitrage.entity_graph import EVALUATOR_TOPIC +from vitrage.evaluator.actions.base import ActionMode +from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator +from vitrage.evaluator.scenario_repository import ScenarioRepository +from vitrage.messaging import VitrageNotifier + +LOG = log.getLogger(__name__) + +# Supported message types +GRAPH_UPDATE = 'graph_update' +START_EVALUATION = 'start_evaluation' +RELOAD_TEMPLATES = 'reload_templates' +TEMPLATE_ACTION = 'template_action' + +ADD = 'add' +DELETE = 'delete' + + +class GraphWorkersManager(cotyledon.ServiceManager): + """GraphWorkersManager + + - worker processes + - the queues used to communicate with these workers + - methods interface to submit tasks to workers + """ + def __init__(self, conf, entity_graph, db): + super(GraphWorkersManager, self).__init__() + self._conf = conf + self._entity_graph = entity_graph + self._db = db + self._evaluator_queues = [] + self._template_queues = [] + self._all_queues = [] + self.add_evaluator_workers() + self.add_template_workers() + + def add_evaluator_workers(self): + """Add evaluator workers + + Evaluator workers receive all graph updates, hence are updated. + Each evaluator worker holds an enabled scenario-evaluator and process + every change. + Each worker's scenario-evaluator runs different template scenarios. + Interface to these workers is: + submit_graph_update(..) + submit_start_evaluations(..) + submit_evaluators_reload_templates(..) + """ + if self._evaluator_queues: + raise VitrageError('add_evaluator_workers called more than once') + workers = self._conf.evaluator.workers or ps.get_worker_count() + queues = [multiprocessing.JoinableQueue() for i in range(workers)] + self.add(EvaluatorWorker, + args=(self._conf, queues, self._entity_graph, workers), + workers=workers) + self._evaluator_queues = queues + self._all_queues.extend(queues) + + def add_template_workers(self): + """Add template workers + + Template workers receive all graph updates, hence are updated. + Each template worker holds a disabled scenario-evaluator that does + not process changes. + The scenario-evaluator is enabled when a template add/delete arrives, + so this worker will run the added template on the entire graph. + Interface to these workers is: + submit_graph_update(..) + submit_template_event(..) + """ + if self._template_queues: + raise VitrageError('add_template_workers called more than once') + workers = 1 # currently more than one worker is not supported + queues = [multiprocessing.JoinableQueue() for i in range(workers)] + self.add(TemplateLoaderWorker, + args=(self._conf, queues, self._entity_graph), + workers=workers) + self._template_queues = queues + self._all_queues.extend(queues) + + def submit_graph_update(self, before, current, is_vertex, *args, **kwargs): + """Graph update all workers + + This method is subscribed to entity graph changes. + Per each change in the main entity graph, this method will notify + each of the workers, causing them to update their own graph. + """ + self._submit_and_wait( + self._all_queues, + (GRAPH_UPDATE, before, current, is_vertex)) + + def submit_start_evaluations(self): + """Enable scenario-evaluator in all evaluator workers + + Enables the worker's scenario-evaluator, and run it on the entire graph + """ + self._submit_and_wait(self._evaluator_queues, (START_EVALUATION,)) + self._entity_graph.subscribe(self.submit_graph_update) + LOG.info('Init Finished') + + def submit_evaluators_reload_templates(self): + """Recreate the scenario-repository in all evaluator workers + + So that new/deleted templates are added/removed + """ + self._submit_and_wait(self._evaluator_queues, (RELOAD_TEMPLATES,)) + + def submit_template_event(self, event): + """Template worker to load the new/deleted template + + Load the template to scenario-evaluator and run it on the entire graph + """ + template_action = event.get(TEMPLATE_ACTION) + + if template_action == ADD: + templates = self._db.templates.query(status=TStatus.LOADING) + new_status = TStatus.ACTIVE + action_mode = ActionMode.DO + elif template_action == DELETE: + templates = self._db.templates.query(status=TStatus.DELETING) + new_status = TStatus.DELETED + action_mode = ActionMode.UNDO + else: + raise VitrageError('Invalid template_action %s' % template_action) + + self._submit_and_wait( + self._template_queues, + ( + TEMPLATE_ACTION, + [t.name for t in templates + if t.template_type == TType.STANDARD], + action_mode, + )) + + for t in templates: + self._db.templates.update(t.uuid, 'status', new_status) + + @staticmethod + def _submit_and_wait(queues, payload): + for q in queues: + q.put(payload) + for q in queues: + q.join() + + +class GraphCloneWorkerBase(cotyledon.Service): + def __init__(self, + worker_id, + conf, + task_queues, + entity_graph): + super(GraphCloneWorkerBase, self).__init__(worker_id) + self._conf = conf + self._task_queue = task_queues[worker_id] + self._entity_graph = entity_graph + self._running = False + + name = 'GraphCloneWorkerBase' + + @abc.abstractmethod + def _init_instance(self): + """This method is executed in the newly created process""" + raise NotImplementedError + + def run(self): + LOG.info("%s - Starting %s", self.__class__.__name__, self.worker_id) + self._running = True + self._entity_graph.notifier._subscriptions = [] # Quick n dirty + self._init_instance() + self._read_queue() + + def terminate(self): + self._running = False + LOG.info("%s - Stopped!", self.__class__.__name__) + + def _read_queue(self): + LOG.debug("%s - reading queue %s", + self.__class__.__name__, self.worker_id) + while self._running: + try: + next_task = self._task_queue.get() + self.do_task(next_task) + except Exception as e: + LOG.exception("Graph may not be in sync: exception %s", e) + self._task_queue.task_done() + + def do_task(self, task): + action = task[0] + if action == GRAPH_UPDATE: + (action, before, current, is_vertex) = task + self._graph_update(before, current, is_vertex) + + def _graph_update(self, before, current, is_vertex): + if current: + if is_vertex: + self._entity_graph.add_vertex(current) + else: + self._entity_graph.add_edge(current) + else: + if is_vertex: + self._entity_graph.delete_vertex(before) + else: + self._entity_graph.delete_edge(before) + + +class EvaluatorWorker(GraphCloneWorkerBase): + def __init__(self, + worker_id, + conf, + task_queues, + e_graph, + workers_num): + super(EvaluatorWorker, self).__init__( + worker_id, conf, task_queues, e_graph) + self._workers_num = workers_num + self._evaluator = None + + name = 'EvaluatorWorker' + + def _init_instance(self): + scenario_repo = ScenarioRepository(self._conf, self.worker_id, + self._workers_num) + actions_callback = VitrageNotifier( + conf=self._conf, + publisher_id='vitrage_evaluator', + topics=[EVALUATOR_TOPIC]).notify + self._evaluator = ScenarioEvaluator( + self._conf, + self._entity_graph, + scenario_repo, + actions_callback, + enabled=False) + self._evaluator.scenario_repo.log_enabled_scenarios() + + def do_task(self, task): + super(EvaluatorWorker, self).do_task(task) + action = task[0] + if action == START_EVALUATION: + self._evaluator.run_evaluator() + elif action == RELOAD_TEMPLATES: + self._reload_templates() + + def _reload_templates(self): + LOG.info("reloading evaluator scenarios") + scenario_repo = ScenarioRepository(self._conf, self.worker_id, + self._workers_num) + self._evaluator.scenario_repo = scenario_repo + self._evaluator.scenario_repo.log_enabled_scenarios() + + +class TemplateLoaderWorker(GraphCloneWorkerBase): + def __init__(self, + worker_id, + conf, + task_queues, + e_graph): + super(TemplateLoaderWorker, self).__init__(worker_id, + conf, + task_queues, + e_graph) + self._evaluator = None + + name = 'TemplateLoaderWorker' + + def _init_instance(self): + actions_callback = VitrageNotifier( + conf=self._conf, + publisher_id='vitrage_evaluator', + topics=[EVALUATOR_TOPIC]).notify + self._evaluator = ScenarioEvaluator( + self._conf, + self._entity_graph, + None, + actions_callback, + enabled=False) + + def do_task(self, task): + super(TemplateLoaderWorker, self).do_task(task) + action = task[0] + if action == TEMPLATE_ACTION: + (action, template_names, action_mode) = task + self._template_action(template_names, action_mode) + + def _template_action(self, template_names, action_mode): + self._enable_evaluator_templates(template_names) + self._evaluator.run_evaluator(action_mode) + self._disable_evaluator() + + def _enable_evaluator_templates(self, template_names): + scenario_repo = ScenarioRepository(self._conf) + for s in scenario_repo._all_scenarios: + s.enabled = False + for template_name in template_names: + if s.id.startswith(template_name): + s.enabled = True + self._evaluator.scenario_repo = scenario_repo + self._evaluator.scenario_repo.log_enabled_scenarios() + self._evaluator.enabled = True + + def _disable_evaluator(self): + self._entity_graph.notifier._subscriptions = [] # Quick n dirty + self._evaluator.enabled = False diff --git a/vitrage/evaluator/evaluator_service.py b/vitrage/evaluator/evaluator_service.py deleted file mode 100644 index bc9f8d16d..000000000 --- a/vitrage/evaluator/evaluator_service.py +++ /dev/null @@ -1,109 +0,0 @@ -# Copyright 2017 - Nokia -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import multiprocessing - -from oslo_concurrency import processutils -from oslo_log import log - -from vitrage.entity_graph import EVALUATOR_TOPIC -from vitrage.entity_graph.graph_clone import base -from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator -from vitrage.evaluator.scenario_repository import ScenarioRepository -from vitrage.messaging import VitrageNotifier - -LOG = log.getLogger(__name__) - -START_EVALUATION = 'start_evaluation' -RELOAD_TEMPLATES = 'reload_templates' - - -class EvaluatorManager(base.GraphCloneManagerBase): - - def __init__(self, conf, entity_graph): - workers_num = conf.evaluator.workers or processutils.get_worker_count() - super(EvaluatorManager, self).__init__(conf, entity_graph, workers_num) - - def before_subscribe(self): - self.start_evaluations() - - def _run_worker(self, worker_index, workers_num): - """Create an EvaluatorWorker and it's task queue - - The new worker is initialized with a scenario repository - that only contains a portion of the templates - """ - - tasks_queue = multiprocessing.JoinableQueue() - w = EvaluatorWorker( - self._conf, - tasks_queue, - self._entity_graph, - worker_index, - workers_num) - self._p_launcher.launch_service(w) - return tasks_queue - - def start_evaluations(self): - self._notify_and_wait((START_EVALUATION,)) - - def reload_evaluators_templates(self): - self._notify_and_wait((RELOAD_TEMPLATES,)) - - -class EvaluatorWorker(base.GraphCloneWorkerBase): - def __init__(self, - conf, - task_queue, - e_graph, - worker_index, - workers_num): - super(EvaluatorWorker, self).__init__(conf, task_queue, e_graph) - self._worker_index = worker_index - self._workers_num = workers_num - self._evaluator = None - - def name(self): - return "(%s)" % str(self._worker_index) - - def start(self): - super(EvaluatorWorker, self).start() - scenario_repo = ScenarioRepository(self._conf, self._worker_index, - self._workers_num) - actions_callback = VitrageNotifier( - conf=self._conf, - publisher_id='vitrage_evaluator', - topics=[EVALUATOR_TOPIC]).notify - self._evaluator = ScenarioEvaluator( - self._conf, - self._entity_graph, - scenario_repo, - actions_callback, - enabled=False) - self._evaluator.scenario_repo.log_enabled_scenarios() - - def do_task(self, task): - super(EvaluatorWorker, self).do_task(task) - action = task[0] - if action == START_EVALUATION: - self._evaluator.run_evaluator() - elif action == RELOAD_TEMPLATES: - self._reload_templates() - - def _reload_templates(self): - scenario_repo = ScenarioRepository(self._conf, self._worker_index, - self._workers_num) - self._evaluator.scenario_repo = scenario_repo - LOG.info("reloading evaluator scenarios") - self._evaluator.scenario_repo.log_enabled_scenarios() diff --git a/vitrage/evaluator/template_loader_service.py b/vitrage/evaluator/template_loader_service.py deleted file mode 100644 index 39d285847..000000000 --- a/vitrage/evaluator/template_loader_service.py +++ /dev/null @@ -1,122 +0,0 @@ -# Copyright 2017 - Nokia -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import multiprocessing - -from oslo_log import log - -from vitrage.common.constants import TemplateStatus as TStatus -from vitrage.common.constants import TemplateTypes as TType -from vitrage.common.exception import VitrageError -from vitrage.entity_graph import EVALUATOR_TOPIC -from vitrage.entity_graph.graph_clone import base -from vitrage.evaluator.actions.base import ActionMode -from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator -from vitrage.evaluator.scenario_repository import ScenarioRepository -from vitrage.messaging import VitrageNotifier - -LOG = log.getLogger(__name__) - -TEMPLATE_ACTION = 'template_action' -ADD = 'add' -DELETE = 'delete' - - -class TemplateLoaderManager(base.GraphCloneManagerBase): - - def __init__(self, conf, entity_graph, db): - super(TemplateLoaderManager, self).__init__(conf, entity_graph, 1) - self._db = db - - def _run_worker(self, worker_index, workers_num): - tasks_queue = multiprocessing.JoinableQueue() - w = TemplateLoaderWorker( - self._conf, - tasks_queue, - self._entity_graph) - self._p_launcher.launch_service(w) - return tasks_queue - - def handle_template_event(self, event): - template_action = event.get(TEMPLATE_ACTION) - - if template_action == ADD: - templates = self._db.templates.query(status=TStatus.LOADING) - new_status = TStatus.ACTIVE - action_mode = ActionMode.DO - elif template_action == DELETE: - templates = self._db.templates.query(status=TStatus.DELETING) - new_status = TStatus.DELETED - action_mode = ActionMode.UNDO - else: - raise VitrageError('Invalid template_action %s' % template_action) - - self._template_worker_task( - [t.name for t in templates if t.template_type == TType.STANDARD], - action_mode) - - for t in templates: - self._db.templates.update(t.uuid, 'status', new_status) - - def _template_worker_task(self, template_names, action_mode): - self._notify_and_wait((TEMPLATE_ACTION, template_names, action_mode)) - - -class TemplateLoaderWorker(base.GraphCloneWorkerBase): - def __init__(self, - conf, - task_queue, - e_graph): - super(TemplateLoaderWorker, self).__init__(conf, task_queue, e_graph) - self._evaluator = None - - def start(self): - super(TemplateLoaderWorker, self).start() - actions_callback = VitrageNotifier( - conf=self._conf, - publisher_id='vitrage_evaluator', - topics=[EVALUATOR_TOPIC]).notify - self._evaluator = ScenarioEvaluator( - self._conf, - self._entity_graph, - None, - actions_callback, - enabled=False) - - def do_task(self, task): - super(TemplateLoaderWorker, self).do_task(task) - action = task[0] - if action == TEMPLATE_ACTION: - (action, template_names, action_mode) = task - self._template_action(template_names, action_mode) - - def _template_action(self, template_names, action_mode): - self._enable_evaluator_templates(template_names) - self._evaluator.run_evaluator(action_mode) - self._disable_evaluator() - - def _enable_evaluator_templates(self, template_names): - scenario_repo = ScenarioRepository(self._conf) - for s in scenario_repo._all_scenarios: - s.enabled = False - for template_name in template_names: - if s.id.startswith(template_name): - s.enabled = True - self._evaluator.scenario_repo = scenario_repo - self._evaluator.scenario_repo.log_enabled_scenarios() - self._evaluator.enabled = True - - def _disable_evaluator(self): - self._entity_graph.notifier._subscriptions = [] # Quick n dirty - self._evaluator.enabled = False diff --git a/vitrage/persistency/graph_persistor.py b/vitrage/persistency/graph_persistor.py index e0f46b8e7..779188398 100644 --- a/vitrage/persistency/graph_persistor.py +++ b/vitrage/persistency/graph_persistor.py @@ -35,6 +35,7 @@ class GraphPersistor(object): self.last_event_timestamp = datetime.datetime.utcnow() def store_graph(self, graph): + LOG.info('Graph persistency running..') try: graph_snapshot = graph.write_gpickle() db_row = models.GraphSnapshot( diff --git a/vitrage/tests/functional/entity_graph/consistency/test_consistency.py b/vitrage/tests/functional/entity_graph/consistency/test_consistency.py index 82ab97cc6..fdd43a60b 100644 --- a/vitrage/tests/functional/entity_graph/consistency/test_consistency.py +++ b/vitrage/tests/functional/entity_graph/consistency/test_consistency.py @@ -14,7 +14,6 @@ from datetime import timedelta import time -import unittest from oslo_config import cfg from six.moves import queue @@ -30,9 +29,6 @@ from vitrage.datasources.nova.zone import NOVA_ZONE_DATASOURCE from vitrage.entity_graph.consistency.consistency_enforcer \ import ConsistencyEnforcer from vitrage.entity_graph.processor.processor import Processor -from vitrage.entity_graph.vitrage_init import VitrageInit -from vitrage.evaluator.actions.evaluator_event_transformer \ - import VITRAGE_DATASOURCE from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator from vitrage.evaluator.scenario_repository import ScenarioRepository from vitrage.graph.driver.networkx_graph import NXGraph @@ -81,9 +77,7 @@ class TestConsistencyFunctional(TestFunctionalBase, TestConfiguration): cls.add_db(cls.conf) cls.load_datasources(cls.conf) cls.graph = NXGraph("Entity Graph") - cls.initialization_status = VitrageInit(cls.conf, cls.graph) - cls.processor = Processor(cls.conf, cls.initialization_status, - cls.graph) + cls.processor = Processor(cls.conf, lambda x: x, cls.graph) cls.event_queue = queue.Queue() @@ -104,68 +98,8 @@ class TestConsistencyFunctional(TestFunctionalBase, TestConfiguration): actions_callback) cls.consistency_enforcer = ConsistencyEnforcer( cls.conf, - actions_callback, - cls.processor.entity_graph) - - @unittest.skip("test_initializing_process skipping") - def test_initializing_process(self): - # Setup - num_of_host_alarms = self.NUM_HOSTS - 2 - num_instances_per_host = 4 - self._create_processor_with_graph(self.conf, processor=self.processor) - self._add_alarms() - self._set_end_messages() - self.assertThat(self.processor.entity_graph.get_vertices(), - matchers.HasLength( - self._num_total_expected_vertices() + - num_of_host_alarms + self.NUM_INSTANCES) - ) - - # Action - # eventlet.spawn(self._process_events) - # processor_thread = threading.Thread(target=self._process_events) - # processor_thread.start() - self.initialization_status.initializing_process( - self.processor.on_recieved_all_end_messages) - self._process_events() - - # Test Assertions - num_correct_alarms = num_of_host_alarms + \ - num_of_host_alarms * num_instances_per_host - num_undeleted_vertices_in_graph = \ - len(self.processor.entity_graph.get_vertices(vertex_attr_filter={ - VProps.VITRAGE_IS_DELETED: False - })) - self.assertEqual(self._num_total_expected_vertices() + - num_correct_alarms, - num_undeleted_vertices_in_graph) - - alarm_vertices_in_graph = self.processor.entity_graph.get_vertices({ - VProps.VITRAGE_CATEGORY: EntityCategory.ALARM, - VProps.VITRAGE_IS_DELETED: False - }) - self.assertThat(alarm_vertices_in_graph, - matchers.HasLength(num_correct_alarms)) - - is_deleted_alarm_vertices_in_graph = \ - self.processor.entity_graph.get_vertices({ - VProps.VITRAGE_CATEGORY: EntityCategory.ALARM, - VProps.VITRAGE_IS_DELETED: True - }) - self.assertEqual(is_deleted_alarm_vertices_in_graph, - matchers.HasLength( - num_of_host_alarms * num_instances_per_host) - ) - - instance_vertices = self.processor.entity_graph.get_vertices({ - VProps.VITRAGE_CATEGORY: EntityCategory.ALARM, - VProps.VITRAGE_TYPE: VITRAGE_DATASOURCE, - VProps.VITRAGE_IS_DELETED: False - }) - self.assertThat(instance_vertices, - matchers.HasLength( - num_of_host_alarms * num_instances_per_host) - ) + cls.processor.entity_graph, + actions_callback) def test_periodic_process(self): # Setup @@ -234,8 +168,6 @@ class TestConsistencyFunctional(TestFunctionalBase, TestConfiguration): self.initialization_status.end_messages[NOVA_INSTANCE_DATASOURCE] = \ True self.initialization_status.end_messages[NAGIOS_DATASOURCE] = True - self.initialization_status.status = \ - self.initialization_status.RECEIVED_ALL_END_MESSAGES def _add_alarms(self): # find hosts and instances diff --git a/vitrage/tests/unit/entity_graph/base.py b/vitrage/tests/unit/entity_graph/base.py index d4273846c..f146e7ff1 100644 --- a/vitrage/tests/unit/entity_graph/base.py +++ b/vitrage/tests/unit/entity_graph/base.py @@ -25,7 +25,6 @@ from vitrage.datasources.nova.host import NOVA_HOST_DATASOURCE from vitrage.datasources.nova.instance import NOVA_INSTANCE_DATASOURCE from vitrage.datasources.nova.zone import NOVA_ZONE_DATASOURCE from vitrage.entity_graph.processor import processor as proc -from vitrage.entity_graph.vitrage_init import VitrageInit from vitrage.graph.driver.networkx_graph import NXGraph import vitrage.graph.utils as graph_utils from vitrage.opts import register_opts @@ -128,8 +127,7 @@ class TestEntityGraphUnitBase(base.BaseTest): @staticmethod def create_processor_and_graph(conf): e_graph = NXGraph("Entity Graph") - init = VitrageInit(conf) - return proc.Processor(conf, init, e_graph) + return proc.Processor(conf, e_graph=e_graph) @staticmethod def _create_event(spec_type=None, diff --git a/vitrage/tests/unit/entity_graph/processor/base.py b/vitrage/tests/unit/entity_graph/processor/base.py index 8b608b929..282f898a3 100644 --- a/vitrage/tests/unit/entity_graph/processor/base.py +++ b/vitrage/tests/unit/entity_graph/processor/base.py @@ -15,7 +15,7 @@ from oslo_config import cfg from vitrage.common.constants import VertexProperties as VProps -from vitrage.entity_graph import transformer_manager +from vitrage.entity_graph.processor import transformer_manager from vitrage.graph import driver as graph from vitrage.tests.unit.entity_graph.base import TestEntityGraphUnitBase diff --git a/vitrage/tests/unit/entity_graph/test_processor_service.py b/vitrage/tests/unit/entity_graph/test_processor_service.py index 3c0063f80..68de7e269 100644 --- a/vitrage/tests/unit/entity_graph/test_processor_service.py +++ b/vitrage/tests/unit/entity_graph/test_processor_service.py @@ -13,15 +13,15 @@ # under the License. import threading -from vitrage.entity_graph.service import TwoPriorityListener +from vitrage.entity_graph.graph_init import EventsCoordination from vitrage.tests import base -class TwoPriorityListenerTest(base.BaseTest): +class EventsCoordinationTest(base.BaseTest): @classmethod def setUpClass(cls): - super(TwoPriorityListenerTest, cls).setUpClass() + super(EventsCoordinationTest, cls).setUpClass() cls.calc_result = 0 def do_work(self, x): @@ -39,7 +39,7 @@ class TwoPriorityListenerTest(base.BaseTest): the result should be the number of low priority calls. 0*(2^n) + 1*n """ - priority_listener = TwoPriorityListener(None, self.do_work, None, None) + priority_listener = EventsCoordination(None, self.do_work, None, None) def write_high(): for i in range(10000): diff --git a/vitrage/tests/unit/entity_graph/test_transformer_manager.py b/vitrage/tests/unit/entity_graph/test_transformer_manager.py index c52617390..9c6d90247 100644 --- a/vitrage/tests/unit/entity_graph/test_transformer_manager.py +++ b/vitrage/tests/unit/entity_graph/test_transformer_manager.py @@ -22,7 +22,8 @@ from vitrage.datasources.nova.instance import NOVA_INSTANCE_DATASOURCE from vitrage.datasources.nova.instance.transformer import InstanceTransformer from vitrage.datasources.nova.zone import NOVA_ZONE_DATASOURCE from vitrage.datasources.nova.zone.transformer import ZoneTransformer -from vitrage.entity_graph.transformer_manager import TransformerManager +from vitrage.entity_graph.processor.transformer_manager import\ + TransformerManager from vitrage.opts import register_opts from vitrage.tests import base