Merge "Template loading moved from TemplateWorker to EvaluatorWorker"

This commit is contained in:
Zuul 2018-11-14 11:02:19 +00:00 committed by Gerrit Code Review
commit 8573746e86

View File

@ -70,7 +70,6 @@ class GraphWorkersManager(cotyledon.ServiceManager):
self._all_queues = [] self._all_queues = []
self.register_hooks(on_terminate=self._stop) self.register_hooks(on_terminate=self._stop)
self.add_evaluator_workers() self.add_evaluator_workers()
self.add_template_workers()
self.add_api_workers() self.add_api_workers()
def add_evaluator_workers(self): def add_evaluator_workers(self):
@ -95,28 +94,6 @@ class GraphWorkersManager(cotyledon.ServiceManager):
self._evaluator_queues = queues self._evaluator_queues = queues
self._all_queues.extend(queues) self._all_queues.extend(queues)
def add_template_workers(self):
"""Add template workers
Template workers receive all graph updates, hence are updated.
Each template worker holds a disabled scenario-evaluator that does
not process changes.
The scenario-evaluator is enabled when a template add/delete arrives,
so this worker will run the added template on the entire graph.
Interface to these workers is:
submit_graph_update(..)
submit_template_event(..)
"""
if self._template_queues:
raise VitrageError('add_template_workers called more than once')
workers = 1 # currently more than one worker is not supported
queues = [multiprocessing.JoinableQueue() for i in range(workers)]
self.add(TemplateLoaderWorker,
args=(self._conf, queues, self._entity_graph),
workers=workers)
self._template_queues = queues
self._all_queues.extend(queues)
def add_api_workers(self): def add_api_workers(self):
"""Add Api workers """Add Api workers
@ -185,8 +162,9 @@ class GraphWorkersManager(cotyledon.ServiceManager):
else: else:
raise VitrageError('Invalid template_action %s' % template_action) raise VitrageError('Invalid template_action %s' % template_action)
# Template event will be handled by a single evaluator worker
self._submit_and_wait( self._submit_and_wait(
self._template_queues, [self._evaluator_queues[0]],
( (
TEMPLATE_ACTION, TEMPLATE_ACTION,
[t.name for t in templates [t.name for t in templates
@ -304,6 +282,9 @@ class EvaluatorWorker(GraphCloneWorkerBase):
self._evaluator.enabled = True self._evaluator.enabled = True
elif action == RELOAD_TEMPLATES: elif action == RELOAD_TEMPLATES:
self._reload_templates() self._reload_templates()
elif action == TEMPLATE_ACTION:
(action, template_names, action_mode) = task
self._template_action(template_names, action_mode)
def _reload_templates(self): def _reload_templates(self):
LOG.info("reloading evaluator scenarios") LOG.info("reloading evaluator scenarios")
@ -312,46 +293,10 @@ class EvaluatorWorker(GraphCloneWorkerBase):
self._evaluator.scenario_repo = scenario_repo self._evaluator.scenario_repo = scenario_repo
self._evaluator.scenario_repo.log_enabled_scenarios() self._evaluator.scenario_repo.log_enabled_scenarios()
class TemplateLoaderWorker(GraphCloneWorkerBase):
def __init__(self,
worker_id,
conf,
task_queues,
e_graph):
super(TemplateLoaderWorker, self).__init__(worker_id,
conf,
task_queues,
e_graph)
self._evaluator = None
name = 'TemplateLoaderWorker'
def _init_instance(self):
actions_callback = messaging.VitrageNotifier(
conf=self._conf,
publisher_id='vitrage_evaluator',
topics=[EVALUATOR_TOPIC]).notify
self._evaluator = ScenarioEvaluator(
self._conf,
self._entity_graph,
None,
actions_callback,
enabled=False)
def do_task(self, task):
super(TemplateLoaderWorker, self).do_task(task)
action = task[0]
if action == TEMPLATE_ACTION:
(action, template_names, action_mode) = task
self._template_action(template_names, action_mode)
def _template_action(self, template_names, action_mode): def _template_action(self, template_names, action_mode):
self._enable_evaluator_templates(template_names) # Here, we create a temporary ScenarioRepo to execute the needed
self._evaluator.run_evaluator(action_mode) # templates. Once _reload_templates is called, it will create a
self._disable_evaluator() # non temporary ScenarioRepo, to replace this one
def _enable_evaluator_templates(self, template_names):
scenario_repo = ScenarioRepository(self._conf) scenario_repo = ScenarioRepository(self._conf)
for s in scenario_repo._all_scenarios: for s in scenario_repo._all_scenarios:
s.enabled = False s.enabled = False
@ -360,11 +305,7 @@ class TemplateLoaderWorker(GraphCloneWorkerBase):
s.enabled = True s.enabled = True
self._evaluator.scenario_repo = scenario_repo self._evaluator.scenario_repo = scenario_repo
self._evaluator.scenario_repo.log_enabled_scenarios() self._evaluator.scenario_repo.log_enabled_scenarios()
self._evaluator.enabled = True self._evaluator.run_evaluator(action_mode)
def _disable_evaluator(self):
self._entity_graph.notifier._subscriptions = [] # Quick n dirty
self._evaluator.enabled = False
class ApiWorker(GraphCloneWorkerBase): class ApiWorker(GraphCloneWorkerBase):