Merge pull request #31 from Mirantis/system_log

System log refactoring
This commit is contained in:
Dmitry Shulyak 2015-07-18 08:57:20 +03:00
commit 426174e563
18 changed files with 492 additions and 428 deletions

View File

@ -10,9 +10,9 @@
- shell: celery multi kill 2 - shell: celery multi kill 2
chdir={{celery_dir}} chdir={{celery_dir}}
tags: [stop] tags: [stop]
- shell: celery multi start 2 -A solar.orchestration.tasks -Q:1 celery,scheduler -Q:2 celery,{{hostname.stdout}} - shell: celery multi start 2 -A solar.orchestration.runner -Q:1 scheduler,system_log -Q:2 celery,{{hostname.stdout}}
chdir={{celery_dir}} chdir={{celery_dir}}
tags: [master] tags: [master]
- shell: celery multi start 1 -A solar.orchestration.tasks -Q:1 celery,{{hostname.stdout}} - shell: celery multi start 1 -A solar.orchestration.runner -Q:1 {{hostname.stdout}}
chdir={{celery_dir}} chdir={{celery_dir}}
tags: [slave] tags: [slave]

View File

@ -28,8 +28,6 @@ import tabulate
import yaml import yaml
from solar import utils from solar import utils
from solar import operations
from solar import state
from solar.core import actions from solar.core import actions
from solar.core import resource as sresource from solar.core import resource as sresource
from solar.core.resource import assign_resources_to_nodes from solar.core.resource import assign_resources_to_nodes
@ -38,8 +36,11 @@ from solar.core.tags_set_parser import Expression
from solar.core import testing from solar.core import testing
from solar.core.resource import virtual_resource as vr from solar.core.resource import virtual_resource as vr
from solar.interfaces.db import get_db from solar.interfaces.db import get_db
from solar import errors
from solar.core.log import log
from solar.cli.orch import orchestration from solar.cli.orch import orchestration
from solar.cli.system_log import changes
# NOTE: these are extensions, they shouldn't be imported here # NOTE: these are extensions, they shouldn't be imported here
# Maybe each extension can also extend the CLI with parsers # Maybe each extension can also extend the CLI with parsers
@ -146,54 +147,6 @@ def init_actions():
actions.resource_action(resource_obj, action) actions.resource_action(resource_obj, action)
def init_changes():
@main.group()
def changes():
pass
@changes.command()
def validate():
errors = vr.validate_resources()
if errors:
for r, error in errors:
print 'ERROR: %s: %s' % (r.name, error)
sys.exit(1)
@changes.command()
def stage():
log = operations.stage_changes()
click.echo(log.show())
@changes.command()
@click.option('--one', is_flag=True, default=False)
def commit(one):
if one:
operations.commit_one()
else:
operations.commit_changes()
@changes.command()
@click.option('--limit', default=5)
def history(limit):
click.echo(state.CL().show())
@changes.command()
@click.option('--last', is_flag=True, default=False)
@click.option('--all', is_flag=True, default=False)
@click.option('--uid', default=None)
def rollback(last, all, uid):
if last:
click.echo(operations.rollback_last())
elif all:
click.echo(operations.rollback_all())
elif uid:
click.echo(operations.rollback_uid(uid))
@changes.command()
def test():
testing.test_all()
def init_cli_connect(): def init_cli_connect():
@main.command() @main.command()
@click.argument('emitter') @click.argument('emitter')
@ -289,8 +242,13 @@ def init_cli_resource():
click.echo( click.echo(
'action {} for resource {}'.format(action, resource) 'action {} for resource {}'.format(action, resource)
) )
actions.resource_action(sresource.load(resource), action)
r = sresource.load(resource)
try:
actions.resource_action(r, action)
except errors.SolarError as e:
log.debug(e)
sys.exit(1)
@resource.command() @resource.command()
def compile_all(): def compile_all():
@ -314,7 +272,8 @@ def init_cli_resource():
@resource.command() @resource.command()
@click.argument('name') @click.argument('name')
@click.argument('base_path', type=click.Path(exists=True, file_okay=True)) @click.argument(
'base_path', type=click.Path(exists=True, resolve_path=True))
@click.argument('args', nargs=-1) @click.argument('args', nargs=-1)
def create(args, base_path, name): def create(args, base_path, name):
args_parsed = {} args_parsed = {}
@ -425,13 +384,13 @@ def init_cli_resource():
def run(): def run():
init_actions() init_actions()
init_changes()
init_cli_connect() init_cli_connect()
init_cli_connections() init_cli_connections()
init_cli_deployment_config() init_cli_deployment_config()
init_cli_resource() init_cli_resource()
main.add_command(orchestration) main.add_command(orchestration)
main.add_command(changes)
main() main()

View File

@ -21,17 +21,20 @@ def orchestration():
restart <id> --reset restart <id> --reset
""" """
@orchestration.command() @orchestration.command()
@click.argument('plan', type=click.File('rb')) @click.argument('plan', type=click.File('rb'))
def create(plan): def create(plan):
click.echo(graph.create_plan(plan.read())) click.echo(graph.create_plan(plan.read()))
@orchestration.command() @orchestration.command()
@click.argument('uid') @click.argument('uid')
@click.argument('plan', type=click.File('rb')) @click.argument('plan', type=click.File('rb'))
def update(uid, plan): def update(uid, plan):
graph.update_plan(uid, plan.read()) graph.update_plan(uid, plan.read())
@orchestration.command() @orchestration.command()
@click.argument('uid') @click.argument('uid')
def report(uid): def report(uid):
@ -102,5 +105,14 @@ def dg(uid):
for n in plan: for n in plan:
color = colors[plan.node[n]['status']] color = colors[plan.node[n]['status']]
plan.node[n]['color'] = color plan.node[n]['color'] = color
nx.write_dot(plan, 'graph.dot') nx.write_dot(plan, '{name}.dot'.format(name=plan.graph['name']))
subprocess.call(['dot', '-Tpng', 'graph.dot', '-o', 'graph.png']) subprocess.call(
'tred {name}.dot | dot -Tpng -o {name}.png'.format(name=plan.graph['name']),
shell=True)
click.echo('Created {name}.png'.format(name=plan.graph['name']))
@orchestration.command()
@click.argument('uid')
def show(uid):
click.echo(graph.show(uid))

View File

@ -0,0 +1,66 @@
import sys
import click
from solar.core import testing
from solar.core import resource
from solar.system_log import change
from solar.system_log import operations
from solar.system_log import data
@click.group()
def changes():
    """Command group for inspecting and driving the system log."""
    pass
@changes.command()
def validate():
    """Validate all resources; print each error and exit 1 if any exist."""
    errors = resource.validate_resources()
    if errors:
        for r, error in errors:
            # python 2 print statement, matching the rest of the file
            print 'ERROR: %s: %s' % (r.name, error)
        sys.exit(1)
@changes.command()
def stage():
    """Stage pending changes and echo them (oldest-first view of the log)."""
    staged_items = list(change.stage_changes().reverse())
    if not staged_items:
        click.echo('No changes')
    click.echo(staged_items)
@changes.command()
def process():
    """Turn staged changes into an orchestration plan and echo its uid."""
    plan_uid = change.send_to_orchestration()
    click.echo(plan_uid)
@changes.command()
@click.argument('uid')
def commit(uid):
    """Apply the staged system-log item identified by *uid*."""
    operations.commit(uid)
@changes.command()
@click.option('-n', default=5)
def history(n):
    """Echo the last *n* committed log items, oldest first."""
    entries = list(data.CL().collection(n))
    if not entries:
        click.echo('No history.')
        return
    entries.reverse()
    click.echo(entries)
@changes.command()
def test():
    """Run the test suite for all resources."""
    testing.test_all()
@changes.command(name='clean-history')
def clean_history():
    """Wipe both the commit log and the committed data store."""
    data.CL().clean()
    data.CD().clean()

View File

@ -1,11 +1,16 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from fabric import api as fabric_api from fabric import api as fabric_api
from fabric.state import env
import os import os
from solar.core.log import log from solar.core.log import log
from solar.core.handlers.base import TempFileHandler from solar.core.handlers.base import TempFileHandler
from solar import errors
# otherwise fabric will sys.exit(1) in case of errors
env.warn_only = True
class AnsibleTemplate(TempFileHandler): class AnsibleTemplate(TempFileHandler):
def action(self, resource, action_name): def action(self, resource, action_name):
inventory_file = self._create_inventory(resource) inventory_file = self._create_inventory(resource)
@ -15,12 +20,10 @@ class AnsibleTemplate(TempFileHandler):
call_args = ['ansible-playbook', '--module-path', '/vagrant/library', '-i', inventory_file, playbook_file] call_args = ['ansible-playbook', '--module-path', '/vagrant/library', '-i', inventory_file, playbook_file]
log.debug('EXECUTING: %s', ' '.join(call_args)) log.debug('EXECUTING: %s', ' '.join(call_args))
try: out = fabric_api.local(' '.join(call_args), capture=True)
fabric_api.local(' '.join(call_args)) if out.failed:
except Exception as e: raise errors.SolarError(out)
log.error(e.output)
log.exception(e)
raise
def _create_inventory(self, r): def _create_inventory(self, r):
directory = self.dirs[r.name] directory = self.dirs[r.name]

View File

@ -7,6 +7,7 @@ __all__ = [
'load_all', 'load_all',
'prepare_meta', 'prepare_meta',
'wrap_resource', 'wrap_resource',
'validate_resources',
] ]
@ -18,3 +19,4 @@ from solar.core.resource.resource import load_all
from solar.core.resource.resource import wrap_resource from solar.core.resource.resource import wrap_resource
from solar.core.resource.virtual_resource import create from solar.core.resource.virtual_resource import create
from solar.core.resource.virtual_resource import prepare_meta from solar.core.resource.virtual_resource import prepare_meta
from solar.core.resource.virtual_resource import validate_resources

View File

@ -41,9 +41,11 @@ def create_virtual_resource(vr_name, template):
resources = template['resources'] resources = template['resources']
connections = [] connections = []
created_resources = [] created_resources = []
cwd = os.getcwd()
for resource in resources: for resource in resources:
name = resource['id'] name = resource['id']
base_path = resource['from'] base_path = os.path.join(cwd, resource['from'])
args = resource['values'] args = resource['values']
new_resources = create(name, base_path, args, vr_name) new_resources = create(name, base_path, args, vr_name)
created_resources += new_resources created_resources += new_resources

View File

@ -67,6 +67,9 @@ class RedisDB(object):
def clear(self): def clear(self):
self._r.flushdb() self._r.flushdb()
def get_set(self, collection):
    # Expose the named collection as an insertion-ordered set view.
    return OrderedSet(self._r, collection)
def clear_collection(self, collection=COLLECTIONS.resource): def clear_collection(self, collection=COLLECTIONS.resource):
key_glob = self._make_key(collection, '*') key_glob = self._make_key(collection, '*')
@ -83,6 +86,57 @@ class RedisDB(object):
return '{0}:{1}'.format(collection, _id) return '{0}:{1}'.format(collection, _id)
class OrderedSet(object):
    """Insertion-ordered key/value store backed by redis.

    Values live in a hash keyed by ``collection``; insertion order is kept
    in a companion sorted set whose scores come from a monotonically
    increasing counter, so iteration order equals insertion order.
    """

    def __init__(self, client, collection):
        self.r = client
        self.collection = collection
        # counter key used to mint ever-increasing scores
        self.order_counter = '{}:incr'.format(collection)
        # sorted-set key holding the insertion order
        self.order = '{}:order'.format(collection)

    def add(self, items):
        """Append ``(key, value)`` pairs, preserving their given order."""
        pipe = self.r.pipeline()
        for key, value in items:
            # INCR is issued outside the pipeline because its result is
            # needed immediately as the score for the zadd below
            count = self.r.incr(self.order_counter)
            pipe.zadd(self.order, count, key)
            pipe.hset(self.collection, key, json.dumps(value))
        pipe.execute()

    def rem(self, keys):
        """Remove *keys* from both the order index and the value hash."""
        pipe = self.r.pipeline()
        for key in keys:
            pipe.zrem(self.order, key)
            pipe.hdel(self.collection, key)
        pipe.execute()

    def get(self, key):
        """Return the deserialized value for *key*, or None when absent."""
        value = self.r.hget(self.collection, key)
        if value:
            return json.loads(value)
        return None

    def update(self, key, value):
        """Overwrite the stored value for *key* (order is unchanged)."""
        self.r.hset(self.collection, key, json.dumps(value))

    def clean(self):
        """Drop every element."""
        self.rem(self.r.zrange(self.order, 0, -1))

    def rem_left(self, n=1):
        """Remove the *n* most recently added elements.

        FIX: the original referenced an undefined global ``r`` instead of
        ``self.r``, so every call raised NameError.
        """
        self.rem(self.r.zrevrange(self.order, 0, n - 1))

    def reverse(self, n=1):
        """Return up to *n* values, newest first (all when n <= 0)."""
        result = []
        for key in self.r.zrevrange(self.order, 0, n - 1):
            result.append(self.get(key))
        return result

    def list(self, n=0):
        """Return up to *n* values, oldest first (all when n == 0)."""
        result = []
        for key in self.r.zrange(self.order, 0, n - 1):
            result.append(self.get(key))
        return result
class FakeRedisDB(RedisDB): class FakeRedisDB(RedisDB):
REDIS_CLIENT = fakeredis.FakeStrictRedis REDIS_CLIENT = fakeredis.FakeStrictRedis

View File

@ -1,197 +0,0 @@
from solar import state
from solar.core.log import log
from solar.core import signals
from solar.core import resource
from solar import utils
from solar.interfaces.db import get_db
from solar.core import actions
db = get_db()
from dictdiffer import diff, patch, revert
from fabric import api as fabric_api
import networkx as nx
def guess_action(from_, to):
    """Derive a resource action from committed (*from_*) and staged (*to*) data."""
    # TODO(dshulyak) it should be more flexible
    if not from_:
        return 'run'      # never deployed before
    if not to:
        return 'remove'   # gone from the staged state
    return 'update'       # present in both -> update in place
def connections(res, graph):
    """Collect incoming edges of *res* as ``[parent, child, mapping]`` triples.

    ``mapping`` is ``[parent_input, child_input]`` parsed from the edge
    label ('a:b' maps a to b; a bare label maps to itself) or None when
    the edge carries no label.
    """
    result = []
    for parent in graph.predecessors(res.name):
        for _num, edge in graph.get_edge_data(parent, res.name).items():
            if 'label' not in edge:
                mapping = None
            elif ':' in edge['label']:
                src, dst = edge['label'].split(':')
                mapping = [src, dst]
            else:
                mapping = [edge['label'], edge['label']]
            result.append([parent, res.name, mapping])
    return result
def to_dict(resource, graph):
    """Serialize *resource* and attach its incoming connections from *graph*."""
    serialized = resource.to_dict()
    serialized['connections'] = connections(resource, graph)
    return serialized
def create_diff(staged, commited):
    """Return a dictdiffer diff transforming *commited* into *staged*.

    Connection and tag lists are sorted in place first so that ordering
    differences alone do not register as changes.
    """
    if 'connections' in commited:
        commited['connections'].sort()
        staged['connections'].sort()
    if 'tags' in commited:
        commited['tags'].sort()
        staged['tags'].sort()
    return list(diff(commited, staged))
def _stage_changes(staged_resources, conn_graph,
                   commited_resources, staged_log):
    """Append a LogItem to *staged_log* for every resource whose staged
    data differs from its committed data, in topological order."""
    try:
        srt = nx.topological_sort(conn_graph)
    except:
        # topological sort fails on cyclic graphs; dump the cycles for
        # debugging before re-raising
        for cycle in nx.simple_cycles(conn_graph):
            log.debug('CYCLE: %s', cycle)
        raise

    for res_uid in srt:
        commited_data = commited_resources.get(res_uid, {})
        staged_data = staged_resources.get(res_uid, {})

        df = create_diff(staged_data, commited_data)

        if df:
            log_item = state.LogItem(
                utils.generate_uuid(),
                res_uid,
                df,
                guess_action(commited_data, staged_data))
            staged_log.append(log_item)
    return staged_log
def stage_changes():
    """Rebuild the staged log against the currently loaded resources."""
    conn_graph = signals.detailed_connection_graph()
    staged = {r.name: to_dict(r, conn_graph) for r in resource.load_all().values()}
    commited = state.CD()
    log = state.SL()
    # the staged log is rebuilt completely on every call
    log.delete()
    return _stage_changes(staged, conn_graph, commited, log)
def execute(res, action):
    """Run *action* on *res*, mapping the outcome to a state constant.

    Returns STATES.success on success and STATES.error on any exception.
    """
    try:
        actions.resource_action(res, action)
        return state.STATES.success
    except Exception as e:
        # FIX: the exception was previously swallowed silently (bound but
        # unused); log it so failures are diagnosable while keeping the
        # best-effort return-a-state contract.
        log.exception(e)
        return state.STATES.error
def commit(li, resources, commited, history):
    """Apply one staged LogItem, record the outcome in *history*.

    Raises a generic Exception when the action errored, stopping any
    caller that processes items in a loop.
    """
    staged_res = resources[li.res]
    staged_data = patch(li.diff, commited.get(li.res, {}))

    # TODO(dshulyak) think about this hack for update
    if li.action == 'update':
        # update is emulated as remove(old committed resource) + run(new)
        commited_res = resource.wrap_resource(
            commited[li.res]['metadata'])
        result_state = execute(commited_res, 'remove')

        staged_res.set_args_from_dict(staged_data['input'])

        if result_state is state.STATES.success:
            result_state = execute(staged_res, 'run')
    else:
        result_state = execute(staged_res, li.action)

    # resource_action return None in case there is no actions
    result_state = result_state or state.STATES.success

    commited[li.res] = staged_data
    li.state = result_state

    history.append(li)

    if result_state is state.STATES.error:
        raise Exception('Failed')
def commit_one():
    """Commit only the oldest staged log item."""
    commited = state.CD()
    history = state.CL()
    staged = state.SL()
    resources = resource.load_all()
    commit(staged.popleft(), resources, commited, history)
def commit_changes():
    """Commit every staged log item, oldest first."""
    # just shortcut to test stuff
    commited = state.CD()
    history = state.CL()
    staged = state.SL()
    resources = resource.load_all()
    while staged:
        commit(staged.popleft(), resources, commited, history)
def rollback(log_item):
    """Stage an inverse change for *log_item* and return the new LogItem.

    Reverts the item's diff against the committed data, swaps the
    resource's connections over to the reverted set, and appends a new
    staged log item describing the rollback.
    """
    log = state.SL()

    resources = resource.load_all()
    commited = state.CD()[log_item.res]

    staged = revert(log_item.diff, commited)

    for e, r, mapping in commited.get('connections', ()):
        signals.disconnect(resources[e], resources[r])

    for e, r, mapping in staged.get('connections', ()):
        # NOTE(review): mapping is None for unlabeled edges (see
        # connections()); dict([None]) would raise here -- confirm labels
        # are always present on rolled-back connections.
        signals.connect(resources[e], resources[r], dict([mapping]))

    df = create_diff(staged, commited)

    log_item = state.LogItem(
        utils.generate_uuid(),
        log_item.res, df, guess_action(commited, staged))
    log.append(log_item)

    res = resource.load(log_item.res)
    res.set_args_from_dict(staged['input'])

    return log_item
def rollback_uid(uid):
    """Roll back the committed log item whose uid matches *uid*.

    Raises StopIteration when no such item exists (same as the original
    ``next`` over a generator).
    """
    matching = (entry for entry in state.CL() if entry.uid == uid)
    return rollback(next(matching))
def rollback_last():
    """Roll back the most recently committed log item."""
    l = state.CL().items[-1]
    return rollback(l)
def rollback_all():
    """Roll back every committed log item, newest first."""
    cl = state.CL()
    while cl:
        rollback(cl.pop())

View File

@ -7,6 +7,8 @@ import networkx as nx
import redis import redis
import yaml import yaml
from solar import utils
r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1) r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1)
@ -47,13 +49,35 @@ def parse_plan(plan_data):
return dg return dg
def create_plan_from_graph(dg):
    """Persist *dg* under a fresh '<name>:<uuid4>' uid and return that uid."""
    dg.graph['uid'] = "{0}:{1}".format(dg.graph['name'], str(uuid.uuid4()))
    save_graph(dg.graph['uid'], dg)
    return dg.graph['uid']
def show(uid):
    """Render the stored plan *uid* as YAML: uid, name and ordered tasks."""
    dg = get_graph(uid)
    tasks = []
    # topological order so tasks appear in execution order
    for node in nx.topological_sort(dg):
        tasks.append({
            'uid': node,
            'parameters': dg.node[node],
            'before': dg.successors(node),
            'after': dg.predecessors(node),
        })
    summary = {}
    summary['uid'] = dg.graph['uid']
    summary['name'] = dg.graph['name']
    summary['tasks'] = tasks
    return utils.yaml_dump(summary)
def create_plan(plan_data): def create_plan(plan_data):
""" """
""" """
dg = parse_plan(plan_data) dg = parse_plan(plan_data)
dg.graph['uid'] = "{0}:{1}".format(dg.graph['name'], str(uuid.uuid4())) return create_plan_from_graph(dg)
save_graph(dg.graph['uid'], dg)
return dg.graph['uid']
def update_plan(uid, plan_data): def update_plan(uid, plan_data):

View File

@ -0,0 +1,11 @@
from celery import Celery
# Shared Celery application: both the orchestration and system_log task
# modules register against it, using the same redis instance as broker
# and result backend. JSON-only serialization.
app = Celery(
    include=['solar.system_log.tasks', 'solar.orchestration.tasks'],
    backend='redis://10.0.0.2:6379/1',
    broker='redis://10.0.0.2:6379/1')
app.conf.update(CELERY_ACCEPT_CONTENT = ['json'])
app.conf.update(CELERY_TASK_SERIALIZER = 'json')

View File

@ -1,12 +1,9 @@
from functools import partial, wraps from functools import partial, wraps
from itertools import islice from itertools import islice
import subprocess import subprocess
import time import time
from celery import Celery
from celery.app import task from celery.app import task
from celery import group from celery import group
from celery.exceptions import Ignore from celery.exceptions import Ignore
@ -15,28 +12,30 @@ import redis
from solar.orchestration import graph from solar.orchestration import graph
from solar.core import actions from solar.core import actions
from solar.core import resource from solar.core import resource
from solar.system_log.tasks import commit_logitem, error_logitem
from solar.orchestration.runner import app
app = Celery(
'tasks',
backend='redis://10.0.0.2:6379/1',
broker='redis://10.0.0.2:6379/1')
app.conf.update(CELERY_ACCEPT_CONTENT = ['json'])
app.conf.update(CELERY_TASK_SERIALIZER = 'json')
r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1) r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1)
__all__ = ['solar_resource', 'cmd', 'sleep',
'error', 'fault_tolerance', 'schedule_start', 'schedule_next']
# NOTE(dshulyak) i am not using celery.signals because it is not possible
# to extrace task_id from *task_success* signal
class ReportTask(task.Task): class ReportTask(task.Task):
def on_success(self, retval, task_id, args, kwargs): def on_success(self, retval, task_id, args, kwargs):
schedule_next.apply_async(args=[task_id, 'SUCCESS'], queue='scheduler') schedule_next.apply_async(args=[task_id, 'SUCCESS'], queue='scheduler')
commit_logitem.apply_async(args=[task_id], queue='system_log')
def on_failure(self, exc, task_id, args, kwargs, einfo): def on_failure(self, exc, task_id, args, kwargs, einfo):
schedule_next.apply_async( schedule_next.apply_async(
args=[task_id, 'ERROR'], args=[task_id, 'ERROR'],
kwargs={'errmsg': str(einfo.exception)}, kwargs={'errmsg': str(einfo.exception)},
queue='scheduler') queue='scheduler')
error_logitem.apply_async(args=[task_id], queue='system_log')
report_task = partial(app.task, base=ReportTask, bind=True) report_task = partial(app.task, base=ReportTask, bind=True)
@ -106,7 +105,6 @@ def anchor(ctxt, *args):
def schedule(plan_uid, dg): def schedule(plan_uid, dg):
next_tasks = list(traverse(dg)) next_tasks = list(traverse(dg))
graph.save_graph(plan_uid, dg) graph.save_graph(plan_uid, dg)
print 'GRAPH {0}\n NEXT TASKS {1}'.format(dg.node, next_tasks)
group(next_tasks)() group(next_tasks)()

View File

@ -1,152 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import collections
from collections import deque
from functools import partial
from solar import utils
from enum import Enum
from solar.interfaces.db import get_db
db = get_db()
STATES = Enum('States', 'error inprogress pending success')
def state_file(name):
    """Return the persistence wrapper for *name*: Log for names containing
    'log', Data for names containing 'data' (None otherwise)."""
    if 'log' in name:
        return Log(name)
    elif 'data' in name:
        return Data(name)


# Named accessors for the well-known state containers.
CD = partial(state_file, 'commited_data')    # committed resource data
SD = partial(state_file, 'staged_data')      # staged resource data
SL = partial(state_file, 'stage_log')        # staged log
IL = partial(state_file, 'inprogress_log')   # in-progress log
CL = partial(state_file, 'commit_log')       # commit history
class LogItem(object):
    """Single state-log entry: a diff for a resource plus the action to apply."""

    def __init__(self, uid, res_uid, diff, action, state=None):
        self.uid = uid          # unique id of this log entry
        self.res = res_uid      # name of the affected resource
        self.diff = diff        # dictdiffer diff payload
        self.state = state or STATES.pending
        self.action = action    # e.g. 'run' | 'remove' | 'update'

    def to_yaml(self):
        """YAML rendering of the dict form."""
        return utils.yaml_dump(self.to_dict())

    def to_dict(self):
        """Plain-dict form; the state enum is stored by name."""
        return {'uid': self.uid,
                'res': self.res,
                'diff': self.diff,
                'state': self.state.name,
                'action': self.action}

    def __str__(self):
        return self.to_yaml()

    def __repr__(self):
        return self.to_yaml()
class Log(object):
    """Deque of LogItems persisted in the db state_log collection.

    Every mutation is written back immediately via sync().
    """

    def __init__(self, path):
        self.path = path
        items = []
        r = db.read(path, collection=db.COLLECTIONS.state_log)
        if r:
            items = r or items
        # rebuild LogItem objects from their stored dict form
        self.items = deque([LogItem(
            l['uid'], l['res'],
            l['diff'], l['action'],
            getattr(STATES, l['state'])) for l in items])

    def delete(self):
        """Drop all items, in memory and in the db."""
        self.items = deque()
        db.delete(self.path, db.COLLECTIONS.state_log)

    def sync(self):
        """Write the current items back to the db."""
        db.save(
            self.path,
            [i.to_dict() for i in self.items],
            collection=db.COLLECTIONS.state_log
        )

    def append(self, logitem):
        self.items.append(logitem)
        self.sync()

    def popleft(self):
        # remove and return the oldest item, persisting the change
        item = self.items.popleft()
        self.sync()
        return item

    def pop(self):
        # remove and return the newest item, persisting the change
        item = self.items.pop()
        self.sync()
        return item

    def show(self, verbose=False):
        """Return a short text representation of every item."""
        return ['L(uuid={0}, res={1}, action={2})'.format(
            l.uid, l.res, l.action) for l in self.items]

    def __len__(self):
        return len(self.items)

    def __repr__(self):
        return 'Log({0})'.format(self.path)

    def __iter__(self):
        return iter(self.items)

    def __nonzero__(self):
        # python 2 truthiness: a log with no items is falsy
        return bool(self.items)
class Data(collections.MutableMapping):
    """Dict-like store persisted to the db state_data collection on
    every mutation."""

    def __init__(self, path):
        self.path = path
        self.store = {}
        r = db.read(path, collection=db.COLLECTIONS.state_data)
        if r:
            self.store = r or self.store

    def __getitem__(self, key):
        return self.store[key]

    def __setitem__(self, key, value):
        self.store[key] = value
        db.save(self.path, self.store, collection=db.COLLECTIONS.state_data)

    def __delitem__(self, key):
        self.store.pop(key)
        db.save(self.path, self.store, collection=db.COLLECTIONS.state_data)

    def __iter__(self):
        return iter(self.store)

    def __len__(self):
        return len(self.store)

View File

View File

@ -0,0 +1,113 @@
from dictdiffer import diff, patch, revert
import networkx as nx
from solar.core.log import log
from solar.core import signals
from solar.core import resource
from solar import utils
from solar.interfaces.db import get_db
from solar.core import actions
from solar.system_log import data
from solar.orchestration import graph
db = get_db()
def guess_action(from_, to):
    """Derive a resource action from committed (*from_*) and staged (*to*) data."""
    # NOTE(dshulyak) imo the way to solve this - is dsl for orchestration,
    # something where this action will be explicitly specified
    if not from_:
        return 'run'      # never deployed before
    if not to:
        return 'remove'   # gone from the staged state
    return 'update'       # present in both -> update in place
def connections(res, graph):
    """Collect incoming edges of *res* as ``[parent, child, mapping]`` triples.

    ``mapping`` is ``[parent_input, child_input]`` parsed from the edge
    label ('a:b' maps a to b; a bare label maps to itself) or None when
    the edge carries no label.
    """
    result = []
    for parent in graph.predecessors(res.name):
        for _num, edge in graph.get_edge_data(parent, res.name).items():
            if 'label' not in edge:
                mapping = None
            elif ':' in edge['label']:
                src, dst = edge['label'].split(':')
                mapping = [src, dst]
            else:
                mapping = [edge['label'], edge['label']]
            result.append([parent, res.name, mapping])
    return result
def create_diff(staged, commited):
    """Return the dictdiffer change list transforming *commited* into *staged*."""
    return list(diff(commited, staged))
def _stage_changes(staged_resources, conn_graph,
                   commited_resources, staged_log):
    """Append a LogItem to *staged_log* for every resource whose staged
    data differs from its committed data, in topological order."""
    try:
        srt = nx.topological_sort(conn_graph)
    except:
        # topological sort fails on cyclic graphs; dump the cycles for
        # debugging before re-raising
        for cycle in nx.simple_cycles(conn_graph):
            log.debug('CYCLE: %s', cycle)
        raise

    for res_uid in srt:
        commited_data = commited_resources.get(res_uid, {})
        staged_data = staged_resources.get(res_uid, {})

        df = create_diff(staged_data, commited_data)

        if df:
            log_item = data.LogItem(
                utils.generate_uuid(),
                res_uid,
                df,
                guess_action(commited_data, staged_data))
            staged_log.append(log_item)
    return staged_log
def stage_changes():
    """Rebuild the staged log by diffing live resources against committed data."""
    log = data.SL()
    # the staged log is rebuilt from scratch on every call
    log.clean()
    conn_graph = signals.detailed_connection_graph()
    staged = {r.name: r.args_show()
              for r in resource.load_all().values()}
    commited = data.CD()
    return _stage_changes(staged, conn_graph, commited, log)
def send_to_orchestration():
    """Build an orchestration plan from pending changes and persist it.

    One 'solar_resource' task node is created per changed resource; edges
    follow the resource connection graph. Returns the new plan uid.
    """
    conn_graph = signals.detailed_connection_graph()
    dg = nx.DiGraph()
    staged = {r.name: r.args_show()
              for r in resource.load_all().values()}
    commited = data.CD()

    for res_uid in conn_graph:
        commited_data = commited.get(res_uid, {})
        staged_data = staged.get(res_uid, {})

        df = create_diff(staged_data, commited_data)

        if df:
            dg.add_node(
                res_uid, status='PENDING',
                errmsg=None,
                **parameters(res_uid, guess_action(commited_data, staged_data)))
            for pred in conn_graph.predecessors(res_uid):
                # NOTE(review): add_edge implicitly creates unchanged
                # predecessors as bare nodes (no parameters) -- confirm
                # the scheduler tolerates them
                dg.add_edge(pred, res_uid)

    # what it should be?
    dg.graph['name'] = 'system_log'
    return graph.create_plan_from_graph(dg)
def parameters(res, action):
    """Build the task parameters for a 'solar_resource' orchestration node."""
    return {
        'args': [res, action],
        'type': 'solar_resource',
    }

View File

@ -0,0 +1,129 @@
import os
import collections
from collections import deque
from functools import partial
from solar import utils
from solar.interfaces.db import get_db
from enum import Enum
db = get_db()
STATES = Enum('States', 'error inprogress pending success')
def state_file(name):
    """Return the persistence wrapper for *name*: Log for names containing
    'log', Data for names containing 'data' (None otherwise)."""
    if 'log' in name:
        return Log(name)
    elif 'data' in name:
        return Data(name)


# Named accessors for the well-known system-log containers.
CD = partial(state_file, 'commited_data')   # committed resource data
SL = partial(state_file, 'stage_log')       # staged log
CL = partial(state_file, 'commit_log')      # commit history
class LogItem(object):
    """One system-log entry: a diff for a resource plus the action to run."""

    def __init__(self, uid, res, diff, action, state=None):
        self.uid = uid          # unique entry id
        self.res = res          # resource name this entry applies to
        self.diff = diff        # dictdiffer diff payload
        self.state = state or STATES.pending
        self.action = action    # e.g. 'run' | 'remove' | 'update'

    def to_yaml(self):
        """YAML rendering of the dict form."""
        return utils.yaml_dump(self.to_dict())

    def to_dict(self):
        """Plain-dict form; the state enum is stored by name."""
        return {'uid': self.uid,
                'res': self.res,
                'diff': self.diff,
                'state': self.state.name,
                'action': self.action}

    @classmethod
    def from_dict(cls, **kwargs):
        """Inverse of to_dict; unknown or missing state falls back to pending."""
        state = getattr(STATES, kwargs.get('state', ''), STATES.pending)
        kwargs['state'] = state
        return cls(**kwargs)

    def __str__(self):
        return self.to_yaml()

    def __repr__(self):
        return self.to_yaml()
class Log(object):
    """LogItem collection stored in a redis-backed ordered set.

    Items are keyed by resource name (LogItem.res), not by uid.
    """

    def __init__(self, path):
        self.ordered_log = db.get_set(path)

    def append(self, logitem):
        # keyed by resource name, serialized as a plain dict
        self.ordered_log.add([(logitem.res, logitem.to_dict())])

    def pop(self, uid):
        """Remove and return the item stored under *uid*, or None."""
        item = self.get(uid)
        if not item:
            return None
        self.ordered_log.rem([uid])
        return item

    def update(self, logitem):
        """Overwrite the stored dict for the item's resource key."""
        self.ordered_log.update(logitem.res, logitem.to_dict())

    def clean(self):
        """Drop every entry."""
        self.ordered_log.clean()

    def get(self, key):
        """Return the LogItem stored under *key*, or None."""
        item = self.ordered_log.get(key)
        if item:
            return LogItem.from_dict(**item)
        return None

    def collection(self, n=0):
        # NOTE(review): 'collection' yields via ordered_log.reverse while
        # 'reverse' yields via ordered_log.list -- the names look swapped
        # relative to the underlying order; confirm before relying on it.
        for item in self.ordered_log.reverse(n=n):
            yield LogItem.from_dict(**item)

    def reverse(self, n=0):
        for item in self.ordered_log.list(n=n):
            yield LogItem.from_dict(**item)

    def __iter__(self):
        return iter(self.collection())
class Data(collections.MutableMapping):
    """Dict-like store persisted to the db state_data collection on
    every mutation."""

    def __init__(self, path):
        self.path = path
        self.store = {}
        r = db.read(path, collection=db.COLLECTIONS.state_data)
        if r:
            self.store = r or self.store

    def __getitem__(self, key):
        return self.store[key]

    def __setitem__(self, key, value):
        self.store[key] = value
        db.save(self.path, self.store, collection=db.COLLECTIONS.state_data)

    def __delitem__(self, key):
        self.store.pop(key)
        db.save(self.path, self.store, collection=db.COLLECTIONS.state_data)

    def __iter__(self):
        return iter(self.store)

    def __len__(self):
        return len(self.store)

    def clean(self):
        """Reset to empty, both in memory and in the db.

        FIX: previously only the persisted copy was emptied, leaving this
        instance's in-memory view stale and inconsistent with the db.
        """
        self.store = {}
        db.save(self.path, self.store, collection=db.COLLECTIONS.state_data)

View File

@ -0,0 +1,24 @@
from solar.system_log import data
from dictdiffer import patch
def set_error(task_uuid, *args, **kwargs):
    """Mark the staged log item for *task_uuid* as errored, if present.

    Extra args/kwargs are accepted and ignored so this can be used
    directly as a task callback.
    """
    staged_log = data.SL()
    logitem = staged_log.get(task_uuid)
    if not logitem:
        return
    logitem.state = data.STATES.error
    staged_log.update(logitem)
def move_to_commited(task_uuid, *args, **kwargs):
    """Move the staged item for *task_uuid* into the commit log.

    Applies the item's diff to the committed data, marks the item
    successful and appends it to the commit history. No-op when nothing
    is staged under that key.
    """
    sl = data.SL()
    item = sl.pop(task_uuid)
    if item:
        commited = data.CD()
        staged_data = patch(item.diff, commited.get(item.res, {}))
        cl = data.CL()
        item.state = data.STATES.success
        cl.append(item)
        commited[item.res] = staged_data

View File

@ -0,0 +1,16 @@
from solar.orchestration.runner import app
from solar.system_log.operations import set_error, move_to_commited
__all__ = ['error_logitem', 'commit_logitem']
@app.task
def error_logitem(task_uuid):
    """Celery task: flag the system-log entry of a failed task as errored."""
    # assumes the log key is the substring after the last ':' in the
    # orchestration task id -- TODO confirm against task id format
    return set_error(task_uuid.rsplit(':', 1)[-1])
@app.task
def commit_logitem(task_uuid):
    """Celery task: commit the system-log entry of a succeeded task."""
    # assumes the log key is the substring after the last ':' in the
    # orchestration task id -- TODO confirm against task id format
    return move_to_commited(task_uuid.rsplit(':', 1)[-1])