From 9e76ddd22009608c8199dd5b603b37f7e08643d6 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Thu, 16 Jul 2015 17:41:55 +0300 Subject: [PATCH] Stop execution by marking all tasks SKIPPED --- solar/solar/cli/orch.py | 19 ++++++++++++++----- solar/solar/orchestration/graph.py | 8 -------- solar/solar/orchestration/tasks.py | 14 +++++++++++--- 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/solar/solar/cli/orch.py b/solar/solar/cli/orch.py index 74c46ef6..16898a36 100644 --- a/solar/solar/cli/orch.py +++ b/solar/solar/cli/orch.py @@ -36,10 +36,11 @@ def update(uid, plan): @click.argument('uid') def report(uid): colors = { - 'PENDING': 'blue', + 'PENDING': 'cyan', 'ERROR': 'red', 'SUCCESS': 'green', - 'INPROGRESS': 'yellow'} + 'INPROGRESS': 'yellow', + 'SKIPPED': 'blue'} report = graph.report_topo(uid) for item in report: @@ -78,7 +79,14 @@ def stop(uid): # using revoke(terminate=True) will lead to inability to restart execution # research possibility of customizations of # app.control and Panel.register in celery - graph.soft_stop(uid) + tasks.soft_stop.apply_async(args=[uid], queue='scheduler') + + +@orchestration.command() +@click.argument('uid') +def resume(uid): + graph.reset(uid, ['SKIPPED']) + tasks.schedule_start.apply_async(args=[uid], queue='scheduler') @orchestration.command() @@ -94,10 +102,11 @@ def dg(uid): plan = graph.get_graph(uid) colors = { - 'PENDING': 'blue', + 'PENDING': 'cyan', 'ERROR': 'red', 'SUCCESS': 'green', - 'INPROGRESS': 'yellow'} + 'INPROGRESS': 'yellow', + 'SKIPPED': 'blue'} for n in plan: color = colors[plan.node[n]['status']] diff --git a/solar/solar/orchestration/graph.py b/solar/solar/orchestration/graph.py index d7342e23..d71b5404 100644 --- a/solar/solar/orchestration/graph.py +++ b/solar/solar/orchestration/graph.py @@ -78,14 +78,6 @@ def reset(uid, states=None): save_graph(uid, dg) -def soft_stop(uid): - """Graph will stop when all currently inprogress tasks will be finished - """ - dg = get_graph(uid) - dg.graph['stop'] = True - save_graph(uid, dg) - - def report_topo(uid): dg = get_graph(uid) diff --git a/solar/solar/orchestration/tasks.py b/solar/solar/orchestration/tasks.py index 49794ac9..3478b92a 100644 --- a/solar/solar/orchestration/tasks.py +++ b/solar/solar/orchestration/tasks.py @@ -118,10 +118,18 @@ def schedule_start(plan_uid, start=None, end=None): - apply different policies to tasks """ dg = graph.get_graph(plan_uid) - dg.graph['stop'] = False schedule(plan_uid, dg) +@app.task +def soft_stop(plan_uid): + dg = graph.get_graph(plan_uid) + for n in dg: + if dg.node[n]['status'] == 'PENDING': + dg.node[n]['status'] = 'SKIPPED' + graph.save_graph(plan_uid, dg) + + @app.task def schedule_next(task_id, status, errmsg=None): plan_uid, task_name = task_id.rsplit(':', 1) @@ -149,7 +157,7 @@ def traverse(dg): visited = set() for node in dg: data = dg.node[node] - if data['status'] not in ('PENDING', 'INPROGRESS'): + if data['status'] not in ('PENDING', 'INPROGRESS', 'SKIPPED'): visited.add(node) for node in dg: @@ -157,7 +165,7 @@ def traverse(dg): if node in visited: continue - elif data['status'] == 'INPROGRESS': + elif data['status'] in ('INPROGRESS', 'SKIPPED'): continue predecessors = set(dg.predecessors(node))