diff --git a/celery.yml b/celery.yml index fea21a99..4b5f9878 100644 --- a/celery.yml +++ b/celery.yml @@ -1,4 +1,5 @@ - hosts: all + sudo: yes vars: celery_dir: /var/run/celery tasks: @@ -8,6 +9,7 @@ register: hostname - shell: celery multi kill 2 chdir={{celery_dir}} + tags: [stop] - shell: celery multi start 2 -A solar.orchestration.tasks -Q:1 celery,scheduler -Q:2 celery,{{hostname.stdout}} chdir={{celery_dir}} tags: [master] diff --git a/solar/solar/cli/orch.py b/solar/solar/cli/orch.py index 05dd8be0..74c46ef6 100644 --- a/solar/solar/cli/orch.py +++ b/solar/solar/cli/orch.py @@ -15,7 +15,7 @@ def orchestration(): \b create solar/orchestration/examples/multi.yaml - execute + run-once report -> restart --reset @@ -48,11 +48,11 @@ def report(uid): msg += ' :: {}'.format(item[2]) click.echo(click.style(msg, fg=colors[item[1]])) -@orchestration.command() +@orchestration.command(name='run-once') @click.argument('uid') @click.option('--start', default=None) @click.option('--end', default=None) -def execute(uid, start, end): +def run_once(uid, start, end): tasks.schedule_start.apply_async( args=[uid], kwargs={'start': start, 'end': end}, diff --git a/solar/solar/orchestration/tasks.py b/solar/solar/orchestration/tasks.py index 1927845d..49794ac9 100644 --- a/solar/solar/orchestration/tasks.py +++ b/solar/solar/orchestration/tasks.py @@ -21,6 +21,8 @@ app = Celery( 'tasks', backend='redis://10.0.0.2:6379/1', broker='redis://10.0.0.2:6379/1') +app.conf.update(CELERY_ACCEPT_CONTENT = ['json']) +app.conf.update(CELERY_TASK_SERIALIZER = 'json') r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1) @@ -102,11 +104,10 @@ def anchor(ctxt, *args): def schedule(plan_uid, dg): - if not dg.graph.get('stop'): - next_tasks = list(traverse(dg)) - print 'GRAPH {0}\n NEXT TASKS {1}'.format(dg.node, next_tasks) - group(next_tasks)() + next_tasks = list(traverse(dg)) graph.save_graph(plan_uid, dg) + print 'GRAPH {0}\n NEXT TASKS {1}'.format(dg.node, next_tasks) + group(next_tasks)() @app.task @@ -187,4 +188,4 @@ def generate_task(task, dg, data, task_id): def all_success(dg, nodes): - return all((n for n in nodes if dg.node[n]['status'] == 'SUCCESS')) + return all((dg.node[n]['status'] == 'SUCCESS' for n in nodes))