From 0d8c951b21544e90f3856fea3bfca8b69f847f38 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Thu, 12 May 2016 11:27:23 +0300 Subject: [PATCH] Refresh transaction after each orchestration report tick It was possible to fetch stale data from database with high isolation levels, to avoid such issues we will simply restart transaction after each report interval Change-Id: I64a9843aa64adf4c710a9f593bdeaa2f5b3c5fce --- solar/orchestration/graph.py | 9 ++++++--- solar/test/test_graph_api.py | 3 ++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/solar/orchestration/graph.py b/solar/orchestration/graph.py index f78e38ad..a929477c 100644 --- a/solar/orchestration/graph.py +++ b/solar/orchestration/graph.py @@ -19,7 +19,6 @@ from collections import Counter import networkx as nx -from solar.dblayer.model import clear_cache from solar.dblayer.model import ModelMeta from solar.dblayer.solar_models import Task from solar import errors @@ -212,8 +211,6 @@ def wait_finish(uid, timeout): start_time = time.time() while start_time + timeout >= time.time(): - # need to clear cache before fetching updated status - clear_cache() dg = get_graph(uid) summary = Counter() summary.update({s.name: 0 for s in states}) @@ -221,6 +218,12 @@ def wait_finish(uid, timeout): yield summary if summary[states.PENDING.name] + summary[states.INPROGRESS.name] == 0: return + else: + # on db backends with snapshot isolation level and higher + # updates wont be visible after start of transaction, + # in order to report state correctly we will "refresh" transaction + ModelMeta.session_end() + ModelMeta.session_start() else: raise errors.ExecutionTimeout( diff --git a/solar/test/test_graph_api.py b/solar/test/test_graph_api.py index c2af4931..3b527836 100644 --- a/solar/test/test_graph_api.py +++ b/solar/test/test_graph_api.py @@ -16,6 +16,7 @@ from mock import Mock import networkx as nx from pytest import fixture +from solar.dblayer.model import clear_cache from solar.orchestration import graph from solar.orchestration.traversal import states @@ -86,7 +87,7 @@ def test_several_updates(simple_plan): if t.name == 'echo_stuff') echo_task.status = states.ERROR.name echo_task.save() - + clear_cache() assert next(graph.wait_finish(simple_plan.graph['uid'], 10)) == { 'SKIPPED': 0, 'SUCCESS': 0,