Fixed complex disconnects scenarios with new_db
This commit is contained in:
parent
2e40d4cdc2
commit
1529c2c96d
@ -218,7 +218,6 @@ class InputsFieldWrp(IndexFieldWrp):
|
|||||||
other_type = self._input_type(other_resource, other_inp_name)
|
other_type = self._input_type(other_resource, other_inp_name)
|
||||||
my_type = self._input_type(my_resource, my_inp_name)
|
my_type = self._input_type(my_resource, my_inp_name)
|
||||||
|
|
||||||
# import ipdb; ipdb.set_trace()
|
|
||||||
if my_type == other_type:
|
if my_type == other_type:
|
||||||
# if the type is the same map 1:1
|
# if the type is the same map 1:1
|
||||||
my_type = InputTypes.simple
|
my_type = InputTypes.simple
|
||||||
@ -248,21 +247,49 @@ class InputsFieldWrp(IndexFieldWrp):
|
|||||||
|
|
||||||
def disconnect(self, name):
|
def disconnect(self, name):
|
||||||
# ind_name = '{}_recv_bin'.format(self.fname)
|
# ind_name = '{}_recv_bin'.format(self.fname)
|
||||||
|
if ':' in name:
|
||||||
|
# disconnect from hash with tag
|
||||||
|
normalized_name, tag_and_target = name.split(':', 1)
|
||||||
|
my_val, my_tag = tag_and_target.split('|', 1)
|
||||||
|
emit_name = None
|
||||||
|
# emit_name = '{}|{}'.format(my_tag, my_val)
|
||||||
|
full_name = '{}|{}|{}'.format(normalized_name, my_tag, my_val)
|
||||||
|
name = normalized_name
|
||||||
|
elif '|'in name:
|
||||||
|
# disconnect everything from given input|resource
|
||||||
|
my_input, other_resource, other_input = name.split('|', 2)
|
||||||
|
full_name = my_input
|
||||||
|
emit_name = '{}|{}'.format(other_resource, other_input)
|
||||||
|
normalized_name = "{}|{}".format(my_input, other_resource)
|
||||||
|
name = name.split('|', 1)[0]
|
||||||
|
my_val, my_tag = None, None
|
||||||
|
else:
|
||||||
|
# disconnect everything from given input
|
||||||
|
full_name = name
|
||||||
|
emit_name = None
|
||||||
|
normalized_name = name
|
||||||
|
my_val, my_tag = None, None
|
||||||
indexes = self._instance._riak_object.indexes
|
indexes = self._instance._riak_object.indexes
|
||||||
to_dels = []
|
to_dels = []
|
||||||
recvs = filter(lambda x: x[0] == '{}_recv_bin'.format(self.fname), indexes)
|
recvs = filter(lambda x: x[0] == '{}_recv_bin'.format(self.fname), indexes)
|
||||||
for recv in recvs:
|
for recv in recvs:
|
||||||
_, ind_value = recv
|
_, ind_value = recv
|
||||||
recv_name = name
|
if ind_value.startswith('{}|{}|'.format(self._instance.key, normalized_name)):
|
||||||
if ':' in recv_name:
|
spl = ind_value.split('|')
|
||||||
recv_name = recv_name.split(':')[0]
|
if len(spl) == 7 and my_tag and my_val:
|
||||||
if ind_value.startswith('{}|{}|'.format(self._instance.key, recv_name)):
|
if spl[-3] == my_tag and spl[-2] == my_val:
|
||||||
to_dels.append(recv)
|
to_dels.append(recv)
|
||||||
|
else:
|
||||||
|
to_dels.append(recv)
|
||||||
emits = filter(lambda x: x[0] == '{}_emit_bin'.format(self.fname), indexes)
|
emits = filter(lambda x: x[0] == '{}_emit_bin'.format(self.fname), indexes)
|
||||||
for emit in emits:
|
for emit in emits:
|
||||||
_, ind_value = emit
|
_, ind_value = emit
|
||||||
if ind_value.endswith('|{}|{}'.format(self._instance.key, name)):
|
if ind_value.endswith('|{}|{}'.format(self._instance.key, full_name)):
|
||||||
to_dels.append(emit)
|
if emit_name:
|
||||||
|
if ind_value.startswith(emit_name):
|
||||||
|
to_dels.append(emit)
|
||||||
|
else:
|
||||||
|
to_dels.append(emit)
|
||||||
|
|
||||||
for to_del in to_dels:
|
for to_del in to_dels:
|
||||||
self._instance._remove_index(*to_del)
|
self._instance._remove_index(*to_del)
|
||||||
@ -391,17 +418,16 @@ class InputsFieldWrp(IndexFieldWrp):
|
|||||||
else:
|
else:
|
||||||
raise Exception("Not supported splen %s", splen)
|
raise Exception("Not supported splen %s", splen)
|
||||||
else:
|
else:
|
||||||
items, tags = self._map_field_val_hash_single(recvs, other)
|
items, tags = self._map_field_val_hash_single(recvs, input_name, other)
|
||||||
my_resource = self._instance
|
my_resource = self._instance
|
||||||
my_resource_value = my_resource.inputs._get_raw_field_val(input_name)
|
my_resource_value = my_resource.inputs._get_raw_field_val(input_name)
|
||||||
if my_resource_value:
|
if my_resource_value:
|
||||||
for my_val, cres in my_resource_value.iteritems():
|
res = my_resource_value
|
||||||
items.append((my_resource.name, my_val, cres))
|
else:
|
||||||
tags.add(my_resource.name)
|
res = {}
|
||||||
if len(tags) != 1:
|
if len(tags) != 1:
|
||||||
# TODO: add it also for during connecting
|
# TODO: add it also for during connecting
|
||||||
raise Exception("Detected dict with different tags")
|
raise Exception("Detected dict with different tags")
|
||||||
res = {}
|
|
||||||
for _, my_val, value in items:
|
for _, my_val, value in items:
|
||||||
res[my_val] = value
|
res[my_val] = value
|
||||||
self._cache[name] = res
|
self._cache[name] = res
|
||||||
@ -659,12 +685,54 @@ class Resource(Model):
|
|||||||
mapping.iteritems(), concurrency=2)
|
mapping.iteritems(), concurrency=2)
|
||||||
|
|
||||||
def disconnect(self, other, inputs):
|
def disconnect(self, other, inputs):
|
||||||
|
def _to_disconnect((emitter, receiver, meta)):
|
||||||
|
if not receiver[0] == other_key:
|
||||||
|
return False
|
||||||
|
# name there?
|
||||||
|
if not emitter[0] == self.key:
|
||||||
|
return False
|
||||||
|
# TODO: `delete_hash` test works with receiver[1], while lists works with emitter[1]
|
||||||
|
key = emitter[1]
|
||||||
|
if not key in converted:
|
||||||
|
return False
|
||||||
|
convs = converted[key]
|
||||||
|
for conv in convs:
|
||||||
|
if conv:
|
||||||
|
if meta['tag'] == conv['tag'] \
|
||||||
|
and meta['destination_key'] == conv['destination_key']:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _convert_input(input):
|
||||||
|
spl = input.split('|')
|
||||||
|
spl_len = len(spl)
|
||||||
|
if spl_len == 1:
|
||||||
|
# normal input
|
||||||
|
return input, None
|
||||||
|
elif spl_len == 3:
|
||||||
|
return spl[0], {'tag': spl[1],
|
||||||
|
'destination_key': spl[2]}
|
||||||
|
else:
|
||||||
|
raise Exception("Cannot convert input %r" % input)
|
||||||
|
|
||||||
|
def _format_for_disconnect((emitter, receiver, meta)):
|
||||||
|
input = receiver[1]
|
||||||
|
if not meta:
|
||||||
|
return "{}|{}|{}".format(receiver[1], emitter[0], emitter[1])
|
||||||
|
dest_key = meta['destination_key']
|
||||||
|
tag = meta.get('tag', other.name)
|
||||||
|
return '{}:{}|{}'.format(input, dest_key, tag)
|
||||||
|
|
||||||
|
|
||||||
|
converted = defaultdict(list)
|
||||||
|
for k, v in map(_convert_input, inputs):
|
||||||
|
converted[k].append(v)
|
||||||
other_key = other.key
|
other_key = other.key
|
||||||
edges = other.inputs._edges()
|
edges = other.inputs._edges()
|
||||||
edges = filter(lambda (emitter, receiver, _): receiver[0] == other_key
|
edges = filter(_to_disconnect, edges)
|
||||||
and emitter[1] in inputs and emitter[0] == self.name,
|
inputs = map(_format_for_disconnect, edges)
|
||||||
edges)
|
|
||||||
inputs = ['{}|{}'.format(x[1][1], self.name) for x in edges]
|
|
||||||
solar_map(other.inputs.disconnect, inputs, concurrency=2)
|
solar_map(other.inputs.disconnect, inputs, concurrency=2)
|
||||||
|
|
||||||
def save(self, *args, **kwargs):
|
def save(self, *args, **kwargs):
|
||||||
@ -707,12 +775,21 @@ class Resource(Model):
|
|||||||
return_terms=True,
|
return_terms=True,
|
||||||
max_results=999999)
|
max_results=999999)
|
||||||
|
|
||||||
|
to_disconnect_all = defaultdict(list)
|
||||||
for emit_bin in inputs_index.results:
|
for emit_bin in inputs_index.results:
|
||||||
index_vals = emit_bin[0].split('|')
|
index_vals = emit_bin[0].split('|')
|
||||||
|
index_vals_len = len(index_vals)
|
||||||
my_res, my_key, other_res, other_key = index_vals[:4]
|
if index_vals_len == 6: # hash
|
||||||
emit_obj = Resource.get(other_res)
|
_, my_input, other_res, other_input, my_tag, my_val = index_vals
|
||||||
emit_obj.inputs.disconnect(other_key)
|
to_disconnect_all[other_res].append("{}|{}|{}".format(my_input, my_tag, my_val))
|
||||||
|
elif index_vals_len == 4:
|
||||||
|
_, my_input, other_res, other_input = index_vals
|
||||||
|
to_disconnect_all[other_res].append(other_input)
|
||||||
|
else:
|
||||||
|
raise Exception("Unknown input %r" % index_vals)
|
||||||
|
for other_obj_key, to_disconnect in to_disconnect_all.items():
|
||||||
|
other_obj = Resource.get(other_obj_key)
|
||||||
|
self.disconnect(other_obj, to_disconnect)
|
||||||
super(Resource, self).delete()
|
super(Resource, self).delete()
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user