From 1b3af69d1d5cc7b3b97c9e9ebbed9cf07fb5b3c2 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Wed, 24 Jun 2015 15:14:02 +0300 Subject: [PATCH 01/12] Graph orchestration with celery --- orch/__init__.py | 0 orch/examples.py | 65 ++++++++++++++++++++++++ orch/graph.py | 22 ++++++++ orch/tasks.py | 129 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 216 insertions(+) create mode 100644 orch/__init__.py create mode 100644 orch/examples.py create mode 100644 orch/graph.py create mode 100644 orch/tasks.py diff --git a/orch/__init__.py b/orch/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/orch/examples.py b/orch/examples.py new file mode 100644 index 00000000..954009b2 --- /dev/null +++ b/orch/examples.py @@ -0,0 +1,65 @@ + + +import networkx as nx + +from orch.tasks import * +from orch.graph import * + + +def ex1(): + dg = nx.DiGraph() + + dg.add_node('rabbitmq_cluster1.create', type='cmd', args=['echo "installing cluster"'], status='PENDING') + dg.add_node('rabbitmq_cluster2.join', type='cmd', args=['echo "joining"'], status='PENDING') + dg.add_node('rabbitmq_cluster3.join', type='cmd', args=['echo "joining"'], status='PENDING') + dg.add_node('rabbitmq_cluster.ready', type='anchor', args=[], status='PENDING') + + dg.add_edge('rabbitmq_cluster1.create', 'rabbitmq_cluster2.join') + dg.add_edge('rabbitmq_cluster1.create', 'rabbitmq_cluster3.join') + dg.add_edge('rabbitmq_cluster1.create', 'rabbitmq_cluster.ready') + dg.add_edge('rabbitmq_cluster2.join', 'rabbitmq_cluster.ready') + dg.add_edge('rabbitmq_cluster3.join', 'rabbitmq_cluster.ready') + + dg.add_node('compute1', type='cmd', args=['echo "compute1"'], status='PENDING') + dg.add_node('compute2', type='cmd', args=['echo "compute2"'], status='PENDING') + dg.add_node('compute3', type='cmd', args=['echo "compute3"'], status='PENDING') + dg.add_node('compute4', type='error', args=['echo "compute4"'], status='PENDING') + dg.add_node('compute5', type='error', args=['echo "compute5"'], status='PENDING') + dg.add_node('compute_ready', type='fault_tolerance', args=[60], status='PENDING') + + dg.add_edge('rabbitmq_cluster.ready', 'compute1') + dg.add_edge('rabbitmq_cluster.ready', 'compute2') + dg.add_edge('rabbitmq_cluster.ready', 'compute3') + dg.add_edge('rabbitmq_cluster.ready', 'compute4') + dg.add_edge('rabbitmq_cluster.ready', 'compute5') + + dg.add_edge('compute1', 'compute_ready') + dg.add_edge('compute2', 'compute_ready') + dg.add_edge('compute3', 'compute_ready') + dg.add_edge('compute4', 'compute_ready') + dg.add_edge('compute5', 'compute_ready') + + return dg + + +def ex1_exec(): + save_graph('current', ex1()) + schedule_next.apply() + + +def ex2(): + + dg = nx.DiGraph() + + dg.add_node('rabbitmq_cluster2.join', type='cmd', args=['echo "joining"'], status='PENDING') + dg.add_node('rabbitmq_cluster3.join', type='cmd', args=['echo "joining"'], status='PENDING') + dg.add_node('rabbitmq_cluster.ready', type='anchor', args=[], status='PENDING') + + dg.add_edge('rabbitmq_cluster2.join', 'rabbitmq_cluster.ready') + dg.add_edge('rabbitmq_cluster3.join', 'rabbitmq_cluster.ready') + + return dg + +def ex2_exec(): + save_graph('current', ex2()) + schedule_start.apply_async() \ No newline at end of file diff --git a/orch/graph.py b/orch/graph.py new file mode 100644 index 00000000..b3ff6114 --- /dev/null +++ b/orch/graph.py @@ -0,0 +1,22 @@ + + +import networkx as nx + +import redis +import json + +r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1) + + +def save_graph(name, graph): + 
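+    # graph.node maps each node name to its attribute dict, so dumping
+    # .items() keeps the node attributes (type, args, status) next to the
+    # node names; edges(data=True) does the same for edge attributes, which
+    # is what lets get_graph() rebuild the DiGraph from redis below.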
r.set('{}:nodes'.format(name), json.dumps(graph.node.items())) + r.set('{}:edges'.format(name), json.dumps(graph.edges(data=True))) + + +def get_graph(name): + dg = nx.DiGraph() + nodes = json.loads(r.get('{}:nodes'.format(name))) + edges = json.loads(r.get('{}:edges'.format(name))) + dg.add_nodes_from(nodes) + dg.add_edges_from(edges) + return dg diff --git a/orch/tasks.py b/orch/tasks.py new file mode 100644 index 00000000..6882793f --- /dev/null +++ b/orch/tasks.py @@ -0,0 +1,129 @@ + +from celery import Celery +from celery.app import task +from celery import group + +from functools import partial + +import subprocess +import time + +from orch import graph + + +app = Celery( + 'tasks', + backend='redis://10.0.0.2:6379/1', + broker='redis://10.0.0.2:6379/1') + + +class ReportTask(task.Task): + + def on_success(self, retval, task_id, args, kwargs): + schedule_next.apply_async(args=[task_id, 'SUCCESS'], queue='master') + + def on_failure(self, exc, task_id, args, kwargs, einfo): + schedule_next.apply_async(args=[task_id, 'ERROR'], queue='master') + + +solar_task = partial(app.task, base=ReportTask) + + +@solar_task +def cmd(cmd): + popen = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + out, err = popen.communicate() + return popen.returncode, out, err + + +@solar_task +def sleep(seconds): + time.sleep(seconds) + + +@solar_task +def error(message): + raise Exception('message') + + +@solar_task(bind=True) +def fault_tolerance(ctxt, percent): + dg = graph.get_graph('current') + success = 0.0 + predecessors = dg.predecessors(ctxt.request.id) + lth = len(predecessors) + + for s in predecessors: + if dg.node[s]['status'] == ['SUCCESS']: + success += 1 + + succes_percent = (success/lth) * 100 + if succes_percent < percent: + raise Exception('Cant proceed with, {0} < {1}'.format( + succes_percent, percent)) + + +@solar_task +def echo(message): + return message + + +@solar_task(bind=True) +def anchor(ctxt, *args): + dg = graph.get_graph('current') + for s in dg.predecessors(ctxt.request.id): + if dg.node[s]['status'] != 'SUCCESS': + raise Exception('One of the tasks erred, cant proceeed') + + +@app.task +def schedule_start(): + """On receive finished task should update storage with task result: + + - find successors that should be executed + - apply different policies to tasks + """ + dg = graph.get_graph('current') + + next_tasks = list(get_next(dg)) + print 'GRAPH {0}\n NEXT TASKS {1}'.format(dg.node, next_tasks) + graph.save_graph('current', dg) + group(next_tasks)() + + +@app.task +def schedule_next(task_id, status): + dg = graph.get_graph('current') + dg.node[task_id]['status'] = status + + next_tasks = list(get_next(dg)) + print 'GRAPH {0}\n NEXT TASKS {1}'.format(dg.node, next_tasks) + graph.save_graph('current', dg) + group(next_tasks)() + + +def get_next(dg): + visited = set() + for node in dg: + data = dg.node[node] + if data['status'] not in ('PENDING', 'INPROGRESS'): + visited.add(node) + + for node in dg: + data = dg.node[node] + + if node in visited: + continue + elif data['status'] == 'INPROGRESS': + continue + + + predecessors = set(dg.predecessors(node)) + + if predecessors <= visited: + + task_name = 'orch.tasks.{0}'.format(data['type']) + task = app.tasks[task_name] + dg.node[node]['status'] = 'INPROGRESS' + yield task.subtask(data['args'], task_id=node) From 4e349cd0c05472d01a1c5e03cbd588219101a22e Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Wed, 24 Jun 2015 15:55:01 +0300 Subject: [PATCH 02/12] Add timelimit and target examples 
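A node's extra attributes are forwarded when the scheduler turns it into a Celery subtask: time_limit and soft_time_limit are passed through as the usual Celery execution limits (the hard limit forcefully terminates the task, the soft limit raises SoftTimeLimitExceeded inside it), and target is used as the name of the queue the subtask is routed to, so a task can be pinned to a specific worker. A minimal sketch of a plan node using both, combining the target and soft_time_limit examples added below (the combination itself is illustrative):

    dg.add_node(
        'vagrant_reload', type='cmd',
        args=['vagrant reload solar-dev1'], status='PENDING',
        target='ipmi',           # routed to the queue named 'ipmi'
        soft_time_limit=10)      # SoftTimeLimitExceeded raised after 10 seconds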
--- orch/examples.py | 42 ++++++++++++++++++++++++++++++++++++++---- orch/graph.py | 1 + orch/tasks.py | 14 ++++++++++++-- 3 files changed, 51 insertions(+), 6 deletions(-) diff --git a/orch/examples.py b/orch/examples.py index 954009b2..ac1f3cd8 100644 --- a/orch/examples.py +++ b/orch/examples.py @@ -42,9 +42,9 @@ def ex1(): return dg -def ex1_exec(): +def test_ex1_exec(): save_graph('current', ex1()) - schedule_next.apply() + schedule_start.apply_async(queue='master') def ex2(): @@ -60,6 +60,40 @@ def ex2(): return dg -def ex2_exec(): +def test_ex2_exec(): save_graph('current', ex2()) - schedule_start.apply_async() \ No newline at end of file + schedule_start.apply_async(queue='master') + + +def test_timelimit_exec(): + + dg = nx.DiGraph() + + dg.add_node( + 'timelimit_test', type='sleep', + args=[100], status='PENDING', + time_limit=10) + + dg.add_node( + 'soft_timelimit_test', type='sleep', + args=[100], status='PENDING', + soft_time_limit=10) + + save_graph('current', dg) + schedule_start.apply_async(queue='master') + + +def test_timeout_exec(): + # TODO(dshulyak) how to handle connectivity issues? + # or hardware failure ? + return + + +def test_target_exec(): + dg = nx.DiGraph() + + dg.add_node( + 'vagrant_reload', type='cmd', + args=['vagrant reload solar-dev1'], status='PENDING', target='ipmi') + save_graph('current', dg) + schedule_start.apply_async(queue='master') diff --git a/orch/graph.py b/orch/graph.py index b3ff6114..829be8ca 100644 --- a/orch/graph.py +++ b/orch/graph.py @@ -9,6 +9,7 @@ r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1) def save_graph(name, graph): + # maybe it is possible to store part of information in AsyncResult backend r.set('{}:nodes'.format(name), json.dumps(graph.node.items())) r.set('{}:edges'.format(name), json.dumps(graph.edges(data=True))) diff --git a/orch/tasks.py b/orch/tasks.py index 6882793f..8f4e747f 100644 --- a/orch/tasks.py +++ b/orch/tasks.py @@ -55,7 +55,7 @@ def fault_tolerance(ctxt, percent): lth = len(predecessors) for s in predecessors: - if dg.node[s]['status'] == ['SUCCESS']: + if dg.node[s]['status'] == 'SUCCESS': success += 1 succes_percent = (success/lth) * 100 @@ -71,6 +71,7 @@ def echo(message): @solar_task(bind=True) def anchor(ctxt, *args): + # it should be configurable to wait for atleast 1 / 3 resources dg = graph.get_graph('current') for s in dg.predecessors(ctxt.request.id): if dg.node[s]['status'] != 'SUCCESS': @@ -126,4 +127,13 @@ def get_next(dg): task_name = 'orch.tasks.{0}'.format(data['type']) task = app.tasks[task_name] dg.node[node]['status'] = 'INPROGRESS' - yield task.subtask(data['args'], task_id=node) + subtask = task.subtask( + data['args'], task_id=node, + time_limit=data.get('time_limit', None), + soft_time_limit=data.get('soft_time_limit', None)) + + if data.get('target', None): + subtask.set(queue=data['target']) + + yield subtask + From 36b17071cec6d326ce8a94155691e06d4b953e62 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Thu, 25 Jun 2015 17:16:06 +0300 Subject: [PATCH 03/12] Add example with timeout support --- orch/graph.py | 2 + orch/tasks.py | 61 +++++++++++++++++++++---- orch/{examples.py => test_examples.py} | 63 +++++++++++++++++++++++++- 3 files changed, 114 insertions(+), 12 deletions(-) rename orch/{examples.py => test_examples.py} (66%) diff --git a/orch/graph.py b/orch/graph.py index 829be8ca..c8f91c7e 100644 --- a/orch/graph.py +++ b/orch/graph.py @@ -12,12 +12,14 @@ def save_graph(name, graph): # maybe it is possible to store part of information in AsyncResult backend 
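+    # besides nodes and edges, graph-level attributes (graph.graph) are now
+    # stored under '{name}:attributes', so plan-wide settings such as
+    # 'concurrency' survive a save/load round-trip.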
r.set('{}:nodes'.format(name), json.dumps(graph.node.items())) r.set('{}:edges'.format(name), json.dumps(graph.edges(data=True))) + r.set('{}:attributes'.format(name), json.dumps(graph.graph)) def get_graph(name): dg = nx.DiGraph() nodes = json.loads(r.get('{}:nodes'.format(name))) edges = json.loads(r.get('{}:edges'.format(name))) + dg.graph = json.loads(r.get('{}:attributes'.format(name))) dg.add_nodes_from(nodes) dg.add_edges_from(edges) return dg diff --git a/orch/tasks.py b/orch/tasks.py index 8f4e747f..267d7c7b 100644 --- a/orch/tasks.py +++ b/orch/tasks.py @@ -2,20 +2,25 @@ from celery import Celery from celery.app import task from celery import group +from celery.exceptions import Ignore -from functools import partial +from functools import partial, wraps +from itertools import islice import subprocess import time from orch import graph +import redis app = Celery( 'tasks', backend='redis://10.0.0.2:6379/1', broker='redis://10.0.0.2:6379/1') +r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1) + class ReportTask(task.Task): @@ -26,28 +31,46 @@ class ReportTask(task.Task): schedule_next.apply_async(args=[task_id, 'ERROR'], queue='master') -solar_task = partial(app.task, base=ReportTask) +solar_task = partial(app.task, base=ReportTask, bind=True) + + +def maybe_ignore(func): + """used to ignore tasks when they are in queue, but should be discarded + """ + + @wraps(func) + def wrapper(ctxt, *args, **kwargs): + if r.sismember('tasks.ignore', ctxt.request.id): + raise Ignore() + return func(ctxt, *args, **kwargs) + return wrapper @solar_task +@maybe_ignore def cmd(cmd): popen = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) out, err = popen.communicate() + rcode = popen.returncode + if rcode: + raise Exception('Command %s failed with err %s', cmd, err) return popen.returncode, out, err @solar_task -def sleep(seconds): +@maybe_ignore +def sleep(ctxt, seconds): time.sleep(seconds) @solar_task -def error(message): +@maybe_ignore +def error(ctxt, message): raise Exception('message') -@solar_task(bind=True) +@solar_task def fault_tolerance(ctxt, percent): dg = graph.get_graph('current') success = 0.0 @@ -65,11 +88,12 @@ def fault_tolerance(ctxt, percent): @solar_task -def echo(message): +@maybe_ignore +def echo(ctxt, message): return message -@solar_task(bind=True) +@solar_task def anchor(ctxt, *args): # it should be configurable to wait for atleast 1 / 3 resources dg = graph.get_graph('current') @@ -78,6 +102,15 @@ def anchor(ctxt, *args): raise Exception('One of the tasks erred, cant proceeed') +@app.task +def fire_timeout(task_id): + result = app.AsyncResult(task_id) + if result.state in ['ERROR', 'SUCCESS']: + return + r.sadd('tasks.ignore', task_id) + schedule_next.apply_async(args=[task_id, 'ERROR'], queue='master') + + @app.task def schedule_start(): """On receive finished task should update storage with task result: @@ -87,7 +120,8 @@ def schedule_start(): """ dg = graph.get_graph('current') - next_tasks = list(get_next(dg)) + concurrency = dg.graph.get('concurrency', None) + next_tasks = list(islice(get_next(dg), 0, concurrency)) print 'GRAPH {0}\n NEXT TASKS {1}'.format(dg.node, next_tasks) graph.save_graph('current', dg) group(next_tasks)() @@ -97,8 +131,8 @@ def schedule_start(): def schedule_next(task_id, status): dg = graph.get_graph('current') dg.node[task_id]['status'] = status - - next_tasks = list(get_next(dg)) + concurrency = dg.graph.get('concurrency', None) + next_tasks = list(islice(get_next(dg), 0, concurrency)) print 'GRAPH {0}\n 
NEXT TASKS {1}'.format(dg.node, next_tasks) graph.save_graph('current', dg) group(next_tasks)() @@ -135,5 +169,12 @@ def get_next(dg): if data.get('target', None): subtask.set(queue=data['target']) + timeout = data.get('timeout') + yield subtask + if timeout: + timeout_task = fire_timeout.subtask([node], countdown=timeout) + timeout_task.set(queue='master') + yield timeout_task + diff --git a/orch/examples.py b/orch/test_examples.py similarity index 66% rename from orch/examples.py rename to orch/test_examples.py index ac1f3cd8..a934b340 100644 --- a/orch/examples.py +++ b/orch/test_examples.py @@ -5,6 +5,15 @@ import networkx as nx from orch.tasks import * from orch.graph import * +from pytest import fixture + +import time + + +@fixture(autouse=True) +def clean_ignored(): + r.delete('tasks.ignore') + def ex1(): dg = nx.DiGraph() @@ -83,10 +92,30 @@ def test_timelimit_exec(): schedule_start.apply_async(queue='master') -def test_timeout_exec(): +def test_timeout(): # TODO(dshulyak) how to handle connectivity issues? # or hardware failure ? - return + dg = nx.DiGraph() + + dg.add_node( + 'test_timeout', type='echo', target='unreachable', + args=['yoyoyo'], status='PENDING', + timeout=1) + + save_graph('current', dg) + # two tasks will be fired - test_timeout and fire_timeout(test_timeout) + # with countdown set to 10 sec + schedule_start.apply_async(queue='master') + # after 10 seconds fire_timeout will set test_timeout to ERROR + time.sleep(1) + + # master host will start listening from unreachable queue, but task will be ignored + # e.g it will be acked, and fetched from broker, but not processed + assert app.control.add_consumer( + 'unreachable', reply=True, destination=['celery@master']) + dg = get_graph('current') + assert dg.node['test_timeout']['status'] == 'ERROR' + def test_target_exec(): @@ -97,3 +126,33 @@ def test_target_exec(): args=['vagrant reload solar-dev1'], status='PENDING', target='ipmi') save_graph('current', dg) schedule_start.apply_async(queue='master') + + +def test_limit_concurrency(): + # - no more than 2 tasks in general + dg = nx.DiGraph() + dg.graph['concurrency'] = 2 + + for i in range(4): + dg.add_node( + str(i), type='echo', + args=[i], status='PENDING') + + save_graph('current', dg) + schedule_start.apply_async(queue='master') + + +def test_ignored(): + + dg = nx.DiGraph() + + dg.add_node( + 'test_ignored', type='echo', args=['hello'], status='PENDING') + r.sadd('tasks.ignore', 'test_ignored') + save_graph('current', dg) + + schedule_start.apply_async(queue='master') + ignored = app.AsyncResult('test_ignored') + ignored.get() + dg = get_graph('current') + assert dg.node['test_ignored']['status'] == {} From 6029c27f07cba404bd1bedea41702ce869d38049 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Fri, 26 Jun 2015 13:00:48 +0300 Subject: [PATCH 04/12] Basic API for orchestration --- cli.py | 64 +++++++++++++++++++++++++++++++++++ orch/examples/multi.yaml | 63 ++++++++++++++++++++++++++++++++++ orch/examples/simple.yaml | 10 ++++++ orch/examples/test_errors.yml | 34 +++++++++++++++++++ orch/graph.py | 52 ++++++++++++++++++++++++++++ orch/tasks.py | 32 ++++++++++-------- 6 files changed, 241 insertions(+), 14 deletions(-) create mode 100755 cli.py create mode 100644 orch/examples/multi.yaml create mode 100644 orch/examples/simple.yaml create mode 100644 orch/examples/test_errors.yml diff --git a/cli.py b/cli.py new file mode 100755 index 00000000..8a866290 --- /dev/null +++ b/cli.py @@ -0,0 +1,64 @@ +#!/usr/bin/python + +import click + +from orch import graph 
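+# thin CLI around the orchestrator: 'create' parses a yaml plan and stores it
+# in redis through orch.graph, 'execute' kicks schedule_start on the 'master'
+# queue, and 'report' prints per-task status for a stored plan uid.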
+from orch import tasks + + +@click.group() +def orchestration(): + pass + + +@click.command() +@click.argument('plan', type=click.File('rb')) +def create(plan): + click.echo(graph.create_plan(plan.read())) + + +@click.command() +@click.argument('uid') +def report(uid): + colors = { + 'PENDING': 'white', + 'ERROR': 'red', + 'SUCCESS': 'green', + 'INPROGRESS': 'yellow'} + + report = graph.report_topo(uid) + for item in report: + click.echo( + click.style('{} -> {}'.format(item[0], item[1]), fg=colors[item[1]])) + + +@click.command() +@click.argument('uid') +def execute(uid): + tasks.schedule_start.apply_async(args=[uid], queue='master') + + +@click.command() +@click.argument('uid') +@click.option('--reset', default=False, is_flag=True) +def restart(uid, reset): + if reset: + graph.reset(uid) + tasks.schedule_start.apply_async(args=[uid], queue='master') + + +@click.command() +@click.argument('uid') +def reset(uid): + graph.reset(uid) + + +orchestration.add_command(create) +orchestration.add_command(report) +orchestration.add_command(execute) +orchestration.add_command(restart) +orchestration.add_command(reset) + + +if __name__ == '__main__': + orchestration() diff --git a/orch/examples/multi.yaml b/orch/examples/multi.yaml new file mode 100644 index 00000000..9c8ce99e --- /dev/null +++ b/orch/examples/multi.yaml @@ -0,0 +1,63 @@ + +name: multi +tasks: + - uid: rabbitmq_cluster1.create + parameters: + type: cmd + args: ['echo rabbitmq_cluster1.create'] + before: [amqp_cluster_configured] + + - uid: rabbitmq_cluster2.join + parameters: + type: cmd + args: ['echo rabbitmq_cluster2.join'] + after: [rabbitmq_cluster1.create] + before: [amqp_cluster_configured] + - uid: rabbitmq_cluster3.join + parameters: + type: cmd + args: ['echo rabbitmq_cluster3.join'] + after: [rabbitmq_cluster1.create] + before: [amqp_cluster_configured] + + - uid: amqp_cluster_configured + parameters: + type: fault_tolerance + args: [100] + + - uid: compute1 + parameters: + type: echo + args: [compute1] + before: [compute_ready] + after: [amqp_cluster_configured] + - uid: compute2 + parameters: + type: echo + args: [compute2] + before: [compute_ready] + after: [amqp_cluster_configured] + - uid: compute3 + parameters: + type: echo + args: [compute3] + before: [compute_ready] + after: [amqp_cluster_configured] + - uid: compute4 + parameters: + type: error + args: [compute4] + before: [compute_ready] + after: [amqp_cluster_configured] + - uid: compute5 + parameters: + type: error + args: [compute5] + before: [compute_ready] + after: [amqp_cluster_configured] + + - uid: compute_ready + parameters: + type: fault_tolerance + args: [60] + diff --git a/orch/examples/simple.yaml b/orch/examples/simple.yaml new file mode 100644 index 00000000..0ff2f2e1 --- /dev/null +++ b/orch/examples/simple.yaml @@ -0,0 +1,10 @@ +name: simple +tasks: + - uid: sleep_some_time + parameters: + type: sleep + args: [10] + - uid: just_fail + parameters: + type: error + args: ['message'] diff --git a/orch/examples/test_errors.yml b/orch/examples/test_errors.yml new file mode 100644 index 00000000..fb9f3310 --- /dev/null +++ b/orch/examples/test_errors.yml @@ -0,0 +1,34 @@ + +name: errors +tasks: + - uid: compute1 + parameters: + type: echo + args: [compute1] + before: [compute_ready] + - uid: compute2 + parameters: + type: echo + args: [compute2] + before: [compute_ready] + - uid: compute3 + parameters: + type: echo + args: [compute3] + before: [compute_ready] + - uid: compute4 + parameters: + type: error + args: [compute4] + before: [compute_ready] + - 
uid: compute5 + parameters: + type: error + args: [compute5] + before: [compute_ready] + + - uid: compute_ready + parameters: + type: fault_tolerance + args: [80] + diff --git a/orch/graph.py b/orch/graph.py index c8f91c7e..a9bbdf95 100644 --- a/orch/graph.py +++ b/orch/graph.py @@ -5,6 +5,11 @@ import networkx as nx import redis import json +import yaml + +import uuid + + r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1) @@ -23,3 +28,50 @@ def get_graph(name): dg.add_nodes_from(nodes) dg.add_edges_from(edges) return dg + + +get_plan = get_graph + + +def parse_plan(plan_data): + """ parses yaml definition and returns graph + """ + plan = yaml.load(plan_data) + dg = nx.DiGraph() + dg.graph['name'] = plan['name'] + for task in plan['tasks']: + dg.add_node( + task['uid'], status='PENDING', **task['parameters']) + for v in task.get('before', ()): + dg.add_edge(task['uid'], v) + for u in task.get('after', ()): + dg.add_edge(u, task['uid']) + return dg + + +def reset(uid): + dg = get_graph(uid) + for n in dg: + dg.node[n]['status'] = 'PENDING' + save_graph(uid, dg) + + +def create_plan(plan_data): + """ + """ + dg = parse_plan(plan_data) + dg.graph['uid'] = "{0}:{1}".format(dg.graph['name'], str(uuid.uuid4())) + save_graph(dg.graph['uid'], dg) + return dg.graph['uid'] + + +def report_topo(uid): + + dg = get_graph(uid) + report = [] + + for task in nx.topological_sort(dg): + status = dg.node[task]['status'] + report.append([task, status]) + + return report diff --git a/orch/tasks.py b/orch/tasks.py index 267d7c7b..ed751a34 100644 --- a/orch/tasks.py +++ b/orch/tasks.py @@ -48,7 +48,7 @@ def maybe_ignore(func): @solar_task @maybe_ignore -def cmd(cmd): +def cmd(ctxt, cmd): popen = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) out, err = popen.communicate() @@ -72,9 +72,12 @@ def error(ctxt, message): @solar_task def fault_tolerance(ctxt, percent): - dg = graph.get_graph('current') + task_id = ctxt.request.id + plan_uid, task_name = task_id.rsplit(':', 1) + + dg = graph.get_graph(plan_uid) success = 0.0 - predecessors = dg.predecessors(ctxt.request.id) + predecessors = dg.predecessors(task_name) lth = len(predecessors) for s in predecessors: @@ -112,29 +115,29 @@ def fire_timeout(task_id): @app.task -def schedule_start(): +def schedule_start(plan_uid): """On receive finished task should update storage with task result: - find successors that should be executed - apply different policies to tasks """ - dg = graph.get_graph('current') + dg = graph.get_graph(plan_uid) - concurrency = dg.graph.get('concurrency', None) - next_tasks = list(islice(get_next(dg), 0, concurrency)) + next_tasks = list(get_next(dg)) print 'GRAPH {0}\n NEXT TASKS {1}'.format(dg.node, next_tasks) - graph.save_graph('current', dg) + graph.save_graph(plan_uid, dg) group(next_tasks)() @app.task def schedule_next(task_id, status): - dg = graph.get_graph('current') - dg.node[task_id]['status'] = status - concurrency = dg.graph.get('concurrency', None) - next_tasks = list(islice(get_next(dg), 0, concurrency)) + plan_uid, task_name = task_id.rsplit(':', 1) + dg = graph.get_graph(plan_uid) + dg.node[task_name]['status'] = status + + next_tasks = list(get_next(dg)) print 'GRAPH {0}\n NEXT TASKS {1}'.format(dg.node, next_tasks) - graph.save_graph('current', dg) + graph.save_graph(plan_uid, dg) group(next_tasks)() @@ -157,12 +160,13 @@ def get_next(dg): predecessors = set(dg.predecessors(node)) if predecessors <= visited: + task_id = '{}:{}'.format(dg.graph['uid'], node) task_name = 
'orch.tasks.{0}'.format(data['type']) task = app.tasks[task_name] dg.node[node]['status'] = 'INPROGRESS' subtask = task.subtask( - data['args'], task_id=node, + data['args'], task_id=task_id, time_limit=data.get('time_limit', None), soft_time_limit=data.get('soft_time_limit', None)) From fd66b6e7a12b850c0ec8921ab2a2127dc101b99e Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Fri, 26 Jun 2015 13:08:53 +0300 Subject: [PATCH 05/12] Add basic workflow into cli --- cli.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/cli.py b/cli.py index 8a866290..3bd0635d 100755 --- a/cli.py +++ b/cli.py @@ -1,5 +1,6 @@ #!/usr/bin/python + import click from orch import graph @@ -8,7 +9,15 @@ from orch import tasks @click.group() def orchestration(): - pass + """ + \b + ./cli.py create orch/examples/multi.yaml + + ./cli.py execute + ./cli.py report + -> + ./cli.py restart --reset + """ @click.command() From 6d4018b56b809a811a67e5bc56aadc56d13afc1d Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Fri, 26 Jun 2015 13:49:30 +0300 Subject: [PATCH 06/12] Add soft stop support and update for deployment plan Only pending tasks will be affected when plan is soft-stopped Update will preserve old state of tasks, but change definition of task --- cli.py | 35 +++++++++++++++- orch/examples/for_stop.yaml | 11 +++++ orch/examples/simple.yaml | 4 +- orch/examples/upd_test_errors.yml | 34 +++++++++++++++ orch/graph.py | 37 +++++++++++++---- orch/tasks.py | 69 ++++++++++++++++++++----------- 6 files changed, 154 insertions(+), 36 deletions(-) create mode 100644 orch/examples/for_stop.yaml create mode 100644 orch/examples/upd_test_errors.yml diff --git a/cli.py b/cli.py index 3bd0635d..f39e4fc2 100755 --- a/cli.py +++ b/cli.py @@ -26,6 +26,12 @@ def create(plan): click.echo(graph.create_plan(plan.read())) +@click.command() +@click.argument('uid') +@click.argument('plan', type=click.File('rb')) +def update(uid, plan): + graph.update_plan(uid, plan.read()) + @click.command() @click.argument('uid') def report(uid): @@ -43,8 +49,13 @@ def report(uid): @click.command() @click.argument('uid') -def execute(uid): - tasks.schedule_start.apply_async(args=[uid], queue='master') +@click.option('--start', default=None) +@click.option('--end', default=None) +def execute(uid, start, end): + tasks.schedule_start.apply_async( + args=[uid], + kwargs={'start': start, 'end': end}, + queue='master') @click.command() @@ -62,11 +73,31 @@ def reset(uid): graph.reset(uid) +@click.command() +@click.argument('uid') +def stop(uid): + # TODO(dshulyak) how to do "hard" stop? 
+ # using revoke(terminate=True) will lead to inability to restart execution + # research possibility of customizations of + # app.control and Panel.register in celery + graph.soft_stop(uid) + + +@click.command() +@click.argument('uid') +def retry(uid): + graph.reset(uid, ['ERROR']) + tasks.schedule_start.apply_async(args=[uid], queue='master') + + orchestration.add_command(create) +orchestration.add_command(update) orchestration.add_command(report) orchestration.add_command(execute) orchestration.add_command(restart) orchestration.add_command(reset) +orchestration.add_command(stop) +orchestration.add_command(retry) if __name__ == '__main__': diff --git a/orch/examples/for_stop.yaml b/orch/examples/for_stop.yaml new file mode 100644 index 00000000..fe12b144 --- /dev/null +++ b/orch/examples/for_stop.yaml @@ -0,0 +1,11 @@ +name: for_stop +tasks: + - uid: sleep_some_time + parameters: + type: sleep + args: [20] + before: [sleep_again] + - uid: sleep_again + parameters: + type: sleep + args: [20] diff --git a/orch/examples/simple.yaml b/orch/examples/simple.yaml index 0ff2f2e1..d643ff4e 100644 --- a/orch/examples/simple.yaml +++ b/orch/examples/simple.yaml @@ -1,8 +1,8 @@ name: simple tasks: - - uid: sleep_some_time + - uid: echo_stuff parameters: - type: sleep + type: echo args: [10] - uid: just_fail parameters: diff --git a/orch/examples/upd_test_errors.yml b/orch/examples/upd_test_errors.yml new file mode 100644 index 00000000..35670a14 --- /dev/null +++ b/orch/examples/upd_test_errors.yml @@ -0,0 +1,34 @@ + +name: errors +tasks: + - uid: compute1 + parameters: + type: echo + args: [compute1] + before: [compute_ready] + - uid: compute2 + parameters: + type: echo + args: [compute2] + before: [compute_ready] + - uid: compute3 + parameters: + type: echo + args: [compute3] + before: [compute_ready] + - uid: compute4 + parameters: + type: echo + args: [compute4] + before: [compute_ready] + - uid: compute5 + parameters: + type: error + args: [compute5] + before: [compute_ready] + + - uid: compute_ready + parameters: + type: fault_tolerance + args: [80] + diff --git a/orch/graph.py b/orch/graph.py index a9bbdf95..7a7aa3c4 100644 --- a/orch/graph.py +++ b/orch/graph.py @@ -49,13 +49,6 @@ def parse_plan(plan_data): return dg -def reset(uid): - dg = get_graph(uid) - for n in dg: - dg.node[n]['status'] = 'PENDING' - save_graph(uid, dg) - - def create_plan(plan_data): """ """ @@ -65,6 +58,36 @@ def create_plan(plan_data): return dg.graph['uid'] +def update_plan(uid, plan_data): + """update preserves old status of tasks if they werent removed + """ + dg = parse_plan(plan_data) + old_dg = get_graph(uid) + dg.graph = old_dg.graph + for n in dg: + if n in old_dg: + dg.node[n]['status'] = old_dg.node[n]['status'] + + save_graph(uid, dg) + return uid + + +def reset(uid, states=None): + dg = get_graph(uid) + for n in dg: + if states is None or dg.node[n]['status'] in states: + dg.node[n]['status'] = 'PENDING' + save_graph(uid, dg) + + +def soft_stop(uid): + """Graph will stop when all currently inprogress tasks will be finished + """ + dg = get_graph(uid) + dg.graph['stop'] = True + save_graph(uid, dg) + + def report_topo(uid): dg = get_graph(uid) diff --git a/orch/tasks.py b/orch/tasks.py index ed751a34..62fcd724 100644 --- a/orch/tasks.py +++ b/orch/tasks.py @@ -114,19 +114,24 @@ def fire_timeout(task_id): schedule_next.apply_async(args=[task_id, 'ERROR'], queue='master') +def schedule(plan_uid, dg): + if not dg.graph.get('stop'): + next_tasks = list(traverse(dg)) + print 'GRAPH {0}\n NEXT TASKS 
{1}'.format(dg.node, next_tasks) + group(next_tasks)() + graph.save_graph(plan_uid, dg) + + @app.task -def schedule_start(plan_uid): +def schedule_start(plan_uid, start=None, end=None): """On receive finished task should update storage with task result: - find successors that should be executed - apply different policies to tasks """ dg = graph.get_graph(plan_uid) - - next_tasks = list(get_next(dg)) - print 'GRAPH {0}\n NEXT TASKS {1}'.format(dg.node, next_tasks) - graph.save_graph(plan_uid, dg) - group(next_tasks)() + dg.graph['stop'] = False + schedule(plan_uid, dg) @app.task @@ -135,13 +140,17 @@ def schedule_next(task_id, status): dg = graph.get_graph(plan_uid) dg.node[task_name]['status'] = status - next_tasks = list(get_next(dg)) - print 'GRAPH {0}\n NEXT TASKS {1}'.format(dg.node, next_tasks) - graph.save_graph(plan_uid, dg) - group(next_tasks)() + schedule(plan_uid, dg) + +# TODO(dshulyak) some tasks should be evaluated even if not all predecessors +# succeded, how to identify this? +# - add ignor_error on edge +# - add ignore_predecessor_errors on task in consideration +# - make fault_tolerance not a task but a policy for all tasks +control_tasks = [fault_tolerance, anchor] -def get_next(dg): +def traverse(dg): visited = set() for node in dg: data = dg.node[node] @@ -156,7 +165,6 @@ def get_next(dg): elif data['status'] == 'INPROGRESS': continue - predecessors = set(dg.predecessors(node)) if predecessors <= visited: @@ -164,21 +172,32 @@ def get_next(dg): task_name = 'orch.tasks.{0}'.format(data['type']) task = app.tasks[task_name] - dg.node[node]['status'] = 'INPROGRESS' - subtask = task.subtask( - data['args'], task_id=task_id, - time_limit=data.get('time_limit', None), - soft_time_limit=data.get('soft_time_limit', None)) - if data.get('target', None): - subtask.set(queue=data['target']) + if all_success(dg, predecessors) or task in control_tasks: + dg.node[node]['status'] = 'INPROGRESS' + for t in generate_task(task, dg, data, task_id): + yield t - timeout = data.get('timeout') - yield subtask +def generate_task(task, dg, data, task_id): - if timeout: - timeout_task = fire_timeout.subtask([node], countdown=timeout) - timeout_task.set(queue='master') - yield timeout_task + subtask = task.subtask( + data['args'], task_id=task_id, + time_limit=data.get('time_limit', None), + soft_time_limit=data.get('soft_time_limit', None)) + if data.get('target', None): + subtask.set(queue=data['target']) + + timeout = data.get('timeout') + + yield subtask + + if timeout: + timeout_task = fire_timeout.subtask([task_id], countdown=timeout) + timeout_task.set(queue='master') + yield timeout_task + + +def all_success(dg, nodes): + return all((n for n in nodes if dg.node[n]['status'] == 'SUCCESS')) From 47ca8c8fdff49c5107550a92db7f076cac4f0700 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Tue, 30 Jun 2015 12:31:50 +0300 Subject: [PATCH 07/12] Add several examples and improvements to orchestration - Added example.py converted to plan description - Added error message - Plan can be printed as dot/png graph - Added TODO --- .gitignore | 5 ++ cli.py | 41 +++++++--- orch/TODO | 27 +++++++ orch/examples/example_py.yml | 143 +++++++++++++++++++++++++++++++++++ orch/graph.py | 5 +- orch/tasks.py | 26 +++++-- 6 files changed, 226 insertions(+), 21 deletions(-) create mode 100644 orch/TODO create mode 100644 orch/examples/example_py.yml diff --git a/.gitignore b/.gitignore index b221e333..ffe2a79f 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,8 @@ rs/ solar.log x-venv/ + +celery* + +*.dot +*.png 
\ No newline at end of file diff --git a/cli.py b/cli.py index f39e4fc2..17d9dade 100755 --- a/cli.py +++ b/cli.py @@ -6,6 +6,8 @@ import click from orch import graph from orch import tasks +import networkx as nx +import subprocess @click.group() def orchestration(): @@ -36,15 +38,17 @@ def update(uid, plan): @click.argument('uid') def report(uid): colors = { - 'PENDING': 'white', + 'PENDING': 'blue', 'ERROR': 'red', 'SUCCESS': 'green', 'INPROGRESS': 'yellow'} report = graph.report_topo(uid) for item in report: - click.echo( - click.style('{} -> {}'.format(item[0], item[1]), fg=colors[item[1]])) + msg = '{} -> {}'.format(item[0], item[1]) + if item[2]: + msg += ' :: {}'.format(item[2]) + click.echo(click.style(msg, fg=colors[item[1]])) @click.command() @@ -55,16 +59,14 @@ def execute(uid, start, end): tasks.schedule_start.apply_async( args=[uid], kwargs={'start': start, 'end': end}, - queue='master') + queue='scheduler') @click.command() @click.argument('uid') -@click.option('--reset', default=False, is_flag=True) -def restart(uid, reset): - if reset: - graph.reset(uid) - tasks.schedule_start.apply_async(args=[uid], queue='master') +def restart(uid): + graph.reset(uid) + tasks.schedule_start.apply_async(args=[uid], queue='scheduler') @click.command() @@ -87,7 +89,25 @@ def stop(uid): @click.argument('uid') def retry(uid): graph.reset(uid, ['ERROR']) - tasks.schedule_start.apply_async(args=[uid], queue='master') + tasks.schedule_start.apply_async(args=[uid], queue='scheduler') + + +@click.command() +@click.argument('uid') +def dg(uid): + plan = graph.get_graph(uid) + + colors = { + 'PENDING': 'blue', + 'ERROR': 'red', + 'SUCCESS': 'green', + 'INPROGRESS': 'yellow'} + + for n in plan: + color = colors[plan.node[n]['status']] + plan.node[n]['color'] = color + nx.write_dot(plan, 'graph.dot') + subprocess.call(['dot', '-Tpng', 'graph.dot', '-o', 'graph.png']) orchestration.add_command(create) @@ -98,6 +118,7 @@ orchestration.add_command(restart) orchestration.add_command(reset) orchestration.add_command(stop) orchestration.add_command(retry) +orchestration.add_command(dg) if __name__ == '__main__': diff --git a/orch/TODO b/orch/TODO new file mode 100644 index 00000000..9b02fba0 --- /dev/null +++ b/orch/TODO @@ -0,0 +1,27 @@ +1. Core orchestration +1.1. Celery integration +1.2. Orchestrate stuff based on plan traversal +1.3. Different controls +1.4. Granular execution (e.g execute start/end, execute certain tasks, execute certain path) + +2. User facing interface for orchestration +2.1. How to integrate this stuff with resources? +Two options: +- Use orchestrator as driver +- Orchestrate resources externally, all logic implemented in resource driver +2.2. How to allow reuse across different Profiles ? +Solution: plan for each separate group of resources, e.g. plan for rabbitmq +deployment is not exposed, it is stored only in rabbitmq resources template, +but it exposes several *anchors* - amqp_cluster_ready and amqp_one_node_ready + +3. Granular testing +3.1. How to integrate pre/post verifications + +Orchestration features +------------------------- +1. Controls +- Execute task even if all predecessors failed +- Execute task when one (specific number) predecessor succeed +- Execute task when certain percent of tasks are success +- Ignore if some task failed +2. 
Granular execution \ No newline at end of file diff --git a/orch/examples/example_py.yml b/orch/examples/example_py.yml new file mode 100644 index 00000000..8139d56a --- /dev/null +++ b/orch/examples/example_py.yml @@ -0,0 +1,143 @@ +name: example_py +tasks: + - uid: rabbitmq_service1 + parameters: + type: sleep + args: [rabbitmq_service1, run] + - uid: openstack_vhost + parameters: + type: solar_resource + args: [openstack_vhost, run] + after: [rabbitmq_service1] + - uid: openstack_rabbitmq_user + parameters: + type: sleep + args: [openstack_rabbitmq_user, run] + after: [rabbitmq_service1] + + - uid: mariadb_service1 + parameters: + type: solar_resource + args: [mariadb_service1, run] + - uid: keystone_db + parameters: + type: solar_resource + args: [keystone_db, run] + after: [mariadb_service1] + - uid: keystone_db_user + parameters: + type: solar_resource + args: [keystone_db_user, run] + after: [keystone_db] + + - uid: keystone_config1 + parameters: + type: solar_resource + args: [keystone_config1, run] + - uid: keystone_config2 + parameters: + type: solar_resource + args: [keystone_config2, run] + - uid: keystone_service1 + parameters: + type: solar_resource + args: [keystone_service1, run] + after: [keystone_config1, keystone_db_user] + + - uid: keystone_service2 + parameters: + type: solar_resource + args: [keystone_service2, run] + after: [keystone_config2, keystone_db_user] + + - uid: haproxy_config + parameters: + type: solar_resource + args: [haproxy_config, run] + after: [keystone_service1, keystone_service2] + - uid: haproxy_service + parameters: + type: solar_resource + args: [haproxy_service, run] + after: [haproxy_config] + + - uid: admin_tenant + parameters: + type: solar_resource + args: [admin_tenant, run] + after: [haproxy_service] + - uid: admin_role + parameters: + type: solar_resource + args: [admin_user, run] + after: [admin_tenant] + - uid: admin_user + parameters: + type: solar_resource + args: [admin_user, run] + after: [admin_role] + - uid: keystone_service_endpoint + parameters: + type: solar_resource + args: [keystone_service_endpoint, run] + after: [admin_user] + + - uid: services_tenant + parameters: + type: solar_resource + args: [services_tenant, run] + after: [keystone_service_endpoint] + + - uid: glance_keystone_user + parameters: + type: solar_resource + args: [glance_keystone_user, run] + after: [keystone_service_endpoint] + - uid: glance_keystone_role + parameters: + type: solar_resource + args: [glance_keystone_role, run] + after: [keystone_service_endpoint] + + - uid: glance_db + parameters: + type: solar_resource + args: [glance_db, run] + after: [mariadb_service1] + + - uid: glance_db_user + parameters: + type: solar_resource + args: [glance_db_user, run] + after: [glance_db] + + - uid: glance_config + parameters: + type: solar_resource + args: [glance_config, run] + - uid: glance_api_container + parameters: + type: solar_resource + args: [glance_api_container, run] + after: [glance_config, glance_db_user, glance_keystone_user, openstack_rabbitmq_user] + - uid: glance_registry_container + parameters: + type: solar_resource + args: [glance_registry_container, run] + after: [glance_config, glance_db_user, glance_keystone_user, openstack_rabbitmq_user] + - uid: glance_api_endpoint + parameters: + type: solar_resource + args: [glance_api_endpoint, run] + after: [glance_api_container] + + - uid: haproxy_service_update1_rem + parameters: + type: solar_resource + args: [haproxy_service, remove] + after: [glance_api_endpoint] + - uid: 
haproxy_service_update1_run + parameters: + type: solar_resource + args: [haproxy_service, run] + after: [haproxy_service_update1_rem] diff --git a/orch/graph.py b/orch/graph.py index 7a7aa3c4..f5658ac7 100644 --- a/orch/graph.py +++ b/orch/graph.py @@ -41,7 +41,7 @@ def parse_plan(plan_data): dg.graph['name'] = plan['name'] for task in plan['tasks']: dg.add_node( - task['uid'], status='PENDING', **task['parameters']) + task['uid'], status='PENDING', errmsg=None, **task['parameters']) for v in task.get('before', ()): dg.add_edge(task['uid'], v) for u in task.get('after', ()): @@ -94,7 +94,6 @@ def report_topo(uid): report = [] for task in nx.topological_sort(dg): - status = dg.node[task]['status'] - report.append([task, status]) + report.append([task, dg.node[task]['status'], dg.node[task]['errmsg']]) return report diff --git a/orch/tasks.py b/orch/tasks.py index 62fcd724..8b34afe7 100644 --- a/orch/tasks.py +++ b/orch/tasks.py @@ -14,6 +14,10 @@ from orch import graph import redis + +from solar.core import actions +from solar.core import resource + app = Celery( 'tasks', backend='redis://10.0.0.2:6379/1', @@ -25,10 +29,13 @@ r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1) class ReportTask(task.Task): def on_success(self, retval, task_id, args, kwargs): - schedule_next.apply_async(args=[task_id, 'SUCCESS'], queue='master') + schedule_next.apply_async(args=[task_id, 'SUCCESS'], queue='scheduler') def on_failure(self, exc, task_id, args, kwargs, einfo): - schedule_next.apply_async(args=[task_id, 'ERROR'], queue='master') + schedule_next.apply_async( + args=[task_id, 'ERROR'], + kwargs={'errmsg': str(einfo.exception)}, + queue='scheduler') solar_task = partial(app.task, base=ReportTask, bind=True) @@ -47,7 +54,12 @@ def maybe_ignore(func): @solar_task -@maybe_ignore +def solar_resource(ctxt, resource_name, action): + res = resource.load(resource_name) + return actions.resource_action(res, action) + + +@solar_task def cmd(ctxt, cmd): popen = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) @@ -59,13 +71,11 @@ def cmd(ctxt, cmd): @solar_task -@maybe_ignore def sleep(ctxt, seconds): time.sleep(seconds) @solar_task -@maybe_ignore def error(ctxt, message): raise Exception('message') @@ -91,7 +101,6 @@ def fault_tolerance(ctxt, percent): @solar_task -@maybe_ignore def echo(ctxt, message): return message @@ -135,10 +144,11 @@ def schedule_start(plan_uid, start=None, end=None): @app.task -def schedule_next(task_id, status): +def schedule_next(task_id, status, errmsg=None): plan_uid, task_name = task_id.rsplit(':', 1) dg = graph.get_graph(plan_uid) dg.node[task_name]['status'] = status + dg.node[task_name]['errmsg'] = errmsg schedule(plan_uid, dg) @@ -195,7 +205,7 @@ def generate_task(task, dg, data, task_id): if timeout: timeout_task = fire_timeout.subtask([task_id], countdown=timeout) - timeout_task.set(queue='master') + timeout_task.set(queue='scheduler') yield timeout_task From 508a4ec9a35ed751a8542612f0a2b282b8c50db6 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Tue, 30 Jun 2015 15:45:16 +0300 Subject: [PATCH 08/12] Add celery.yml playbook to configure celery On solar-dev setup 2 workers: 1. Listens to celery,scheduler and celery,solar-dev On solar-dev1/solar-dev2 2. 
Listens only to celery, --- .gitignore | 3 ++- Vagrantfile | 10 ++++++++++ celery.yml | 13 +++++++++++++ orch/examples/example_py.yml | 6 +++--- 4 files changed, 28 insertions(+), 4 deletions(-) create mode 100644 celery.yml diff --git a/.gitignore b/.gitignore index ffe2a79f..ab1b0ed3 100644 --- a/.gitignore +++ b/.gitignore @@ -18,7 +18,8 @@ rs/ solar.log x-venv/ -celery* +celery*.pid +celery*.log *.dot *.png \ No newline at end of file diff --git a/Vagrantfile b/Vagrantfile index 9ed84d10..0e6ebafc 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -12,6 +12,14 @@ pip install ansible ansible-playbook -i "localhost," -c local /vagrant/main.yml /vagrant/docker.yml SCRIPT +master_celery = <