From 47ca8c8fdff49c5107550a92db7f076cac4f0700 Mon Sep 17 00:00:00 2001
From: Dmitry Shulyak
Date: Tue, 30 Jun 2015 12:31:50 +0300
Subject: [PATCH] Add several examples and improvements to orchestration

- Added example.py converted to a plan description
- Added error message propagation to task reports
- Plan can be printed as a dot/png graph
- Added TODO
---
 .gitignore                   |   5 ++
 cli.py                       |  41 +++++++---
 orch/TODO                    |  27 +++++++
 orch/examples/example_py.yml | 143 +++++++++++++++++++++++++++++++++++
 orch/graph.py                |   5 +-
 orch/tasks.py                |  26 +++++--
 6 files changed, 226 insertions(+), 21 deletions(-)
 create mode 100644 orch/TODO
 create mode 100644 orch/examples/example_py.yml

diff --git a/.gitignore b/.gitignore
index b221e333..ffe2a79f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,8 @@ rs/
 solar.log
 
 x-venv/
+
+celery*
+
+*.dot
+*.png
\ No newline at end of file
diff --git a/cli.py b/cli.py
index f39e4fc2..17d9dade 100755
--- a/cli.py
+++ b/cli.py
@@ -6,6 +6,8 @@ import click
 from orch import graph
 from orch import tasks
+import networkx as nx
+import subprocess
 
 
 @click.group()
 def orchestration():
@@ -36,15 +38,17 @@ def update(uid, plan):
 @click.argument('uid')
 def report(uid):
     colors = {
-        'PENDING': 'white',
+        'PENDING': 'blue',
         'ERROR': 'red',
         'SUCCESS': 'green',
         'INPROGRESS': 'yellow'}
 
     report = graph.report_topo(uid)
     for item in report:
-        click.echo(
-            click.style('{} -> {}'.format(item[0], item[1]), fg=colors[item[1]]))
+        msg = '{} -> {}'.format(item[0], item[1])
+        if item[2]:
+            msg += ' :: {}'.format(item[2])
+        click.echo(click.style(msg, fg=colors[item[1]]))
 
 
 @click.command()
@@ -55,16 +59,14 @@ def execute(uid, start, end):
     tasks.schedule_start.apply_async(
         args=[uid],
         kwargs={'start': start, 'end': end},
-        queue='master')
+        queue='scheduler')
 
 
 @click.command()
 @click.argument('uid')
-@click.option('--reset', default=False, is_flag=True)
-def restart(uid, reset):
-    if reset:
-        graph.reset(uid)
-    tasks.schedule_start.apply_async(args=[uid], queue='master')
+def restart(uid):
+    graph.reset(uid)
+    tasks.schedule_start.apply_async(args=[uid], queue='scheduler')
 
 
 @click.command()
@@ -87,7 +89,25 @@ def stop(uid):
 @click.argument('uid')
 def retry(uid):
     graph.reset(uid, ['ERROR'])
-    tasks.schedule_start.apply_async(args=[uid], queue='master')
+    tasks.schedule_start.apply_async(args=[uid], queue='scheduler')
+
+
+@click.command()
+@click.argument('uid')
+def dg(uid):
+    plan = graph.get_graph(uid)
+
+    colors = {
+        'PENDING': 'blue',
+        'ERROR': 'red',
+        'SUCCESS': 'green',
+        'INPROGRESS': 'yellow'}
+
+    for n in plan:
+        color = colors[plan.node[n]['status']]
+        plan.node[n]['color'] = color
+    nx.write_dot(plan, 'graph.dot')
+    subprocess.call(['dot', '-Tpng', 'graph.dot', '-o', 'graph.png'])
 
 
 orchestration.add_command(create)
@@ -98,6 +118,7 @@ orchestration.add_command(restart)
 orchestration.add_command(reset)
 orchestration.add_command(stop)
 orchestration.add_command(retry)
+orchestration.add_command(dg)
 
 
 if __name__ == '__main__':
diff --git a/orch/TODO b/orch/TODO
new file mode 100644
index 00000000..9b02fba0
--- /dev/null
+++ b/orch/TODO
@@ -0,0 +1,27 @@
+1. Core orchestration
+1.1. Celery integration
+1.2. Orchestrate stuff based on plan traversal
+1.3. Different controls
+1.4. Granular execution (e.g. execute start/end, execute certain tasks, execute a certain path)
+
+2. User-facing interface for orchestration
+2.1. How to integrate this stuff with resources?
+Two options:
+- Use the orchestrator as a driver
+- Orchestrate resources externally, with all logic implemented in the resource driver
+2.2. How to allow reuse across different Profiles?
+Solution: a plan for each separate group of resources, e.g. the plan for rabbitmq
+deployment is not exposed; it is stored only in the rabbitmq resources template,
+but it exposes several *anchors* - amqp_cluster_ready and amqp_one_node_ready
+
+3. Granular testing
+3.1. How to integrate pre/post verifications
+
+Orchestration features
+-------------------------
+1. Controls
+- Execute a task even if all of its predecessors failed
+- Execute a task when one (or a specific number) of its predecessors succeeds
+- Execute a task when a certain percentage of tasks succeed
+- Ignore failures of certain tasks
+2. Granular execution
\ No newline at end of file
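Note on the controls listed in orch/TODO: each one reduces to a predicate over the states of a task's predecessors. A minimal sketch of that idea, assuming a hypothetical per-node 'control' attribute (neither the attribute nor the helper below exists in this patch):

    # Sketch only: 'control' is an assumed node attribute, not part of this patch.
    def runnable(dg, node):
        # Decide whether `node` may be scheduled, given its predecessors' states.
        preds = [dg.node[p]['status'] for p in dg.predecessors(node)]
        control = dg.node[node].get('control', 'all-success')
        if control == 'all-success':    # default: every predecessor succeeded
            return all(s == 'SUCCESS' for s in preds)
        if control == 'any-success':    # one successful predecessor is enough
            return any(s == 'SUCCESS' for s in preds)
        if control == 'ignore-errors':  # run even when some predecessors failed
            return all(s in ('SUCCESS', 'ERROR') for s in preds)
        return False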
diff --git a/orch/examples/example_py.yml b/orch/examples/example_py.yml
new file mode 100644
index 00000000..8139d56a
--- /dev/null
+++ b/orch/examples/example_py.yml
@@ -0,0 +1,143 @@
+name: example_py
+tasks:
+  - uid: rabbitmq_service1
+    parameters:
+      type: solar_resource
+      args: [rabbitmq_service1, run]
+  - uid: openstack_vhost
+    parameters:
+      type: solar_resource
+      args: [openstack_vhost, run]
+    after: [rabbitmq_service1]
+  - uid: openstack_rabbitmq_user
+    parameters:
+      type: solar_resource
+      args: [openstack_rabbitmq_user, run]
+    after: [rabbitmq_service1]
+
+  - uid: mariadb_service1
+    parameters:
+      type: solar_resource
+      args: [mariadb_service1, run]
+  - uid: keystone_db
+    parameters:
+      type: solar_resource
+      args: [keystone_db, run]
+    after: [mariadb_service1]
+  - uid: keystone_db_user
+    parameters:
+      type: solar_resource
+      args: [keystone_db_user, run]
+    after: [keystone_db]
+
+  - uid: keystone_config1
+    parameters:
+      type: solar_resource
+      args: [keystone_config1, run]
+  - uid: keystone_config2
+    parameters:
+      type: solar_resource
+      args: [keystone_config2, run]
+  - uid: keystone_service1
+    parameters:
+      type: solar_resource
+      args: [keystone_service1, run]
+    after: [keystone_config1, keystone_db_user]
+
+  - uid: keystone_service2
+    parameters:
+      type: solar_resource
+      args: [keystone_service2, run]
+    after: [keystone_config2, keystone_db_user]
+
+  - uid: haproxy_config
+    parameters:
+      type: solar_resource
+      args: [haproxy_config, run]
+    after: [keystone_service1, keystone_service2]
+  - uid: haproxy_service
+    parameters:
+      type: solar_resource
+      args: [haproxy_service, run]
+    after: [haproxy_config]
+
+  - uid: admin_tenant
+    parameters:
+      type: solar_resource
+      args: [admin_tenant, run]
+    after: [haproxy_service]
+  - uid: admin_role
+    parameters:
+      type: solar_resource
+      args: [admin_role, run]
+    after: [admin_tenant]
+  - uid: admin_user
+    parameters:
+      type: solar_resource
+      args: [admin_user, run]
+    after: [admin_role]
+  - uid: keystone_service_endpoint
+    parameters:
+      type: solar_resource
+      args: [keystone_service_endpoint, run]
+    after: [admin_user]
+
+  - uid: services_tenant
+    parameters:
+      type: solar_resource
+      args: [services_tenant, run]
+    after: [keystone_service_endpoint]
+
+  - uid: glance_keystone_user
+    parameters:
+      type: solar_resource
+      args: [glance_keystone_user, run]
+    after: [keystone_service_endpoint]
+  - uid: glance_keystone_role
+    parameters:
+      type: solar_resource
+      args: [glance_keystone_role, run]
+    after: [keystone_service_endpoint]
+
+  - uid: glance_db
+    parameters:
+      type: solar_resource
+      args: [glance_db, run]
+    after: [mariadb_service1]
+
+  - uid: glance_db_user
+    parameters:
+      type: solar_resource
+      args: [glance_db_user, run]
+    after: [glance_db]
+
+  - uid: glance_config
+    parameters:
+      type: solar_resource
+      args: [glance_config, run]
+  - uid: glance_api_container
+    parameters:
+      type: solar_resource
+      args: [glance_api_container, run]
+    after: [glance_config, glance_db_user, glance_keystone_user, openstack_rabbitmq_user]
+  - uid: glance_registry_container
+    parameters:
+      type: solar_resource
+      args: [glance_registry_container, run]
+    after: [glance_config, glance_db_user, glance_keystone_user, openstack_rabbitmq_user]
+  - uid: glance_api_endpoint
+    parameters:
+      type: solar_resource
+      args: [glance_api_endpoint, run]
+    after: [glance_api_container]
+
+  - uid: haproxy_service_update1_rem
+    parameters:
+      type: solar_resource
+      args: [haproxy_service, remove]
+    after: [glance_api_endpoint]
+  - uid: haproxy_service_update1_run
+    parameters:
+      type: solar_resource
+      args: [haproxy_service, run]
+    after: [haproxy_service_update1_rem]
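For reference, parse_plan (orch/graph.py) turns each entry above into a node whose attributes are the task's parameters, and each 'after' relation into an edge from the dependency to the dependent task. An illustration of the structure it builds for two of the tasks above (networkx 1.x API, as used throughout this patch):

    # Illustration only: what parse_plan builds for two of the tasks above.
    import networkx as nx

    dg = nx.DiGraph()
    dg.add_node('mariadb_service1', status='PENDING', errmsg=None,
                type='solar_resource', args=['mariadb_service1', 'run'])
    dg.add_node('keystone_db', status='PENDING', errmsg=None,
                type='solar_resource', args=['keystone_db', 'run'])
    # 'after: [mariadb_service1]' becomes an edge into the dependent task.
    dg.add_edge('mariadb_service1', 'keystone_db')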
diff --git a/orch/graph.py b/orch/graph.py
index 7a7aa3c4..f5658ac7 100644
--- a/orch/graph.py
+++ b/orch/graph.py
@@ -41,7 +41,7 @@ def parse_plan(plan_data):
     dg.graph['name'] = plan['name']
     for task in plan['tasks']:
         dg.add_node(
-            task['uid'], status='PENDING', **task['parameters'])
+            task['uid'], status='PENDING', errmsg=None, **task['parameters'])
         for v in task.get('before', ()):
             dg.add_edge(task['uid'], v)
         for u in task.get('after', ()):
@@ -94,7 +94,6 @@
     report = []
 
     for task in nx.topological_sort(dg):
-        status = dg.node[task]['status']
-        report.append([task, status])
+        report.append([task, dg.node[task]['status'], dg.node[task]['errmsg']])
 
     return report
diff --git a/orch/tasks.py b/orch/tasks.py
index 62fcd724..8b34afe7 100644
--- a/orch/tasks.py
+++ b/orch/tasks.py
@@ -14,6 +14,10 @@ from orch import graph
 
 import redis
 
+
+from solar.core import actions
+from solar.core import resource
+
 app = Celery(
     'tasks',
     backend='redis://10.0.0.2:6379/1',
@@ -25,10 +29,13 @@
 r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1)
 
 class ReportTask(task.Task):
     def on_success(self, retval, task_id, args, kwargs):
-        schedule_next.apply_async(args=[task_id, 'SUCCESS'], queue='master')
+        schedule_next.apply_async(args=[task_id, 'SUCCESS'], queue='scheduler')
 
     def on_failure(self, exc, task_id, args, kwargs, einfo):
-        schedule_next.apply_async(args=[task_id, 'ERROR'], queue='master')
+        schedule_next.apply_async(
+            args=[task_id, 'ERROR'],
+            kwargs={'errmsg': str(einfo.exception)},
+            queue='scheduler')
 
 solar_task = partial(app.task, base=ReportTask, bind=True)
@@ -47,7 +54,12 @@
 
 
 @solar_task
-@maybe_ignore
+def solar_resource(ctxt, resource_name, action):
+    res = resource.load(resource_name)
+    return actions.resource_action(res, action)
+
+
+@solar_task
 def cmd(ctxt, cmd):
     popen = subprocess.Popen(
         cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
@@ -59,13 +71,11 @@
 
 
 @solar_task
-@maybe_ignore
 def sleep(ctxt, seconds):
     time.sleep(seconds)
 
 
 @solar_task
-@maybe_ignore
 def error(ctxt, message):
     raise Exception(message)
 
@@ -91,7 +101,6 @@ def fault_tolerance(ctxt, percent):
 
 
 @solar_task
-@maybe_ignore
 def echo(ctxt, message):
     return message
 
@@ -135,10 +144,11 @@ def schedule_start(plan_uid, start=None, end=None):
 
 
 @app.task
-def schedule_next(task_id, status):
+def schedule_next(task_id, status, errmsg=None):
     plan_uid, task_name = task_id.rsplit(':', 1)
     dg = graph.get_graph(plan_uid)
     dg.node[task_name]['status'] = status
+    dg.node[task_name]['errmsg'] = errmsg
 
     schedule(plan_uid, dg)
 
@@ -195,7 +205,7 @@ def generate_task(task, dg, data, task_id):
 
     if timeout:
         timeout_task = fire_timeout.subtask([task_id], countdown=timeout)
-        timeout_task.set(queue='master')
+        timeout_task.set(queue='scheduler')
         yield timeout_task
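After every task run, ReportTask sends schedule_next to the scheduler queue; schedule_next records the task's status and errmsg and re-runs the plan traversal. The selection step amounts to roughly the following sketch (simplified and hypothetical; the real logic lives in schedule/schedule_start and also honors the start/end limits and per-task timeouts):

    # Simplified sketch of one scheduling pass; not the actual implementation.
    import networkx as nx

    def next_tasks(dg):
        # Yield PENDING tasks whose predecessors have all succeeded.
        for node in nx.topological_sort(dg):
            if dg.node[node]['status'] != 'PENDING':
                continue
            if all(dg.node[p]['status'] == 'SUCCESS'
                   for p in dg.predecessors(node)):
                yield node

With the errmsg wiring in place, 'python cli.py report <uid>' prints 'task -> ERROR :: <message>' for failed tasks, and 'python cli.py dg <uid>' writes graph.dot/graph.png with nodes colored by status.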