From 903fc0b2cbf9319138987f51ed66d864bc451ec9 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Tue, 21 Jul 2015 14:28:55 +0300 Subject: [PATCH 1/9] Add chain and rules for specifying limits --- solar/solar/orchestration/limits.py | 59 +++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 solar/solar/orchestration/limits.py diff --git a/solar/solar/orchestration/limits.py b/solar/solar/orchestration/limits.py new file mode 100644 index 00000000..5df1a774 --- /dev/null +++ b/solar/solar/orchestration/limits.py @@ -0,0 +1,59 @@ + +from functools import partial + + +class Chain(object): + + def __init__(self, dg, inprogress, added): + self.dg = dg + self.inprogress + self.added = added + self.rules = [] + + def add_rule(self, rule): + self.rules.append(rule) + + @property + def filtered(self): + for item in self.added: + for rule in self.rules: + if not rule(self.dg, self.inprogress, item): + break + else: + yield item + + def __iter__(self): + return iter(self.filtered) + + +def get_default_chain(dg, inprogress, added): + chain = Chain(dg, inprogress, added) + chain.add_rule(items_rule) + chain.add_rule(target_based_rule) + chain.add_rule(type_based_rule) + return chain + + +def type_based_rule(dg, inprogress, item): + """condition will be specified like: + type_limit: 2 + """ + _type = item['resource_type'] + type_count = 0 + for n in inprogress: + if dg.node[n].get('resource_type') == _type: + type_count += 1 + return item['type_limit'] > type_count + + +def target_based_rule(dg, inprogress, item, limit=1): + target = item['target'] + target_count = 0 + for n in inprogress: + if dg.node[n].get('target') == target: + target_count += 1 + return limit > target_count + + +def items_rule(dg, inprogress, item, limit=10): + return len(inprogress) < limit From ca02b59932b5f0c4b18bec0fb823a4c2e505df7f Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Tue, 21 Jul 2015 16:41:41 +0300 Subject: [PATCH 2/9] Move traversal to separate module --- solar/solar/orchestration/limits.py | 2 - solar/solar/orchestration/tasks.py | 62 +------------------------- solar/solar/orchestration/traversal.py | 60 +++++++++++++++++++++++++ 3 files changed, 62 insertions(+), 62 deletions(-) create mode 100644 solar/solar/orchestration/traversal.py diff --git a/solar/solar/orchestration/limits.py b/solar/solar/orchestration/limits.py index 5df1a774..40e0070e 100644 --- a/solar/solar/orchestration/limits.py +++ b/solar/solar/orchestration/limits.py @@ -1,6 +1,4 @@ -from functools import partial - class Chain(object): diff --git a/solar/solar/orchestration/tasks.py b/solar/solar/orchestration/tasks.py index 87175086..9b614c86 100644 --- a/solar/solar/orchestration/tasks.py +++ b/solar/solar/orchestration/tasks.py @@ -14,6 +14,7 @@ from solar.core import actions from solar.core import resource from solar.system_log.tasks import commit_logitem, error_logitem from solar.orchestration.runner import app +from solar.orchestration.traversal import traversal r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1) @@ -103,7 +104,7 @@ def anchor(ctxt, *args): def schedule(plan_uid, dg): - next_tasks = list(traverse(dg)) + next_tasks = list(traverse(dg, control_tasks=(fault_tolerance,))) graph.save_graph(plan_uid, dg) group(next_tasks)() @@ -136,62 +137,3 @@ def schedule_next(task_id, status, errmsg=None): dg.node[task_name]['errmsg'] = errmsg schedule(plan_uid, dg) - -# TODO(dshulyak) some tasks should be evaluated even if not all predecessors -# succeded, how to identify this? -# - add ignor_error on edge -# - add ignore_predecessor_errors on task in consideration -# - make fault_tolerance not a task but a policy for all tasks -control_tasks = [fault_tolerance, anchor] - - -def traverse(dg): - """ - 1. Node should be visited only when all predecessors already visited - 2. Visited nodes should have any state except PENDING, INPROGRESS, for now - is SUCCESS or ERROR, but it can be extended - 3. If node is INPROGRESS it should not be visited once again - """ - visited = set() - for node in dg: - data = dg.node[node] - if data['status'] not in ('PENDING', 'INPROGRESS', 'SKIPPED'): - visited.add(node) - - for node in dg: - data = dg.node[node] - - if node in visited: - continue - elif data['status'] in ('INPROGRESS', 'SKIPPED'): - continue - - predecessors = set(dg.predecessors(node)) - - if predecessors <= visited: - task_id = '{}:{}'.format(dg.graph['uid'], node) - - task_name = '{}.{}'.format(__name__, data['type']) - task = app.tasks[task_name] - - if all_success(dg, predecessors) or task in control_tasks: - dg.node[node]['status'] = 'INPROGRESS' - for t in generate_task(task, dg, data, task_id): - yield t - - -def generate_task(task, dg, data, task_id): - - subtask = task.subtask( - data['args'], task_id=task_id, - time_limit=data.get('time_limit', None), - soft_time_limit=data.get('soft_time_limit', None)) - - if data.get('target', None): - subtask.set(queue=data['target']) - - yield subtask - - -def all_success(dg, nodes): - return all((dg.node[n]['status'] == 'SUCCESS' for n in nodes)) diff --git a/solar/solar/orchestration/traversal.py b/solar/solar/orchestration/traversal.py new file mode 100644 index 00000000..653cac87 --- /dev/null +++ b/solar/solar/orchestration/traversal.py @@ -0,0 +1,60 @@ + + +from solar.orchestration.runner import app + + +# TODO(dshulyak) some tasks should be evaluated even if not all predecessors +# succeded, how to identify this? +# - add ignor_error on edge +# - add ignore_predecessor_errors on task in consideration +# - make fault_tolerance not a task but a policy for all tasks +def traverse(dg, control_tasks=()): + """ + 1. Node should be visited only when all predecessors already visited + 2. Visited nodes should have any state except PENDING, INPROGRESS, for now + is SUCCESS or ERROR, but it can be extended + 3. If node is INPROGRESS it should not be visited once again + """ + visited = set() + for node in dg: + data = dg.node[node] + if data['status'] not in ('PENDING', 'INPROGRESS', 'SKIPPED'): + visited.add(node) + + for node in dg: + data = dg.node[node] + + if node in visited: + continue + elif data['status'] in ('INPROGRESS', 'SKIPPED'): + continue + + predecessors = set(dg.predecessors(node)) + + if predecessors <= visited: + task_id = '{}:{}'.format(dg.graph['uid'], node) + + task_name = '{}.{}'.format(__name__, data['type']) + task = app.tasks[task_name] + + if all_success(dg, predecessors) or task in control_tasks: + dg.node[node]['status'] = 'INPROGRESS' + for t in generate_task(task, dg, data, task_id): + yield t + + +def generate_task(task, dg, data, task_id): + + subtask = task.subtask( + data['args'], task_id=task_id, + time_limit=data.get('time_limit', None), + soft_time_limit=data.get('soft_time_limit', None)) + + if data.get('target', None): + subtask.set(queue=data['target']) + + yield subtask + + +def all_success(dg, nodes): + return all((dg.node[n]['status'] == 'SUCCESS' for n in nodes)) From 4b900ca6b2db945518a3852c1e5a601155a085d0 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Wed, 22 Jul 2015 17:04:36 +0300 Subject: [PATCH 3/9] Cleanup traversal module --- solar/solar/orchestration/executor.py | 33 +++++++++++++ solar/solar/orchestration/tasks.py | 24 +++++----- solar/solar/orchestration/traversal.py | 64 +++++++++----------------- 3 files changed, 67 insertions(+), 54 deletions(-) create mode 100644 solar/solar/orchestration/executor.py diff --git a/solar/solar/orchestration/executor.py b/solar/solar/orchestration/executor.py new file mode 100644 index 00000000..1191f6fd --- /dev/null +++ b/solar/solar/orchestration/executor.py @@ -0,0 +1,33 @@ + +from solar.orchestration.runner import app +from celery import group + + +def celery_executor(dg, tasks): + to_execute = [] + for task in tasks: + task_id = '{}.{}'.format(dg.graph['uid'], task) + task = app.tasks[dg.node[task]['type']] + + if all_success(dg, dg.predecessors(task)) or task in control_tasks: + dg.node[node]['status'] = 'INPROGRESS' + for t in generate_task(task, dg.node[task], task_id): + to_execute.append(t) + group(to_execute)() + + +def generate_task(task, data, task_id): + + subtask = task.subtask( + data['args'], task_id=task_id, + time_limit=data.get('time_limit', None), + soft_time_limit=data.get('soft_time_limit', None)) + + if data.get('target', None): + subtask.set(queue=data['target']) + + yield subtask + + +def all_success(dg, nodes): + return all((dg.node[n]['status'] == 'SUCCESS' for n in nodes)) \ No newline at end of file diff --git a/solar/solar/orchestration/tasks.py b/solar/solar/orchestration/tasks.py index 9b614c86..9443c4ac 100644 --- a/solar/solar/orchestration/tasks.py +++ b/solar/solar/orchestration/tasks.py @@ -24,7 +24,7 @@ __all__ = ['solar_resource', 'cmd', 'sleep', 'error', 'fault_tolerance', 'schedule_start', 'schedule_next'] # NOTE(dshulyak) i am not using celery.signals because it is not possible -# to extrace task_id from *task_success* signal +# to extract task_id from *task_success* signal class ReportTask(task.Task): def on_success(self, retval, task_id, args, kwargs): @@ -42,13 +42,13 @@ class ReportTask(task.Task): report_task = partial(app.task, base=ReportTask, bind=True) -@report_task +@report_task(name='solar_resource') def solar_resource(ctxt, resource_name, action): res = resource.load(resource_name) return actions.resource_action(res, action) -@report_task +@report_task(name='cmd') def cmd(ctxt, cmd): popen = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) @@ -59,17 +59,17 @@ def cmd(ctxt, cmd): return popen.returncode, out, err -@report_task +@report_task(name='sleep') def sleep(ctxt, seconds): time.sleep(seconds) -@report_task +@report_task(name='error') def error(ctxt, message): raise Exception('message') -@report_task +@report_task(name='fault_tolerance') def fault_tolerance(ctxt, percent): task_id = ctxt.request.id plan_uid, task_name = task_id.rsplit(':', 1) @@ -89,12 +89,12 @@ def fault_tolerance(ctxt, percent): succes_percent, percent)) -@report_task +@report_task(name='echo') def echo(ctxt, message): return message -@report_task +@report_task(name='anchor') def anchor(ctxt, *args): # such tasks should be walked when atleast 1/3/exact number of resources visited dg = graph.get_graph('current') @@ -104,12 +104,12 @@ def anchor(ctxt, *args): def schedule(plan_uid, dg): - next_tasks = list(traverse(dg, control_tasks=(fault_tolerance,))) + next_tasks = list(traverse(dg, control_tasks=('fault_tolerance',))) graph.save_graph(plan_uid, dg) group(next_tasks)() -@app.task +@app.task(name='schedule_start') def schedule_start(plan_uid, start=None, end=None): """On receive finished task should update storage with task result: @@ -120,7 +120,7 @@ def schedule_start(plan_uid, start=None, end=None): schedule(plan_uid, dg) -@app.task +@app.task(name='soft_stop') def soft_stop(plan_uid): dg = graph.get_graph(plan_uid) for n in dg: @@ -129,7 +129,7 @@ def soft_stop(plan_uid): graph.save_graph(plan_uid, dg) -@app.task +@app.task(name='schedule_next') def schedule_next(task_id, status, errmsg=None): plan_uid, task_name = task_id.rsplit(':', 1) dg = graph.get_graph(plan_uid) diff --git a/solar/solar/orchestration/traversal.py b/solar/solar/orchestration/traversal.py index 653cac87..9a3e42e6 100644 --- a/solar/solar/orchestration/traversal.py +++ b/solar/solar/orchestration/traversal.py @@ -1,60 +1,40 @@ +""" + +task should be visited only when predecessors are visited, +visited node could be only in SUCCESS or ERROR + +task can be scheduled for execution if it is not yet visited, and state +not in SKIPPED, INPROGRESS + +PENDING - task that is scheduled to be executed +ERROR - visited node, but failed, can be failed by timeout +SUCCESS - visited node, successfull +INPROGRESS - task already scheduled, can be moved to ERROR or SUCCESS +SKIPPED - not visited, and should be skipped from execution +""" -from solar.orchestration.runner import app - +VISITED = ('SUCCESS', 'ERROR') +BLOCKED = ('INPROGRESS', 'SKIPPED') # TODO(dshulyak) some tasks should be evaluated even if not all predecessors # succeded, how to identify this? # - add ignor_error on edge # - add ignore_predecessor_errors on task in consideration # - make fault_tolerance not a task but a policy for all tasks -def traverse(dg, control_tasks=()): - """ - 1. Node should be visited only when all predecessors already visited - 2. Visited nodes should have any state except PENDING, INPROGRESS, for now - is SUCCESS or ERROR, but it can be extended - 3. If node is INPROGRESS it should not be visited once again - """ +def traverse(dg): + visited = set() for node in dg: data = dg.node[node] - if data['status'] not in ('PENDING', 'INPROGRESS', 'SKIPPED'): + if data['status'] in VISITED: visited.add(node) for node in dg: data = dg.node[node] - if node in visited: - continue - elif data['status'] in ('INPROGRESS', 'SKIPPED'): + if node in visited or data['status'] in BLOCKED: continue - predecessors = set(dg.predecessors(node)) - - if predecessors <= visited: - task_id = '{}:{}'.format(dg.graph['uid'], node) - - task_name = '{}.{}'.format(__name__, data['type']) - task = app.tasks[task_name] - - if all_success(dg, predecessors) or task in control_tasks: - dg.node[node]['status'] = 'INPROGRESS' - for t in generate_task(task, dg, data, task_id): - yield t - - -def generate_task(task, dg, data, task_id): - - subtask = task.subtask( - data['args'], task_id=task_id, - time_limit=data.get('time_limit', None), - soft_time_limit=data.get('soft_time_limit', None)) - - if data.get('target', None): - subtask.set(queue=data['target']) - - yield subtask - - -def all_success(dg, nodes): - return all((dg.node[n]['status'] == 'SUCCESS' for n in nodes)) + if set(dg.predecessors(node)) <= visited: + yield node From 0592683c17aa716231aa94aaee227968dd1a5cdf Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Thu, 23 Jul 2015 11:46:58 +0300 Subject: [PATCH 4/9] Integrate traversal/limits/execution in schedule action --- solar/solar/orchestration/executor.py | 10 ++++++---- solar/solar/orchestration/tasks.py | 17 ++++++++++++----- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/solar/solar/orchestration/executor.py b/solar/solar/orchestration/executor.py index 1191f6fd..634c49d6 100644 --- a/solar/solar/orchestration/executor.py +++ b/solar/solar/orchestration/executor.py @@ -3,17 +3,19 @@ from solar.orchestration.runner import app from celery import group -def celery_executor(dg, tasks): +def celery_executor(dg, tasks, control_tasks=()): to_execute = [] for task in tasks: + # task_id needs to be unique, so for each plan we will use + # generated uid of this plan and task_name task_id = '{}.{}'.format(dg.graph['uid'], task) task = app.tasks[dg.node[task]['type']] if all_success(dg, dg.predecessors(task)) or task in control_tasks: - dg.node[node]['status'] = 'INPROGRESS' + dg.node[task]['status'] = 'INPROGRESS' for t in generate_task(task, dg.node[task], task_id): to_execute.append(t) - group(to_execute)() + return group(to_execute) def generate_task(task, data, task_id): @@ -30,4 +32,4 @@ def generate_task(task, data, task_id): def all_success(dg, nodes): - return all((dg.node[n]['status'] == 'SUCCESS' for n in nodes)) \ No newline at end of file + return all((dg.node[n]['status'] == 'SUCCESS' for n in nodes)) diff --git a/solar/solar/orchestration/tasks.py b/solar/solar/orchestration/tasks.py index 9443c4ac..592b8759 100644 --- a/solar/solar/orchestration/tasks.py +++ b/solar/solar/orchestration/tasks.py @@ -5,8 +5,6 @@ import subprocess import time from celery.app import task -from celery import group -from celery.exceptions import Ignore import redis from solar.orchestration import graph @@ -14,7 +12,9 @@ from solar.core import actions from solar.core import resource from solar.system_log.tasks import commit_logitem, error_logitem from solar.orchestration.runner import app -from solar.orchestration.traversal import traversal +from solar.orchestration.traversal import traverse +from solar.orchestration import limits +from solar.orchestration import executor r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1) @@ -104,9 +104,16 @@ def anchor(ctxt, *args): def schedule(plan_uid, dg): - next_tasks = list(traverse(dg, control_tasks=('fault_tolerance',))) + tasks = traverse(dg) + limit_chain = limits.get_default_chain( + dg, + [t for t in dg if dg.node[t]['status'] == 'INPROGRESS'], + tasks) + execution = executor.celery_executor( + dg, limit_chain, control_tasks=('fault_tolerance',)) + graph.save_graph(plan_uid, dg) - group(next_tasks)() + execution() @app.task(name='schedule_start') From 48a6cce7614884f2e1fa1b2aaeb00ee4df2a7921 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Thu, 23 Jul 2015 13:01:55 +0300 Subject: [PATCH 5/9] Remove/Update outdated tests --- solar/solar/test/test_diff_generation.py | 8 +- .../solar/test/test_stage_commit_procedure.py | 74 -------- .../solar/test/test_update_propagated_data.py | 169 ------------------ 3 files changed, 4 insertions(+), 247 deletions(-) delete mode 100644 solar/solar/test/test_stage_commit_procedure.py delete mode 100644 solar/solar/test/test_update_propagated_data.py diff --git a/solar/solar/test/test_diff_generation.py b/solar/solar/test/test_diff_generation.py index f89b4e71..1245c4a2 100644 --- a/solar/solar/test/test_diff_generation.py +++ b/solar/solar/test/test_diff_generation.py @@ -3,7 +3,7 @@ from pytest import fixture from dictdiffer import revert, patch import networkx as nx -from solar import operations +from solar.system_log import change from solar.core.resource import wrap_resource @@ -32,12 +32,12 @@ def commited(): @fixture def full_diff(staged): - return operations.create_diff(staged, {}) + return change.create_diff(staged, {}) @fixture def diff_for_update(staged, commited): - return operations.create_diff(staged, commited) + return change.create_diff(staged, commited) def test_create_diff_with_empty_commited(full_diff): @@ -98,7 +98,7 @@ def conn_graph(): def test_stage_changes(resources, conn_graph): commited = {} - log = operations._stage_changes(resources, conn_graph, commited, []) + log = change._stage_changes(resources, conn_graph, commited, []) assert len(log) == 3 assert [l.res for l in log] == ['n.1', 'r.1', 'h.1'] diff --git a/solar/solar/test/test_stage_commit_procedure.py b/solar/solar/test/test_stage_commit_procedure.py deleted file mode 100644 index 3d74273c..00000000 --- a/solar/solar/test/test_stage_commit_procedure.py +++ /dev/null @@ -1,74 +0,0 @@ - -import pytest - -from solar.core import resource -from solar import operations -from solar import state - - -@pytest.fixture -def default_resources(): - from solar.core import signals - from solar.core import resource - - node1 = resource.wrap_resource( - {'id': 'node1', - 'input': {'ip': {'value':'10.0.0.3'}}}) - rabbitmq_service1 = resource.wrap_resource( - {'id':'rabbitmq', - 'input': { - 'ip' : {'value': ''}, - 'image': {'value': 'rabbitmq:3-management'}}}) - signals.connect(node1, rabbitmq_service1) - return resource.load_all() - - -@pytest.mark.usefixtures("default_resources") -def test_changes_on_update_image(): - log = operations.stage_changes() - - assert len(log) == 2 - - operations.commit_changes() - - rabbitmq = resource.load('rabbitmq') - rabbitmq.update({'image': 'different'}) - log = operations.stage_changes() - - assert len(log) == 1 - - item = log.items[0] - - assert item.diff == [ - ('change', u'input.image.value', - (u'rabbitmq:3-management', u'different')), - ('change', u'metadata.input.image.value', - (u'rabbitmq:3-management', u'different'))] - - assert item.action == 'update' - - operations.commit_changes() - - commited = state.CD() - - assert commited['rabbitmq']['input']['image'] == { - u'emitter': None, u'value': u'different'} - - reverse = operations.rollback(state.CL().items[-1]) - - assert reverse.diff == [ - ('change', u'input.image.value', - (u'different', u'rabbitmq:3-management')), - ('change', u'metadata.input.image.value', - (u'different', u'rabbitmq:3-management'))] - - operations.commit_changes() - - commited = state.CD() - - assert commited['rabbitmq']['input']['image'] == { - u'emitter': None, u'value': u'rabbitmq:3-management'} - - - - diff --git a/solar/solar/test/test_update_propagated_data.py b/solar/solar/test/test_update_propagated_data.py deleted file mode 100644 index af3d21f4..00000000 --- a/solar/solar/test/test_update_propagated_data.py +++ /dev/null @@ -1,169 +0,0 @@ -import pytest - -from solar.core import signals -from solar.core import resource -from solar import operations - -@pytest.fixture -def resources(): - - node1 = resource.wrap_resource( - {'id': 'node1', - 'input': {'ip': {'value': '10.0.0.3'}}}) - mariadb_service1 = resource.wrap_resource( - {'id': 'mariadb', - 'input': { - 'port' : {'value': 3306}, - 'ip': {'value': ''}}}) - keystone_db = resource.wrap_resource( - {'id':'keystone_db', - 'input': { - 'login_port' : {'value': ''}, - 'ip': {'value': ''}}}) - signals.connect(node1, mariadb_service1) - signals.connect(node1, keystone_db) - signals.connect(mariadb_service1, keystone_db, {'port': 'login_port'}) - return resource.load_all() - - -def test_update_port_on_mariadb(resources): - operations.stage_changes() - operations.commit_changes() - - mariadb = resources['mariadb'] - - mariadb.update({'port': 4400}) - - log = operations.stage_changes() - - assert len(log) == 2 - - mariadb_log = log.items[0] - - assert mariadb_log.diff == [ - ('change', u'input.port.value', (3306, 4400)), - ('change', u'metadata.input.port.value', (3306, 4400))] - - keystone_db_log = log.items[1] - - assert keystone_db_log.diff == [ - ('change', u'input.login_port.value', (3306, 4400)), - ('change', u'metadata.input.login_port.value', (3306, 4400))] - - -@pytest.fixture -def simple_input(): - res1 = resource.wrap_resource( - {'id': 'res1', - 'input': {'ip': {'value': '10.10.0.2'}}}) - res2 = resource.wrap_resource( - {'id': 'res2', - 'input': {'ip': {'value': '10.10.0.3'}}}) - - signals.connect(res1, res2) - return resource.load_all() - - -def test_update_simple_resource(simple_input): - operations.stage_changes() - operations.commit_changes() - - res1 = simple_input['res1'] - res1.update({'ip': '10.0.0.3'}) - - log = operations.stage_changes() - - assert len(log) == 2 - - assert log.items[0].diff == [ - ('change', u'input.ip.value', ('10.10.0.2', '10.0.0.3')), - ('change', 'metadata.input.ip.value', ('10.10.0.2', '10.0.0.3')), - ] - assert log.items[1].diff == [ - ('change', u'input.ip.value', ('10.10.0.2', '10.0.0.3')), - ('change', 'metadata.input.ip.value', ('10.10.0.2', '10.0.0.3')), - ] - - operations.commit_changes() - assert simple_input['res1'].args_dict() == { - 'ip': '10.0.0.3', - } - assert simple_input['res2'].args_dict() == { - 'ip': '10.0.0.3', - } - - log_item = operations.rollback_last() - assert log_item.diff == [ - ('change', u'input.ip.value', (u'10.0.0.3', u'10.10.0.2')), - ('change', 'metadata.input.ip.value', ('10.0.0.3', '10.10.0.2')), - ] - - res2 = resource.load('res2') - assert res2.args_dict() == { - 'ip': '10.10.0.2', - } - - -@pytest.fixture -def list_input(): - res1 = resource.wrap_resource( - {'id': 'res1', - 'input': {'ip': {'value': '10.10.0.2'}}}) - res2 = resource.wrap_resource( - {'id': 'res2', - 'input': {'ip': {'value': '10.10.0.3'}}}) - consumer = resource.wrap_resource( - {'id': 'consumer', - 'input': - {'ips': {'value': [], - 'schema': ['str']}}}) - - signals.connect(res1, consumer, {'ip': 'ips'}) - signals.connect(res2, consumer, {'ip': 'ips'}) - return resource.load_all() - - -def test_update_list_resource(list_input): - operations.stage_changes() - operations.commit_changes() - - res3 = resource.wrap_resource( - {'id': 'res3', - 'input': {'ip': {'value': '10.10.0.4'}}}) - signals.connect(res3, list_input['consumer'], {'ip': 'ips'}) - - log = operations.stage_changes() - - assert len(log) == 2 - - assert log.items[0].res == res3.name - assert log.items[1].diff == [ - ('add', u'connections', [(2, ['res3', u'consumer', ['ip', 'ips']])]), - ('add', u'input.ips', [ - (2, {u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'})]), - ('add', u'metadata.input.ips.value', - [(2, {u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'})])] - - operations.commit_changes() - assert list_input['consumer'].args_dict() == { - u'ips': [ - {u'emitter_attached_to': u'res1', u'emitter': u'ip', u'value': u'10.10.0.2'}, - {u'emitter_attached_to': u'res2', u'emitter': u'ip', u'value': u'10.10.0.3'}, - {u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'}]} - - log_item = operations.rollback_last() - assert log_item.diff == [ - ('remove', u'connections', [(2, ['res3', u'consumer', ['ip', 'ips']])]), - ('remove', u'input.ips', [ - (2, {u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'})]), - ('remove', u'metadata.input.ips.value', - [(2, {u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'})])] - - consumer = resource.load('consumer') - assert consumer.args_dict() == { - u'ips': [{u'emitter': u'ip', - u'emitter_attached_to': u'res1', - u'value': u'10.10.0.2'}, - {u'emitter': u'ip', - u'emitter_attached_to': u'res2', - u'value': u'10.10.0.3'}]} From d4418a81e1fa7c875ef2f3d63d5811b7d0352369 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Thu, 23 Jul 2015 14:12:51 +0300 Subject: [PATCH 6/9] Introduce NOOP state with unit tests for traversal Difference between NOOP and SKIPPED: - NOOP will allow successors to be walked, while SKIPPED will ensure that that successors wont be walked --- solar/solar/orchestration/traversal.py | 8 +--- solar/solar/test/test_traversal.py | 56 ++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 6 deletions(-) create mode 100644 solar/solar/test/test_traversal.py diff --git a/solar/solar/orchestration/traversal.py b/solar/solar/orchestration/traversal.py index 9a3e42e6..2d1be6d2 100644 --- a/solar/solar/orchestration/traversal.py +++ b/solar/solar/orchestration/traversal.py @@ -14,14 +14,10 @@ SKIPPED - not visited, and should be skipped from execution """ -VISITED = ('SUCCESS', 'ERROR') +VISITED = ('SUCCESS', 'ERROR', 'NOOP') BLOCKED = ('INPROGRESS', 'SKIPPED') -# TODO(dshulyak) some tasks should be evaluated even if not all predecessors -# succeded, how to identify this? -# - add ignor_error on edge -# - add ignore_predecessor_errors on task in consideration -# - make fault_tolerance not a task but a policy for all tasks + def traverse(dg): visited = set() diff --git a/solar/solar/test/test_traversal.py b/solar/solar/test/test_traversal.py new file mode 100644 index 00000000..2c2c735d --- /dev/null +++ b/solar/solar/test/test_traversal.py @@ -0,0 +1,56 @@ + + +import networkx as nx +from pytest import fixture + +from solar.orchestration.traversal import traverse + +@fixture +def tasks(): + return [ + {'id': 't1', 'status': 'PENDING'}, + {'id': 't2', 'status': 'PENDING'}, + {'id': 't3', 'status': 'PENDING'}, + {'id': 't4', 'status': 'PENDING'}, + {'id': 't5', 'status': 'PENDING'}] + +@fixture +def dg(tasks): + ex = nx.DiGraph() + for t in tasks: + ex.add_node(t['id'], status=t['status']) + return ex + + +def test_parallel(dg): + dg.add_path(['t1', 't3', 't4', 't5']) + dg.add_path(['t2', 't3']) + + assert set(traverse(dg)) == {'t1', 't2'} + + +def test_walked_only_when_all_predecessors_visited(dg): + dg.add_path(['t1', 't3', 't4', 't5']) + dg.add_path(['t2', 't3']) + + dg.node['t1']['status'] = 'SUCCESS' + dg.node['t2']['status'] = 'INPROGRESS' + + assert set(traverse(dg)) == set() + + dg.node['t2']['status'] = 'SUCCESS' + + assert set(traverse(dg)) == {'t3'} + + +def test_nothing_will_be_walked_if_parent_is_skipped(dg): + dg.add_path(['t1', 't2', 't3', 't4', 't5']) + dg.node['t1']['status'] = 'SKIPPED' + + assert set(traverse(dg)) == set() + +def test_node_will_be_walked_if_parent_is_noop(dg): + dg.add_path(['t1', 't2', 't3', 't4', 't5']) + dg.node['t1']['status'] = 'NOOP' + + assert set(traverse(dg)) == {'t2'} From 54502c72b9719e1aa04f7755149b213b68c007e5 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Thu, 23 Jul 2015 15:24:05 +0300 Subject: [PATCH 7/9] Add tests for rules and filtering chain --- solar/solar/orchestration/limits.py | 16 ++++++--- solar/solar/test/test_limits.py | 50 +++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 5 deletions(-) create mode 100644 solar/solar/test/test_limits.py diff --git a/solar/solar/orchestration/limits.py b/solar/solar/orchestration/limits.py index 40e0070e..7e5841b9 100644 --- a/solar/solar/orchestration/limits.py +++ b/solar/solar/orchestration/limits.py @@ -4,7 +4,7 @@ class Chain(object): def __init__(self, dg, inprogress, added): self.dg = dg - self.inprogress + self.inprogress = inprogress self.added = added self.rules = [] @@ -18,6 +18,7 @@ class Chain(object): if not rule(self.dg, self.inprogress, item): break else: + self.inprogress.append(item) yield item def __iter__(self): @@ -36,16 +37,21 @@ def type_based_rule(dg, inprogress, item): """condition will be specified like: type_limit: 2 """ - _type = item['resource_type'] + _type = dg.node[item].get('resource_type') + if not 'type_limit' in dg.node[item]: return True + if not _type: return True + type_count = 0 for n in inprogress: if dg.node[n].get('resource_type') == _type: type_count += 1 - return item['type_limit'] > type_count + return dg.node[item]['type_limit'] > type_count def target_based_rule(dg, inprogress, item, limit=1): - target = item['target'] + target = dg.node[item].get('target') + if not target: return True + target_count = 0 for n in inprogress: if dg.node[n].get('target') == target: @@ -53,5 +59,5 @@ def target_based_rule(dg, inprogress, item, limit=1): return limit > target_count -def items_rule(dg, inprogress, item, limit=10): +def items_rule(dg, inprogress, item, limit=100): return len(inprogress) < limit diff --git a/solar/solar/test/test_limits.py b/solar/solar/test/test_limits.py new file mode 100644 index 00000000..81b2e22d --- /dev/null +++ b/solar/solar/test/test_limits.py @@ -0,0 +1,50 @@ + + +from pytest import fixture +import networkx as nx + +from solar.orchestration import limits + + +@fixture +def dg(): + ex = nx.DiGraph() + ex.add_node('t1', status='PENDING', target='1', + resource_type='node', type_limit=2) + ex.add_node('t2', status='PENDING', target='1', + resource_type='node', type_limit=2) + ex.add_node('t3', status='PENDING', target='1', + resource_type='node', type_limit=2) + return ex + + +def test_target_rule(dg): + + assert limits.target_based_rule(dg, [], 't1') == True + assert limits.target_based_rule(dg, ['t1'], 't2') == False + + +def test_type_limit_rule(dg): + assert limits.type_based_rule(dg, ['t1'], 't2') == True + assert limits.type_based_rule(dg, ['t1', 't2'], 't3') == False + + +def test_items_rule(dg): + + assert limits.items_rule(dg, ['1']*99, '2') + assert limits.items_rule(dg, ['1']*99, '2', limit=10) == False + + +@fixture +def target_dg(): + ex = nx.DiGraph() + ex.add_node('t1', status='PENDING', target='1') + ex.add_node('t2', status='PENDING', target='1') + + return ex + + +def test_filtering_chain(target_dg): + + chain = limits.get_default_chain(target_dg, [], ['t1', 't2']) + assert list(chain) == ['t1'] From e1a83b985503ab67d02bd15609f1a7ee80ec8ff0 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Thu, 23 Jul 2015 16:30:27 +0300 Subject: [PATCH 8/9] Add fixes for celery executor and unit test --- solar/solar/orchestration/executor.py | 14 ++++++++------ solar/solar/orchestration/tasks.py | 1 - solar/solar/test/test_celery_executor.py | 22 ++++++++++++++++++++++ 3 files changed, 30 insertions(+), 7 deletions(-) create mode 100644 solar/solar/test/test_celery_executor.py diff --git a/solar/solar/orchestration/executor.py b/solar/solar/orchestration/executor.py index 634c49d6..a27ba564 100644 --- a/solar/solar/orchestration/executor.py +++ b/solar/solar/orchestration/executor.py @@ -5,15 +5,17 @@ from celery import group def celery_executor(dg, tasks, control_tasks=()): to_execute = [] - for task in tasks: + + for task_name in tasks: + # task_id needs to be unique, so for each plan we will use # generated uid of this plan and task_name - task_id = '{}.{}'.format(dg.graph['uid'], task) - task = app.tasks[dg.node[task]['type']] + task_id = '{}:{}'.format(dg.graph['uid'], task_name) + task = app.tasks[dg.node[task_name]['type']] - if all_success(dg, dg.predecessors(task)) or task in control_tasks: - dg.node[task]['status'] = 'INPROGRESS' - for t in generate_task(task, dg.node[task], task_id): + if all_success(dg, dg.predecessors(task_name)) or task_name in control_tasks: + dg.node[task_name]['status'] = 'INPROGRESS' + for t in generate_task(task, dg.node[task_name], task_id): to_execute.append(t) return group(to_execute) diff --git a/solar/solar/orchestration/tasks.py b/solar/solar/orchestration/tasks.py index 592b8759..726980de 100644 --- a/solar/solar/orchestration/tasks.py +++ b/solar/solar/orchestration/tasks.py @@ -111,7 +111,6 @@ def schedule(plan_uid, dg): tasks) execution = executor.celery_executor( dg, limit_chain, control_tasks=('fault_tolerance',)) - graph.save_graph(plan_uid, dg) execution() diff --git a/solar/solar/test/test_celery_executor.py b/solar/solar/test/test_celery_executor.py new file mode 100644 index 00000000..fd2ac90a --- /dev/null +++ b/solar/solar/test/test_celery_executor.py @@ -0,0 +1,22 @@ + +import networkx as nx +from pytest import fixture +from mock import patch + +from solar.orchestration import executor + + +@fixture +def dg(): + ex = nx.DiGraph() + ex.add_node('t1', args=['t'], status='PENDING', type='echo') + ex.graph['uid'] = 'some_string' + return ex + + +@patch.object(executor, 'app') +def test_celery_executor(mapp, dg): + """Just check that it doesnt fail for now. + """ + assert executor.celery_executor(dg, ['t1']) + assert dg.node['t1']['status'] == 'INPROGRESS' From a5683bfd52e1f4415c4a2f072042667c6ca9a064 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Fri, 24 Jul 2015 13:34:21 +0300 Subject: [PATCH 9/9] Add test-requirements.txt --- run_tests.sh | 2 +- solar/requirements.txt | 2 +- solar/test-requirements.txt | 2 ++ 3 files changed, 4 insertions(+), 2 deletions(-) create mode 100644 solar/test-requirements.txt diff --git a/run_tests.sh b/run_tests.sh index f2709cba..7964a5b5 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -16,7 +16,7 @@ fi . $VENV/bin/activate -pip install -r solar/requirements.txt --download-cache=/tmp/$JOB_NAME +pip install -r solar/test-requirements.txt --download-cache=/tmp/$JOB_NAME pushd solar/solar diff --git a/solar/requirements.txt b/solar/requirements.txt index 1fbc343a..1896bfa0 100644 --- a/solar/requirements.txt +++ b/solar/requirements.txt @@ -6,7 +6,6 @@ networkx==1.9.1 PyYAML==3.11 jsonschema==2.4.0 requests==2.7.0 -#mock dictdiffer==0.4.0 enum34==1.0.4 redis==2.10.3 @@ -16,3 +15,4 @@ inflection Fabric==1.10.2 tabulate==0.7.5 ansible +celery diff --git a/solar/test-requirements.txt b/solar/test-requirements.txt new file mode 100644 index 00000000..fe653b6e --- /dev/null +++ b/solar/test-requirements.txt @@ -0,0 +1,2 @@ +-r requirements.txt +mock