Stop execution by marking all tasks SKIPPED

This commit is contained in:
Dmitry Shulyak 2015-07-16 17:41:55 +03:00
parent d331a99126
commit 9e76ddd220
3 changed files with 25 additions and 16 deletions

View File

@ -36,10 +36,11 @@ def update(uid, plan):
@click.argument('uid') @click.argument('uid')
def report(uid): def report(uid):
colors = { colors = {
'PENDING': 'blue', 'PENDING': 'cyan',
'ERROR': 'red', 'ERROR': 'red',
'SUCCESS': 'green', 'SUCCESS': 'green',
'INPROGRESS': 'yellow'} 'INPROGRESS': 'yellow',
'SKIPPED': 'blue'}
report = graph.report_topo(uid) report = graph.report_topo(uid)
for item in report: for item in report:
@ -78,7 +79,14 @@ def stop(uid):
# using revoke(terminate=True) will lead to inability to restart execution # using revoke(terminate=True) will lead to inability to restart execution
# research possibility of customizations of # research possibility of customizations of
# app.control and Panel.register in celery # app.control and Panel.register in celery
graph.soft_stop(uid) tasks.soft_stop.apply_async(args=[uid], queue='scheduler')
@orchestration.command()
@click.argument('uid')
def resume(uid):
graph.reset(uid, ['SKIPPED'])
tasks.schedule_start.apply_async(args=[uid], queue='scheduler')
@orchestration.command() @orchestration.command()
@ -94,10 +102,11 @@ def dg(uid):
plan = graph.get_graph(uid) plan = graph.get_graph(uid)
colors = { colors = {
'PENDING': 'blue', 'PENDING': 'cyan',
'ERROR': 'red', 'ERROR': 'red',
'SUCCESS': 'green', 'SUCCESS': 'green',
'INPROGRESS': 'yellow'} 'INPROGRESS': 'yellow',
'SKIPPED': 'blue'}
for n in plan: for n in plan:
color = colors[plan.node[n]['status']] color = colors[plan.node[n]['status']]

View File

@ -78,14 +78,6 @@ def reset(uid, states=None):
save_graph(uid, dg) save_graph(uid, dg)
def soft_stop(uid):
"""Graph will stop when all currently inprogress tasks will be finished
"""
dg = get_graph(uid)
dg.graph['stop'] = True
save_graph(uid, dg)
def report_topo(uid): def report_topo(uid):
dg = get_graph(uid) dg = get_graph(uid)

View File

@ -118,10 +118,18 @@ def schedule_start(plan_uid, start=None, end=None):
- apply different policies to tasks - apply different policies to tasks
""" """
dg = graph.get_graph(plan_uid) dg = graph.get_graph(plan_uid)
dg.graph['stop'] = False
schedule(plan_uid, dg) schedule(plan_uid, dg)
@app.task
def soft_stop(plan_uid):
dg = graph.get_graph(plan_uid)
for n in dg:
if dg.node[n]['status'] == 'PENDING':
dg.node[n]['status'] = 'SKIPPED'
graph.save_graph(plan_uid, dg)
@app.task @app.task
def schedule_next(task_id, status, errmsg=None): def schedule_next(task_id, status, errmsg=None):
plan_uid, task_name = task_id.rsplit(':', 1) plan_uid, task_name = task_id.rsplit(':', 1)
@ -149,7 +157,7 @@ def traverse(dg):
visited = set() visited = set()
for node in dg: for node in dg:
data = dg.node[node] data = dg.node[node]
if data['status'] not in ('PENDING', 'INPROGRESS'): if data['status'] not in ('PENDING', 'INPROGRESS', 'SKIPPED'):
visited.add(node) visited.add(node)
for node in dg: for node in dg:
@ -157,7 +165,7 @@ def traverse(dg):
if node in visited: if node in visited:
continue continue
elif data['status'] == 'INPROGRESS': elif data['status'] in ('INPROGRESS', 'SKIPPED'):
continue continue
predecessors = set(dg.predecessors(node)) predecessors = set(dg.predecessors(node))