Further work on riak_example

This commit is contained in:
Jedrzej Nowak 2015-08-18 19:44:04 +02:00
parent 64740f6083
commit ce2f31598b
3 changed files with 56 additions and 14 deletions

View File

@ -33,7 +33,9 @@ def setup_riak():
ip = ips % (num + 2) # XXX: da rade inaczej ?
r = vr.create('riak_service%d' % num,
'resources/riak_node',
{'riak_name': 'riak%d@%s' % (num, ip)})[0]
{'riak_self_name': 'riak%d' % num,
'riak_hostname': 'riak_server%d.solar' % num,
'riak_name': 'riak%d@riak_server%d.solar' % (num, num)})[0]
riak_services.append(r)
for i, riak in enumerate(riak_services):
@ -42,6 +44,18 @@ def setup_riak():
for i, riak in enumerate(riak_services[1:]):
signals.connect(riak_services[0], riak, {'riak_name': 'join_to'})
hosts_services = []
for i, riak in enumerate(riak_services):
num = i + 1
hosts_file = vr.create('hosts_file%d' % num,
'resources/hosts_file', {})[0]
hosts_services.append(hosts_file)
signals.connect(nodes[i], hosts_file)
for riak in riak_services:
for hosts_file in hosts_services:
signals.connect(riak, hosts_file, {'riak_hostname': 'hosts_names', 'ip': 'hosts_ips'}, events=False)
has_errors = False
for r in locals().values():
@ -60,14 +74,18 @@ def setup_riak():
sys.exit(1)
events = [
Dep('riak_service2', 'run', 'success', 'riak_service3', 'join'),
Dep('riak_service3', 'run', 'success', 'riak_service2', 'join'),
Dep('hosts_file1', 'run', 'success', 'riak_service1', 'run'),
Dep('hosts_file2', 'run', 'success', 'riak_service2', 'run'),
Dep('hosts_file3', 'run', 'success', 'riak_service3', 'run'),
Dep('riak_service2', 'run', 'success', 'riak_service2', 'join'),
Dep('riak_service3', 'run', 'success', 'riak_service3', 'join'),
React('riak_service1', 'run', 'success', 'riak_service2', 'join'),
React('riak_service1', 'run', 'success', 'riak_service3', 'join'),
React('riak_service2', 'run', 'success', 'riak_service2', 'join'),
React('riak_service3', 'run', 'success', 'riak_service3', 'join'),
# React('riak_service2', 'run', 'success', 'riak_service2', 'join'),
# React('riak_service3', 'run', 'success', 'riak_service3', 'join'),
React('riak_service3', 'join', 'success', 'riak_service1', 'commit'),
React('riak_service2', 'join', 'success', 'riak_service1', 'commit')
@ -111,12 +129,12 @@ def setup_haproxies():
for single_hpsc in hpsc_http:
for riak in riaks:
signals.connect(riak, single_hpsc, {'ip': 'servers',
signals.connect(riak, single_hpsc, {'riak_hostname': 'servers',
'riak_port_http': 'ports'})
for single_hpsc in hpsc_pb:
for riak in riaks:
signals.connect(riak, single_hpsc, {'ip': 'servers',
signals.connect(riak, single_hpsc, {'riak_hostname': 'servers',
'riak_port_pb': 'ports'})
# haproxy config to haproxy service

View File

@ -15,9 +15,16 @@ input:
ssh_user:
schema: str!
value:
riak_name:
riak_self_name:
schema: str!
value:
riak_hostname:
schema: str!
value:
riak_name:
schema: str!
# value: "{{riak_self_name}}@{{riak_hostname}}"
value: "{{riak_self_name}}@{{ip}}"
riak_port_http:
schema: int!
value: 18098

View File

@ -8,6 +8,7 @@ from solar.interfaces.db import get_db
from solar.events.api import add_events
from solar.events.controls import Dependency
db = get_db()
@ -135,7 +136,14 @@ def connect_single(emitter, src, receiver, dst):
emitter.args[src].subscribe(receiver.args[dst])
def connect(emitter, receiver, mapping=None):
def connect(emitter, receiver, mapping=None, events=None):
# convert if needed
# TODO: handle invalid resource
# if isinstance(emitter, basestring):
# emitter = resource.load(emitter)
# if isinstance(receiver, basestring):
# receiver = resource.load(receiver)
mapping = mapping or guess_mapping(emitter, receiver)
if isinstance(mapping, set):
@ -151,15 +159,24 @@ def connect(emitter, receiver, mapping=None):
connect_single(emitter, src, receiver, dst)
events = [
Dependency(emitter.name, 'run', 'success', receiver.name, 'run'),
Dependency(emitter.name, 'update', 'success', receiver.name, 'update')
# possibility to set events, when False it will NOT add events at all
if events is not False:
events = [
Dependency(emitter.name, 'run', 'success', receiver.name, 'run'),
Dependency(emitter.name, 'update', 'success', receiver.name, 'update')
]
add_events(emitter.name, events)
#receiver.save()
add_events(emitter.name, events)
# receiver.save()
def disconnect(emitter, receiver):
# convert if needed
# TODO: handle invalid resource
# if isinstance(emitter, basestring):
# emitter = resource.load(emitter)
# if isinstance(receiver, basestring):
# receiver = resource.load(receiver)
clients = Connections.read_clients()
for src, destinations in clients[emitter.name].items():