Merge pull request #350 from pigmej/new_db_fix_complex_disconnects

Fixed complex disconnects scenarios with new_db
This commit is contained in:
Łukasz Oleś 2015-11-20 10:48:11 +01:00
commit ffd388e6ec

View File

@ -218,7 +218,6 @@ class InputsFieldWrp(IndexFieldWrp):
other_type = self._input_type(other_resource, other_inp_name)
my_type = self._input_type(my_resource, my_inp_name)
# import ipdb; ipdb.set_trace()
if my_type == other_type:
# if the type is the same map 1:1
my_type = InputTypes.simple
@ -248,20 +247,48 @@ class InputsFieldWrp(IndexFieldWrp):
def disconnect(self, name):
# ind_name = '{}_recv_bin'.format(self.fname)
if ':' in name:
# disconnect from hash with tag
normalized_name, tag_and_target = name.split(':', 1)
my_val, my_tag = tag_and_target.split('|', 1)
emit_name = None
# emit_name = '{}|{}'.format(my_tag, my_val)
full_name = '{}|{}|{}'.format(normalized_name, my_tag, my_val)
name = normalized_name
elif '|'in name:
# disconnect everything from given input|resource
my_input, other_resource, other_input = name.split('|', 2)
full_name = my_input
emit_name = '{}|{}'.format(other_resource, other_input)
normalized_name = "{}|{}".format(my_input, other_resource)
name = name.split('|', 1)[0]
my_val, my_tag = None, None
else:
# disconnect everything from given input
full_name = name
emit_name = None
normalized_name = name
my_val, my_tag = None, None
indexes = self._instance._riak_object.indexes
to_dels = []
recvs = filter(lambda x: x[0] == '{}_recv_bin'.format(self.fname), indexes)
for recv in recvs:
_, ind_value = recv
recv_name = name
if ':' in recv_name:
recv_name = recv_name.split(':')[0]
if ind_value.startswith('{}|{}|'.format(self._instance.key, recv_name)):
if ind_value.startswith('{}|{}|'.format(self._instance.key, normalized_name)):
spl = ind_value.split('|')
if len(spl) == 7 and my_tag and my_val:
if spl[-3] == my_tag and spl[-2] == my_val:
to_dels.append(recv)
else:
to_dels.append(recv)
emits = filter(lambda x: x[0] == '{}_emit_bin'.format(self.fname), indexes)
for emit in emits:
_, ind_value = emit
if ind_value.endswith('|{}|{}'.format(self._instance.key, name)):
if ind_value.endswith('|{}|{}'.format(self._instance.key, full_name)):
if emit_name:
if ind_value.startswith(emit_name):
to_dels.append(emit)
else:
to_dels.append(emit)
for to_del in to_dels:
@ -391,17 +418,16 @@ class InputsFieldWrp(IndexFieldWrp):
else:
raise Exception("Not supported splen %s", splen)
else:
items, tags = self._map_field_val_hash_single(recvs, other)
items, tags = self._map_field_val_hash_single(recvs, input_name, other)
my_resource = self._instance
my_resource_value = my_resource.inputs._get_raw_field_val(input_name)
if my_resource_value:
for my_val, cres in my_resource_value.iteritems():
items.append((my_resource.name, my_val, cres))
tags.add(my_resource.name)
res = my_resource_value
else:
res = {}
if len(tags) != 1:
# TODO: add it also for during connecting
raise Exception("Detected dict with different tags")
res = {}
for _, my_val, value in items:
res[my_val] = value
self._cache[name] = res
@ -659,12 +685,54 @@ class Resource(Model):
mapping.iteritems(), concurrency=2)
def disconnect(self, other, inputs):
def _to_disconnect((emitter, receiver, meta)):
if not receiver[0] == other_key:
return False
# name there?
if not emitter[0] == self.key:
return False
# TODO: `delete_hash` test works with receiver[1], while lists works with emitter[1]
key = emitter[1]
if not key in converted:
return False
convs = converted[key]
for conv in convs:
if conv:
if meta['tag'] == conv['tag'] \
and meta['destination_key'] == conv['destination_key']:
return True
else:
return True
return False
def _convert_input(input):
spl = input.split('|')
spl_len = len(spl)
if spl_len == 1:
# normal input
return input, None
elif spl_len == 3:
return spl[0], {'tag': spl[1],
'destination_key': spl[2]}
else:
raise Exception("Cannot convert input %r" % input)
def _format_for_disconnect((emitter, receiver, meta)):
input = receiver[1]
if not meta:
return "{}|{}|{}".format(receiver[1], emitter[0], emitter[1])
dest_key = meta['destination_key']
tag = meta.get('tag', other.name)
return '{}:{}|{}'.format(input, dest_key, tag)
converted = defaultdict(list)
for k, v in map(_convert_input, inputs):
converted[k].append(v)
other_key = other.key
edges = other.inputs._edges()
edges = filter(lambda (emitter, receiver, _): receiver[0] == other_key
and emitter[1] in inputs and emitter[0] == self.name,
edges)
inputs = ['{}|{}'.format(x[1][1], self.name) for x in edges]
edges = filter(_to_disconnect, edges)
inputs = map(_format_for_disconnect, edges)
solar_map(other.inputs.disconnect, inputs, concurrency=2)
def save(self, *args, **kwargs):
@ -707,12 +775,21 @@ class Resource(Model):
return_terms=True,
max_results=999999)
to_disconnect_all = defaultdict(list)
for emit_bin in inputs_index.results:
index_vals = emit_bin[0].split('|')
my_res, my_key, other_res, other_key = index_vals[:4]
emit_obj = Resource.get(other_res)
emit_obj.inputs.disconnect(other_key)
index_vals_len = len(index_vals)
if index_vals_len == 6: # hash
_, my_input, other_res, other_input, my_tag, my_val = index_vals
to_disconnect_all[other_res].append("{}|{}|{}".format(my_input, my_tag, my_val))
elif index_vals_len == 4:
_, my_input, other_res, other_input = index_vals
to_disconnect_all[other_res].append(other_input)
else:
raise Exception("Unknown input %r" % index_vals)
for other_obj_key, to_disconnect in to_disconnect_all.items():
other_obj = Resource.get(other_obj_key)
self.disconnect(other_obj, to_disconnect)
super(Resource, self).delete()