From 8aa0f6247a488aa49c3a5e3883d4ed40f623e61b Mon Sep 17 00:00:00 2001 From: Przemyslaw Kaminski Date: Wed, 10 Jun 2015 16:02:14 +0200 Subject: [PATCH] Redis: get rid of global CLIENTS variable Now Connections are read from Redis on demand. --- solar/solar/cli.py | 2 +- solar/solar/core/observer.py | 2 +- solar/solar/core/resource.py | 2 +- solar/solar/core/signals.py | 167 +++++++++++--------------- solar/solar/interfaces/db/redis_db.py | 8 ++ solar/solar/test/test_resource.py | 38 +++++- 6 files changed, 117 insertions(+), 102 deletions(-) diff --git a/solar/solar/cli.py b/solar/solar/cli.py index 1518718f..8f216ca1 100644 --- a/solar/solar/cli.py +++ b/solar/solar/cli.py @@ -191,7 +191,7 @@ def init_cli_connections(): @connections.command() def show(): - print json.dumps(signals.CLIENTS, indent=2) + print json.dumps(signals.Connections.read_clients(), indent=2) # TODO: this requires graphing libraries @connections.command() diff --git a/solar/solar/core/observer.py b/solar/solar/core/observer.py index 655cf582..e09a964a 100644 --- a/solar/solar/core/observer.py +++ b/solar/solar/core/observer.py @@ -28,7 +28,7 @@ class BaseObserver(object): def receivers(self): from solar.core import resource - signals.CLIENTS = signals.Connections.read_clients() + #signals.CLIENTS = signals.Connections.read_clients() for receiver_name, receiver_input in signals.Connections.receivers( self._attached_to_name, self.name diff --git a/solar/solar/core/resource.py b/solar/solar/core/resource.py index d8e08121..51c4645a 100644 --- a/solar/solar/core/resource.py +++ b/solar/solar/core/resource.py @@ -196,7 +196,7 @@ def load(resource_name): raw_resource = db.read(resource_name, collection=db.COLLECTIONS.resource) if raw_resource is None: - raise NotImplementedError( + raise KeyError( 'Resource {} does not exist'.format(resource_name) ) diff --git a/solar/solar/core/signals.py b/solar/solar/core/signals.py index a67cf172..37209b59 100644 --- a/solar/solar/core/signals.py +++ b/solar/solar/core/signals.py @@ -1,42 +1,34 @@ # -*- coding: utf-8 -*- -import atexit from collections import defaultdict import itertools import networkx as nx -import os -from solar import utils from solar.interfaces.db import get_db db = get_db() -CLIENTS_CONFIG_KEY = 'clients-data-file' -#CLIENTS = utils.read_config_file(CLIENTS_CONFIG_KEY) -CLIENTS = {} - - class Connections(object): - """ - CLIENTS structure is: - - emitter_name: - emitter_input_name: - - - dst_name - - dst_input_name - - while DB structure is: - - emitter_name_key: - emitter: emitter_name - sources: - emitter_input_name: - - - dst_name - - dst_input_name - """ - @staticmethod def read_clients(): + """ + Returned structure is: + + emitter_name: + emitter_input_name: + - - dst_name + - dst_input_name + + while DB structure is: + + emitter_name_key: + emitter: emitter_name + sources: + emitter_input_name: + - - dst_name + - dst_input_name + """ + ret = {} for data in db.get_list(collection=db.COLLECTIONS.connection): @@ -45,8 +37,8 @@ class Connections(object): return ret @staticmethod - def save_clients(): - for emitter_name, sources in CLIENTS.items(): + def save_clients(clients): + for emitter_name, sources in clients.items(): data = { 'emitter': emitter_name, 'sources': sources, @@ -58,78 +50,46 @@ class Connections(object): if src not in emitter.args: return + clients = Connections.read_clients() + # TODO: implement general circular detection, this one is simple - if [emitter.name, src] in CLIENTS.get(receiver.name, {}).get(dst, []): + if [emitter.name, src] in clients.get(receiver.name, {}).get(dst, []): raise Exception('Attempted to create cycle in dependencies. Not nice.') - CLIENTS.setdefault(emitter.name, {}) - CLIENTS[emitter.name].setdefault(src, []) - if [receiver.name, dst] not in CLIENTS[emitter.name][src]: - CLIENTS[emitter.name][src].append([receiver.name, dst]) + clients.setdefault(emitter.name, {}) + clients[emitter.name].setdefault(src, []) + if [receiver.name, dst] not in clients[emitter.name][src]: + clients[emitter.name][src].append([receiver.name, dst]) - #utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS) - Connections.save_clients() + Connections.save_clients(clients) @staticmethod def remove(emitter, src, receiver, dst): - CLIENTS[emitter.name][src] = [ - destination for destination in CLIENTS[emitter.name][src] + clients = Connections.read_clients() + + clients[emitter.name][src] = [ + destination for destination in clients[emitter.name][src] if destination != [receiver.name, dst] ] - #utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS) - Connections.save_clients() - - @staticmethod - def reconnect_all(): - """Reconstruct connections for resource inputs from CLIENTS. - - :return: - """ - from solar.core.resource import wrap_resource - - for emitter_name, dest_dict in CLIENTS.items(): - emitter = wrap_resource( - db.read(emitter_name, collection=db.COLLECTIONS.resource) - ) - for emitter_input, destinations in dest_dict.items(): - for receiver_name, receiver_input in destinations: - receiver = wrap_resource( - db.read(receiver_name, collection=db.COLLECTIONS.resource) - ) - emitter.args[emitter_input].subscribe( - receiver.args[receiver_input]) + Connections.save_clients(clients) @staticmethod def receivers(emitter_name, emitter_input_name): - return CLIENTS.get(emitter_name, {}).get(emitter_input_name, []) + return Connections.read_clients().get(emitter_name, {}).get( + emitter_input_name, [] + ) @staticmethod def emitter(receiver_name, receiver_input_name): - for emitter_name, dest_dict in CLIENTS.items(): + for emitter_name, dest_dict in Connections.read_clients().items(): for emitter_input_name, destinations in dest_dict.items(): if [receiver_name, receiver_input_name] in destinations: return [emitter_name, emitter_input_name] @staticmethod def clear(): - global CLIENTS - - CLIENTS = {} - - path = utils.read_config()[CLIENTS_CONFIG_KEY] - if os.path.exists(path): - os.remove(path) - - @staticmethod - def flush(): - print 'FLUSHING Connections' - #utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS) - Connections.save_clients() - - -CLIENTS = Connections.read_clients() -#atexit.register(Connections.flush) + db.clear_collection(collection=db.COLLECTIONS.connection) def guess_mapping(emitter, receiver): @@ -173,9 +133,9 @@ def connect(emitter, receiver, mapping=None): def disconnect(emitter, receiver): - for src, destinations in CLIENTS[emitter.name].items(): - disconnect_by_src(emitter.name, src, receiver) + clients = Connections.read_clients() + for src, destinations in clients[emitter.name].items(): for destination in destinations: receiver_input = destination[1] if receiver_input in receiver.args: @@ -183,6 +143,8 @@ def disconnect(emitter, receiver): print 'Removing input {} from {}'.format(receiver_input, receiver.name) emitter.args[src].unsubscribe(receiver.args[receiver_input]) + disconnect_by_src(emitter.name, src, receiver) + def disconnect_receiver_by_input(receiver, input): """Find receiver connection by input and disconnect it. @@ -191,31 +153,36 @@ def disconnect_receiver_by_input(receiver, input): :param input: :return: """ - for emitter_name, inputs in CLIENTS.items(): - emitter = db.read(emitter_name, collection=db.COLLECTIONS.resource) - disconnect_by_src(emitter['id'], input, receiver) + clients = Connections.read_clients() + + for emitter_name, inputs in clients.items(): + disconnect_by_src(emitter_name, input, receiver) def disconnect_by_src(emitter_name, src, receiver): - if src in CLIENTS[emitter_name]: - CLIENTS[emitter_name][src] = [ - destination for destination in CLIENTS[emitter_name][src] + clients = Connections.read_clients() + + if src in clients[emitter_name]: + clients[emitter_name][src] = [ + destination for destination in clients[emitter_name][src] if destination[0] != receiver.name ] - #utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS) + Connections.save_clients(clients) def notify(source, key, value): - from solar.core.resource import wrap_resource + from solar.core.resource import load - CLIENTS.setdefault(source.name, {}) - print 'Notify', source.name, key, value, CLIENTS[source.name] - if key in CLIENTS[source.name]: - for client, r_key in CLIENTS[source.name][key]: - resource = wrap_resource( - db.read(client, collection=db.COLLECTIONS.resource) - ) + clients = Connections.read_clients() + + clients.setdefault(source.name, {}) + Connections.save_clients(clients) + + print 'Notify', source.name, key, value, clients[source.name] + if key in clients[source.name]: + for client, r_key in clients[source.name][key]: + resource = load(client) print 'Resource found', client if resource: resource.update({r_key: value}, emitter=source) @@ -236,7 +203,9 @@ def assign_connections(receiver, connections): def connection_graph(): resource_dependencies = {} - for source, destination_values in CLIENTS.items(): + clients = Connections.read_clients() + + for source, destination_values in clients.items(): resource_dependencies.setdefault(source, set()) for src, destinations in destination_values.items(): resource_dependencies[source].update([ @@ -262,8 +231,10 @@ def connection_graph(): def detailed_connection_graph(): g = nx.MultiDiGraph() - for emitter_name, destination_values in CLIENTS.items(): - for emitter_input, receivers in CLIENTS[emitter_name].items(): + clients = Connections.read_clients() + + for emitter_name, destination_values in clients.items(): + for emitter_input, receivers in clients[emitter_name].items(): for receiver_name, receiver_input in receivers: label = emitter_input if emitter_input != receiver_input: diff --git a/solar/solar/interfaces/db/redis_db.py b/solar/solar/interfaces/db/redis_db.py index aaec107d..fec1e947 100644 --- a/solar/solar/interfaces/db/redis_db.py +++ b/solar/solar/interfaces/db/redis_db.py @@ -47,5 +47,13 @@ class RedisDB(object): def clear(self): self._r.flushdb() + def clear_collection(self, collection=COLLECTIONS.resource): + key_glob = self._make_key(collection, '*') + + self._r.delete(self._r.keys(key_glob)) + + def delete(self, uid, collection=COLLECTIONS.resource): + self._r.delete(self._make_key(collection, uid)) + def _make_key(self, collection, _id): return '{0}:{1}'.format(collection, _id) diff --git a/solar/solar/test/test_resource.py b/solar/solar/test/test_resource.py index e95eeb10..5d7165ff 100644 --- a/solar/solar/test/test_resource.py +++ b/solar/solar/test/test_resource.py @@ -2,7 +2,8 @@ import unittest import base -from solar.core import signals as xs +from solar.core import resource +from solar.core import signals class TestResource(base.BaseResourceTest): @@ -26,6 +27,41 @@ input: sample2 = self.create_resource('sample2', sample_meta_dir, {}) self.assertEqual(sample2.args['value'].value, 0) + def test_connections_recreated_after_load(self): + """ + Create resource in some process. Then in other process load it. + All connections should remain the same. + """ + sample_meta_dir = self.make_resource_meta(""" +id: sample +handler: ansible +version: 1.0.0 +input: + value: + schema: int + value: 0 + """) + + def creating_process(): + sample1 = self.create_resource( + 'sample1', sample_meta_dir, {'value': 1} + ) + sample2 = self.create_resource( + 'sample2', sample_meta_dir, {} + ) + signals.connect(sample1, sample2) + self.assertEqual(sample1.args['value'], sample2.args['value']) + + creating_process() + + signals.CLIENTS = {} + + sample1 = resource.load('sample1') + sample2 = resource.load('sample2') + + sample1.update({'value': 2}) + self.assertEqual(sample1.args['value'], sample2.args['value']) + if __name__ == '__main__': unittest.main()