Merge pull request #86 from xarses/redis

Redis
This commit is contained in:
Łukasz Oleś 2015-06-10 12:47:40 +02:00
commit 16dc68ec2f
17 changed files with 542 additions and 363 deletions

236
cli.py
View File

@ -1,236 +0,0 @@
#!/usr/bin/env python
import click
import json
#import matplotlib
#matplotlib.use('Agg') # don't show windows
#import matplotlib.pyplot as plt
import networkx as nx
import os
import subprocess
from solar.core import actions as xa
from solar.core import resource as xr
from solar.core import signals as xs
from solar import operations
from solar import state
from solar.interfaces.db import get_db
db = get_db()
@click.group()
def cli():
pass
def init_cli_resource():
@click.group()
def resource():
pass
cli.add_command(resource)
@click.command()
@click.argument('resource_path')
@click.argument('action_name')
def action(action_name, resource_path):
print 'action', resource_path, action_name
r = xr.load(resource_path)
xa.resource_action(r, action_name)
resource.add_command(action)
@click.command()
@click.argument('name')
@click.argument('base_path')
@click.argument('dest_path')
@click.argument('args')
def create(args, dest_path, base_path, name):
print 'create', name, base_path, dest_path, args
args = json.loads(args)
xr.create(name, base_path, dest_path, args)
resource.add_command(create)
@click.command()
@click.argument('resource_path')
@click.argument('tag_name')
@click.option('--add/--delete', default=True)
def tag(add, tag_name, resource_path):
print 'Tag', resource_path, tag_name, add
r = xr.load(resource_path)
if add:
r.add_tag(tag_name)
else:
r.remove_tag(tag_name)
r.save()
resource.add_command(tag)
@click.command()
@click.argument('path')
@click.option('--all/--one', default=False)
@click.option('--tag', default=None)
@click.option('--use-json/--no-use-json', default=False)
def show(use_json, tag, all, path):
import json
import six
printer = lambda r: six.print_(r)
if use_json:
printer = lambda r: six.print_(json.dumps(r.to_dict()))
if all or tag:
for name, resource in xr.load_all(path).items():
show = True
if tag:
if tag not in resource.tags:
show = False
if show:
printer(resource)
else:
printer(xr.load(path))
resource.add_command(show)
@click.command()
@click.argument('name')
@click.argument('args')
def update(name, args):
args = json.loads(args)
all = xr.load_all()
r = all[name]
r.update(args)
resource.add_command(update)
def init_cli_connect():
@click.command()
@click.argument('emitter')
@click.argument('receiver')
@click.option('--mapping', default=None)
def connect(mapping, receiver, emitter):
print 'Connect', emitter, receiver
emitter = db.get_obj_resource(emitter)
receiver = db.get_obj_resource(receiver)
print emitter
print receiver
if mapping is not None:
mapping = json.loads(mapping)
xs.connect(emitter, receiver, mapping=mapping)
cli.add_command(connect)
@click.command()
@click.argument('emitter')
@click.argument('receiver')
def disconnect(receiver, emitter):
print 'Disconnect', emitter, receiver
emitter = db.get_obj_resource(emitter)
receiver = db.get_obj_resource(receiver)
print emitter
print receiver
xs.disconnect(emitter, receiver)
cli.add_command(disconnect)
def init_changes():
@click.group()
def changes():
pass
cli.add_command(changes)
@click.command()
def stage():
log = operations.stage_changes()
print log.show()
changes.add_command(stage)
@click.command()
@click.option('--one', is_flag=True, default=False)
def commit(one):
if one:
operations.commit_one()
else:
operations.commit_changes()
changes.add_command(commit)
@click.command()
@click.option('--limit', default=5)
def history(limit):
print state.CL().show()
changes.add_command(history)
@click.command()
@click.option('--last', is_flag=True, default=False)
@click.option('--all', is_flag=True, default=False)
@click.option('--uid', default=None)
def rollback(last, all, uid):
if last:
print operations.rollback_last()
elif all:
print operations.rollback_all()
elif uid:
print operations.rollback_uid(uid)
changes.add_command(rollback)
def init_cli_connections():
@click.group()
def connections():
pass
cli.add_command(connections)
@click.command()
def show():
print json.dumps(xs.CLIENTS, indent=2)
connections.add_command(show)
# TODO: this requires graphing libraries
@click.command()
def graph():
#g = xs.connection_graph()
g = xs.detailed_connection_graph()
nx.write_dot(g, 'graph.dot')
subprocess.call(['dot', '-Tpng', 'graph.dot', '-o', 'graph.png'])
# Matplotlib
#pos = nx.spring_layout(g)
#nx.draw_networkx_nodes(g, pos)
#nx.draw_networkx_edges(g, pos, arrows=True)
#nx.draw_networkx_labels(g, pos)
#plt.axis('off')
#plt.savefig('graph.png')
connections.add_command(graph)
def init_cli_deployment_config():
@click.command()
@click.argument('filepath')
def deploy(filepath):
print 'Deploying from file {}'.format(filepath)
xd.deploy(filepath)
cli.add_command(deploy)
if __name__ == '__main__':
init_cli_resource()
init_cli_connect()
init_cli_connections()
init_cli_deployment_config()
init_changes()
cli()

View File

@ -252,7 +252,7 @@ def deploy():
def undeploy(): def undeploy():
db = get_db() db = get_db()
resources = map(resource.wrap_resource, db.get_list('resource')) resources = map(resource.wrap_resource, db.get_list(collection=db.COLLECTIONS.resource))
resources = {r.name: r for r in resources} resources = {r.name: r for r in resources}
actions.resource_action(resources['glance_api_endpoint'], 'remove') actions.resource_action(resources['glance_api_endpoint'], 'remove')

View File

@ -1 +1,4 @@
clients-data-file: /tmp/connections.yaml clients-data-file: /tmp/connections.yaml
file-system-db:
storage-path: /tmp/storage

View File

@ -3,6 +3,9 @@
- hosts: all - hosts: all
sudo: yes sudo: yes
tasks: tasks:
- apt: name=redis-server state=present
- apt: name=python-redis state=present
# Setup additional development tools # Setup additional development tools
- apt: name=vim state=present - apt: name=vim state=present
- apt: name=tmux state=present - apt: name=tmux state=present

View File

@ -7,3 +7,4 @@ requests==2.7.0
mock mock
dictdiffer==0.4.0 dictdiffer==0.4.0
enum34==1.0.4 enum34==1.0.4
redis==2.10.3

View File

@ -26,3 +26,6 @@
- {{ bind['value']['src'] }}:{{ bind['value']['dst'] }}:{{ bind['value'].get('mode', 'ro') }} - {{ bind['value']['src'] }}:{{ bind['value']['dst'] }}:{{ bind['value'].get('mode', 'ro') }}
{% endfor %} {% endfor %}
{% endif %} {% endif %}
- name: wait for glance api
wait_for: host={{ ip }} port=9393 timeout=20

View File

@ -27,3 +27,6 @@
- {{ bind['value']['src'] }}:{{ bind['value']['dst'] }}:{{ bind['value'].get('mode', 'ro') }} - {{ bind['value']['src'] }}:{{ bind['value']['dst'] }}:{{ bind['value'].get('mode', 'ro') }}
{% endfor %} {% endfor %}
{% endif %} {% endif %}
- name: wait for glance registry
wait_for: host={{ ip }} port=9191 timeout=20

View File

@ -46,4 +46,4 @@ setup(
include_package_data=True, include_package_data=True,
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
'solar = solar.cli:main']}) 'solar = solar.cli:run']})

View File

@ -17,17 +17,22 @@
On create "golden" resource should be moved to special place On create "golden" resource should be moved to special place
""" """
import argparse import click
import json
import networkx as nx
import os import os
import sys
import pprint import pprint
import subprocess
import textwrap
import yaml import yaml
from solar import utils from solar import utils
from solar import operations
from solar import state
from solar.core import actions
from solar.core import resource as sresource
from solar.core.resource import assign_resources_to_nodes from solar.core.resource import assign_resources_to_nodes
from solar.core.resource import connect_resources from solar.core.resource import connect_resources
from solar.core import signals
from solar.core.tags_set_parser import Expression from solar.core.tags_set_parser import Expression
from solar.interfaces.db import get_db from solar.interfaces.db import get_db
@ -36,98 +41,19 @@ from solar.interfaces.db import get_db
from solar.extensions.modules.discovery import Discovery from solar.extensions.modules.discovery import Discovery
class Cmd(object): db = get_db()
def __init__(self):
self.parser = argparse.ArgumentParser(
description=textwrap.dedent(__doc__),
formatter_class=argparse.RawDescriptionHelpFormatter)
self.subparser = self.parser.add_subparsers(
title='actions',
description='Supported actions',
help='Provide of one valid actions')
self.register_actions()
self.db = get_db()
def parse(self, args): @click.group()
parsed = self.parser.parse_args(args) def main():
return parsed.func(parsed) pass
def register_actions(self):
parser = self.subparser.add_parser('discover') @main.command()
parser.set_defaults(func=getattr(self, 'discover')) @click.option('-n', '--nodes')
@click.option('-r', '--resources')
# Profile actions def assign(resources, nodes):
parser = self.subparser.add_parser('profile') def _get_resources_list():
parser.set_defaults(func=getattr(self, 'profile'))
parser.add_argument('-l', '--list', dest='list', action='store_true')
group = parser.add_argument_group('create')
group.add_argument('-c', '--create', dest='create', action='store_true')
group.add_argument('-t', '--tags', nargs='+', default=['env/test_env'])
group.add_argument('-i', '--id', default=utils.generate_uuid())
# Assign
parser = self.subparser.add_parser('assign')
parser.set_defaults(func=getattr(self, 'assign'))
parser.add_argument('-n', '--nodes')
parser.add_argument('-r', '--resources')
# Run action on tags
parser = self.subparser.add_parser('run')
parser.set_defaults(func=getattr(self, 'run'))
parser.add_argument('-t', '--tags')
parser.add_argument('-a', '--action')
# Perform resources connection
parser = self.subparser.add_parser('connect')
parser.set_defaults(func=getattr(self, 'connect'))
parser.add_argument(
'-p',
'--profile')
def run(self, args):
from solar.core import actions
from solar.core import signals
resources = filter(
lambda r: Expression(args.tags, r.get('tags', [])).evaluate(),
self.db.get_list('resource'))
for resource in resources:
resource_obj = self.db.get_obj_resource(resource['id'])
actions.resource_action(resource_obj, args.action)
def profile(self, args):
if args.create:
params = {'tags': args.tags, 'id': args.id}
profile_template_path = os.path.join(
utils.read_config()['template-dir'], 'profile.yml')
data = yaml.load(utils.render_template(profile_template_path, params))
self.db.store('profiles', data)
else:
pprint.pprint(self.db.get_list('profiles'))
def discover(self, args):
Discovery({'id': 'discovery'}).discover()
def connect(self, args):
profile = self.db.get_record('profiles', args.profile)
connect_resources(profile)
def assign(self, args):
nodes = filter(
lambda n: Expression(args.nodes, n.get('tags', [])).evaluate(),
self.db.get_list('nodes'))
resources = filter(
lambda r: Expression(args.resources, r.get('tags', [])).evaluate(),
self._get_resources_list())
print("For {0} nodes assign {1} resources".format(len(nodes), len(resources)))
assign_resources_to_nodes(resources, nodes)
def _get_resources_list(self):
result = [] result = []
for path in utils.find_by_mask(utils.read_config()['resources-files-mask']): for path in utils.find_by_mask(utils.read_config()['resources-files-mask']):
resource = utils.yaml_load(path) resource = utils.yaml_load(path)
@ -137,11 +63,246 @@ class Cmd(object):
return result return result
nodes = filter(
lambda n: Expression(nodes, n.get('tags', [])).evaluate(),
db.get_list('nodes'))
def main(): resources = filter(
api = Cmd() lambda r: Expression(resources, r.get('tags', [])).evaluate(),
api.parse(sys.argv[1:]) _get_resources_list())
print("For {0} nodes assign {1} resources".format(len(nodes), len(resources)))
assign_resources_to_nodes(resources, nodes)
# @main.command()
# @click.option('-p', '--profile')
# def connect(profile):
# profile_ = db.get_record('profiles', profile)
# connect_resources(profile_)
@main.command()
def discover():
Discovery({'id': 'discovery'}).discover()
@main.command()
@click.option('-c', '--create', default=False, is_flag=True)
@click.option('-t', '--tags', multiple=True)
@click.option('-i', '--id')
def profile(id, tags, create):
if not id:
id = utils.generate_uuid()
if create:
params = {'tags': tags, 'id': id}
profile_template_path = os.path.join(
utils.read_config()['template-dir'], 'profile.yml')
data = yaml.load(utils.render_template(profile_template_path, params))
db.store('profiles', data)
else:
pprint.pprint(db.get_list('profiles'))
def init_actions():
@main.command()
@click.option('-t', '--tags')
@click.option('-a', '--action')
def run(action, tags):
from solar.core import actions
from solar.core import resource
resources = filter(
lambda r: Expression(tags, r.get('tags', [])).evaluate(),
db.get_list('resource'))
for resource in resources:
resource_obj = sresource.load(resource['id'])
actions.resource_action(resource_obj, action)
def init_changes():
@main.group()
def changes():
pass
@changes.command()
def stage():
log = operations.stage_changes()
print log.show()
@changes.command()
@click.option('--one', is_flag=True, default=False)
def commit(one):
if one:
operations.commit_one()
else:
operations.commit_changes()
@changes.command()
@click.option('--limit', default=5)
def history(limit):
print state.CL().show()
@changes.command()
@click.option('--last', is_flag=True, default=False)
@click.option('--all', is_flag=True, default=False)
@click.option('--uid', default=None)
def rollback(last, all, uid):
if last:
print operations.rollback_last()
elif all:
print operations.rollback_all()
elif uid:
print operations.rollback_uid(uid)
def init_cli_connect():
@main.command()
@click.argument('emitter')
@click.argument('receiver')
@click.option('--mapping', default=None)
def connect(mapping, receiver, emitter):
print 'Connect', emitter, receiver
emitter = sresource.load(emitter)
receiver = sresource.load(receiver)
print emitter
print receiver
if mapping is not None:
mapping = json.loads(mapping)
signals.connect(emitter, receiver, mapping=mapping)
@main.command()
@click.argument('emitter')
@click.argument('receiver')
def disconnect(receiver, emitter):
print 'Disconnect', emitter, receiver
emitter = sresource.load(emitter)
receiver = sresource.load(receiver)
print emitter
print receiver
signals.disconnect(emitter, receiver)
def init_cli_connections():
@main.group()
def connections():
pass
@connections.command()
def show():
print json.dumps(signals.CLIENTS, indent=2)
# TODO: this requires graphing libraries
@connections.command()
def graph():
#g = xs.connection_graph()
g = signals.detailed_connection_graph()
nx.write_dot(g, 'graph.dot')
subprocess.call(['dot', '-Tpng', 'graph.dot', '-o', 'graph.png'])
# Matplotlib
#pos = nx.spring_layout(g)
#nx.draw_networkx_nodes(g, pos)
#nx.draw_networkx_edges(g, pos, arrows=True)
#nx.draw_networkx_labels(g, pos)
#plt.axis('off')
#plt.savefig('graph.png')
def init_cli_deployment_config():
@main.command()
@click.argument('filepath')
def deploy(filepath):
print 'Deploying from file {}'.format(filepath)
xd.deploy(filepath)
def init_cli_resource():
@main.group()
def resource():
pass
@resource.command()
@click.argument('resource_path')
@click.argument('action_name')
def action(action_name, resource_path):
print 'action', resource_path, action_name
r = sresource.load(resource_path)
actions.resource_action(r, action_name)
@resource.command()
@click.argument('name')
@click.argument('base_path')
@click.argument('args')
def create(args, base_path, name):
print 'create', name, base_path, args
args = json.loads(args)
sresource.create(name, base_path, args)
@resource.command()
@click.option('--tag', default=None)
@click.option('--use-json/--no-use-json', default=False)
@click.option('--color/--no-color', default=True)
def show(color, use_json, tag):
resources = []
for name, res in sresource.load_all().items():
show = True
if tag:
if tag not in res.tags:
show = False
if show:
resources.append(res)
if use_json:
output = json.dumps([r.to_dict() for r in resources], indent=2)
else:
if color:
formatter = lambda r: r.color_repr()
else:
formatter = lambda r: unicode(r)
output = '\n'.join(formatter(r) for r in resources)
if output:
click.echo_via_pager(output)
@resource.command()
@click.argument('resource_path')
@click.argument('tag_name')
@click.option('--add/--delete', default=True)
def tag(add, tag_name, resource_path):
print 'Tag', resource_path, tag_name, add
r = sresource.load(resource_path)
if add:
r.add_tag(tag_name)
else:
r.remove_tag(tag_name)
r.save()
@resource.command()
@click.argument('name')
@click.argument('args')
def update(name, args):
args = json.loads(args)
all = sresource.load_all()
r = all[name]
r.update(args)
def run():
init_actions()
init_changes()
init_cli_connect()
init_cli_connections()
init_cli_deployment_config()
init_cli_resource()
main()
if __name__ == '__main__': if __name__ == '__main__':
main() run()

View File

@ -1,4 +1,7 @@
from solar.core import signals from solar.core import signals
from solar.interfaces.db import get_db
db = get_db()
class BaseObserver(object): class BaseObserver(object):
@ -16,6 +19,17 @@ class BaseObserver(object):
self.value = value self.value = value
self.receivers = [] self.receivers = []
# @property
# def receivers(self):
# from solar.core import resource
#
# signals.CLIENTS = signals.Connections.read_clients()
# for receiver_name, receiver_input in signals.Connections.receivers(
# self.attached_to.name,
# self.name
# ):
# yield resource.load(receiver_name).args[receiver_input]
def log(self, msg): def log(self, msg):
print '{} {}'.format(self, msg) print '{} {}'.format(self, msg)
@ -173,6 +187,8 @@ class ListObserver(BaseObserver):
self.log('Unsubscribed emitter {}'.format(emitter)) self.log('Unsubscribed emitter {}'.format(emitter))
idx = self._emitter_idx(emitter) idx = self._emitter_idx(emitter)
self.value.pop(idx) self.value.pop(idx)
for receiver in self.receivers:
receiver.notify(self)
def _emitter_idx(self, emitter): def _emitter_idx(self, emitter):
try: try:

View File

@ -46,6 +46,21 @@ class Resource(object):
return ("Resource(name='{name}', metadata={metadata}, args={args}, " return ("Resource(name='{name}', metadata={metadata}, args={args}, "
"tags={tags})").format(**self.to_dict()) "tags={tags})").format(**self.to_dict())
def color_repr(self):
import click
arg_color = 'yellow'
return ("{resource_s}({name_s}='{name}', {metadata_s}={metadata}, "
"{args_s}={args}, {tags_s}={tags})").format(
resource_s=click.style('Resource', fg='white', bold=True),
name_s=click.style('name', fg=arg_color, bold=True),
metadata_s=click.style('metadata', fg=arg_color, bold=True),
args_s=click.style('args', fg=arg_color, bold=True),
tags_s=click.style('tags', fg=arg_color, bold=True),
**self.to_dict()
)
def to_dict(self): def to_dict(self):
return { return {
'name': self.name, 'name': self.name,
@ -116,7 +131,7 @@ class Resource(object):
for k, v in self.args_dict().items(): for k, v in self.args_dict().items():
metadata['input'][k]['value'] = v metadata['input'][k]['value'] = v
db.add_resource(self.name, metadata) db.save(self.name, metadata, collection=db.COLLECTIONS.resource)
def create(name, base_path, args, tags=[], connections={}): def create(name, base_path, args, tags=[], connections={}):
@ -152,14 +167,25 @@ def wrap_resource(raw_resource):
return Resource(name, raw_resource, args, tags=tags) return Resource(name, raw_resource, args, tags=tags)
def load(resource_name):
raw_resource = db.read(resource_name, collection=db.COLLECTIONS.resource)
if raw_resource is None:
raise NotImplementedError(
'Resource {} does not exist'.format(resource_name)
)
return wrap_resource(raw_resource)
def load_all(): def load_all():
ret = {} ret = {}
for raw_resource in db.get_list('resource'): for raw_resource in db.get_list(collection=db.COLLECTIONS.resource):
resource = db.get_obj_resource(raw_resource['id']) resource = wrap_resource(raw_resource)
ret[resource.name] = resource ret[resource.name] = resource
signals.Connections.reconnect_all() #signals.Connections.reconnect_all()
return ret return ret

View File

@ -11,12 +11,48 @@ from solar.interfaces.db import get_db
db = get_db() db = get_db()
CLIENTS_CONFIG_KEY = 'clients-data-file' CLIENTS_CONFIG_KEY = 'clients-data-file'
CLIENTS = utils.read_config_file(CLIENTS_CONFIG_KEY) #CLIENTS = utils.read_config_file(CLIENTS_CONFIG_KEY)
CLIENTS = {}
class Connections(object): class Connections(object):
"""
CLIENTS structure is:
emitter_name:
emitter_input_name:
- - dst_name
- dst_input_name
while DB structure is:
emitter_name_key:
emitter: emitter_name
sources:
emitter_input_name:
- - dst_name
- dst_input_name
"""
@staticmethod
def read_clients():
ret = {}
for data in db.get_list(collection=db.COLLECTIONS.connection):
ret[data['emitter']] = data['sources']
return ret
@staticmethod
def save_clients():
for emitter_name, sources in CLIENTS.items():
data = {
'emitter': emitter_name,
'sources': sources,
}
db.save(emitter_name, data, collection=db.COLLECTIONS.connection)
@staticmethod @staticmethod
def add(emitter, src, receiver, dst): def add(emitter, src, receiver, dst):
if src not in emitter.args: if src not in emitter.args:
@ -32,6 +68,7 @@ class Connections(object):
CLIENTS[emitter.name][src].append([receiver.name, dst]) CLIENTS[emitter.name][src].append([receiver.name, dst])
#utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS) #utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS)
Connections.save_clients()
@staticmethod @staticmethod
def remove(emitter, src, receiver, dst): def remove(emitter, src, receiver, dst):
@ -41,6 +78,7 @@ class Connections(object):
] ]
#utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS) #utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS)
Connections.save_clients()
@staticmethod @staticmethod
def reconnect_all(): def reconnect_all():
@ -48,14 +86,24 @@ class Connections(object):
:return: :return:
""" """
from solar.core.resource import wrap_resource
for emitter_name, dest_dict in CLIENTS.items(): for emitter_name, dest_dict in CLIENTS.items():
emitter = db.get_obj_resource(emitter_name) emitter = wrap_resource(
db.read(emitter_name, collection=db.COLLECTIONS.resource)
)
for emitter_input, destinations in dest_dict.items(): for emitter_input, destinations in dest_dict.items():
for receiver_name, receiver_input in destinations: for receiver_name, receiver_input in destinations:
receiver = db.get_obj_resource(receiver_name) receiver = wrap_resource(
db.read(receiver_name, collection=db.COLLECTIONS.resource)
)
emitter.args[emitter_input].subscribe( emitter.args[emitter_input].subscribe(
receiver.args[receiver_input]) receiver.args[receiver_input])
@staticmethod
def receivers(emitter_name, emitter_input_name):
return CLIENTS.get(emitter_name, {}).get(emitter_input_name, [])
@staticmethod @staticmethod
def clear(): def clear():
global CLIENTS global CLIENTS
@ -69,10 +117,12 @@ class Connections(object):
@staticmethod @staticmethod
def flush(): def flush():
print 'FLUSHING Connections' print 'FLUSHING Connections'
utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS) #utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS)
Connections.save_clients()
atexit.register(Connections.flush) CLIENTS = Connections.read_clients()
#atexit.register(Connections.flush)
def guess_mapping(emitter, receiver): def guess_mapping(emitter, receiver):
@ -135,7 +185,7 @@ def disconnect_receiver_by_input(receiver, input):
:return: :return:
""" """
for emitter_name, inputs in CLIENTS.items(): for emitter_name, inputs in CLIENTS.items():
emitter = db.get_resource(emitter_name) emitter = db.read(emitter_name, collection=db.COLLECTIONS.resource)
disconnect_by_src(emitter['id'], input, receiver) disconnect_by_src(emitter['id'], input, receiver)
@ -150,11 +200,15 @@ def disconnect_by_src(emitter_name, src, receiver):
def notify(source, key, value): def notify(source, key, value):
from solar.core.resource import wrap_resource
CLIENTS.setdefault(source.name, {}) CLIENTS.setdefault(source.name, {})
print 'Notify', source.name, key, value, CLIENTS[source.name] print 'Notify', source.name, key, value, CLIENTS[source.name]
if key in CLIENTS[source.name]: if key in CLIENTS[source.name]:
for client, r_key in CLIENTS[source.name][key]: for client, r_key in CLIENTS[source.name][key]:
resource = db.get_obj_resource(client) resource = wrap_resource(
db.read(client, collection=db.COLLECTIONS.resource)
)
print 'Resource found', client print 'Resource found', client
if resource: if resource:
resource.update({r_key: value}, emitter=source) resource.update({r_key: value}, emitter=source)

View File

@ -1,16 +1,19 @@
from solar.interfaces.db.file_system_db import FileSystemDB
from solar.interfaces.db.cached_file_system_db import CachedFileSystemDB from solar.interfaces.db.cached_file_system_db import CachedFileSystemDB
from solar.interfaces.db.file_system_db import FileSystemDB
from solar.interfaces.db.redis_db import RedisDB
mapping = { mapping = {
'cached_file_system': CachedFileSystemDB, 'cached_file_system': CachedFileSystemDB,
'file_system': FileSystemDB 'file_system': FileSystemDB,
'redis_db': RedisDB,
} }
DB = None DB = None
def get_db(): def get_db():
# Should be retrieved from config # Should be retrieved from config
global DB global DB
if DB is None: if DB is None:
DB = mapping['cached_file_system']() DB = mapping['redis_db']()
return DB return DB

View File

@ -0,0 +1,47 @@
from enum import Enum
import json
import redis
from solar import utils
from solar import errors
class RedisDB(object):
COLLECTIONS = Enum(
'Collections',
'connection resource state_data state_log'
)
DB = {
'host': 'localhost',
'port': 6379,
}
def __init__(self):
self._r = redis.StrictRedis(**self.DB)
self.entities = {}
def read(self, uid, collection=COLLECTIONS.resource):
try:
return json.loads(
self._r.get(self._make_key(collection, uid))
)
except TypeError:
return None
def save(self, uid, data, collection=COLLECTIONS.resource):
return self._r.set(
self._make_key(collection, uid),
json.dumps(data)
)
def get_list(self, collection=COLLECTIONS.resource):
key_glob = self._make_key(collection, '*')
for key in self._r.keys(key_glob):
yield json.loads(self._r.get(key))
def clear(self):
self._r.flushdb()
def _make_key(self, collection, _id):
return '{0}:{1}'.format(collection, _id)

View File

@ -56,7 +56,14 @@ def stage_changes():
log = state.SL() log = state.SL()
action = None action = None
for res_uid in nx.topological_sort(conn_graph): try:
srt = nx.topological_sort(conn_graph)
except:
for cycle in nx.simple_cycles(conn_graph):
print 'CYCLE: %s' % cycle
raise
for res_uid in srt:
commited_data = commited.get(res_uid, {}) commited_data = commited.get(res_uid, {})
staged_data = to_dict(resources[res_uid], conn_graph) staged_data = to_dict(resources[res_uid], conn_graph)
@ -160,7 +167,7 @@ def rollback(log_item):
log_item.res, df, guess_action(commited, staged)) log_item.res, df, guess_action(commited, staged))
log.add(log_item) log.add(log_item)
res = db.get_obj_resource(log_item.res) res = resource.load(log_item.res)
res.update(staged.get('args', {})) res.update(staged.get('args', {}))
res.save() res.save()

View File

@ -23,12 +23,10 @@ from enum import Enum
from solar.interfaces.db import get_db from solar.interfaces.db import get_db
import yaml
db = get_db() db = get_db()
STATES = Enum('States', 'pending inprogress error success') STATES = Enum('States', 'error inprogress pending success')
def state_file(name): def state_file(name):
@ -76,8 +74,9 @@ class Log(object):
def __init__(self, path): def __init__(self, path):
self.path = path self.path = path
items = [] items = []
if path in db: r = db.read(path, collection=db.COLLECTIONS.state_log)
items = db[path] or items if r:
items = r or items
self.items = deque([LogItem( self.items = deque([LogItem(
l['uid'], l['res'], l['uid'], l['res'],
@ -85,8 +84,11 @@ class Log(object):
getattr(STATES, l['state'])) for l in items]) getattr(STATES, l['state'])) for l in items])
def sync(self): def sync(self):
db[self.path] = [i.to_dict() for i in self.items] db.save(
self.path,
[i.to_dict() for i in self.items],
collection=db.COLLECTIONS.state_log
)
def add(self, logitem): def add(self, logitem):
self.items.append(logitem) self.items.append(logitem)
@ -103,7 +105,7 @@ class Log(object):
return item return item
def show(self, verbose=False): def show(self, verbose=False):
return ['L(uuid={0}, res={1}, aciton={2})'.format( return ['L(uuid={0}, res={1}, action={2})'.format(
l.uid, l.res, l.action) for l in self.items] l.uid, l.res, l.action) for l in self.items]
def __repr__(self): def __repr__(self):
@ -121,19 +123,20 @@ class Data(collections.MutableMapping):
def __init__(self, path): def __init__(self, path):
self.path = path self.path = path
self.store = {} self.store = {}
if path in db: r = db.read(path, collection=db.COLLECTIONS.state_data)
self.store = db[path] or self.store if r:
self.store = r or self.store
def __getitem__(self, key): def __getitem__(self, key):
return self.store[key] return self.store[key]
def __setitem__(self, key, value): def __setitem__(self, key, value):
self.store[key] = value self.store[key] = value
db[self.path] = self.store db.save(self.path, self.store, collection=db.COLLECTIONS.state_data)
def __delitem__(self, key): def __delitem__(self, key):
self.store.pop(key) self.store.pop(key)
db[self.path] = self.store db.save(self.path, self.store, collection=db.COLLECTIONS.state_data)
def __iter__(self): def __iter__(self):
return iter(self.store) return iter(self.store)

View File

@ -329,6 +329,91 @@ input:
(sample2.args['port'].attached_to.name, 'port')] (sample2.args['port'].attached_to.name, 'port')]
) )
# Test disconnect
xs.disconnect(sample2, list_input_multi)
self.assertEqual(
[ip['value'] for ip in list_input_multi.args['ips'].value],
[sample1.args['ip']]
)
self.assertEqual(
[p['value'] for p in list_input_multi.args['ports'].value],
[sample1.args['port']]
)
def test_nested_list_input(self):
sample_meta_dir = self.make_resource_meta("""
id: sample
handler: ansible
version: 1.0.0
input:
ip:
schema: str
value:
port:
schema: int
value:
""")
list_input_meta_dir = self.make_resource_meta("""
id: list-input
handler: ansible
version: 1.0.0
input:
ips:
schema: [str]
value: []
ports:
schema: [int]
value: []
""")
list_input_nested_meta_dir = self.make_resource_meta("""
id: list-input-nested
handler: ansible
version: 1.0.0
input:
ipss:
schema: [[str]]
value: []
portss:
schema: [[int]]
value: []
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, {'ip': '10.0.0.1', 'port': '1000'}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir, {'ip': '10.0.0.2', 'port': '1001'}
)
list_input = self.create_resource(
'list-input', list_input_meta_dir, {'ips': [], 'ports': []}
)
list_input_nested = self.create_resource(
'list-input-nested', list_input_nested_meta_dir, {'ipss': [], 'portss': []}
)
xs.connect(sample1, list_input, mapping={'ip': 'ips', 'port': 'ports'})
xs.connect(sample2, list_input, mapping={'ip': 'ips', 'port': 'ports'})
xs.connect(list_input, list_input_nested, mapping={'ips': 'ipss', 'ports': 'portss'})
self.assertListEqual(
[ips['value'] for ips in list_input_nested.args['ipss'].value],
[list_input.args['ips'].value]
)
self.assertListEqual(
[ps['value'] for ps in list_input_nested.args['portss'].value],
[list_input.args['ports'].value]
)
# Test disconnect
xs.disconnect(sample1, list_input)
self.assertListEqual(
[[ip['value'] for ip in ips['value']] for ips in list_input_nested.args['ipss'].value],
[[sample2.args['ip'].value]]
)
self.assertListEqual(
[[p['value'] for p in ps['value']] for ps in list_input_nested.args['portss'].value],
[[sample2.args['port'].value]]
)
''' '''
class TestMultiInput(base.BaseResourceTest): class TestMultiInput(base.BaseResourceTest):