diff --git a/solar/solar/dblayer/solar_models.py b/solar/solar/dblayer/solar_models.py index 3ee63be4..329d8d01 100644 --- a/solar/solar/dblayer/solar_models.py +++ b/solar/solar/dblayer/solar_models.py @@ -218,7 +218,6 @@ class InputsFieldWrp(IndexFieldWrp): other_type = self._input_type(other_resource, other_inp_name) my_type = self._input_type(my_resource, my_inp_name) - # import ipdb; ipdb.set_trace() if my_type == other_type: # if the type is the same map 1:1 my_type = InputTypes.simple @@ -248,21 +247,49 @@ class InputsFieldWrp(IndexFieldWrp): def disconnect(self, name): # ind_name = '{}_recv_bin'.format(self.fname) + if ':' in name: + # disconnect from hash with tag + normalized_name, tag_and_target = name.split(':', 1) + my_val, my_tag = tag_and_target.split('|', 1) + emit_name = None + # emit_name = '{}|{}'.format(my_tag, my_val) + full_name = '{}|{}|{}'.format(normalized_name, my_tag, my_val) + name = normalized_name + elif '|'in name: + # disconnect everything from given input|resource + my_input, other_resource, other_input = name.split('|', 2) + full_name = my_input + emit_name = '{}|{}'.format(other_resource, other_input) + normalized_name = "{}|{}".format(my_input, other_resource) + name = name.split('|', 1)[0] + my_val, my_tag = None, None + else: + # disconnect everything from given input + full_name = name + emit_name = None + normalized_name = name + my_val, my_tag = None, None indexes = self._instance._riak_object.indexes to_dels = [] recvs = filter(lambda x: x[0] == '{}_recv_bin'.format(self.fname), indexes) for recv in recvs: _, ind_value = recv - recv_name = name - if ':' in recv_name: - recv_name = recv_name.split(':')[0] - if ind_value.startswith('{}|{}|'.format(self._instance.key, recv_name)): - to_dels.append(recv) + if ind_value.startswith('{}|{}|'.format(self._instance.key, normalized_name)): + spl = ind_value.split('|') + if len(spl) == 7 and my_tag and my_val: + if spl[-3] == my_tag and spl[-2] == my_val: + to_dels.append(recv) + else: + to_dels.append(recv) emits = filter(lambda x: x[0] == '{}_emit_bin'.format(self.fname), indexes) for emit in emits: _, ind_value = emit - if ind_value.endswith('|{}|{}'.format(self._instance.key, name)): - to_dels.append(emit) + if ind_value.endswith('|{}|{}'.format(self._instance.key, full_name)): + if emit_name: + if ind_value.startswith(emit_name): + to_dels.append(emit) + else: + to_dels.append(emit) for to_del in to_dels: self._instance._remove_index(*to_del) @@ -391,17 +418,16 @@ class InputsFieldWrp(IndexFieldWrp): else: raise Exception("Not supported splen %s", splen) else: - items, tags = self._map_field_val_hash_single(recvs, other) + items, tags = self._map_field_val_hash_single(recvs, input_name, other) my_resource = self._instance my_resource_value = my_resource.inputs._get_raw_field_val(input_name) if my_resource_value: - for my_val, cres in my_resource_value.iteritems(): - items.append((my_resource.name, my_val, cres)) - tags.add(my_resource.name) + res = my_resource_value + else: + res = {} if len(tags) != 1: # TODO: add it also for during connecting raise Exception("Detected dict with different tags") - res = {} for _, my_val, value in items: res[my_val] = value self._cache[name] = res @@ -659,12 +685,54 @@ class Resource(Model): mapping.iteritems(), concurrency=2) def disconnect(self, other, inputs): + def _to_disconnect((emitter, receiver, meta)): + if not receiver[0] == other_key: + return False + # name there? + if not emitter[0] == self.key: + return False + # TODO: `delete_hash` test works with receiver[1], while lists works with emitter[1] + key = emitter[1] + if not key in converted: + return False + convs = converted[key] + for conv in convs: + if conv: + if meta['tag'] == conv['tag'] \ + and meta['destination_key'] == conv['destination_key']: + return True + else: + return True + return False + + def _convert_input(input): + spl = input.split('|') + spl_len = len(spl) + if spl_len == 1: + # normal input + return input, None + elif spl_len == 3: + return spl[0], {'tag': spl[1], + 'destination_key': spl[2]} + else: + raise Exception("Cannot convert input %r" % input) + + def _format_for_disconnect((emitter, receiver, meta)): + input = receiver[1] + if not meta: + return "{}|{}|{}".format(receiver[1], emitter[0], emitter[1]) + dest_key = meta['destination_key'] + tag = meta.get('tag', other.name) + return '{}:{}|{}'.format(input, dest_key, tag) + + + converted = defaultdict(list) + for k, v in map(_convert_input, inputs): + converted[k].append(v) other_key = other.key edges = other.inputs._edges() - edges = filter(lambda (emitter, receiver, _): receiver[0] == other_key - and emitter[1] in inputs and emitter[0] == self.name, - edges) - inputs = ['{}|{}'.format(x[1][1], self.name) for x in edges] + edges = filter(_to_disconnect, edges) + inputs = map(_format_for_disconnect, edges) solar_map(other.inputs.disconnect, inputs, concurrency=2) def save(self, *args, **kwargs): @@ -707,12 +775,21 @@ class Resource(Model): return_terms=True, max_results=999999) + to_disconnect_all = defaultdict(list) for emit_bin in inputs_index.results: index_vals = emit_bin[0].split('|') - - my_res, my_key, other_res, other_key = index_vals[:4] - emit_obj = Resource.get(other_res) - emit_obj.inputs.disconnect(other_key) + index_vals_len = len(index_vals) + if index_vals_len == 6: # hash + _, my_input, other_res, other_input, my_tag, my_val = index_vals + to_disconnect_all[other_res].append("{}|{}|{}".format(my_input, my_tag, my_val)) + elif index_vals_len == 4: + _, my_input, other_res, other_input = index_vals + to_disconnect_all[other_res].append(other_input) + else: + raise Exception("Unknown input %r" % index_vals) + for other_obj_key, to_disconnect in to_disconnect_all.items(): + other_obj = Resource.get(other_obj_key) + self.disconnect(other_obj, to_disconnect) super(Resource, self).delete()