From d71753f4aba52cf9fc359aabd8763d5801fab968 Mon Sep 17 00:00:00 2001 From: Igor Degtiarov Date: Fri, 3 Oct 2014 17:29:18 +0300 Subject: [PATCH] Implement redesigned separator in names of columns in HBase This change improves constructing "keys" for stored data in HBase. Now we are using separators "_!+" to construct the "key", but we don't restrict any character inside the names of metrics, so it could cause incorrect parsing of stored data. In this patch only one separator ":" is used, and method "prepare_key" helps to avoid incorrect parsing by "quoting" column and row names. Closes-bug: #1328114 Change-Id: I8f816055bb76e67d239de73a57b7cb0baa94dee5 --- ceilometer/alarm/storage/impl_hbase.py | 7 ++-- ceilometer/storage/hbase/utils.py | 45 +++++++++++--------- ceilometer/storage/impl_hbase.py | 57 ++++++++++++-------------- 3 files changed, 56 insertions(+), 53 deletions(-) diff --git a/ceilometer/alarm/storage/impl_hbase.py b/ceilometer/alarm/storage/impl_hbase.py index cd120c48c..d85064cc8 100644 --- a/ceilometer/alarm/storage/impl_hbase.py +++ b/ceilometer/alarm/storage/impl_hbase.py @@ -59,7 +59,7 @@ class Connection(base.Connection): - alarm_h: - - row_key: uuid of alarm + "_" + reversed timestamp + - row_key: uuid of alarm + ":" + reversed timestamp - Column Families: f: raw incoming alarm_history data. 
Timestamp becomes now() @@ -225,5 +225,6 @@ class Connection(base.Connection): rts = hbase_utils.timestamp(ts) with self.conn_pool.connection() as conn: alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE) - alarm_history_table.put(alarm_change.get('alarm_id') + "_" + - str(rts), alarm_change_dict) + alarm_history_table.put( + hbase_utils.prepare_key(alarm_change.get('alarm_id'), rts), + alarm_change_dict) diff --git a/ceilometer/storage/hbase/utils.py b/ceilometer/storage/hbase/utils.py index b2e60824f..a3ae826c4 100644 --- a/ceilometer/storage/hbase/utils.py +++ b/ceilometer/storage/hbase/utils.py @@ -148,9 +148,9 @@ def make_query(metaquery=None, trait_query=None, **kwargs): for k, v in kwargs.items(): if v is not None: res_q = ("SingleColumnValueFilter " - "('f', '%s+%d', %s, 'binary:%s', true, true)" % - (trait_name, EVENT_TRAIT_TYPES[k], OP_SIGN[op], - dump(v))) + "('f', '%s', %s, 'binary:%s', true, true)" % + (prepare_key(trait_name, EVENT_TRAIT_TYPES[k]), + OP_SIGN[op], dump(v))) return res_q # Note: we use extended constructor for SingleColumnValueFilter here. 
@@ -165,11 +165,11 @@ def make_query(metaquery=None, trait_query=None, **kwargs): elif key == 'trait_type': q.append("ColumnPrefixFilter('%s')" % value) elif key == 'event_id': - q.append("RowFilter ( = , 'regexstring:\d*_%s')" % value) + q.append("RowFilter ( = , 'regexstring:\d*:%s')" % value) else: q.append("SingleColumnValueFilter " "('f', '%s', =, 'binary:%s', true, true)" % - (key, dump(value))) + (quote(key), dump(value))) res_q = None if len(q): res_q = " AND ".join(q) @@ -282,11 +282,13 @@ def make_meter_query_for_resource(start_timestamp, start_timestamp_op, end_op = end_timestamp_op or 'lt' if start_rts: - filter_value = start_rts + '+' + source if source else start_rts + filter_value = (start_rts + ':' + quote(source) if source + else start_rts) mq.append(_QualifierFilter(OP_SIGN_REV[start_op], filter_value)) if end_rts: - filter_value = end_rts + '+' + source if source else end_rts + filter_value = (end_rts + ':' + quote(source) if source + else end_rts) mq.append(_QualifierFilter(OP_SIGN_REV[end_op], filter_value)) if mq: @@ -308,18 +310,17 @@ def make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None): if some_id is None: return None, None if not rts_start: - rts_start = chr(127) - end_row = "%s_%s" % (some_id, rts_start) - start_row = "%s_%s" % (some_id, rts_end) + # NOTE(idegtiarov): Here we could not use chr > 122 because chr >= 123 + # will be quoted and character will be turn in a composition that is + # started with '%' (chr(37)) that lexicographically is less then chr + # of number + rts_start = chr(122) + end_row = prepare_key(some_id, rts_start) + start_row = prepare_key(some_id, rts_end) return start_row, end_row -def format_meter_reference(c_name, c_type, c_unit, rts, source): - """Format reference to meter data.""" - return "%s+%s+%s!%s!%s" % (rts, source, c_name, c_type, c_unit) - - def prepare_key(*args): """Prepares names for rows and columns with correct separator. 
@@ -367,10 +368,14 @@ def deserialize_entry(entry, get_raw_meta=True): elif k.startswith('f:r_metadata.'): metadata_flattened[k[len('f:r_metadata.'):]] = load(v) elif k.startswith("f:m_"): - meter = (k[4:], load(v)) + meter = ([unquote(i) for i in k[4:].split(':')], load(v)) meters.append(meter) else: - flatten_result[k[2:]] = load(v) + if ':' in k[2:]: + key = tuple([unquote(i) for i in k[2:].split(':')]) + else: + key = unquote(k[2:]) + flatten_result[key] = load(v) if get_raw_meta: metadata = flatten_result.get('resource_metadata', {}) else: @@ -406,11 +411,11 @@ def serialize_entry(data=None, **kwargs): # capability with API v2. It will be flattened in another # way on API level. But we need flattened too for quick filtering. flattened_meta = dump_metadata(v) - for k, m in flattened_meta.items(): - result['f:r_metadata.' + k] = dump(m) + for key, m in flattened_meta.items(): + result['f:r_metadata.' + key] = dump(m) result['f:resource_metadata'] = dump(v) else: - result['f:' + k] = dump(v) + result['f:' + quote(k, ':')] = dump(v) return result diff --git a/ceilometer/storage/impl_hbase.py b/ceilometer/storage/impl_hbase.py index 37e38de6d..cf3284d52 100644 --- a/ceilometer/storage/impl_hbase.py +++ b/ceilometer/storage/impl_hbase.py @@ -105,17 +105,17 @@ class Connection(base.Connection): f:r_metadata.display_name or f:r_metadata.tag - sources for all corresponding meters with prefix 's' - - all meters for this resource in format: + - all meters with prefix 'm' for this resource in format: .. code-block:: python - "%s+%s+%s!%s!%s" % (rts, source, counter_name, counter_type, + "%s:%s:%s:%s:%s" % (rts, source, counter_name, counter_type, counter_unit) - events: - row_key: timestamp of event's generation + uuid of event - in format: "%s+%s" % (ts, Event.message_id) + in format: "%s:%s" % (ts, Event.message_id) - Column Families: f: contains the following qualifiers: @@ -126,7 +126,7 @@ class Connection(base.Connection): .. 
code-block:: python - "%s+%s" % (trait_name, trait_type) + "%s:%s" % (trait_name, trait_type) """ CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES, @@ -234,9 +234,9 @@ class Connection(base.Connection): resource_metadata = data.get('resource_metadata', {}) # Determine the name of new meter rts = hbase_utils.timestamp(data['timestamp']) - new_meter = hbase_utils.format_meter_reference( - data['counter_name'], data['counter_type'], - data['counter_unit'], rts, data['source']) + new_meter = hbase_utils.prepare_key( + rts, data['source'], data['counter_name'], + data['counter_type'], data['counter_unit']) # TODO(nprivalova): try not to store resource_id resource = hbase_utils.serialize_entry(**{ @@ -255,8 +255,8 @@ class Connection(base.Connection): # Rowkey consists of reversed timestamp, meter and a # message signature for purposes of uniqueness - row = "%s_%d_%s" % (data['counter_name'], rts, - data['message_signature']) + row = hbase_utils.prepare_key(data['counter_name'], rts, + data['message_signature']) record = hbase_utils.serialize_entry( data, **{'source': data['source'], 'rts': rts, 'message': data, 'recorded_at': timeutils.utcnow()}) @@ -301,7 +301,7 @@ class Connection(base.Connection): # manually first_ts = min(meters, key=operator.itemgetter(1))[1] last_ts = max(meters, key=operator.itemgetter(1))[1] - source = meters[0][0].split('+')[1] + source = meters[0][0][1] # If we use QualifierFilter then HBase returnes only # qualifiers filtered by. It will not return the whole entry. # That's why if we need to ask additional qualifiers manually. 
@@ -353,10 +353,9 @@ class Connection(base.Connection): flatten_result, s, meters, md = hbase_utils.deserialize_entry( data) for m in meters: - _m_rts, m_source, m_raw = m[0].split("+") - name, type, unit = m_raw.split('!') + _m_rts, m_source, name, m_type, unit = m[0] meter_dict = {'name': name, - 'type': type, + 'type': m_type, 'unit': unit, 'resource_id': flatten_result['resource_id'], 'project_id': flatten_result['project_id'], @@ -365,8 +364,8 @@ class Connection(base.Connection): if frozen_meter in result: continue result.add(frozen_meter) - meter_dict.update({'source': - m_source if m_source else None}) + meter_dict.update({'source': m_source + if m_source else None}) yield models.Meter(**meter_dict) @@ -517,13 +516,14 @@ class Connection(base.Connection): # models.Event or purposes of storage event sorted by # timestamp in the database. ts = event_model.generated - row = "%d_%s" % (hbase_utils.timestamp(ts, reverse=False), - event_model.message_id) + row = hbase_utils.prepare_key( + hbase_utils.timestamp(ts, reverse=False), + event_model.message_id) event_type = event_model.event_type traits = {} if event_model.traits: for trait in event_model.traits: - key = "%s+%d" % (trait.name, trait.dtype) + key = hbase_utils.prepare_key(trait.name, trait.dtype) traits[key] = trait.value record = hbase_utils.serialize_entry(traits, event_type=event_type, @@ -553,16 +553,15 @@ class Connection(base.Connection): traits = [] events_dict = hbase_utils.deserialize_entry(data)[0] for key, value in events_dict.items(): - if (not key.startswith('event_type') - and not key.startswith('timestamp')): - trait_name, trait_dtype = key.rsplit('+', 1) + if isinstance(key, tuple): + trait_name, trait_dtype = key traits.append(ev_models.Trait(name=trait_name, dtype=int(trait_dtype), value=value)) - ts, mess = event_id.split('_', 1) + ts, mess = event_id.split(':') yield ev_models.Event( - message_id=mess, + message_id=hbase_utils.unquote(mess), event_type=events_dict['event_type'], 
generated=events_dict['timestamp'], traits=sorted(traits, @@ -579,7 +578,7 @@ class Connection(base.Connection): for event_id, data in gen: events_dict = hbase_utils.deserialize_entry(data)[0] for key, value in events_dict.items(): - if key.startswith('event_type'): + if not isinstance(key, tuple) and key.startswith('event_type'): if value not in event_types: event_types.add(value) yield value @@ -600,9 +599,8 @@ class Connection(base.Connection): for event_id, data in gen: events_dict = hbase_utils.deserialize_entry(data)[0] for key, value in events_dict.items(): - if (not key.startswith('event_type') and - not key.startswith('timestamp')): - trait_name, trait_type = key.rsplit('+', 1) + if isinstance(key, tuple): + trait_name, trait_type = key if trait_name not in trait_names: # Here we check that our method return only unique # trait types, for ex. if it is found the same trait @@ -629,8 +627,7 @@ class Connection(base.Connection): for event_id, data in gen: events_dict = hbase_utils.deserialize_entry(data)[0] for key, value in events_dict.items(): - if (not key.startswith('event_type') and - not key.startswith('timestamp')): - trait_name, trait_type = key.rsplit('+', 1) + if isinstance(key, tuple): + trait_name, trait_type = key yield ev_models.Trait(name=trait_name, dtype=int(trait_type), value=value)