diff --git a/celery.yml b/celery.yml index 4b5f9878..3874e2f2 100644 --- a/celery.yml +++ b/celery.yml @@ -10,9 +10,9 @@ - shell: celery multi kill 2 chdir={{celery_dir}} tags: [stop] - - shell: celery multi start 2 -A solar.orchestration.tasks -Q:1 celery,scheduler -Q:2 celery,{{hostname.stdout}} + - shell: celery multi start 2 -A solar.orchestration.runner -Q:1 scheduler,system_log -Q:2 celery,{{hostname.stdout}} chdir={{celery_dir}} tags: [master] - - shell: celery multi start 1 -A solar.orchestration.tasks -Q:1 celery,{{hostname.stdout}} + - shell: celery multi start 1 -A solar.orchestration.runner -Q:1 {{hostname.stdout}} chdir={{celery_dir}} - tags: [slave] \ No newline at end of file + tags: [slave] diff --git a/solar/solar/cli/main.py b/solar/solar/cli/main.py index 49f6bd85..71d3b36b 100644 --- a/solar/solar/cli/main.py +++ b/solar/solar/cli/main.py @@ -28,8 +28,6 @@ import tabulate import yaml from solar import utils -from solar import operations -from solar import state from solar.core import actions from solar.core import resource as sresource from solar.core.resource import assign_resources_to_nodes @@ -38,8 +36,11 @@ from solar.core.tags_set_parser import Expression from solar.core import testing from solar.core.resource import virtual_resource as vr from solar.interfaces.db import get_db +from solar import errors +from solar.core.log import log from solar.cli.orch import orchestration +from solar.cli.system_log import changes # NOTE: these are extensions, they shouldn't be imported here # Maybe each extension can also extend the CLI with parsers @@ -146,54 +147,6 @@ def init_actions(): actions.resource_action(resource_obj, action) -def init_changes(): - @main.group() - def changes(): - pass - - @changes.command() - def validate(): - errors = vr.validate_resources() - if errors: - for r, error in errors: - print 'ERROR: %s: %s' % (r.name, error) - sys.exit(1) - - @changes.command() - def stage(): - log = operations.stage_changes() - click.echo(log.show()) - - @changes.command() - @click.option('--one', is_flag=True, default=False) - def commit(one): - if one: - operations.commit_one() - else: - operations.commit_changes() - - @changes.command() - @click.option('--limit', default=5) - def history(limit): - click.echo(state.CL().show()) - - @changes.command() - @click.option('--last', is_flag=True, default=False) - @click.option('--all', is_flag=True, default=False) - @click.option('--uid', default=None) - def rollback(last, all, uid): - if last: - click.echo(operations.rollback_last()) - elif all: - click.echo(operations.rollback_all()) - elif uid: - click.echo(operations.rollback_uid(uid)) - - @changes.command() - def test(): - testing.test_all() - - def init_cli_connect(): @main.command() @click.argument('emitter') @@ -289,8 +242,13 @@ def init_cli_resource(): click.echo( 'action {} for resource {}'.format(action, resource) ) - actions.resource_action(sresource.load(resource), action) + r = sresource.load(resource) + try: + actions.resource_action(r, action) + except errors.SolarError as e: + log.debug(e) + sys.exit(1) @resource.command() def compile_all(): @@ -314,7 +272,8 @@ def init_cli_resource(): @resource.command() @click.argument('name') - @click.argument('base_path', type=click.Path(exists=True, file_okay=True)) + @click.argument( + 'base_path', type=click.Path(exists=True, resolve_path=True)) @click.argument('args', nargs=-1) def create(args, base_path, name): args_parsed = {} @@ -425,13 +384,13 @@ def init_cli_resource(): def run(): init_actions() - init_changes() init_cli_connect() init_cli_connections() init_cli_deployment_config() init_cli_resource() main.add_command(orchestration) + main.add_command(changes) main() diff --git a/solar/solar/cli/orch.py b/solar/solar/cli/orch.py index 74c46ef6..b17a8a33 100644 --- a/solar/solar/cli/orch.py +++ b/solar/solar/cli/orch.py @@ -21,17 +21,20 @@ def orchestration(): restart --reset """ + @orchestration.command() @click.argument('plan', type=click.File('rb')) def create(plan): click.echo(graph.create_plan(plan.read())) + @orchestration.command() @click.argument('uid') @click.argument('plan', type=click.File('rb')) def update(uid, plan): graph.update_plan(uid, plan.read()) + @orchestration.command() @click.argument('uid') def report(uid): @@ -102,5 +105,14 @@ def dg(uid): for n in plan: color = colors[plan.node[n]['status']] plan.node[n]['color'] = color - nx.write_dot(plan, 'graph.dot') - subprocess.call(['dot', '-Tpng', 'graph.dot', '-o', 'graph.png']) + nx.write_dot(plan, '{name}.dot'.format(name=plan.graph['name'])) + subprocess.call( + 'tred {name}.dot | dot -Tpng -o {name}.png'.format(name=plan.graph['name']), + shell=True) + click.echo('Created {name}.png'.format(name=plan.graph['name'])) + + +@orchestration.command() +@click.argument('uid') +def show(uid): + click.echo(graph.show(uid)) diff --git a/solar/solar/cli/system_log.py b/solar/solar/cli/system_log.py new file mode 100644 index 00000000..896655f5 --- /dev/null +++ b/solar/solar/cli/system_log.py @@ -0,0 +1,66 @@ + +import sys + +import click + +from solar.core import testing +from solar.core import resource +from solar.system_log import change +from solar.system_log import operations +from solar.system_log import data + + +@click.group() +def changes(): + pass + + +@changes.command() +def validate(): + errors = resource.validate_resources() + if errors: + for r, error in errors: + print 'ERROR: %s: %s' % (r.name, error) + sys.exit(1) + + +@changes.command() +def stage(): + log = change.stage_changes() + staged = list(log.reverse()) + if not staged: + click.echo('No changes') + click.echo(staged) + + +@changes.command() +def process(): + click.echo(change.send_to_orchestration()) + + +@changes.command() +@click.argument('uid') +def commit(uid): + operations.commit(uid) + + +@changes.command() +@click.option('-n', default=5) +def history(n): + commited = list(data.CL().collection(n)) + if not commited: + click.echo('No history.') + return + commited.reverse() + click.echo(commited) + + +@changes.command() +def test(): + testing.test_all() + + +@changes.command(name='clean-history') +def clean_history(): + data.CL().clean() + data.CD().clean() diff --git a/solar/solar/core/handlers/ansible_template.py b/solar/solar/core/handlers/ansible_template.py index dc6e8d45..f90d9c6a 100644 --- a/solar/solar/core/handlers/ansible_template.py +++ b/solar/solar/core/handlers/ansible_template.py @@ -1,11 +1,16 @@ # -*- coding: utf-8 -*- from fabric import api as fabric_api +from fabric.state import env import os from solar.core.log import log from solar.core.handlers.base import TempFileHandler +from solar import errors +# otherwise fabric will sys.exit(1) in case of errors +env.warn_only = True + class AnsibleTemplate(TempFileHandler): def action(self, resource, action_name): inventory_file = self._create_inventory(resource) @@ -15,12 +20,10 @@ class AnsibleTemplate(TempFileHandler): call_args = ['ansible-playbook', '--module-path', '/vagrant/library', '-i', inventory_file, playbook_file] log.debug('EXECUTING: %s', ' '.join(call_args)) - try: - fabric_api.local(' '.join(call_args)) - except Exception as e: - log.error(e.output) - log.exception(e) - raise + out = fabric_api.local(' '.join(call_args), capture=True) + if out.failed: + raise errors.SolarError(out) + def _create_inventory(self, r): directory = self.dirs[r.name] diff --git a/solar/solar/core/resource/__init__.py b/solar/solar/core/resource/__init__.py index bbc61314..2a53b59d 100644 --- a/solar/solar/core/resource/__init__.py +++ b/solar/solar/core/resource/__init__.py @@ -7,6 +7,7 @@ __all__ = [ 'load_all', 'prepare_meta', 'wrap_resource', + 'validate_resources', ] @@ -18,3 +19,4 @@ from solar.core.resource.resource import load_all from solar.core.resource.resource import wrap_resource from solar.core.resource.virtual_resource import create from solar.core.resource.virtual_resource import prepare_meta +from solar.core.resource.virtual_resource import validate_resources diff --git a/solar/solar/core/resource/virtual_resource.py b/solar/solar/core/resource/virtual_resource.py index 569e690c..8e2d3633 100644 --- a/solar/solar/core/resource/virtual_resource.py +++ b/solar/solar/core/resource/virtual_resource.py @@ -41,9 +41,11 @@ def create_virtual_resource(vr_name, template): resources = template['resources'] connections = [] created_resources = [] + + cwd = os.getcwd() for resource in resources: name = resource['id'] - base_path = resource['from'] + base_path = os.path.join(cwd, resource['from']) args = resource['values'] new_resources = create(name, base_path, args, vr_name) created_resources += new_resources diff --git a/solar/solar/interfaces/db/redis_db.py b/solar/solar/interfaces/db/redis_db.py index ad4730fa..eaea121c 100644 --- a/solar/solar/interfaces/db/redis_db.py +++ b/solar/solar/interfaces/db/redis_db.py @@ -67,6 +67,9 @@ class RedisDB(object): def clear(self): self._r.flushdb() + def get_set(self, collection): + return OrderedSet(self._r, collection) + def clear_collection(self, collection=COLLECTIONS.resource): key_glob = self._make_key(collection, '*') @@ -83,6 +86,57 @@ class RedisDB(object): return '{0}:{1}'.format(collection, _id) +class OrderedSet(object): + + def __init__(self, client, collection): + self.r = client + self.collection = collection + self.order_counter = '{}:incr'.format(collection) + self.order = '{}:order'.format(collection) + + def add(self, items): + pipe = self.r.pipeline() + for key, value in items: + count = self.r.incr(self.order_counter) + pipe.zadd(self.order, count, key) + pipe.hset(self.collection, key, json.dumps(value)) + pipe.execute() + + def rem(self, keys): + pipe = self.r.pipeline() + for key in keys: + pipe.zrem(self.order, key) + pipe.hdel(self.collection, key) + pipe.execute() + + def get(self, key): + value = self.r.hget(self.collection, key) + if value: + return json.loads(value) + return None + + def update(self, key, value): + self.r.hset(self.collection, key, json.dumps(value)) + + def clean(self): + self.rem(self.r.zrange(self.order, 0, -1)) + + def rem_left(self, n=1): + self.rem(r.zrevrange(self.order, 0, n-1)) + + def reverse(self, n=1): + result = [] + for key in self.r.zrevrange(self.order, 0, n-1): + result.append(self.get(key)) + return result + + def list(self, n=0): + result = [] + for key in self.r.zrange(self.order, 0, n-1): + result.append(self.get(key)) + return result + + class FakeRedisDB(RedisDB): REDIS_CLIENT = fakeredis.FakeStrictRedis diff --git a/solar/solar/operations.py b/solar/solar/operations.py deleted file mode 100644 index 6bf91051..00000000 --- a/solar/solar/operations.py +++ /dev/null @@ -1,197 +0,0 @@ - - -from solar import state -from solar.core.log import log -from solar.core import signals -from solar.core import resource -from solar import utils -from solar.interfaces.db import get_db -from solar.core import actions - -db = get_db() - -from dictdiffer import diff, patch, revert -from fabric import api as fabric_api -import networkx as nx - - -def guess_action(from_, to): - # TODO(dshulyak) it should be more flexible - if not from_: - return 'run' - elif not to: - return 'remove' - else: - # it should be update - return 'update' - - -def connections(res, graph): - result = [] - for pred in graph.predecessors(res.name): - for num, edge in graph.get_edge_data(pred, res.name).items(): - if 'label' in edge: - if ':' in edge['label']: - parent, child = edge['label'].split(':') - mapping = [parent, child] - else: - mapping = [edge['label'], edge['label']] - else: - mapping = None - result.append([pred, res.name, mapping]) - return result - - -def to_dict(resource, graph): - res = resource.to_dict() - res['connections'] = connections(resource, graph) - return res - - -def create_diff(staged, commited): - if 'connections' in commited: - commited['connections'].sort() - staged['connections'].sort() - if 'tags' in commited: - commited['tags'].sort() - staged['tags'].sort() - - return list(diff(commited, staged)) - - -def _stage_changes(staged_resources, conn_graph, - commited_resources, staged_log): - - try: - srt = nx.topological_sort(conn_graph) - except: - for cycle in nx.simple_cycles(conn_graph): - log.debug('CYCLE: %s', cycle) - raise - - for res_uid in srt: - commited_data = commited_resources.get(res_uid, {}) - staged_data = staged_resources.get(res_uid, {}) - - df = create_diff(staged_data, commited_data) - - if df: - - log_item = state.LogItem( - utils.generate_uuid(), - res_uid, - df, - guess_action(commited_data, staged_data)) - staged_log.append(log_item) - return staged_log - - -def stage_changes(): - conn_graph = signals.detailed_connection_graph() - staged = {r.name: to_dict(r, conn_graph) for r in resource.load_all().values()} - commited = state.CD() - log = state.SL() - log.delete() - return _stage_changes(staged, conn_graph, commited, log) - - -def execute(res, action): - try: - actions.resource_action(res, action) - return state.STATES.success - except Exception as e: - return state.STATES.error - - -def commit(li, resources, commited, history): - - staged_res = resources[li.res] - staged_data = patch(li.diff, commited.get(li.res, {})) - - # TODO(dshulyak) think about this hack for update - if li.action == 'update': - commited_res = resource.wrap_resource( - commited[li.res]['metadata']) - result_state = execute(commited_res, 'remove') - - staged_res.set_args_from_dict(staged_data['input']) - - if result_state is state.STATES.success: - result_state = execute(staged_res, 'run') - else: - result_state = execute(staged_res, li.action) - - # resource_action return None in case there is no actions - result_state = result_state or state.STATES.success - - commited[li.res] = staged_data - li.state = result_state - - history.append(li) - - if result_state is state.STATES.error: - raise Exception('Failed') - - -def commit_one(): - commited = state.CD() - history = state.CL() - staged = state.SL() - - resources = resource.load_all() - commit(staged.popleft(), resources, commited, history) - - -def commit_changes(): - # just shortcut to test stuff - commited = state.CD() - history = state.CL() - staged = state.SL() - resources = resource.load_all() - - while staged: - commit(staged.popleft(), resources, commited, history) - - -def rollback(log_item): - log = state.SL() - - resources = resource.load_all() - commited = state.CD()[log_item.res] - - staged = revert(log_item.diff, commited) - - for e, r, mapping in commited.get('connections', ()): - signals.disconnect(resources[e], resources[r]) - - for e, r, mapping in staged.get('connections', ()): - signals.connect(resources[e], resources[r], dict([mapping])) - - df = create_diff(staged, commited) - - log_item = state.LogItem( - utils.generate_uuid(), - log_item.res, df, guess_action(commited, staged)) - log.append(log_item) - - res = resource.load(log_item.res) - res.set_args_from_dict(staged['input']) - - return log_item - - -def rollback_uid(uid): - item = next(l for l in state.CL() if l.uid == uid) - return rollback(item) - - -def rollback_last(): - l = state.CL().items[-1] - return rollback(l) - - -def rollback_all(): - cl = state.CL() - - while cl: - rollback(cl.pop()) diff --git a/solar/solar/orchestration/graph.py b/solar/solar/orchestration/graph.py index d7342e23..4913ab34 100644 --- a/solar/solar/orchestration/graph.py +++ b/solar/solar/orchestration/graph.py @@ -7,6 +7,8 @@ import networkx as nx import redis import yaml +from solar import utils + r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1) @@ -47,13 +49,35 @@ def parse_plan(plan_data): return dg +def create_plan_from_graph(dg): + dg.graph['uid'] = "{0}:{1}".format(dg.graph['name'], str(uuid.uuid4())) + save_graph(dg.graph['uid'], dg) + return dg.graph['uid'] + + +def show(uid): + dg = get_graph(uid) + result = {} + tasks = [] + result['uid'] = dg.graph['uid'] + result['name'] = dg.graph['name'] + for n in nx.topological_sort(dg): + data = dg.node[n] + tasks.append( + {'uid': n, + 'parameters': data, + 'before': dg.successors(n), + 'after': dg.predecessors(n) + }) + result['tasks'] = tasks + return utils.yaml_dump(result) + + def create_plan(plan_data): """ """ dg = parse_plan(plan_data) - dg.graph['uid'] = "{0}:{1}".format(dg.graph['name'], str(uuid.uuid4())) - save_graph(dg.graph['uid'], dg) - return dg.graph['uid'] + return create_plan_from_graph(dg) def update_plan(uid, plan_data): diff --git a/solar/solar/orchestration/runner.py b/solar/solar/orchestration/runner.py new file mode 100644 index 00000000..a98ea438 --- /dev/null +++ b/solar/solar/orchestration/runner.py @@ -0,0 +1,11 @@ + + +from celery import Celery + +app = Celery( + include=['solar.system_log.tasks', 'solar.orchestration.tasks'], + backend='redis://10.0.0.2:6379/1', + broker='redis://10.0.0.2:6379/1') +app.conf.update(CELERY_ACCEPT_CONTENT = ['json']) +app.conf.update(CELERY_TASK_SERIALIZER = 'json') + diff --git a/solar/solar/orchestration/tasks.py b/solar/solar/orchestration/tasks.py index 49794ac9..16f4bc85 100644 --- a/solar/solar/orchestration/tasks.py +++ b/solar/solar/orchestration/tasks.py @@ -1,12 +1,9 @@ - - from functools import partial, wraps from itertools import islice import subprocess import time -from celery import Celery from celery.app import task from celery import group from celery.exceptions import Ignore @@ -15,28 +12,30 @@ import redis from solar.orchestration import graph from solar.core import actions from solar.core import resource +from solar.system_log.tasks import commit_logitem, error_logitem +from solar.orchestration.runner import app -app = Celery( - 'tasks', - backend='redis://10.0.0.2:6379/1', - broker='redis://10.0.0.2:6379/1') -app.conf.update(CELERY_ACCEPT_CONTENT = ['json']) -app.conf.update(CELERY_TASK_SERIALIZER = 'json') - r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1) +__all__ = ['solar_resource', 'cmd', 'sleep', + 'error', 'fault_tolerance', 'schedule_start', 'schedule_next'] + +# NOTE(dshulyak) i am not using celery.signals because it is not possible +# to extrace task_id from *task_success* signal class ReportTask(task.Task): def on_success(self, retval, task_id, args, kwargs): schedule_next.apply_async(args=[task_id, 'SUCCESS'], queue='scheduler') + commit_logitem.apply_async(args=[task_id], queue='system_log') def on_failure(self, exc, task_id, args, kwargs, einfo): schedule_next.apply_async( args=[task_id, 'ERROR'], kwargs={'errmsg': str(einfo.exception)}, queue='scheduler') + error_logitem.apply_async(args=[task_id], queue='system_log') report_task = partial(app.task, base=ReportTask, bind=True) @@ -106,7 +105,6 @@ def anchor(ctxt, *args): def schedule(plan_uid, dg): next_tasks = list(traverse(dg)) graph.save_graph(plan_uid, dg) - print 'GRAPH {0}\n NEXT TASKS {1}'.format(dg.node, next_tasks) group(next_tasks)() diff --git a/solar/solar/state.py b/solar/solar/state.py deleted file mode 100644 index d6c6d7c9..00000000 --- a/solar/solar/state.py +++ /dev/null @@ -1,152 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os -import collections -from collections import deque -from functools import partial - -from solar import utils - -from enum import Enum - -from solar.interfaces.db import get_db - -db = get_db() - - -STATES = Enum('States', 'error inprogress pending success') - - -def state_file(name): - if 'log' in name: - return Log(name) - elif 'data' in name: - return Data(name) - - -CD = partial(state_file, 'commited_data') -SD = partial(state_file, 'staged_data') -SL = partial(state_file, 'stage_log') -IL = partial(state_file, 'inprogress_log') -CL = partial(state_file, 'commit_log') - - -class LogItem(object): - - def __init__(self, uid, res_uid, diff, action, state=None): - self.uid = uid - self.res = res_uid - self.diff = diff - self.state = state or STATES.pending - self.action = action - - def to_yaml(self): - return utils.yaml_dump(self.to_dict()) - - def to_dict(self): - return {'uid': self.uid, - 'res': self.res, - 'diff': self.diff, - 'state': self.state.name, - 'action': self.action} - - def __str__(self): - return self.to_yaml() - - def __repr__(self): - return self.to_yaml() - - -class Log(object): - - def __init__(self, path): - self.path = path - items = [] - r = db.read(path, collection=db.COLLECTIONS.state_log) - if r: - items = r or items - - self.items = deque([LogItem( - l['uid'], l['res'], - l['diff'], l['action'], - getattr(STATES, l['state'])) for l in items]) - - def delete(self): - self.items = deque() - db.delete(self.path, db.COLLECTIONS.state_log) - - def sync(self): - db.save( - self.path, - [i.to_dict() for i in self.items], - collection=db.COLLECTIONS.state_log - ) - - def append(self, logitem): - self.items.append(logitem) - self.sync() - - def popleft(self): - item = self.items.popleft() - self.sync() - return item - - def pop(self): - item = self.items.pop() - self.sync() - return item - - def show(self, verbose=False): - return ['L(uuid={0}, res={1}, action={2})'.format( - l.uid, l.res, l.action) for l in self.items] - - def __len__(self): - return len(self.items) - - def __repr__(self): - return 'Log({0})'.format(self.path) - - def __iter__(self): - return iter(self.items) - - def __nonzero__(self): - return bool(self.items) - - -class Data(collections.MutableMapping): - - def __init__(self, path): - self.path = path - self.store = {} - r = db.read(path, collection=db.COLLECTIONS.state_data) - if r: - self.store = r or self.store - - def __getitem__(self, key): - return self.store[key] - - def __setitem__(self, key, value): - self.store[key] = value - db.save(self.path, self.store, collection=db.COLLECTIONS.state_data) - - def __delitem__(self, key): - self.store.pop(key) - db.save(self.path, self.store, collection=db.COLLECTIONS.state_data) - - def __iter__(self): - return iter(self.store) - - def __len__(self): - return len(self.store) diff --git a/solar/solar/system_log/__init__.py b/solar/solar/system_log/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/solar/solar/system_log/change.py b/solar/solar/system_log/change.py new file mode 100644 index 00000000..16588c46 --- /dev/null +++ b/solar/solar/system_log/change.py @@ -0,0 +1,113 @@ + + +from dictdiffer import diff, patch, revert +import networkx as nx + +from solar.core.log import log +from solar.core import signals +from solar.core import resource +from solar import utils +from solar.interfaces.db import get_db +from solar.core import actions +from solar.system_log import data +from solar.orchestration import graph + +db = get_db() + + +def guess_action(from_, to): + # NOTE(dshulyak) imo the way to solve this - is dsl for orchestration, + # something where this action will be excplicitly specified + if not from_: + return 'run' + elif not to: + return 'remove' + else: + return 'update' + + +def connections(res, graph): + result = [] + for pred in graph.predecessors(res.name): + for num, edge in graph.get_edge_data(pred, res.name).items(): + if 'label' in edge: + if ':' in edge['label']: + parent, child = edge['label'].split(':') + mapping = [parent, child] + else: + mapping = [edge['label'], edge['label']] + else: + mapping = None + result.append([pred, res.name, mapping]) + return result + + +def create_diff(staged, commited): + return list(diff(commited, staged)) + + +def _stage_changes(staged_resources, conn_graph, + commited_resources, staged_log): + + try: + srt = nx.topological_sort(conn_graph) + except: + for cycle in nx.simple_cycles(conn_graph): + log.debug('CYCLE: %s', cycle) + raise + + for res_uid in srt: + commited_data = commited_resources.get(res_uid, {}) + staged_data = staged_resources.get(res_uid, {}) + + df = create_diff(staged_data, commited_data) + + if df: + log_item = data.LogItem( + utils.generate_uuid(), + res_uid, + df, + guess_action(commited_data, staged_data)) + staged_log.append(log_item) + return staged_log + + +def stage_changes(): + log = data.SL() + log.clean() + conn_graph = signals.detailed_connection_graph() + staged = {r.name: r.args_show() + for r in resource.load_all().values()} + commited = data.CD() + return _stage_changes(staged, conn_graph, commited, log) + + +def send_to_orchestration(): + conn_graph = signals.detailed_connection_graph() + dg = nx.DiGraph() + staged = {r.name: r.args_show() + for r in resource.load_all().values()} + commited = data.CD() + + for res_uid in conn_graph: + commited_data = commited.get(res_uid, {}) + staged_data = staged.get(res_uid, {}) + + df = create_diff(staged_data, commited_data) + + if df: + dg.add_node( + res_uid, status='PENDING', + errmsg=None, + **parameters(res_uid, guess_action(commited_data, staged_data))) + for pred in conn_graph.predecessors(res_uid): + dg.add_edge(pred, res_uid) + + # what it should be? + dg.graph['name'] = 'system_log' + return graph.create_plan_from_graph(dg) + + +def parameters(res, action): + return {'args': [res, action], + 'type': 'solar_resource'} diff --git a/solar/solar/system_log/data.py b/solar/solar/system_log/data.py new file mode 100644 index 00000000..0c99fdce --- /dev/null +++ b/solar/solar/system_log/data.py @@ -0,0 +1,129 @@ + +import os +import collections +from collections import deque +from functools import partial + +from solar import utils +from solar.interfaces.db import get_db + +from enum import Enum + + +db = get_db() + + +STATES = Enum('States', 'error inprogress pending success') + + +def state_file(name): + if 'log' in name: + return Log(name) + elif 'data' in name: + return Data(name) + + +CD = partial(state_file, 'commited_data') +SL = partial(state_file, 'stage_log') +CL = partial(state_file, 'commit_log') + + +class LogItem(object): + + def __init__(self, uid, res, diff, action, state=None): + self.uid = uid + self.res = res + self.diff = diff + self.state = state or STATES.pending + self.action = action + + def to_yaml(self): + return utils.yaml_dump(self.to_dict()) + + def to_dict(self): + return {'uid': self.uid, + 'res': self.res, + 'diff': self.diff, + 'state': self.state.name, + 'action': self.action} + + @classmethod + def from_dict(cls, **kwargs): + state = getattr(STATES, kwargs.get('state', ''), STATES.pending) + kwargs['state'] = state + return cls(**kwargs) + + def __str__(self): + return self.to_yaml() + + def __repr__(self): + return self.to_yaml() + + +class Log(object): + + def __init__(self, path): + self.ordered_log = db.get_set(path) + + def append(self, logitem): + self.ordered_log.add([(logitem.res, logitem.to_dict())]) + + def pop(self, uid): + item = self.get(uid) + if not item: + return None + self.ordered_log.rem([uid]) + return item + + def update(self, logitem): + self.ordered_log.update(logitem.res, logitem.to_dict()) + + def clean(self): + self.ordered_log.clean() + + def get(self, key): + item = self.ordered_log.get(key) + if item: + return LogItem.from_dict(**item) + return None + + def collection(self, n=0): + for item in self.ordered_log.reverse(n=n): + yield LogItem.from_dict(**item) + + def reverse(self, n=0): + for item in self.ordered_log.list(n=n): + yield LogItem.from_dict(**item) + + def __iter__(self): + return iter(self.collection()) + + +class Data(collections.MutableMapping): + + def __init__(self, path): + self.path = path + self.store = {} + r = db.read(path, collection=db.COLLECTIONS.state_data) + if r: + self.store = r or self.store + + def __getitem__(self, key): + return self.store[key] + + def __setitem__(self, key, value): + self.store[key] = value + db.save(self.path, self.store, collection=db.COLLECTIONS.state_data) + + def __delitem__(self, key): + self.store.pop(key) + db.save(self.path, self.store, collection=db.COLLECTIONS.state_data) + + def __iter__(self): + return iter(self.store) + + def __len__(self): + return len(self.store) + + def clean(self): + db.save(self.path, {}, collection=db.COLLECTIONS.state_data) diff --git a/solar/solar/system_log/operations.py b/solar/solar/system_log/operations.py new file mode 100644 index 00000000..82714837 --- /dev/null +++ b/solar/solar/system_log/operations.py @@ -0,0 +1,24 @@ + + +from solar.system_log import data +from dictdiffer import patch + + +def set_error(task_uuid, *args, **kwargs): + sl = data.SL() + item = sl.get(task_uuid) + if item: + item.state = data.STATES.error + sl.update(item) + + +def move_to_commited(task_uuid, *args, **kwargs): + sl = data.SL() + item = sl.pop(task_uuid) + if item: + commited = data.CD() + staged_data = patch(item.diff, commited.get(item.res, {})) + cl = data.CL() + item.state = data.STATES.success + cl.append(item) + commited[item.res] = staged_data diff --git a/solar/solar/system_log/tasks.py b/solar/solar/system_log/tasks.py new file mode 100644 index 00000000..7f715bd7 --- /dev/null +++ b/solar/solar/system_log/tasks.py @@ -0,0 +1,16 @@ + + +from solar.orchestration.runner import app +from solar.system_log.operations import set_error, move_to_commited + +__all__ = ['error_logitem', 'commit_logitem'] + + +@app.task +def error_logitem(task_uuid): + return set_error(task_uuid.rsplit(':', 1)[-1]) + + +@app.task +def commit_logitem(task_uuid): + return move_to_commited(task_uuid.rsplit(':', 1)[-1])