diff --git a/.gitignore b/.gitignore index 596446eb..b221e333 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,6 @@ tmp/ state/ clients.json rs/ + +solar.log +x-venv/ diff --git a/example.py b/example.py index 2c1254fb..04e7e235 100644 --- a/example.py +++ b/example.py @@ -27,48 +27,48 @@ def deploy(): node1 = resource.create('node1', 'resources/ro_node/', {'ip': '10.0.0.3', 'ssh_key': '/vagrant/.vagrant/machines/solar-dev1/virtualbox/private_key', 'ssh_user': 'vagrant'}) node2 = resource.create('node2', 'resources/ro_node/', {'ip': '10.0.0.4', 'ssh_key': '/vagrant/.vagrant/machines/solar-dev2/virtualbox/private_key', 'ssh_user': 'vagrant'}) - rabbitmq_service1 = resource.create('rabbitmq_service1', 'resources/rabbitmq_service/', {'ssh_user': '', 'ip': '','management_port': '15672', 'port': '5672', 'ssh_key': '', 'container_name': 'rabbitmq_service1', 'image': 'rabbitmq:3-management'}) - openstack_vhost = resource.create('openstack_vhost', 'resources/rabbitmq_vhost/', {'ssh_user': '', 'ip': '', 'ssh_key': '', 'vhost_name': 'openstack', 'container_name': ''}) - openstack_rabbitmq_user = resource.create('openstack_rabbitmq_user', 'resources/rabbitmq_user/', {'ssh_user': '', 'ip': '', 'ssh_key': '', 'vhost_name': '', 'user_name': 'openstack', 'password': 'openstack_password', 'container_name': ''}) + rabbitmq_service1 = resource.create('rabbitmq_service1', 'resources/rabbitmq_service/', {'management_port': '15672', 'port': '5672', 'container_name': 'rabbitmq_service1', 'image': 'rabbitmq:3-management'}) + openstack_vhost = resource.create('openstack_vhost', 'resources/rabbitmq_vhost/', {'vhost_name': 'openstack'}) + openstack_rabbitmq_user = resource.create('openstack_rabbitmq_user', 'resources/rabbitmq_user/', {'user_name': 'openstack', 'password': 'openstack_password'}) - mariadb_service1 = resource.create('mariadb_service1', 'resources/mariadb_service', {'image': 'mariadb', 'root_password': 'mariadb', 'port': 3306, 'ip': '', 'ssh_user': '', 'ssh_key': ''}) - keystone_db = resource.create('keystone_db', 'resources/mariadb_keystone_db/', {'db_name': 'keystone_db', 'login_password': '', 'login_user': 'root', 'login_port': '', 'ip': '', 'ssh_user': '', 'ssh_key': ''}) - keystone_db_user = resource.create('keystone_db_user', 'resources/mariadb_keystone_user/', {'new_user_name': 'keystone', 'new_user_password': 'keystone', 'db_name': '', 'login_password': '', 'login_user': 'root', 'login_port': '', 'ip': '', 'ssh_user': '', 'ssh_key': ''}) + mariadb_service1 = resource.create('mariadb_service1', 'resources/mariadb_service', {'image': 'mariadb', 'root_password': 'mariadb', 'port': 3306}) + keystone_db = resource.create('keystone_db', 'resources/mariadb_keystone_db/', {'db_name': 'keystone_db', 'login_user': 'root'}) + keystone_db_user = resource.create('keystone_db_user', 'resources/mariadb_keystone_user/', {'new_user_name': 'keystone', 'new_user_password': 'keystone', 'login_user': 'root'}) - keystone_config1 = resource.create('keystone_config1', 'resources/keystone_config/', {'config_dir': '/etc/solar/keystone', 'ip': '', 'ssh_user': '', 'ssh_key': '', 'admin_token': 'admin', 'db_password': '', 'db_name': '', 'db_user': '', 'db_host': '', 'db_port': ''}) - keystone_service1 = resource.create('keystone_service1', 'resources/keystone_service/', {'port': 5001, 'admin_port': 35357, 'image': '', 'ip': '', 'ssh_key': '', 'ssh_user': '', 'config_dir': ''}) + keystone_config1 = resource.create('keystone_config1', 'resources/keystone_config/', {'config_dir': '/etc/solar/keystone', 'admin_token': 'admin'}) + keystone_service1 = resource.create('keystone_service1', 'resources/keystone_service/', {'port': 5001, 'admin_port': 35357}) - keystone_config2 = resource.create('keystone_config2', 'resources/keystone_config/', {'config_dir': '/etc/solar/keystone', 'ip': '', 'ssh_user': '', 'ssh_key': '', 'admin_token': 'admin', 'db_password': '', 'db_name': '', 'db_user': '', 'db_host': '', 'db_port': ''}) - keystone_service2 = resource.create('keystone_service2', 'resources/keystone_service/', {'port': 5002, 'admin_port': 35358, 'image': '', 'ip': '', 'ssh_key': '', 'ssh_user': '', 'config_dir': ''}) + keystone_config2 = resource.create('keystone_config2', 'resources/keystone_config/', {'config_dir': '/etc/solar/keystone', 'admin_token': 'admin'}) + keystone_service2 = resource.create('keystone_service2', 'resources/keystone_service/', {'port': 5002, 'admin_port': 35358}) haproxy_keystone_config = resource.create('haproxy_keystone1_config', 'resources/haproxy_service_config/', {'name': 'keystone_config', 'listen_port': 5000, 'servers':[], 'ports':[]}) - haproxy_config = resource.create('haproxy_config', 'resources/haproxy_config', {'ip': '', 'ssh_key': '', 'ssh_user': '', 'configs_names':[], 'configs_ports':[], 'listen_ports':[], 'configs':[], 'config_dir': ''}) - haproxy_service = resource.create('haproxy_service', 'resources/docker_container/', {'image': 'tutum/haproxy', 'ports': [], 'host_binds': [], 'volume_binds':[], 'ip': '', 'ssh_key': '', 'ssh_user': ''}) + haproxy_config = resource.create('haproxy_config', 'resources/haproxy_config', {'configs_names':[], 'configs_ports':[], 'listen_ports':[], 'configs':[]}) + haproxy_service = resource.create('haproxy_service', 'resources/docker_container/', {'image': 'tutum/haproxy', 'ports': [], 'host_binds': [], 'volume_binds':[]}) - glance_db = resource.create('glance_db', 'resources/mariadb_db/', {'db_name': 'glance_db', 'login_password': '', 'login_user': 'root', 'login_port': '', 'ip': '', 'ssh_user': '', 'ssh_key': ''}) - glance_db_user = resource.create('glance_db_user', 'resources/mariadb_user/', {'new_user_name': 'glance', 'new_user_password': 'glance', 'db_name': '', 'login_password': '', 'login_user': 'root', 'login_port': '', 'ip': '', 'ssh_user': '', 'ssh_key': ''}) + glance_db = resource.create('glance_db', 'resources/mariadb_db/', {'db_name': 'glance_db', 'login_user': 'root'}) + glance_db_user = resource.create('glance_db_user', 'resources/mariadb_user/', {'new_user_name': 'glance', 'new_user_password': 'glance', 'login_user': 'root'}) - services_tenant = resource.create('glance_keystone_tenant', 'resources/keystone_tenant', {'keystone_host': '', 'keystone_port': '', 'login_user': 'admin', 'admin_token': '', 'tenant_name': 'services', 'ip': '', 'ssh_user': '', 'ssh_key': ''}) + services_tenant = resource.create('glance_keystone_tenant', 'resources/keystone_tenant', {'tenant_name': 'services'}) - glance_keystone_user = resource.create('glance_keystone_user', 'resources/keystone_user', {'user_name': 'glance_admin', 'user_password': 'password1234', 'tenant_name': 'service_admins', 'role_name': 'glance_admin', 'keystone_host': '', 'keystone_port': '', 'admin_token': '', 'ip': '', 'ssh_key': '', 'ssh_user': ''}) - glance_keystone_role = resource.create('glance_keystone_role', 'resources/keystone_role', {'keystone_host': '', 'keystone_port': '', 'login_user': 'admin', 'admin_token': '', 'tenant_name': '', 'user_name': '', 'role_name': 'admin', 'ip': '', 'ssh_user': '', 'ssh_key': ''}) + glance_keystone_user = resource.create('glance_keystone_user', 'resources/keystone_user', {'user_name': 'glance_admin', 'user_password': 'password1234', 'tenant_name': 'service_admins'}) + glance_keystone_role = resource.create('glance_keystone_role', 'resources/keystone_role', {'role_name': 'admin'}) # TODO: add api_host and registry_host -- they can be different! Currently 'ip' is used. - glance_config = resource.create('glance_config', 'resources/glance_config/', {'ip': '', 'ssh_key': '', 'ssh_user': '', 'keystone_ip': '', 'keystone_port': '', 'config_dir': {}, 'api_port': 9393, 'registry_port': '', 'mysql_ip': '', 'mysql_db': '', 'mysql_user': '', 'mysql_password': '', 'keystone_admin_user': '', 'keystone_admin_password': '', 'keystone_admin_port': '', 'keystone_admin_tenant': ''}) - glance_api_container = resource.create('glance_api_container', 'resources/glance_api_service/', {'image': 'cgenie/centos-rdo-glance-api', 'ports': [{'value': [{'value': 9393}]}], 'host_binds': [], 'volume_binds': [], 'db_password': '', 'keystone_password': '', 'keystone_admin_token': '', 'keystone_host': '', 'ip': '', 'ssh_key': '', 'ssh_user': ''}) - glance_registry_container = resource.create('glance_registry_container', 'resources/glance_registry_service/', {'image': 'cgenie/centos-rdo-glance-registry', 'ports': [{'value': [{'value': 9191}]}], 'host_binds': [], 'volume_binds': [], 'db_host': '', 'db_root_password': '', 'db_password': '', 'db_name': '', 'db_user': '', 'keystone_admin_tenant': '', 'keystone_password': '', 'keystone_user': '', 'keystone_admin_token': '', 'keystone_host': '', 'ip': '', 'ssh_key': '', 'ssh_user': ''}) + glance_config = resource.create('glance_config', 'resources/glance_config/', {'api_port': 9393}) + glance_api_container = resource.create('glance_api_container', 'resources/glance_api_service/', {'image': 'cgenie/centos-rdo-glance-api', 'ports': [{'value': [{'value': 9393}]}], 'host_binds': [], 'volume_binds': []}) + glance_registry_container = resource.create('glance_registry_container', 'resources/glance_registry_service/', {'image': 'cgenie/centos-rdo-glance-registry', 'ports': [{'value': [{'value': 9191}]}], 'host_binds': [], 'volume_binds': []}) # TODO: admin_port should be refactored, we need to rethink docker # container resource and make it common for all # resources used in this demo - glance_api_endpoint = resource.create('glance_api_endpoint', 'resources/keystone_service_endpoint/', {'ip': '', 'ssh_key': '', 'ssh_user': '', 'admin_port': '', 'admin_token': '', 'adminurl': 'http://{{ip}}:{{admin_port}}', 'internalurl': 'http://{{ip}}:{{port}}', 'publicurl': 'http://{{ip}}:{{port}}', 'description': 'OpenStack Image Service', 'keystone_host': '', 'keystone_port': '', 'name': 'glance', 'port': '', 'type': 'image'}) + glance_api_endpoint = resource.create('glance_api_endpoint', 'resources/keystone_service_endpoint/', {'adminurl': 'http://{{ip}}:{{admin_port}}', 'internalurl': 'http://{{ip}}:{{port}}', 'publicurl': 'http://{{ip}}:{{port}}', 'description': 'OpenStack Image Service', 'type': 'image'}) # TODO: ports value 9393 is a HACK -- fix glance_api_container's port and move to some config # TODO: glance registry container's API port needs to point to haproxy_config haproxy_glance_api_config = resource.create('haproxy_glance_api_config', 'resources/haproxy_service_config/', {'name': 'glance_api_config', 'listen_port': 9292, 'servers': [], 'ports':[{'value': 9393}]}) - admin_tenant = resource.create('admin_tenant', 'resources/keystone_tenant', {'keystone_host': '', 'keystone_port': '', 'login_user': 'admin', 'admin_token': '', 'tenant_name': 'admin', 'ip': '', 'ssh_user': '', 'ssh_key': ''}) - admin_user = resource.create('admin_user', 'resources/keystone_user', {'keystone_host': '', 'keystone_port': '', 'login_user': 'admin', 'admin_token': '', 'tenant_name': '', 'user_name': 'admin', 'user_password': 'admin', 'ip': '', 'ssh_user': '', 'ssh_key': ''}) - admin_role = resource.create('admin_role', 'resources/keystone_role', {'keystone_host': '', 'keystone_port': '', 'login_user': 'admin', 'admin_token': '', 'tenant_name': '', 'user_name': '', 'role_name': 'admin', 'ip': '', 'ssh_user': '', 'ssh_key': ''}) - keystone_service_endpoint = resource.create('keystone_service_endpoint', 'resources/keystone_service_endpoint/', {'ip': '', 'ssh_key': '', 'ssh_user': '', 'admin_port': '', 'admin_token': '', 'adminurl': 'http://{{ip}}:{{admin_port}}/v2.0', 'internalurl': 'http://{{ip}}:{{port}}/v2.0', 'publicurl': 'http://{{ip}}:{{port}}/v2.0', 'description': 'OpenStack Identity Service', 'keystone_host': '', 'keystone_port': '', 'name': 'keystone', 'port': '', 'type': 'identity'}) + admin_tenant = resource.create('admin_tenant', 'resources/keystone_tenant', {'tenant_name': 'admin'}) + admin_user = resource.create('admin_user', 'resources/keystone_user', {'user_name': 'admin', 'user_password': 'admin'}) + admin_role = resource.create('admin_role', 'resources/keystone_role', {'role_name': 'admin'}) + keystone_service_endpoint = resource.create('keystone_service_endpoint', 'resources/keystone_service_endpoint/', {'adminurl': 'http://{{ip}}:{{admin_port}}/v2.0', 'internalurl': 'http://{{ip}}:{{port}}/v2.0', 'publicurl': 'http://{{ip}}:{{port}}/v2.0', 'description': 'OpenStack Identity Service', 'type': 'identity'}) #### @@ -180,8 +180,6 @@ def deploy(): signals.connect(haproxy_glance_api_config, glance_api_endpoint, {'listen_port': 'admin_port'}) signals.connect(haproxy_glance_api_config, glance_api_endpoint, {'listen_port': 'port'}) - signals.Connections.flush() - has_errors = False for r in locals().values(): diff --git a/resources/glance_api_service/actions/run.yml b/resources/glance_api_service/actions/run.yml index 38d24259..137ba277 100644 --- a/resources/glance_api_service/actions/run.yml +++ b/resources/glance_api_service/actions/run.yml @@ -24,4 +24,4 @@ {% endif %} - name: wait for glance api - wait_for: host={{ ip }} port=9393 timeout=20 + wait_for: host={{ ip }} port={{ ports.value[0]['value']['value'] }} timeout=20 diff --git a/resources/keystone_service/meta.yaml b/resources/keystone_service/meta.yaml index 0a24811d..1afa7f4c 100644 --- a/resources/keystone_service/meta.yaml +++ b/resources/keystone_service/meta.yaml @@ -4,7 +4,7 @@ version: 1.0.0 input: image: schema: str! - value: kollaglue/centos-rdo-k-keystone + value: kollaglue/centos-rdo-j-keystone config_dir: schema: str! value: /etc/solar/keystone diff --git a/run_tests.sh b/run_tests.sh index 531344d0..fa31e702 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -20,7 +20,6 @@ pip install -r solar/requirements.txt --download-cache=/tmp/$JOB_NAME pushd solar/solar -PYTHONPATH=$WORKSPACE/solar CONFIG_FILE=$CONFIG_FILE python test/test_signals.py -PYTHONPATH=$WORKSPACE/solar CONFIG_FILE=$CONFIG_FILE python test/test_validation.py +PYTHONPATH=$WORKSPACE/solar CONFIG_FILE=$CONFIG_FILE py.test test/ popd diff --git a/solar/requirements.txt b/solar/requirements.txt index 670b4eaf..a3d48744 100644 --- a/solar/requirements.txt +++ b/solar/requirements.txt @@ -10,3 +10,5 @@ mock dictdiffer==0.4.0 enum34==1.0.4 redis==2.10.3 +pytest +fakeredis diff --git a/solar/solar/cli.py b/solar/solar/cli.py index 1518718f..adfa69c7 100644 --- a/solar/solar/cli.py +++ b/solar/solar/cli.py @@ -31,7 +31,6 @@ from solar import state from solar.core import actions from solar.core import resource as sresource from solar.core.resource import assign_resources_to_nodes -from solar.core.resource import connect_resources from solar.core import signals from solar.core.tags_set_parser import Expression from solar.interfaces.db import get_db @@ -71,7 +70,9 @@ def assign(resources, nodes): lambda r: Expression(resources, r.get('tags', [])).evaluate(), _get_resources_list()) - print("For {0} nodes assign {1} resources".format(len(nodes), len(resources))) + click.echo( + "For {0} nodes assign {1} resources".format(len(nodes), len(resources)) + ) assign_resources_to_nodes(resources, nodes) @@ -129,7 +130,7 @@ def init_changes(): @changes.command() def stage(): log = operations.stage_changes() - print log.show() + click.echo(log.show()) @changes.command() @click.option('--one', is_flag=True, default=False) @@ -142,7 +143,7 @@ def init_changes(): @changes.command() @click.option('--limit', default=5) def history(limit): - print state.CL().show() + click.echo(state.CL().show()) @changes.command() @click.option('--last', is_flag=True, default=False) @@ -150,11 +151,11 @@ def init_changes(): @click.option('--uid', default=None) def rollback(last, all, uid): if last: - print operations.rollback_last() + click.echo(operations.rollback_last()) elif all: - print operations.rollback_all() + click.echo(operations.rollback_all()) elif uid: - print operations.rollback_uid(uid) + click.echo(operations.rollback_uid(uid)) def init_cli_connect(): @@ -163,11 +164,11 @@ def init_cli_connect(): @click.argument('receiver') @click.option('--mapping', default=None) def connect(mapping, receiver, emitter): - print 'Connect', emitter, receiver + click.echo('Connect {} to {}'.format(emitter, receiver)) emitter = sresource.load(emitter) receiver = sresource.load(receiver) - print emitter - print receiver + click.echo(emitter) + click.echo(receiver) if mapping is not None: mapping = json.loads(mapping) signals.connect(emitter, receiver, mapping=mapping) @@ -176,11 +177,11 @@ def init_cli_connect(): @click.argument('emitter') @click.argument('receiver') def disconnect(receiver, emitter): - print 'Disconnect', emitter, receiver + click.echo('Disconnect {} from {}'.format(emitter, receiver)) emitter = sresource.load(emitter) receiver = sresource.load(receiver) - print emitter - print receiver + click.echo(emitter) + click.echo(receiver) signals.disconnect(emitter, receiver) @@ -191,13 +192,42 @@ def init_cli_connections(): @connections.command() def show(): - print json.dumps(signals.CLIENTS, indent=2) + def format_resource_input(resource_name, resource_input_name): + return '{}::{}'.format( + #click.style(resource_name, fg='white', bold=True), + resource_name, + click.style(resource_input_name, fg='yellow') + ) + + def show_emitter_connections(emitter_name, destinations): + inputs = sorted(destinations) + + for emitter_input in inputs: + click.echo( + '{} -> {}'.format( + format_resource_input(emitter_name, emitter_input), + '[{}]'.format( + ', '.join( + format_resource_input(*r) + for r in destinations[emitter_input] + ) + ) + ) + ) + + clients = signals.Connections.read_clients() + keys = sorted(clients) + for emitter_name in keys: + show_emitter_connections(emitter_name, clients[emitter_name]) # TODO: this requires graphing libraries @connections.command() - def graph(): + @click.option('--start-with', default=None) + @click.option('--end-with', default=None) + def graph(end_with, start_with): #g = xs.connection_graph() - g = signals.detailed_connection_graph() + g = signals.detailed_connection_graph(start_with=start_with, + end_with=end_with) nx.write_dot(g, 'graph.dot') subprocess.call(['dot', '-Tpng', 'graph.dot', '-o', 'graph.png']) @@ -215,7 +245,7 @@ def init_cli_deployment_config(): @main.command() @click.argument('filepath') def deploy(filepath): - print 'Deploying from file {}'.format(filepath) + click.echo('Deploying from file {}'.format(filepath)) xd.deploy(filepath) @@ -228,7 +258,9 @@ def init_cli_resource(): @click.argument('resource_path') @click.argument('action_name') def action(action_name, resource_path): - print 'action', resource_path, action_name + click.echo( + 'action {} for resource {}'.format(action_name, resource_path) + ) r = sresource.load(resource_path) actions.resource_action(r, action_name) @@ -237,7 +269,7 @@ def init_cli_resource(): @click.argument('base_path') @click.argument('args') def create(args, base_path, name): - print 'create', name, base_path, args + click.echo('create {} {} {}'.format(name, base_path, args)) args = json.loads(args) sresource.create(name, base_path, args) @@ -275,7 +307,7 @@ def init_cli_resource(): @click.argument('tag_name') @click.option('--add/--delete', default=True) def tag(add, tag_name, resource_path): - print 'Tag', resource_path, tag_name, add + click.echo('Tag {} with {} {}'.format(resource_path, tag_name, add)) r = sresource.load(resource_path) if add: r.add_tag(tag_name) diff --git a/solar/solar/core/deployment.py b/solar/solar/core/deployment.py index e67f622a..aebe464f 100644 --- a/solar/solar/core/deployment.py +++ b/solar/solar/core/deployment.py @@ -5,6 +5,7 @@ import shutil import yaml from solar.core import db +from solar.core.log import log from solar.core import resource as xr from solar.core import signals as xs @@ -27,7 +28,7 @@ def deploy(filename): name = resource_definition['name'] model = os.path.join(workdir, resource_definition['model']) args = resource_definition.get('args', {}) - print 'Creating ', name, model, resource_save_path, args + log.debug('Creating %s %s %s %s', name, model, resource_save_path, args) xr.create(name, model, resource_save_path, args=args) # Create resource connections @@ -35,11 +36,11 @@ def deploy(filename): emitter = db.get_resource(connection['emitter']) receiver = db.get_resource(connection['receiver']) mapping = connection.get('mapping') - print 'Connecting ', emitter.name, receiver.name, mapping + log.debug('Connecting %s %s %s', emitter.name, receiver.name, mapping) xs.connect(emitter, receiver, mapping=mapping) # Run all tests if 'test-suite' in config: - print 'Running tests from {}'.format(config['test-suite']) + log.debug('Running tests from %s', config['test-suite']) test_suite = __import__(config['test-suite'], {}, {}, ['main']) test_suite.main() diff --git a/solar/solar/core/handlers/ansible.py b/solar/solar/core/handlers/ansible.py index 7ce7f456..41b9d33a 100644 --- a/solar/solar/core/handlers/ansible.py +++ b/solar/solar/core/handlers/ansible.py @@ -2,23 +2,24 @@ import os import subprocess +from solar.core.log import log from solar.core.handlers.base import BaseHandler -from solar.state import STATES class Ansible(BaseHandler): def action(self, resource, action_name): inventory_file = self._create_inventory(resource) playbook_file = self._create_playbook(resource, action_name) - print 'inventory_file', inventory_file - print 'playbook_file', playbook_file + log.debug('inventory_file: %s', inventory_file) + log.debug('playbook_file: %s', playbook_file) call_args = ['ansible-playbook', '--module-path', '/vagrant/library', '-i', inventory_file, playbook_file] - print 'EXECUTING: ', ' '.join(call_args) + log.debug('EXECUTING: %s', ' '.join(call_args)) try: subprocess.check_output(call_args) except subprocess.CalledProcessError as e: - print e.output + log.error(e.output) + log.exception(e) raise def _create_inventory(self, r): @@ -31,11 +32,8 @@ class Ansible(BaseHandler): def _render_inventory(self, r): inventory = '{0} ansible_ssh_host={1} ansible_connection=ssh ansible_ssh_user={2} ansible_ssh_private_key_file={3}' host, user, ssh_key = r.args['ip'].value, r.args['ssh_user'].value, r.args['ssh_key'].value - print host - print user - print ssh_key inventory = inventory.format(host, host, user, ssh_key) - print inventory + log.debug(inventory) return inventory def _create_playbook(self, resource, action): diff --git a/solar/solar/core/handlers/base.py b/solar/solar/core/handlers/base.py index 914a6a58..c2bebf1f 100644 --- a/solar/solar/core/handlers/base.py +++ b/solar/solar/core/handlers/base.py @@ -5,6 +5,8 @@ import tempfile from jinja2 import Template +from solar.core.log import log + class BaseHandler(object): def __init__(self, resources): @@ -19,7 +21,7 @@ class BaseHandler(object): return self def __exit__(self, type, value, traceback): - print self.dst + log.debug(self.dst) return shutil.rmtree(self.dst) @@ -33,10 +35,11 @@ class BaseHandler(object): return dest_file def _render_action(self, resource, action): - print 'Rendering %s %s' % (resource.name, action) + log.debug('Rendering %s %s', resource.name, action) action_file = resource.metadata['actions'][action] action_file = os.path.join(resource.metadata['actions_path'], action_file) + log.debug('action file: %s', action_file) args = self._make_args(resource) with open(action_file) as f: diff --git a/solar/solar/core/log.py b/solar/solar/core/log.py new file mode 100644 index 00000000..fe9ebaae --- /dev/null +++ b/solar/solar/core/log.py @@ -0,0 +1,22 @@ +import logging +import sys + + +log = logging.getLogger('solar') + + +def setup_logger(): + handler = logging.FileHandler('solar.log') + handler.setLevel(logging.DEBUG) + formatter = logging.Formatter('%(asctime)s %(levelname)s %(funcName)s (%(filename)s::%(lineno)s)::%(message)s') + handler.setFormatter(formatter) + log.addHandler(handler) + + print_formatter = logging.Formatter('%(levelname)s (%(filename)s::%(lineno)s)::%(message)s') + print_handler = logging.StreamHandler(stream=sys.stdout) + print_handler.setFormatter(print_formatter) + log.addHandler(print_handler) + + log.setLevel(logging.DEBUG) + +setup_logger() \ No newline at end of file diff --git a/solar/solar/core/observer.py b/solar/solar/core/observer.py index 3ff35230..0442e239 100644 --- a/solar/solar/core/observer.py +++ b/solar/solar/core/observer.py @@ -1,3 +1,4 @@ +from solar.core.log import log from solar.core import signals from solar.interfaces.db import get_db @@ -14,27 +15,28 @@ class BaseObserver(object): :param value: :return: """ - self.attached_to = attached_to + self._attached_to_name = attached_to.name self.name = name self.value = value - self.receivers = [] - # @property - # def receivers(self): - # from solar.core import resource - # - # signals.CLIENTS = signals.Connections.read_clients() - # for receiver_name, receiver_input in signals.Connections.receivers( - # self.attached_to.name, - # self.name - # ): - # yield resource.load(receiver_name).args[receiver_input] + @property + def attached_to(self): + from solar.core import resource - def log(self, msg): - print '{} {}'.format(self, msg) + return resource.load(self._attached_to_name) + + @property + def receivers(self): + from solar.core import resource + + for receiver_name, receiver_input in signals.Connections.receivers( + self._attached_to_name, + self.name + ): + yield resource.load(receiver_name).args[receiver_input] def __repr__(self): - return '[{}:{}] {}'.format(self.attached_to.name, self.name, self.value) + return '[{}:{}] {}'.format(self._attached_to_name, self.name, self.value) def __unicode__(self): return unicode(self.value) @@ -61,7 +63,7 @@ class BaseObserver(object): def find_receiver(self, receiver): fltr = [r for r in self.receivers - if r.attached_to == receiver.attached_to + if r._attached_to_name == receiver._attached_to_name and r.name == receiver.name] if fltr: return fltr[0] @@ -71,12 +73,11 @@ class BaseObserver(object): :param receiver: Observer :return: """ - self.log('Subscribe {}'.format(receiver)) + log.debug('Subscribe %s', receiver) # No multiple subscriptions if self.find_receiver(receiver): - self.log('No multiple subscriptions from {}'.format(receiver)) + log.error('No multiple subscriptions from %s', receiver) return - self.receivers.append(receiver) receiver.subscribed(self) signals.Connections.add( @@ -89,16 +90,15 @@ class BaseObserver(object): receiver.notify(self) def subscribed(self, emitter): - self.log('Subscribed {}'.format(emitter)) + log.debug('Subscribed %s', emitter) def unsubscribe(self, receiver): """ :param receiver: Observer :return: """ - self.log('Unsubscribe {}'.format(receiver)) + log.debug('Unsubscribe %s', receiver) if self.find_receiver(receiver): - self.receivers.remove(receiver) receiver.unsubscribed(self) signals.Connections.remove( @@ -112,41 +112,42 @@ class BaseObserver(object): #receiver.notify(self) def unsubscribed(self, emitter): - self.log('Unsubscribed {}'.format(emitter)) + log.debug('Unsubscribed %s', emitter) class Observer(BaseObserver): type_ = 'simple' - def __init__(self, *args, **kwargs): - super(Observer, self).__init__(*args, **kwargs) - self.emitter = None + @property + def emitter(self): + from solar.core import resource + + emitter = signals.Connections.emitter(self._attached_to_name, self.name) + + if emitter is not None: + emitter_name, emitter_input_name = emitter + return resource.load(emitter_name).args[emitter_input_name] def notify(self, emitter): - self.log('Notify from {} value {}'.format(emitter, emitter.value)) + log.debug('Notify from %s value %s', emitter, emitter.value) # Copy emitter's values to receiver self.value = emitter.value for receiver in self.receivers: receiver.notify(self) - self.attached_to.save() + self.attached_to.set_args_from_dict({self.name: self.value}) def update(self, value): - self.log('Updating to value {}'.format(value)) + log.debug('Updating to value %s', value) self.value = value for receiver in self.receivers: receiver.notify(self) - self.attached_to.save() + self.attached_to.set_args_from_dict({self.name: self.value}) def subscribed(self, emitter): super(Observer, self).subscribed(emitter) # Simple observer can be attached to at most one emitter if self.emitter is not None: self.emitter.unsubscribe(self) - self.emitter = emitter - - def unsubscribed(self, emitter): - super(Observer, self).unsubscribed(emitter) - self.emitter = None class ListObserver(BaseObserver): @@ -159,41 +160,42 @@ class ListObserver(BaseObserver): def _format_value(emitter): return { 'emitter': emitter.name, - 'emitter_attached_to': emitter.attached_to.name, + 'emitter_attached_to': emitter._attached_to_name, 'value': emitter.value, } def notify(self, emitter): - self.log('Notify from {} value {}'.format(emitter, emitter.value)) + log.debug('Notify from %s value %s', emitter, emitter.value) # Copy emitter's values to receiver - #self.value[emitter.attached_to.name] = emitter.value idx = self._emitter_idx(emitter) self.value[idx] = self._format_value(emitter) for receiver in self.receivers: receiver.notify(self) - self.attached_to.save() + self.attached_to.set_args_from_dict({self.name: self.value}) def subscribed(self, emitter): super(ListObserver, self).subscribed(emitter) idx = self._emitter_idx(emitter) if idx is None: self.value.append(self._format_value(emitter)) + self.attached_to.set_args_from_dict({self.name: self.value}) def unsubscribed(self, emitter): """ :param receiver: Observer :return: """ - self.log('Unsubscribed emitter {}'.format(emitter)) + log.debug('Unsubscribed emitter %s', emitter) idx = self._emitter_idx(emitter) self.value.pop(idx) + self.attached_to.set_args_from_dict({self.name: self.value}) for receiver in self.receivers: receiver.notify(self) def _emitter_idx(self, emitter): try: return [i for i, e in enumerate(self.value) - if e['emitter_attached_to'] == emitter.attached_to.name + if e['emitter_attached_to'] == emitter._attached_to_name ][0] except IndexError: return diff --git a/solar/solar/core/resource.py b/solar/solar/core/resource.py index b3e7babf..e1e25d75 100644 --- a/solar/solar/core/resource.py +++ b/solar/solar/core/resource.py @@ -4,8 +4,6 @@ import os from copy import deepcopy -import yaml - import solar from solar.core import actions @@ -24,26 +22,60 @@ class Resource(object): def __init__(self, name, metadata, args, tags=None): self.name = name self.metadata = metadata - self.actions = metadata['actions'].keys() if metadata['actions'] else None - self.args = {} - for arg_name, arg_value in args.items(): - if not self.metadata['input'].get(arg_name): - continue + self.tags = tags or [] + self.set_args_from_dict(args) - metadata_arg = self.metadata['input'][arg_name] + @property + def actions(self): + return self.metadata.get('actions') or [] + + @property + def args(self): + ret = {} + + args = self.args_dict() + + for arg_name, metadata_arg in self.metadata['input'].items(): type_ = validation.schema_input_type(metadata_arg.get('schema', 'str')) - value = arg_value - if not value and metadata_arg['value']: - value = metadata_arg['value'] + ret[arg_name] = observer.create( + type_, self, arg_name, args.get(arg_name) + ) - self.args[arg_name] = observer.create(type_, self, arg_name, value) - self.changed = [] - self.tags = tags or [] + return ret + + def args_dict(self): + raw_resource = db.read(self.name, collection=db.COLLECTIONS.resource) + if raw_resource is None: + return {} + + self.metadata = raw_resource + + args = self.metadata['input'] + + return {k: v['value'] for k, v in args.items()} + + def set_args_from_dict(self, new_args): + args = self.args_dict() + args.update(new_args) + + self.metadata['tags'] = self.tags + for k, v in args.items(): + if k not in self.metadata['input']: + raise NotImplementedError( + 'Argument {} not implemented for resource {}'.format(k, self) + ) + + self.metadata['input'][k]['value'] = v + + db.save(self.name, self.metadata, collection=db.COLLECTIONS.resource) + + def set_args(self, args): + self.set_args_from_dict({k: v.value for k, v in args.items()}) def __repr__(self): - return ("Resource(name='{name}', metadata={metadata}, args={args}, " + return ("Resource(name='{id}', metadata={metadata}, args={input}, " "tags={tags})").format(**self.to_dict()) def color_repr(self): @@ -51,8 +83,8 @@ class Resource(object): arg_color = 'yellow' - return ("{resource_s}({name_s}='{name}', {metadata_s}={metadata}, " - "{args_s}={args}, {tags_s}={tags})").format( + return ("{resource_s}({name_s}='{id}', {metadata_s}={metadata}, " + "{args_s}={input}, {tags_s}={tags})").format( resource_s=click.style('Resource', fg='white', bold=True), name_s=click.style('name', fg=arg_color, bold=True), metadata_s=click.style('metadata', fg=arg_color, bold=True), @@ -63,9 +95,9 @@ class Resource(object): def to_dict(self): return { - 'name': self.name, + 'id': self.name, 'metadata': self.metadata, - 'args': self.args_show(), + 'input': self.args_show(), 'tags': self.tags, } @@ -83,9 +115,6 @@ class Resource(object): return {k: formatter(v) for k, v in self.args.items()} - def args_dict(self): - return {k: v.value for k, v in self.args.items()} - def add_tag(self, tag): if tag not in self.tags: self.tags.append(tag) @@ -102,8 +131,10 @@ class Resource(object): :param emitter: Resource :return: """ + r_args = self.args + for key, value in emitter.args.iteritems(): - self.args[key].notify(value) + r_args[key].notify(value) def update(self, args): """This method updates resource's args with a simple dict. @@ -114,8 +145,12 @@ class Resource(object): # Update will be blocked if this resource is listening # on some input that is to be updated -- we should only listen # to the emitter and not be able to change the input's value + r_args = self.args + for key, value in args.iteritems(): - self.args[key].update(value) + r_args[key].update(value) + + self.set_args(r_args) def action(self, action): if action in self.actions: @@ -123,16 +158,6 @@ class Resource(object): else: raise Exception('Uuups, action is not available') - # TODO: versioning - def save(self): - metadata = copy.deepcopy(self.metadata) - - metadata['tags'] = self.tags - for k, v in self.args_dict().items(): - metadata['input'][k]['value'] = v - - db.save(self.name, metadata, collection=db.COLLECTIONS.resource) - def create(name, base_path, args, tags=[], connections={}): if not os.path.exists(base_path): @@ -154,7 +179,6 @@ def create(name, base_path, args, tags=[], connections={}): resource = Resource(name, meta, args, tags=tags) signals.assign_connections(resource, connections) - resource.save() return resource @@ -171,7 +195,7 @@ def load(resource_name): raw_resource = db.read(resource_name, collection=db.COLLECTIONS.resource) if raw_resource is None: - raise NotImplementedError( + raise KeyError( 'Resource {} does not exist'.format(resource_name) ) @@ -185,8 +209,6 @@ def load_all(): resource = wrap_resource(raw_resource) ret[resource.name] = resource - signals.Connections.reconnect_all() - return ret diff --git a/solar/solar/core/signals.py b/solar/solar/core/signals.py index 0b3a911d..f013849f 100644 --- a/solar/solar/core/signals.py +++ b/solar/solar/core/signals.py @@ -1,42 +1,35 @@ # -*- coding: utf-8 -*- -import atexit from collections import defaultdict import itertools import networkx as nx -import os -from solar import utils +from solar.core.log import log from solar.interfaces.db import get_db db = get_db() -CLIENTS_CONFIG_KEY = 'clients-data-file' -#CLIENTS = utils.read_config_file(CLIENTS_CONFIG_KEY) -CLIENTS = {} - - class Connections(object): - """ - CLIENTS structure is: - - emitter_name: - emitter_input_name: - - - dst_name - - dst_input_name - - while DB structure is: - - emitter_name_key: - emitter: emitter_name - sources: - emitter_input_name: - - - dst_name - - dst_input_name - """ - @staticmethod def read_clients(): + """ + Returned structure is: + + emitter_name: + emitter_input_name: + - - dst_name + - dst_input_name + + while DB structure is: + + emitter_name_key: + emitter: emitter_name + sources: + emitter_input_name: + - - dst_name + - dst_input_name + """ + ret = {} for data in db.get_list(collection=db.COLLECTIONS.connection): @@ -45,84 +38,64 @@ class Connections(object): return ret @staticmethod - def save_clients(): - for emitter_name, sources in CLIENTS.items(): - data = { - 'emitter': emitter_name, - 'sources': sources, - } - db.save(emitter_name, data, collection=db.COLLECTIONS.connection) + def save_clients(clients): + data = [] + + for emitter_name, sources in clients.items(): + data.append(( + emitter_name, + { + 'emitter': emitter_name, + 'sources': sources, + })) + + db.save_list(data, collection=db.COLLECTIONS.connection) @staticmethod def add(emitter, src, receiver, dst): if src not in emitter.args: return + clients = Connections.read_clients() + # TODO: implement general circular detection, this one is simple - if [emitter.name, src] in CLIENTS.get(receiver.name, {}).get(dst, []): + if [emitter.name, src] in clients.get(receiver.name, {}).get(dst, []): raise Exception('Attempted to create cycle in dependencies. Not nice.') - CLIENTS.setdefault(emitter.name, {}) - CLIENTS[emitter.name].setdefault(src, []) - if [receiver.name, dst] not in CLIENTS[emitter.name][src]: - CLIENTS[emitter.name][src].append([receiver.name, dst]) + clients.setdefault(emitter.name, {}) + clients[emitter.name].setdefault(src, []) + if [receiver.name, dst] not in clients[emitter.name][src]: + clients[emitter.name][src].append([receiver.name, dst]) - #utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS) - Connections.save_clients() + Connections.save_clients(clients) @staticmethod def remove(emitter, src, receiver, dst): - CLIENTS[emitter.name][src] = [ - destination for destination in CLIENTS[emitter.name][src] + clients = Connections.read_clients() + + clients[emitter.name][src] = [ + destination for destination in clients[emitter.name][src] if destination != [receiver.name, dst] ] - #utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS) - Connections.save_clients() - - @staticmethod - def reconnect_all(): - """Reconstruct connections for resource inputs from CLIENTS. - - :return: - """ - from solar.core.resource import wrap_resource - - for emitter_name, dest_dict in CLIENTS.items(): - emitter = wrap_resource( - db.read(emitter_name, collection=db.COLLECTIONS.resource) - ) - for emitter_input, destinations in dest_dict.items(): - for receiver_name, receiver_input in destinations: - receiver = wrap_resource( - db.read(receiver_name, collection=db.COLLECTIONS.resource) - ) - emitter.args[emitter_input].subscribe( - receiver.args[receiver_input]) + Connections.save_clients(clients) @staticmethod def receivers(emitter_name, emitter_input_name): - return CLIENTS.get(emitter_name, {}).get(emitter_input_name, []) + return Connections.read_clients().get(emitter_name, {}).get( + emitter_input_name, [] + ) + + @staticmethod + def emitter(receiver_name, receiver_input_name): + for emitter_name, dest_dict in Connections.read_clients().items(): + for emitter_input_name, destinations in dest_dict.items(): + if [receiver_name, receiver_input_name] in destinations: + return [emitter_name, emitter_input_name] @staticmethod def clear(): - global CLIENTS - - CLIENTS = {} - - path = utils.read_config()[CLIENTS_CONFIG_KEY] - if os.path.exists(path): - os.remove(path) - - @staticmethod - def flush(): - print 'FLUSHING Connections' - #utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS) - Connections.save_clients() - - -CLIENTS = Connections.read_clients() -#atexit.register(Connections.flush) + db.clear_collection(collection=db.COLLECTIONS.connection) def guess_mapping(emitter, receiver): @@ -162,20 +135,24 @@ def connect(emitter, receiver, mapping=None): emitter.args[src].subscribe(receiver.args[dst]) - receiver.save() + #receiver.save() def disconnect(emitter, receiver): - for src, destinations in CLIENTS[emitter.name].items(): - disconnect_by_src(emitter.name, src, receiver) + clients = Connections.read_clients() + for src, destinations in clients[emitter.name].items(): for destination in destinations: receiver_input = destination[1] if receiver_input in receiver.args: if receiver.args[receiver_input].type_ != 'list': - print 'Removing input {} from {}'.format(receiver_input, receiver.name) + log.debug( + 'Removing input %s from %s', receiver_input, receiver.name + ) emitter.args[src].unsubscribe(receiver.args[receiver_input]) + disconnect_by_src(emitter.name, src, receiver) + def disconnect_receiver_by_input(receiver, input): """Find receiver connection by input and disconnect it. @@ -184,36 +161,42 @@ def disconnect_receiver_by_input(receiver, input): :param input: :return: """ - for emitter_name, inputs in CLIENTS.items(): - emitter = db.read(emitter_name, collection=db.COLLECTIONS.resource) - disconnect_by_src(emitter['id'], input, receiver) + clients = Connections.read_clients() + + for emitter_name, inputs in clients.items(): + disconnect_by_src(emitter_name, input, receiver) def disconnect_by_src(emitter_name, src, receiver): - if src in CLIENTS[emitter_name]: - CLIENTS[emitter_name][src] = [ - destination for destination in CLIENTS[emitter_name][src] + clients = Connections.read_clients() + + if src in clients[emitter_name]: + clients[emitter_name][src] = [ + destination for destination in clients[emitter_name][src] if destination[0] != receiver.name ] - #utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS) + Connections.save_clients(clients) def notify(source, key, value): - from solar.core.resource import wrap_resource + from solar.core.resource import load - CLIENTS.setdefault(source.name, {}) - print 'Notify', source.name, key, value, CLIENTS[source.name] - if key in CLIENTS[source.name]: - for client, r_key in CLIENTS[source.name][key]: - resource = wrap_resource( - db.read(client, collection=db.COLLECTIONS.resource) - ) - print 'Resource found', client + clients = Connections.read_clients() + + if source.name not in clients: + clients[source.name] = {} + Connections.save_clients(clients) + + log.debug('Notify %s %s %s %s', source.name, key, value, clients[source.name]) + if key in clients[source.name]: + for client, r_key in clients[source.name][key]: + resource = load(client) + log.debug('Resource found: %s', client) if resource: resource.update({r_key: value}, emitter=source) else: - print 'Resource {} deleted?'.format(client) + log.debug('Resource %s deleted?', client) pass @@ -229,38 +212,51 @@ def assign_connections(receiver, connections): def connection_graph(): resource_dependencies = {} - for source, destination_values in CLIENTS.items(): - resource_dependencies.setdefault(source, set()) - for src, destinations in destination_values.items(): - resource_dependencies[source].update([ - destination[0] for destination in destinations - ]) + clients = Connections.read_clients() + + for emitter_name, destination_values in clients.items(): + resource_dependencies.setdefault(emitter_name, set()) + for emitter_input, receivers in destination_values.items(): + resource_dependencies[emitter_name].update( + receiver[0] for receiver in receivers + ) g = nx.DiGraph() # TODO: tags as graph node attributes - for source, destinations in resource_dependencies.items(): - g.add_node(source) - g.add_nodes_from(destinations) + for emitter_name, receivers in resource_dependencies.items(): + g.add_node(emitter_name) + g.add_nodes_from(receivers) g.add_edges_from( itertools.izip( - itertools.repeat(source), - destinations + itertools.repeat(emitter_name), + receivers ) ) return g -def detailed_connection_graph(): +def detailed_connection_graph(start_with=None, end_with=None): g = nx.MultiDiGraph() - for emitter_name, destination_values in CLIENTS.items(): - for emitter_input, receivers in CLIENTS[emitter_name].items(): + clients = Connections.read_clients() + + for emitter_name, destination_values in clients.items(): + for emitter_input, receivers in destination_values.items(): for receiver_name, receiver_input in receivers: - label = emitter_input - if emitter_input != receiver_input: - label = '{}:{}'.format(emitter_input, receiver_input) + label = '{}:{}'.format(emitter_input, receiver_input) g.add_edge(emitter_name, receiver_name, label=label) - return g + ret = g + + if start_with is not None: + ret = g.subgraph( + nx.dfs_postorder_nodes(ret, start_with) + ) + if end_with is not None: + ret = g.subgraph( + nx.dfs_postorder_nodes(ret.reverse(), end_with) + ) + + return ret diff --git a/solar/solar/core/validation.py b/solar/solar/core/validation.py index 01045f8a..df8f4196 100644 --- a/solar/solar/core/validation.py +++ b/solar/solar/core/validation.py @@ -1,4 +1,6 @@ -from jsonschema import validate, ValidationError, SchemaError +from jsonschema import validate, ValidationError + +from solar.core.log import log def schema_input_type(schema): @@ -86,9 +88,10 @@ def validate_input(value, jsonschema=None, schema=None): validate(value, jsonschema) except ValidationError as e: return [e.message] - except: - print 'jsonschema', jsonschema - print 'value', value + except Exception as e: + log.error('jsonschema: %s', jsonschema) + log.error('value: %s', value) + log.exception(e) raise diff --git a/solar/solar/interfaces/db/__init__.py b/solar/solar/interfaces/db/__init__.py index 92f519a6..306c3013 100644 --- a/solar/solar/interfaces/db/__init__.py +++ b/solar/solar/interfaces/db/__init__.py @@ -1,11 +1,10 @@ -from solar.interfaces.db.cached_file_system_db import CachedFileSystemDB -from solar.interfaces.db.file_system_db import FileSystemDB + from solar.interfaces.db.redis_db import RedisDB +from solar.interfaces.db.redis_db import FakeRedisDB mapping = { - 'cached_file_system': CachedFileSystemDB, - 'file_system': FileSystemDB, 'redis_db': RedisDB, + 'fakeredis_db': FakeRedisDB } DB = None diff --git a/solar/solar/interfaces/db/cached_file_system_db.py b/solar/solar/interfaces/db/cached_file_system_db.py deleted file mode 100644 index 7955bae7..00000000 --- a/solar/solar/interfaces/db/cached_file_system_db.py +++ /dev/null @@ -1,106 +0,0 @@ -from solar.third_party.dir_dbm import DirDBM - -import atexit -import os -import types -import yaml - -from solar import utils -from solar import errors - - -class CachedFileSystemDB(DirDBM): - STORAGE_PATH = utils.read_config()['file-system-db']['storage-path'] - RESOURCE_COLLECTION_NAME = 'resource' - - _CACHE = {} - - def __init__(self): - utils.create_dir(self.STORAGE_PATH) - super(CachedFileSystemDB, self).__init__(self.STORAGE_PATH) - self.entities = {} - - atexit.register(self.flush) - - def __setitem__(self, k, v): - """ - C{dirdbm[k] = v} - Create or modify a textfile in this directory - @type k: strings @param k: key to setitem - @type v: strings @param v: value to associate with C{k} - """ - assert type(k) == types.StringType, "DirDBM key must be a string" - # NOTE: Can be not a string if _writeFile in the child is redefined - # assert type(v) == types.StringType, "DirDBM value must be a string" - k = self._encode(k) - - # we create a new file with extension .new, write the data to it, and - # if the write succeeds delete the old file and rename the new one. - old = os.path.join(self.dname, k) - try: - self._writeFile(old, v) - except: - raise - - def get_resource(self, uid): - return self[self._make_key(self.RESOURCE_COLLECTION_NAME, uid)] - - def get_obj_resource(self, uid): - from solar.core.resource import wrap_resource - raw_resource = self[self._make_key(self.RESOURCE_COLLECTION_NAME, uid)] - - return wrap_resource(raw_resource) - - def add_resource(self, uid, resource): - self[self._make_key(self.RESOURCE_COLLECTION_NAME, uid)] = resource - - def store(self, collection, obj): - if 'id' in obj: - self[self._make_key(collection, obj['id'])] = obj - else: - raise errors.CannotFindID('Cannot find id for object {0}'.format(obj)) - - def store_list(self, collection, objs): - for obj in objs: - self.store(collection, obj) - - def get_list(self, collection): - collection_keys = filter( - lambda k: k.startswith('{0}-'.format(collection)), - self.keys()) - - return map(lambda k: self[k], collection_keys) - - def get_record(self, collection, _id): - key = self._make_key(collection, _id) - if key not in self: - return None - - return self[key] - - def _make_key(self, collection, _id): - return '{0}-{1}'.format(collection, _id) - - def _readFile(self, path): - if path not in self._CACHE: - data = yaml.load(super(CachedFileSystemDB, self)._readFile(path)) - self._CACHE[path] = data - return data - - return self._CACHE[path] - - def _writeFile(self, path, data): - self._CACHE[path] = data - - def _encode(self, key): - """Override method of the parent not to use base64 as a key for encoding""" - return key - - def _decode(self, key): - """Override method of the parent not to use base64 as a key for encoding""" - return key - - def flush(self): - print 'FLUSHING DB' - for path, data in self._CACHE.items(): - super(CachedFileSystemDB, self)._writeFile(path, yaml.dump(data)) diff --git a/solar/solar/interfaces/db/file_system_db.py b/solar/solar/interfaces/db/file_system_db.py deleted file mode 100644 index 4c3c733e..00000000 --- a/solar/solar/interfaces/db/file_system_db.py +++ /dev/null @@ -1,70 +0,0 @@ -from solar.third_party.dir_dbm import DirDBM - -import yaml - -from solar import utils -from solar import errors - - -class FileSystemDB(DirDBM): - STORAGE_PATH = utils.read_config()['file-system-db']['storage-path'] - RESOURCE_COLLECTION_NAME = 'resource' - - def __init__(self): - utils.create_dir(self.STORAGE_PATH) - super(FileSystemDB, self).__init__(self.STORAGE_PATH) - self.entities = {} - - def get_resource(self, uid): - return self[self._make_key(self.RESOURCE_COLLECTION_NAME, uid)] - - def get_obj_resource(self, uid): - if not uid in self.entities: - from solar.core.resource import wrap_resource - raw_resource = self[self._make_key(self.RESOURCE_COLLECTION_NAME, uid)] - self.entities[uid] = wrap_resource(raw_resource) - return self.entities[uid] - - def add_resource(self, uid, resource): - self[self._make_key(self.RESOURCE_COLLECTION_NAME, uid)] = resource - - def store(self, collection, obj): - if 'id' in obj: - self[self._make_key(collection, obj['id'])] = obj - else: - raise errors.CannotFindID('Cannot find id for object {0}'.format(obj)) - - def store_list(self, collection, objs): - for obj in objs: - self.store(collection, obj) - - def get_list(self, collection): - collection_keys = filter( - lambda k: k.startswith('{0}-'.format(collection)), - self.keys()) - - return map(lambda k: self[k], collection_keys) - - def get_record(self, collection, _id): - key = self._make_key(collection, _id) - if key not in self: - return None - - return self[key] - - def _make_key(self, collection, _id): - return '{0}-{1}'.format(collection, _id) - - def _readFile(self, path): - return yaml.load(super(FileSystemDB, self)._readFile(path)) - - def _writeFile(self, path, data): - return super(FileSystemDB, self)._writeFile(path, utils.yaml_dump(data)) - - def _encode(self, key): - """Override method of the parent not to use base64 as a key for encoding""" - return key - - def _decode(self, key): - """Override method of the parent not to use base64 as a key for encoding""" - return key diff --git a/solar/solar/interfaces/db/redis_db.py b/solar/solar/interfaces/db/redis_db.py index 3c43ccb7..337f0b76 100644 --- a/solar/solar/interfaces/db/redis_db.py +++ b/solar/solar/interfaces/db/redis_db.py @@ -1,6 +1,7 @@ from enum import Enum import json import redis +import fakeredis from solar import utils from solar import errors @@ -15,9 +16,11 @@ class RedisDB(object): 'host': 'localhost', 'port': 6379, } + REDIS_CLIENT = redis.StrictRedis + def __init__(self): - self._r = redis.StrictRedis(**self.DB) + self._r = self.REDIS_CLIENT(**self.DB) self.entities = {} def read(self, uid, collection=COLLECTIONS.resource): @@ -28,20 +31,57 @@ class RedisDB(object): except TypeError: return None + def get_list(self, collection=COLLECTIONS.resource): + key_glob = self._make_key(collection, '*') + + keys = self._r.keys(key_glob) + + with self._r.pipeline() as pipe: + pipe.multi() + + values = [self._r.get(key) for key in keys] + + pipe.execute() + + for value in values: + yield json.loads(value) + def save(self, uid, data, collection=COLLECTIONS.resource): - return self._r.set( + ret = self._r.set( self._make_key(collection, uid), json.dumps(data) ) - def get_list(self, collection=COLLECTIONS.resource): - key_glob = self._make_key(collection, '*') + return ret - for key in self._r.keys(key_glob): - yield json.loads(self._r.get(key)) + def save_list(self, lst, collection=COLLECTIONS.resource): + with self._r.pipeline() as pipe: + pipe.multi() + + for uid, data in lst: + key = self._make_key(collection, uid) + pipe.set(key, json.dumps(data)) + + pipe.execute() def clear(self): self._r.flushdb() + def clear_collection(self, collection=COLLECTIONS.resource): + key_glob = self._make_key(collection, '*') + + self._r.delete(self._r.keys(key_glob)) + + def delete(self, uid, collection=COLLECTIONS.resource): + self._r.delete(self._make_key(collection, uid)) + def _make_key(self, collection, _id): + if isinstance(collection, self.COLLECTIONS): + collection = collection.name + return '{0}:{1}'.format(collection, _id) + + +class FakeRedisDB(RedisDB): + + REDIS_CLIENT = fakeredis.FakeStrictRedis diff --git a/solar/solar/operations.py b/solar/solar/operations.py index a3e753fc..55d6475e 100644 --- a/solar/solar/operations.py +++ b/solar/solar/operations.py @@ -1,6 +1,7 @@ from solar import state +from solar.core.log import log from solar.core import signals from solar.core import resource from solar import utils @@ -42,39 +43,40 @@ def connections(res, graph): def to_dict(resource, graph): - return {'uid': resource.name, - 'tags': resource.tags, - 'args': resource.args_dict(), - 'connections': connections(resource, graph)} + res = resource.to_dict() + res['connections'] = connections(resource, graph) + return res -def stage_changes(): - resources = resource.load_all() - conn_graph = signals.detailed_connection_graph() +def create_diff(staged, commited): + if 'connections' in commited: + commited['connections'].sort() + staged['connections'].sort() + if 'tags' in commited: + commited['tags'].sort() + staged['tags'].sort() + + return list(diff(commited, staged)) + + +def _stage_changes(staged_resources, conn_graph, + commited_resources, staged_log): - commited = state.CD() - log = state.SL() action = None try: srt = nx.topological_sort(conn_graph) except: for cycle in nx.simple_cycles(conn_graph): - print 'CYCLE: %s' % cycle + log.debug('CYCLE: %s', cycle) raise for res_uid in srt: - commited_data = commited.get(res_uid, {}) - staged_data = to_dict(resources[res_uid], conn_graph) + commited_data = commited_resources.get(res_uid, {}) + staged_data = staged_resources.get(res_uid, {}) - if 'connections' in commited_data: - commited_data['connections'].sort() - staged_data['connections'].sort() - if 'tags' in commited_data: - commited_data['tags'].sort() - staged_data['tags'].sort() + df = create_diff(staged_data, commited_data) - df = list(diff(commited_data, staged_data)) if df: log_item = state.LogItem( @@ -82,11 +84,21 @@ def stage_changes(): res_uid, df, guess_action(commited_data, staged_data)) - log.add(log_item) - return log + staged_log.append(log_item) + return staged_log + + +def stage_changes(): + conn_graph = signals.detailed_connection_graph() + staged = {r.name: to_dict(r, conn_graph) for r in resource.load_all().values()} + commited = state.CD() + log = state.SL() + log.delete() + return _stage_changes(staged, conn_graph, commited, log) def execute(res, action): + return state.STATES.success try: actions.resource_action(res, action) return state.STATES.success @@ -94,22 +106,15 @@ def execute(res, action): return state.STATES.error -def commit(li, resources): - commited = state.CD() - history = state.CL() - staged = state.SL() +def commit(li, resources, commited, history): staged_res = resources[li.res] - staged_data = patch(li.diff, commited.get(li.res, {})) # TODO(dshulyak) think about this hack for update if li.action == 'update': - commited_res = resource.Resource( - staged_res.name, - staged_res.metadata, - commited[li.res]['args'], - commited[li.res]['tags']) + commited_res = resource.wrap_resource( + commited[li.res]['metadata']) result_state = execute(commited_res, 'remove') if result_state is state.STATES.success: @@ -123,16 +128,19 @@ def commit(li, resources): commited[li.res] = staged_data li.state = result_state - history.add(li) + history.append(li) if result_state is state.STATES.error: raise Exception('Failed') def commit_one(): + commited = state.CD() + history = state.CL() staged = state.SL() + resources = resource.load_all() - commit(staged.popleft(), resources) + commit(staged.popleft(), resources, commited, history) def commit_changes(): @@ -143,7 +151,7 @@ def commit_changes(): resources = resource.load_all() while staged: - commit(staged.popleft(), resources) + commit(staged.popleft(), resources, commited, history) def rollback(log_item): @@ -160,18 +168,17 @@ def rollback(log_item): for e, r, mapping in staged.get('connections', ()): signals.connect(resources[e], resources[r], dict([mapping])) - df = list(diff(commited, staged)) + df = create_diff(staged, commited) log_item = state.LogItem( utils.generate_uuid(), log_item.res, df, guess_action(commited, staged)) - log.add(log_item) + log.append(log_item) res = resource.load(log_item.res) - res.update(staged.get('args', {})) - res.save() + res.set_args_from_dict(staged['input']) - return log + return log_item def rollback_uid(uid): diff --git a/solar/solar/state.py b/solar/solar/state.py index 0f3af6c2..d6c6d7c9 100644 --- a/solar/solar/state.py +++ b/solar/solar/state.py @@ -83,6 +83,10 @@ class Log(object): l['diff'], l['action'], getattr(STATES, l['state'])) for l in items]) + def delete(self): + self.items = deque() + db.delete(self.path, db.COLLECTIONS.state_log) + def sync(self): db.save( self.path, @@ -90,7 +94,7 @@ class Log(object): collection=db.COLLECTIONS.state_log ) - def add(self, logitem): + def append(self, logitem): self.items.append(logitem) self.sync() @@ -108,6 +112,9 @@ class Log(object): return ['L(uuid={0}, res={1}, action={2})'.format( l.uid, l.res, l.action) for l in self.items] + def __len__(self): + return len(self.items) + def __repr__(self): return 'Log({0})'.format(self.path) diff --git a/solar/solar/test/conftest.py b/solar/solar/test/conftest.py new file mode 100644 index 00000000..8e3f45d4 --- /dev/null +++ b/solar/solar/test/conftest.py @@ -0,0 +1,21 @@ + +from pytest import fixture + +from solar.interfaces import db + + +def pytest_configure(): + db.DB = db.mapping['fakeredis_db']() + + +@fixture(autouse=True) +def cleanup(request): + + def fin(): + from solar.core import signals + + db.get_db().clear() + signals.Connections.clear() + + request.addfinalizer(fin) + diff --git a/solar/solar/test/test_diff_generation.py b/solar/solar/test/test_diff_generation.py new file mode 100644 index 00000000..f89b4e71 --- /dev/null +++ b/solar/solar/test/test_diff_generation.py @@ -0,0 +1,108 @@ + +from pytest import fixture +from dictdiffer import revert, patch +import networkx as nx + +from solar import operations +from solar.core.resource import wrap_resource + + +@fixture +def staged(): + return {'id': 'res.1', + 'tags': ['res', 'node.1'], + 'input': {'ip': {'value': '10.0.0.2'}, + 'list_val': {'value': [1, 2]}}, + 'metadata': {}, + 'connections': [ + ['node.1', 'res.1', ['ip', 'ip']], + ['node.1', 'res.1', ['key', 'key']]] + } + +@fixture +def commited(): + return {'id': 'res.1', + 'tags': ['res', 'node.1'], + 'input': {'ip': '10.0.0.2', + 'list_val': [1]}, + 'metadata': {}, + 'connections': [ + ['node.1', 'res.1', ['ip', 'ip']]] + } + +@fixture +def full_diff(staged): + return operations.create_diff(staged, {}) + + +@fixture +def diff_for_update(staged, commited): + return operations.create_diff(staged, commited) + + +def test_create_diff_with_empty_commited(full_diff): + # add will be executed + expected = [('add', '', [('connections', [['node.1', 'res.1', ['ip', 'ip']], ['node.1', 'res.1', ['key', 'key']]]), ('input', {'ip': {'value': '10.0.0.2'}, 'list_val': {'value': [1, 2]}}), ('metadata', {}), ('id', 'res.1'), ('tags', ['res', 'node.1'])])] + assert full_diff == expected + + +def test_create_diff_modified(diff_for_update): + assert diff_for_update == [ + ('add', 'connections', + [(1, ['node.1', 'res.1', ['key', 'key']])]), + ('change', 'input.ip', ('10.0.0.2', {'value': '10.0.0.2'})), + ('change', 'input.list_val', ([1], {'value': [1, 2]}))] + + +def test_verify_patch_creates_expected(staged, diff_for_update, commited): + expected = patch(diff_for_update, commited) + assert expected == staged + + +def test_revert_update(staged, diff_for_update, commited): + expected = revert(diff_for_update, staged) + assert expected == commited + + +@fixture +def resources(): + r = {'n.1': + {'uid': 'n.1', + 'args': {'ip': '10.20.0.2'}, + 'connections': [], + 'tags': []}, + 'r.1': + {'uid': 'r.1', + 'args': {'ip': '10.20.0.2'}, + 'connections': [['n.1', 'r.1', ['ip', 'ip']]], + 'tags': []}, + 'h.1': + {'uid': 'h.1', + 'args': {'ip': '10.20.0.2', + 'ips': ['10.20.0.2']}, + 'connections': [['n.1', 'h.1', ['ip', 'ip']]], + 'tags': []}} + return r + +@fixture +def conn_graph(): + edges = [ + ('n.1', 'r.1', {'label': 'ip:ip'}), + ('n.1', 'h.1', {'label': 'ip:ip'}), + ('r.1', 'h.1', {'label': 'ip:ips'}) + ] + mdg = nx.MultiDiGraph() + mdg.add_edges_from(edges) + return mdg + + +def test_stage_changes(resources, conn_graph): + commited = {} + log = operations._stage_changes(resources, conn_graph, commited, []) + + assert len(log) == 3 + assert [l.res for l in log] == ['n.1', 'r.1', 'h.1'] + + +def test_resource_fixture(staged): + res = wrap_resource(staged) diff --git a/solar/solar/test/test_resource.py b/solar/solar/test/test_resource.py new file mode 100644 index 00000000..5d7165ff --- /dev/null +++ b/solar/solar/test/test_resource.py @@ -0,0 +1,67 @@ +import unittest + +import base + +from solar.core import resource +from solar.core import signals + + +class TestResource(base.BaseResourceTest): + def test_resource_args(self): + sample_meta_dir = self.make_resource_meta(""" +id: sample +handler: ansible +version: 1.0.0 +input: + value: + schema: int + value: 0 + """) + + sample1 = self.create_resource( + 'sample1', sample_meta_dir, {'value': 1} + ) + self.assertEqual(sample1.args['value'].value, 1) + + # test default value + sample2 = self.create_resource('sample2', sample_meta_dir, {}) + self.assertEqual(sample2.args['value'].value, 0) + + def test_connections_recreated_after_load(self): + """ + Create resource in some process. Then in other process load it. + All connections should remain the same. + """ + sample_meta_dir = self.make_resource_meta(""" +id: sample +handler: ansible +version: 1.0.0 +input: + value: + schema: int + value: 0 + """) + + def creating_process(): + sample1 = self.create_resource( + 'sample1', sample_meta_dir, {'value': 1} + ) + sample2 = self.create_resource( + 'sample2', sample_meta_dir, {} + ) + signals.connect(sample1, sample2) + self.assertEqual(sample1.args['value'], sample2.args['value']) + + creating_process() + + signals.CLIENTS = {} + + sample1 = resource.load('sample1') + sample2 = resource.load('sample2') + + sample1.update({'value': 2}) + self.assertEqual(sample1.args['value'], sample2.args['value']) + + +if __name__ == '__main__': + unittest.main() diff --git a/solar/solar/test/test_signals.py b/solar/solar/test/test_signals.py index 45862fdf..30295bff 100644 --- a/solar/solar/test/test_signals.py +++ b/solar/solar/test/test_signals.py @@ -26,7 +26,7 @@ input: xs.connect(sample1, sample2) self.assertEqual( sample1.args['values'], - sample2.args['values'], + sample2.args['values'] ) self.assertEqual( sample2.args['values'].emitter, @@ -135,7 +135,7 @@ input: xs.connect(sample1, sample) self.assertEqual(sample1.args['ip'], sample.args['ip']) - self.assertEqual(len(sample1.args['ip'].receivers), 1) + self.assertEqual(len(list(sample1.args['ip'].receivers)), 1) self.assertEqual( sample.args['ip'].emitter, sample1.args['ip'] @@ -144,7 +144,7 @@ input: xs.connect(sample2, sample) self.assertEqual(sample2.args['ip'], sample.args['ip']) # sample should be unsubscribed from sample1 and subscribed to sample2 - self.assertEqual(len(sample1.args['ip'].receivers), 0) + self.assertEqual(len(list(sample1.args['ip'].receivers)), 0) self.assertEqual(sample.args['ip'].emitter, sample2.args['ip']) sample2.update({'ip': '10.0.0.3'}) @@ -387,10 +387,10 @@ input: 'sample2', sample_meta_dir, {'ip': '10.0.0.2', 'port': '1001'} ) list_input = self.create_resource( - 'list-input', list_input_meta_dir, {'ips': [], 'ports': []} + 'list-input', list_input_meta_dir, {} ) list_input_nested = self.create_resource( - 'list-input-nested', list_input_nested_meta_dir, {'ipss': [], 'portss': []} + 'list-input-nested', list_input_nested_meta_dir, {} ) xs.connect(sample1, list_input, mapping={'ip': 'ips', 'port': 'ports'}) diff --git a/solar/solar/test/test_stage_commit_procedure.py b/solar/solar/test/test_stage_commit_procedure.py new file mode 100644 index 00000000..6ac6aba8 --- /dev/null +++ b/solar/solar/test/test_stage_commit_procedure.py @@ -0,0 +1,73 @@ + +import pytest + +from solar.core import resource +from solar import operations +from solar import state + + +@pytest.fixture +def default_resources(): + from solar.core import signals + from solar.core import resource + + node1 = resource.wrap_resource( + {'id': 'node1', + 'input': {'ip': {'value':'10.0.0.3'}}}) + rabbitmq_service1 = resource.wrap_resource( + {'id':'rabbitmq', 'input': { + 'ip' : {'value': ''}, + 'image': {'value': 'rabbitmq:3-management'}}}) + signals.connect(node1, rabbitmq_service1) + return resource.load_all() + + +@pytest.mark.usefixtures("default_resources") +def test_changes_on_update_image(): + log = operations.stage_changes() + + assert len(log) == 2 + + operations.commit_changes() + + rabbitmq = resource.load('rabbitmq') + rabbitmq.update({'image': 'different'}) + log = operations.stage_changes() + + assert len(log) == 1 + + item = log.items[0] + + assert item.diff == [ + ('change', u'input.image.value', + (u'rabbitmq:3-management', u'different')), + ('change', u'metadata.input.image.value', + (u'rabbitmq:3-management', u'different'))] + + assert item.action == 'update' + + operations.commit_changes() + + commited = state.CD() + + assert commited['rabbitmq']['input']['image'] == { + u'emitter': None, u'value': u'different'} + + reverse = operations.rollback(state.CL().items[-1]) + + assert reverse.diff == [ + ('change', u'input.image.value', + (u'different', u'rabbitmq:3-management')), + ('change', u'metadata.input.image.value', + (u'different', u'rabbitmq:3-management'))] + + operations.commit_changes() + + commited = state.CD() + + assert commited['rabbitmq']['input']['image'] == { + u'emitter': None, u'value': u'rabbitmq:3-management'} + + + + diff --git a/solar/solar/test/test_update_propagated_data.py b/solar/solar/test/test_update_propagated_data.py new file mode 100644 index 00000000..007e4109 --- /dev/null +++ b/solar/solar/test/test_update_propagated_data.py @@ -0,0 +1,114 @@ +import pytest + +from solar.core import signals +from solar.core import resource +from solar import operations + +@pytest.fixture +def resources(): + + node1 = resource.wrap_resource( + {'id': 'node1', + 'input': {'ip': {'value': '10.0.0.3'}}}) + mariadb_service1 = resource.wrap_resource( + {'id': 'mariadb', 'input': { + 'port' : {'value': 3306}, + 'ip': {'value': ''}}}) + keystone_db = resource.wrap_resource( + {'id':'keystone_db', + 'input': { + 'login_port' : {'value': ''}, + 'ip': {'value': ''}}}) + signals.connect(node1, mariadb_service1) + signals.connect(node1, keystone_db) + signals.connect(mariadb_service1, keystone_db, {'port': 'login_port'}) + return resource.load_all() + + +def test_update_port_on_mariadb(resources): + operations.stage_changes() + operations.commit_changes() + + mariadb = resources['mariadb'] + + mariadb.update({'port': 4400}) + + log = operations.stage_changes() + + assert len(log) == 2 + + mariadb_log = log.items[0] + + assert mariadb_log.diff == [ + ('change', u'input.port.value', (3306, 4400)), + ('change', u'metadata.input.port.value', (3306, 4400))] + + keystone_db_log = log.items[1] + + assert keystone_db_log.diff == [ + ('change', u'input.login_port.value', (3306, 4400)), + ('change', u'metadata.input.login_port.value', (3306, 4400))] + + +@pytest.fixture +def list_input(): + res1 = resource.wrap_resource( + {'id': 'res1', 'input': {'ip': {'value': '10.10.0.2'}}}) + res2 = resource.wrap_resource( + {'id': 'res2', 'input': {'ip': {'value': '10.10.0.3'}}}) + consumer = resource.wrap_resource( + {'id': 'consumer', 'input': + {'ips': {'value': [], + 'schema': ['str']}}}) + + signals.connect(res1, consumer, {'ip': 'ips'}) + signals.connect(res2, consumer, {'ip': 'ips'}) + return resource.load_all() + + +@pytest.mark.xfail +def test_update_list_resource(list_input): + operations.stage_changes() + operations.commit_changes() + + res3 = resource.wrap_resource( + {'id': 'res3', 'input': {'ip': {'value': '10.10.0.4'}}}) + signals.connect(res3, list_input['consumer'], {'ip': 'ips'}) + + log = operations.stage_changes() + + assert len(log) == 2 + + assert log.items[0].res == res3.name + assert log.items[1].diff == [ + ('add', u'connections', [(2, ['res3', u'consumer', ['ip', 'ips']])]), + ('add', u'input.ips', [ + (2, {u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'})]), + ('add', u'metadata.input.ips.value', + [(2, {u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'})])] + + operations.commit_changes() + assert list_input['consumer'].args_dict() == { + u'ips': [ + {u'emitter_attached_to': u'res1', u'emitter': u'ip', u'value': u'10.10.0.2'}, + {u'emitter_attached_to': u'res2', u'emitter': u'ip', u'value': u'10.10.0.3'}, + {u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'}]} + + log_item = operations.rollback_last() + assert log_item.diff == [ + ('remove', u'connections', [(2, ['res3', u'consumer', ['ip', 'ips']])]), + ('remove', u'input.ips', [ + (2, {u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'})]), + ('remove', u'metadata.input.ips.value', + [(2, {u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'})])] + + consumer = resource.load('consumer') + assert consumer.args_dict() == { + u'ips': [{u'emitter': u'ip', + u'emitter_attached_to': u'res1', + u'value': u'10.10.0.2'}, + {u'emitter': u'ip', + u'emitter_attached_to': u'res2', + u'value': u'10.10.0.3'}]} + + diff --git a/solar/solar/third_party/__init__.py b/solar/solar/third_party/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/solar/solar/third_party/dir_dbm.py b/solar/solar/third_party/dir_dbm.py deleted file mode 100644 index cf7a4624..00000000 --- a/solar/solar/third_party/dir_dbm.py +++ /dev/null @@ -1,297 +0,0 @@ -# -*- test-case-name: twisted.test.test_dirdbm -*- -# -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - - - -""" -DBM-style interface to a directory. -Each key is stored as a single file. This is not expected to be very fast or -efficient, but it's good for easy debugging. -DirDBMs are *not* thread-safe, they should only be accessed by one thread attacheda time. -No files should be placed in the working directory of a DirDBM save those -created by the DirDBM itself! -Maintainer: Itamar Shtull-Trauring -""" - - -import os -import types -import base64 -import glob - -try: - import cPickle as pickle -except ImportError: - import pickle - -try: - _open -except NameError: - _open = open - - -class DirDBM(object): - """A directory with a DBM interface. - - This class presents a hash-like interface to a directory of small, - flat files. It can only use strings as keys or values. - """ - - def __init__(self, name): - """ - @type name: strings - @param name: Base path to use for the directory storage. - """ - self.dname = os.path.abspath(name) - if not os.path.isdir(self.dname): - os.mkdir(self.dname) - else: - # Run recovery, in case we crashed. we delete all files ending - # with ".new". Then we find all files who end with ".rpl". If about - # corresponding file exists without ".rpl", we assume the write - # failed and delete the ".rpl" file. If only a ".rpl" exist we - # assume the program crashed right after deleting the old entry - # but before renaming the replacement entry. - # - # NOTE: '.' is NOT in the base64 alphabet! - for f in glob.glob(os.path.join(self.dname, "*.new")): - os.remove(f) - replacements = glob.glob(os.path.join(self.dname, "*.rpl")) - for f in replacements: - old = f[:-4] - if os.path.exists(old): - os.remove(f) - else: - os.rename(f, old) - - def _encode(self, k): - """Encode a key so it can be used as a filename. - """ - # NOTE: '_' is NOT in the base64 alphabet! - return base64.encodestring(k).replace('\n', '_').replace("/", "-") - - def _decode(self, k): - """Decode a filename to get the key. - """ - return base64.decodestring(k.replace('_', '\n').replace("-", "/")) - - def _readFile(self, path): - """Read in the contents of a file. - - Override in subclasses to e.g. provide transparently encrypted dirdbm. - """ - f = _open(path, "rb") - s = f.read() - f.close() - return s - - def _writeFile(self, path, data): - """Write data to a file. - - Override in subclasses to e.g. provide transparently encrypted dirdbm. - """ - f = _open(path, "wb") - f.write(data) - f.flush() - f.close() - - def __len__(self): - """ - @return: The number of key/value pairs in this Shelf - """ - return len(os.listdir(self.dname)) - - def __setitem__(self, k, v): - """ - C{dirdbm[k] = v} - Create or modify a textfile in this directory - @type k: strings @param k: key to setitem - @type v: strings @param v: value to associate with C{k} - """ - assert type(k) == types.StringType, "DirDBM key must be a string" - # NOTE: Can be not a string if _writeFile in the child is redefined - # assert type(v) == types.StringType, "DirDBM value must be a string" - k = self._encode(k) - - # we create a new file with extension .new, write the data to it, and - # if the write succeeds delete the old file and rename the new one. - old = os.path.join(self.dname, k) - if os.path.exists(old): - new = old + ".rpl" # replacement entry - else: - new = old + ".new" # new entry - try: - self._writeFile(new, v) - except: - os.remove(new) - raise - else: - if os.path.exists(old): os.remove(old) - os.rename(new, old) - - def __getitem__(self, k): - """ - C{dirdbm[k]} - Get the contents of a file in this directory as a string. - - @type k: string @param k: key to lookup - - @return: The value associated with C{k} - @raise KeyError: Raised when there is no such keyerror - """ - assert type(k) == types.StringType, "DirDBM key must be a string" - path = os.path.join(self.dname, self._encode(k)) - try: - return self._readFile(path) - except Exception as exc: - raise KeyError, k - - def __delitem__(self, k): - """ - C{del dirdbm[foo]} - Delete a file in this directory. - - @type k: string - @param k: key to delete - - @raise KeyError: Raised when there is no such keyerror - """ - assert type(k) == types.StringType, "DirDBM key must be a string" - k = self._encode(k) - try: os.remove(os.path.join(self.dname, k)) - except (OSError, IOError): raise KeyError(self._decode(k)) - - def keys(self): - """ - @return: a C{list} of filenames (keys). - """ - return map(self._decode, os.listdir(self.dname)) - - def values(self): - """ - @return: a C{list} of file-contents (values). - """ - vals = [] - keys = self.keys() - for key in keys: - vals.append(self[key]) - return vals - - def items(self): - """ - @return: a C{list} of 2-tuples containing key/value pairs. - """ - items = [] - keys = self.keys() - for key in keys: - items.append((key, self[key])) - return items - - def has_key(self, key): - """ - @type key: string - @param key: The key to test - - @return: A true value if this dirdbm has the specified key, a faluse - value otherwise. - """ - assert type(key) == types.StringType, "DirDBM key must be a string" - key = self._encode(key) - return os.path.isfile(os.path.join(self.dname, key)) - - def setdefault(self, key, value): - """ - @type key: string - @param key: The key to lookup - - @param value: The value to associate with key if key is not already - associated with a value. - """ - if not self.has_key(key): - self[key] = value - return value - return self[key] - - def get(self, key, default = None): - """ - @type key: string - @param key: The key to lookup - - @param default: The value to return if the given key does not exist - @return: The value associated with C{key} or C{default} if note - C{self.has_key(key)} - """ - if self.has_key(key): - return self[key] - else: - return default - - def __contains__(self, key): - """ - C{key in dirdbm} - @type key: string - @param key: The key to test - - @return: A true value if C{self.has_key(key)}, a false value otherwise. - """ - assert type(key) == types.StringType, "DirDBM key must be a string" - key = self._encode(key) - return os.path.isfile(os.path.join(self.dname, key)) - - def update(self, dict): - """ - Add all the key/value pairs in C{dict} to this dirdbm. Any conflicting - keys will be overwritten with the values from C{dict}. - @type dict: mapping - @param dict: A mapping of key/value pairs to add to this dirdbm. - """ - for key, val in dict.items(): - self[key]=valid - - def copyTo(self, path): - """ - Copy the contents of this dirdbm to the dirdbm at C{path}. - - @type path: C{str} - @param path: The path of the dirdbm to copy to. If a dirdbm - exists at the destination path, it is cleared first. - - @rtype: C{DirDBM} - @return: The dirdbm this dirdbm was copied to. - """ - path = os.path.abspath(path) - assert path != self.dname - - d = self.__class__(path) - d.clear() - for k in self.keys(): - d[k] = self[k] - return data - - def clear(self): - """ - Delete all key/value pairs in this dirdbm. - """ - for k in self.keys(): - del self[k] - - def close(self): - """ - Close this dbm: no-op, for dbm-style interface compliance. - """ - - def getModificationTime(self, key): - """ - Returns modification time of an entry. - - @return: Last modification date (seconds since epoch) of entry C{key} - @raise KeyError: Raised when there is no such keyerror - """ - assert type(key) == types.StringType, "DirDBM key must be a string" - path = os.path.join(self.dname, self._encode(key)) - if os.path.isfile(path): - return os.path.getmtime(path) - else: - raise KeyError, key