From 6714cc4dd49643dcd822a679bb10aa306f83d4d2 Mon Sep 17 00:00:00 2001 From: Idan Hefetz Date: Wed, 19 Jul 2017 07:44:33 +0000 Subject: [PATCH] Synchronize notification listener and evaluator queue Change-Id: Ib16c36f174698d9644757dcc670a6816919f0e79 --- vitrage/entity_graph/service.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/vitrage/entity_graph/service.py b/vitrage/entity_graph/service.py index 3d9468431..1f9fdf8ff 100644 --- a/vitrage/entity_graph/service.py +++ b/vitrage/entity_graph/service.py @@ -13,6 +13,7 @@ # under the License. import datetime +import threading from oslo_log import log import oslo_messaging @@ -42,7 +43,8 @@ class VitrageGraphService(os_service.Service): self.processor = proc.Processor(self.conf, self.init, e_graph=entity_graph) - self.listener = self._create_datasources_event_listener(conf) + self.processor_lock = threading.RLock() + self.listener = self._create_datasources_event_listener() def start(self): LOG.info("Vitrage Graph Service - Starting...") @@ -65,10 +67,6 @@ class VitrageGraphService(os_service.Service): LOG.info("Vitrage Graph Service - Stopped!") - def _process_events(self): - while True: - self._process_event_non_blocking() - def _process_event_non_blocking(self): """Process events received from datasource @@ -77,6 +75,7 @@ class VitrageGraphService(os_service.Service): seconds and goes to sleep for 1 second. if there are more events in the queue they are done when timer returns. """ + self.processor_lock.acquire() start_time = datetime.datetime.now() while not self.evaluator_queue.empty(): time_delta = datetime.datetime.now() - start_time @@ -84,6 +83,7 @@ class VitrageGraphService(os_service.Service): break if not self.evaluator_queue.empty(): self.do_process(self.evaluator_queue) + self.processor_lock.release() def do_process(self, queue): try: @@ -92,23 +92,28 @@ class VitrageGraphService(os_service.Service): except Exception as e: LOG.exception("Exception: %s", e) - def _create_datasources_event_listener(self, conf): - topic = conf.datasources.notification_topic_collector - transport = messaging.get_transport(conf) + def _create_datasources_event_listener(self): + topic = self.conf.datasources.notification_topic_collector + transport = messaging.get_transport(self.conf) targets = [oslo_messaging.Target(topic=topic)] return messaging.get_notification_listener( transport, targets, - [PushNotificationsEndpoint(self.processor.process_event)]) + [PushNotificationsEndpoint(self.processor.process_event, + self.processor_lock)]) class PushNotificationsEndpoint(object): - def __init__(self, process_event_callback): + def __init__(self, process_event_callback, processor_lock): self.process_event_callback = process_event_callback + self.processor_lock = processor_lock def info(self, ctxt, publisher_id, event_type, payload, metadata): try: + self.processor_lock.acquire() self.process_event_callback(payload) except Exception as e: LOG.exception(e) + finally: + self.processor_lock.release()