ListObserver fixes

ListObserver needs to save emitter data to args. This is because
it saves values, when we re-do connections we need to do them
in the same order as for values. This can be only reliably handled
when we save emitter along with values.
This commit is contained in:
Przemyslaw Kaminski 2015-04-27 11:37:05 +02:00
parent f9686f7c98
commit e2bb4284ae
7 changed files with 130 additions and 42 deletions

View File

@ -19,14 +19,14 @@ class TestHAProxyDeployment(unittest.TestCase):
haproxy_keystone_config = db.get_resource('haproxy_keystone_config')
self.assertEqual(
haproxy_keystone_config.args['servers'],
[ip['value'] for ip in haproxy_keystone_config.args['servers'].value],
[
keystone1.args['ip'],
keystone2.args['ip'],
]
)
self.assertEqual(
haproxy_keystone_config.args['ports'],
[p['value'] for p in haproxy_keystone_config.args['ports'].value],
[
keystone1.args['port'],
keystone2.args['port'],
@ -48,14 +48,14 @@ class TestHAProxyDeployment(unittest.TestCase):
haproxy_nova_config = db.get_resource('haproxy_nova_config')
self.assertEqual(
haproxy_nova_config.args['servers'],
[ip['value'] for ip in haproxy_nova_config.args['servers'].value],
[
nova1.args['ip'],
nova2.args['ip'],
]
)
self.assertEqual(
haproxy_nova_config.args['ports'],
[p['value'] for p in haproxy_nova_config.args['ports'].value],
[
nova1.args['port'],
nova2.args['port'],
@ -73,21 +73,21 @@ class TestHAProxyDeployment(unittest.TestCase):
self.assertEqual(node5.args['ssh_key'], haproxy.args['ssh_key'])
self.assertEqual(node5.args['ssh_user'], haproxy.args['ssh_user'])
self.assertEqual(
haproxy_config.args['configs'],
[c['value'] for c in haproxy_config.args['configs'].value],
[
haproxy_keystone_config.args['servers'],
haproxy_nova_config.args['servers'],
]
)
self.assertEqual(
haproxy_config.args['configs_ports'],
[cp['value'] for cp in haproxy_config.args['configs_ports'].value],
[
haproxy_keystone_config.args['ports'],
haproxy_nova_config.args['ports'],
]
)
self.assertEqual(
haproxy_config.args['listen_ports'],
[lp['value'] for lp in haproxy_config.args['listen_ports'].value],
[
haproxy_keystone_config.args['listen_port'],
haproxy_nova_config.args['listen_port'],
@ -97,7 +97,7 @@ class TestHAProxyDeployment(unittest.TestCase):
[
haproxy_config.args['config_dir'],
],
haproxy.args['host_binds']
[hb['value'] for hb in haproxy.args['host_binds'].value]
)
self.assertEqual(
haproxy.args['ports'],

View File

@ -43,7 +43,7 @@ class BaseHandler(object):
def _make_args(self, resource):
args = {'name': resource.name}
args.update(resource.args_dict())
args.update(resource.args)
return args

View File

@ -20,7 +20,10 @@ class BaseObserver(object):
print '{} {}'.format(self, msg)
def __repr__(self):
return '[{}:{}]'.format(self.attached_to.name, self.name)
return '[{}:{}] {}'.format(self.attached_to.name, self.name, self.value)
def __unicode__(self):
return self.value
def __eq__(self, other):
if isinstance(other, BaseObserver):
@ -135,24 +138,32 @@ class Observer(BaseObserver):
class ListObserver(BaseObserver):
type_ = 'list'
def __init__(self, *args, **kwargs):
super(ListObserver, self).__init__(*args, **kwargs)
self.emitters = []
def __unicode__(self):
return unicode(self.value)
@staticmethod
def _format_value(emitter):
return {
'emitter': emitter.name,
'emitter_attached_to': emitter.attached_to.name,
'value': emitter.value,
}
def notify(self, emitter):
self.log('Notify from {} value {}'.format(emitter, emitter.value))
# Copy emitter's values to receiver
#self.value[emitter.attached_to.name] = emitter.value
idx = self._emitter_idx(emitter)
self.value[idx] = emitter.value
self.value[idx] = self._format_value(emitter)
for receiver in self.receivers:
receiver.notify(self)
self.attached_to.save()
def subscribed(self, emitter):
super(ListObserver, self).subscribed(emitter)
self.emitters.append((emitter.attached_to.name, emitter.name))
self.value.append(emitter.value)
idx = self._emitter_idx(emitter)
if idx is None:
self.value.append(self._format_value(emitter))
def unsubscribed(self, emitter):
"""
@ -160,13 +171,16 @@ class ListObserver(BaseObserver):
:return:
"""
self.log('Unsubscribed emitter {}'.format(emitter))
#self.value.pop(emitter.attached_to.name)
idx = self._emitter_idx(emitter)
self.emitters.pop(idx)
self.value.pop(idx)
def _emitter_idx(self, emitter):
return self.emitters.index((emitter.attached_to.name, emitter.name))
try:
return [i for i, e in enumerate(self.value)
if e['emitter_attached_to'] == emitter.attached_to.name
][0]
except IndexError:
return
def create(type_, *args, **kwargs):

View File

@ -34,10 +34,24 @@ class Resource(object):
return ("Resource('name={0}', metadata={1}, args={2}, "
"base_dir='{3}', tags={4})").format(self.name,
json.dumps(self.metadata),
json.dumps(self.args_dict()),
json.dumps(self.args_show()),
self.base_dir,
self.tags)
def args_show(self):
def formatter(v):
if isinstance(v, observer.ListObserver):
return v.value
elif isinstance(v, observer.Observer):
return {
'emitter': v.emitter.attached_to.name if v.emitter else None,
'value': v.value,
}
return v
return {k: formatter(v) for k, v in self.args.items()}
def args_dict(self):
return {k: v.value for k, v in self.args.items()}

View File

@ -10,14 +10,14 @@
image: {{ image }}
state: running
ports:
{% for port in ports %}
- {{ port }}:{{ port }}
{% for port in ports.value %}
- {{ port['value'] }}:{{ port['value'] }}
{% endfor %}
volumes:
# TODO: host_binds might need more work
# Currently it's not that trivial to pass custom src: dst here
# (when a config variable is passed here from other resource)
# so we mount it to the same directory as on host
{% for bind in host_binds %}
- {{ bind['src'] }}:{{ bind['dst'] }}:{{ bind.get('mode', 'ro') }}
{% for bind in host_binds.value %}
- {{ bind['value']['src'] }}:{{ bind['value']['dst'] }}:{{ bind['value'].get('mode', 'ro') }}
{% endfor %}

View File

@ -2,23 +2,23 @@
- hosts: [{{ ip }}]
sudo: yes
vars:
config_dir: {src: {{ config_dir['src'] }}, dst: {{ config_dir['dst'] }}}
config_dir: {src: {{ config_dir.value['src'] }}, dst: {{ config_dir.value['dst'] }}}
haproxy_ip: {{ ip }}
haproxy_services:
{% for service, servers, ports, port in zip(configs_names, configs, configs_ports, listen_ports) %}
- name: {{ service }}
listen_port: {{ port }}
{% for service, ports, listen_port in zip(configs.value, configs_ports.value, listen_ports.value) %}
- name: {{ service['emitter_attached_to'] }}
listen_port: {{ listen_port['value'] }}
servers:
{% for server_ip, server_port in zip(servers, ports) %}
- name: {{ name }}
ip: {{ server_ip }}
port: {{ server_port }}
{% for server_ip, server_port in zip(service['value'], ports['value']) %}
- name: {{ server_ip['emitter_attached_to'] }}
ip: {{ server_ip['value'] }}
port: {{ server_port['value'] }}
{% endfor %}
{% endfor %}
tasks:
- apt: name=python-pip state=present
- shell: pip install docker-py
- service: name=docker state=started
- file: path={{ config_dir['src'] }}/ state=directory
- file: path={{ config_dir['src'] }}/haproxy.cfg state=touch
- template: src=/vagrant/haproxy.cfg dest={{ config_dir['src'] }}/haproxy.cfg
- file: path={{ config_dir.value['src'] }}/ state=directory
- file: path={{ config_dir.value['src'] }}/haproxy.cfg state=touch
- template: src=/vagrant/haproxy.cfg dest={{ config_dir.value['src'] }}/haproxy.cfg

View File

@ -26,6 +26,10 @@ input:
sample1.args['values'],
sample2.args['values'],
)
self.assertEqual(
sample2.args['values'].emitter,
sample1.args['values']
)
# Check update
sample1.update({'values': {'a': 2}})
@ -50,6 +54,7 @@ input:
sample2.args['values'],
{'a': 2}
)
self.assertEqual(sample2.args['values'].emitter, None)
def test_multiple_resource_disjoint_connect(self):
sample_meta_dir = self.make_resource_meta("""
@ -88,6 +93,14 @@ input:
xs.connect(sample_port, sample)
self.assertEqual(sample.args['ip'], sample_ip.args['ip'])
self.assertEqual(sample.args['port'], sample_port.args['port'])
self.assertEqual(
sample.args['ip'].emitter,
sample_ip.args['ip']
)
self.assertEqual(
sample.args['port'].emitter,
sample_port.args['port']
)
def test_simple_observer_unsubscription(self):
sample_meta_dir = self.make_resource_meta("""
@ -111,11 +124,19 @@ input:
xs.connect(sample1, sample)
self.assertEqual(sample1.args['ip'], sample.args['ip'])
self.assertEqual(len(sample1.args['ip'].receivers), 1)
self.assertEqual(
sample.args['ip'].emitter,
sample1.args['ip']
)
xs.connect(sample2, sample)
self.assertEqual(sample2.args['ip'], sample.args['ip'])
# sample should be unsubscribed from sample1 and subscribed to sample2
self.assertEqual(len(sample1.args['ip'].receivers), 0)
self.assertEqual(
sample.args['ip'].emitter,
sample2.args['ip']
)
sample1.update({'ip': '10.0.0.3'})
self.assertEqual(sample2.args['ip'], sample.args['ip'])
@ -173,15 +194,34 @@ input-types:
xs.connect(sample1, list_input_single, mapping={'ip': 'ips'})
self.assertEqual(
list_input_single.args['ips'],
[ip['value'] for ip in list_input_single.args['ips'].value],
[
sample1.args['ip'],
]
)
self.assertListEqual(
[(e['emitter_attached_to'], e['emitter']) for e in list_input_single.args['ips'].value],
[(sample1.args['ip'].attached_to.name, 'ip')]
)
xs.connect(sample2, list_input_single, mapping={'ip': 'ips'})
self.assertEqual(
list_input_single.args['ips'],
[ip['value'] for ip in list_input_single.args['ips'].value],
[
sample1.args['ip'],
sample2.args['ip'],
]
)
self.assertListEqual(
[(e['emitter_attached_to'], e['emitter']) for e in list_input_single.args['ips'].value],
[(sample1.args['ip'].attached_to.name, 'ip'),
(sample2.args['ip'].attached_to.name, 'ip')]
)
# Test update
sample2.update({'ip': '10.0.0.3'})
self.assertEqual(
[ip['value'] for ip in list_input_single.args['ips'].value],
[
sample1.args['ip'],
sample2.args['ip'],
@ -191,11 +231,15 @@ input-types:
# Test disconnect
xs.disconnect(sample2, list_input_single)
self.assertEqual(
list_input_single.args['ips'],
[ip['value'] for ip in list_input_single.args['ips'].value],
[
sample1.args['ip'],
]
)
self.assertListEqual(
[(e['emitter_attached_to'], e['emitter']) for e in list_input_single.args['ips'].value],
[(sample1.args['ip'].attached_to.name, 'ip')]
)
def test_list_input_multi(self):
sample_meta_dir = self.make_resource_meta("""
@ -229,24 +273,40 @@ input-types:
)
xs.connect(sample1, list_input_multi, mapping={'ip': 'ips', 'port': 'ports'})
self.assertEqual(list_input_multi.args['ips'], [sample1.args['ip']])
self.assertEqual(list_input_multi.args['ports'], [sample1.args['port']])
self.assertEqual(
[ip['value'] for ip in list_input_multi.args['ips'].value],
[sample1.args['ip']]
)
self.assertEqual(
[p['value'] for p in list_input_multi.args['ports'].value],
[sample1.args['port']]
)
xs.connect(sample2, list_input_multi, mapping={'ip': 'ips', 'port': 'ports'})
self.assertEqual(
list_input_multi.args['ips'],
[ip['value'] for ip in list_input_multi.args['ips'].value],
[
sample1.args['ip'],
sample2.args['ip'],
]
)
self.assertListEqual(
[(e['emitter_attached_to'], e['emitter']) for e in list_input_multi.args['ips'].value],
[(sample1.args['ip'].attached_to.name, 'ip'),
(sample2.args['ip'].attached_to.name, 'ip')]
)
self.assertEqual(
list_input_multi.args['ports'],
[p['value'] for p in list_input_multi.args['ports'].value],
[
sample1.args['port'],
sample2.args['port'],
]
)
self.assertListEqual(
[(e['emitter_attached_to'], e['emitter']) for e in list_input_multi.args['ports'].value],
[(sample1.args['port'].attached_to.name, 'port'),
(sample2.args['port'].attached_to.name, 'port')]
)
class TestMultiInput(base.BaseResourceTest):