parallel evaluation fixes

worker call task_done also on failure.
manager to close queues once not needed.

Change-Id: Id5e001a020c91320d4176ccbd67c4a814a3ba6a8
This commit is contained in:
Idan Hefetz 2017-11-15 09:19:28 +00:00
parent f065416545
commit 9ea04388a6
3 changed files with 14 additions and 15 deletions

View File

@ -27,6 +27,7 @@ LOG = log.getLogger(__name__)
START_EVALUATION = 'start_evaluation'
POISON_PILL = None
class EvaluatorManager(EvaluatorBase):
@ -85,7 +86,9 @@ class EvaluatorManager(EvaluatorBase):
q.join()
def stop_all_workers(self):
self._notify_and_wait(None)
self._notify_and_wait(POISON_PILL)
for q in self._worker_queues:
q.close()
self._worker_queues = list()
def reload_all_workers(self, enabled=True):
@ -126,20 +129,17 @@ class EvaluatorWorker(os_service.Service):
def _read_queue(self):
while True:
try:
next_task = self._task_queue.get()
if next_task is None:
self._task_queue.task_done()
break # poison pill
self._do_task(next_task)
next_task = self._task_queue.get()
if next_task is POISON_PILL:
self._task_queue.task_done()
# Evaluator queue may have been updated, thus the sleep:
time.sleep(0)
break
try:
self._do_task(next_task)
except Exception as e:
# TODO(ihefetz): an exception here may break all the
# TODO(ihefetz): evaluators. If task_done was not called,
# TODO(ihefetz): evaluator manager will wait forever.
LOG.exception("Exception: %s", e)
LOG.exception("Graph may not be in sync: exception %s", e)
self._task_queue.task_done()
# Evaluator queue may have been updated, thus the sleep:
time.sleep(0)
def _do_task(self, task):
(before, current, is_vertex, action) = task

View File

@ -59,7 +59,6 @@ class GraphAlgorithm(object):
In sub-graph matching algorithms complexity is high in the general case
Here it is considerably mitigated as we have an anchor in the graph.
TODO(ihefetz) document this
:type known_mappings: list
:type sub_graph: driver.Graph

View File

@ -58,7 +58,7 @@ class Connection(base.Connection):
engine = self._engine_facade.get_engine()
engine.connect()
models.Base.metadata.create_all(engine, checkfirst=False)
# TODO(ihefetz) upgrade logic is missing
# TODO(idan_hefetz) upgrade logic is missing
def disconnect(self):
self._engine_facade.get_engine().dispose()