From 72f8164f04b77a9a011a29af1eda93e6d7a2c04c Mon Sep 17 00:00:00 2001 From: Idan Hefetz Date: Thu, 30 Nov 2017 14:40:27 +0000 Subject: [PATCH] use messaging instead of multiprocessing queue Multiprocessing queue causes deadlocks and is removed. Entity graph service now listens on two topics, evaluator notifications should be handled with a higher priority. Change-Id: I53f94a684c72353a7aa7b88fe7322f83ebc7194a --- .zuul.yaml | 2 - vitrage/cli/graph.py | 8 +- vitrage/entity_graph/__init__.py | 2 + .../consistency/consistency_enforcer.py | 6 +- vitrage/entity_graph/consistency/service.py | 13 +- vitrage/entity_graph/service.py | 115 ++++++++++-------- vitrage/entity_graph/vitrage_init.py | 75 ++++++------ vitrage/evaluator/actions/action_executor.py | 17 +-- vitrage/evaluator/evaluator_base.py | 3 +- vitrage/evaluator/evaluator_service.py | 17 +-- vitrage/evaluator/scenario_evaluator.py | 6 +- .../consistency/test_consistency.py | 15 ++- .../evaluator/test_action_executor.py | 27 ++-- .../evaluator/test_scenario_evaluator.py | 17 ++- .../entity_graph/test_processor_service.py | 72 +++++++++++ 15 files changed, 254 insertions(+), 141 deletions(-) create mode 100644 vitrage/tests/unit/entity_graph/test_processor_service.py diff --git a/.zuul.yaml b/.zuul.yaml index e59ee86df..472a45de5 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -5,10 +5,8 @@ - vitrage-dsvm-api-py27 - vitrage-dsvm-datasources-py27 - vitrage-dsvm-api-py35: - voting: false branches: ^(?!stable/(newton|ocata|pike)).*$ - vitrage-dsvm-datasources-py35: - voting: false branches: ^(?!stable/(newton|ocata|pike)).*$ gate: jobs: diff --git a/vitrage/cli/graph.py b/vitrage/cli/graph.py index af5f03d33..6d2794426 100644 --- a/vitrage/cli/graph.py +++ b/vitrage/cli/graph.py @@ -13,7 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. -import multiprocessing import sys from oslo_service import service as os_service @@ -38,20 +37,17 @@ def main(): print(VITRAGE_TITLE) conf = service.prepare_service() - evaluator_queue = multiprocessing.Queue() e_graph = entity_graph.get_graph_driver(conf)('Entity Graph') launcher = os_service.ServiceLauncher(conf) full_scenario_repo = ScenarioRepository(conf) clear_db(conf) - launcher.launch_service(VitrageGraphService( - conf, evaluator_queue, e_graph)) + launcher.launch_service(VitrageGraphService(conf, e_graph)) launcher.launch_service(VitrageApiHandlerService( conf, e_graph, full_scenario_repo)) - launcher.launch_service(VitrageConsistencyService( - conf, evaluator_queue, e_graph)) + launcher.launch_service(VitrageConsistencyService(conf, e_graph)) launcher.wait() diff --git a/vitrage/entity_graph/__init__.py b/vitrage/entity_graph/__init__.py index 68c0a9245..d5ef919f7 100644 --- a/vitrage/entity_graph/__init__.py +++ b/vitrage/entity_graph/__init__.py @@ -32,6 +32,8 @@ OPTS = [ help='graph driver implementation class'), ] +EVALUATOR_TOPIC = 'vitrage.evaluator' + def get_graph_driver(conf): try: diff --git a/vitrage/entity_graph/consistency/consistency_enforcer.py b/vitrage/entity_graph/consistency/consistency_enforcer.py index c413951c2..a50401982 100644 --- a/vitrage/entity_graph/consistency/consistency_enforcer.py +++ b/vitrage/entity_graph/consistency/consistency_enforcer.py @@ -35,10 +35,10 @@ class ConsistencyEnforcer(object): def __init__(self, conf, - evaluator_queue, + actions_callback, entity_graph): self.conf = conf - self.evaluator_queue = evaluator_queue + self.actions_callback = actions_callback self.graph = entity_graph def periodic_process(self): @@ -104,7 +104,7 @@ class ConsistencyEnforcer(object): VProps.VITRAGE_CATEGORY: vertex[VProps.VITRAGE_CATEGORY], VProps.IS_REAL_VITRAGE_ID: True } - self.evaluator_queue.put(event) + self.actions_callback('consistency', event) @staticmethod def _filter_vertices_to_be_deleted(vertices): diff --git a/vitrage/entity_graph/consistency/service.py b/vitrage/entity_graph/consistency/service.py index 02355cadf..54092944d 100644 --- a/vitrage/entity_graph/consistency/service.py +++ b/vitrage/entity_graph/consistency/service.py @@ -17,6 +17,8 @@ from oslo_service import service as os_service from vitrage.entity_graph.consistency.consistency_enforcer \ import ConsistencyEnforcer +from vitrage.entity_graph import EVALUATOR_TOPIC +from vitrage.messaging import VitrageNotifier LOG = log.getLogger(__name__) @@ -25,21 +27,22 @@ class VitrageConsistencyService(os_service.Service): def __init__(self, conf, - evaluator_queue, entity_graph): super(VitrageConsistencyService, self).__init__() self.conf = conf - self.evaluator_queue = evaluator_queue self.entity_graph = entity_graph + self.actions_notifier = VitrageNotifier( + conf, 'vitrage_consistency', EVALUATOR_TOPIC) def start(self): LOG.info("Vitrage Consistency Service - Starting...") super(VitrageConsistencyService, self).start() - consistency_enf = ConsistencyEnforcer(self.conf, - self.evaluator_queue, - self.entity_graph) + consistency_enf = ConsistencyEnforcer( + conf=self.conf, + actions_callback=self.actions_notifier.notify, + entity_graph=self.entity_graph) self.tg.add_timer(self.conf.datasources.snapshots_interval, consistency_enf.periodic_process, initial_delay=60 + diff --git a/vitrage/entity_graph/service.py b/vitrage/entity_graph/service.py index 7402e7e6e..f37a3fc9a 100644 --- a/vitrage/entity_graph/service.py +++ b/vitrage/entity_graph/service.py @@ -11,15 +11,15 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - -import datetime import threading +import time from oslo_log import log import oslo_messaging from oslo_service import service as os_service -from vitrage.entity_graph.processor import processor as proc +from vitrage.entity_graph import EVALUATOR_TOPIC +from vitrage.entity_graph.processor.processor import Processor from vitrage.entity_graph.vitrage_init import VitrageInit from vitrage.evaluator.evaluator_service import EvaluatorManager from vitrage import messaging @@ -31,30 +31,31 @@ class VitrageGraphService(os_service.Service): def __init__(self, conf, - evaluator_queue, graph): super(VitrageGraphService, self).__init__() self.conf = conf - self.evaluator_queue = evaluator_queue self.graph = graph - self.evaluator = EvaluatorManager(conf, graph, evaluator_queue) - self.init = VitrageInit(conf, graph, self.evaluator, evaluator_queue) - self.processor = proc.Processor(self.conf, - self.init, - e_graph=graph) - self.processor_lock = threading.RLock() - self.listener = self._create_datasources_event_listener() + self.evaluator = EvaluatorManager(conf, graph) + self.init = VitrageInit(conf, graph, self.evaluator) + self.processor = Processor(self.conf, self.init, e_graph=graph) + self.listener = self._init_listener() + + def _init_listener(self): + collector_topic = self.conf.datasources.notification_topic_collector + evaluator_topic = EVALUATOR_TOPIC + return TwoPriorityListener( + self.conf, + self.processor.process_event, + collector_topic, + evaluator_topic) def start(self): LOG.info("Vitrage Graph Service - Starting...") - super(VitrageGraphService, self).start() - self.tg.add_timer(0.1, self._process_event_non_blocking) self.tg.add_thread( self.init.initializing_process, on_end_messages_func=self.processor.on_recieved_all_end_messages) self.listener.start() - LOG.info("Vitrage Graph Service - Started!") def stop(self, graceful=False): @@ -66,50 +67,66 @@ class VitrageGraphService(os_service.Service): LOG.info("Vitrage Graph Service - Stopped!") - def _process_event_non_blocking(self): - """Process events received from datasource - In order that other services (such as graph consistency, api handler) - could get work time as well, the work processing performed for 2 - seconds and goes to sleep for 1 second. if there are more events in - the queue they are done when timer returns. - """ - with self.processor_lock: - start_time = datetime.datetime.now() - while not self.evaluator_queue.empty(): - time_delta = datetime.datetime.now() - start_time - if time_delta.total_seconds() >= 2: - break - if not self.evaluator_queue.empty(): - self.do_process(self.evaluator_queue) +PRIORITY_DELAY = 0.05 - def do_process(self, queue): - try: - event = queue.get() - self.processor.process_event(event) - except Exception as e: - LOG.exception("Exception: %s", e) - def _create_datasources_event_listener(self): - topic = self.conf.datasources.notification_topic_collector - transport = messaging.get_transport(self.conf) - targets = [oslo_messaging.Target(topic=topic)] +class TwoPriorityListener(object): + def __init__(self, conf, do_work_func, topic_low, topic_high): + self._conf = conf + self._do_work_func = do_work_func + self._lock = threading.Lock() + self._high_event_finish_time = 0 + + self._low_pri_listener = self._init_listener( + topic_low, self._do_low_priority_work) + self._high_pri_listener = self._init_listener( + topic_high, self._do_high_priority_work) + + def start(self): + self._high_pri_listener.start() + self._low_pri_listener.start() + + def stop(self): + self._low_pri_listener.stop() + self._high_pri_listener.stop() + + def wait(self): + self._low_pri_listener.wait() + self._high_pri_listener.wait() + + def _do_high_priority_work(self, event): + self._lock.acquire() + self._do_work_func(event) + self._high_event_finish_time = time.time() + self._lock.release() + + def _do_low_priority_work(self, event): + while True: + self._lock.acquire() + if (time.time() - self._high_event_finish_time) < PRIORITY_DELAY: + self._lock.release() + time.sleep(PRIORITY_DELAY) + else: + break + self._do_work_func(event) + self._lock.release() + + def _init_listener(self, topic, callback): + if not topic: + return return messaging.get_notification_listener( - transport, - targets, - [PushNotificationsEndpoint(self.processor.process_event, - self.processor_lock)]) + transport=messaging.get_transport(self._conf), + targets=[oslo_messaging.Target(topic=topic)], + endpoints=[PushNotificationsEndpoint(callback)]) class PushNotificationsEndpoint(object): - - def __init__(self, process_event_callback, processor_lock): + def __init__(self, process_event_callback): self.process_event_callback = process_event_callback - self.processor_lock = processor_lock def info(self, ctxt, publisher_id, event_type, payload, metadata): try: - with self.processor_lock: - self.process_event_callback(payload) + self.process_event_callback(payload) except Exception as e: LOG.exception(e) diff --git a/vitrage/entity_graph/vitrage_init.py b/vitrage/entity_graph/vitrage_init.py index 38568439d..809a202e2 100644 --- a/vitrage/entity_graph/vitrage_init.py +++ b/vitrage/entity_graph/vitrage_init.py @@ -14,13 +14,7 @@ from oslo_log import log import time -from vitrage.common.constants import DatasourceAction -from vitrage.common.constants import DatasourceProperties as DSProps -from vitrage.common.constants import EntityCategory -from vitrage.common.constants import GraphAction from vitrage.common.constants import VertexProperties as VProps -from vitrage.datasources.consistency import CONSISTENCY_DATASOURCE -from vitrage.utils.datetime import utcnow LOG = log.getLogger(__name__) @@ -30,11 +24,10 @@ class VitrageInit(object): RECEIVED_ALL_END_MESSAGES = 'received_all_end_messages' FINISHED = 'finished' - def __init__(self, conf, graph=None, evaluator=None, evaluator_queue=None): + def __init__(self, conf, graph=None, evaluator=None): self.conf = conf self.graph = graph self.evaluator = evaluator - self.evaluator_queue = evaluator_queue self.status = self.STARTED self.end_messages = {} @@ -50,14 +43,14 @@ class VitrageInit(object): on_end_messages_func() - timestamp = str(utcnow()) self.evaluator.run_evaluator() - if not self._wait_for_action(self.evaluator_queue.empty): - LOG.error('Evaluator Queue Not Empty') - - self._mark_old_deduced_alarms_as_deleted(timestamp, self.graph, - self.evaluator_queue) + # TODO(idan_hefetz) As vitrage is not yet persistent, there aren't + # TODO(idan_hefetz) any deduced alarms to be removed during init + # if not self._wait_for_action(self.evaluator_queue.empty): + # LOG.error('Evaluator Queue Not Empty') + # self._mark_old_deduced_alarms_as_deleted(timestamp, self.graph, + # self.evaluator_queue) self.status = self.FINISHED LOG.info('Init Finished') @@ -87,30 +80,30 @@ class VitrageInit(object): count_retries += 1 time.sleep(self.conf.consistency.initialization_interval) - def _mark_old_deduced_alarms_as_deleted(self, timestamp, graph, out_queue): - query = { - 'and': [ - {'==': {VProps.VITRAGE_CATEGORY: EntityCategory.ALARM}}, - {'==': {VProps.VITRAGE_TYPE: VProps.VITRAGE_TYPE}}, - {'<': {VProps.VITRAGE_SAMPLE_TIMESTAMP: timestamp}} - ] - } - old_deduced_alarms = graph.get_vertices(query_dict=query) - self._push_events_to_queue(old_deduced_alarms, - GraphAction.DELETE_ENTITY, - out_queue) - - def _push_events_to_queue(self, vertices, action, out_queue): - for vertex in vertices: - event = { - DSProps.ENTITY_TYPE: CONSISTENCY_DATASOURCE, - DSProps.DATASOURCE_ACTION: DatasourceAction.UPDATE, - DSProps.SAMPLE_DATE: str(utcnow()), - DSProps.EVENT_TYPE: action, - VProps.VITRAGE_ID: vertex[VProps.VITRAGE_ID], - VProps.ID: vertex.get(VProps.ID, None), - VProps.VITRAGE_TYPE: vertex[VProps.VITRAGE_TYPE], - VProps.VITRAGE_CATEGORY: vertex[VProps.VITRAGE_CATEGORY], - VProps.IS_REAL_VITRAGE_ID: True - } - out_queue.put(event) + # def _mark_old_deduced_alarms_as_deleted(self, timestamp,graph,out_queue): + # query = { + # 'and': [ + # {'==': {VProps.VITRAGE_CATEGORY: EntityCategory.ALARM}}, + # {'==': {VProps.VITRAGE_TYPE: VProps.VITRAGE_TYPE}}, + # {'<': {VProps.VITRAGE_SAMPLE_TIMESTAMP: timestamp}} + # ] + # } + # old_deduced_alarms = graph.get_vertices(query_dict=query) + # self._push_events_to_queue(old_deduced_alarms, + # GraphAction.DELETE_ENTITY, + # out_queue) + # + # def _push_events_to_queue(self, vertices, action, out_queue): + # for vertex in vertices: + # event = { + # DSProps.ENTITY_TYPE: CONSISTENCY_DATASOURCE, + # DSProps.DATASOURCE_ACTION: DatasourceAction.UPDATE, + # DSProps.SAMPLE_DATE: str(utcnow()), + # DSProps.EVENT_TYPE: action, + # VProps.VITRAGE_ID: vertex[VProps.VITRAGE_ID], + # VProps.ID: vertex.get(VProps.ID, None), + # VProps.VITRAGE_TYPE: vertex[VProps.VITRAGE_TYPE], + # VProps.VITRAGE_CATEGORY: vertex[VProps.VITRAGE_CATEGORY], + # VProps.IS_REAL_VITRAGE_ID: True + # } + # out_queue.put(event) diff --git a/vitrage/evaluator/actions/action_executor.py b/vitrage/evaluator/actions/action_executor.py index f6490c4d0..ed01ed382 100644 --- a/vitrage/evaluator/actions/action_executor.py +++ b/vitrage/evaluator/actions/action_executor.py @@ -44,11 +44,14 @@ from vitrage.utils import datetime as datetime_utils LOG = log.getLogger(__name__) +EVALUATOR_EVENT = 'evaluator.event' + class ActionExecutor(object): - def __init__(self, conf, event_queue): - self.event_queue = event_queue + def __init__(self, conf, actions_callback): + + self.actions_callback = actions_callback self.notifier = EvaluatorNotifier(conf) self.action_recipes = ActionExecutor._register_action_recipes() @@ -78,7 +81,7 @@ class ActionExecutor(object): ActionExecutor._add_default_properties(event) event[EVALUATOR_EVENT_TYPE] = ADD_VERTEX - self.event_queue.put(event) + self.actions_callback(EVALUATOR_EVENT, event) def _update_vertex(self, params): @@ -86,14 +89,14 @@ class ActionExecutor(object): ActionExecutor._add_default_properties(event) event[EVALUATOR_EVENT_TYPE] = UPDATE_VERTEX - self.event_queue.put(event) + self.actions_callback(EVALUATOR_EVENT, event) def _remove_vertex(self, params): event = copy.deepcopy(params) ActionExecutor._add_default_properties(event) event[EVALUATOR_EVENT_TYPE] = REMOVE_VERTEX - self.event_queue.put(event) + self.actions_callback(EVALUATOR_EVENT, event) def _add_edge(self, params): @@ -101,7 +104,7 @@ class ActionExecutor(object): ActionExecutor._add_default_properties(event) event[EVALUATOR_EVENT_TYPE] = ADD_EDGE - self.event_queue.put(event) + self.actions_callback(EVALUATOR_EVENT, event) def _remove_edge(self, params): @@ -109,7 +112,7 @@ class ActionExecutor(object): ActionExecutor._add_default_properties(event) event[EVALUATOR_EVENT_TYPE] = REMOVE_EDGE - self.event_queue.put(event) + self.actions_callback(EVALUATOR_EVENT, event) def _execute_external(self, params): diff --git a/vitrage/evaluator/evaluator_base.py b/vitrage/evaluator/evaluator_base.py index be1c60767..06281ed3e 100644 --- a/vitrage/evaluator/evaluator_base.py +++ b/vitrage/evaluator/evaluator_base.py @@ -18,11 +18,10 @@ import six @six.add_metaclass(abc.ABCMeta) class EvaluatorBase(object): - def __init__(self, conf, entity_graph, evaluator_queue): + def __init__(self, conf, entity_graph): super(EvaluatorBase, self).__init__() self._conf = conf self._entity_graph = entity_graph - self._evaluator_queue = evaluator_queue @abc.abstractmethod def run_evaluator(self): diff --git a/vitrage/evaluator/evaluator_service.py b/vitrage/evaluator/evaluator_service.py index 5d1bdb1e3..0ece2c95b 100644 --- a/vitrage/evaluator/evaluator_service.py +++ b/vitrage/evaluator/evaluator_service.py @@ -18,10 +18,13 @@ import time from oslo_concurrency import processutils from oslo_log import log from oslo_service import service as os_service + +from vitrage.entity_graph import EVALUATOR_TOPIC from vitrage.evaluator.evaluator_base import EvaluatorBase from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator from vitrage.evaluator.scenario_repository import ScenarioRepository +from vitrage.messaging import VitrageNotifier LOG = log.getLogger(__name__) @@ -32,9 +35,8 @@ POISON_PILL = None class EvaluatorManager(EvaluatorBase): - def __init__(self, conf, entity_graph, evaluator_queue): - super(EvaluatorManager, self).__init__(conf, entity_graph, - evaluator_queue) + def __init__(self, conf, entity_graph): + super(EvaluatorManager, self).__init__(conf, entity_graph) self._workers_num = conf.evaluator.workers or \ processutils.get_worker_count() self._worker_queues = list() @@ -63,7 +65,6 @@ class EvaluatorManager(EvaluatorBase): tasks_queue, self._entity_graph, scenario_repo, - self._evaluator_queue, enabled) self._p_launcher.launch_service(w) self._worker_queues.append(tasks_queue) @@ -103,25 +104,27 @@ class EvaluatorWorker(os_service.Service): task_queue, entity_graph, scenario_repo, - evaluator_queue, enabled=False): super(EvaluatorWorker, self).__init__() self._conf = conf self._task_queue = task_queue self._entity_graph = entity_graph self._scenario_repo = scenario_repo - self._evaluator_queue = evaluator_queue self._enabled = enabled self._evaluator = None def start(self): super(EvaluatorWorker, self).start() + actions_callback = VitrageNotifier( + conf=self._conf, + publisher_id='vitrage_evaluator', + topic=EVALUATOR_TOPIC).notify self._entity_graph.notifier._subscriptions = [] # Quick n dirty self._evaluator = ScenarioEvaluator( self._conf, self._entity_graph, self._scenario_repo, - self._evaluator_queue, + actions_callback, self._enabled) self.tg.add_thread(self._read_queue) LOG.info("EvaluatorWorkerService - Started!") diff --git a/vitrage/evaluator/scenario_evaluator.py b/vitrage/evaluator/scenario_evaluator.py index 1c8068fb4..8b201f69c 100644 --- a/vitrage/evaluator/scenario_evaluator.py +++ b/vitrage/evaluator/scenario_evaluator.py @@ -57,12 +57,12 @@ class ScenarioEvaluator(EvaluatorBase): conf, e_graph, scenario_repo, - event_queue, + actions_callback, enabled=False): - super(ScenarioEvaluator, self).__init__(conf, e_graph, event_queue) + super(ScenarioEvaluator, self).__init__(conf, e_graph) self._db_connection = storage.get_connection_from_config(self._conf) self._scenario_repo = scenario_repo - self._action_executor = ActionExecutor(self._conf, event_queue) + self._action_executor = ActionExecutor(self._conf, actions_callback) self._entity_graph.subscribe(self.process_event) self._active_actions_tracker = ActiveActionsTracker( self._conf, self._db_connection) diff --git a/vitrage/tests/functional/entity_graph/consistency/test_consistency.py b/vitrage/tests/functional/entity_graph/consistency/test_consistency.py index dbaa50910..589400700 100644 --- a/vitrage/tests/functional/entity_graph/consistency/test_consistency.py +++ b/vitrage/tests/functional/entity_graph/consistency/test_consistency.py @@ -88,14 +88,25 @@ class TestConsistencyFunctional(TestFunctionalBase): cls.graph) cls.event_queue = queue.Queue() + + def actions_callback(event_type, data): + """Mock notify method + + Mocks vitrage.messaging.VitrageNotifier.notify(event_type, data) + + :param event_type: is currently always the same and is ignored + :param data: + """ + cls.event_queue.put(data) + scenario_repo = ScenarioRepository(cls.conf) cls.evaluator = ScenarioEvaluator(cls.conf, cls.processor.entity_graph, scenario_repo, - cls.event_queue) + actions_callback) cls.consistency_enforcer = ConsistencyEnforcer( cls.conf, - cls.event_queue, + actions_callback, cls.processor.entity_graph) @unittest.skip("test_initializing_process skipping") diff --git a/vitrage/tests/functional/evaluator/test_action_executor.py b/vitrage/tests/functional/evaluator/test_action_executor.py index 7b97fc840..218c42d65 100644 --- a/vitrage/tests/functional/evaluator/test_action_executor.py +++ b/vitrage/tests/functional/evaluator/test_action_executor.py @@ -66,6 +66,14 @@ class TestActionExecutor(TestFunctionalBase): for datasource_name in cls.conf.datasources.types: register_opts(cls.conf, datasource_name, cls.conf.datasources.path) + def _init_executer(self): + event_queue = queue.Queue() + + def actions_callback(event_type, data): + event_queue.put(data) + + return event_queue, ActionExecutor(self.conf, actions_callback) + def test_execute_set_state(self): # Test Setup @@ -80,8 +88,7 @@ class TestActionExecutor(TestFunctionalBase): props = {TFields.STATE: OperationalResourceState.SUBOPTIMAL} action_spec = ActionSpecs(0, ActionType.SET_STATE, targets, props) - event_queue = queue.Queue() - action_executor = ActionExecutor(self.conf, event_queue) + event_queue, action_executor = self._init_executer() # Test Action - do action_executor.execute(action_spec, ActionMode.DO) @@ -131,8 +138,7 @@ class TestActionExecutor(TestFunctionalBase): props = {} action_spec = ActionSpecs(0, ActionType.MARK_DOWN, targets, props) - event_queue = queue.Queue() - action_executor = ActionExecutor(self.conf, event_queue) + event_queue, action_executor = self._init_executer() # Test Action - do action_executor.execute(action_spec, ActionMode.DO) @@ -168,8 +174,7 @@ class TestActionExecutor(TestFunctionalBase): props = {} action_spec = ActionSpecs(0, ActionType.MARK_DOWN, targets, props) - event_queue = queue.Queue() - action_executor = ActionExecutor(self.conf, event_queue) + event_queue, action_executor = self._init_executer() # Test Action - do action_executor.execute(action_spec, ActionMode.DO) @@ -223,8 +228,7 @@ class TestActionExecutor(TestFunctionalBase): action_spec = ActionSpecs( 0, ActionType.ADD_CAUSAL_RELATIONSHIP, targets, {}) - event_queue = queue.Queue() - action_executor = ActionExecutor(self.conf, event_queue) + event_queue, action_executor = self._init_executer() before_edge = processor.entity_graph.get_edge(alarm2.vertex_id, alarm1.vertex_id, @@ -266,8 +270,8 @@ class TestActionExecutor(TestFunctionalBase): alarm_vertex_attrs = {VProps.VITRAGE_TYPE: VITRAGE_DATASOURCE} before_alarms = processor.entity_graph.get_vertices( vertex_attr_filter=alarm_vertex_attrs) - event_queue = queue.Queue() - action_executor = ActionExecutor(self.conf, event_queue) + + event_queue, action_executor = self._init_executer() # Test Action action_executor.execute(action_spec, ActionMode.DO) @@ -330,8 +334,7 @@ class TestActionExecutor(TestFunctionalBase): before_alarms = processor.entity_graph.get_vertices( vertex_attr_filter=alarm_vertex_attrs) - event_queue = queue.Queue() - action_executor = ActionExecutor(self.conf, event_queue) + event_queue, action_executor = self._init_executer() # Test Action - undo action_executor.execute(action_spec, ActionMode.UNDO) diff --git a/vitrage/tests/functional/evaluator/test_scenario_evaluator.py b/vitrage/tests/functional/evaluator/test_scenario_evaluator.py index 6b6553bd0..986f2955e 100644 --- a/vitrage/tests/functional/evaluator/test_scenario_evaluator.py +++ b/vitrage/tests/functional/evaluator/test_scenario_evaluator.py @@ -1370,8 +1370,21 @@ class TestScenarioEvaluator(TestFunctionalBase): def _init_system(self): processor = self._create_processor_with_graph(self.conf) event_queue = queue.Queue() - evaluator = ScenarioEvaluator(self.conf, processor.entity_graph, - self.scenario_repository, event_queue, + + def actions_callback(event_type, data): + """Mock notify method + + Mocks vitrage.messaging.VitrageNotifier.notify(event_type, data) + + :param event_type: is currently always the same and is ignored + :param data: + """ + event_queue.put(data) + + evaluator = ScenarioEvaluator(self.conf, + processor.entity_graph, + self.scenario_repository, + actions_callback, enabled=True) return event_queue, processor, evaluator diff --git a/vitrage/tests/unit/entity_graph/test_processor_service.py b/vitrage/tests/unit/entity_graph/test_processor_service.py new file mode 100644 index 000000000..3c0063f80 --- /dev/null +++ b/vitrage/tests/unit/entity_graph/test_processor_service.py @@ -0,0 +1,72 @@ +# Copyright 2017 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import threading + +from vitrage.entity_graph.service import TwoPriorityListener +from vitrage.tests import base + + +class TwoPriorityListenerTest(base.BaseTest): + + @classmethod + def setUpClass(cls): + super(TwoPriorityListenerTest, cls).setUpClass() + cls.calc_result = 0 + + def do_work(self, x): + if x: + self.calc_result = self.calc_result * 2 + else: + self.calc_result = self.calc_result + 1 + + def test_queue_coordination(self): + explain = """ + initially calc_result is 0. + each high priority call multiplies by *2 + each low priority call adds +1 + so, if all the high calls are performed first, and then all the low, + the result should be the number of low priority calls. + 0*(2^n) + 1*n + """ + priority_listener = TwoPriorityListener(None, self.do_work, None, None) + + def write_high(): + for i in range(10000): + priority_listener._do_high_priority_work(True) + + def write_low(): + for i in range(10000): + priority_listener._do_low_priority_work(False) + + self.calc_result = 0 + t1 = threading.Thread(name='high_1', target=write_high) + t2 = threading.Thread(name='high_2', target=write_high) + t3 = threading.Thread(name='low_1', target=write_low) + t4 = threading.Thread(name='low_2', target=write_low) + self._start_and_join(t1, t2, t3, t4) + self.assertEqual(20000, self.calc_result, explain) + + self.calc_result = 0 + t1 = threading.Thread(name='high_1', target=write_high) + t2 = threading.Thread(name='low_1', target=write_low) + t3 = threading.Thread(name='low_2', target=write_low) + t4 = threading.Thread(name='high_2', target=write_high) + self._start_and_join(t1, t2, t3, t4) + self.assertEqual(20000, self.calc_result, explain) + + def _start_and_join(self, *args): + for t in args: + t.start() + for t in args: + t.join()