diff --git a/vitrage/api_handler/service.py b/vitrage/api_handler/service.py deleted file mode 100644 index bd0f3f87d..000000000 --- a/vitrage/api_handler/service.py +++ /dev/null @@ -1,69 +0,0 @@ -# Copyright 2016 - Nokia -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_log import log -import oslo_messaging - -from vitrage.common.utils import spawn -from vitrage.entity_graph import EVALUATOR_TOPIC -from vitrage.messaging import VitrageNotifier - -from vitrage.api_handler.apis.alarm import AlarmApis -from vitrage.api_handler.apis.event import EventApis -from vitrage.api_handler.apis.rca import RcaApis -from vitrage.api_handler.apis.resource import ResourceApis -from vitrage.api_handler.apis.template import TemplateApis -from vitrage.api_handler.apis.topology import TopologyApis -from vitrage.api_handler.apis.webhook import WebhookApis -from vitrage import messaging -from vitrage import rpc as vitrage_rpc -from vitrage import storage - -LOG = log.getLogger(__name__) - - -class VitrageApiHandlerService(object): - - def __init__(self, conf, e_graph): - super(VitrageApiHandlerService, self).__init__() - self.conf = conf - self.entity_graph = e_graph - self.notifier = VitrageNotifier(self.conf, "vitrage.api", - [EVALUATOR_TOPIC]) - self.db = storage.get_connection_from_config(conf) - - def start(self): - spawn(self._start) - - def _start(self): - LOG.info("Vitrage Api Handler Service - Starting...") - - transport = messaging.get_rpc_transport(self.conf) - rabbit_hosts = self.conf.oslo_messaging_rabbit.rabbit_hosts - target = oslo_messaging.Target(topic=self.conf.rpc_topic, - server=rabbit_hosts) - - endpoints = [TopologyApis(self.entity_graph, self.conf), - AlarmApis(self.entity_graph, self.conf), - RcaApis(self.entity_graph, self.conf), - TemplateApis(self.notifier, self.db), - EventApis(self.conf), - ResourceApis(self.entity_graph, self.conf), - WebhookApis(self.conf)] - - server = vitrage_rpc.get_server(target, endpoints, transport) - - server.start() - - LOG.info("Vitrage Api Handler Service - Started!") diff --git a/vitrage/cli/graph.py b/vitrage/cli/graph.py index 969302dae..0557312c7 100644 --- a/vitrage/cli/graph.py +++ b/vitrage/cli/graph.py @@ -15,7 +15,6 @@ import sys -from vitrage.api_handler.service import VitrageApiHandlerService from vitrage.cli import VITRAGE_TITLE from vitrage.entity_graph import get_graph_driver from vitrage.entity_graph.graph_init import VitrageGraphInit @@ -32,7 +31,6 @@ def main(): db_connection = storage.get_connection_from_config(conf) clear_active_actions_table(db_connection) - VitrageApiHandlerService(conf, e_graph).start() VitrageGraphInit(conf, e_graph, db_connection).run() diff --git a/vitrage/entity_graph/workers.py b/vitrage/entity_graph/workers.py index b826ac203..8953d9332 100644 --- a/vitrage/entity_graph/workers.py +++ b/vitrage/entity_graph/workers.py @@ -17,7 +17,15 @@ import multiprocessing from oslo_concurrency import processutils as ps from oslo_log import log +import oslo_messaging +from vitrage.api_handler.apis.alarm import AlarmApis +from vitrage.api_handler.apis.event import EventApis +from vitrage.api_handler.apis.rca import RcaApis +from vitrage.api_handler.apis.resource import ResourceApis +from vitrage.api_handler.apis.template import TemplateApis +from vitrage.api_handler.apis.topology import TopologyApis +from vitrage.api_handler.apis.webhook import WebhookApis from vitrage.common.constants import TemplateStatus as TStatus from vitrage.common.constants import TemplateTypes as TType from vitrage.common.exception import VitrageError @@ -25,7 +33,9 @@ from vitrage.entity_graph import EVALUATOR_TOPIC from vitrage.evaluator.actions.base import ActionMode from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator from vitrage.evaluator.scenario_repository import ScenarioRepository -from vitrage.messaging import VitrageNotifier +from vitrage import messaging +from vitrage import rpc as vitrage_rpc +from vitrage import storage LOG = log.getLogger(__name__) @@ -53,9 +63,11 @@ class GraphWorkersManager(cotyledon.ServiceManager): self._db = db self._evaluator_queues = [] self._template_queues = [] + self._api_queues = [] self._all_queues = [] self.add_evaluator_workers() self.add_template_workers() + self.add_api_workers() def add_evaluator_workers(self): """Add evaluator workers @@ -101,6 +113,24 @@ class GraphWorkersManager(cotyledon.ServiceManager): self._template_queues = queues self._all_queues.extend(queues) + def add_api_workers(self): + """Add Api workers + + Api workers receive all graph updates, hence are updated. + Each template worker holds a disabled scenario-evaluator that does + not process changes. + These also hold a rpc server and process the incoming Api calls + """ + if self._api_queues: + raise VitrageError('add_api_workers called more than once') + workers = self._conf.api.workers + queues = [multiprocessing.JoinableQueue() for i in range(workers)] + self.add(ApiWorker, + args=(self._conf, queues, self._entity_graph), + workers=workers) + self._api_queues = queues + self._all_queues.extend(queues) + def submit_graph_update(self, before, current, is_vertex, *args, **kwargs): """Graph update all workers @@ -243,7 +273,7 @@ class EvaluatorWorker(GraphCloneWorkerBase): def _init_instance(self): scenario_repo = ScenarioRepository(self._conf, self.worker_id, self._workers_num) - actions_callback = VitrageNotifier( + actions_callback = messaging.VitrageNotifier( conf=self._conf, publisher_id='vitrage_evaluator', topics=[EVALUATOR_TOPIC]).notify @@ -286,7 +316,7 @@ class TemplateLoaderWorker(GraphCloneWorkerBase): name = 'TemplateLoaderWorker' def _init_instance(self): - actions_callback = VitrageNotifier( + actions_callback = messaging.VitrageNotifier( conf=self._conf, publisher_id='vitrage_evaluator', topics=[EVALUATOR_TOPIC]).notify @@ -323,3 +353,33 @@ class TemplateLoaderWorker(GraphCloneWorkerBase): def _disable_evaluator(self): self._entity_graph.notifier._subscriptions = [] # Quick n dirty self._evaluator.enabled = False + + +class ApiWorker(GraphCloneWorkerBase): + + name = 'ApiWorker' + + def _init_instance(self): + conf = self._conf + LOG.info("Vitrage Api Handler Service - Starting...") + notifier = messaging.VitrageNotifier(conf, "vitrage.api", + [EVALUATOR_TOPIC]) + db = storage.get_connection_from_config(conf) + transport = messaging.get_rpc_transport(conf) + rabbit_hosts = conf.oslo_messaging_rabbit.rabbit_hosts + target = oslo_messaging.Target(topic=conf.rpc_topic, + server=rabbit_hosts) + + endpoints = [TopologyApis(self._entity_graph, conf), + AlarmApis(self._entity_graph, conf), + RcaApis(self._entity_graph, conf), + TemplateApis(notifier, db), + EventApis(conf), + ResourceApis(self._entity_graph, conf), + WebhookApis(conf)] + + server = vitrage_rpc.get_server(target, endpoints, transport) + + server.start() + + LOG.info("Vitrage Api Handler Service - Started!")