Redis: first version

This commit is contained in:
Przemyslaw Kaminski 2015-06-08 12:02:43 +02:00
parent 3f0cc4bda9
commit 8db397d365
10 changed files with 89 additions and 15 deletions

View File

@ -252,7 +252,7 @@ def deploy():
def undeploy():
db = get_db()
resources = map(resource.wrap_resource, db.get_list('resource'))
resources = map(resource.wrap_resource, db.get_list(collection_name=db.COLLECTIONS.resource))
resources = {r.name: r for r in resources}
actions.resource_action(resources['glance_api_endpoint'], 'remove')

View File

@ -3,6 +3,9 @@
- hosts: all
sudo: yes
tasks:
- apt: name=redis-server state=present
- apt: name=python-redis state=present
# Setup additional development tools
- apt: name=vim state=present
- apt: name=tmux state=present

View File

@ -7,3 +7,4 @@ requests==2.7.0
mock
dictdiffer==0.4.0
enum34==1.0.4
redis==2.10.3

View File

@ -26,3 +26,6 @@
- {{ bind['value']['src'] }}:{{ bind['value']['dst'] }}:{{ bind['value'].get('mode', 'ro') }}
{% endfor %}
{% endif %}
- name: wait for glance api
wait_for: host={{ ip }} port=9393 timeout=20

View File

@ -27,3 +27,6 @@
- {{ bind['value']['src'] }}:{{ bind['value']['dst'] }}:{{ bind['value'].get('mode', 'ro') }}
{% endfor %}
{% endif %}
- name: wait for glance registry
wait_for: host={{ ip }} port=9191 timeout=20

View File

@ -116,7 +116,7 @@ class Resource(object):
for k, v in self.args_dict().items():
metadata['input'][k]['value'] = v
db.add_resource(self.name, metadata)
db.save(self.name, metadata, collection_name=db.COLLECTIONS.resource)
def create(name, base_path, args, tags=[], connections={}):
@ -155,8 +155,8 @@ def wrap_resource(raw_resource):
def load_all():
ret = {}
for raw_resource in db.get_list('resource'):
resource = db.get_obj_resource(raw_resource['id'])
for raw_resource in db.get_list(collection_name=db.COLLECTIONS.resource):
resource = wrap_resource(raw_resource)
ret[resource.name] = resource
signals.Connections.reconnect_all()

View File

@ -31,7 +31,7 @@ class Connections(object):
if [receiver.name, dst] not in CLIENTS[emitter.name][src]:
CLIENTS[emitter.name][src].append([receiver.name, dst])
#utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS)
utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS)
@staticmethod
def remove(emitter, src, receiver, dst):
@ -40,7 +40,7 @@ class Connections(object):
if destination != [receiver.name, dst]
]
#utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS)
utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS)
@staticmethod
def reconnect_all():
@ -48,11 +48,17 @@ class Connections(object):
:return:
"""
from solar.core.resource import wrap_resource
for emitter_name, dest_dict in CLIENTS.items():
emitter = db.get_obj_resource(emitter_name)
emitter = wrap_resource(
db.read(emitter_name, collection_name=db.COLLECTIONS.resource)
)
for emitter_input, destinations in dest_dict.items():
for receiver_name, receiver_input in destinations:
receiver = db.get_obj_resource(receiver_name)
receiver = wrap_resource(
db.read(receiver_name, collection_name=db.COLLECTIONS.resource)
)
emitter.args[emitter_input].subscribe(
receiver.args[receiver_input])
@ -72,7 +78,7 @@ class Connections(object):
utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS)
atexit.register(Connections.flush)
#atexit.register(Connections.flush)
def guess_mapping(emitter, receiver):
@ -135,7 +141,7 @@ def disconnect_receiver_by_input(receiver, input):
:return:
"""
for emitter_name, inputs in CLIENTS.items():
emitter = db.get_resource(emitter_name)
emitter = db.read(emitter_name, collection_name=db.COLLECTIONS.resource)
disconnect_by_src(emitter['id'], input, receiver)
@ -150,11 +156,15 @@ def disconnect_by_src(emitter_name, src, receiver):
def notify(source, key, value):
from solar.core.resource import wrap_resource
CLIENTS.setdefault(source.name, {})
print 'Notify', source.name, key, value, CLIENTS[source.name]
if key in CLIENTS[source.name]:
for client, r_key in CLIENTS[source.name][key]:
resource = db.get_obj_resource(client)
resource = wrap_resource(
db.read(client, collection_name=db.COLLECTIONS.resource)
)
print 'Resource found', client
if resource:
resource.update({r_key: value}, emitter=source)

View File

@ -1,16 +1,19 @@
from solar.interfaces.db.file_system_db import FileSystemDB
from solar.interfaces.db.cached_file_system_db import CachedFileSystemDB
from solar.interfaces.db.file_system_db import FileSystemDB
from solar.interfaces.db.redis_db import RedisDB
mapping = {
'cached_file_system': CachedFileSystemDB,
'file_system': FileSystemDB
'file_system': FileSystemDB,
'redis_db': RedisDB,
}
DB = None
def get_db():
# Should be retrieved from config
global DB
if DB is None:
DB = mapping['cached_file_system']()
DB = mapping['redis_db']()
return DB

View File

@ -0,0 +1,44 @@
from enum import Enum
import json
import redis
from solar import utils
from solar import errors
class RedisDB(object):
COLLECTIONS = Enum(
'Collections',
'connection resource'
)
DB = {
'host': 'localhost',
'port': 6379,
}
def __init__(self):
self._r = redis.StrictRedis(**self.DB)
self.entities = {}
def read(self, uid, collection_name=COLLECTIONS.resource):
return json.loads(
self._r.get(self._make_key(collection_name, uid))
)
def save(self, uid, data, collection_name=COLLECTIONS.resource):
return self._r.set(
self._make_key(collection_name, uid),
json.dumps(data)
)
def get_list(self, collection_name=COLLECTIONS.resource):
key_glob = self._make_key(collection_name, '*')
for key in self._r.keys(key_glob):
yield json.loads(self._r.get(key))
def clear(self):
self._r.flushdb()
def _make_key(self, collection, _id):
return '{0}-{1}'.format(collection, _id)

View File

@ -56,7 +56,14 @@ def stage_changes():
log = state.SL()
action = None
for res_uid in nx.topological_sort(conn_graph):
try:
srt = nx.topological_sort(conn_graph)
except:
for cycle in nx.simple_cycles(conn_graph):
print 'CYCLE: %s' % cycle
raise
for res_uid in srt:
commited_data = commited.get(res_uid, {})
staged_data = to_dict(resources[res_uid], conn_graph)