Merge branch 'master' into puppet-tests
commit 9c23d39863
.gitignore (vendored, 6 lines changed)
@@ -17,3 +17,9 @@ rs/
solar.log
x-venv/

celery*.pid
celery*.log

*.dot
*.png
Vagrantfile (vendored, 10 lines changed)
@@ -19,6 +19,14 @@ pip install ansible
ansible-playbook -i "localhost," -c local /vagrant/main.yml /vagrant/docker.yml /vagrant/slave.yml
SCRIPT

master_celery = <<SCRIPT
ansible-playbook -i "localhost," -c local /vagrant/celery.yml --skip-tags slave
SCRIPT

slave_celery = <<SCRIPT
ansible-playbook -i "localhost," -c local /vagrant/celery.yml --skip-tags master
SCRIPT

Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|

  #config.vm.box = "deb/jessie-amd64"
@@ -27,6 +35,7 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|

  config.vm.define "solar-dev", primary: true do |config|
    config.vm.provision "shell", inline: init_script, privileged: true
    config.vm.provision "shell", inline: master_celery, privileged: true
    config.vm.provision "file", source: "~/.vagrant.d/insecure_private_key", destination: "/vagrant/tmp/keys/ssh_private"
    config.vm.provision "file", source: "ansible.cfg", destination: "/home/vagrant/.ansible.cfg"
    config.vm.network "private_network", ip: "10.0.0.2"
@@ -44,6 +53,7 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
    config.vm.define "solar-dev#{index}" do |config|
      config.vm.provision "shell", inline: init_script, privileged: true
      config.vm.provision "shell", inline: slave_script, privileged: true
      config.vm.provision "shell", inline: slave_celery, privileged: true
      config.vm.network "private_network", ip: "10.0.0.#{ip_index}"
      config.vm.host_name = "solar-dev#{index}"
celery.yml (new file, 18 lines)
@@ -0,0 +1,18 @@
- hosts: all
  sudo: yes
  vars:
    celery_dir: /var/run/celery
  tasks:
    - shell: mkdir -p {{celery_dir}}
    - shell: pip install celery
    - shell: hostname
      register: hostname
    - shell: celery multi kill 2
        chdir={{celery_dir}}
      tags: [stop]
    - shell: celery multi start 2 -A solar.orchestration.tasks -Q:1 celery,scheduler -Q:2 celery,{{hostname.stdout}}
        chdir={{celery_dir}}
      tags: [master]
    - shell: celery multi start 1 -A solar.orchestration.tasks -Q:1 celery,{{hostname.stdout}}
        chdir={{celery_dir}}
      tags: [slave]
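The queue layout above is what the new orchestration code depends on: on the master, worker 1 listens on 'celery' and 'scheduler', worker 2 on 'celery' plus the master's hostname; each slave worker listens on 'celery' plus its own hostname. An illustrative Python sketch (not part of the change set) of how those queues are addressed from code:

    # send the scheduling task to the 'scheduler' queue, which only the
    # master's first worker consumes; this mirrors what `solar orch run-once`
    # does in solar/solar/cli/orch.py
    from solar.orchestration import tasks

    tasks.schedule_start.apply_async(args=['<plan uid>'], queue='scheduler')

    # ordinary plan tasks go to the default 'celery' queue unless a task
    # carries a 'target', in which case generate_task() routes it to that
    # per-node queue (see solar/solar/orchestration/tasks.py)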
examples/orch/example_py.yml (new file, 143 lines)
@@ -0,0 +1,143 @@
name: example_py
tasks:
  - uid: rabbitmq_service1
    parameters:
      type: solar_resource
      args: [rabbitmq_service1, run]
  - uid: openstack_vhost
    parameters:
      type: solar_resource
      args: [openstack_vhost, run]
    after: [rabbitmq_service1]
  - uid: openstack_rabbitmq_user
    parameters:
      type: solar_resource
      args: [openstack_rabbitmq_user, run]
    after: [rabbitmq_service1]

  - uid: mariadb_service1
    parameters:
      type: solar_resource
      args: [mariadb_service1, run]
  - uid: keystone_db
    parameters:
      type: solar_resource
      args: [keystone_db, run]
    after: [mariadb_service1]
  - uid: keystone_db_user
    parameters:
      type: solar_resource
      args: [keystone_db_user, run]
    after: [keystone_db]

  - uid: keystone_config1
    parameters:
      type: solar_resource
      args: [keystone_config1, run]
  - uid: keystone_config2
    parameters:
      type: solar_resource
      args: [keystone_config2, run]
  - uid: keystone_service1
    parameters:
      type: solar_resource
      args: [keystone_service1, run]
    after: [keystone_config1, keystone_db_user]

  - uid: keystone_service2
    parameters:
      type: solar_resource
      args: [keystone_service2, run]
    after: [keystone_config2, keystone_db_user]

  - uid: haproxy_config
    parameters:
      type: solar_resource
      args: [haproxy_config, run]
    after: [keystone_service1, keystone_service2]
  - uid: haproxy_service
    parameters:
      type: solar_resource
      args: [haproxy_service, run]
    after: [haproxy_config]

  - uid: admin_tenant
    parameters:
      type: solar_resource
      args: [admin_tenant, run]
    after: [haproxy_service]
  - uid: admin_role
    parameters:
      type: solar_resource
      args: [admin_user, run]
    after: [admin_tenant]
  - uid: admin_user
    parameters:
      type: solar_resource
      args: [admin_user, run]
    after: [admin_role]
  - uid: keystone_service_endpoint
    parameters:
      type: solar_resource
      args: [keystone_service_endpoint, run]
    after: [admin_user]

  - uid: services_tenant
    parameters:
      type: solar_resource
      args: [glance_keystone_tenant, run]
    after: [keystone_service_endpoint]

  - uid: glance_keystone_user
    parameters:
      type: solar_resource
      args: [glance_keystone_user, run]
    after: [keystone_service_endpoint]
  - uid: glance_keystone_role
    parameters:
      type: solar_resource
      args: [glance_keystone_role, run]
    after: [keystone_service_endpoint]

  - uid: glance_db
    parameters:
      type: solar_resource
      args: [glance_db, run]
    after: [mariadb_service1]

  - uid: glance_db_user
    parameters:
      type: solar_resource
      args: [glance_db_user, run]
    after: [glance_db]

  - uid: glance_config
    parameters:
      type: solar_resource
      args: [glance_config, run]
  - uid: glance_api_container
    parameters:
      type: solar_resource
      args: [glance_api_container, run]
    after: [glance_config, glance_db_user, glance_keystone_user, openstack_rabbitmq_user]
  - uid: glance_registry_container
    parameters:
      type: solar_resource
      args: [glance_registry_container, run]
    after: [glance_config, glance_db_user, glance_keystone_user, openstack_rabbitmq_user]
  - uid: glance_api_endpoint
    parameters:
      type: solar_resource
      args: [glance_api_endpoint, run]
    after: [glance_api_container]

  - uid: haproxy_service_update1_rem
    parameters:
      type: solar_resource
      args: [haproxy_service, remove]
    after: [glance_api_endpoint]
  - uid: haproxy_service_update1_run
    parameters:
      type: solar_resource
      args: [haproxy_service, run]
    after: [haproxy_service_update1_rem]
examples/orch/for_stop.yaml (new file, 11 lines)
@@ -0,0 +1,11 @@
name: for_stop
tasks:
  - uid: sleep_some_time
    parameters:
      type: sleep
      args: [20]
    before: [sleep_again]
  - uid: sleep_again
    parameters:
      type: sleep
      args: [20]
examples/orch/multi.yaml (new file, 63 lines)
@@ -0,0 +1,63 @@

name: multi
tasks:
  - uid: rabbitmq_cluster1.create
    parameters:
      type: cmd
      args: ['echo rabbitmq_cluster1.create']
    before: [amqp_cluster_configured]

  - uid: rabbitmq_cluster2.join
    parameters:
      type: cmd
      args: ['echo rabbitmq_cluster2.join']
    after: [rabbitmq_cluster1.create]
    before: [amqp_cluster_configured]
  - uid: rabbitmq_cluster3.join
    parameters:
      type: cmd
      args: ['echo rabbitmq_cluster3.join']
    after: [rabbitmq_cluster1.create]
    before: [amqp_cluster_configured]

  - uid: amqp_cluster_configured
    parameters:
      type: fault_tolerance
      args: [100]

  - uid: compute1
    parameters:
      type: echo
      args: [compute1]
    before: [compute_ready]
    after: [amqp_cluster_configured]
  - uid: compute2
    parameters:
      type: echo
      args: [compute2]
    before: [compute_ready]
    after: [amqp_cluster_configured]
  - uid: compute3
    parameters:
      type: echo
      args: [compute3]
    before: [compute_ready]
    after: [amqp_cluster_configured]
  - uid: compute4
    parameters:
      type: error
      args: [compute4]
    before: [compute_ready]
    after: [amqp_cluster_configured]
  - uid: compute5
    parameters:
      type: error
      args: [compute5]
    before: [compute_ready]
    after: [amqp_cluster_configured]

  - uid: compute_ready
    parameters:
      type: fault_tolerance
      args: [60]
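The two fault_tolerance tasks above make the percentage control concrete: amqp_cluster_configured demands 100% of its predecessors, compute_ready only 60%. An illustrative sketch (not part of the change set) of the arithmetic that the fault_tolerance task in solar/solar/orchestration/tasks.py performs for compute_ready:

    # 3 echo predecessors succeed, 2 error predecessors fail, threshold is 60
    statuses = ['SUCCESS', 'SUCCESS', 'SUCCESS', 'ERROR', 'ERROR']
    success_percent = 100.0 * sum(1 for s in statuses if s == 'SUCCESS') / len(statuses)
    print success_percent    # 60.0 -- not below 60, so compute_ready proceeds
    # with the 80 threshold used in examples/orch/test_errors.yml the same mix
    # (60.0 < 80) would raise, and compute_ready would be reported as ERROR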
examples/orch/simple.yaml (new file, 10 lines)
@@ -0,0 +1,10 @@
name: simple
tasks:
  - uid: echo_stuff
    parameters:
      type: echo
      args: [10]
  - uid: just_fail
    parameters:
      type: error
      args: ['message']
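For orientation, an illustrative sketch (not part of the change set) of how a small plan like this flows through the new modules, assuming the Redis instance at 10.0.0.2 and the workers from celery.yml are up:

    # mirrors what `solar orch create`, `run-once` and `report` do in
    # solar/solar/cli/orch.py, driven directly from Python
    from solar.orchestration import graph, tasks

    with open('examples/orch/simple.yaml') as f:
        uid = graph.create_plan(f.read())    # parse the YAML, store the graph in redis

    tasks.schedule_start.apply_async(args=[uid], queue='scheduler')

    # later: topological status report, the same data the `report` command colorizes
    for name, status, errmsg in graph.report_topo(uid):
        print name, status, errmsg           # Python 2 print, as elsewhere in the tree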
examples/orch/test_errors.yml (new file, 34 lines)
@@ -0,0 +1,34 @@

name: errors
tasks:
  - uid: compute1
    parameters:
      type: echo
      args: [compute1]
    before: [compute_ready]
  - uid: compute2
    parameters:
      type: echo
      args: [compute2]
    before: [compute_ready]
  - uid: compute3
    parameters:
      type: echo
      args: [compute3]
    before: [compute_ready]
  - uid: compute4
    parameters:
      type: error
      args: [compute4]
    before: [compute_ready]
  - uid: compute5
    parameters:
      type: error
      args: [compute5]
    before: [compute_ready]

  - uid: compute_ready
    parameters:
      type: fault_tolerance
      args: [80]
examples/orch/upd_test_errors.yml (new file, 34 lines)
@@ -0,0 +1,34 @@

name: errors
tasks:
  - uid: compute1
    parameters:
      type: echo
      args: [compute1]
    before: [compute_ready]
  - uid: compute2
    parameters:
      type: echo
      args: [compute2]
    before: [compute_ready]
  - uid: compute3
    parameters:
      type: echo
      args: [compute3]
    before: [compute_ready]
  - uid: compute4
    parameters:
      type: echo
      args: [compute4]
    before: [compute_ready]
  - uid: compute5
    parameters:
      type: error
      args: [compute5]
    before: [compute_ready]

  - uid: compute_ready
    parameters:
      type: fault_tolerance
      args: [80]
@@ -46,4 +46,4 @@ setup(
    include_package_data=True,
    entry_points={
        'console_scripts': [
-           'solar = solar.cli:run']})
+           'solar = solar.cli.main:run']})
solar/solar/cli/__init__.py (new, empty file)
@@ -39,6 +39,8 @@ from solar.core import testing
from solar.core import virtual_resource as vr
from solar.interfaces.db import get_db

from solar.cli.orch import orchestration

# NOTE: these are extensions, they shouldn't be imported here
# Maybe each extension can also extend the CLI with parsers
from solar.extensions.modules.discovery import Discovery
@@ -414,6 +416,7 @@ def run():
    init_cli_deployment_config()
    init_cli_resource()

    main.add_command(orchestration)
    main()
solar/solar/cli/orch.py (new file, 106 lines)
@@ -0,0 +1,106 @@
#!/usr/bin/python

import subprocess

import click
import networkx as nx

from solar.orchestration import graph
from solar.orchestration import tasks


@click.group(name='orch')
def orchestration():
    """
    \b
    create solar/orchestration/examples/multi.yaml
    <id>
    run-once <id>
    report <id>
    <task> -> <status>
    restart <id> --reset
    """


@orchestration.command()
@click.argument('plan', type=click.File('rb'))
def create(plan):
    click.echo(graph.create_plan(plan.read()))


@orchestration.command()
@click.argument('uid')
@click.argument('plan', type=click.File('rb'))
def update(uid, plan):
    graph.update_plan(uid, plan.read())


@orchestration.command()
@click.argument('uid')
def report(uid):
    colors = {
        'PENDING': 'blue',
        'ERROR': 'red',
        'SUCCESS': 'green',
        'INPROGRESS': 'yellow'}

    report = graph.report_topo(uid)
    for item in report:
        msg = '{} -> {}'.format(item[0], item[1])
        if item[2]:
            msg += ' :: {}'.format(item[2])
        click.echo(click.style(msg, fg=colors[item[1]]))


@orchestration.command(name='run-once')
@click.argument('uid')
@click.option('--start', default=None)
@click.option('--end', default=None)
def run_once(uid, start, end):
    tasks.schedule_start.apply_async(
        args=[uid],
        kwargs={'start': start, 'end': end},
        queue='scheduler')


@orchestration.command()
@click.argument('uid')
def restart(uid):
    graph.reset(uid)
    tasks.schedule_start.apply_async(args=[uid], queue='scheduler')


@orchestration.command()
@click.argument('uid')
def reset(uid):
    graph.reset(uid)


@orchestration.command()
@click.argument('uid')
def stop(uid):
    # TODO(dshulyak) how to do "hard" stop?
    # using revoke(terminate=True) will lead to inability to restart execution
    # research possibility of customizations of
    # app.control and Panel.register in celery
    graph.soft_stop(uid)


@orchestration.command()
@click.argument('uid')
def retry(uid):
    graph.reset(uid, ['ERROR'])
    tasks.schedule_start.apply_async(args=[uid], queue='scheduler')


@orchestration.command()
@click.argument('uid')
def dg(uid):
    plan = graph.get_graph(uid)

    colors = {
        'PENDING': 'blue',
        'ERROR': 'red',
        'SUCCESS': 'green',
        'INPROGRESS': 'yellow'}

    for n in plan:
        color = colors[plan.node[n]['status']]
        plan.node[n]['color'] = color
    nx.write_dot(plan, 'graph.dot')
    subprocess.call(['dot', '-Tpng', 'graph.dot', '-o', 'graph.png'])
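An illustrative sketch (not part of the change set) of exercising the new group without the console script, via click's standard test runner; it assumes the Redis backend used by graph.py is reachable:

    from click.testing import CliRunner
    from solar.cli.orch import orchestration

    runner = CliRunner()
    created = runner.invoke(orchestration, ['create', 'examples/orch/simple.yaml'])
    plan_uid = created.output.strip()    # `create` echoes the new plan uid

    runner.invoke(orchestration, ['run-once', plan_uid])
    print runner.invoke(orchestration, ['report', plan_uid]).output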
solar/solar/orchestration/TODO (new file, 29 lines)
@@ -0,0 +1,29 @@
1. Core orchestration
   1.1. Celery integration
   1.2. Orchestrate stuff based on plan traversal
   1.3. Different controls
   1.4. Granular execution (e.g. execute start/end, execute certain tasks, execute a certain path)

2. User-facing interface for orchestration
   2.1. How to integrate this stuff with resources?
        Two options:
        - Use the orchestrator as a driver
        - Orchestrate resources externally, with all logic implemented in the resource driver
   2.2. How to allow reuse across different Profiles?
        Solution: a plan for each separate group of resources, e.g. the plan for rabbitmq
        deployment is not exposed, it is stored only in the rabbitmq resources template,
        but it exposes several *anchors* - amqp_cluster_ready and amqp_one_node_ready

3. Granular testing
   3.1. How to integrate pre/post verifications with graph execution

4. Add back timeout support

Orchestration features
-------------------------
1. Controls
   - Execute a task even if all predecessors failed
   - Execute a task when one (or a specific number of) predecessors succeeded
   - Execute a task when a certain percentage of tasks succeeded
   - Ignore it if some task failed
2. Granular execution
solar/solar/orchestration/__init__.py (new, empty file)
solar/solar/orchestration/graph.py (new file, 97 lines)
@@ -0,0 +1,97 @@
import json
import uuid

import networkx as nx
import redis
import yaml


r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1)


def save_graph(name, graph):
    # maybe it is possible to store part of the information in the AsyncResult backend
    r.set('{}:nodes'.format(name), json.dumps(graph.node.items()))
    r.set('{}:edges'.format(name), json.dumps(graph.edges(data=True)))
    r.set('{}:attributes'.format(name), json.dumps(graph.graph))


def get_graph(name):
    dg = nx.DiGraph()
    nodes = json.loads(r.get('{}:nodes'.format(name)))
    edges = json.loads(r.get('{}:edges'.format(name)))
    dg.graph = json.loads(r.get('{}:attributes'.format(name)))
    dg.add_nodes_from(nodes)
    dg.add_edges_from(edges)
    return dg


get_plan = get_graph


def parse_plan(plan_data):
    """Parse a yaml plan definition and return a graph."""
    plan = yaml.load(plan_data)
    dg = nx.DiGraph()
    dg.graph['name'] = plan['name']
    for task in plan['tasks']:
        dg.add_node(
            task['uid'], status='PENDING', errmsg=None, **task['parameters'])
        for v in task.get('before', ()):
            dg.add_edge(task['uid'], v)
        for u in task.get('after', ()):
            dg.add_edge(u, task['uid'])
    return dg


def create_plan(plan_data):
    """Parse a plan and store it under a new unique uid."""
    dg = parse_plan(plan_data)
    dg.graph['uid'] = "{0}:{1}".format(dg.graph['name'], str(uuid.uuid4()))
    save_graph(dg.graph['uid'], dg)
    return dg.graph['uid']


def update_plan(uid, plan_data):
    """An update preserves the old status of tasks if they weren't removed."""
    dg = parse_plan(plan_data)
    old_dg = get_graph(uid)
    dg.graph = old_dg.graph
    for n in dg:
        if n in old_dg:
            dg.node[n]['status'] = old_dg.node[n]['status']

    save_graph(uid, dg)
    return uid


def reset(uid, states=None):
    dg = get_graph(uid)
    for n in dg:
        if states is None or dg.node[n]['status'] in states:
            dg.node[n]['status'] = 'PENDING'
    save_graph(uid, dg)


def soft_stop(uid):
    """The graph stops once all currently in-progress tasks have finished."""
    dg = get_graph(uid)
    dg.graph['stop'] = True
    save_graph(uid, dg)


def report_topo(uid):

    dg = get_graph(uid)
    report = []

    for task in nx.topological_sort(dg):
        report.append([task, dg.node[task]['status'], dg.node[task]['errmsg']])

    return report
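A small illustrative sketch (not part of the change set) of what parse_plan builds: 'before' adds an edge from the task to its successor, 'after' adds an edge from the named predecessor, and each task's parameters land on the node next to its PENDING status. It assumes only that the solar package is importable; parse_plan itself does not touch Redis.

    from solar.orchestration import graph

    plan = """
    name: tiny
    tasks:
      - uid: a
        parameters: {type: echo, args: [a]}
        before: [b]
      - uid: b
        parameters: {type: echo, args: [b]}
    """
    dg = graph.parse_plan(plan)
    print dg.edges()    # [('a', 'b')]
    print dg.node['b']  # status='PENDING', errmsg=None, type='echo', args=['b']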
solar/solar/orchestration/tasks.py (new file, 191 lines)
@@ -0,0 +1,191 @@
from functools import partial, wraps
from itertools import islice
import subprocess
import time

from celery import Celery
from celery.app import task
from celery import group
from celery.exceptions import Ignore
import redis

from solar.orchestration import graph
from solar.core import actions
from solar.core import resource


app = Celery(
    'tasks',
    backend='redis://10.0.0.2:6379/1',
    broker='redis://10.0.0.2:6379/1')
app.conf.update(CELERY_ACCEPT_CONTENT=['json'])
app.conf.update(CELERY_TASK_SERIALIZER='json')

r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1)


class ReportTask(task.Task):

    def on_success(self, retval, task_id, args, kwargs):
        schedule_next.apply_async(args=[task_id, 'SUCCESS'], queue='scheduler')

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        schedule_next.apply_async(
            args=[task_id, 'ERROR'],
            kwargs={'errmsg': str(einfo.exception)},
            queue='scheduler')


report_task = partial(app.task, base=ReportTask, bind=True)


@report_task
def solar_resource(ctxt, resource_name, action):
    res = resource.load(resource_name)
    return actions.resource_action(res, action)


@report_task
def cmd(ctxt, cmd):
    popen = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    out, err = popen.communicate()
    rcode = popen.returncode
    if rcode:
        raise Exception('Command {0} failed with err {1}'.format(cmd, err))
    return popen.returncode, out, err


@report_task
def sleep(ctxt, seconds):
    time.sleep(seconds)


@report_task
def error(ctxt, message):
    raise Exception(message)


@report_task
def fault_tolerance(ctxt, percent):
    task_id = ctxt.request.id
    plan_uid, task_name = task_id.rsplit(':', 1)

    dg = graph.get_graph(plan_uid)
    success = 0.0
    predecessors = dg.predecessors(task_name)
    lth = len(predecessors)

    for s in predecessors:
        if dg.node[s]['status'] == 'SUCCESS':
            success += 1

    success_percent = (success / lth) * 100
    if success_percent < percent:
        raise Exception('Cannot proceed: {0} < {1}'.format(
            success_percent, percent))


@report_task
def echo(ctxt, message):
    return message


@report_task
def anchor(ctxt, *args):
    # such tasks should be walked when at least 1/3/an exact number of resources is visited
    dg = graph.get_graph('current')
    for s in dg.predecessors(ctxt.request.id):
        if dg.node[s]['status'] != 'SUCCESS':
            raise Exception('One of the tasks erred, cannot proceed')


def schedule(plan_uid, dg):
    next_tasks = list(traverse(dg))
    graph.save_graph(plan_uid, dg)
    print 'GRAPH {0}\n NEXT TASKS {1}'.format(dg.node, next_tasks)
    group(next_tasks)()


@app.task
def schedule_start(plan_uid, start=None, end=None):
    """On receiving a finished task, update storage with the task result:

    - find successors that should be executed
    - apply different policies to tasks
    """
    dg = graph.get_graph(plan_uid)
    dg.graph['stop'] = False
    schedule(plan_uid, dg)


@app.task
def schedule_next(task_id, status, errmsg=None):
    plan_uid, task_name = task_id.rsplit(':', 1)
    dg = graph.get_graph(plan_uid)
    dg.node[task_name]['status'] = status
    dg.node[task_name]['errmsg'] = errmsg

    schedule(plan_uid, dg)


# TODO(dshulyak) some tasks should be evaluated even if not all predecessors
# succeeded; how to identify this?
# - add ignore_error on the edge
# - add ignore_predecessor_errors on the task in consideration
# - make fault_tolerance not a task but a policy for all tasks
control_tasks = [fault_tolerance, anchor]


def traverse(dg):
    """
    1. A node should be visited only when all its predecessors are already visited
    2. Visited nodes should have any state except PENDING and INPROGRESS; for now
       that is SUCCESS or ERROR, but it can be extended
    3. If a node is INPROGRESS it should not be visited again
    """
    visited = set()
    for node in dg:
        data = dg.node[node]
        if data['status'] not in ('PENDING', 'INPROGRESS'):
            visited.add(node)

    for node in dg:
        data = dg.node[node]

        if node in visited:
            continue
        elif data['status'] == 'INPROGRESS':
            continue

        predecessors = set(dg.predecessors(node))

        if predecessors <= visited:
            task_id = '{}:{}'.format(dg.graph['uid'], node)

            task_name = '{}.{}'.format(__name__, data['type'])
            task = app.tasks[task_name]

            if all_success(dg, predecessors) or task in control_tasks:
                dg.node[node]['status'] = 'INPROGRESS'
                for t in generate_task(task, dg, data, task_id):
                    yield t


def generate_task(task, dg, data, task_id):

    subtask = task.subtask(
        data['args'], task_id=task_id,
        time_limit=data.get('time_limit', None),
        soft_time_limit=data.get('soft_time_limit', None))

    if data.get('target', None):
        subtask.set(queue=data['target'])

    yield subtask


def all_success(dg, nodes):
    return all((dg.node[n]['status'] == 'SUCCESS' for n in nodes))
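To make the traversal rule concrete, a toy illustrative sketch (not part of the change set; it only builds a graph in memory and dispatches nothing to the broker):

    # traverse() yields a celery subtask only for nodes whose predecessors are
    # all finished; non-control tasks additionally require every predecessor
    # to be SUCCESS
    import networkx as nx
    from solar.orchestration import tasks

    dg = nx.DiGraph(uid='demo:0')
    dg.add_node('ok',   status='SUCCESS', errmsg=None,   type='echo',  args=['ok'])
    dg.add_node('bad',  status='ERROR',   errmsg='boom', type='error', args=['bad'])
    dg.add_node('next', status='PENDING', errmsg=None,   type='echo',  args=['next'])
    dg.add_node('skip', status='PENDING', errmsg=None,   type='echo',  args=['skip'])
    dg.add_edge('ok', 'next')    # the only predecessor of 'next' succeeded
    dg.add_edge('bad', 'skip')   # a predecessor of 'skip' erred

    pending = list(tasks.traverse(dg))
    # one subtask is produced, for 'next'; 'skip' stays PENDING because echo is
    # not in control_tasks and its predecessor did not succeed
    print len(pending)    # 1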