Synchronize notification listener and evaluator queue

Change-Id: Ib16c36f174698d9644757dcc670a6816919f0e79
This commit is contained in:
Idan Hefetz 2017-07-19 07:44:33 +00:00
parent e2b964183c
commit 6714cc4dd4

View File

@ -13,6 +13,7 @@
# under the License. # under the License.
import datetime import datetime
import threading
from oslo_log import log from oslo_log import log
import oslo_messaging import oslo_messaging
@ -42,7 +43,8 @@ class VitrageGraphService(os_service.Service):
self.processor = proc.Processor(self.conf, self.processor = proc.Processor(self.conf,
self.init, self.init,
e_graph=entity_graph) e_graph=entity_graph)
self.listener = self._create_datasources_event_listener(conf) self.processor_lock = threading.RLock()
self.listener = self._create_datasources_event_listener()
def start(self): def start(self):
LOG.info("Vitrage Graph Service - Starting...") LOG.info("Vitrage Graph Service - Starting...")
@ -65,10 +67,6 @@ class VitrageGraphService(os_service.Service):
LOG.info("Vitrage Graph Service - Stopped!") LOG.info("Vitrage Graph Service - Stopped!")
def _process_events(self):
while True:
self._process_event_non_blocking()
def _process_event_non_blocking(self): def _process_event_non_blocking(self):
"""Process events received from datasource """Process events received from datasource
@ -77,6 +75,7 @@ class VitrageGraphService(os_service.Service):
seconds and goes to sleep for 1 second. if there are more events in seconds and goes to sleep for 1 second. if there are more events in
the queue they are done when timer returns. the queue they are done when timer returns.
""" """
self.processor_lock.acquire()
start_time = datetime.datetime.now() start_time = datetime.datetime.now()
while not self.evaluator_queue.empty(): while not self.evaluator_queue.empty():
time_delta = datetime.datetime.now() - start_time time_delta = datetime.datetime.now() - start_time
@ -84,6 +83,7 @@ class VitrageGraphService(os_service.Service):
break break
if not self.evaluator_queue.empty(): if not self.evaluator_queue.empty():
self.do_process(self.evaluator_queue) self.do_process(self.evaluator_queue)
self.processor_lock.release()
def do_process(self, queue): def do_process(self, queue):
try: try:
@ -92,23 +92,28 @@ class VitrageGraphService(os_service.Service):
except Exception as e: except Exception as e:
LOG.exception("Exception: %s", e) LOG.exception("Exception: %s", e)
def _create_datasources_event_listener(self, conf): def _create_datasources_event_listener(self):
topic = conf.datasources.notification_topic_collector topic = self.conf.datasources.notification_topic_collector
transport = messaging.get_transport(conf) transport = messaging.get_transport(self.conf)
targets = [oslo_messaging.Target(topic=topic)] targets = [oslo_messaging.Target(topic=topic)]
return messaging.get_notification_listener( return messaging.get_notification_listener(
transport, transport,
targets, targets,
[PushNotificationsEndpoint(self.processor.process_event)]) [PushNotificationsEndpoint(self.processor.process_event,
self.processor_lock)])
class PushNotificationsEndpoint(object): class PushNotificationsEndpoint(object):
def __init__(self, process_event_callback): def __init__(self, process_event_callback, processor_lock):
self.process_event_callback = process_event_callback self.process_event_callback = process_event_callback
self.processor_lock = processor_lock
def info(self, ctxt, publisher_id, event_type, payload, metadata): def info(self, ctxt, publisher_id, event_type, payload, metadata):
try: try:
self.processor_lock.acquire()
self.process_event_callback(payload) self.process_event_callback(payload)
except Exception as e: except Exception as e:
LOG.exception(e) LOG.exception(e)
finally:
self.processor_lock.release()