Merge "Database read write in bulk"
This commit is contained in:
commit
ad7c9d38ad
@@ -100,6 +100,7 @@ class ConsistencyEnforcer(object):
         return self._filter_vertices_to_be_deleted(vertices)
 
     def _push_events_to_queue(self, vertices, action):
+        events = []
         for vertex in vertices:
             event = {
                 DSProps.ENTITY_TYPE: CONSISTENCY_DATASOURCE,
@@ -112,7 +113,8 @@ class ConsistencyEnforcer(object):
                 VProps.VITRAGE_CATEGORY: vertex[VProps.VITRAGE_CATEGORY],
                 VProps.IS_REAL_VITRAGE_ID: True
             }
-            self.actions_callback('consistency', event)
+            events.append(event)
+        self.actions_callback('consistency', events)
 
     @staticmethod
    def _filter_vertices_to_be_deleted(vertices):
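Note: the enforcer now batches its consistency events and emits them through a single callback. A minimal sketch of that producer-side pattern, with hypothetical make_event/emit callables standing in for the Vitrage specifics:

    # Sketch only: build the whole batch, then emit it once.
    # make_event and emit are hypothetical stand-ins for the real callbacks.
    def push_events(vertices, emit):
        events = [make_event(v) for v in vertices]
        emit('consistency', events)          # one callback, one list payload

    def make_event(vertex):
        return {'vitrage_id': vertex.get('vitrage_id')}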
@@ -93,11 +93,15 @@ class VitrageGraphInit(object):
         self.events_coordination.start()
 
     def process_event(self, event):
-        if event.get('template_action'):
+        if isinstance(event, list):
+            for e in event:
+                self.processor.process_event(e)
+        elif event.get('template_action'):
             self.workers.submit_template_event(event)
             self.workers.submit_evaluators_reload_templates()
         else:
             self.processor.process_event(event)
+        self.persist.flush_events()
 
     def _recreate_transformers_id_cache(self):
         for v in self.graph.get_vertices():
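Note: because producers may now push a whole list onto the queue, the consumer has to branch on the payload type. A sketch of that dispatch, assuming a process_one handler:

    # Sketch: accept either a single event dict or a batched list of dicts.
    def process_event(event, process_one):
        if isinstance(event, list):          # batched payload from a bulk producer
            for e in event:
                process_one(e)
        else:                                # single-event payload
            process_one(event)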
@@ -28,6 +28,7 @@ class GraphPersistency(object):
         self.conf = conf
         self.db = db
         self.graph = graph
+        self.events_buffer = []
 
     def store_graph(self):
         LOG.info('Persisting graph...')
@@ -91,7 +92,13 @@ class GraphPersistency(object):
 
         event_row = models.Event(payload=curr, is_vertex=is_vertex,
                                  event_id=event_id)
-        self.db.events.create(event_row)
+        self.events_buffer.append(event_row)
+
+    def flush_events(self):
+        if not self.events_buffer:
+            return
+        self.db.events.bulk_create(self.events_buffer)
+        self.events_buffer = []
 
     @staticmethod
     def is_important_change(before, curr, *args):
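Note: GraphPersistency buffers event rows in memory and writes them with one bulk insert on flush. A self-contained sketch of the same buffer-and-flush idea, using plain SQLAlchemy (1.4+) with an illustrative model rather than the Vitrage one:

    # Sketch: buffer ORM rows in memory, then persist them in one bulk write.
    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.orm import declarative_base, sessionmaker

    Base = declarative_base()

    class Event(Base):                        # illustrative model, not vitrage.storage
        __tablename__ = 'events'
        id = Column(Integer, primary_key=True)
        payload = Column(String)

    engine = create_engine('sqlite://')
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)

    class EventBuffer(object):
        def __init__(self):
            self._rows = []

        def add(self, payload):
            self._rows.append(Event(payload=payload))   # no DB round trip here

        def flush(self):
            if not self._rows:
                return
            session = Session()
            with session.begin():                       # one transaction
                session.bulk_save_objects(self._rows)
            self._rows = []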
@@ -18,7 +18,6 @@ import cotyledon
 import multiprocessing
 import multiprocessing.queues
 import os
-from oslo_concurrency import processutils as ps
 from oslo_log import log
 import oslo_messaging
 from oslo_utils import uuidutils
@@ -94,7 +93,7 @@ class GraphWorkersManager(cotyledon.ServiceManager):
         """
         if self._evaluator_queues:
             raise VitrageError('add_evaluator_workers called more than once')
-        workers = self._conf.evaluator.workers or ps.get_worker_count()
+        workers = self._conf.evaluator.workers
         queues = [multiprocessing.JoinableQueue() for i in range(workers)]
         self.add(EvaluatorWorker,
                  args=(self._conf, queues, workers),
@@ -19,12 +19,10 @@ from vitrage.evaluator.template_schemas import init_template_schemas
 # Register options for the service
 OPTS = [
     cfg.IntOpt('workers',
-               default=None,
+               default=1,
                min=1,
                max=32,
-               help='Number of workers for template evaluator, default is '
-                    'equal to the number of CPUs available if that can be '
-                    'determined, else a default worker count of 1 is returned.'
+               help='Number of workers for template evaluator.'
                ),
 ]
 
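Note: the evaluator workers option now has a fixed default of 1 instead of a CPU-derived count. A hedged example of registering and reading such a bounded option with oslo.config (the 'evaluator' group name here is an assumption):

    # Sketch: a bounded integer option with a fixed default.
    from oslo_config import cfg

    OPTS = [
        cfg.IntOpt('workers',
                   default=1,                 # fixed default, no CPU-count fallback
                   min=1,
                   max=32,
                   help='Number of workers for template evaluator.'),
    ]

    conf = cfg.ConfigOpts()
    conf.register_opts(OPTS, group='evaluator')   # group name is an assumption
    conf(args=[])                                 # parse (empty) command line
    print(conf.evaluator.workers)                 # -> 1 unless overridden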
@@ -45,6 +45,8 @@ from vitrage.utils import datetime as datetime_utils
 LOG = log.getLogger(__name__)
 
 EVALUATOR_EVENT = 'evaluator.event'
+TARGET = 'target'
+SOURCE = 'source'
 
 
 class ActionExecutor(object):
@@ -64,7 +66,17 @@ class ActionExecutor(object):
             EXECUTE_EXTERNAL: self._execute_external,
         }
 
-    def execute(self, action_spec, action_mode):
+    def execute(self, actions):
+        if not actions:
+            return
+
+        events = []
+        for action in actions:
+            LOG.info('Action: %s', self._action_str(action))
+            events.extend(self._execute(action.specs, action.mode))
+        self.actions_callback(EVALUATOR_EVENT, events)
+
+    def _execute(self, action_spec, action_mode):
 
         action_recipe = self.action_recipes[action_spec.type]
         if action_mode == ActionMode.DO:
@@ -72,8 +84,10 @@ class ActionExecutor(object):
         else:
             steps = action_recipe.get_undo_recipe(action_spec)
 
+        events = []
         for step in steps:
-            self.action_step_defs[step.type](step.params)
+            events.append(self.action_step_defs[step.type](step.params))
+        return events
 
     def _add_vertex(self, params):
 
@@ -81,7 +95,7 @@ class ActionExecutor(object):
         ActionExecutor._add_default_properties(event)
         event[EVALUATOR_EVENT_TYPE] = ADD_VERTEX
 
-        self.actions_callback(EVALUATOR_EVENT, event)
+        return event
 
     def _update_vertex(self, params):
 
@@ -89,14 +103,14 @@ class ActionExecutor(object):
         ActionExecutor._add_default_properties(event)
         event[EVALUATOR_EVENT_TYPE] = UPDATE_VERTEX
 
-        self.actions_callback(EVALUATOR_EVENT, event)
+        return event
 
     def _remove_vertex(self, params):
         event = copy.deepcopy(params)
         ActionExecutor._add_default_properties(event)
         event[EVALUATOR_EVENT_TYPE] = REMOVE_VERTEX
 
-        self.actions_callback(EVALUATOR_EVENT, event)
+        return event
 
     def _add_edge(self, params):
 
@@ -104,7 +118,7 @@ class ActionExecutor(object):
         ActionExecutor._add_default_properties(event)
         event[EVALUATOR_EVENT_TYPE] = ADD_EDGE
 
-        self.actions_callback(EVALUATOR_EVENT, event)
+        return event
 
     def _remove_edge(self, params):
 
@@ -112,7 +126,7 @@ class ActionExecutor(object):
         ActionExecutor._add_default_properties(event)
         event[EVALUATOR_EVENT_TYPE] = REMOVE_EDGE
 
-        self.actions_callback(EVALUATOR_EVENT, event)
+        return event
 
     def _execute_external(self, params):
 
@@ -158,3 +172,11 @@ class ActionExecutor(object):
             "%s.%s" % (ExecuteMistral.__module__, ExecuteMistral.__name__))
 
         return recipes
+
+    @staticmethod
+    def _action_str(action):
+        s = action.specs.targets.get(SOURCE, {}).get(VProps.VITRAGE_ID, '')
+        t = action.specs.targets.get(TARGET, {}).get(VProps.VITRAGE_ID, '')
+        return '%s %s \'%s\' targets (%s,%s)' % (action.mode.upper(),
+                                                 action.specs.type,
+                                                 action.action_id, s, t)
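Note: ActionExecutor.execute now takes a list of actions, gathers the events produced by every recipe step, and notifies once. A compact sketch of that fan-in, with hypothetical run_steps/notify callables:

    # Sketch: run every action's steps, gather the produced events,
    # and send them out in a single notification instead of one per step.
    def execute(actions, run_steps, notify):
        if not actions:
            return
        events = []
        for action in actions:
            events.extend(run_steps(action))   # each step yields one event dict
        notify('evaluator.event', events)      # single bulk notification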
@@ -39,6 +39,8 @@ from vitrage.graph.algo_driver.sub_graph_matching import \
     NEG_CONDITION
 from vitrage.graph.driver import Vertex
 from vitrage import storage
+from vitrage.storage.sqlalchemy import models
+from vitrage.utils.datetime import utcnow
 
 LOG = log.getLogger(__name__)
 
@@ -65,12 +67,10 @@ class ScenarioEvaluator(object):
                  enabled=False):
         self._conf = conf
         self._entity_graph = e_graph
-        self._db_connection = storage.get_connection_from_config(self._conf)
+        self._db = storage.get_connection_from_config(self._conf)
         self._scenario_repo = scenario_repo
         self._action_executor = ActionExecutor(self._conf, actions_callback)
         self._entity_graph.subscribe(self.process_event)
-        self._active_actions_tracker = ActiveActionsTracker(
-            self._conf, self._db_connection)
         self.enabled = enabled
         self.connected_component_cache = defaultdict(dict)
 
@@ -138,10 +138,7 @@ class ScenarioEvaluator(object):
             LOG.exception("Evaluator error, will not execute actions %s",
                           str(actions))
 
-        for action in actions_to_preform:
-            LOG.info('Action: %s', self._action_str(action))
-            self._action_executor.execute(action.specs, action.mode)
-
+        self._action_executor.execute(actions_to_preform)
         LOG.debug('Process event - completed')
 
     def _get_element_scenarios(self, element, is_vertex):
@@ -314,33 +311,23 @@ class ScenarioEvaluator(object):
 
     def _analyze_and_filter_actions(self, actions):
         LOG.debug("Actions before filtering: %s", actions)
         if not actions:
             return []
 
-        actions_to_perform = []
+        active_actions = ActiveActionsTracker(self._conf, self._db, actions)
         for action_info in actions:
             if action_info.mode == ActionMode.DO:
-                is_highest_score, exists = \
-                    self._active_actions_tracker.calc_do_action(action_info)
-                if is_highest_score and not exists:
-                    actions_to_perform.append(action_info)
+                active_actions.calc_do_action(action_info)
             elif action_info.mode == ActionMode.UNDO:
-                is_highest_score, second_highest = \
-                    self._active_actions_tracker.calc_undo_action(action_info)
-                if is_highest_score:
-                    # We should 'DO' the Second highest scored action so
-                    # to override the existing dominant action.
-                    # or, if there is no second highest scored action
-                    # So we just 'UNDO' the existing dominant action
-                    if second_highest:
-                        action_to_perform = self._db_action_to_action_info(
-                            second_highest)
-                        actions_to_perform.append(action_to_perform)
-                    else:
-                        actions_to_perform.append(action_info)
+                active_actions.calc_undo_action(action_info)
+        active_actions.flush_db_updates()
 
         unique_ordered_actions = OrderedDict()
-        for action in actions_to_perform:
-            id_ = ScenarioEvaluator._generate_action_id(action.specs)
+        for action in active_actions.actions_to_perform:
+            if isinstance(action, models.ActiveAction):
+                action = self._db_action_to_action_info(action)
+            id_ = self._generate_action_id(action.specs)
             unique_ordered_actions[id_] = action
         return unique_ordered_actions.values()
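Note: the filtered actions are de-duplicated while preserving order by keying an OrderedDict on a generated action id. A sketch of that step, with action_id_of standing in for _generate_action_id:

    # Sketch: drop duplicate actions while preserving insertion order.
    from collections import OrderedDict

    def dedup_actions(actions, action_id_of):
        unique = OrderedDict()
        for action in actions:
            unique[action_id_of(action)] = action   # later entries overwrite earlier ones
        return list(unique.values())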
@@ -472,14 +459,6 @@ class ScenarioEvaluator(object):
         for v_id in ver_to_remove:
             del match[v_id]
 
-    @staticmethod
-    def _action_str(action):
-        s = action.specs.targets.get(SOURCE, {}).get(VProps.VITRAGE_ID, '')
-        t = action.specs.targets.get(TARGET, {}).get(VProps.VITRAGE_ID, '')
-        return '%s %s \'%s\' targets (%s,%s)' % (action.mode.upper(),
-                                                 action.specs.type,
-                                                 action.action_id, s, t)
-
 
 class ActiveActionsTracker(object):
     """Keeps track of all active actions and relative dominance/priority.
@@ -502,13 +481,31 @@ class ActiveActionsTracker(object):
     The score is used to determine which action in each group of similar
     actions to be executed next.
     """
+    action_tools = None
 
-    def __init__(self, conf, db_connection):
+    def __init__(self, conf, db, actions):
+        self.db = db
+        self.data = defaultdict(set)
+        self.actions_to_create = {}
+        self.actions_to_remove = set()
+        self.actions_to_perform = []  # use a list to keep the insertion order
+        self._init_action_tools(conf)
+
+        # Query DB for all actions with same properties
+        actions_keys = set([self._get_key(action) for action in actions])
+        db_rows = self.db.active_actions.query_similar(actions_keys) or []
+        for db_row in db_rows:
+            self.data[(db_row.source_vertex_id, db_row.target_vertex_id,
+                       db_row.extra_info, db_row.action_type)].add(db_row)
+
+    @classmethod
+    def _init_action_tools(cls, conf):
+        if cls.action_tools:
+            return
         info_mapper = DatasourceInfoMapper(conf)
-        self._db = db_connection
         alarms_score = info_mapper.get_datasource_priorities('vitrage')
         all_scores = info_mapper.get_datasource_priorities()
-        self._action_tools = {
+        cls.action_tools = {
             ActionType.SET_STATE: pt.SetStateTools(all_scores),
             ActionType.RAISE_ALARM: pt.RaiseAlarmTools(alarms_score),
             ActionType.ADD_CAUSAL_RELATIONSHIP: pt.BaselineTools,
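Note: the rewritten tracker fetches all similar active actions in one query and indexes them in memory by a composite key (source, target, extra_info, action_type). A sketch of that grouping with defaultdict; the Row tuple is illustrative:

    # Sketch: index pre-fetched DB rows by a composite key for cheap lookups later.
    from collections import defaultdict, namedtuple

    Row = namedtuple('Row', 'source_vertex_id target_vertex_id extra_info action_type')

    def index_rows(db_rows):
        data = defaultdict(set)
        for row in db_rows:
            key = (row.source_vertex_id, row.target_vertex_id,
                   row.extra_info, row.action_type)
            data[key].add(row)                # rows sharing a key are grouped together
        return data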
@@ -523,50 +520,78 @@ class ActiveActionsTracker(object):
         Only a top scored action that is new should be performed
-        :return: (is top score, is it already existing)
         """
-        # TODO(idan_hefetz): DB read and write not in a transaction
-        active_actions = self._query_similar_actions(action_info)
+        similar_actions = self._get_similar(action_info)
         exists = any(
             a.action_id == action_info.action_id and
-            a.trigger == action_info.trigger_id for a in active_actions)
+            a.trigger == action_info.trigger_id for a in similar_actions)
         if not exists:
-            db_row = self._to_db_row(action_info)
-            active_actions.append(db_row)
-            LOG.debug("DB Insert active_actions %s", str(db_row))
-            self._db.active_actions.create(db_row)
-
-        return self._is_highest_score(active_actions, action_info), exists
+            self._add(action_info)
+        if not exists and self._is_highest_score(similar_actions, action_info):
+            self.actions_to_perform.append(action_info)
 
     def calc_undo_action(self, action_info):
         """Delete this action form active_actions table, if exists
 
-        return value to help decide if action should be performed
+        decide if action should be performed
         A top scored action should be 'undone' if there is not a second action.
         If there is a second, it should now be 'done' and become the dominant
         :param action_info: action to delete
-        :return: is_highest_score, second highest action if exists
         """
-        # TODO(idan_hefetz): DB read and write not in a transaction
-        active_actions = self._query_similar_actions(action_info)
+        similar_actions = self._get_similar(action_info)
+        if not self._is_highest_score(similar_actions, action_info):
+            self._remove(action_info)
+            return
 
-        LOG.debug("DB delete active_actions %s %s",
-                  action_info.action_id,
-                  str(action_info.trigger_id))
-        self._db.active_actions.delete(
-            action_id=action_info.action_id,
-            trigger=action_info.trigger_id)
-
-        is_highest_score = self._is_highest_score(active_actions, action_info)
-        if is_highest_score and len(active_actions) > 1:
-            return is_highest_score, self._sort_db_actions(active_actions)[1]
+        second_highest = self._sort_db_actions(similar_actions)[1]\
+            if len(similar_actions) > 1 else None
+        # We should 'DO' the Second highest scored action so
+        # to override the existing dominant action.
+        # or, if there is no second highest scored action
+        # So we just 'UNDO' the existing dominant action
+        if second_highest:
+            self.actions_to_perform.append(second_highest)
         else:
-            return is_highest_score, None
+            self.actions_to_perform.append(action_info)
+        self._remove(action_info)
+
+    def flush_db_updates(self):
+        self.db.active_actions.bulk_create(self.actions_to_create.values())
+        self.db.active_actions.bulk_delete(self.actions_to_remove)
+
+    def _add(self, action_info):
+        db_row = self._to_db_row(action_info)
+        self._get_similar(action_info).add(db_row)
+        id_ = ScenarioEvaluator._generate_action_id(action_info.specs)
+        if id_ not in self.actions_to_create:
+            self.actions_to_create[id_] = db_row
+
+    def _remove(self, action_info):
+        similar_actions = self._get_similar(action_info)
+        for action in similar_actions:
+            if action.trigger == action_info.trigger_id and \
+                    action.action_id == action_info.action_id:
+                similar_actions.remove(action)
+                break
+        self.actions_to_remove.add(
+            (action_info.trigger_id, action_info.action_id))
+
+    def _get_similar(self, action_info):
+        return self.data.get(self._get_key(action_info), set())
+
+    def _get_key(self, action_info):
+        src = action_info.specs.targets.get(SOURCE, {}).get(VProps.VITRAGE_ID)
+        trg = action_info.specs.targets.get(TARGET, {}).get(VProps.VITRAGE_ID)
+        extra_info = self.action_tools[action_info.specs.type].get_extra_info(
+            action_info.specs)
+        action_type = action_info.specs.type
+        return src, trg, extra_info, action_type
 
     def _to_db_row(self, action_info):
         source = action_info.specs.targets.get(SOURCE, {})
         target = action_info.specs.targets.get(TARGET, {})
-        action_score = self._action_tools[action_info.specs.type].\
+        action_score = self.action_tools[action_info.specs.type]. \
             get_score(action_info)
-        extra_info = self._action_tools[action_info.specs.type].\
+        extra_info = self.action_tools[action_info.specs.type]. \
            get_extra_info(action_info.specs)
         return storage.sqlalchemy.models.ActiveAction(
             action_type=action_info.specs.type,
@@ -577,18 +602,6 @@ class ActiveActionsTracker(object):
             trigger=action_info.trigger_id,
             score=action_score)
 
-    def _query_similar_actions(self, action_info):
-        """Query DB for all actions with same properties"""
-        source = action_info.specs.targets.get(SOURCE, {})
-        target = action_info.specs.targets.get(TARGET, {})
-        extra_info = self._action_tools[action_info.specs.type].get_extra_info(
-            action_info.specs)
-        return self._db.active_actions.query(
-            action_type=action_info.specs.type,
-            extra_info=extra_info,
-            source_vertex_id=source.get(VProps.VITRAGE_ID),
-            target_vertex_id=target.get(VProps.VITRAGE_ID))
-
     @classmethod
     def _is_highest_score(cls, db_actions, action_info):
         """Get the top action from the list and compare to action_info
@@ -600,7 +613,8 @@ class ActiveActionsTracker(object):
         if not db_actions:
             return True
         highest_score_action = min(
-            db_actions, key=lambda action: (-action.score, action.created_at))
+            db_actions, key=lambda action: (-action.score, action.created_at
+                                            or utcnow(False)))
         return highest_score_action.trigger == action_info.trigger_id and \
             highest_score_action.action_id == action_info.action_id
@@ -17,7 +17,7 @@ from __future__ import absolute_import
 
 from oslo_db.sqlalchemy import session as db_session
 from oslo_log import log
-from sqlalchemy import and_
+from sqlalchemy import and_, or_
 from sqlalchemy.engine import url as sqlalchemy_url
 from sqlalchemy import func
 
@@ -146,6 +146,14 @@ class BaseTableConn(object):
         super(BaseTableConn, self).__init__()
         self._engine_facade = engine_facade
 
+    def bulk_create(self, items):
+        if not items:
+            return
+
+        session = self._engine_facade.get_session()
+        with session.begin():
+            session.bulk_save_objects(items)
+
     def query_filter(self, model, **kwargs):
         session = self._engine_facade.get_session()
         query = session.query(model)
@@ -225,6 +233,21 @@ class ActiveActionsConnection(base.ActiveActionsConnection, BaseTableConn):
             trigger=trigger)
         return query.all()
 
+    def query_similar(self, actions):
+        """Query DB for all actions with same properties"""
+        session = self._engine_facade.get_session()
+        query = session.query(models.ActiveAction)
+
+        filters = []
+        for source, target, extra_info, action_type in actions:
+            filters.append(
+                and_(models.ActiveAction.action_type == action_type,
+                     models.ActiveAction.extra_info == extra_info,
+                     models.ActiveAction.source_vertex_id == source,
+                     models.ActiveAction.target_vertex_id == target,))
+        query = query.filter(or_(*filters))
+        return query.all()
+
     def delete(self,
                action_type=None,
                extra_info=None,
@@ -244,6 +267,20 @@ class ActiveActionsConnection(base.ActiveActionsConnection, BaseTableConn):
             trigger=trigger)
         return query.delete()
 
+    def bulk_delete(self, actions):
+        if not actions:
+            return
+        session = self._engine_facade.get_session()
+        query = session.query(models.ActiveAction)
+
+        filters = []
+        for trigger, action_id in actions:
+            filters.append(
+                and_(models.ActiveAction.trigger == trigger,
+                     models.ActiveAction.action_id == action_id))
+        query = query.filter(or_(*filters))
+        return query.delete()
+
 
 class WebhooksConnection(base.WebhooksConnection,
                          BaseTableConn):
@@ -51,3 +51,13 @@ class TestFunctionalBase(TestEntityGraphUnitBase):
             snap_vals={DSProps.DATASOURCE_ACTION:
                        DatasourceAction.INIT_SNAPSHOT})
         return mock_driver.generate_sequential_events_list(gen_list)
+
+    @staticmethod
+    def _consume_queue(event_queue, processor):
+        while not event_queue.empty():
+            data = event_queue.get()
+            if isinstance(data, list):
+                for event in data:
+                    processor.process_event(event)
+            else:
+                processor.process_event(data)
@@ -260,8 +260,12 @@ class TestConsistencyFunctional(TestFunctionalBase, TestConfiguration):
             count = 0
             while not self.event_queue.empty():
                 count += 1
-                event = self.event_queue.get()
-                self.processor.process_event(event)
+                data = self.event_queue.get()
+                if isinstance(data, list):
+                    for event in data:
+                        self.processor.process_event(event)
+                else:
+                    self.processor.process_event(data)
             return
 
         num_retries += 1
@@ -73,6 +73,7 @@ class TestGraphPersistor(TestFunctionalBase, TestConfiguration):
         edge = g.get_edges(vertices[0].vertex_id).pop()
         edge[EdgeProperties.VITRAGE_IS_DELETED] = True
         g.update_edge(edge)
+        graph_persistor.flush_events()
 
         # Store graph:
         graph_persistor.store_graph()
@@ -85,6 +86,7 @@ class TestGraphPersistor(TestFunctionalBase, TestConfiguration):
         edge = g.get_edges(vertices[2].vertex_id).pop()
         edge[EdgeProperties.RELATIONSHIP_TYPE] = 'kuku'
         g.update_edge(edge)
+        graph_persistor.flush_events()
 
         self.assertIsNone(self.fail_msg, 'callback failed')
 
@@ -38,6 +38,7 @@ from vitrage.evaluator.actions.evaluator_event_transformer \
     import VITRAGE_DATASOURCE
 from vitrage.evaluator.actions.recipes.action_steps import ADD_VERTEX
 from vitrage.evaluator.actions.recipes.base import EVALUATOR_EVENT_TYPE
+from vitrage.evaluator.scenario_evaluator import ActionInfo
 from vitrage.evaluator.template_data import ActionSpecs
 from vitrage.evaluator.template_fields import TemplateFields as TFields
 from vitrage.opts import register_opts
@@ -84,8 +85,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration):
         event_queue, action_executor = self._init_executer()
 
         # Test Action - do
-        action_executor.execute(action_spec, ActionMode.DO)
-        processor.process_event(event_queue.get())
+        action_executor.execute(
+            [ActionInfo(action_spec, ActionMode.DO, None, None)])
+        self._consume_queue(event_queue, processor)
 
         host_vertex_after = processor.entity_graph.get_vertex(
             host_vertex_before.vertex_id)
@@ -104,8 +106,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration):
         self.assertEqual(v_state_after, OperationalResourceState.SUBOPTIMAL)
 
         # Test Action - undo
-        action_executor.execute(action_spec, ActionMode.UNDO)
-        processor.process_event(event_queue.get())
+        action_executor.execute(
+            [ActionInfo(action_spec, ActionMode.UNDO, None, None)])
+        self._consume_queue(event_queue, processor)
 
         host_vertex_after_undo = processor.entity_graph.get_vertex(
             host_vertex_before.vertex_id)
@@ -134,8 +137,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration):
         event_queue, action_executor = self._init_executer()
 
         # Test Action - do
-        action_executor.execute(action_spec, ActionMode.DO)
-        processor.process_event(event_queue.get())
+        action_executor.execute(
+            [ActionInfo(action_spec, ActionMode.DO, None, None)])
+        self._consume_queue(event_queue, processor)
 
         instance_vertex_after = processor.entity_graph.get_vertex(
             instance_vertex_before.vertex_id)
@@ -144,8 +148,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration):
         self.assertTrue(instance_vertex_after.get(VProps.IS_MARKED_DOWN))
 
         # Test Action - undo
-        action_executor.execute(action_spec, ActionMode.UNDO)
-        processor.process_event(event_queue.get())
+        action_executor.execute(
+            [ActionInfo(action_spec, ActionMode.UNDO, None, None)])
+        self._consume_queue(event_queue, processor)
 
         instance_vertex_after_undo = processor.entity_graph.get_vertex(
             instance_vertex_before.vertex_id)
@@ -170,8 +175,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration):
         event_queue, action_executor = self._init_executer()
 
         # Test Action - do
-        action_executor.execute(action_spec, ActionMode.DO)
-        processor.process_event(event_queue.get())
+        action_executor.execute(
+            [ActionInfo(action_spec, ActionMode.DO, None, None)])
+        self._consume_queue(event_queue, processor)
 
         host_vertex_after = processor.entity_graph.get_vertex(
             host_vertex_before.vertex_id)
@@ -180,8 +186,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration):
         self.assertTrue(host_vertex_after.get(VProps.IS_MARKED_DOWN))
 
         # Test Action - undo
-        action_executor.execute(action_spec, ActionMode.UNDO)
-        processor.process_event(event_queue.get())
+        action_executor.execute(
+            [ActionInfo(action_spec, ActionMode.UNDO, None, None)])
+        self._consume_queue(event_queue, processor)
 
         host_vertex_after_undo = processor.entity_graph.get_vertex(
             host_vertex_before.vertex_id)
@@ -227,8 +234,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration):
                                                    alarm1.vertex_id,
                                                    EdgeLabel.CAUSES)
         # Test Action - do
-        action_executor.execute(action_spec, ActionMode.DO)
-        processor.process_event(event_queue.get())
+        action_executor.execute(
+            [ActionInfo(action_spec, ActionMode.DO, None, None)])
+        self._consume_queue(event_queue, processor)
 
         new_edge = processor.entity_graph.get_edge(alarm2.vertex_id,
                                                    alarm1.vertex_id,
@@ -267,8 +275,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration):
         event_queue, action_executor = self._init_executer()
 
         # Test Action
-        action_executor.execute(action_spec, ActionMode.DO)
-        processor.process_event(event_queue.get())
+        action_executor.execute(
+            [ActionInfo(action_spec, ActionMode.DO, None, None)])
+        self._consume_queue(event_queue, processor)
 
         after_alarms = processor.entity_graph.get_vertices(
             vertex_attr_filter=alarm_vertex_attrs)
@@ -330,9 +339,9 @@ class TestActionExecutor(TestFunctionalBase, TestConfiguration):
         event_queue, action_executor = self._init_executer()
 
         # Test Action - undo
-        action_executor.execute(action_spec, ActionMode.UNDO)
-        event = event_queue.get()
-        processor.process_event(event)
+        action_executor.execute(
+            [ActionInfo(action_spec, ActionMode.UNDO, None, None)])
+        self._consume_queue(event_queue, processor)
 
         after_alarms = processor.entity_graph.get_vertices(
             vertex_attr_filter=alarm_vertex_attrs)
@@ -450,8 +450,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
         port_vertex = entity_graph.get_vertices(
             vertex_attr_filter={VProps.VITRAGE_TYPE:
                                 NEUTRON_PORT_DATASOURCE})[0]
-        while not event_queue.empty():
-            processor.process_event(event_queue.get())
+        self._consume_queue(event_queue, processor)
 
         # test asserts
         query = {VProps.VITRAGE_CATEGORY: EntityCategory.ALARM}
@@ -482,8 +481,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
         nagios_event = mock_driver.generate_random_events_list(generator)[0]
 
         processor.process_event(nagios_event)
-        while not event_queue.empty():
-            processor.process_event(event_queue.get())
+        self._consume_queue(event_queue, processor)
 
         # test asserts
         self.assertEqual(num_orig_vertices + num_added_vertices +
@@ -527,8 +525,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
             nagios_vertex.vertex_id)][0]
         nagios_edge[EProps.VITRAGE_IS_DELETED] = True
         processor.entity_graph.update_edge(nagios_edge)
-        while not event_queue.empty():
-            processor.process_event(event_queue.get())
+        self._consume_queue(event_queue, processor)
 
         # test asserts
         self.assertEqual(num_orig_vertices + num_added_vertices +
@@ -583,8 +580,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
             nagios_vertex.vertex_id)][0]
         nagios_edge[EProps.VITRAGE_IS_DELETED] = False
         processor.entity_graph.update_edge(nagios_edge)
-        while not event_queue.empty():
-            processor.process_event(event_queue.get())
+        self._consume_queue(event_queue, processor)
 
         # test asserts
         self.assertEqual(num_orig_vertices + num_added_vertices +
@@ -638,8 +634,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
         # disable PORT_PROBLEM alarm
         nagios_event[NagiosProperties.STATUS] = NagiosTestStatus.OK
         processor.process_event(nagios_event)
-        while not event_queue.empty():
-            processor.process_event(event_queue.get())
+        self._consume_queue(event_queue, processor)
 
         # test asserts
         self.assertEqual(num_orig_vertices + num_added_vertices +
@@ -756,8 +751,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
                                         EdgeLabel.ATTACHED)
         entity_graph.add_edge(edge)
 
-        while not event_queue.empty():
-            processor.process_event(event_queue.get())
+        self._consume_queue(event_queue, processor)
 
         # test asserts
         query = {VProps.VITRAGE_CATEGORY: EntityCategory.ALARM}
@@ -789,8 +783,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
         nagios_event = mock_driver.generate_random_events_list(generator)[0]
 
         processor.process_event(nagios_event)
-        while not event_queue.empty():
-            processor.process_event(event_queue.get())
+        self._consume_queue(event_queue, processor)
 
         self.assertEqual(num_orig_vertices + num_added_vertices +
                          num_deduced_vertices + num_network_alarm_vertices,
@@ -825,8 +818,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
         # delete NETWORK_PROBLEM alarm
         nagios_event[NagiosProperties.STATUS] = NagiosTestStatus.OK
         processor.process_event(nagios_event)
-        while not event_queue.empty():
-            processor.process_event(event_queue.get())
+        self._consume_queue(event_queue, processor)
 
         self.assertEqual(num_orig_vertices + num_added_vertices +
                          num_deduced_vertices + num_network_alarm_vertices +
@@ -907,8 +899,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
         volume_event1['attachments'][0]['server_id'] = instances[0][VProps.ID]
 
         processor.process_event(volume_event1)
-        while not event_queue.empty():
-            processor.process_event(event_queue.get())
+        self._consume_queue(event_queue, processor)
 
         # test asserts
         num_volumes = 1
@@ -955,8 +946,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
         volume_event2['attachments'][0]['server_id'] = instances[1][VProps.ID]
 
         processor.process_event(volume_event2)
-        while not event_queue.empty():
-            processor.process_event(event_queue.get())
+        self._consume_queue(event_queue, processor)
 
         # test asserts
         num_volumes = 2
@@ -1021,8 +1011,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
         volume_event2['volume_attachment'][0]['instance_uuid'] = \
             volume_event2['attachments'][0]['server_id']
         processor.process_event(volume_event2)
-        while not event_queue.empty():
-            processor.process_event(event_queue.get())
+        self._consume_queue(event_queue, processor)
 
         # test asserts
         self.assertEqual(num_orig_vertices + num_volumes + num_deduced_alarms +
@@ -1106,8 +1095,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
         volume_event1['volume_attachment'][0]['instance_uuid'] = \
             volume_event1['attachments'][0]['server_id']
         processor.process_event(volume_event1)
-        while not event_queue.empty():
-            processor.process_event(event_queue.get())
+        self._consume_queue(event_queue, processor)
 
         # test asserts
         self.assertEqual(num_orig_vertices + num_volumes + num_deduced_alarms +
@@ -1352,8 +1340,7 @@ class TestScenarioEvaluator(TestFunctionalBase, TestConfiguration):
     def get_host_after_event(self, event_queue, nagios_event,
                              processor, target_host):
         processor.process_event(nagios_event)
-        while not event_queue.empty():
-            processor.process_event(event_queue.get())
+        self._consume_queue(event_queue, processor)
         host_v = self._get_entity_from_graph(NOVA_HOST_DATASOURCE,
                                              target_host,
                                              target_host,