Add several examples and improvements to orchestration

- Added example.py converted to a plan description
- Added error message propagation to task reports (sample output sketched below)
- Plan can be printed as a dot/png graph
- Added TODO
Dmitry Shulyak 2015-06-30 12:31:50 +03:00
parent 6d4018b56b
commit 47ca8c8fdf
6 changed files with 226 additions and 21 deletions
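The report command now appends the stored error message after the status. A minimal sketch of the new line format; the report item below (task name and error text) is hypothetical:

# Each report item is [task, status, errmsg]; errmsg is None unless the task failed.
item = ['rabbitmq_service1', 'ERROR', 'connection refused']
msg = '{} -> {}'.format(item[0], item[1])
if item[2]:
    msg += ' :: {}'.format(item[2])
print(msg)  # rabbitmq_service1 -> ERROR :: connection refused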

.gitignore | 5

@@ -17,3 +17,8 @@ rs/
solar.log
x-venv/
celery*
*.dot
*.png

cli.py | 41

@@ -6,6 +6,8 @@ import click
from orch import graph
from orch import tasks
import networkx as nx
import subprocess
@click.group()
def orchestration():
@@ -36,15 +38,17 @@ def update(uid, plan):
@click.argument('uid')
def report(uid):
colors = {
- 'PENDING': 'white',
+ 'PENDING': 'blue',
'ERROR': 'red',
'SUCCESS': 'green',
'INPROGRESS': 'yellow'}
report = graph.report_topo(uid)
for item in report:
- click.echo(
- click.style('{} -> {}'.format(item[0], item[1]), fg=colors[item[1]]))
+ msg = '{} -> {}'.format(item[0], item[1])
+ if item[2]:
+ msg += ' :: {}'.format(item[2])
+ click.echo(click.style(msg, fg=colors[item[1]]))
@click.command()
@@ -55,16 +59,14 @@ def execute(uid, start, end):
tasks.schedule_start.apply_async(
args=[uid],
kwargs={'start': start, 'end': end},
- queue='master')
+ queue='scheduler')
@click.command()
@click.argument('uid')
- @click.option('--reset', default=False, is_flag=True)
- def restart(uid, reset):
- if reset:
- graph.reset(uid)
- tasks.schedule_start.apply_async(args=[uid], queue='master')
+ def restart(uid):
+ graph.reset(uid)
+ tasks.schedule_start.apply_async(args=[uid], queue='scheduler')
@click.command()
@@ -87,7 +89,25 @@ def stop(uid):
@click.argument('uid')
def retry(uid):
graph.reset(uid, ['ERROR'])
- tasks.schedule_start.apply_async(args=[uid], queue='master')
+ tasks.schedule_start.apply_async(args=[uid], queue='scheduler')
@click.command()
@click.argument('uid')
def dg(uid):
plan = graph.get_graph(uid)
colors = {
'PENDING': 'blue',
'ERROR': 'red',
'SUCCESS': 'green',
'INPROGRESS': 'yellow'}
for n in plan:
color = colors[plan.node[n]['status']]
plan.node[n]['color'] = color
nx.write_dot(plan, 'graph.dot')
subprocess.call(['dot', '-Tpng', 'graph.dot', '-o', 'graph.png'])
orchestration.add_command(create)
@@ -98,6 +118,7 @@ orchestration.add_command(restart)
orchestration.add_command(reset)
orchestration.add_command(stop)
orchestration.add_command(retry)
orchestration.add_command(dg)
if __name__ == '__main__':
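The new dg command colors every node by status, writes graph.dot, and shells out to Graphviz, so it needs the dot binary on PATH plus a dot writer for networkx (pydot or pygraphviz, with the networkx 1.x attribute API used in this repo). A standalone sketch of the same rendering flow, using a hypothetical two-node plan instead of graph.get_graph(uid):

import subprocess

import networkx as nx

# Stand-in for a plan loaded with graph.get_graph(uid).
plan = nx.DiGraph()
plan.add_node('task_a', status='SUCCESS')
plan.add_node('task_b', status='PENDING')
plan.add_edge('task_a', 'task_b')

colors = {'PENDING': 'blue', 'ERROR': 'red', 'SUCCESS': 'green', 'INPROGRESS': 'yellow'}
for n in plan:
    plan.node[n]['color'] = colors[plan.node[n]['status']]  # networkx 1.x node attribute access

nx.write_dot(plan, 'graph.dot')  # requires pydot or pygraphviz
subprocess.call(['dot', '-Tpng', 'graph.dot', '-o', 'graph.png'])  # requires Graphviz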

orch/TODO (new file) | 27

@@ -0,0 +1,27 @@
1. Core orchestration
1.1. Celery integration
1.2. Orchestrate stuff based on plan traversal
1.3. Different controls
1.4. Granular execution (e.g. execute start/end, execute certain tasks, execute a certain path)
2. User facing interface for orchestration
2.1. How to integrate this stuff with resources?
Two options:
- Use orchestrator as driver
- Orchestrate resources externally, all logic implemented in resource driver
2.2. How to allow reuse across different Profiles?
Solution: a plan for each separate group of resources; e.g. the plan for rabbitmq
deployment is not exposed, it is stored only in the rabbitmq resources template,
but it exposes several *anchors*: amqp_cluster_ready and amqp_one_node_ready
3. Granular testing
3.1. How to integrate pre/post verifications
Orchestration features
-------------------------
1. Controls
- Execute task even if all predecessors failed
- Execute task when one (or a specific number of) predecessors succeed
- Execute task when a certain percentage of tasks succeed (see the sketch after this list)
- Ignore if some task failed
2. Granular execution
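The fault_tolerance(ctxt, percent) task that appears later in this diff takes a percent argument, matching the percentage-based control above. A minimal, Celery-free sketch of such a check, assuming statuses live on the plan graph nodes as elsewhere in this commit (all names below are illustrative):

import networkx as nx

def enough_succeeded(dg, task_name, percent):
    # Allow task_name to run once `percent` of its predecessors report SUCCESS.
    preds = list(dg.predecessors(task_name))
    if not preds:
        return True
    done = sum(1 for p in preds if dg.node[p]['status'] == 'SUCCESS')
    return done * 100.0 / len(preds) >= percent

# Hypothetical plan: 'app' waits on two predecessors, one of which is done.
dg = nx.DiGraph()
dg.add_node('db', status='SUCCESS')
dg.add_node('vhost', status='PENDING')
dg.add_node('app', status='PENDING')
dg.add_edge('db', 'app')
dg.add_edge('vhost', 'app')
print(enough_succeeded(dg, 'app', 50))  # True: 1 of 2 predecessors (50%) succeeded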


@@ -0,0 +1,143 @@
name: example_py
tasks:
- uid: rabbitmq_service1
parameters:
type: sleep
args: [rabbitmq_service1, run]
- uid: openstack_vhost
parameters:
type: solar_resource
args: [openstack_vhost, run]
after: [rabbitmq_service1]
- uid: openstack_rabbitmq_user
parameters:
type: sleep
args: [openstack_rabbitmq_user, run]
after: [rabbitmq_service1]
- uid: mariadb_service1
parameters:
type: solar_resource
args: [mariadb_service1, run]
- uid: keystone_db
parameters:
type: solar_resource
args: [keystone_db, run]
after: [mariadb_service1]
- uid: keystone_db_user
parameters:
type: solar_resource
args: [keystone_db_user, run]
after: [keystone_db]
- uid: keystone_config1
parameters:
type: solar_resource
args: [keystone_config1, run]
- uid: keystone_config2
parameters:
type: solar_resource
args: [keystone_config2, run]
- uid: keystone_service1
parameters:
type: solar_resource
args: [keystone_service1, run]
after: [keystone_config1, keystone_db_user]
- uid: keystone_service2
parameters:
type: solar_resource
args: [keystone_service2, run]
after: [keystone_config2, keystone_db_user]
- uid: haproxy_config
parameters:
type: solar_resource
args: [haproxy_config, run]
after: [keystone_service1, keystone_service2]
- uid: haproxy_service
parameters:
type: solar_resource
args: [haproxy_service, run]
after: [haproxy_config]
- uid: admin_tenant
parameters:
type: solar_resource
args: [admin_tenant, run]
after: [haproxy_service]
- uid: admin_role
parameters:
type: solar_resource
args: [admin_user, run]
after: [admin_tenant]
- uid: admin_user
parameters:
type: solar_resource
args: [admin_user, run]
after: [admin_role]
- uid: keystone_service_endpoint
parameters:
type: solar_resource
args: [keystone_service_endpoint, run]
after: [admin_user]
- uid: services_tenant
parameters:
type: solar_resource
args: [services_tenant, run]
after: [keystone_service_endpoint]
- uid: glance_keystone_user
parameters:
type: solar_resource
args: [glance_keystone_user, run]
after: [keystone_service_endpoint]
- uid: glance_keystone_role
parameters:
type: solar_resource
args: [glance_keystone_role, run]
after: [keystone_service_endpoint]
- uid: glance_db
parameters:
type: solar_resource
args: [glance_db, run]
after: [mariadb_service1]
- uid: glance_db_user
parameters:
type: solar_resource
args: [glance_db_user, run]
after: [glance_db]
- uid: glance_config
parameters:
type: solar_resource
args: [glance_config, run]
- uid: glance_api_container
parameters:
type: solar_resource
args: [glance_api_container, run]
after: [glance_config, glance_db_user, glance_keystone_user, openstack_rabbitmq_user]
- uid: glance_registry_container
parameters:
type: solar_resource
args: [glance_registry_container, run]
after: [glance_config, glance_db_user, glance_keystone_user, openstack_rabbitmq_user]
- uid: glance_api_endpoint
parameters:
type: solar_resource
args: [glance_api_endpoint, run]
after: [glance_api_container]
- uid: haproxy_service_update1_rem
parameters:
type: solar_resource
args: [haproxy_service, remove]
after: [glance_api_endpoint]
- uid: haproxy_service_update1_run
parameters:
type: solar_resource
args: [haproxy_service, run]
after: [haproxy_service_update1_rem]
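parse_plan (shown in the next diff hunk) turns every uid into a graph node and every name listed in after into an incoming edge, so after: [rabbitmq_service1] on openstack_vhost becomes the edge rabbitmq_service1 -> openstack_vhost. A small sketch of that mapping for the first two tasks of the plan above (args omitted for brevity):

import networkx as nx

# The first two tasks of the plan, as the YAML loader would hand them to parse_plan.
tasks = [
    {'uid': 'rabbitmq_service1', 'parameters': {'type': 'sleep'}},
    {'uid': 'openstack_vhost', 'parameters': {'type': 'solar_resource'},
     'after': ['rabbitmq_service1']},
]

dg = nx.DiGraph()
for task in tasks:
    dg.add_node(task['uid'], status='PENDING', errmsg=None, **task['parameters'])
    for u in task.get('after', ()):
        dg.add_edge(u, task['uid'])   # the dependency runs first
    for v in task.get('before', ()):
        dg.add_edge(task['uid'], v)   # the reverse declaration

print(dg.edges())  # [('rabbitmq_service1', 'openstack_vhost')]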


@@ -41,7 +41,7 @@ def parse_plan(plan_data):
dg.graph['name'] = plan['name']
for task in plan['tasks']:
dg.add_node(
- task['uid'], status='PENDING', **task['parameters'])
+ task['uid'], status='PENDING', errmsg=None, **task['parameters'])
for v in task.get('before', ()):
dg.add_edge(task['uid'], v)
for u in task.get('after', ()):
@@ -94,7 +94,6 @@ def report_topo(uid):
report = []
for task in nx.topological_sort(dg):
status = dg.node[task]['status']
- report.append([task, status])
+ report.append([task, dg.node[task]['status'], dg.node[task]['errmsg']])
return report


@@ -14,6 +14,10 @@ from orch import graph
import redis
from solar.core import actions
from solar.core import resource
app = Celery(
'tasks',
backend='redis://10.0.0.2:6379/1',
@@ -25,10 +29,13 @@ r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1)
class ReportTask(task.Task):
def on_success(self, retval, task_id, args, kwargs):
- schedule_next.apply_async(args=[task_id, 'SUCCESS'], queue='master')
+ schedule_next.apply_async(args=[task_id, 'SUCCESS'], queue='scheduler')
def on_failure(self, exc, task_id, args, kwargs, einfo):
- schedule_next.apply_async(args=[task_id, 'ERROR'], queue='master')
+ schedule_next.apply_async(
+ args=[task_id, 'ERROR'],
+ kwargs={'errmsg': str(einfo.exception)},
+ queue='scheduler')
solar_task = partial(app.task, base=ReportTask, bind=True)
@@ -47,7 +54,12 @@ def maybe_ignore(func):
@solar_task
@maybe_ignore
def solar_resource(ctxt, resource_name, action):
res = resource.load(resource_name)
return actions.resource_action(res, action)
@solar_task
def cmd(ctxt, cmd):
popen = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
@@ -59,13 +71,11 @@ def cmd(ctxt, cmd):
@solar_task
@maybe_ignore
def sleep(ctxt, seconds):
time.sleep(seconds)
@solar_task
@maybe_ignore
def error(ctxt, message):
raise Exception(message)
@@ -91,7 +101,6 @@ def fault_tolerance(ctxt, percent):
@solar_task
@maybe_ignore
def echo(ctxt, message):
return message
@@ -135,10 +144,11 @@ def schedule_start(plan_uid, start=None, end=None):
@app.task
- def schedule_next(task_id, status):
+ def schedule_next(task_id, status, errmsg=None):
plan_uid, task_name = task_id.rsplit(':', 1)
dg = graph.get_graph(plan_uid)
dg.node[task_name]['status'] = status
dg.node[task_name]['errmsg'] = errmsg
schedule(plan_uid, dg)
@@ -195,7 +205,7 @@ def generate_task(task, dg, data, task_id):
if timeout:
timeout_task = fire_timeout.subtask([task_id], countdown=timeout)
- timeout_task.set(queue='master')
+ timeout_task.set(queue='scheduler')
yield timeout_task
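With these changes, a failing worker task reports str(einfo.exception) as errmsg, schedule_next stores it on the plan node, and report prints it after the status. A Celery-free sketch of just that bookkeeping step, with a hypothetical node and error text:

import networkx as nx

def record_result(dg, task_name, status, errmsg=None):
    # Mirrors what schedule_next now does with the status/errmsg pair before rescheduling.
    dg.node[task_name]['status'] = status
    dg.node[task_name]['errmsg'] = errmsg

dg = nx.DiGraph()
dg.add_node('keystone_db', status='INPROGRESS', errmsg=None)
record_result(dg, 'keystone_db', 'ERROR', errmsg='timeout talking to mariadb_service1')
print(dg.node['keystone_db'])  # {'status': 'ERROR', 'errmsg': 'timeout talking to mariadb_service1'}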