diff --git a/simple-deployment.yaml b/simple-deployment.yaml new file mode 100755 index 00000000..7360183d --- /dev/null +++ b/simple-deployment.yaml @@ -0,0 +1,43 @@ +# HAProxy deployment with MariaDB, Keystone and Nova + +workdir: /vagrant +resource-save-path: rs/ +#test-suite: haproxy_deployment.haproxy_deployment + +resources: + - name: node1 + model: x/resources/ro_node/ + args: + ip: 10.0.0.3 + ssh_key: /vagrant/.vagrant/machines/solar-dev2/virtualbox/private_key + ssh_user: vagrant + + - name: keystone1 + model: x/resources/keystone/ + args: + ip: + image: TEST + ssh_user: + ssh_key: + + - name: haproxy_keystone_config + model: x/resources/haproxy_config/ + args: + listen_port: 5000 + ports: {} + servers: {} + + +connections: + - emitter: node1 + receiver: keystone1 + + # Multiple subscription test + - emitter: node1 + receiver: keystone1 + + - emitter: keystone1 + receiver: haproxy_keystone_config + mapping: + ip: servers + port: ports diff --git a/x/observer.py b/x/observer.py new file mode 100644 index 00000000..a8d0c546 --- /dev/null +++ b/x/observer.py @@ -0,0 +1,106 @@ +class BaseObserver(object): + type_ = None + + def __init__(self, attached_to, name, value): + """ + :param attached_to: resource.Resource + :param name: + :param value: + :return: + """ + self.attached_to = attached_to + self.name = name + self.value = value + self.receivers = [] + + def log(self, msg): + print '{} {}'.format(self, msg) + + def __repr__(self): + return '[{}:{}]'.format(self.attached_to.name, self.name) + + def notify(self, emitter): + """ + :param emitter: Observer + :return: + """ + raise NotImplementedError + + def update(self, value): + """ + :param value: + :return: + """ + raise NotImplementedError + + def subscribe(self, receiver): + """ + :param receiver: Observer + :return: + """ + self.log('Subscribe {}'.format(receiver)) + # No multiple subscriptions + fltr = [r for r in self.receivers + if r.attached_to == receiver.attached_to + and r.name == receiver.name] + if fltr: + self.log('No multiple subscriptions from {}'.format(receiver)) + return + self.receivers.append(receiver) + receiver.notify(self) + + def unsubscribe(self, receiver): + """ + :param receiver: Observer + :return: + """ + self.log('Unsubscribe {}'.format(receiver)) + self.receivers.remove(receiver) + # TODO: ? + #receiver.notify(self) + + +class Observer(BaseObserver): + type_ = 'simple' + + def __init__(self, *args, **kwargs): + super(Observer, self).__init__(*args, **kwargs) + # TODO: + # Simple observer can be attached to at most one emitter + self.emitter = None + + def notify(self, emitter): + self.log('Notify from {} value {}'.format(emitter, emitter.value)) + self.value = emitter.value + for receiver in self.receivers: + receiver.notify(self) + self.attached_to.save() + + def update(self, value): + self.log('Updating to value {}'.format(value)) + self.value = value + for receiver in self.receivers: + receiver.notify(self) + self.attached_to.save() + + def subscribe(self, receiver): + # TODO: + super(Observer, self).subscribe(receiver) + + +class ListObserver(BaseObserver): + type_ = 'list' + + def notify(self, emitter): + self.log('Notify from {} value {}'.format(emitter, emitter.value)) + self.value[emitter.attached_to.name] = emitter.value + for receiver in self.receivers: + receiver.notify(self) + self.attached_to.save() + + +def create(type_, *args, **kwargs): + for klass in BaseObserver.__subclasses__(): + if klass.type_ == type_: + return klass(*args, **kwargs) + raise NotImplementedError('No handling class for type {}'.format(type_)) diff --git a/x/resource.py b/x/resource.py index 895105d1..8603bf12 100644 --- a/x/resource.py +++ b/x/resource.py @@ -1,14 +1,15 @@ # -*- coding: UTF-8 -*- +import copy import json import os import shutil import yaml -import actions -import signals -import db - +from x import actions +from x import db +from x import observer +from x import signals from x import utils @@ -20,7 +21,10 @@ class Resource(object): self.actions = metadata['actions'].keys() if metadata['actions'] else None self.requires = metadata['input'].keys() self._validate_args(args, metadata['input']) - self.args = args + self.args = {} + for arg_name, arg_value in args.items(): + type_ = metadata.get('input-types', {}).get(arg_name, 'simple') + self.args[arg_name] = observer.create(type_, self, arg_name, arg_value) self.metadata['input'] = args self.input_types = metadata.get('input-types', {}) self.changed = [] @@ -30,10 +34,13 @@ class Resource(object): return ("Resource('name={0}', metadata={1}, args={2}, " "base_dir='{3}', tags={4})").format(self.name, json.dumps(self.metadata), - json.dumps(self.args), + json.dumps(self.args_dict()), self.base_dir, self.tags) + def args_dict(self): + return {k: v.value for k, v in self.args.items()} + def add_tag(self, tag): if tag not in self.tags: self.tags.append(tag) @@ -44,18 +51,27 @@ class Resource(object): except ValueError: pass - def update(self, args, emitter=None): - for key, value in args.iteritems(): - if self.input_types.get(key, '') == 'list': - if emitter is None: - raise Exception('I need to know the emitter when updating input of list type') - self.args[key][emitter.name] = value - else: - self.args[key] = value - self.changed.append(key) - signals.notify(self, key, value) + def notify(self, emitter): + """Update resource's args from emitter's args. - self.save() + :param emitter: Resource + :return: + """ + for key, value in emitter.args.iteritems(): + self.args[key].notify(value) + + def update(self, args): + """This method updates resource's args with a simple dict. + + :param args: + :return: + """ + # Update will be blocked if this resource is listening + # on some input that is to be updated -- we should only listen + # to the emitter and not be able to change the input's value + + for key, value in args.iteritems(): + self.args[key].update(value) def action(self, action): if action in self.actions: @@ -75,11 +91,14 @@ class Resource(object): # TODO: versioning def save(self): - self.metadata['tags'] = self.tags + metadata = copy.deepcopy(self.metadata) + + metadata['tags'] = self.tags + metadata['args'] = self.args_dict() meta_file = os.path.join(self.base_dir, 'meta.yaml') with open(meta_file, 'w') as f: - f.write(yaml.dump(self.metadata)) + f.write(yaml.dump(metadata)) def create(name, base_path, dest_path, args, connections={}): @@ -111,6 +130,7 @@ def create(name, base_path, dest_path, args, connections={}): shutil.copytree(base_path, dest_path) resource.save() db.resource_add(name, resource) + return resource @@ -136,4 +156,6 @@ def load_all(dest_path): resource = load(resource_path) ret[resource.name] = resource + signals.reconnect_all() + return ret diff --git a/x/signals.py b/x/signals.py index 7e6270c7..6679831a 100644 --- a/x/signals.py +++ b/x/signals.py @@ -62,17 +62,36 @@ def connect(emitter, receiver, mapping=None): connect_src_dst(emitter, src, receiver, dst) receiver.save() - utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS) def connect_src_dst(emitter, src, receiver, dst): + if src not in emitter.args: + return + CLIENTS.setdefault(emitter.name, {}) CLIENTS[emitter.name].setdefault(src, []) CLIENTS[emitter.name][src].append((receiver.name, dst)) + emitter.args[src].subscribe(receiver.args[dst]) + # Copy emitter's values to receiver - if src in emitter.args: - receiver.update({dst: emitter.args[src]}, emitter=emitter) + #receiver.update({dst: emitter.args[src]}, emitter=emitter) + + utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS) + + +def reconnect_all(): + """Reconstruct connections for resource inputs from CLIENTS. + + :return: + """ + for emitter_name, dest_dict in CLIENTS.items(): + emitter = db.get_resource(emitter_name) + for emitter_input, destinations in dest_dict.items(): + for receiver_name, receiver_input in destinations: + receiver = db.get_resource(receiver_name) + receiver.args[receiver_input].subscribe( + emitter.args[emitter_input]) def disconnect(emitter, receiver): diff --git a/x/test/test_signals.py b/x/test/test_signals.py index 26c49ded..c5ef67da 100644 --- a/x/test/test_signals.py +++ b/x/test/test_signals.py @@ -23,8 +23,8 @@ input: ) xs.connect(sample1, sample2) self.assertItemsEqual( - sample1.args['values'], - sample2.args['values'], + sample1.args['values'].value, + sample2.args['values'].value, ) @@ -59,18 +59,18 @@ input-types: xs.connect(sample1, list_input_single, mapping={'ip': 'ips'}) self.assertItemsEqual( - list_input_single.args['ips'], + list_input_single.args['ips'].value, { - 'sample1': sample1.args['ip'], + 'sample1': sample1.args['ip'].value, } ) xs.connect(sample2, list_input_single, mapping={'ip': 'ips'}) self.assertItemsEqual( - list_input_single.args['ips'], + list_input_single.args['ips'].value, { - 'sample1': sample1.args['ip'], - 'sample2': sample2.args['ip'], + 'sample1': sample1.args['ip'].value, + 'sample2': sample2.args['ip'].value, } ) @@ -107,31 +107,31 @@ input-types: xs.connect(sample1, list_input_multi, mapping={'ip': 'ips', 'port': 'ports'}) self.assertItemsEqual( - list_input_multi.args['ips'], + list_input_multi.args['ips'].value, { - 'sample1': sample1.args['ip'], + 'sample1': sample1.args['ip'].value, } ) self.assertItemsEqual( - list_input_multi.args['ports'], + list_input_multi.args['ports'].value, { - 'sample1': sample1.args['port'], + 'sample1': sample1.args['port'].value, } ) xs.connect(sample2, list_input_multi, mapping={'ip': 'ips', 'port': 'ports'}) self.assertItemsEqual( - list_input_multi.args['ips'], + list_input_multi.args['ips'].value, { - 'sample1': sample1.args['ip'], - 'sample2': sample2.args['ip'], + 'sample1': sample1.args['ip'].value, + 'sample2': sample2.args['ip'].value, } ) self.assertItemsEqual( - list_input_multi.args['ports'], + list_input_multi.args['ports'].value, { - 'sample1': sample1.args['port'], - 'sample2': sample2.args['port'], + 'sample1': sample1.args['port'].value, + 'sample2': sample2.args['port'].value, } )