diff --git a/run_tests.sh b/run_tests.sh index f2709cba..7964a5b5 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -16,7 +16,7 @@ fi . $VENV/bin/activate -pip install -r solar/requirements.txt --download-cache=/tmp/$JOB_NAME +pip install -r solar/test-requirements.txt --download-cache=/tmp/$JOB_NAME pushd solar/solar diff --git a/solar/requirements.txt b/solar/requirements.txt index 1fbc343a..1896bfa0 100644 --- a/solar/requirements.txt +++ b/solar/requirements.txt @@ -6,7 +6,6 @@ networkx==1.9.1 PyYAML==3.11 jsonschema==2.4.0 requests==2.7.0 -#mock dictdiffer==0.4.0 enum34==1.0.4 redis==2.10.3 @@ -16,3 +15,4 @@ inflection Fabric==1.10.2 tabulate==0.7.5 ansible +celery diff --git a/solar/solar/orchestration/executor.py b/solar/solar/orchestration/executor.py new file mode 100644 index 00000000..a27ba564 --- /dev/null +++ b/solar/solar/orchestration/executor.py @@ -0,0 +1,37 @@ + +from solar.orchestration.runner import app +from celery import group + + +def celery_executor(dg, tasks, control_tasks=()): + to_execute = [] + + for task_name in tasks: + + # task_id needs to be unique, so for each plan we will use + # generated uid of this plan and task_name + task_id = '{}:{}'.format(dg.graph['uid'], task_name) + task = app.tasks[dg.node[task_name]['type']] + + if all_success(dg, dg.predecessors(task_name)) or task_name in control_tasks: + dg.node[task_name]['status'] = 'INPROGRESS' + for t in generate_task(task, dg.node[task_name], task_id): + to_execute.append(t) + return group(to_execute) + + +def generate_task(task, data, task_id): + + subtask = task.subtask( + data['args'], task_id=task_id, + time_limit=data.get('time_limit', None), + soft_time_limit=data.get('soft_time_limit', None)) + + if data.get('target', None): + subtask.set(queue=data['target']) + + yield subtask + + +def all_success(dg, nodes): + return all((dg.node[n]['status'] == 'SUCCESS' for n in nodes)) diff --git a/solar/solar/orchestration/limits.py b/solar/solar/orchestration/limits.py new file mode 
100644 index 00000000..7e5841b9 --- /dev/null +++ b/solar/solar/orchestration/limits.py @@ -0,0 +1,63 @@ + + +class Chain(object): + + def __init__(self, dg, inprogress, added): + self.dg = dg + self.inprogress = inprogress + self.added = added + self.rules = [] + + def add_rule(self, rule): + self.rules.append(rule) + + @property + def filtered(self): + for item in self.added: + for rule in self.rules: + if not rule(self.dg, self.inprogress, item): + break + else: + self.inprogress.append(item) + yield item + + def __iter__(self): + return iter(self.filtered) + + +def get_default_chain(dg, inprogress, added): + chain = Chain(dg, inprogress, added) + chain.add_rule(items_rule) + chain.add_rule(target_based_rule) + chain.add_rule(type_based_rule) + return chain + + +def type_based_rule(dg, inprogress, item): + """condition will be specified like: + type_limit: 2 + """ + _type = dg.node[item].get('resource_type') + if not 'type_limit' in dg.node[item]: return True + if not _type: return True + + type_count = 0 + for n in inprogress: + if dg.node[n].get('resource_type') == _type: + type_count += 1 + return dg.node[item]['type_limit'] > type_count + + +def target_based_rule(dg, inprogress, item, limit=1): + target = dg.node[item].get('target') + if not target: return True + + target_count = 0 + for n in inprogress: + if dg.node[n].get('target') == target: + target_count += 1 + return limit > target_count + + +def items_rule(dg, inprogress, item, limit=100): + return len(inprogress) < limit diff --git a/solar/solar/orchestration/tasks.py b/solar/solar/orchestration/tasks.py index 87175086..726980de 100644 --- a/solar/solar/orchestration/tasks.py +++ b/solar/solar/orchestration/tasks.py @@ -5,8 +5,6 @@ import subprocess import time from celery.app import task -from celery import group -from celery.exceptions import Ignore import redis from solar.orchestration import graph @@ -14,6 +12,9 @@ from solar.core import actions from solar.core import resource from 
solar.system_log.tasks import commit_logitem, error_logitem from solar.orchestration.runner import app +from solar.orchestration.traversal import traverse +from solar.orchestration import limits +from solar.orchestration import executor r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1) @@ -23,7 +24,7 @@ __all__ = ['solar_resource', 'cmd', 'sleep', 'error', 'fault_tolerance', 'schedule_start', 'schedule_next'] # NOTE(dshulyak) i am not using celery.signals because it is not possible -# to extrace task_id from *task_success* signal +# to extract task_id from *task_success* signal class ReportTask(task.Task): def on_success(self, retval, task_id, args, kwargs): @@ -41,13 +42,13 @@ class ReportTask(task.Task): report_task = partial(app.task, base=ReportTask, bind=True) -@report_task +@report_task(name='solar_resource') def solar_resource(ctxt, resource_name, action): res = resource.load(resource_name) return actions.resource_action(res, action) -@report_task +@report_task(name='cmd') def cmd(ctxt, cmd): popen = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) @@ -58,17 +59,17 @@ def cmd(ctxt, cmd): return popen.returncode, out, err -@report_task +@report_task(name='sleep') def sleep(ctxt, seconds): time.sleep(seconds) -@report_task +@report_task(name='error') def error(ctxt, message): raise Exception('message') -@report_task +@report_task(name='fault_tolerance') def fault_tolerance(ctxt, percent): task_id = ctxt.request.id plan_uid, task_name = task_id.rsplit(':', 1) @@ -88,12 +89,12 @@ def fault_tolerance(ctxt, percent): succes_percent, percent)) -@report_task +@report_task(name='echo') def echo(ctxt, message): return message -@report_task +@report_task(name='anchor') def anchor(ctxt, *args): # such tasks should be walked when atleast 1/3/exact number of resources visited dg = graph.get_graph('current') @@ -103,12 +104,18 @@ def anchor(ctxt, *args): def schedule(plan_uid, dg): - next_tasks = list(traverse(dg)) + tasks = 
traverse(dg) + limit_chain = limits.get_default_chain( + dg, + [t for t in dg if dg.node[t]['status'] == 'INPROGRESS'], + tasks) + execution = executor.celery_executor( + dg, limit_chain, control_tasks=('fault_tolerance',)) graph.save_graph(plan_uid, dg) - group(next_tasks)() + execution() -@app.task +@app.task(name='schedule_start') def schedule_start(plan_uid, start=None, end=None): """On receive finished task should update storage with task result: @@ -119,7 +126,7 @@ def schedule_start(plan_uid, start=None, end=None): schedule(plan_uid, dg) -@app.task +@app.task(name='soft_stop') def soft_stop(plan_uid): dg = graph.get_graph(plan_uid) for n in dg: @@ -128,7 +135,7 @@ def soft_stop(plan_uid): graph.save_graph(plan_uid, dg) -@app.task +@app.task(name='schedule_next') def schedule_next(task_id, status, errmsg=None): plan_uid, task_name = task_id.rsplit(':', 1) dg = graph.get_graph(plan_uid) @@ -136,62 +143,3 @@ def schedule_next(task_id, status, errmsg=None): dg.node[task_name]['errmsg'] = errmsg schedule(plan_uid, dg) - -# TODO(dshulyak) some tasks should be evaluated even if not all predecessors -# succeded, how to identify this? -# - add ignor_error on edge -# - add ignore_predecessor_errors on task in consideration -# - make fault_tolerance not a task but a policy for all tasks -control_tasks = [fault_tolerance, anchor] - - -def traverse(dg): - """ - 1. Node should be visited only when all predecessors already visited - 2. Visited nodes should have any state except PENDING, INPROGRESS, for now - is SUCCESS or ERROR, but it can be extended - 3. 
If node is INPROGRESS it should not be visited once again - """ - visited = set() - for node in dg: - data = dg.node[node] - if data['status'] not in ('PENDING', 'INPROGRESS', 'SKIPPED'): - visited.add(node) - - for node in dg: - data = dg.node[node] - - if node in visited: - continue - elif data['status'] in ('INPROGRESS', 'SKIPPED'): - continue - - predecessors = set(dg.predecessors(node)) - - if predecessors <= visited: - task_id = '{}:{}'.format(dg.graph['uid'], node) - - task_name = '{}.{}'.format(__name__, data['type']) - task = app.tasks[task_name] - - if all_success(dg, predecessors) or task in control_tasks: - dg.node[node]['status'] = 'INPROGRESS' - for t in generate_task(task, dg, data, task_id): - yield t - - def generate_task(task, dg, data, task_id): - - subtask = task.subtask( - data['args'], task_id=task_id, - time_limit=data.get('time_limit', None), - soft_time_limit=data.get('soft_time_limit', None)) - - if data.get('target', None): - subtask.set(queue=data['target']) - - yield subtask - - def all_success(dg, nodes): - return all((dg.node[n]['status'] == 'SUCCESS' for n in nodes)) diff --git a/solar/solar/orchestration/traversal.py b/solar/solar/orchestration/traversal.py new file mode 100644 index 00000000..2d1be6d2 --- /dev/null +++ b/solar/solar/orchestration/traversal.py @@ -0,0 +1,36 @@ +""" + +task should be visited only when predecessors are visited, +visited node can only be in SUCCESS, ERROR or NOOP + +task can be scheduled for execution if it is not yet visited, and state +not in SKIPPED, INPROGRESS + +PENDING - task that is scheduled to be executed +ERROR - visited node, but failed, can be failed by timeout +SUCCESS - visited node, successful +INPROGRESS - task already scheduled, can be moved to ERROR or SUCCESS +SKIPPED - not visited, and should be skipped from execution +""" + + +VISITED = ('SUCCESS', 'ERROR', 'NOOP') +BLOCKED = ('INPROGRESS', 'SKIPPED') + + +def traverse(dg): + + visited = set() + for node in dg: + data = 
dg.node[node] + if data['status'] in VISITED: + visited.add(node) + + for node in dg: + data = dg.node[node] + + if node in visited or data['status'] in BLOCKED: + continue + + if set(dg.predecessors(node)) <= visited: + yield node diff --git a/solar/solar/test/test_celery_executor.py b/solar/solar/test/test_celery_executor.py new file mode 100644 index 00000000..fd2ac90a --- /dev/null +++ b/solar/solar/test/test_celery_executor.py @@ -0,0 +1,22 @@ + +import networkx as nx +from pytest import fixture +from mock import patch + +from solar.orchestration import executor + + +@fixture +def dg(): + ex = nx.DiGraph() + ex.add_node('t1', args=['t'], status='PENDING', type='echo') + ex.graph['uid'] = 'some_string' + return ex + + +@patch.object(executor, 'app') +def test_celery_executor(mapp, dg): + """Just check that it doesn't fail for now. + """ + assert executor.celery_executor(dg, ['t1']) + assert dg.node['t1']['status'] == 'INPROGRESS' diff --git a/solar/solar/test/test_diff_generation.py b/solar/solar/test/test_diff_generation.py index f89b4e71..1245c4a2 100644 --- a/solar/solar/test/test_diff_generation.py +++ b/solar/solar/test/test_diff_generation.py @@ -3,7 +3,7 @@ from pytest import fixture from dictdiffer import revert, patch import networkx as nx -from solar import operations +from solar.system_log import change from solar.core.resource import wrap_resource @@ -32,12 +32,12 @@ def commited(): @fixture def full_diff(staged): - return operations.create_diff(staged, {}) + return change.create_diff(staged, {}) @fixture def diff_for_update(staged, commited): - return operations.create_diff(staged, commited) + return change.create_diff(staged, commited) def test_create_diff_with_empty_commited(full_diff): @@ -98,7 +98,7 @@ def conn_graph(): def test_stage_changes(resources, conn_graph): commited = {} - log = operations._stage_changes(resources, conn_graph, commited, []) + log = change._stage_changes(resources, conn_graph, commited, []) assert len(log) == 3 assert 
[l.res for l in log] == ['n.1', 'r.1', 'h.1'] diff --git a/solar/solar/test/test_limits.py b/solar/solar/test/test_limits.py new file mode 100644 index 00000000..81b2e22d --- /dev/null +++ b/solar/solar/test/test_limits.py @@ -0,0 +1,50 @@ + + +from pytest import fixture +import networkx as nx + +from solar.orchestration import limits + + +@fixture +def dg(): + ex = nx.DiGraph() + ex.add_node('t1', status='PENDING', target='1', + resource_type='node', type_limit=2) + ex.add_node('t2', status='PENDING', target='1', + resource_type='node', type_limit=2) + ex.add_node('t3', status='PENDING', target='1', + resource_type='node', type_limit=2) + return ex + + +def test_target_rule(dg): + + assert limits.target_based_rule(dg, [], 't1') == True + assert limits.target_based_rule(dg, ['t1'], 't2') == False + + +def test_type_limit_rule(dg): + assert limits.type_based_rule(dg, ['t1'], 't2') == True + assert limits.type_based_rule(dg, ['t1', 't2'], 't3') == False + + +def test_items_rule(dg): + + assert limits.items_rule(dg, ['1']*99, '2') + assert limits.items_rule(dg, ['1']*99, '2', limit=10) == False + + +@fixture +def target_dg(): + ex = nx.DiGraph() + ex.add_node('t1', status='PENDING', target='1') + ex.add_node('t2', status='PENDING', target='1') + + return ex + + +def test_filtering_chain(target_dg): + + chain = limits.get_default_chain(target_dg, [], ['t1', 't2']) + assert list(chain) == ['t1'] diff --git a/solar/solar/test/test_stage_commit_procedure.py b/solar/solar/test/test_stage_commit_procedure.py deleted file mode 100644 index 3d74273c..00000000 --- a/solar/solar/test/test_stage_commit_procedure.py +++ /dev/null @@ -1,74 +0,0 @@ - -import pytest - -from solar.core import resource -from solar import operations -from solar import state - - -@pytest.fixture -def default_resources(): - from solar.core import signals - from solar.core import resource - - node1 = resource.wrap_resource( - {'id': 'node1', - 'input': {'ip': {'value':'10.0.0.3'}}}) - rabbitmq_service1 = 
resource.wrap_resource( - {'id':'rabbitmq', - 'input': { - 'ip' : {'value': ''}, - 'image': {'value': 'rabbitmq:3-management'}}}) - signals.connect(node1, rabbitmq_service1) - return resource.load_all() - - -@pytest.mark.usefixtures("default_resources") -def test_changes_on_update_image(): - log = operations.stage_changes() - - assert len(log) == 2 - - operations.commit_changes() - - rabbitmq = resource.load('rabbitmq') - rabbitmq.update({'image': 'different'}) - log = operations.stage_changes() - - assert len(log) == 1 - - item = log.items[0] - - assert item.diff == [ - ('change', u'input.image.value', - (u'rabbitmq:3-management', u'different')), - ('change', u'metadata.input.image.value', - (u'rabbitmq:3-management', u'different'))] - - assert item.action == 'update' - - operations.commit_changes() - - commited = state.CD() - - assert commited['rabbitmq']['input']['image'] == { - u'emitter': None, u'value': u'different'} - - reverse = operations.rollback(state.CL().items[-1]) - - assert reverse.diff == [ - ('change', u'input.image.value', - (u'different', u'rabbitmq:3-management')), - ('change', u'metadata.input.image.value', - (u'different', u'rabbitmq:3-management'))] - - operations.commit_changes() - - commited = state.CD() - - assert commited['rabbitmq']['input']['image'] == { - u'emitter': None, u'value': u'rabbitmq:3-management'} - - - - diff --git a/solar/solar/test/test_traversal.py b/solar/solar/test/test_traversal.py new file mode 100644 index 00000000..2c2c735d --- /dev/null +++ b/solar/solar/test/test_traversal.py @@ -0,0 +1,56 @@ + + +import networkx as nx +from pytest import fixture + +from solar.orchestration.traversal import traverse + +@fixture +def tasks(): + return [ + {'id': 't1', 'status': 'PENDING'}, + {'id': 't2', 'status': 'PENDING'}, + {'id': 't3', 'status': 'PENDING'}, + {'id': 't4', 'status': 'PENDING'}, + {'id': 't5', 'status': 'PENDING'}] + +@fixture +def dg(tasks): + ex = nx.DiGraph() + for t in tasks: + ex.add_node(t['id'], 
status=t['status']) + return ex + + +def test_parallel(dg): + dg.add_path(['t1', 't3', 't4', 't5']) + dg.add_path(['t2', 't3']) + + assert set(traverse(dg)) == {'t1', 't2'} + + +def test_walked_only_when_all_predecessors_visited(dg): + dg.add_path(['t1', 't3', 't4', 't5']) + dg.add_path(['t2', 't3']) + + dg.node['t1']['status'] = 'SUCCESS' + dg.node['t2']['status'] = 'INPROGRESS' + + assert set(traverse(dg)) == set() + + dg.node['t2']['status'] = 'SUCCESS' + + assert set(traverse(dg)) == {'t3'} + + +def test_nothing_will_be_walked_if_parent_is_skipped(dg): + dg.add_path(['t1', 't2', 't3', 't4', 't5']) + dg.node['t1']['status'] = 'SKIPPED' + + assert set(traverse(dg)) == set() + +def test_node_will_be_walked_if_parent_is_noop(dg): + dg.add_path(['t1', 't2', 't3', 't4', 't5']) + dg.node['t1']['status'] = 'NOOP' + + assert set(traverse(dg)) == {'t2'} diff --git a/solar/solar/test/test_update_propagated_data.py b/solar/solar/test/test_update_propagated_data.py deleted file mode 100644 index af3d21f4..00000000 --- a/solar/solar/test/test_update_propagated_data.py +++ /dev/null @@ -1,169 +0,0 @@ -import pytest - -from solar.core import signals -from solar.core import resource -from solar import operations - -@pytest.fixture -def resources(): - - node1 = resource.wrap_resource( - {'id': 'node1', - 'input': {'ip': {'value': '10.0.0.3'}}}) - mariadb_service1 = resource.wrap_resource( - {'id': 'mariadb', - 'input': { - 'port' : {'value': 3306}, - 'ip': {'value': ''}}}) - keystone_db = resource.wrap_resource( - {'id':'keystone_db', - 'input': { - 'login_port' : {'value': ''}, - 'ip': {'value': ''}}}) - signals.connect(node1, mariadb_service1) - signals.connect(node1, keystone_db) - signals.connect(mariadb_service1, keystone_db, {'port': 'login_port'}) - return resource.load_all() - - -def test_update_port_on_mariadb(resources): - operations.stage_changes() - operations.commit_changes() - - mariadb = resources['mariadb'] - - mariadb.update({'port': 4400}) - - log = 
operations.stage_changes() - - assert len(log) == 2 - - mariadb_log = log.items[0] - - assert mariadb_log.diff == [ - ('change', u'input.port.value', (3306, 4400)), - ('change', u'metadata.input.port.value', (3306, 4400))] - - keystone_db_log = log.items[1] - - assert keystone_db_log.diff == [ - ('change', u'input.login_port.value', (3306, 4400)), - ('change', u'metadata.input.login_port.value', (3306, 4400))] - - -@pytest.fixture -def simple_input(): - res1 = resource.wrap_resource( - {'id': 'res1', - 'input': {'ip': {'value': '10.10.0.2'}}}) - res2 = resource.wrap_resource( - {'id': 'res2', - 'input': {'ip': {'value': '10.10.0.3'}}}) - - signals.connect(res1, res2) - return resource.load_all() - - -def test_update_simple_resource(simple_input): - operations.stage_changes() - operations.commit_changes() - - res1 = simple_input['res1'] - res1.update({'ip': '10.0.0.3'}) - - log = operations.stage_changes() - - assert len(log) == 2 - - assert log.items[0].diff == [ - ('change', u'input.ip.value', ('10.10.0.2', '10.0.0.3')), - ('change', 'metadata.input.ip.value', ('10.10.0.2', '10.0.0.3')), - ] - assert log.items[1].diff == [ - ('change', u'input.ip.value', ('10.10.0.2', '10.0.0.3')), - ('change', 'metadata.input.ip.value', ('10.10.0.2', '10.0.0.3')), - ] - - operations.commit_changes() - assert simple_input['res1'].args_dict() == { - 'ip': '10.0.0.3', - } - assert simple_input['res2'].args_dict() == { - 'ip': '10.0.0.3', - } - - log_item = operations.rollback_last() - assert log_item.diff == [ - ('change', u'input.ip.value', (u'10.0.0.3', u'10.10.0.2')), - ('change', 'metadata.input.ip.value', ('10.0.0.3', '10.10.0.2')), - ] - - res2 = resource.load('res2') - assert res2.args_dict() == { - 'ip': '10.10.0.2', - } - - -@pytest.fixture -def list_input(): - res1 = resource.wrap_resource( - {'id': 'res1', - 'input': {'ip': {'value': '10.10.0.2'}}}) - res2 = resource.wrap_resource( - {'id': 'res2', - 'input': {'ip': {'value': '10.10.0.3'}}}) - consumer = 
resource.wrap_resource( - {'id': 'consumer', - 'input': - {'ips': {'value': [], - 'schema': ['str']}}}) - - signals.connect(res1, consumer, {'ip': 'ips'}) - signals.connect(res2, consumer, {'ip': 'ips'}) - return resource.load_all() - - -def test_update_list_resource(list_input): - operations.stage_changes() - operations.commit_changes() - - res3 = resource.wrap_resource( - {'id': 'res3', - 'input': {'ip': {'value': '10.10.0.4'}}}) - signals.connect(res3, list_input['consumer'], {'ip': 'ips'}) - - log = operations.stage_changes() - - assert len(log) == 2 - - assert log.items[0].res == res3.name - assert log.items[1].diff == [ - ('add', u'connections', [(2, ['res3', u'consumer', ['ip', 'ips']])]), - ('add', u'input.ips', [ - (2, {u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'})]), - ('add', u'metadata.input.ips.value', - [(2, {u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'})])] - - operations.commit_changes() - assert list_input['consumer'].args_dict() == { - u'ips': [ - {u'emitter_attached_to': u'res1', u'emitter': u'ip', u'value': u'10.10.0.2'}, - {u'emitter_attached_to': u'res2', u'emitter': u'ip', u'value': u'10.10.0.3'}, - {u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'}]} - - log_item = operations.rollback_last() - assert log_item.diff == [ - ('remove', u'connections', [(2, ['res3', u'consumer', ['ip', 'ips']])]), - ('remove', u'input.ips', [ - (2, {u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'})]), - ('remove', u'metadata.input.ips.value', - [(2, {u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'})])] - - consumer = resource.load('consumer') - assert consumer.args_dict() == { - u'ips': [{u'emitter': u'ip', - u'emitter_attached_to': u'res1', - u'value': u'10.10.0.2'}, - {u'emitter': u'ip', - u'emitter_attached_to': u'res2', - u'value': u'10.10.0.3'}]} diff --git a/solar/test-requirements.txt b/solar/test-requirements.txt new 
file mode 100644 index 00000000..fe653b6e --- /dev/null +++ b/solar/test-requirements.txt @@ -0,0 +1,2 @@ +-r requirements.txt +mock