diff --git a/vitrage/entity_graph/consistency/consistency_enforcer.py b/vitrage/entity_graph/consistency/consistency_enforcer.py index ffad08144..5da41c007 100644 --- a/vitrage/entity_graph/consistency/consistency_enforcer.py +++ b/vitrage/entity_graph/consistency/consistency_enforcer.py @@ -100,6 +100,7 @@ class ConsistencyEnforcer(object): return self._filter_vertices_to_be_deleted(vertices) def _push_events_to_queue(self, vertices, action): + events = [] for vertex in vertices: event = { DSProps.ENTITY_TYPE: CONSISTENCY_DATASOURCE, @@ -112,7 +113,8 @@ class ConsistencyEnforcer(object): VProps.VITRAGE_CATEGORY: vertex[VProps.VITRAGE_CATEGORY], VProps.IS_REAL_VITRAGE_ID: True } - self.actions_callback('consistency', event) + events.append(event) + self.actions_callback('consistency', events) @staticmethod def _filter_vertices_to_be_deleted(vertices): diff --git a/vitrage/entity_graph/graph_init.py b/vitrage/entity_graph/graph_init.py index 0229e9dea..2d6ae7be1 100644 --- a/vitrage/entity_graph/graph_init.py +++ b/vitrage/entity_graph/graph_init.py @@ -93,11 +93,15 @@ class VitrageGraphInit(object): self.events_coordination.start() def process_event(self, event): - if event.get('template_action'): + if isinstance(event, list): + for e in event: + self.processor.process_event(e) + elif event.get('template_action'): self.workers.submit_template_event(event) self.workers.submit_evaluators_reload_templates() else: self.processor.process_event(event) + self.persist.flush_events() def _recreate_transformers_id_cache(self): for v in self.graph.get_vertices(): diff --git a/vitrage/entity_graph/graph_persistency.py b/vitrage/entity_graph/graph_persistency.py index 9dbdd493c..e3725d4ae 100644 --- a/vitrage/entity_graph/graph_persistency.py +++ b/vitrage/entity_graph/graph_persistency.py @@ -28,6 +28,7 @@ class GraphPersistency(object): self.conf = conf self.db = db self.graph = graph + self.events_buffer = [] def store_graph(self): LOG.info('Persisting graph...') @@ -91,7 +92,13 @@ class GraphPersistency(object): event_row = models.Event(payload=curr, is_vertex=is_vertex, event_id=event_id) - self.db.events.create(event_row) + self.events_buffer.append(event_row) + + def flush_events(self): + if not self.events_buffer: + return + self.db.events.bulk_create(self.events_buffer) + self.events_buffer = [] @staticmethod def is_important_change(before, curr, *args): diff --git a/vitrage/entity_graph/workers.py b/vitrage/entity_graph/workers.py index beb9582af..4546348eb 100644 --- a/vitrage/entity_graph/workers.py +++ b/vitrage/entity_graph/workers.py @@ -18,7 +18,6 @@ import cotyledon import multiprocessing import multiprocessing.queues import os -from oslo_concurrency import processutils as ps from oslo_log import log import oslo_messaging from oslo_utils import uuidutils @@ -94,7 +93,7 @@ class GraphWorkersManager(cotyledon.ServiceManager): """ if self._evaluator_queues: raise VitrageError('add_evaluator_workers called more than once') - workers = self._conf.evaluator.workers or ps.get_worker_count() + workers = self._conf.evaluator.workers queues = [multiprocessing.JoinableQueue() for i in range(workers)] self.add(EvaluatorWorker, args=(self._conf, queues, workers), diff --git a/vitrage/evaluator/__init__.py b/vitrage/evaluator/__init__.py index 0796b239a..3cbb05479 100644 --- a/vitrage/evaluator/__init__.py +++ b/vitrage/evaluator/__init__.py @@ -19,12 +19,10 @@ from vitrage.evaluator.template_schemas import init_template_schemas # Register options for the service OPTS = [ cfg.IntOpt('workers', - default=None, + default=1, min=1, max=32, - help='Number of workers for template evaluator, default is ' - 'equal to the number of CPUs available if that can be ' - 'determined, else a default worker count of 1 is returned.' + help='Number of workers for template evaluator.' ), ] diff --git a/vitrage/evaluator/actions/action_executor.py b/vitrage/evaluator/actions/action_executor.py index ed01ed382..6ddb0dcfe 100644 --- a/vitrage/evaluator/actions/action_executor.py +++ b/vitrage/evaluator/actions/action_executor.py @@ -45,6 +45,8 @@ from vitrage.utils import datetime as datetime_utils LOG = log.getLogger(__name__) EVALUATOR_EVENT = 'evaluator.event' +TARGET = 'target' +SOURCE = 'source' class ActionExecutor(object): @@ -64,7 +66,17 @@ class ActionExecutor(object): EXECUTE_EXTERNAL: self._execute_external, } - def execute(self, action_spec, action_mode): + def execute(self, actions): + if not actions: + return + + events = [] + for action in actions: + LOG.info('Action: %s', self._action_str(action)) + events.extend(self._execute(action.specs, action.mode)) + self.actions_callback(EVALUATOR_EVENT, events) + + def _execute(self, action_spec, action_mode): action_recipe = self.action_recipes[action_spec.type] if action_mode == ActionMode.DO: @@ -72,8 +84,10 @@ class ActionExecutor(object): else: steps = action_recipe.get_undo_recipe(action_spec) + events = [] for step in steps: - self.action_step_defs[step.type](step.params) + events.append(self.action_step_defs[step.type](step.params)) + return events def _add_vertex(self, params): @@ -81,7 +95,7 @@ class ActionExecutor(object): ActionExecutor._add_default_properties(event) event[EVALUATOR_EVENT_TYPE] = ADD_VERTEX - self.actions_callback(EVALUATOR_EVENT, event) + return event def _update_vertex(self, params): @@ -89,14 +103,14 @@ class ActionExecutor(object): ActionExecutor._add_default_properties(event) event[EVALUATOR_EVENT_TYPE] = UPDATE_VERTEX - self.actions_callback(EVALUATOR_EVENT, event) + return event def _remove_vertex(self, params): event = copy.deepcopy(params) ActionExecutor._add_default_properties(event) event[EVALUATOR_EVENT_TYPE] = REMOVE_VERTEX - self.actions_callback(EVALUATOR_EVENT, event) + return event def _add_edge(self, params): @@ -104,7 +118,7 @@ class ActionExecutor(object): ActionExecutor._add_default_properties(event) event[EVALUATOR_EVENT_TYPE] = ADD_EDGE - self.actions_callback(EVALUATOR_EVENT, event) + return event def _remove_edge(self, params): @@ -112,7 +126,7 @@ class ActionExecutor(object): ActionExecutor._add_default_properties(event) event[EVALUATOR_EVENT_TYPE] = REMOVE_EDGE - self.actions_callback(EVALUATOR_EVENT, event) + return event def _execute_external(self, params): @@ -158,3 +172,11 @@ class ActionExecutor(object): "%s.%s" % (ExecuteMistral.__module__, ExecuteMistral.__name__)) return recipes + + @staticmethod + def _action_str(action): + s = action.specs.targets.get(SOURCE, {}).get(VProps.VITRAGE_ID, '') + t = action.specs.targets.get(TARGET, {}).get(VProps.VITRAGE_ID, '') + return '%s %s \'%s\' targets (%s,%s)' % (action.mode.upper(), + action.specs.type, + action.action_id, s, t) diff --git a/vitrage/evaluator/scenario_evaluator.py b/vitrage/evaluator/scenario_evaluator.py index 1beab061e..5467e1bd1 100644 --- a/vitrage/evaluator/scenario_evaluator.py +++ b/vitrage/evaluator/scenario_evaluator.py @@ -39,6 +39,8 @@ from vitrage.graph.algo_driver.sub_graph_matching import \ NEG_CONDITION from vitrage.graph.driver import Vertex from vitrage import storage +from vitrage.storage.sqlalchemy import models +from vitrage.utils.datetime import utcnow LOG = log.getLogger(__name__) @@ -65,12 +67,10 @@ class ScenarioEvaluator(object): enabled=False): self._conf = conf self._entity_graph = e_graph - self._db_connection = storage.get_connection_from_config(self._conf) + self._db = storage.get_connection_from_config(self._conf) self._scenario_repo = scenario_repo self._action_executor = ActionExecutor(self._conf, actions_callback) self._entity_graph.subscribe(self.process_event) - self._active_actions_tracker = ActiveActionsTracker( - self._conf, self._db_connection) self.enabled = enabled self.connected_component_cache = defaultdict(dict) @@ -138,10 +138,7 @@ class ScenarioEvaluator(object): LOG.exception("Evaluator error, will not execute actions %s", str(actions)) - for action in actions_to_preform: - LOG.info('Action: %s', self._action_str(action)) - self._action_executor.execute(action.specs, action.mode) - + self._action_executor.execute(actions_to_preform) LOG.debug('Process event - completed') def _get_element_scenarios(self, element, is_vertex): @@ -314,33 +311,23 @@ class ScenarioEvaluator(object): def _analyze_and_filter_actions(self, actions): LOG.debug("Actions before filtering: %s", actions) + if not actions: + return [] - actions_to_perform = [] + active_actions = ActiveActionsTracker(self._conf, self._db, actions) for action_info in actions: if action_info.mode == ActionMode.DO: - is_highest_score, exists = \ - self._active_actions_tracker.calc_do_action(action_info) - if is_highest_score and not exists: - actions_to_perform.append(action_info) + active_actions.calc_do_action(action_info) elif action_info.mode == ActionMode.UNDO: - is_highest_score, second_highest = \ - self._active_actions_tracker.calc_undo_action(action_info) - if is_highest_score: - # We should 'DO' the Second highest scored action so - # to override the existing dominant action. - # or, if there is no second highest scored action - # So we just 'UNDO' the existing dominant action - if second_highest: - action_to_perform = self._db_action_to_action_info( - second_highest) - actions_to_perform.append(action_to_perform) - else: + active_actions.calc_undo_action(action_info) - actions_to_perform.append(action_info) + active_actions.flush_db_updates() unique_ordered_actions = OrderedDict() - for action in actions_to_perform: - id_ = ScenarioEvaluator._generate_action_id(action.specs) + for action in active_actions.actions_to_perform: + if isinstance(action, models.ActiveAction): + action = self._db_action_to_action_info(action) + id_ = self._generate_action_id(action.specs) unique_ordered_actions[id_] = action return unique_ordered_actions.values() @@ -472,14 +459,6 @@ class ScenarioEvaluator(object): for v_id in ver_to_remove: del match[v_id] - @staticmethod - def _action_str(action): - s = action.specs.targets.get(SOURCE, {}).get(VProps.VITRAGE_ID, '') - t = action.specs.targets.get(TARGET, {}).get(VProps.VITRAGE_ID, '') - return '%s %s \'%s\' targets (%s,%s)' % (action.mode.upper(), - action.specs.type, - action.action_id, s, t) - class ActiveActionsTracker(object): """Keeps track of all active actions and relative dominance/priority. @@ -502,13 +481,31 @@ class ActiveActionsTracker(object): The score is used to determine which action in each group of similar actions to be executed next. """ + action_tools = None - def __init__(self, conf, db_connection): + def __init__(self, conf, db, actions): + self.db = db + self.data = defaultdict(set) + self.actions_to_create = {} + self.actions_to_remove = set() + self.actions_to_perform = [] # use a list to keep the insertion order + self._init_action_tools(conf) + + # Query DB for all actions with same properties + actions_keys = set([self._get_key(action) for action in actions]) + db_rows = self.db.active_actions.query_similar(actions_keys) or [] + for db_row in db_rows: + self.data[(db_row.source_vertex_id, db_row.target_vertex_id, + db_row.extra_info, db_row.action_type)].add(db_row) + + @classmethod + def _init_action_tools(cls, conf): + if cls.action_tools: + return info_mapper = DatasourceInfoMapper(conf) - self._db = db_connection alarms_score = info_mapper.get_datasource_priorities('vitrage') all_scores = info_mapper.get_datasource_priorities() - self._action_tools = { + cls.action_tools = { ActionType.SET_STATE: pt.SetStateTools(all_scores), ActionType.RAISE_ALARM: pt.RaiseAlarmTools(alarms_score), ActionType.ADD_CAUSAL_RELATIONSHIP: pt.BaselineTools, @@ -523,50 +520,78 @@ class ActiveActionsTracker(object): Only a top scored action that is new should be performed :return: (is top score, is it already existing) """ - # TODO(idan_hefetz): DB read and write not in a transaction - active_actions = self._query_similar_actions(action_info) + similar_actions = self._get_similar(action_info) exists = any( a.action_id == action_info.action_id and - a.trigger == action_info.trigger_id for a in active_actions) + a.trigger == action_info.trigger_id for a in similar_actions) if not exists: - db_row = self._to_db_row(action_info) - active_actions.append(db_row) - LOG.debug("DB Insert active_actions %s", str(db_row)) - self._db.active_actions.create(db_row) - - return self._is_highest_score(active_actions, action_info), exists + self._add(action_info) + if not exists and self._is_highest_score(similar_actions, action_info): + self.actions_to_perform.append(action_info) def calc_undo_action(self, action_info): """Delete this action form active_actions table, if exists - return value to help decide if action should be performed + decide if action should be performed A top scored action should be 'undone' if there is not a second action. If there is a second, it should now be 'done' and become the dominant :param action_info: action to delete - :return: is_highest_score, second highest action if exists """ - # TODO(idan_hefetz): DB read and write not in a transaction - active_actions = self._query_similar_actions(action_info) + similar_actions = self._get_similar(action_info) + if not self._is_highest_score(similar_actions, action_info): + self._remove(action_info) + return - LOG.debug("DB delete active_actions %s %s", - action_info.action_id, - str(action_info.trigger_id)) - self._db.active_actions.delete( - action_id=action_info.action_id, - trigger=action_info.trigger_id) - - is_highest_score = self._is_highest_score(active_actions, action_info) - if is_highest_score and len(active_actions) > 1: - return is_highest_score, self._sort_db_actions(active_actions)[1] + second_highest = self._sort_db_actions(similar_actions)[1]\ + if len(similar_actions) > 1 else None + # We should 'DO' the Second highest scored action so + # to override the existing dominant action. + # or, if there is no second highest scored action + # So we just 'UNDO' the existing dominant action + if second_highest: + self.actions_to_perform.append(second_highest) else: - return is_highest_score, None + self.actions_to_perform.append(action_info) + self._remove(action_info) + + def flush_db_updates(self): + self.db.active_actions.bulk_create(self.actions_to_create.values()) + self.db.active_actions.bulk_delete(self.actions_to_remove) + + def _add(self, action_info): + db_row = self._to_db_row(action_info) + self._get_similar(action_info).add(db_row) + id_ = ScenarioEvaluator._generate_action_id(action_info.specs) + if id_ not in self.actions_to_create: + self.actions_to_create[id_] = db_row + + def _remove(self, action_info): + similar_actions = self._get_similar(action_info) + for action in similar_actions: + if action.trigger == action_info.trigger_id and \ + action.action_id == action_info.action_id: + similar_actions.remove(action) + break + self.actions_to_remove.add( + (action_info.trigger_id, action_info.action_id)) + + def _get_similar(self, action_info): + return self.data.get(self._get_key(action_info), set()) + + def _get_key(self, action_info): + src = action_info.specs.targets.get(SOURCE, {}).get(VProps.VITRAGE_ID) + trg = action_info.specs.targets.get(TARGET, {}).get(VProps.VITRAGE_ID) + extra_info = self.action_tools[action_info.specs.type].get_extra_info( + action_info.specs) + action_type = action_info.specs.type + return src, trg, extra_info, action_type def _to_db_row(self, action_info): source = action_info.specs.targets.get(SOURCE, {}) target = action_info.specs.targets.get(TARGET, {}) - action_score = self._action_tools[action_info.specs.type].\ + action_score = self.action_tools[action_info.specs.type]. \ get_score(action_info) - extra_info = self._action_tools[action_info.specs.type].\ + extra_info = self.action_tools[action_info.specs.type]. \ get_extra_info(action_info.specs) return storage.sqlalchemy.models.ActiveAction( action_type=action_info.specs.type, @@ -577,18 +602,6 @@ class ActiveActionsTracker(object): trigger=action_info.trigger_id, score=action_score) - def _query_similar_actions(self, action_info): - """Query DB for all actions with same properties""" - source = action_info.specs.targets.get(SOURCE, {}) - target = action_info.specs.targets.get(TARGET, {}) - extra_info = self._action_tools[action_info.specs.type].get_extra_info( - action_info.specs) - return self._db.active_actions.query( - action_type=action_info.specs.type, - extra_info=extra_info, - source_vertex_id=source.get(VProps.VITRAGE_ID), - target_vertex_id=target.get(VProps.VITRAGE_ID)) - @classmethod def _is_highest_score(cls, db_actions, action_info): """Get the top action from the list and compare to action_info @@ -600,7 +613,8 @@ class ActiveActionsTracker(object): if not db_actions: return True highest_score_action = min( - db_actions, key=lambda action: (-action.score, action.created_at)) + db_actions, key=lambda action: (-action.score, action.created_at + or utcnow(False))) return highest_score_action.trigger == action_info.trigger_id and \ highest_score_action.action_id == action_info.action_id diff --git a/vitrage/storage/impl_sqlalchemy.py b/vitrage/storage/impl_sqlalchemy.py index 650de3fbb..95167ba87 100644 --- a/vitrage/storage/impl_sqlalchemy.py +++ b/vitrage/storage/impl_sqlalchemy.py @@ -17,7 +17,7 @@ from __future__ import absolute_import from oslo_db.sqlalchemy import session as db_session from oslo_log import log -from sqlalchemy import and_ +from sqlalchemy import and_, or_ from sqlalchemy.engine import url as sqlalchemy_url from sqlalchemy import func @@ -146,6 +146,14 @@ class BaseTableConn(object): super(BaseTableConn, self).__init__() self._engine_facade = engine_facade + def bulk_create(self, items): + if not items: + return + + session = self._engine_facade.get_session() + with session.begin(): + session.bulk_save_objects(items) + def query_filter(self, model, **kwargs): session = self._engine_facade.get_session() query = session.query(model) @@ -225,6 +233,21 @@ class ActiveActionsConnection(base.ActiveActionsConnection, BaseTableConn): trigger=trigger) return query.all() + def query_similar(self, actions): + """Query DB for all actions with same properties""" + session = self._engine_facade.get_session() + query = session.query(models.ActiveAction) + + filters = [] + for source, target, extra_info, action_type in actions: + filters.append( + and_(models.ActiveAction.action_type == action_type, + models.ActiveAction.extra_info == extra_info, + models.ActiveAction.source_vertex_id == source, + models.ActiveAction.target_vertex_id == target,)) + query = query.filter(or_(*filters)) + return query.all() + def delete(self, action_type=None, extra_info=None, @@ -244,6 +267,20 @@ class ActiveActionsConnection(base.ActiveActionsConnection, BaseTableConn): trigger=trigger) return query.delete() + def bulk_delete(self, actions): + if not actions: + return + session = self._engine_facade.get_session() + query = session.query(models.ActiveAction) + + filters = [] + for trigger, action_id in actions: + filters.append( + and_(models.ActiveAction.trigger == trigger, + models.ActiveAction.action_id == action_id)) + query = query.filter(or_(*filters)) + return query.delete() + class WebhooksConnection(base.WebhooksConnection, BaseTableConn): diff --git a/vitrage/tests/functional/base.py b/vitrage/tests/functional/base.py index 359139356..be03920c9 100644 --- a/vitrage/tests/functional/base.py +++ b/vitrage/tests/functional/base.py @@ -51,3 +51,13 @@ class TestFunctionalBase(TestEntityGraphUnitBase): snap_vals={DSProps.DATASOURCE_ACTION: DatasourceAction.INIT_SNAPSHOT}) return mock_driver.generate_sequential_events_list(gen_list) + + @staticmethod + def _consume_queue(event_queue, processor): + while not event_queue.empty(): + data = event_queue.get() + if isinstance(data, list): + for event in data: + processor.process_event(event) + else: + processor.process_event(data) diff --git a/vitrage/tests/functional/entity_graph/consistency/test_consistency.py b/vitrage/tests/functional/entity_graph/consistency/test_consistency.py index b9c765957..e098b4da4 100644 --- a/vitrage/tests/functional/entity_graph/consistency/test_consistency.py +++ b/vitrage/tests/functional/entity_graph/consistency/test_consistency.py @@ -260,8 +260,12 @@ class TestConsistencyFunctional(TestFunctionalBase, TestConfiguration): count = 0 while not self.event_queue.empty(): count += 1 - event = self.event_queue.get() - self.processor.process_event(event) + data = self.event_queue.get() + if isinstance(data, list): + for event in data: + self.processor.process_event(event) + else: + self.processor.process_event(data) return num_retries += 1 diff --git a/vitrage/tests/functional/entity_graph/graph_persistor/test_graph_persistor.py b/vitrage/tests/functional/entity_graph/graph_persistor/test_graph_persistor.py index 3a1323280..5d959c5c6 100644 --- a/vitrage/tests/functional/entity_graph/graph_persistor/test_graph_persistor.py +++ b/vitrage/tests/functional/entity_graph/graph_persistor/test_graph_persistor.py @@ -73,6 +73,7 @@ class TestGraphPersistor(TestFunctionalBase, TestConfiguration): edge = g.get_edges(vertices[0].vertex_id).pop() edge[EdgeProperties.VITRAGE_IS_DELETED] = True g.update_edge(edge) + graph_persistor.flush_events() # Store graph: graph_persistor.store_graph() @@ -85,6 +86,7 @@ class TestGraphPersistor(TestFunctionalBase, TestConfiguration): edge = g.get_edges(vertices[2].vertex_id).pop() edge[EdgeProperties.RELATIONSHIP_TYPE] = 'kuku' g.update_edge(edge) + graph_persistor.flush_events() self.assertIsNone(self.fail_msg, 'callback failed') diff --git a/vitrage/tests/functional/evaluator/test_action_executor.py b/vitrage/tests/functional/evaluator/test_action_executor.py index 1a5199444..c60a85aa8 100644 --- a/vitrage/tests/functional/evaluator/test_action_executor.py +++ b/vitrage/tests/functional/evaluator/test_action_executor.py @@ -38,6 +38,7 @@ from vitrage.evaluator.actions.evaluator_event_transformer \ import VITRAGE_DATASOURCE from vitrage.evaluator.actions.recipes.action_steps import ADD_VERTEX from vitrage.evaluator.actions.recipes.base import EVALUATOR_EVENT_TYPE +from vitrage.evaluator.scenario_evaluator import ActionInfo from vitrage.evaluator.template_data import ActionSpecs from vitrage.evaluator.template_fields import TemplateFields as TFields from vitrage.opts import register_opts @@ -84,8 +85,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration): event_queue, action_executor = self._init_executer() # Test Action - do - action_executor.execute(action_spec, ActionMode.DO) - processor.process_event(event_queue.get()) + action_executor.execute( + [ActionInfo(action_spec, ActionMode.DO, None, None)]) + self._consume_queue(event_queue, processor) host_vertex_after = processor.entity_graph.get_vertex( host_vertex_before.vertex_id) @@ -104,8 +106,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration): self.assertEqual(v_state_after, OperationalResourceState.SUBOPTIMAL) # Test Action - undo - action_executor.execute(action_spec, ActionMode.UNDO) - processor.process_event(event_queue.get()) + action_executor.execute( + [ActionInfo(action_spec, ActionMode.UNDO, None, None)]) + self._consume_queue(event_queue, processor) host_vertex_after_undo = processor.entity_graph.get_vertex( host_vertex_before.vertex_id) @@ -134,8 +137,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration): event_queue, action_executor = self._init_executer() # Test Action - do - action_executor.execute(action_spec, ActionMode.DO) - processor.process_event(event_queue.get()) + action_executor.execute( + [ActionInfo(action_spec, ActionMode.DO, None, None)]) + self._consume_queue(event_queue, processor) instance_vertex_after = processor.entity_graph.get_vertex( instance_vertex_before.vertex_id) @@ -144,8 +148,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration): self.assertTrue(instance_vertex_after.get(VProps.IS_MARKED_DOWN)) # Test Action - undo - action_executor.execute(action_spec, ActionMode.UNDO) - processor.process_event(event_queue.get()) + action_executor.execute( + [ActionInfo(action_spec, ActionMode.UNDO, None, None)]) + self._consume_queue(event_queue, processor) instance_vertex_after_undo = processor.entity_graph.get_vertex( instance_vertex_before.vertex_id) @@ -170,8 +175,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration): event_queue, action_executor = self._init_executer() # Test Action - do - action_executor.execute(action_spec, ActionMode.DO) - processor.process_event(event_queue.get()) + action_executor.execute( + [ActionInfo(action_spec, ActionMode.DO, None, None)]) + self._consume_queue(event_queue, processor) host_vertex_after = processor.entity_graph.get_vertex( host_vertex_before.vertex_id) @@ -180,8 +186,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration): self.assertTrue(host_vertex_after.get(VProps.IS_MARKED_DOWN)) # Test Action - undo - action_executor.execute(action_spec, ActionMode.UNDO) - processor.process_event(event_queue.get()) + action_executor.execute( + [ActionInfo(action_spec, ActionMode.UNDO, None, None)]) + self._consume_queue(event_queue, processor) host_vertex_after_undo = processor.entity_graph.get_vertex( host_vertex_before.vertex_id) @@ -227,8 +234,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration): alarm1.vertex_id, EdgeLabel.CAUSES) # Test Action - do - action_executor.execute(action_spec, ActionMode.DO) - processor.process_event(event_queue.get()) + action_executor.execute( + [ActionInfo(action_spec, ActionMode.DO, None, None)]) + self._consume_queue(event_queue, processor) new_edge = processor.entity_graph.get_edge(alarm2.vertex_id, alarm1.vertex_id, @@ -267,8 +275,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration): event_queue, action_executor = self._init_executer() # Test Action - action_executor.execute(action_spec, ActionMode.DO) - processor.process_event(event_queue.get()) + action_executor.execute( + [ActionInfo(action_spec, ActionMode.DO, None, None)]) + self._consume_queue(event_queue, processor) after_alarms = processor.entity_graph.get_vertices( vertex_attr_filter=alarm_vertex_attrs) @@ -330,9 +339,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration): event_queue, action_executor = self._init_executer() # Test Action - undo - action_executor.execute(action_spec, ActionMode.UNDO) - event = event_queue.get() - processor.process_event(event) + action_executor.execute( + [ActionInfo(action_spec, ActionMode.UNDO, None, None)]) + self._consume_queue(event_queue, processor) after_alarms = processor.entity_graph.get_vertices( vertex_attr_filter=alarm_vertex_attrs) diff --git a/vitrage/tests/functional/evaluator/test_scenario_evaluator.py b/vitrage/tests/functional/evaluator/test_scenario_evaluator.py index 21b05623e..a2091251d 100644 --- a/vitrage/tests/functional/evaluator/test_scenario_evaluator.py +++ b/vitrage/tests/functional/evaluator/test_scenario_evaluator.py @@ -450,8 +450,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration): port_vertex = entity_graph.get_vertices( vertex_attr_filter={VProps.VITRAGE_TYPE: NEUTRON_PORT_DATASOURCE})[0] - while not event_queue.empty(): - processor.process_event(event_queue.get()) + self._consume_queue(event_queue, processor) # test asserts query = {VProps.VITRAGE_CATEGORY: EntityCategory.ALARM} @@ -482,8 +481,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration): nagios_event = mock_driver.generate_random_events_list(generator)[0] processor.process_event(nagios_event) - while not event_queue.empty(): - processor.process_event(event_queue.get()) + self._consume_queue(event_queue, processor) # test asserts self.assertEqual(num_orig_vertices + num_added_vertices + @@ -527,8 +525,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration): nagios_vertex.vertex_id)][0] nagios_edge[EProps.VITRAGE_IS_DELETED] = True processor.entity_graph.update_edge(nagios_edge) - while not event_queue.empty(): - processor.process_event(event_queue.get()) + self._consume_queue(event_queue, processor) # test asserts self.assertEqual(num_orig_vertices + num_added_vertices + @@ -583,8 +580,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration): nagios_vertex.vertex_id)][0] nagios_edge[EProps.VITRAGE_IS_DELETED] = False processor.entity_graph.update_edge(nagios_edge) - while not event_queue.empty(): - processor.process_event(event_queue.get()) + self._consume_queue(event_queue, processor) # test asserts self.assertEqual(num_orig_vertices + num_added_vertices + @@ -638,8 +634,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration): # disable PORT_PROBLEM alarm nagios_event[NagiosProperties.STATUS] = NagiosTestStatus.OK processor.process_event(nagios_event) - while not event_queue.empty(): - processor.process_event(event_queue.get()) + self._consume_queue(event_queue, processor) # test asserts self.assertEqual(num_orig_vertices + num_added_vertices + @@ -756,8 +751,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration): EdgeLabel.ATTACHED) entity_graph.add_edge(edge) - while not event_queue.empty(): - processor.process_event(event_queue.get()) + self._consume_queue(event_queue, processor) # test asserts query = {VProps.VITRAGE_CATEGORY: EntityCategory.ALARM} @@ -789,8 +783,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration): nagios_event = mock_driver.generate_random_events_list(generator)[0] processor.process_event(nagios_event) - while not event_queue.empty(): - processor.process_event(event_queue.get()) + self._consume_queue(event_queue, processor) self.assertEqual(num_orig_vertices + num_added_vertices + num_deduced_vertices + num_network_alarm_vertices, @@ -825,8 +818,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration): # delete NETWORK_PROBLEM alarm nagios_event[NagiosProperties.STATUS] = NagiosTestStatus.OK processor.process_event(nagios_event) - while not event_queue.empty(): - processor.process_event(event_queue.get()) + self._consume_queue(event_queue, processor) self.assertEqual(num_orig_vertices + num_added_vertices + num_deduced_vertices + num_network_alarm_vertices + @@ -907,8 +899,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration): volume_event1['attachments'][0]['server_id'] = instances[0][VProps.ID] processor.process_event(volume_event1) - while not event_queue.empty(): - processor.process_event(event_queue.get()) + self._consume_queue(event_queue, processor) # test asserts num_volumes = 1 @@ -955,8 +946,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration): volume_event2['attachments'][0]['server_id'] = instances[1][VProps.ID] processor.process_event(volume_event2) - while not event_queue.empty(): - processor.process_event(event_queue.get()) + self._consume_queue(event_queue, processor) # test asserts num_volumes = 2 @@ -1021,8 +1011,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration): volume_event2['volume_attachment'][0]['instance_uuid'] = \ volume_event2['attachments'][0]['server_id'] processor.process_event(volume_event2) - while not event_queue.empty(): - processor.process_event(event_queue.get()) + self._consume_queue(event_queue, processor) # test asserts self.assertEqual(num_orig_vertices + num_volumes + num_deduced_alarms + @@ -1106,8 +1095,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration): volume_event1['volume_attachment'][0]['instance_uuid'] = \ volume_event1['attachments'][0]['server_id'] processor.process_event(volume_event1) - while not event_queue.empty(): - processor.process_event(event_queue.get()) + self._consume_queue(event_queue, processor) # test asserts self.assertEqual(num_orig_vertices + num_volumes + num_deduced_alarms + @@ -1352,8 +1340,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration): def get_host_after_event(self, event_queue, nagios_event, processor, target_host): processor.process_event(nagios_event) - while not event_queue.empty(): - processor.process_event(event_queue.get()) + self._consume_queue(event_queue, processor) host_v = self._get_entity_from_graph(NOVA_HOST_DATASOURCE, target_host, target_host,