Merge pull request #126 from dshulyak/graph_filtering

Graph filtering
This commit is contained in:
Łukasz Oleś 2015-09-09 13:38:30 +02:00
commit c61d0198e2
11 changed files with 452 additions and 40 deletions

View File

@ -1,12 +1,11 @@
#!/usr/bin/python
import subprocess
import click
import networkx as nx
from solar.orchestration import graph
from solar.orchestration import tasks
from solar.orchestration import filters
from solar.orchestration import utils
from solar.cli.uids_history import SOLARUID
@ -26,7 +25,7 @@ def orchestration():
@orchestration.command()
@click.argument('plan', type=click.File('rb'))
def create(plan):
    click.echo(graph.create_plan(plan.read()))
    click.echo(graph.create_plan(plan.read()).graph['uid'])
@orchestration.command()
@ -53,16 +52,29 @@ def report(uid):
            msg += ' :: {}'.format(item[2])
        click.echo(click.style(msg, fg=colors[item[1]]))
@orchestration.command()
@click.argument('uid', type=SOLARUID)
@click.option('--start', '-s', multiple=True)
@click.option('--end', '-e', multiple=True)
def filter(uid, start, end):
    graph.reset_filtered(uid)
    plan = graph.get_graph(uid)
    errors = filters.filter(plan, start=start, end=end)
    if errors:
        raise click.ClickException('\n'.join(errors))
    graph.save_graph(uid, plan)
    utils.write_graph(plan)
    click.echo('Created {name}.png'.format(name=plan.graph['name']))
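The new `filter` command resets any previous SKIPPED/NOOP marks, applies the start/end filter, persists the graph, and renders a PNG. A minimal in-process sketch of invoking it with click's test runner; the import path, uid, and task names are hypothetical, and a reachable redis backend is assumed:

```python
from click.testing import CliRunner

# Hypothetical import path for the CLI group defined in this file.
from solar.cli.orch import orchestration

runner = CliRunner()
result = runner.invoke(orchestration, [
    'filter', 'system_log:565581a1-80a0-425d-bb5c-d1cc4f48ffda',
    '--start', 'node1.run', '--end', 'riak_service1.commit'])
print(result.output)  # 'Created <plan-name>.png' on success
```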
@orchestration.command(name='run-once')
@click.argument('uid', type=SOLARUID)
@click.option('--start', default=None)
@click.option('--end', default=None)
def run_once(uid, start, end):
def run_once(uid):
    tasks.schedule_start.apply_async(
        args=[uid],
        kwargs={'start': start, 'end': end},
        queue='scheduler')
@orchestration.command()
@click.argument('uid', type=SOLARUID)
def restart(uid):
@ -81,7 +93,7 @@ def reset(uid):
def stop(uid):
    # TODO(dshulyak) how to do "hard" stop?
    # using revoke(terminate=True) will lead to inability to restart execution
    # research possibility of customizations of
    # research possibility of customizations
    # app.control and Panel.register in celery
    tasks.soft_stop.apply_async(args=[uid], queue='scheduler')
@ -102,23 +114,15 @@ def retry(uid):
@orchestration.command()
@click.argument('uid', type=SOLARUID)
def dg(uid):
@click.option('--start', '-s', multiple=True)
@click.option('--end', '-e', multiple=True)
def dg(uid, start, end):
    plan = graph.get_graph(uid)
    colors = {
        'PENDING': 'cyan',
        'ERROR': 'red',
        'SUCCESS': 'green',
        'INPROGRESS': 'yellow',
        'SKIPPED': 'blue'}
    for n in plan:
        color = colors[plan.node[n]['status']]
        plan.node[n]['color'] = color
    nx.write_dot(plan, '{name}.dot'.format(name=plan.graph['name']))
    subprocess.call(
        'tred {name}.dot | dot -Tpng -o {name}.png'.format(name=plan.graph['name']),
        shell=True)
    if start or end:
        errors = filters.filter(plan, start=start, end=end)
        if errors:
            raise click.ClickException('\n'.join(errors))
    utils.write_graph(plan)
    click.echo('Created {name}.png'.format(name=plan.graph['name']))

View File

@ -50,7 +50,7 @@ def staged_item(log_action):
@changes.command()
def process():
    uid = change.send_to_orchestration()
    uid = change.send_to_orchestration().graph['uid']
    remember_uid(uid)
    click.echo(uid)

View File

@ -0,0 +1,3 @@

View File

@ -0,0 +1,91 @@
import networkx as nx
from .traversal import VISITED, states
from solar import errors
def get_dfs_postorder_subgraph(dg, nodes):
    result = set()
    for node in nodes:
        result.update(nx.dfs_postorder_nodes(dg, source=node))
    return dg.subgraph(result)
def end_at(dg, nodes):
"""Returns subgraph that will guarantee that predecessors are visited
dg - directed graph
nodes - iterable with node names
"""
return set(get_dfs_postorder_subgraph(dg.reverse(), nodes).nodes())
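For intuition: `end_at` reverses the graph and takes a DFS postorder from each requested end node, which collects the end nodes plus all of their predecessors. A self-contained sketch with toy node names (networkx only):

```python
import networkx as nx

dg = nx.DiGraph()
dg.add_edges_from([('a', 'b'), ('b', 'c'), ('x', 'c')])

# DFS postorder on the reversed graph starting at 'c' reaches 'c' and
# every ancestor of 'c' -- exactly the tasks needed for 'c' to run.
ancestors = set(nx.dfs_postorder_nodes(dg.reverse(), source='c'))
assert ancestors == {'a', 'b', 'x', 'c'}
```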
def start_from(dg, start_nodes):
"""Guarantee that all paths starting from specific *nodes* will be visited
"""
visited = {n for n in dg if dg.node[n].get('status') in VISITED}
# sorting nodes in topological order will guarantee that all predecessors
# of current node were already walked, when current going to be considered
for node in nx.topological_sort(dg):
preds = dg.predecessors(node)
if not preds and node in start_nodes:
visited.add(node)
if preds:
for pred in preds:
if pred not in visited:
break
else:
visited.add(node)
return visited
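The for/else on the inner loop fires only when no predecessor broke out of it, i.e. when every predecessor is already visited. A standalone sketch of the same walk (the loop mirrors the function above, assuming no node is in a VISITED state yet):

```python
import networkx as nx

dg = nx.DiGraph([('n1', 'n2'), ('n3', 'n4'), ('n2', 'n4')])
start_nodes = {'n1'}
visited = set()
for node in nx.topological_sort(dg):
    preds = list(dg.predecessors(node))
    if not preds and node in start_nodes:
        visited.add(node)
    elif preds and all(p in visited for p in preds):
        visited.add(node)
print(visited)  # {'n1', 'n2'}: n4 stays blocked because n3 was not started
```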
def validate(dg, start_nodes, end_nodes, err_msgs):
    error_msgs = err_msgs[:]
    not_in_the_graph_msg = 'Node {} is not present in graph {}'
    for n in start_nodes:
        if n not in dg:
            error_msgs.append(not_in_the_graph_msg.format(n, dg.graph['uid']))
    for n in end_nodes:
        if n not in dg:
            if start_nodes:
                error_msgs.append('No path from {} to {}'.format(start_nodes, n))
            else:
                error_msgs.append(not_in_the_graph_msg.format(n, dg.graph['uid']))
    return error_msgs
def filter(dg, start=None, end=None, tasks=(), skip_with=states.SKIPPED.name):
"""
TODO(dshulyak) skip_with should also support NOOP, which will instead
of blocking task, and its successors, should mark task as visited
:param skip_with: SKIPPED or NOOP
"""
error_msgs = []
subpath = dg.nodes()
if tasks:
subpath = tasks
else:
subgraph = dg
if start:
error_msgs = validate(subgraph, start, [], error_msgs)
if error_msgs:
return error_msgs
subpath = start_from(subgraph, start)
subgraph = dg.subgraph(subpath)
if end:
error_msgs = validate(subgraph, start, end, error_msgs)
if error_msgs:
return error_msgs
subpath = end_at(subgraph, end)
for node in dg:
if node not in subpath:
dg.node[node]['status'] = skip_with
return None
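End to end, everything outside the computed subpath is marked with `skip_with` and the traversal will ignore it. A hedged sketch of the expected behaviour on a two-branch toy plan (assumes solar is importable; statuses are the plain strings matching `states.*.name`):

```python
import networkx as nx
from solar.orchestration import filters

dg = nx.DiGraph()
dg.graph['uid'] = 'demo:0'  # validate() interpolates this into error messages
dg.add_edges_from([('a', 'b'), ('c', 'd')])
for n in dg:
    dg.node[n]['status'] = 'PENDING'

assert filters.filter(dg, start=['a']) is None  # no validation errors
# Only the started branch stays runnable; the other branch was skipped.
assert {n for n in dg if dg.node[n]['status'] == 'PENDING'} == {'a', 'b'}
assert {n for n in dg if dg.node[n]['status'] == 'SKIPPED'} == {'c', 'd'}
```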

View File

@ -5,9 +5,9 @@ import uuid
import networkx as nx
import redis
import yaml
from solar import utils
from .traversal import states
r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1)
@ -36,12 +36,17 @@ get_plan = get_graph
def parse_plan(plan_data):
""" parses yaml definition and returns graph
"""
plan = yaml.load(plan_data)
plan = utils.yaml_load(plan_data)
dg = nx.MultiDiGraph()
dg.graph['name'] = plan['name']
for task in plan['tasks']:
defaults = {
'status': 'PENDING',
'errmsg': None,
}
defaults.update(task['parameters'])
dg.add_node(
task['uid'], status='PENDING', errmsg=None, **task['parameters'])
task['uid'], **defaults)
for v in task.get('before', ()):
dg.add_edge(task['uid'], v)
for u in task.get('after', ()):
@ -49,10 +54,11 @@ def parse_plan(plan_data):
    return dg
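With the defaults dict, every node starts as PENDING with errmsg None, and anything under the task's parameters overrides those bookkeeping fields. A standalone sketch of the same construction on an inline plan dict; the body of the `after` loop is cut off by the hunk, so the mirrored add_edge below is an assumption consistent with the fixtures:

```python
import networkx as nx

plan = {'name': 'demo',
        'tasks': [{'uid': 'a', 'parameters': {'type': 'echo', 'args': ['a']}},
                  {'uid': 'b', 'parameters': {'type': 'echo', 'args': ['b']},
                   'after': ['a']}]}
dg = nx.MultiDiGraph()
dg.graph['name'] = plan['name']
for task in plan['tasks']:
    defaults = {'status': 'PENDING', 'errmsg': None}
    defaults.update(task['parameters'])  # parameters win over the defaults
    dg.add_node(task['uid'], **defaults)
    for v in task.get('before', ()):
        dg.add_edge(task['uid'], v)
    for u in task.get('after', ()):
        dg.add_edge(u, task['uid'])  # assumed direction: dependency -> task

assert dg.node['a']['status'] == 'PENDING'
assert dg.has_edge('a', 'b')  # 'after: [a]' became the edge a -> b
```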
def create_plan_from_graph(dg):
def create_plan_from_graph(dg, save=True):
dg.graph['uid'] = "{0}:{1}".format(dg.graph['name'], str(uuid.uuid4()))
save_graph(dg.graph['uid'], dg)
return dg.graph['uid']
if save:
save_graph(dg.graph['uid'], dg)
return dg
def show(uid):
@ -73,11 +79,11 @@ def show(uid):
    return utils.yaml_dump(result)
def create_plan(plan_data):
def create_plan(plan_data, save=True):
"""
"""
dg = parse_plan(plan_data)
return create_plan_from_graph(dg)
return create_plan_from_graph(dg, save=save)
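`create_plan` and `create_plan_from_graph` now return the graph itself instead of its uid, which is why the CLI hunks above switch to `.graph['uid']`. A hedged call-site sketch: `save=False` skips the redis round-trip, as the new `riak_plan` test fixture does. Note the fixture passes a file path while the CLI passes raw yaml text, so `utils.yaml_load` presumably accepts both; this sketch assumes raw text works:

```python
from solar.orchestration import graph

plan_data = """
name: demo
tasks:
- uid: a
  parameters:
    type: echo
    args: [a]
"""
plan = graph.create_plan(plan_data, save=False)
uid = plan.graph['uid']  # '<name>:<uuid4>', formerly the return value itself
```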
def update_plan(uid, plan_data):
@ -94,14 +100,18 @@ def update_plan(uid, plan_data):
return uid
def reset(uid, states=None):
def reset(uid, state_list=None):
    dg = get_graph(uid)
    for n in dg:
        if states is None or dg.node[n]['status'] in states:
            dg.node[n]['status'] = 'PENDING'
        if state_list is None or dg.node[n]['status'] in state_list:
            dg.node[n]['status'] = states.PENDING.name
    save_graph(uid, dg)
def reset_filtered(uid):
    reset(uid, state_list=[states.SKIPPED.name, states.NOOP.name])
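`reset_filtered` is what the `filter` CLI command calls first: it returns only SKIPPED/NOOP nodes to PENDING, so marks from a previous filter run don't accumulate while finished work keeps its status. A standalone sketch of the loop's effect (redis persistence omitted; statuses are the plain strings the states enum yields):

```python
import networkx as nx

dg = nx.MultiDiGraph()
dg.add_node('a', status='SUCCESS')
dg.add_node('b', status='SKIPPED')
dg.add_node('c', status='NOOP')

state_list = ['SKIPPED', 'NOOP']  # states.SKIPPED.name, states.NOOP.name
for n in dg:
    if dg.node[n]['status'] in state_list:
        dg.node[n]['status'] = 'PENDING'

assert dg.node['a']['status'] == 'SUCCESS'  # completed work is untouched
assert dg.node['b']['status'] == 'PENDING'
assert dg.node['c']['status'] == 'PENDING'
```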
def report_topo(uid):
    dg = get_graph(uid)

View File

@ -8,6 +8,7 @@ from celery.app import task
import redis
from solar.orchestration import graph
from solar.orchestration import filters
from solar.core import actions
from solar.core import resource
from solar.system_log.tasks import commit_logitem, error_logitem
@ -116,7 +117,7 @@ def schedule(plan_uid, dg):
@app.task(name='schedule_start')
def schedule_start(plan_uid, start=None, end=None):
def schedule_start(plan_uid):
"""On receive finished task should update storage with task result:
- find successors that should be executed

View File

@ -14,9 +14,12 @@ SKIPPED - not visited, and should be skipped from execution
NOOP - task won't be executed, but should be treated as visited
"""
from enum import Enum
VISITED = ('SUCCESS', 'ERROR', 'NOOP')
BLOCKED = ('INPROGRESS', 'SKIPPED')
states = Enum('States', 'SUCCESS ERROR NOOP INPROGRESS SKIPPED PENDING')
VISITED = (states.SUCCESS.name, states.ERROR.name, states.NOOP.name)
BLOCKED = (states.INPROGRESS.name, states.SKIPPED.name)
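The functional Enum keeps the on-disk representation unchanged: `.name` yields exactly the strings that were previously hard-coded, so statuses already stored in redis remain valid. A quick standalone check:

```python
from enum import Enum

states = Enum('States', 'SUCCESS ERROR NOOP INPROGRESS SKIPPED PENDING')

assert states.SKIPPED.name == 'SKIPPED'      # same string as the old constant
assert states['PENDING'] is states.PENDING   # stored strings look up cleanly
assert 'ERROR' in (s.name for s in states)   # iteration covers all six states
```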
def traverse(dg):

View File

@ -0,0 +1,24 @@
import subprocess
import networkx as nx
def write_graph(plan):
"""
:param plan: networkx Graph object
"""
colors = {
'PENDING': 'cyan',
'ERROR': 'red',
'SUCCESS': 'green',
'INPROGRESS': 'yellow',
'SKIPPED': 'blue'}
for n in plan:
color = colors[plan.node[n]['status']]
plan.node[n]['color'] = color
nx.write_dot(plan, '{name}.dot'.format(name=plan.graph['name']))
subprocess.call(
'tred {name}.dot | dot -Tpng -o {name}.png'.format(name=plan.graph['name']),
shell=True)
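`write_graph` factors the dot/png rendering out of the CLI. The `tred` pipeline computes graphviz's transitive reduction, so redundant edges (a direct a→c when a→b→c exists) drop out of the picture. A hedged usage sketch; it requires solar plus the graphviz `tred` and `dot` binaries on PATH, and a networkx version that still exposes `nx.write_dot`:

```python
import networkx as nx
from solar.orchestration import utils

plan = nx.DiGraph([('a', 'b'), ('b', 'c'), ('a', 'c')])
plan.graph['name'] = 'demo'
for n in plan:
    plan.node[n]['status'] = 'PENDING'

utils.write_graph(plan)  # writes demo.dot, then renders demo.png via tred | dot
```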

View File

@ -0,0 +1,154 @@
name: system_log
tasks:
- after: []
  before:
  - riak_service3.run
  - hosts_file3.run
  parameters:
    args:
    - node3
    - run
    errmsg: null
    status: PENDING
    type: solar_resource
  uid: node3.run
- after:
  - node3.run
  before:
  - riak_service3.run
  parameters:
    args:
    - hosts_file3
    - run
    errmsg: null
    status: PENDING
    type: solar_resource
  uid: hosts_file3.run
- after: []
  before:
  - riak_service2.run
  - hosts_file2.run
  parameters:
    args:
    - node2
    - run
    errmsg: null
    status: PENDING
    type: solar_resource
  uid: node2.run
- after: []
  before:
  - hosts_file1.run
  - riak_service1.run
  parameters:
    args:
    - node1
    - run
    errmsg: null
    status: PENDING
    type: solar_resource
  uid: node1.run
- after:
  - node2.run
  before:
  - riak_service2.run
  parameters:
    args:
    - hosts_file2
    - run
    errmsg: null
    status: PENDING
    type: solar_resource
  uid: hosts_file2.run
- after:
  - node1.run
  before:
  - riak_service1.run
  parameters:
    args:
    - hosts_file1
    - run
    errmsg: null
    status: PENDING
    type: solar_resource
  uid: hosts_file1.run
- after:
  - hosts_file1.run
  - node1.run
  before:
  - riak_service3.run
  - riak_service2.run
  parameters:
    args:
    - riak_service1
    - run
    errmsg: null
    status: PENDING
    type: solar_resource
  uid: riak_service1.run
- after:
  - node3.run
  - riak_service1.run
  - hosts_file3.run
  before:
  - riak_service3.join
  parameters:
    args:
    - riak_service3
    - run
    errmsg: null
    status: PENDING
    type: solar_resource
  uid: riak_service3.run
- after:
  - riak_service3.run
  before:
  - riak_service1.commit
  parameters:
    args:
    - riak_service3
    - join
    errmsg: null
    status: PENDING
    type: solar_resource
  uid: riak_service3.join
- after:
  - node2.run
  - riak_service1.run
  - hosts_file2.run
  before:
  - riak_service2.join
  parameters:
    args:
    - riak_service2
    - run
    errmsg: null
    status: PENDING
    type: solar_resource
  uid: riak_service2.run
- after:
  - riak_service2.run
  before:
  - riak_service1.commit
  parameters:
    args:
    - riak_service2
    - join
    errmsg: null
    status: PENDING
    type: solar_resource
  uid: riak_service2.join
- after:
  - riak_service2.join
  - riak_service3.join
  before: []
  parameters:
    args:
    - riak_service1
    - commit
    errmsg: null
    status: PENDING
    type: solar_resource
  uid: riak_service1.commit
uid: system_log:565581a1-80a0-425d-bb5c-d1cc4f48ffda

View File

@ -0,0 +1,28 @@
name: two_path
tasks:
- uid: a
  parameters:
    type: echo
    args: [a]
- uid: b
  parameters:
    type: echo
    args: [b]
  after: [a]
- uid: c
  parameters:
    type: echo
    args: [c]
- uid: d
  parameters:
    type: echo
    args: [d]
  after: [c]
- uid: e
  parameters:
    type: echo
    args: [e]
  after: [b,d]
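`two_path` is the smallest fixture with two independent branches joining at a single task: a→b→e and c→d→e. That shape is what makes start/end filtering interesting; a networkx-only sketch of the expectations it encodes:

```python
import networkx as nx

# The fixture's dependency shape, rebuilt inline.
dg = nx.DiGraph([('a', 'b'), ('b', 'e'), ('c', 'd'), ('d', 'e')])

# Ending at e pulls in both branches...
assert nx.ancestors(dg, 'e') == {'a', 'b', 'c', 'd'}
# ...while ending at b needs only its own branch; the c/d side gets SKIPPED.
assert nx.ancestors(dg, 'b') == {'a'}
```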

View File

@ -0,0 +1,94 @@
import os
from pytest import fixture
from pytest import mark
import networkx as nx
from solar.orchestration import graph
from solar.orchestration import filters
from solar.orchestration.traversal import states
from solar.utils import yaml_load
@fixture
def dg_ex1():
    dg = nx.DiGraph()
    dg.add_nodes_from(['n1', 'n2', 'n3', 'n4', 'n5'])
    dg.add_path(['n1', 'n5'])
    dg.add_path(['n3', 'n5'])
    dg.add_path(['n1', 'n2', 'n4'])
    return dg
@mark.parametrize("end_nodes,visited", [
    (['n5'], {'n1', 'n3', 'n5'}),
    (['n4'], {'n1', 'n2', 'n4'}),
    (['n4', 'n5'], {'n1', 'n2', 'n3', 'n4', 'n5'}),
])
def test_end_at(dg_ex1, end_nodes, visited):
    assert set(filters.end_at(dg_ex1, end_nodes)) == visited
@mark.parametrize("start_nodes,visited", [
    (['n3'], {'n3'}),
    (['n1'], {'n1', 'n2', 'n4'}),
    (['n1', 'n3'], {'n1', 'n2', 'n3', 'n4', 'n5'})
])
def test_start_from(dg_ex1, start_nodes, visited):
    assert set(filters.start_from(dg_ex1, start_nodes)) == visited
@fixture
def dg_ex2():
    dg = nx.DiGraph()
    dg.add_nodes_from(['n1', 'n2', 'n3', 'n4', 'n5'])
    dg.add_edges_from([('n1', 'n3'), ('n2', 'n3'), ('n3', 'n4'), ('n3', 'n5')])
    return dg
@fixture
def riak_plan():
    riak_path = os.path.join(
        os.path.dirname(os.path.realpath(__file__)),
        'orch_fixtures',
        'riak.yml')
    return graph.create_plan(riak_path, save=False)
def test_riak_start_node1(riak_plan):
    assert filters.start_from(riak_plan, ['node1.run']) == {
        'node1.run', 'hosts_file1.run', 'riak_service1.run'}
def test_riak_end_hosts_file1(riak_plan):
    assert filters.end_at(riak_plan, ['hosts_file1.run']) == {
        'node1.run', 'hosts_file1.run'}
def test_start_at_two_nodes(riak_plan):
    assert filters.start_from(riak_plan, ['node1.run', 'node2.run']) == \
        {'hosts_file1.run', 'riak_service2.run', 'riak_service2.join',
         'hosts_file2.run', 'node2.run', 'riak_service1.run', 'node1.run'}
def test_initial_from_node1_traverse(riak_plan):
    filters.filter(riak_plan, start=['node1.run'])
    pending = {n for n in riak_plan
               if riak_plan.node[n]['status'] == states.PENDING.name}
    assert pending == {'hosts_file1.run', 'riak_service1.run', 'node1.run'}
def test_second_from_node2_with_node1_walked(riak_plan):
    success = {'hosts_file1.run', 'riak_service1.run', 'node1.run'}
    for n in success:
        riak_plan.node[n]['status'] = states.SUCCESS.name
    filters.filter(riak_plan, start=['node2.run'])
    pending = {n for n in riak_plan
               if riak_plan.node[n]['status'] == states.PENDING.name}
    assert pending == {'hosts_file2.run', 'riak_service2.run',
                       'node2.run', 'riak_service2.join'}
def test_end_joins(riak_plan):
    filters.filter(
        riak_plan,
        start=['node1.run', 'node2.run', 'node3.run'],
        end=['riak_service2.join', 'riak_service3.join'])
    skipped = {n for n in riak_plan
               if riak_plan.node[n]['status'] == states.SKIPPED.name}
    assert skipped == {'riak_service1.commit'}