Add discovery of tasks stored in separate modules

Author: Dmitry Shulyak, 2015-07-13 17:28:13 +03:00
Parent: 3c0e8b1e1a
Commit: bd6286ca84
3 changed files with 25 additions and 13 deletions

Celery worker start/stop tasks (Ansible)

@@ -10,9 +10,9 @@
   - shell: celery multi kill 2
       chdir={{celery_dir}}
     tags: [stop]
-  - shell: celery multi start 2 -A solar.orchestration.tasks -Q:1 celery,scheduler -Q:2 celery,{{hostname.stdout}}
+  - shell: celery multi start 2 -A solar.orchestration.runner -Q:1 celery,scheduler -Q:2 celery,{{hostname.stdout}}
       chdir={{celery_dir}}
     tags: [master]
-  - shell: celery multi start 1 -A solar.orchestration.tasks -Q:1 celery,{{hostname.stdout}}
+  - shell: celery multi start 1 -A solar.orchestration.runner -Q:1 celery,{{hostname.stdout}}
       chdir={{celery_dir}}
     tags: [slave]
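
The only change here is the -A (app) argument: the workers now load the Celery app from solar.orchestration.runner instead of solar.orchestration.tasks. The master node keeps two workers, one on the shared celery/scheduler queues and one on a queue named after the host, while each slave runs a single worker bound to its own host queue. Below is a hedged sketch, not part of the commit, of how work can be routed to those queues; the short task name 'sleep' and the argument layout are assumptions.

from solar.orchestration.runner import app

# A task sent to the shared 'celery' queue may be picked up by any worker.
app.send_task('sleep', args=({}, 5), queue='celery')

# A task sent to a host-named queue only runs on the worker started with
# -Q celery,<hostname> on that machine ('node-1' is a placeholder).
app.send_task('sleep', args=({}, 5), queue='node-1')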

solar/orchestration/runner.py (new file)

@@ -0,0 +1,18 @@
+from celery import Celery
+
+
+app = Celery(
+    include=['solar.system_log.tasks', 'solar.orchestration.tasks'],
+    backend='redis://10.0.0.2:6379/1',
+    broker='redis://10.0.0.2:6379/1')
+app.conf.update(CELERY_ACCEPT_CONTENT = ['json'])
+app.conf.update(CELERY_TASK_SERIALIZER = 'json')
+
+
+# NOTE(dshulyak) some autodiscovery system
+# maybe https://github.com/mitsuhiko/pluginbase/ ?
+from solar.system_log.signals import *
+from solar.system_log.tasks import *
+from solar.orchestration.tasks import *
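
The new runner module becomes the single owner of the Celery app: task modules are listed in include= so workers register their tasks at start-up, and the star imports re-export their names until a real plugin mechanism (the pluginbase note above) exists. A minimal sketch of what a task module needs in order to be discovered this way; the module path solar/example/tasks.py and the echo task are invented for illustration:

# solar/example/tasks.py -- hypothetical module; it would also have to be
# added to the include=[...] list in solar.orchestration.runner.
from solar.orchestration.runner import app

# Limits what `from solar.example.tasks import *` would re-export in runner.py.
__all__ = ['echo']


@app.task(name='echo')
def echo(message):
    # Registered against the shared app, so any worker started with
    # `celery ... -A solar.orchestration.runner` can execute it.
    return message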

solar/orchestration/tasks.py

@@ -1,12 +1,9 @@
 from functools import partial, wraps
 from itertools import islice
 import subprocess
 import time
 
-from celery import Celery
 from celery.app import task
 from celery import group
 from celery.exceptions import Ignore
@@ -15,18 +12,16 @@ import redis
 from solar.orchestration import graph
 from solar.core import actions
 from solar.core import resource
+from solar.orchestration.runner import app
 
-app = Celery(
-    'tasks',
-    backend='redis://10.0.0.2:6379/1',
-    broker='redis://10.0.0.2:6379/1')
-app.conf.update(CELERY_ACCEPT_CONTENT = ['json'])
-app.conf.update(CELERY_TASK_SERIALIZER = 'json')
 
 r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1)
 
+__all__ = ['solar_resource', 'cmd', 'sleep',
+           'error', 'fault_tolerance', 'schedule_start', 'schedule_next']
 
 
 class ReportTask(task.Task):
 
     def on_success(self, retval, task_id, args, kwargs):
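
This hunk is the other half of the move: tasks.py stops constructing its own Celery app and instead imports the shared one from the runner module, and __all__ pins down which task names the star import in runner.py re-exports. ReportTask, whose body is cut off here, subclasses celery.app.task.Task to hook task success and failure. A hedged sketch of how tasks can be bound to such a base class; the partial() helper, the hook bodies and the sleep task are assumptions, not the actual solar code:

from functools import partial
import time

from celery.app import task

from solar.orchestration.runner import app


class ReportTask(task.Task):

    def on_success(self, retval, task_id, args, kwargs):
        # Placeholder body; the real hooks presumably report state
        # (e.g. via the redis client shown above).
        print('task {0} succeeded'.format(task_id))

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('task {0} failed: {1}'.format(task_id, exc))


# Every task created through report_task uses ReportTask as its base class
# and receives itself as the first argument (bind=True).
report_task = partial(app.task, base=ReportTask, bind=True)


@report_task(name='sleep')
def sleep(self, ctxt, seconds):
    time.sleep(seconds)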
@@ -106,7 +101,6 @@ def anchor(ctxt, *args):
 
 def schedule(plan_uid, dg):
     next_tasks = list(traverse(dg))
     graph.save_graph(plan_uid, dg)
-    print 'GRAPH {0}\n NEXT TASKS {1}'.format(dg.node, next_tasks)
     group(next_tasks)()
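
The final hunk just drops a debug print from schedule(), which saves the updated graph and then fires everything traverse() reports as ready as a single Celery group. A hedged sketch of that dispatch pattern, not the actual solar code; ready_nodes is assumed to be (task_name, args) pairs and the queue is a placeholder:

from celery import group

from solar.orchestration.runner import app


def dispatch_ready(ready_nodes):
    # One signature per ready node, sent all at once -- the same shape as
    # group(next_tasks)() in schedule() above.
    signatures = [
        app.signature(name, args=args).set(queue='celery')
        for name, args in ready_nodes
    ]
    return group(signatures)()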