refactoring vitrage processes
oslo service replaced by cotyledon. Motivation: Oslo service uses eventlet and green threads for timers and threads. In Vitrage we use real threads and multiprocessing, it seems this combination causes unexpected behavior (such as timers not running, processes not starting) so the entire solution is very delicate to changes. As part of an ongoing effort (previously, monkeypatching was removed) this commit solves issues encountered in rpc_collector commit in addition it removes unnecessary complications in the application. Co-Authored-By: Eyal <eyal.bar-ilan@nokia.com> Change-Id: If33708e128d7aeb420a9aa79f71cfccd2b48dfb3
This commit is contained in:
parent
683b7a3cdb
commit
af3ab0fdd2
@ -16,6 +16,7 @@ from oslo_log import log
|
||||
import oslo_messaging
|
||||
from oslo_service import service as os_service
|
||||
|
||||
from vitrage.common.utils import spawn
|
||||
from vitrage.entity_graph import EVALUATOR_TOPIC
|
||||
from vitrage.messaging import VitrageNotifier
|
||||
|
||||
@ -44,6 +45,9 @@ class VitrageApiHandlerService(os_service.Service):
|
||||
self.db = storage.get_connection_from_config(conf)
|
||||
|
||||
def start(self):
|
||||
spawn(self._start)
|
||||
|
||||
def _start(self):
|
||||
LOG.info("Vitrage Api Handler Service - Starting...")
|
||||
|
||||
super(VitrageApiHandlerService, self).start()
|
||||
|
@ -15,43 +15,25 @@
|
||||
|
||||
import sys
|
||||
|
||||
from oslo_service import service as os_service
|
||||
|
||||
from vitrage.api_handler.service import VitrageApiHandlerService
|
||||
from vitrage.cli import VITRAGE_TITLE
|
||||
from vitrage import entity_graph
|
||||
from vitrage.entity_graph.consistency.service import VitrageConsistencyService
|
||||
from vitrage.entity_graph import get_graph_driver
|
||||
from vitrage.entity_graph.graph_init import VitrageGraphInit
|
||||
from vitrage import service
|
||||
from vitrage import storage
|
||||
|
||||
from vitrage.entity_graph.service import VitrageGraphService
|
||||
from vitrage.evaluator.evaluator_service import EvaluatorManager
|
||||
|
||||
|
||||
def main():
|
||||
"""Starts all the Entity graph services
|
||||
|
||||
1. Starts the Entity graph service
|
||||
2. Starts the api_handler service
|
||||
3. Starts the Consistency service
|
||||
"""
|
||||
"""Main method of vitrage-graph"""
|
||||
|
||||
print(VITRAGE_TITLE)
|
||||
conf = service.prepare_service()
|
||||
e_graph = entity_graph.get_graph_driver(conf)('Entity Graph')
|
||||
evaluator = EvaluatorManager(conf, e_graph)
|
||||
launcher = os_service.ServiceLauncher(conf)
|
||||
e_graph = get_graph_driver(conf)('Entity Graph')
|
||||
db_connection = storage.get_connection_from_config(conf)
|
||||
clear_active_actions_table(db_connection)
|
||||
|
||||
launcher.launch_service(VitrageGraphService(
|
||||
conf, e_graph, evaluator, db_connection))
|
||||
|
||||
launcher.launch_service(VitrageApiHandlerService(conf, e_graph))
|
||||
|
||||
launcher.launch_service(VitrageConsistencyService(conf, e_graph))
|
||||
|
||||
launcher.wait()
|
||||
VitrageApiHandlerService(conf, e_graph).start()
|
||||
VitrageGraphInit(conf, e_graph, db_connection).run()
|
||||
|
||||
|
||||
def clear_active_actions_table(db_connection):
|
||||
@ -62,5 +44,6 @@ def clear_active_actions_table(db_connection):
|
||||
"""
|
||||
db_connection.active_actions.delete()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
|
@ -20,6 +20,7 @@ from collections import defaultdict
|
||||
import copy
|
||||
import itertools
|
||||
import random
|
||||
import threading
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
@ -79,3 +80,10 @@ def get_portion(lst, num_of_portions, portion_index):
|
||||
curr_portion = next(g)
|
||||
portions[curr_portion].append(curr_item)
|
||||
return portions[portion_index]
|
||||
|
||||
|
||||
def spawn(target, *args, **kwargs):
|
||||
t = threading.Thread(target=target, args=args, kwargs=kwargs)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
return t
|
||||
|
@ -24,8 +24,10 @@ from vitrage.common.constants import GraphAction
|
||||
from vitrage.common.constants import VertexProperties as VProps
|
||||
from vitrage.datasources.consistency import CONSISTENCY_DATASOURCE
|
||||
from vitrage.datasources import OPENSTACK_CLUSTER
|
||||
from vitrage.entity_graph import EVALUATOR_TOPIC
|
||||
from vitrage.evaluator.actions.evaluator_event_transformer \
|
||||
import VITRAGE_DATASOURCE
|
||||
from vitrage.messaging import VitrageNotifier
|
||||
from vitrage.utils.datetime import utcnow
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
@ -35,10 +37,11 @@ class ConsistencyEnforcer(object):
|
||||
|
||||
def __init__(self,
|
||||
conf,
|
||||
actions_callback,
|
||||
entity_graph):
|
||||
entity_graph,
|
||||
actions_callback=None):
|
||||
self.conf = conf
|
||||
self.actions_callback = actions_callback
|
||||
self.actions_callback = actions_callback or VitrageNotifier(
|
||||
conf, 'vitrage_consistency', [EVALUATOR_TOPIC]).notify
|
||||
self.graph = entity_graph
|
||||
|
||||
def periodic_process(self):
|
||||
|
@ -1,58 +0,0 @@
|
||||
# Copyright 2016 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_service import service as os_service
|
||||
|
||||
from vitrage.entity_graph.consistency.consistency_enforcer \
|
||||
import ConsistencyEnforcer
|
||||
from vitrage.entity_graph import EVALUATOR_TOPIC
|
||||
from vitrage.messaging import VitrageNotifier
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class VitrageConsistencyService(os_service.Service):
|
||||
|
||||
def __init__(self,
|
||||
conf,
|
||||
entity_graph):
|
||||
super(VitrageConsistencyService, self).__init__()
|
||||
self.conf = conf
|
||||
self.entity_graph = entity_graph
|
||||
self.actions_notifier = VitrageNotifier(
|
||||
conf, 'vitrage_consistency', [EVALUATOR_TOPIC])
|
||||
|
||||
def start(self):
|
||||
LOG.info("Vitrage Consistency Service - Starting...")
|
||||
|
||||
super(VitrageConsistencyService, self).start()
|
||||
|
||||
consistency_enf = ConsistencyEnforcer(
|
||||
conf=self.conf,
|
||||
actions_callback=self.actions_notifier.notify,
|
||||
entity_graph=self.entity_graph)
|
||||
self.tg.add_timer(self.conf.datasources.snapshots_interval,
|
||||
consistency_enf.periodic_process,
|
||||
initial_delay=60 +
|
||||
self.conf.datasources.snapshots_interval)
|
||||
|
||||
LOG.info("Vitrage Consistency Service - Started!")
|
||||
|
||||
def stop(self, graceful=False):
|
||||
LOG.info("Vitrage Consistency Service - Stopping...")
|
||||
|
||||
super(VitrageConsistencyService, self).stop()
|
||||
|
||||
LOG.info("Vitrage Consistency Service - Stopped!")
|
@ -1,14 +0,0 @@
|
||||
# Copyright 2018 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
__author__ = 'stack'
|
@ -1,137 +0,0 @@
|
||||
# Copyright 2018 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import abc
|
||||
import setproctitle
|
||||
import time
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_service import service as os_service
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
GRAPH_UPDATE = 'graph_update'
|
||||
POISON_PILL = None
|
||||
|
||||
|
||||
class GraphCloneManagerBase(object):
|
||||
|
||||
def __init__(self, conf, entity_graph, worker_num):
|
||||
self._conf = conf
|
||||
self._entity_graph = entity_graph
|
||||
self._workers_num = worker_num
|
||||
self._worker_queues = list()
|
||||
self._p_launcher = os_service.ProcessLauncher(conf)
|
||||
|
||||
def start(self):
|
||||
LOG.info('%s start %s processes', self.__class__.__name__,
|
||||
self._workers_num)
|
||||
for i in range(self._workers_num):
|
||||
worker_queue = self._run_worker(i, self._workers_num)
|
||||
self._worker_queues.append(worker_queue)
|
||||
self.before_subscribe()
|
||||
self._entity_graph.subscribe(self.notify_graph_update)
|
||||
|
||||
@abc.abstractmethod
|
||||
def _run_worker(self, worker_index, workers_num):
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def before_subscribe(self):
|
||||
pass
|
||||
|
||||
def notify_graph_update(self, before, current, is_vertex, *args, **kwargs):
|
||||
"""Notify all workers
|
||||
|
||||
This method is subscribed to entity graph changes.
|
||||
Per each change in the main entity graph, this method will notify
|
||||
each of the evaluators, causing them to update their own graph.
|
||||
"""
|
||||
self._notify_and_wait((GRAPH_UPDATE, before, current, is_vertex))
|
||||
|
||||
def _notify_and_wait(self, payload):
|
||||
for q in self._worker_queues:
|
||||
q.put(payload)
|
||||
time.sleep(0) # context switch before join
|
||||
for q in self._worker_queues:
|
||||
q.join()
|
||||
|
||||
def stop_all_workers(self):
|
||||
self._notify_and_wait(POISON_PILL)
|
||||
for q in self._worker_queues:
|
||||
q.close()
|
||||
self._worker_queues = list()
|
||||
|
||||
|
||||
class GraphCloneWorkerBase(os_service.Service):
|
||||
def __init__(self,
|
||||
conf,
|
||||
task_queue,
|
||||
entity_graph):
|
||||
super(GraphCloneWorkerBase, self).__init__()
|
||||
self._conf = conf
|
||||
self._task_queue = task_queue
|
||||
self._entity_graph = entity_graph
|
||||
|
||||
def name(self):
|
||||
return ''
|
||||
|
||||
def start(self):
|
||||
super(GraphCloneWorkerBase, self).start()
|
||||
try:
|
||||
setproctitle.setproctitle('{} {} {}'.format(
|
||||
'vitrage-graph',
|
||||
self.__class__.__name__,
|
||||
self.name()))
|
||||
except Exception:
|
||||
LOG.warning('failed to set process name')
|
||||
self._entity_graph.notifier._subscriptions = [] # Quick n dirty
|
||||
self.tg.add_thread(self._read_queue)
|
||||
LOG.info("%s - Started!", self.__class__.__name__)
|
||||
|
||||
def _read_queue(self):
|
||||
while True:
|
||||
try:
|
||||
next_task = self._task_queue.get()
|
||||
if next_task is POISON_PILL:
|
||||
self._task_queue.task_done()
|
||||
break
|
||||
self.do_task(next_task)
|
||||
except Exception as e:
|
||||
LOG.exception("Graph may not be in sync: exception %s", e)
|
||||
self._task_queue.task_done()
|
||||
# Evaluator queue may have been updated, thus the sleep:
|
||||
time.sleep(0)
|
||||
|
||||
def do_task(self, task):
|
||||
action = task[0]
|
||||
if action == GRAPH_UPDATE:
|
||||
(action, before, current, is_vertex) = task
|
||||
self._graph_update(before, current, is_vertex)
|
||||
|
||||
def _graph_update(self, before, current, is_vertex):
|
||||
if current:
|
||||
if is_vertex:
|
||||
self._entity_graph.add_vertex(current)
|
||||
else:
|
||||
self._entity_graph.add_edge(current)
|
||||
else:
|
||||
if is_vertex:
|
||||
self._entity_graph.delete_vertex(before)
|
||||
else:
|
||||
self._entity_graph.delete_edge(before)
|
||||
|
||||
def stop(self, graceful=False):
|
||||
super(GraphCloneWorkerBase, self).stop(graceful)
|
||||
LOG.info("%s - Stopped!", self.__class__.__name__)
|
@ -1,99 +1,84 @@
|
||||
# Copyright 2015 - Alcatel-Lucent
|
||||
# Copyright 2018 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import oslo_messaging
|
||||
import threading
|
||||
import time
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_service import service as os_service
|
||||
import oslo_messaging
|
||||
|
||||
from vitrage.common.utils import spawn
|
||||
from vitrage.entity_graph import EVALUATOR_TOPIC
|
||||
from vitrage.entity_graph.processor.processor import Processor
|
||||
from vitrage.entity_graph.vitrage_init import VitrageInit
|
||||
from vitrage.evaluator.template_loader_service import TemplateLoaderManager
|
||||
from vitrage.entity_graph.scheduler import Scheduler
|
||||
from vitrage.entity_graph.workers import GraphWorkersManager
|
||||
from vitrage import messaging
|
||||
from vitrage.persistency.graph_persistor import GraphPersistor
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class VitrageGraphService(os_service.Service):
|
||||
|
||||
def __init__(self,
|
||||
conf,
|
||||
graph,
|
||||
evaluator,
|
||||
db):
|
||||
super(VitrageGraphService, self).__init__()
|
||||
class VitrageGraphInit(object):
|
||||
def __init__(self, conf, graph, db_connection):
|
||||
self.conf = conf
|
||||
self.graph = graph
|
||||
self.evaluator = evaluator
|
||||
self.templates_loader = TemplateLoaderManager(conf, graph, db)
|
||||
self.init = VitrageInit(conf, graph, self.evaluator,
|
||||
self.templates_loader)
|
||||
self.graph_persistor = GraphPersistor(conf) if \
|
||||
self.conf.persistency.enable_persistency else None
|
||||
self.processor = Processor(self.conf, self.init, graph,
|
||||
self.graph_persistor)
|
||||
self.listener = self._init_listener()
|
||||
|
||||
def _init_listener(self):
|
||||
collector_topic = self.conf.datasources.notification_topic_collector
|
||||
evaluator_topic = EVALUATOR_TOPIC
|
||||
return TwoPriorityListener(
|
||||
self.conf,
|
||||
self.workers = GraphWorkersManager(conf, graph, db_connection)
|
||||
self.events_coordination = EventsCoordination(
|
||||
conf,
|
||||
self.process_event,
|
||||
collector_topic,
|
||||
evaluator_topic)
|
||||
conf.datasources.notification_topic_collector,
|
||||
EVALUATOR_TOPIC)
|
||||
self.end_messages = {}
|
||||
self.scheduler = Scheduler(conf, graph)
|
||||
self.processor = Processor(conf,
|
||||
self._handle_end_message,
|
||||
graph,
|
||||
self.scheduler.graph_persistor)
|
||||
|
||||
def run(self):
|
||||
LOG.info('Init Started')
|
||||
self.events_coordination.start()
|
||||
self._wait_for_all_end_messages()
|
||||
self.scheduler.start_periodic_tasks()
|
||||
self.processor.start_notifier()
|
||||
spawn(self.workers.submit_start_evaluations)
|
||||
self.workers.run()
|
||||
|
||||
def process_event(self, event):
|
||||
if event.get('template_action'):
|
||||
self.templates_loader.handle_template_event(event)
|
||||
self.evaluator.reload_evaluators_templates()
|
||||
self.workers.submit_template_event(event)
|
||||
self.workers.submit_evaluators_reload_templates()
|
||||
else:
|
||||
self.processor.process_event(event)
|
||||
|
||||
def start(self):
|
||||
LOG.info("Vitrage Graph Service - Starting...")
|
||||
super(VitrageGraphService, self).start()
|
||||
if self.graph_persistor:
|
||||
self.tg.add_timer(
|
||||
self.conf.persistency.graph_persistency_interval,
|
||||
self.graph_persistor.store_graph,
|
||||
self.conf.persistency.graph_persistency_interval,
|
||||
graph=self.graph)
|
||||
self.tg.add_thread(
|
||||
self.init.initializing_process,
|
||||
on_end_messages_func=self.processor.on_recieved_all_end_messages)
|
||||
self.listener.start()
|
||||
LOG.info("Vitrage Graph Service - Started!")
|
||||
def _handle_end_message(self, vitrage_type):
|
||||
self.end_messages[vitrage_type] = True
|
||||
|
||||
def stop(self, graceful=False):
|
||||
LOG.info("Vitrage Graph Service - Stopping...")
|
||||
self.evaluator.stop_all_workers()
|
||||
self.templates_loader.stop_all_workers()
|
||||
self.listener.stop()
|
||||
self.listener.wait()
|
||||
super(VitrageGraphService, self).stop(graceful)
|
||||
|
||||
LOG.info("Vitrage Graph Service - Stopped!")
|
||||
def _wait_for_all_end_messages(self):
|
||||
start = time.time()
|
||||
timeout = self.conf.consistency.initialization_max_retries * \
|
||||
self.conf.consistency.initialization_interval
|
||||
while time.time() < start + timeout:
|
||||
if len(self.end_messages) == len(self.conf.datasources.types):
|
||||
LOG.info('end messages received')
|
||||
return True
|
||||
time.sleep(0.2)
|
||||
LOG.warning('Missing end messages %s', self.end_messages.keys())
|
||||
return False
|
||||
|
||||
|
||||
PRIORITY_DELAY = 0.05
|
||||
|
||||
|
||||
class TwoPriorityListener(object):
|
||||
class EventsCoordination(object):
|
||||
def __init__(self, conf, do_work_func, topic_low, topic_high):
|
||||
self._conf = conf
|
||||
self._lock = threading.Lock()
|
||||
@ -103,7 +88,7 @@ class TwoPriorityListener(object):
|
||||
try:
|
||||
return do_work_func(event)
|
||||
except Exception as e:
|
||||
LOG.exception('Got Exception %s', e)
|
||||
LOG.exception('Got Exception %s for event %s', e, str(event))
|
||||
|
||||
self._do_work_func = do_work
|
||||
|
||||
@ -114,7 +99,9 @@ class TwoPriorityListener(object):
|
||||
|
||||
def start(self):
|
||||
self._high_pri_listener.start()
|
||||
LOG.info('Listening on %s', self._high_pri_listener.targets[0].topic)
|
||||
self._low_pri_listener.start()
|
||||
LOG.info('Listening on %s', self._low_pri_listener.targets[0].topic)
|
||||
|
||||
def stop(self):
|
||||
self._low_pri_listener.stop()
|
@ -23,7 +23,8 @@ from vitrage.entity_graph.mappings.datasource_info_mapper import \
|
||||
from vitrage.entity_graph.processor import base as processor
|
||||
from vitrage.entity_graph.processor.notifier import GraphNotifier
|
||||
from vitrage.entity_graph.processor import processor_utils as PUtils
|
||||
from vitrage.entity_graph.transformer_manager import TransformerManager
|
||||
from vitrage.entity_graph.processor.transformer_manager import \
|
||||
TransformerManager
|
||||
from vitrage.graph import Direction
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
@ -31,14 +32,14 @@ LOG = log.getLogger(__name__)
|
||||
|
||||
class Processor(processor.ProcessorBase):
|
||||
|
||||
def __init__(self, conf, initialization_status, e_graph,
|
||||
def __init__(self, conf, end_messages_func=None, e_graph=None,
|
||||
graph_persistor=None):
|
||||
super(Processor, self).__init__()
|
||||
self.conf = conf
|
||||
self.transformer_manager = TransformerManager(self.conf)
|
||||
self.info_mapper = DatasourceInfoMapper(self.conf)
|
||||
self._initialize_events_actions()
|
||||
self.initialization_status = initialization_status
|
||||
self.end_messages_func = end_messages_func
|
||||
self.entity_graph = e_graph
|
||||
self._notifier = GraphNotifier(conf)
|
||||
self._graph_persistor = graph_persistor
|
||||
@ -189,9 +190,9 @@ class Processor(processor.ProcessorBase):
|
||||
vertex, graph_vertex)
|
||||
|
||||
def handle_end_message(self, vertex, neighbors):
|
||||
self.initialization_status.handle_end_message(vertex)
|
||||
self.end_messages_func(vertex[VProps.VITRAGE_TYPE])
|
||||
|
||||
def on_recieved_all_end_messages(self):
|
||||
def start_notifier(self):
|
||||
if self._notifier and self._notifier.enabled:
|
||||
self.entity_graph.subscribe(self._notifier.notify_when_applicable)
|
||||
LOG.info('Graph notifications subscription added')
|
||||
|
73
vitrage/entity_graph/scheduler.py
Normal file
73
vitrage/entity_graph/scheduler.py
Normal file
@ -0,0 +1,73 @@
|
||||
# Copyright 2018 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from futurist import periodics
|
||||
|
||||
from oslo_log import log
|
||||
from vitrage.common.utils import spawn
|
||||
|
||||
from vitrage.entity_graph.consistency.consistency_enforcer import\
|
||||
ConsistencyEnforcer
|
||||
from vitrage.persistency.graph_persistor import GraphPersistor
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class Scheduler(object):
|
||||
|
||||
def __init__(self,
|
||||
conf,
|
||||
graph):
|
||||
super(Scheduler, self).__init__()
|
||||
self.conf = conf
|
||||
self.graph = graph
|
||||
self.graph_persistor = GraphPersistor(conf) if \
|
||||
self.conf.persistency.enable_persistency else None
|
||||
self.consistency = ConsistencyEnforcer(conf, graph)
|
||||
self.periodic = None
|
||||
|
||||
def start_periodic_tasks(self):
|
||||
self.periodic = periodics.PeriodicWorker.create(
|
||||
[], executor_factory=lambda: ThreadPoolExecutor(max_workers=10))
|
||||
|
||||
self.add_persistor_timer()
|
||||
self.add_consistency_timer()
|
||||
spawn(self.periodic.start)
|
||||
|
||||
def add_persistor_timer(self):
|
||||
spacing = self.conf.persistency.graph_persistency_interval
|
||||
|
||||
@periodics.periodic(spacing=spacing)
|
||||
def persist():
|
||||
if self.graph_persistor:
|
||||
try:
|
||||
self.graph_persistor.store_graph(graph=self.graph)
|
||||
except Exception as e:
|
||||
LOG.exception('persist failed %s', e)
|
||||
|
||||
self.periodic.add(persist)
|
||||
LOG.info("periodic task - persistor %s", spacing)
|
||||
|
||||
def add_consistency_timer(self):
|
||||
spacing = self.conf.datasources.snapshots_interval
|
||||
|
||||
@periodics.periodic(spacing=spacing)
|
||||
def run_consistency():
|
||||
try:
|
||||
self.consistency.periodic_process()
|
||||
except Exception as e:
|
||||
LOG.exception('run_consistency failed %s', e)
|
||||
|
||||
self.periodic.add(run_consistency)
|
||||
LOG.info("periodic task - run_consistency %s", spacing)
|
@ -1,112 +0,0 @@
|
||||
# Copyright 2016 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from oslo_log import log
|
||||
import time
|
||||
|
||||
from vitrage.common.constants import VertexProperties as VProps
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class VitrageInit(object):
|
||||
STARTED = 'started'
|
||||
RECEIVED_ALL_END_MESSAGES = 'received_all_end_messages'
|
||||
FINISHED = 'finished'
|
||||
|
||||
def __init__(self, conf, graph=None, evaluator=None, template_loader=None):
|
||||
self.conf = conf
|
||||
self.graph = graph
|
||||
self.evaluator = evaluator
|
||||
self.template_loader = template_loader
|
||||
self.status = self.STARTED
|
||||
self.end_messages = {}
|
||||
|
||||
def initializing_process(self, on_end_messages_func):
|
||||
try:
|
||||
LOG.info('Init Started')
|
||||
|
||||
if not self._wait_for_all_end_messages():
|
||||
LOG.warning('Initialization - max retries reached %s',
|
||||
self.end_messages)
|
||||
else:
|
||||
LOG.info('Initialization - All end messages were received')
|
||||
|
||||
on_end_messages_func()
|
||||
|
||||
self.evaluator.start()
|
||||
if self.template_loader:
|
||||
self.template_loader.start()
|
||||
|
||||
# TODO(idan_hefetz) As vitrage is not yet persistent, there aren't
|
||||
# TODO(idan_hefetz) any deduced alarms to be removed during init
|
||||
# if not self._wait_for_action(self.evaluator_queue.empty):
|
||||
# LOG.error('Evaluator Queue Not Empty')
|
||||
# self._mark_old_deduced_alarms_as_deleted(timestamp, self.graph,
|
||||
# self.evaluator_queue)
|
||||
self.status = self.FINISHED
|
||||
|
||||
LOG.info('Init Finished')
|
||||
except Exception as e:
|
||||
LOG.exception('Init Failed: %s', e)
|
||||
|
||||
def handle_end_message(self, vertex):
|
||||
self.end_messages[vertex[VProps.VITRAGE_TYPE]] = True
|
||||
|
||||
if len(self.end_messages) == len(self.conf.datasources.types):
|
||||
self.status = self.RECEIVED_ALL_END_MESSAGES
|
||||
|
||||
def _wait_for_all_end_messages(self):
|
||||
return self._wait_for_action(
|
||||
lambda: self.status == self.RECEIVED_ALL_END_MESSAGES)
|
||||
|
||||
def _wait_for_action(self, function):
|
||||
count_retries = 0
|
||||
while True:
|
||||
if count_retries >= \
|
||||
self.conf.consistency.initialization_max_retries:
|
||||
return False
|
||||
|
||||
if function():
|
||||
return True
|
||||
|
||||
count_retries += 1
|
||||
time.sleep(self.conf.consistency.initialization_interval)
|
||||
|
||||
# def _mark_old_deduced_alarms_as_deleted(self, timestamp,graph,out_queue):
|
||||
# query = {
|
||||
# 'and': [
|
||||
# {'==': {VProps.VITRAGE_CATEGORY: EntityCategory.ALARM}},
|
||||
# {'==': {VProps.VITRAGE_TYPE: VProps.VITRAGE_TYPE}},
|
||||
# {'<': {VProps.VITRAGE_SAMPLE_TIMESTAMP: timestamp}}
|
||||
# ]
|
||||
# }
|
||||
# old_deduced_alarms = graph.get_vertices(query_dict=query)
|
||||
# self._push_events_to_queue(old_deduced_alarms,
|
||||
# GraphAction.DELETE_ENTITY,
|
||||
# out_queue)
|
||||
#
|
||||
# def _push_events_to_queue(self, vertices, action, out_queue):
|
||||
# for vertex in vertices:
|
||||
# event = {
|
||||
# DSProps.ENTITY_TYPE: CONSISTENCY_DATASOURCE,
|
||||
# DSProps.DATASOURCE_ACTION: DatasourceAction.UPDATE,
|
||||
# DSProps.SAMPLE_DATE: str(utcnow()),
|
||||
# DSProps.EVENT_TYPE: action,
|
||||
# VProps.VITRAGE_ID: vertex[VProps.VITRAGE_ID],
|
||||
# VProps.ID: vertex.get(VProps.ID, None),
|
||||
# VProps.VITRAGE_TYPE: vertex[VProps.VITRAGE_TYPE],
|
||||
# VProps.VITRAGE_CATEGORY: vertex[VProps.VITRAGE_CATEGORY],
|
||||
# VProps.IS_REAL_VITRAGE_ID: True
|
||||
# }
|
||||
# out_queue.put(event)
|
325
vitrage/entity_graph/workers.py
Normal file
325
vitrage/entity_graph/workers.py
Normal file
@ -0,0 +1,325 @@
|
||||
# Copyright 2018 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import abc
|
||||
import cotyledon
|
||||
import multiprocessing
|
||||
|
||||
from oslo_concurrency import processutils as ps
|
||||
from oslo_log import log
|
||||
|
||||
from vitrage.common.constants import TemplateStatus as TStatus
|
||||
from vitrage.common.constants import TemplateTypes as TType
|
||||
from vitrage.common.exception import VitrageError
|
||||
from vitrage.entity_graph import EVALUATOR_TOPIC
|
||||
from vitrage.evaluator.actions.base import ActionMode
|
||||
from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator
|
||||
from vitrage.evaluator.scenario_repository import ScenarioRepository
|
||||
from vitrage.messaging import VitrageNotifier
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
# Supported message types
|
||||
GRAPH_UPDATE = 'graph_update'
|
||||
START_EVALUATION = 'start_evaluation'
|
||||
RELOAD_TEMPLATES = 'reload_templates'
|
||||
TEMPLATE_ACTION = 'template_action'
|
||||
|
||||
ADD = 'add'
|
||||
DELETE = 'delete'
|
||||
|
||||
|
||||
class GraphWorkersManager(cotyledon.ServiceManager):
|
||||
"""GraphWorkersManager
|
||||
|
||||
- worker processes
|
||||
- the queues used to communicate with these workers
|
||||
- methods interface to submit tasks to workers
|
||||
"""
|
||||
def __init__(self, conf, entity_graph, db):
|
||||
super(GraphWorkersManager, self).__init__()
|
||||
self._conf = conf
|
||||
self._entity_graph = entity_graph
|
||||
self._db = db
|
||||
self._evaluator_queues = []
|
||||
self._template_queues = []
|
||||
self._all_queues = []
|
||||
self.add_evaluator_workers()
|
||||
self.add_template_workers()
|
||||
|
||||
def add_evaluator_workers(self):
|
||||
"""Add evaluator workers
|
||||
|
||||
Evaluator workers receive all graph updates, hence are updated.
|
||||
Each evaluator worker holds an enabled scenario-evaluator and process
|
||||
every change.
|
||||
Each worker's scenario-evaluator runs different template scenarios.
|
||||
Interface to these workers is:
|
||||
submit_graph_update(..)
|
||||
submit_start_evaluations(..)
|
||||
submit_evaluators_reload_templates(..)
|
||||
"""
|
||||
if self._evaluator_queues:
|
||||
raise VitrageError('add_evaluator_workers called more than once')
|
||||
workers = self._conf.evaluator.workers or ps.get_worker_count()
|
||||
queues = [multiprocessing.JoinableQueue() for i in range(workers)]
|
||||
self.add(EvaluatorWorker,
|
||||
args=(self._conf, queues, self._entity_graph, workers),
|
||||
workers=workers)
|
||||
self._evaluator_queues = queues
|
||||
self._all_queues.extend(queues)
|
||||
|
||||
def add_template_workers(self):
|
||||
"""Add template workers
|
||||
|
||||
Template workers receive all graph updates, hence are updated.
|
||||
Each template worker holds a disabled scenario-evaluator that does
|
||||
not process changes.
|
||||
The scenario-evaluator is enabled when a template add/delete arrives,
|
||||
so this worker will run the added template on the entire graph.
|
||||
Interface to these workers is:
|
||||
submit_graph_update(..)
|
||||
submit_template_event(..)
|
||||
"""
|
||||
if self._template_queues:
|
||||
raise VitrageError('add_template_workers called more than once')
|
||||
workers = 1 # currently more than one worker is not supported
|
||||
queues = [multiprocessing.JoinableQueue() for i in range(workers)]
|
||||
self.add(TemplateLoaderWorker,
|
||||
args=(self._conf, queues, self._entity_graph),
|
||||
workers=workers)
|
||||
self._template_queues = queues
|
||||
self._all_queues.extend(queues)
|
||||
|
||||
def submit_graph_update(self, before, current, is_vertex, *args, **kwargs):
|
||||
"""Graph update all workers
|
||||
|
||||
This method is subscribed to entity graph changes.
|
||||
Per each change in the main entity graph, this method will notify
|
||||
each of the workers, causing them to update their own graph.
|
||||
"""
|
||||
self._submit_and_wait(
|
||||
self._all_queues,
|
||||
(GRAPH_UPDATE, before, current, is_vertex))
|
||||
|
||||
def submit_start_evaluations(self):
|
||||
"""Enable scenario-evaluator in all evaluator workers
|
||||
|
||||
Enables the worker's scenario-evaluator, and run it on the entire graph
|
||||
"""
|
||||
self._submit_and_wait(self._evaluator_queues, (START_EVALUATION,))
|
||||
self._entity_graph.subscribe(self.submit_graph_update)
|
||||
LOG.info('Init Finished')
|
||||
|
||||
def submit_evaluators_reload_templates(self):
|
||||
"""Recreate the scenario-repository in all evaluator workers
|
||||
|
||||
So that new/deleted templates are added/removed
|
||||
"""
|
||||
self._submit_and_wait(self._evaluator_queues, (RELOAD_TEMPLATES,))
|
||||
|
||||
def submit_template_event(self, event):
|
||||
"""Template worker to load the new/deleted template
|
||||
|
||||
Load the template to scenario-evaluator and run it on the entire graph
|
||||
"""
|
||||
template_action = event.get(TEMPLATE_ACTION)
|
||||
|
||||
if template_action == ADD:
|
||||
templates = self._db.templates.query(status=TStatus.LOADING)
|
||||
new_status = TStatus.ACTIVE
|
||||
action_mode = ActionMode.DO
|
||||
elif template_action == DELETE:
|
||||
templates = self._db.templates.query(status=TStatus.DELETING)
|
||||
new_status = TStatus.DELETED
|
||||
action_mode = ActionMode.UNDO
|
||||
else:
|
||||
raise VitrageError('Invalid template_action %s' % template_action)
|
||||
|
||||
self._submit_and_wait(
|
||||
self._template_queues,
|
||||
(
|
||||
TEMPLATE_ACTION,
|
||||
[t.name for t in templates
|
||||
if t.template_type == TType.STANDARD],
|
||||
action_mode,
|
||||
))
|
||||
|
||||
for t in templates:
|
||||
self._db.templates.update(t.uuid, 'status', new_status)
|
||||
|
||||
@staticmethod
|
||||
def _submit_and_wait(queues, payload):
|
||||
for q in queues:
|
||||
q.put(payload)
|
||||
for q in queues:
|
||||
q.join()
|
||||
|
||||
|
||||
class GraphCloneWorkerBase(cotyledon.Service):
|
||||
def __init__(self,
|
||||
worker_id,
|
||||
conf,
|
||||
task_queues,
|
||||
entity_graph):
|
||||
super(GraphCloneWorkerBase, self).__init__(worker_id)
|
||||
self._conf = conf
|
||||
self._task_queue = task_queues[worker_id]
|
||||
self._entity_graph = entity_graph
|
||||
self._running = False
|
||||
|
||||
name = 'GraphCloneWorkerBase'
|
||||
|
||||
@abc.abstractmethod
|
||||
def _init_instance(self):
|
||||
"""This method is executed in the newly created process"""
|
||||
raise NotImplementedError
|
||||
|
||||
def run(self):
|
||||
LOG.info("%s - Starting %s", self.__class__.__name__, self.worker_id)
|
||||
self._running = True
|
||||
self._entity_graph.notifier._subscriptions = [] # Quick n dirty
|
||||
self._init_instance()
|
||||
self._read_queue()
|
||||
|
||||
def terminate(self):
|
||||
self._running = False
|
||||
LOG.info("%s - Stopped!", self.__class__.__name__)
|
||||
|
||||
def _read_queue(self):
|
||||
LOG.debug("%s - reading queue %s",
|
||||
self.__class__.__name__, self.worker_id)
|
||||
while self._running:
|
||||
try:
|
||||
next_task = self._task_queue.get()
|
||||
self.do_task(next_task)
|
||||
except Exception as e:
|
||||
LOG.exception("Graph may not be in sync: exception %s", e)
|
||||
self._task_queue.task_done()
|
||||
|
||||
def do_task(self, task):
|
||||
action = task[0]
|
||||
if action == GRAPH_UPDATE:
|
||||
(action, before, current, is_vertex) = task
|
||||
self._graph_update(before, current, is_vertex)
|
||||
|
||||
def _graph_update(self, before, current, is_vertex):
|
||||
if current:
|
||||
if is_vertex:
|
||||
self._entity_graph.add_vertex(current)
|
||||
else:
|
||||
self._entity_graph.add_edge(current)
|
||||
else:
|
||||
if is_vertex:
|
||||
self._entity_graph.delete_vertex(before)
|
||||
else:
|
||||
self._entity_graph.delete_edge(before)
|
||||
|
||||
|
||||
class EvaluatorWorker(GraphCloneWorkerBase):
|
||||
def __init__(self,
|
||||
worker_id,
|
||||
conf,
|
||||
task_queues,
|
||||
e_graph,
|
||||
workers_num):
|
||||
super(EvaluatorWorker, self).__init__(
|
||||
worker_id, conf, task_queues, e_graph)
|
||||
self._workers_num = workers_num
|
||||
self._evaluator = None
|
||||
|
||||
name = 'EvaluatorWorker'
|
||||
|
||||
def _init_instance(self):
|
||||
scenario_repo = ScenarioRepository(self._conf, self.worker_id,
|
||||
self._workers_num)
|
||||
actions_callback = VitrageNotifier(
|
||||
conf=self._conf,
|
||||
publisher_id='vitrage_evaluator',
|
||||
topics=[EVALUATOR_TOPIC]).notify
|
||||
self._evaluator = ScenarioEvaluator(
|
||||
self._conf,
|
||||
self._entity_graph,
|
||||
scenario_repo,
|
||||
actions_callback,
|
||||
enabled=False)
|
||||
self._evaluator.scenario_repo.log_enabled_scenarios()
|
||||
|
||||
def do_task(self, task):
|
||||
super(EvaluatorWorker, self).do_task(task)
|
||||
action = task[0]
|
||||
if action == START_EVALUATION:
|
||||
self._evaluator.run_evaluator()
|
||||
elif action == RELOAD_TEMPLATES:
|
||||
self._reload_templates()
|
||||
|
||||
def _reload_templates(self):
|
||||
LOG.info("reloading evaluator scenarios")
|
||||
scenario_repo = ScenarioRepository(self._conf, self.worker_id,
|
||||
self._workers_num)
|
||||
self._evaluator.scenario_repo = scenario_repo
|
||||
self._evaluator.scenario_repo.log_enabled_scenarios()
|
||||
|
||||
|
||||
class TemplateLoaderWorker(GraphCloneWorkerBase):
|
||||
def __init__(self,
|
||||
worker_id,
|
||||
conf,
|
||||
task_queues,
|
||||
e_graph):
|
||||
super(TemplateLoaderWorker, self).__init__(worker_id,
|
||||
conf,
|
||||
task_queues,
|
||||
e_graph)
|
||||
self._evaluator = None
|
||||
|
||||
name = 'TemplateLoaderWorker'
|
||||
|
||||
def _init_instance(self):
|
||||
actions_callback = VitrageNotifier(
|
||||
conf=self._conf,
|
||||
publisher_id='vitrage_evaluator',
|
||||
topics=[EVALUATOR_TOPIC]).notify
|
||||
self._evaluator = ScenarioEvaluator(
|
||||
self._conf,
|
||||
self._entity_graph,
|
||||
None,
|
||||
actions_callback,
|
||||
enabled=False)
|
||||
|
||||
def do_task(self, task):
|
||||
super(TemplateLoaderWorker, self).do_task(task)
|
||||
action = task[0]
|
||||
if action == TEMPLATE_ACTION:
|
||||
(action, template_names, action_mode) = task
|
||||
self._template_action(template_names, action_mode)
|
||||
|
||||
def _template_action(self, template_names, action_mode):
|
||||
self._enable_evaluator_templates(template_names)
|
||||
self._evaluator.run_evaluator(action_mode)
|
||||
self._disable_evaluator()
|
||||
|
||||
def _enable_evaluator_templates(self, template_names):
|
||||
scenario_repo = ScenarioRepository(self._conf)
|
||||
for s in scenario_repo._all_scenarios:
|
||||
s.enabled = False
|
||||
for template_name in template_names:
|
||||
if s.id.startswith(template_name):
|
||||
s.enabled = True
|
||||
self._evaluator.scenario_repo = scenario_repo
|
||||
self._evaluator.scenario_repo.log_enabled_scenarios()
|
||||
self._evaluator.enabled = True
|
||||
|
||||
def _disable_evaluator(self):
|
||||
self._entity_graph.notifier._subscriptions = [] # Quick n dirty
|
||||
self._evaluator.enabled = False
|
@ -1,109 +0,0 @@
|
||||
# Copyright 2017 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import multiprocessing
|
||||
|
||||
from oslo_concurrency import processutils
|
||||
from oslo_log import log
|
||||
|
||||
from vitrage.entity_graph import EVALUATOR_TOPIC
|
||||
from vitrage.entity_graph.graph_clone import base
|
||||
from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator
|
||||
from vitrage.evaluator.scenario_repository import ScenarioRepository
|
||||
from vitrage.messaging import VitrageNotifier
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
START_EVALUATION = 'start_evaluation'
|
||||
RELOAD_TEMPLATES = 'reload_templates'
|
||||
|
||||
|
||||
class EvaluatorManager(base.GraphCloneManagerBase):
|
||||
|
||||
def __init__(self, conf, entity_graph):
|
||||
workers_num = conf.evaluator.workers or processutils.get_worker_count()
|
||||
super(EvaluatorManager, self).__init__(conf, entity_graph, workers_num)
|
||||
|
||||
def before_subscribe(self):
|
||||
self.start_evaluations()
|
||||
|
||||
def _run_worker(self, worker_index, workers_num):
|
||||
"""Create an EvaluatorWorker and it's task queue
|
||||
|
||||
The new worker is initialized with a scenario repository
|
||||
that only contains a portion of the templates
|
||||
"""
|
||||
|
||||
tasks_queue = multiprocessing.JoinableQueue()
|
||||
w = EvaluatorWorker(
|
||||
self._conf,
|
||||
tasks_queue,
|
||||
self._entity_graph,
|
||||
worker_index,
|
||||
workers_num)
|
||||
self._p_launcher.launch_service(w)
|
||||
return tasks_queue
|
||||
|
||||
def start_evaluations(self):
|
||||
self._notify_and_wait((START_EVALUATION,))
|
||||
|
||||
def reload_evaluators_templates(self):
|
||||
self._notify_and_wait((RELOAD_TEMPLATES,))
|
||||
|
||||
|
||||
class EvaluatorWorker(base.GraphCloneWorkerBase):
|
||||
def __init__(self,
|
||||
conf,
|
||||
task_queue,
|
||||
e_graph,
|
||||
worker_index,
|
||||
workers_num):
|
||||
super(EvaluatorWorker, self).__init__(conf, task_queue, e_graph)
|
||||
self._worker_index = worker_index
|
||||
self._workers_num = workers_num
|
||||
self._evaluator = None
|
||||
|
||||
def name(self):
|
||||
return "(%s)" % str(self._worker_index)
|
||||
|
||||
def start(self):
|
||||
super(EvaluatorWorker, self).start()
|
||||
scenario_repo = ScenarioRepository(self._conf, self._worker_index,
|
||||
self._workers_num)
|
||||
actions_callback = VitrageNotifier(
|
||||
conf=self._conf,
|
||||
publisher_id='vitrage_evaluator',
|
||||
topics=[EVALUATOR_TOPIC]).notify
|
||||
self._evaluator = ScenarioEvaluator(
|
||||
self._conf,
|
||||
self._entity_graph,
|
||||
scenario_repo,
|
||||
actions_callback,
|
||||
enabled=False)
|
||||
self._evaluator.scenario_repo.log_enabled_scenarios()
|
||||
|
||||
def do_task(self, task):
|
||||
super(EvaluatorWorker, self).do_task(task)
|
||||
action = task[0]
|
||||
if action == START_EVALUATION:
|
||||
self._evaluator.run_evaluator()
|
||||
elif action == RELOAD_TEMPLATES:
|
||||
self._reload_templates()
|
||||
|
||||
def _reload_templates(self):
|
||||
scenario_repo = ScenarioRepository(self._conf, self._worker_index,
|
||||
self._workers_num)
|
||||
self._evaluator.scenario_repo = scenario_repo
|
||||
LOG.info("reloading evaluator scenarios")
|
||||
self._evaluator.scenario_repo.log_enabled_scenarios()
|
@ -1,122 +0,0 @@
|
||||
# Copyright 2017 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import multiprocessing
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
from vitrage.common.constants import TemplateStatus as TStatus
|
||||
from vitrage.common.constants import TemplateTypes as TType
|
||||
from vitrage.common.exception import VitrageError
|
||||
from vitrage.entity_graph import EVALUATOR_TOPIC
|
||||
from vitrage.entity_graph.graph_clone import base
|
||||
from vitrage.evaluator.actions.base import ActionMode
|
||||
from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator
|
||||
from vitrage.evaluator.scenario_repository import ScenarioRepository
|
||||
from vitrage.messaging import VitrageNotifier
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
TEMPLATE_ACTION = 'template_action'
|
||||
ADD = 'add'
|
||||
DELETE = 'delete'
|
||||
|
||||
|
||||
class TemplateLoaderManager(base.GraphCloneManagerBase):
|
||||
|
||||
def __init__(self, conf, entity_graph, db):
|
||||
super(TemplateLoaderManager, self).__init__(conf, entity_graph, 1)
|
||||
self._db = db
|
||||
|
||||
def _run_worker(self, worker_index, workers_num):
|
||||
tasks_queue = multiprocessing.JoinableQueue()
|
||||
w = TemplateLoaderWorker(
|
||||
self._conf,
|
||||
tasks_queue,
|
||||
self._entity_graph)
|
||||
self._p_launcher.launch_service(w)
|
||||
return tasks_queue
|
||||
|
||||
def handle_template_event(self, event):
|
||||
template_action = event.get(TEMPLATE_ACTION)
|
||||
|
||||
if template_action == ADD:
|
||||
templates = self._db.templates.query(status=TStatus.LOADING)
|
||||
new_status = TStatus.ACTIVE
|
||||
action_mode = ActionMode.DO
|
||||
elif template_action == DELETE:
|
||||
templates = self._db.templates.query(status=TStatus.DELETING)
|
||||
new_status = TStatus.DELETED
|
||||
action_mode = ActionMode.UNDO
|
||||
else:
|
||||
raise VitrageError('Invalid template_action %s' % template_action)
|
||||
|
||||
self._template_worker_task(
|
||||
[t.name for t in templates if t.template_type == TType.STANDARD],
|
||||
action_mode)
|
||||
|
||||
for t in templates:
|
||||
self._db.templates.update(t.uuid, 'status', new_status)
|
||||
|
||||
def _template_worker_task(self, template_names, action_mode):
|
||||
self._notify_and_wait((TEMPLATE_ACTION, template_names, action_mode))
|
||||
|
||||
|
||||
class TemplateLoaderWorker(base.GraphCloneWorkerBase):
|
||||
def __init__(self,
|
||||
conf,
|
||||
task_queue,
|
||||
e_graph):
|
||||
super(TemplateLoaderWorker, self).__init__(conf, task_queue, e_graph)
|
||||
self._evaluator = None
|
||||
|
||||
def start(self):
|
||||
super(TemplateLoaderWorker, self).start()
|
||||
actions_callback = VitrageNotifier(
|
||||
conf=self._conf,
|
||||
publisher_id='vitrage_evaluator',
|
||||
topics=[EVALUATOR_TOPIC]).notify
|
||||
self._evaluator = ScenarioEvaluator(
|
||||
self._conf,
|
||||
self._entity_graph,
|
||||
None,
|
||||
actions_callback,
|
||||
enabled=False)
|
||||
|
||||
def do_task(self, task):
|
||||
super(TemplateLoaderWorker, self).do_task(task)
|
||||
action = task[0]
|
||||
if action == TEMPLATE_ACTION:
|
||||
(action, template_names, action_mode) = task
|
||||
self._template_action(template_names, action_mode)
|
||||
|
||||
def _template_action(self, template_names, action_mode):
|
||||
self._enable_evaluator_templates(template_names)
|
||||
self._evaluator.run_evaluator(action_mode)
|
||||
self._disable_evaluator()
|
||||
|
||||
def _enable_evaluator_templates(self, template_names):
|
||||
scenario_repo = ScenarioRepository(self._conf)
|
||||
for s in scenario_repo._all_scenarios:
|
||||
s.enabled = False
|
||||
for template_name in template_names:
|
||||
if s.id.startswith(template_name):
|
||||
s.enabled = True
|
||||
self._evaluator.scenario_repo = scenario_repo
|
||||
self._evaluator.scenario_repo.log_enabled_scenarios()
|
||||
self._evaluator.enabled = True
|
||||
|
||||
def _disable_evaluator(self):
|
||||
self._entity_graph.notifier._subscriptions = [] # Quick n dirty
|
||||
self._evaluator.enabled = False
|
@ -35,6 +35,7 @@ class GraphPersistor(object):
|
||||
self.last_event_timestamp = datetime.datetime.utcnow()
|
||||
|
||||
def store_graph(self, graph):
|
||||
LOG.info('Graph persistency running..')
|
||||
try:
|
||||
graph_snapshot = graph.write_gpickle()
|
||||
db_row = models.GraphSnapshot(
|
||||
|
@ -14,7 +14,6 @@
|
||||
|
||||
from datetime import timedelta
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from oslo_config import cfg
|
||||
from six.moves import queue
|
||||
@ -30,9 +29,6 @@ from vitrage.datasources.nova.zone import NOVA_ZONE_DATASOURCE
|
||||
from vitrage.entity_graph.consistency.consistency_enforcer \
|
||||
import ConsistencyEnforcer
|
||||
from vitrage.entity_graph.processor.processor import Processor
|
||||
from vitrage.entity_graph.vitrage_init import VitrageInit
|
||||
from vitrage.evaluator.actions.evaluator_event_transformer \
|
||||
import VITRAGE_DATASOURCE
|
||||
from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator
|
||||
from vitrage.evaluator.scenario_repository import ScenarioRepository
|
||||
from vitrage.graph.driver.networkx_graph import NXGraph
|
||||
@ -81,9 +77,7 @@ class TestConsistencyFunctional(TestFunctionalBase, TestConfiguration):
|
||||
cls.add_db(cls.conf)
|
||||
cls.load_datasources(cls.conf)
|
||||
cls.graph = NXGraph("Entity Graph")
|
||||
cls.initialization_status = VitrageInit(cls.conf, cls.graph)
|
||||
cls.processor = Processor(cls.conf, cls.initialization_status,
|
||||
cls.graph)
|
||||
cls.processor = Processor(cls.conf, lambda x: x, cls.graph)
|
||||
|
||||
cls.event_queue = queue.Queue()
|
||||
|
||||
@ -104,68 +98,8 @@ class TestConsistencyFunctional(TestFunctionalBase, TestConfiguration):
|
||||
actions_callback)
|
||||
cls.consistency_enforcer = ConsistencyEnforcer(
|
||||
cls.conf,
|
||||
actions_callback,
|
||||
cls.processor.entity_graph)
|
||||
|
||||
@unittest.skip("test_initializing_process skipping")
|
||||
def test_initializing_process(self):
|
||||
# Setup
|
||||
num_of_host_alarms = self.NUM_HOSTS - 2
|
||||
num_instances_per_host = 4
|
||||
self._create_processor_with_graph(self.conf, processor=self.processor)
|
||||
self._add_alarms()
|
||||
self._set_end_messages()
|
||||
self.assertThat(self.processor.entity_graph.get_vertices(),
|
||||
matchers.HasLength(
|
||||
self._num_total_expected_vertices() +
|
||||
num_of_host_alarms + self.NUM_INSTANCES)
|
||||
)
|
||||
|
||||
# Action
|
||||
# eventlet.spawn(self._process_events)
|
||||
# processor_thread = threading.Thread(target=self._process_events)
|
||||
# processor_thread.start()
|
||||
self.initialization_status.initializing_process(
|
||||
self.processor.on_recieved_all_end_messages)
|
||||
self._process_events()
|
||||
|
||||
# Test Assertions
|
||||
num_correct_alarms = num_of_host_alarms + \
|
||||
num_of_host_alarms * num_instances_per_host
|
||||
num_undeleted_vertices_in_graph = \
|
||||
len(self.processor.entity_graph.get_vertices(vertex_attr_filter={
|
||||
VProps.VITRAGE_IS_DELETED: False
|
||||
}))
|
||||
self.assertEqual(self._num_total_expected_vertices() +
|
||||
num_correct_alarms,
|
||||
num_undeleted_vertices_in_graph)
|
||||
|
||||
alarm_vertices_in_graph = self.processor.entity_graph.get_vertices({
|
||||
VProps.VITRAGE_CATEGORY: EntityCategory.ALARM,
|
||||
VProps.VITRAGE_IS_DELETED: False
|
||||
})
|
||||
self.assertThat(alarm_vertices_in_graph,
|
||||
matchers.HasLength(num_correct_alarms))
|
||||
|
||||
is_deleted_alarm_vertices_in_graph = \
|
||||
self.processor.entity_graph.get_vertices({
|
||||
VProps.VITRAGE_CATEGORY: EntityCategory.ALARM,
|
||||
VProps.VITRAGE_IS_DELETED: True
|
||||
})
|
||||
self.assertEqual(is_deleted_alarm_vertices_in_graph,
|
||||
matchers.HasLength(
|
||||
num_of_host_alarms * num_instances_per_host)
|
||||
)
|
||||
|
||||
instance_vertices = self.processor.entity_graph.get_vertices({
|
||||
VProps.VITRAGE_CATEGORY: EntityCategory.ALARM,
|
||||
VProps.VITRAGE_TYPE: VITRAGE_DATASOURCE,
|
||||
VProps.VITRAGE_IS_DELETED: False
|
||||
})
|
||||
self.assertThat(instance_vertices,
|
||||
matchers.HasLength(
|
||||
num_of_host_alarms * num_instances_per_host)
|
||||
)
|
||||
cls.processor.entity_graph,
|
||||
actions_callback)
|
||||
|
||||
def test_periodic_process(self):
|
||||
# Setup
|
||||
@ -234,8 +168,6 @@ class TestConsistencyFunctional(TestFunctionalBase, TestConfiguration):
|
||||
self.initialization_status.end_messages[NOVA_INSTANCE_DATASOURCE] = \
|
||||
True
|
||||
self.initialization_status.end_messages[NAGIOS_DATASOURCE] = True
|
||||
self.initialization_status.status = \
|
||||
self.initialization_status.RECEIVED_ALL_END_MESSAGES
|
||||
|
||||
def _add_alarms(self):
|
||||
# find hosts and instances
|
||||
|
@ -25,7 +25,6 @@ from vitrage.datasources.nova.host import NOVA_HOST_DATASOURCE
|
||||
from vitrage.datasources.nova.instance import NOVA_INSTANCE_DATASOURCE
|
||||
from vitrage.datasources.nova.zone import NOVA_ZONE_DATASOURCE
|
||||
from vitrage.entity_graph.processor import processor as proc
|
||||
from vitrage.entity_graph.vitrage_init import VitrageInit
|
||||
from vitrage.graph.driver.networkx_graph import NXGraph
|
||||
import vitrage.graph.utils as graph_utils
|
||||
from vitrage.opts import register_opts
|
||||
@ -128,8 +127,7 @@ class TestEntityGraphUnitBase(base.BaseTest):
|
||||
@staticmethod
|
||||
def create_processor_and_graph(conf):
|
||||
e_graph = NXGraph("Entity Graph")
|
||||
init = VitrageInit(conf)
|
||||
return proc.Processor(conf, init, e_graph)
|
||||
return proc.Processor(conf, e_graph=e_graph)
|
||||
|
||||
@staticmethod
|
||||
def _create_event(spec_type=None,
|
||||
|
@ -15,7 +15,7 @@
|
||||
from oslo_config import cfg
|
||||
|
||||
from vitrage.common.constants import VertexProperties as VProps
|
||||
from vitrage.entity_graph import transformer_manager
|
||||
from vitrage.entity_graph.processor import transformer_manager
|
||||
from vitrage.graph import driver as graph
|
||||
from vitrage.tests.unit.entity_graph.base import TestEntityGraphUnitBase
|
||||
|
||||
|
@ -13,15 +13,15 @@
|
||||
# under the License.
|
||||
import threading
|
||||
|
||||
from vitrage.entity_graph.service import TwoPriorityListener
|
||||
from vitrage.entity_graph.graph_init import EventsCoordination
|
||||
from vitrage.tests import base
|
||||
|
||||
|
||||
class TwoPriorityListenerTest(base.BaseTest):
|
||||
class EventsCoordinationTest(base.BaseTest):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(TwoPriorityListenerTest, cls).setUpClass()
|
||||
super(EventsCoordinationTest, cls).setUpClass()
|
||||
cls.calc_result = 0
|
||||
|
||||
def do_work(self, x):
|
||||
@ -39,7 +39,7 @@ class TwoPriorityListenerTest(base.BaseTest):
|
||||
the result should be the number of low priority calls.
|
||||
0*(2^n) + 1*n
|
||||
"""
|
||||
priority_listener = TwoPriorityListener(None, self.do_work, None, None)
|
||||
priority_listener = EventsCoordination(None, self.do_work, None, None)
|
||||
|
||||
def write_high():
|
||||
for i in range(10000):
|
||||
|
@ -22,7 +22,8 @@ from vitrage.datasources.nova.instance import NOVA_INSTANCE_DATASOURCE
|
||||
from vitrage.datasources.nova.instance.transformer import InstanceTransformer
|
||||
from vitrage.datasources.nova.zone import NOVA_ZONE_DATASOURCE
|
||||
from vitrage.datasources.nova.zone.transformer import ZoneTransformer
|
||||
from vitrage.entity_graph.transformer_manager import TransformerManager
|
||||
from vitrage.entity_graph.processor.transformer_manager import\
|
||||
TransformerManager
|
||||
from vitrage.opts import register_opts
|
||||
from vitrage.tests import base
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user