diff --git a/cli.py b/cli.py deleted file mode 100755 index 371d5283..00000000 --- a/cli.py +++ /dev/null @@ -1,236 +0,0 @@ -#!/usr/bin/env python -import click -import json -#import matplotlib -#matplotlib.use('Agg') # don't show windows -#import matplotlib.pyplot as plt -import networkx as nx -import os -import subprocess - -from solar.core import actions as xa -from solar.core import resource as xr -from solar.core import signals as xs -from solar import operations -from solar import state - -from solar.interfaces.db import get_db - -db = get_db() - - -@click.group() -def cli(): - pass - - -def init_cli_resource(): - @click.group() - def resource(): - pass - - cli.add_command(resource) - - @click.command() - @click.argument('resource_path') - @click.argument('action_name') - def action(action_name, resource_path): - print 'action', resource_path, action_name - r = xr.load(resource_path) - xa.resource_action(r, action_name) - - resource.add_command(action) - - @click.command() - @click.argument('name') - @click.argument('base_path') - @click.argument('dest_path') - @click.argument('args') - def create(args, dest_path, base_path, name): - print 'create', name, base_path, dest_path, args - args = json.loads(args) - xr.create(name, base_path, dest_path, args) - - resource.add_command(create) - - @click.command() - @click.argument('resource_path') - @click.argument('tag_name') - @click.option('--add/--delete', default=True) - def tag(add, tag_name, resource_path): - print 'Tag', resource_path, tag_name, add - r = xr.load(resource_path) - if add: - r.add_tag(tag_name) - else: - r.remove_tag(tag_name) - r.save() - - resource.add_command(tag) - - @click.command() - @click.argument('path') - @click.option('--all/--one', default=False) - @click.option('--tag', default=None) - @click.option('--use-json/--no-use-json', default=False) - def show(use_json, tag, all, path): - import json - import six - - printer = lambda r: six.print_(r) - if use_json: - printer = lambda r: six.print_(json.dumps(r.to_dict())) - - if all or tag: - for name, resource in xr.load_all(path).items(): - show = True - if tag: - if tag not in resource.tags: - show = False - - if show: - printer(resource) - else: - printer(xr.load(path)) - - resource.add_command(show) - - @click.command() - @click.argument('name') - @click.argument('args') - def update(name, args): - args = json.loads(args) - all = xr.load_all() - r = all[name] - r.update(args) - resource.add_command(update) - - -def init_cli_connect(): - @click.command() - @click.argument('emitter') - @click.argument('receiver') - @click.option('--mapping', default=None) - def connect(mapping, receiver, emitter): - print 'Connect', emitter, receiver - emitter = db.get_obj_resource(emitter) - receiver = db.get_obj_resource(receiver) - print emitter - print receiver - if mapping is not None: - mapping = json.loads(mapping) - xs.connect(emitter, receiver, mapping=mapping) - - cli.add_command(connect) - - @click.command() - @click.argument('emitter') - @click.argument('receiver') - def disconnect(receiver, emitter): - print 'Disconnect', emitter, receiver - emitter = db.get_obj_resource(emitter) - receiver = db.get_obj_resource(receiver) - print emitter - print receiver - xs.disconnect(emitter, receiver) - - cli.add_command(disconnect) - - -def init_changes(): - @click.group() - def changes(): - pass - - cli.add_command(changes) - - @click.command() - def stage(): - log = operations.stage_changes() - print log.show() - - changes.add_command(stage) - - @click.command() - @click.option('--one', is_flag=True, default=False) - def commit(one): - if one: - operations.commit_one() - else: - operations.commit_changes() - - changes.add_command(commit) - - @click.command() - @click.option('--limit', default=5) - def history(limit): - print state.CL().show() - - changes.add_command(history) - - @click.command() - @click.option('--last', is_flag=True, default=False) - @click.option('--all', is_flag=True, default=False) - @click.option('--uid', default=None) - def rollback(last, all, uid): - if last: - print operations.rollback_last() - elif all: - print operations.rollback_all() - elif uid: - print operations.rollback_uid(uid) - - changes.add_command(rollback) - - -def init_cli_connections(): - @click.group() - def connections(): - pass - - cli.add_command(connections) - - @click.command() - def show(): - print json.dumps(xs.CLIENTS, indent=2) - - connections.add_command(show) - - # TODO: this requires graphing libraries - @click.command() - def graph(): - #g = xs.connection_graph() - g = xs.detailed_connection_graph() - - nx.write_dot(g, 'graph.dot') - subprocess.call(['dot', '-Tpng', 'graph.dot', '-o', 'graph.png']) - - # Matplotlib - #pos = nx.spring_layout(g) - #nx.draw_networkx_nodes(g, pos) - #nx.draw_networkx_edges(g, pos, arrows=True) - #nx.draw_networkx_labels(g, pos) - #plt.axis('off') - #plt.savefig('graph.png') - - connections.add_command(graph) - - -def init_cli_deployment_config(): - @click.command() - @click.argument('filepath') - def deploy(filepath): - print 'Deploying from file {}'.format(filepath) - xd.deploy(filepath) - - cli.add_command(deploy) - - -if __name__ == '__main__': - init_cli_resource() - init_cli_connect() - init_cli_connections() - init_cli_deployment_config() - init_changes() - - cli() diff --git a/example.py b/example.py index 5a628575..f3869ee0 100644 --- a/example.py +++ b/example.py @@ -252,7 +252,7 @@ def deploy(): def undeploy(): db = get_db() - resources = map(resource.wrap_resource, db.get_list('resource')) + resources = map(resource.wrap_resource, db.get_list(collection=db.COLLECTIONS.resource)) resources = {r.name: r for r in resources} actions.resource_action(resources['glance_api_endpoint'], 'remove') diff --git a/jenkins-config.yaml b/jenkins-config.yaml index e8c2adaf..e21e7446 100644 --- a/jenkins-config.yaml +++ b/jenkins-config.yaml @@ -1 +1,4 @@ clients-data-file: /tmp/connections.yaml + +file-system-db: + storage-path: /tmp/storage diff --git a/main.yml b/main.yml index 073149f4..eedf726c 100644 --- a/main.yml +++ b/main.yml @@ -3,6 +3,9 @@ - hosts: all sudo: yes tasks: + - apt: name=redis-server state=present + - apt: name=python-redis state=present + # Setup additional development tools - apt: name=vim state=present - apt: name=tmux state=present diff --git a/requirements.txt b/requirements.txt index de631f81..2a39176f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ requests==2.7.0 mock dictdiffer==0.4.0 enum34==1.0.4 +redis==2.10.3 diff --git a/resources/glance_api_service/actions/run.yml b/resources/glance_api_service/actions/run.yml index 567748b0..292fe6bd 100644 --- a/resources/glance_api_service/actions/run.yml +++ b/resources/glance_api_service/actions/run.yml @@ -26,3 +26,6 @@ - {{ bind['value']['src'] }}:{{ bind['value']['dst'] }}:{{ bind['value'].get('mode', 'ro') }} {% endfor %} {% endif %} + + - name: wait for glance api + wait_for: host={{ ip }} port=9393 timeout=20 diff --git a/resources/glance_registry_service/actions/run.yml b/resources/glance_registry_service/actions/run.yml index 51b208b6..2139641d 100644 --- a/resources/glance_registry_service/actions/run.yml +++ b/resources/glance_registry_service/actions/run.yml @@ -27,3 +27,6 @@ - {{ bind['value']['src'] }}:{{ bind['value']['dst'] }}:{{ bind['value'].get('mode', 'ro') }} {% endfor %} {% endif %} + + - name: wait for glance registry + wait_for: host={{ ip }} port=9191 timeout=20 diff --git a/solar/setup.py b/solar/setup.py index d50a6575..1a846ea9 100644 --- a/solar/setup.py +++ b/solar/setup.py @@ -46,4 +46,4 @@ setup( include_package_data=True, entry_points={ 'console_scripts': [ - 'solar = solar.cli:main']}) + 'solar = solar.cli:run']}) diff --git a/solar/solar/cli.py b/solar/solar/cli.py index b5a4493c..1518718f 100644 --- a/solar/solar/cli.py +++ b/solar/solar/cli.py @@ -17,17 +17,22 @@ On create "golden" resource should be moved to special place """ -import argparse +import click +import json +import networkx as nx import os -import sys import pprint - -import textwrap +import subprocess import yaml from solar import utils +from solar import operations +from solar import state +from solar.core import actions +from solar.core import resource as sresource from solar.core.resource import assign_resources_to_nodes from solar.core.resource import connect_resources +from solar.core import signals from solar.core.tags_set_parser import Expression from solar.interfaces.db import get_db @@ -36,98 +41,19 @@ from solar.interfaces.db import get_db from solar.extensions.modules.discovery import Discovery -class Cmd(object): +db = get_db() - def __init__(self): - self.parser = argparse.ArgumentParser( - description=textwrap.dedent(__doc__), - formatter_class=argparse.RawDescriptionHelpFormatter) - self.subparser = self.parser.add_subparsers( - title='actions', - description='Supported actions', - help='Provide of one valid actions') - self.register_actions() - self.db = get_db() - def parse(self, args): - parsed = self.parser.parse_args(args) - return parsed.func(parsed) +@click.group() +def main(): + pass - def register_actions(self): - parser = self.subparser.add_parser('discover') - parser.set_defaults(func=getattr(self, 'discover')) - - # Profile actions - parser = self.subparser.add_parser('profile') - parser.set_defaults(func=getattr(self, 'profile')) - parser.add_argument('-l', '--list', dest='list', action='store_true') - group = parser.add_argument_group('create') - group.add_argument('-c', '--create', dest='create', action='store_true') - group.add_argument('-t', '--tags', nargs='+', default=['env/test_env']) - group.add_argument('-i', '--id', default=utils.generate_uuid()) - - # Assign - parser = self.subparser.add_parser('assign') - parser.set_defaults(func=getattr(self, 'assign')) - parser.add_argument('-n', '--nodes') - parser.add_argument('-r', '--resources') - - # Run action on tags - parser = self.subparser.add_parser('run') - parser.set_defaults(func=getattr(self, 'run')) - parser.add_argument('-t', '--tags') - parser.add_argument('-a', '--action') - - # Perform resources connection - parser = self.subparser.add_parser('connect') - parser.set_defaults(func=getattr(self, 'connect')) - parser.add_argument( - '-p', - '--profile') - - def run(self, args): - from solar.core import actions - from solar.core import signals - - resources = filter( - lambda r: Expression(args.tags, r.get('tags', [])).evaluate(), - self.db.get_list('resource')) - - for resource in resources: - resource_obj = self.db.get_obj_resource(resource['id']) - actions.resource_action(resource_obj, args.action) - - def profile(self, args): - if args.create: - params = {'tags': args.tags, 'id': args.id} - profile_template_path = os.path.join( - utils.read_config()['template-dir'], 'profile.yml') - data = yaml.load(utils.render_template(profile_template_path, params)) - self.db.store('profiles', data) - else: - pprint.pprint(self.db.get_list('profiles')) - - def discover(self, args): - Discovery({'id': 'discovery'}).discover() - - def connect(self, args): - profile = self.db.get_record('profiles', args.profile) - connect_resources(profile) - - def assign(self, args): - nodes = filter( - lambda n: Expression(args.nodes, n.get('tags', [])).evaluate(), - self.db.get_list('nodes')) - - resources = filter( - lambda r: Expression(args.resources, r.get('tags', [])).evaluate(), - self._get_resources_list()) - - print("For {0} nodes assign {1} resources".format(len(nodes), len(resources))) - assign_resources_to_nodes(resources, nodes) - - def _get_resources_list(self): +@main.command() +@click.option('-n', '--nodes') +@click.option('-r', '--resources') +def assign(resources, nodes): + def _get_resources_list(): result = [] for path in utils.find_by_mask(utils.read_config()['resources-files-mask']): resource = utils.yaml_load(path) @@ -137,11 +63,246 @@ class Cmd(object): return result + nodes = filter( + lambda n: Expression(nodes, n.get('tags', [])).evaluate(), + db.get_list('nodes')) -def main(): - api = Cmd() - api.parse(sys.argv[1:]) + resources = filter( + lambda r: Expression(resources, r.get('tags', [])).evaluate(), + _get_resources_list()) + + print("For {0} nodes assign {1} resources".format(len(nodes), len(resources))) + assign_resources_to_nodes(resources, nodes) + + +# @main.command() +# @click.option('-p', '--profile') +# def connect(profile): +# profile_ = db.get_record('profiles', profile) +# connect_resources(profile_) + + +@main.command() +def discover(): + Discovery({'id': 'discovery'}).discover() + + +@main.command() +@click.option('-c', '--create', default=False, is_flag=True) +@click.option('-t', '--tags', multiple=True) +@click.option('-i', '--id') +def profile(id, tags, create): + if not id: + id = utils.generate_uuid() + if create: + params = {'tags': tags, 'id': id} + profile_template_path = os.path.join( + utils.read_config()['template-dir'], 'profile.yml') + data = yaml.load(utils.render_template(profile_template_path, params)) + db.store('profiles', data) + else: + pprint.pprint(db.get_list('profiles')) + + +def init_actions(): + @main.command() + @click.option('-t', '--tags') + @click.option('-a', '--action') + def run(action, tags): + from solar.core import actions + from solar.core import resource + + resources = filter( + lambda r: Expression(tags, r.get('tags', [])).evaluate(), + db.get_list('resource')) + + for resource in resources: + resource_obj = sresource.load(resource['id']) + actions.resource_action(resource_obj, action) + + +def init_changes(): + @main.group() + def changes(): + pass + + @changes.command() + def stage(): + log = operations.stage_changes() + print log.show() + + @changes.command() + @click.option('--one', is_flag=True, default=False) + def commit(one): + if one: + operations.commit_one() + else: + operations.commit_changes() + + @changes.command() + @click.option('--limit', default=5) + def history(limit): + print state.CL().show() + + @changes.command() + @click.option('--last', is_flag=True, default=False) + @click.option('--all', is_flag=True, default=False) + @click.option('--uid', default=None) + def rollback(last, all, uid): + if last: + print operations.rollback_last() + elif all: + print operations.rollback_all() + elif uid: + print operations.rollback_uid(uid) + + +def init_cli_connect(): + @main.command() + @click.argument('emitter') + @click.argument('receiver') + @click.option('--mapping', default=None) + def connect(mapping, receiver, emitter): + print 'Connect', emitter, receiver + emitter = sresource.load(emitter) + receiver = sresource.load(receiver) + print emitter + print receiver + if mapping is not None: + mapping = json.loads(mapping) + signals.connect(emitter, receiver, mapping=mapping) + + @main.command() + @click.argument('emitter') + @click.argument('receiver') + def disconnect(receiver, emitter): + print 'Disconnect', emitter, receiver + emitter = sresource.load(emitter) + receiver = sresource.load(receiver) + print emitter + print receiver + signals.disconnect(emitter, receiver) + + +def init_cli_connections(): + @main.group() + def connections(): + pass + + @connections.command() + def show(): + print json.dumps(signals.CLIENTS, indent=2) + + # TODO: this requires graphing libraries + @connections.command() + def graph(): + #g = xs.connection_graph() + g = signals.detailed_connection_graph() + + nx.write_dot(g, 'graph.dot') + subprocess.call(['dot', '-Tpng', 'graph.dot', '-o', 'graph.png']) + + # Matplotlib + #pos = nx.spring_layout(g) + #nx.draw_networkx_nodes(g, pos) + #nx.draw_networkx_edges(g, pos, arrows=True) + #nx.draw_networkx_labels(g, pos) + #plt.axis('off') + #plt.savefig('graph.png') + + +def init_cli_deployment_config(): + @main.command() + @click.argument('filepath') + def deploy(filepath): + print 'Deploying from file {}'.format(filepath) + xd.deploy(filepath) + + +def init_cli_resource(): + @main.group() + def resource(): + pass + + @resource.command() + @click.argument('resource_path') + @click.argument('action_name') + def action(action_name, resource_path): + print 'action', resource_path, action_name + r = sresource.load(resource_path) + actions.resource_action(r, action_name) + + @resource.command() + @click.argument('name') + @click.argument('base_path') + @click.argument('args') + def create(args, base_path, name): + print 'create', name, base_path, args + args = json.loads(args) + sresource.create(name, base_path, args) + + @resource.command() + @click.option('--tag', default=None) + @click.option('--use-json/--no-use-json', default=False) + @click.option('--color/--no-color', default=True) + def show(color, use_json, tag): + resources = [] + + for name, res in sresource.load_all().items(): + show = True + if tag: + if tag not in res.tags: + show = False + + if show: + resources.append(res) + + if use_json: + output = json.dumps([r.to_dict() for r in resources], indent=2) + else: + if color: + formatter = lambda r: r.color_repr() + else: + formatter = lambda r: unicode(r) + output = '\n'.join(formatter(r) for r in resources) + + if output: + click.echo_via_pager(output) + + + @resource.command() + @click.argument('resource_path') + @click.argument('tag_name') + @click.option('--add/--delete', default=True) + def tag(add, tag_name, resource_path): + print 'Tag', resource_path, tag_name, add + r = sresource.load(resource_path) + if add: + r.add_tag(tag_name) + else: + r.remove_tag(tag_name) + r.save() + + @resource.command() + @click.argument('name') + @click.argument('args') + def update(name, args): + args = json.loads(args) + all = sresource.load_all() + r = all[name] + r.update(args) + + +def run(): + init_actions() + init_changes() + init_cli_connect() + init_cli_connections() + init_cli_deployment_config() + init_cli_resource() + + main() if __name__ == '__main__': - main() + run() diff --git a/solar/solar/core/observer.py b/solar/solar/core/observer.py index 3c3cd718..3ff35230 100644 --- a/solar/solar/core/observer.py +++ b/solar/solar/core/observer.py @@ -1,4 +1,7 @@ from solar.core import signals +from solar.interfaces.db import get_db + +db = get_db() class BaseObserver(object): @@ -16,6 +19,17 @@ class BaseObserver(object): self.value = value self.receivers = [] + # @property + # def receivers(self): + # from solar.core import resource + # + # signals.CLIENTS = signals.Connections.read_clients() + # for receiver_name, receiver_input in signals.Connections.receivers( + # self.attached_to.name, + # self.name + # ): + # yield resource.load(receiver_name).args[receiver_input] + def log(self, msg): print '{} {}'.format(self, msg) @@ -173,6 +187,8 @@ class ListObserver(BaseObserver): self.log('Unsubscribed emitter {}'.format(emitter)) idx = self._emitter_idx(emitter) self.value.pop(idx) + for receiver in self.receivers: + receiver.notify(self) def _emitter_idx(self, emitter): try: diff --git a/solar/solar/core/resource.py b/solar/solar/core/resource.py index 607be42a..f7749c87 100644 --- a/solar/solar/core/resource.py +++ b/solar/solar/core/resource.py @@ -46,6 +46,21 @@ class Resource(object): return ("Resource(name='{name}', metadata={metadata}, args={args}, " "tags={tags})").format(**self.to_dict()) + def color_repr(self): + import click + + arg_color = 'yellow' + + return ("{resource_s}({name_s}='{name}', {metadata_s}={metadata}, " + "{args_s}={args}, {tags_s}={tags})").format( + resource_s=click.style('Resource', fg='white', bold=True), + name_s=click.style('name', fg=arg_color, bold=True), + metadata_s=click.style('metadata', fg=arg_color, bold=True), + args_s=click.style('args', fg=arg_color, bold=True), + tags_s=click.style('tags', fg=arg_color, bold=True), + **self.to_dict() + ) + def to_dict(self): return { 'name': self.name, @@ -116,7 +131,7 @@ class Resource(object): for k, v in self.args_dict().items(): metadata['input'][k]['value'] = v - db.add_resource(self.name, metadata) + db.save(self.name, metadata, collection=db.COLLECTIONS.resource) def create(name, base_path, args, tags=[], connections={}): @@ -152,14 +167,25 @@ def wrap_resource(raw_resource): return Resource(name, raw_resource, args, tags=tags) +def load(resource_name): + raw_resource = db.read(resource_name, collection=db.COLLECTIONS.resource) + + if raw_resource is None: + raise NotImplementedError( + 'Resource {} does not exist'.format(resource_name) + ) + + return wrap_resource(raw_resource) + + def load_all(): ret = {} - for raw_resource in db.get_list('resource'): - resource = db.get_obj_resource(raw_resource['id']) + for raw_resource in db.get_list(collection=db.COLLECTIONS.resource): + resource = wrap_resource(raw_resource) ret[resource.name] = resource - signals.Connections.reconnect_all() + #signals.Connections.reconnect_all() return ret diff --git a/solar/solar/core/signals.py b/solar/solar/core/signals.py index a819422e..0b3a911d 100644 --- a/solar/solar/core/signals.py +++ b/solar/solar/core/signals.py @@ -11,12 +11,48 @@ from solar.interfaces.db import get_db db = get_db() - CLIENTS_CONFIG_KEY = 'clients-data-file' -CLIENTS = utils.read_config_file(CLIENTS_CONFIG_KEY) +#CLIENTS = utils.read_config_file(CLIENTS_CONFIG_KEY) +CLIENTS = {} class Connections(object): + """ + CLIENTS structure is: + + emitter_name: + emitter_input_name: + - - dst_name + - dst_input_name + + while DB structure is: + + emitter_name_key: + emitter: emitter_name + sources: + emitter_input_name: + - - dst_name + - dst_input_name + """ + + @staticmethod + def read_clients(): + ret = {} + + for data in db.get_list(collection=db.COLLECTIONS.connection): + ret[data['emitter']] = data['sources'] + + return ret + + @staticmethod + def save_clients(): + for emitter_name, sources in CLIENTS.items(): + data = { + 'emitter': emitter_name, + 'sources': sources, + } + db.save(emitter_name, data, collection=db.COLLECTIONS.connection) + @staticmethod def add(emitter, src, receiver, dst): if src not in emitter.args: @@ -32,6 +68,7 @@ class Connections(object): CLIENTS[emitter.name][src].append([receiver.name, dst]) #utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS) + Connections.save_clients() @staticmethod def remove(emitter, src, receiver, dst): @@ -41,6 +78,7 @@ class Connections(object): ] #utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS) + Connections.save_clients() @staticmethod def reconnect_all(): @@ -48,14 +86,24 @@ class Connections(object): :return: """ + from solar.core.resource import wrap_resource + for emitter_name, dest_dict in CLIENTS.items(): - emitter = db.get_obj_resource(emitter_name) + emitter = wrap_resource( + db.read(emitter_name, collection=db.COLLECTIONS.resource) + ) for emitter_input, destinations in dest_dict.items(): for receiver_name, receiver_input in destinations: - receiver = db.get_obj_resource(receiver_name) + receiver = wrap_resource( + db.read(receiver_name, collection=db.COLLECTIONS.resource) + ) emitter.args[emitter_input].subscribe( receiver.args[receiver_input]) + @staticmethod + def receivers(emitter_name, emitter_input_name): + return CLIENTS.get(emitter_name, {}).get(emitter_input_name, []) + @staticmethod def clear(): global CLIENTS @@ -69,10 +117,12 @@ class Connections(object): @staticmethod def flush(): print 'FLUSHING Connections' - utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS) + #utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS) + Connections.save_clients() -atexit.register(Connections.flush) +CLIENTS = Connections.read_clients() +#atexit.register(Connections.flush) def guess_mapping(emitter, receiver): @@ -135,7 +185,7 @@ def disconnect_receiver_by_input(receiver, input): :return: """ for emitter_name, inputs in CLIENTS.items(): - emitter = db.get_resource(emitter_name) + emitter = db.read(emitter_name, collection=db.COLLECTIONS.resource) disconnect_by_src(emitter['id'], input, receiver) @@ -150,11 +200,15 @@ def disconnect_by_src(emitter_name, src, receiver): def notify(source, key, value): + from solar.core.resource import wrap_resource + CLIENTS.setdefault(source.name, {}) print 'Notify', source.name, key, value, CLIENTS[source.name] if key in CLIENTS[source.name]: for client, r_key in CLIENTS[source.name][key]: - resource = db.get_obj_resource(client) + resource = wrap_resource( + db.read(client, collection=db.COLLECTIONS.resource) + ) print 'Resource found', client if resource: resource.update({r_key: value}, emitter=source) diff --git a/solar/solar/interfaces/db/__init__.py b/solar/solar/interfaces/db/__init__.py index 9db436bb..92f519a6 100644 --- a/solar/solar/interfaces/db/__init__.py +++ b/solar/solar/interfaces/db/__init__.py @@ -1,16 +1,19 @@ -from solar.interfaces.db.file_system_db import FileSystemDB from solar.interfaces.db.cached_file_system_db import CachedFileSystemDB +from solar.interfaces.db.file_system_db import FileSystemDB +from solar.interfaces.db.redis_db import RedisDB mapping = { 'cached_file_system': CachedFileSystemDB, - 'file_system': FileSystemDB + 'file_system': FileSystemDB, + 'redis_db': RedisDB, } DB = None + def get_db(): # Should be retrieved from config global DB if DB is None: - DB = mapping['cached_file_system']() + DB = mapping['redis_db']() return DB diff --git a/solar/solar/interfaces/db/redis_db.py b/solar/solar/interfaces/db/redis_db.py new file mode 100644 index 00000000..3c43ccb7 --- /dev/null +++ b/solar/solar/interfaces/db/redis_db.py @@ -0,0 +1,47 @@ +from enum import Enum +import json +import redis + +from solar import utils +from solar import errors + + +class RedisDB(object): + COLLECTIONS = Enum( + 'Collections', + 'connection resource state_data state_log' + ) + DB = { + 'host': 'localhost', + 'port': 6379, + } + + def __init__(self): + self._r = redis.StrictRedis(**self.DB) + self.entities = {} + + def read(self, uid, collection=COLLECTIONS.resource): + try: + return json.loads( + self._r.get(self._make_key(collection, uid)) + ) + except TypeError: + return None + + def save(self, uid, data, collection=COLLECTIONS.resource): + return self._r.set( + self._make_key(collection, uid), + json.dumps(data) + ) + + def get_list(self, collection=COLLECTIONS.resource): + key_glob = self._make_key(collection, '*') + + for key in self._r.keys(key_glob): + yield json.loads(self._r.get(key)) + + def clear(self): + self._r.flushdb() + + def _make_key(self, collection, _id): + return '{0}:{1}'.format(collection, _id) diff --git a/solar/solar/operations.py b/solar/solar/operations.py index e0006ddf..a3e753fc 100644 --- a/solar/solar/operations.py +++ b/solar/solar/operations.py @@ -56,7 +56,14 @@ def stage_changes(): log = state.SL() action = None - for res_uid in nx.topological_sort(conn_graph): + try: + srt = nx.topological_sort(conn_graph) + except: + for cycle in nx.simple_cycles(conn_graph): + print 'CYCLE: %s' % cycle + raise + + for res_uid in srt: commited_data = commited.get(res_uid, {}) staged_data = to_dict(resources[res_uid], conn_graph) @@ -160,7 +167,7 @@ def rollback(log_item): log_item.res, df, guess_action(commited, staged)) log.add(log_item) - res = db.get_obj_resource(log_item.res) + res = resource.load(log_item.res) res.update(staged.get('args', {})) res.save() diff --git a/solar/solar/state.py b/solar/solar/state.py index 3d1070e5..0f3af6c2 100644 --- a/solar/solar/state.py +++ b/solar/solar/state.py @@ -23,12 +23,10 @@ from enum import Enum from solar.interfaces.db import get_db -import yaml - db = get_db() -STATES = Enum('States', 'pending inprogress error success') +STATES = Enum('States', 'error inprogress pending success') def state_file(name): @@ -76,8 +74,9 @@ class Log(object): def __init__(self, path): self.path = path items = [] - if path in db: - items = db[path] or items + r = db.read(path, collection=db.COLLECTIONS.state_log) + if r: + items = r or items self.items = deque([LogItem( l['uid'], l['res'], @@ -85,8 +84,11 @@ class Log(object): getattr(STATES, l['state'])) for l in items]) def sync(self): - db[self.path] = [i.to_dict() for i in self.items] - + db.save( + self.path, + [i.to_dict() for i in self.items], + collection=db.COLLECTIONS.state_log + ) def add(self, logitem): self.items.append(logitem) @@ -103,7 +105,7 @@ class Log(object): return item def show(self, verbose=False): - return ['L(uuid={0}, res={1}, aciton={2})'.format( + return ['L(uuid={0}, res={1}, action={2})'.format( l.uid, l.res, l.action) for l in self.items] def __repr__(self): @@ -121,19 +123,20 @@ class Data(collections.MutableMapping): def __init__(self, path): self.path = path self.store = {} - if path in db: - self.store = db[path] or self.store + r = db.read(path, collection=db.COLLECTIONS.state_data) + if r: + self.store = r or self.store def __getitem__(self, key): return self.store[key] def __setitem__(self, key, value): self.store[key] = value - db[self.path] = self.store + db.save(self.path, self.store, collection=db.COLLECTIONS.state_data) def __delitem__(self, key): self.store.pop(key) - db[self.path] = self.store + db.save(self.path, self.store, collection=db.COLLECTIONS.state_data) def __iter__(self): return iter(self.store) diff --git a/solar/solar/test/test_signals.py b/solar/solar/test/test_signals.py index 8b80e708..08348977 100644 --- a/solar/solar/test/test_signals.py +++ b/solar/solar/test/test_signals.py @@ -329,6 +329,91 @@ input: (sample2.args['port'].attached_to.name, 'port')] ) + # Test disconnect + xs.disconnect(sample2, list_input_multi) + self.assertEqual( + [ip['value'] for ip in list_input_multi.args['ips'].value], + [sample1.args['ip']] + ) + self.assertEqual( + [p['value'] for p in list_input_multi.args['ports'].value], + [sample1.args['port']] + ) + + def test_nested_list_input(self): + sample_meta_dir = self.make_resource_meta(""" +id: sample +handler: ansible +version: 1.0.0 +input: + ip: + schema: str + value: + port: + schema: int + value: + """) + list_input_meta_dir = self.make_resource_meta(""" +id: list-input +handler: ansible +version: 1.0.0 +input: + ips: + schema: [str] + value: [] + ports: + schema: [int] + value: [] + """) + list_input_nested_meta_dir = self.make_resource_meta(""" +id: list-input-nested +handler: ansible +version: 1.0.0 +input: + ipss: + schema: [[str]] + value: [] + portss: + schema: [[int]] + value: [] + """) + + sample1 = self.create_resource( + 'sample1', sample_meta_dir, {'ip': '10.0.0.1', 'port': '1000'} + ) + sample2 = self.create_resource( + 'sample2', sample_meta_dir, {'ip': '10.0.0.2', 'port': '1001'} + ) + list_input = self.create_resource( + 'list-input', list_input_meta_dir, {'ips': [], 'ports': []} + ) + list_input_nested = self.create_resource( + 'list-input-nested', list_input_nested_meta_dir, {'ipss': [], 'portss': []} + ) + + xs.connect(sample1, list_input, mapping={'ip': 'ips', 'port': 'ports'}) + xs.connect(sample2, list_input, mapping={'ip': 'ips', 'port': 'ports'}) + xs.connect(list_input, list_input_nested, mapping={'ips': 'ipss', 'ports': 'portss'}) + self.assertListEqual( + [ips['value'] for ips in list_input_nested.args['ipss'].value], + [list_input.args['ips'].value] + ) + self.assertListEqual( + [ps['value'] for ps in list_input_nested.args['portss'].value], + [list_input.args['ports'].value] + ) + + # Test disconnect + xs.disconnect(sample1, list_input) + self.assertListEqual( + [[ip['value'] for ip in ips['value']] for ips in list_input_nested.args['ipss'].value], + [[sample2.args['ip'].value]] + ) + self.assertListEqual( + [[p['value'] for p in ps['value']] for ps in list_input_nested.args['portss'].value], + [[sample2.args['port'].value]] + ) + ''' class TestMultiInput(base.BaseResourceTest):