Redis: get rid of global CLIENTS variable

Now Connections are read from Redis on demand.
This commit is contained in:
Przemyslaw Kaminski 2015-06-10 16:02:14 +02:00
parent ac74bf73fc
commit 8aa0f6247a
6 changed files with 117 additions and 102 deletions

View File

@ -191,7 +191,7 @@ def init_cli_connections():
@connections.command()
def show():
print json.dumps(signals.CLIENTS, indent=2)
print json.dumps(signals.Connections.read_clients(), indent=2)
# TODO: this requires graphing libraries
@connections.command()

View File

@ -28,7 +28,7 @@ class BaseObserver(object):
def receivers(self):
from solar.core import resource
signals.CLIENTS = signals.Connections.read_clients()
#signals.CLIENTS = signals.Connections.read_clients()
for receiver_name, receiver_input in signals.Connections.receivers(
self._attached_to_name,
self.name

View File

@ -196,7 +196,7 @@ def load(resource_name):
raw_resource = db.read(resource_name, collection=db.COLLECTIONS.resource)
if raw_resource is None:
raise NotImplementedError(
raise KeyError(
'Resource {} does not exist'.format(resource_name)
)

View File

@ -1,42 +1,34 @@
# -*- coding: utf-8 -*-
import atexit
from collections import defaultdict
import itertools
import networkx as nx
import os
from solar import utils
from solar.interfaces.db import get_db
db = get_db()
CLIENTS_CONFIG_KEY = 'clients-data-file'
#CLIENTS = utils.read_config_file(CLIENTS_CONFIG_KEY)
CLIENTS = {}
class Connections(object):
"""
CLIENTS structure is:
emitter_name:
emitter_input_name:
- - dst_name
- dst_input_name
while DB structure is:
emitter_name_key:
emitter: emitter_name
sources:
emitter_input_name:
- - dst_name
- dst_input_name
"""
@staticmethod
def read_clients():
"""
Returned structure is:
emitter_name:
emitter_input_name:
- - dst_name
- dst_input_name
while DB structure is:
emitter_name_key:
emitter: emitter_name
sources:
emitter_input_name:
- - dst_name
- dst_input_name
"""
ret = {}
for data in db.get_list(collection=db.COLLECTIONS.connection):
@ -45,8 +37,8 @@ class Connections(object):
return ret
@staticmethod
def save_clients():
for emitter_name, sources in CLIENTS.items():
def save_clients(clients):
for emitter_name, sources in clients.items():
data = {
'emitter': emitter_name,
'sources': sources,
@ -58,78 +50,46 @@ class Connections(object):
if src not in emitter.args:
return
clients = Connections.read_clients()
# TODO: implement general circular detection, this one is simple
if [emitter.name, src] in CLIENTS.get(receiver.name, {}).get(dst, []):
if [emitter.name, src] in clients.get(receiver.name, {}).get(dst, []):
raise Exception('Attempted to create cycle in dependencies. Not nice.')
CLIENTS.setdefault(emitter.name, {})
CLIENTS[emitter.name].setdefault(src, [])
if [receiver.name, dst] not in CLIENTS[emitter.name][src]:
CLIENTS[emitter.name][src].append([receiver.name, dst])
clients.setdefault(emitter.name, {})
clients[emitter.name].setdefault(src, [])
if [receiver.name, dst] not in clients[emitter.name][src]:
clients[emitter.name][src].append([receiver.name, dst])
#utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS)
Connections.save_clients()
Connections.save_clients(clients)
@staticmethod
def remove(emitter, src, receiver, dst):
CLIENTS[emitter.name][src] = [
destination for destination in CLIENTS[emitter.name][src]
clients = Connections.read_clients()
clients[emitter.name][src] = [
destination for destination in clients[emitter.name][src]
if destination != [receiver.name, dst]
]
#utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS)
Connections.save_clients()
@staticmethod
def reconnect_all():
"""Reconstruct connections for resource inputs from CLIENTS.
:return:
"""
from solar.core.resource import wrap_resource
for emitter_name, dest_dict in CLIENTS.items():
emitter = wrap_resource(
db.read(emitter_name, collection=db.COLLECTIONS.resource)
)
for emitter_input, destinations in dest_dict.items():
for receiver_name, receiver_input in destinations:
receiver = wrap_resource(
db.read(receiver_name, collection=db.COLLECTIONS.resource)
)
emitter.args[emitter_input].subscribe(
receiver.args[receiver_input])
Connections.save_clients(clients)
@staticmethod
def receivers(emitter_name, emitter_input_name):
return CLIENTS.get(emitter_name, {}).get(emitter_input_name, [])
return Connections.read_clients().get(emitter_name, {}).get(
emitter_input_name, []
)
@staticmethod
def emitter(receiver_name, receiver_input_name):
for emitter_name, dest_dict in CLIENTS.items():
for emitter_name, dest_dict in Connections.read_clients().items():
for emitter_input_name, destinations in dest_dict.items():
if [receiver_name, receiver_input_name] in destinations:
return [emitter_name, emitter_input_name]
@staticmethod
def clear():
global CLIENTS
CLIENTS = {}
path = utils.read_config()[CLIENTS_CONFIG_KEY]
if os.path.exists(path):
os.remove(path)
@staticmethod
def flush():
print 'FLUSHING Connections'
#utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS)
Connections.save_clients()
CLIENTS = Connections.read_clients()
#atexit.register(Connections.flush)
db.clear_collection(collection=db.COLLECTIONS.connection)
def guess_mapping(emitter, receiver):
@ -173,9 +133,9 @@ def connect(emitter, receiver, mapping=None):
def disconnect(emitter, receiver):
for src, destinations in CLIENTS[emitter.name].items():
disconnect_by_src(emitter.name, src, receiver)
clients = Connections.read_clients()
for src, destinations in clients[emitter.name].items():
for destination in destinations:
receiver_input = destination[1]
if receiver_input in receiver.args:
@ -183,6 +143,8 @@ def disconnect(emitter, receiver):
print 'Removing input {} from {}'.format(receiver_input, receiver.name)
emitter.args[src].unsubscribe(receiver.args[receiver_input])
disconnect_by_src(emitter.name, src, receiver)
def disconnect_receiver_by_input(receiver, input):
"""Find receiver connection by input and disconnect it.
@ -191,31 +153,36 @@ def disconnect_receiver_by_input(receiver, input):
:param input:
:return:
"""
for emitter_name, inputs in CLIENTS.items():
emitter = db.read(emitter_name, collection=db.COLLECTIONS.resource)
disconnect_by_src(emitter['id'], input, receiver)
clients = Connections.read_clients()
for emitter_name, inputs in clients.items():
disconnect_by_src(emitter_name, input, receiver)
def disconnect_by_src(emitter_name, src, receiver):
if src in CLIENTS[emitter_name]:
CLIENTS[emitter_name][src] = [
destination for destination in CLIENTS[emitter_name][src]
clients = Connections.read_clients()
if src in clients[emitter_name]:
clients[emitter_name][src] = [
destination for destination in clients[emitter_name][src]
if destination[0] != receiver.name
]
#utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS)
Connections.save_clients(clients)
def notify(source, key, value):
from solar.core.resource import wrap_resource
from solar.core.resource import load
CLIENTS.setdefault(source.name, {})
print 'Notify', source.name, key, value, CLIENTS[source.name]
if key in CLIENTS[source.name]:
for client, r_key in CLIENTS[source.name][key]:
resource = wrap_resource(
db.read(client, collection=db.COLLECTIONS.resource)
)
clients = Connections.read_clients()
clients.setdefault(source.name, {})
Connections.save_clients(clients)
print 'Notify', source.name, key, value, clients[source.name]
if key in clients[source.name]:
for client, r_key in clients[source.name][key]:
resource = load(client)
print 'Resource found', client
if resource:
resource.update({r_key: value}, emitter=source)
@ -236,7 +203,9 @@ def assign_connections(receiver, connections):
def connection_graph():
resource_dependencies = {}
for source, destination_values in CLIENTS.items():
clients = Connections.read_clients()
for source, destination_values in clients.items():
resource_dependencies.setdefault(source, set())
for src, destinations in destination_values.items():
resource_dependencies[source].update([
@ -262,8 +231,10 @@ def connection_graph():
def detailed_connection_graph():
g = nx.MultiDiGraph()
for emitter_name, destination_values in CLIENTS.items():
for emitter_input, receivers in CLIENTS[emitter_name].items():
clients = Connections.read_clients()
for emitter_name, destination_values in clients.items():
for emitter_input, receivers in clients[emitter_name].items():
for receiver_name, receiver_input in receivers:
label = emitter_input
if emitter_input != receiver_input:

View File

@ -47,5 +47,13 @@ class RedisDB(object):
def clear(self):
self._r.flushdb()
def clear_collection(self, collection=COLLECTIONS.resource):
key_glob = self._make_key(collection, '*')
self._r.delete(self._r.keys(key_glob))
def delete(self, uid, collection=COLLECTIONS.resource):
self._r.delete(self._make_key(collection, uid))
def _make_key(self, collection, _id):
return '{0}:{1}'.format(collection, _id)

View File

@ -2,7 +2,8 @@ import unittest
import base
from solar.core import signals as xs
from solar.core import resource
from solar.core import signals
class TestResource(base.BaseResourceTest):
@ -26,6 +27,41 @@ input:
sample2 = self.create_resource('sample2', sample_meta_dir, {})
self.assertEqual(sample2.args['value'].value, 0)
def test_connections_recreated_after_load(self):
"""
Create resource in some process. Then in other process load it.
All connections should remain the same.
"""
sample_meta_dir = self.make_resource_meta("""
id: sample
handler: ansible
version: 1.0.0
input:
value:
schema: int
value: 0
""")
def creating_process():
sample1 = self.create_resource(
'sample1', sample_meta_dir, {'value': 1}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir, {}
)
signals.connect(sample1, sample2)
self.assertEqual(sample1.args['value'], sample2.args['value'])
creating_process()
signals.CLIENTS = {}
sample1 = resource.load('sample1')
sample2 = resource.load('sample2')
sample1.update({'value': 2})
self.assertEqual(sample1.args['value'], sample2.args['value'])
if __name__ == '__main__':
unittest.main()