From 0ccd4e7f63743d870086f98f41b1465c97402422 Mon Sep 17 00:00:00 2001 From: Nadya Privalova Date: Fri, 21 Feb 2014 20:03:19 +0400 Subject: [PATCH] Make recording and scanning data more determined HBase may contain only strings and unicodes. In current implementation some entries put to db after str() or unicode(), others are not being modified at all. It is needed to make this process more determined. It may be achived by creating serialize() and deserialized() methods. In these methods json library may be used for safe work with HBase. Partially implements bp hbase-alarming Change-Id: If67a103e1d79d98cc93b88fe60e79c97fd17bb96 --- ceilometer/storage/impl_hbase.py | 391 +++++++++++++++---------------- 1 file changed, 194 insertions(+), 197 deletions(-) diff --git a/ceilometer/storage/impl_hbase.py b/ceilometer/storage/impl_hbase.py index 11ae133ca..3afa30021 100644 --- a/ceilometer/storage/impl_hbase.py +++ b/ceilometer/storage/impl_hbase.py @@ -27,6 +27,7 @@ import os import re import six.moves.urllib.parse as urlparse +import bson.json_util import happybase from ceilometer.openstack.common.gettextutils import _ # noqa @@ -48,22 +49,36 @@ class HBaseStorage(base.StorageEngine): - user - { _id: user id - source: [ array of source ids reporting for the user ] + s_source_name: each source reported for user is stored with prefix s_ + the value of each entry is '1' + sources: this field contains the first source reported for user. + This data is not used but stored for simplification of impl } - project - { _id: project id - source: [ array of source ids reporting for the project ] + s_source_name: the same as for users + sources: the same as for users } - meter - - the raw incoming data + - {_id_reverted_ts: row key is constructed in this way for efficient + filtering + parsed_info_from_incoming_data: e.g. counter_name, counter_type + resource_metadata: raw metadata for corresponding resource + r_metadata_name: flattened metadata for corresponding resource + message: raw incoming data + recorded_at: when the sample has been recorded + source: source for the sample + } - resource - the metadata for resources - { _id: uuid of resource, - metadata: metadata dictionaries + metadata: raw metadata dictionaries + r_metadata: flattened metadata fir quick filtering timestamp: datetime of last update user_id: uuid project_id: uuid meter: [ array of {counter_name: string, counter_type: string} ] + source: source of resource } - alarm - the raw incoming alarm data @@ -184,7 +199,7 @@ class Connection(base.Connection): alarm_to_store = serialize_entry(alarm.as_dict()) alarm_table.put(_id, alarm_to_store) - stored_alarm = deserialize_entry(alarm_table.row(_id)) + stored_alarm = deserialize_entry(alarm_table.row(_id))[0] return models.Alarm(**stored_alarm) create_alarm = update_alarm @@ -201,15 +216,12 @@ class Connection(base.Connection): alarm_table = self.conn.table(self.ALARM_TABLE) - #TODO(nprivalova): to be refactored - if enabled is not None: - enabled = json.dumps(enabled) q = make_query(alarm_id=alarm_id, name=name, enabled=enabled, user_id=user, project_id=project) gen = alarm_table.scan(filter=q) for ignored, data in gen: - stored_alarm = deserialize_entry(data) + stored_alarm = deserialize_entry(data)[0] yield models.Alarm(**stored_alarm) def get_alarm_changes(self, alarm_id, on_behalf_of, @@ -230,11 +242,7 @@ class Connection(base.Connection): gen = alarm_history_table.scan(filter=q, row_start=start_row, row_stop=end_row) for ignored, data in gen: - stored_entry = deserialize_entry(data) - # It is needed to return 'details' field as string - detail = stored_entry['detail'] - if detail: - stored_entry['detail'] = json.dumps(detail) + stored_entry = deserialize_entry(data)[0] yield models.AlarmChange(**stored_entry) def record_alarm_change(self, alarm_change): @@ -259,52 +267,31 @@ class Connection(base.Connection): resource_table = self.conn.table(self.RESOURCE_TABLE) meter_table = self.conn.table(self.METER_TABLE) - # store metadata fields with prefix "r_" to make filtering on metadata - # faster - resource_metadata = {} - res_meta_copy = data['resource_metadata'] - if res_meta_copy: - for key, v in utils.dict_to_keyval(res_meta_copy): - resource_metadata['f:r_metadata.%s' % key] = unicode(v) - # Make sure we know about the user and project if data['user_id']: - user = user_table.row(data['user_id']) - sources = _load_hbase_list(user, 's') - # Update if source is new - if data['source'] not in sources: - user['f:s_%s' % data['source']] = "1" - user_table.put(data['user_id'], user) - - project = project_table.row(data['project_id']) - sources = _load_hbase_list(project, 's') - # Update if source is new - if data['source'] not in sources: - project['f:s_%s' % data['source']] = "1" - project_table.put(data['project_id'], project) - - rts = reverse_timestamp(data['timestamp']) - - resource = resource_table.row(data['resource_id']) + self._update_sources(user_table, data['user_id'], data['source']) + self._update_sources(project_table, data['project_id'], data['source']) + # Get metadata from user's data + resource_metadata = data.get('resource_metadata', {}) + # Determine the name of new meter new_meter = _format_meter_reference( data['counter_name'], data['counter_type'], data['counter_unit']) - new_resource = {'f:resource_id': data['resource_id'], - 'f:project_id': data['project_id'], - 'f:user_id': data['user_id'], - 'f:source': data["source"], - # store meters with prefix "m_" - 'f:m_%s' % new_meter: "1" - } - new_resource.update(resource_metadata) + + flatten_result, sources, meters, metadata = \ + deserialize_entry(resource_table.row(data['resource_id'])) # Update if resource has new information - if new_resource != resource: - meters = _load_hbase_list(resource, 'm') - if new_meter not in meters: - new_resource['f:m_%s' % new_meter] = "1" - - resource_table.put(data['resource_id'], new_resource) + if (data['source'] not in sources) or (new_meter not in meters) or ( + metadata != resource_metadata): + resource_table.put(data['resource_id'], + serialize_entry( + **{'sources': [data['source']], + 'meters': [new_meter], + 'metadata': resource_metadata, + 'resource_id': data['resource_id'], + 'project_id': data['project_id'], + 'user_id': data['user_id']})) # Rowkey consists of reversed timestamp, meter and an md5 of # user+resource+project for purposes of uniqueness @@ -314,45 +301,20 @@ class Connection(base.Connection): # We use reverse timestamps in rowkeys as they are sorted # alphabetically. + rts = reverse_timestamp(data['timestamp']) row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest()) - - recorded_at = timeutils.utcnow() - - # Convert timestamp to string as json.dumps won't - ts = timeutils.strtime(data['timestamp']) - recorded_at_ts = timeutils.strtime(recorded_at) - - record = {'f:timestamp': ts, - 'f:counter_name': data['counter_name'], - 'f:counter_type': data['counter_type'], - 'f:counter_volume': str(data['counter_volume']), - 'f:counter_unit': data['counter_unit'], - # TODO(shengjie) consider using QualifierFilter - # keep dimensions as column qualifier for quicker look up - # TODO(shengjie) extra dimensions need to be added as CQ - 'f:user_id': data['user_id'], - 'f:project_id': data['project_id'], - 'f:message_id': data['message_id'], - 'f:resource_id': data['resource_id'], - 'f:source': data['source'], - 'f:recorded_at': recorded_at, - # keep raw metadata as well as flattened to provide - # capability with API v2. It will be flattened in another - # way on API level - 'f:metadata': data.get('resource_metadata', '{}'), - # add in reversed_ts here for time range scan - 'f:rts': str(rts) - } - # Need to record resource_metadata for more robust filtering. - record.update(resource_metadata) - # Don't want to be changing the original data object. - data = copy.copy(data) - data['timestamp'] = ts - data['recorded_at'] = recorded_at_ts - # Save original meter. - record['f:message'] = json.dumps(data) + record = serialize_entry(data, **{'metadata': resource_metadata, + 'rts': rts, + 'message': data, + 'recorded_at': timeutils.utcnow()}) meter_table.put(row, record) + def _update_sources(self, table, id, source): + user, sources, _, _ = deserialize_entry(table.row(id)) + if source not in sources: + sources.append(source) + table.put(id, serialize_entry(user, **{'sources': sources})) + def get_users(self, source=None): """Return an iterable of user id strings. @@ -394,21 +356,9 @@ class Connection(base.Connection): :param resource: Optional resource filter. :param pagination: Optional pagination query. """ - if pagination: raise NotImplementedError(_('Pagination not implemented')) - def make_resource(data, first_ts, last_ts): - """Transform HBase fields to Resource model.""" - return models.Resource( - resource_id=data['f:resource_id'], - first_sample_timestamp=first_ts, - last_sample_timestamp=last_ts, - project_id=data['f:project_id'], - source=data['f:source'], - user_id=data['f:user_id'], - metadata=data['f:metadata'], - ) meter_table = self.conn.table(self.METER_TABLE) sample_filter = storage.SampleFilter( @@ -416,31 +366,36 @@ class Connection(base.Connection): start=start_timestamp, start_timestamp_op=start_timestamp_op, end=end_timestamp, end_timestamp_op=end_timestamp_op, resource=resource, source=source, metaquery=metaquery) - q, start_row, stop_row = make_sample_query_from_filter( sample_filter, require_meter=False) LOG.debug(_("Query Meter table: %s") % q) meters = meter_table.scan(filter=q, row_start=start_row, row_stop=stop_row) + d_meters = [] + for i, m in meters: + d_meters.append(deserialize_entry(m)) # We have to sort on resource_id before we can group by it. According # to the itertools documentation a new group is generated when the # value of the key function changes (it breaks there). - meters = sorted(meters, key=_resource_id_from_record_tuple) - + meters = sorted(d_meters, key=_resource_id_from_record_tuple) for resource_id, r_meters in itertools.groupby( meters, key=_resource_id_from_record_tuple): - meter_rows = [data[1] for data in sorted( + # We need deserialized entry(data[0]) and metadata(data[3]) + meter_rows = [(data[0], data[3]) for data in sorted( r_meters, key=_timestamp_from_record_tuple)] - latest_data = meter_rows[-1] - min_ts = timeutils.parse_strtime(meter_rows[0]['f:timestamp']) - max_ts = timeutils.parse_strtime(latest_data['f:timestamp']) - yield make_resource( - latest_data, - min_ts, - max_ts + min_ts = meter_rows[0][0]['timestamp'] + max_ts = latest_data[0]['timestamp'] + yield models.Resource( + resource_id=resource_id, + first_sample_timestamp=min_ts, + last_sample_timestamp=max_ts, + project_id=latest_data[0]['project_id'], + source=latest_data[0]['source'], + user_id=latest_data[0]['user_id'], + metadata=latest_data[1], ) def get_meters(self, user=None, project=None, resource=None, source=None, @@ -465,42 +420,28 @@ class Connection(base.Connection): gen = resource_table.scan(filter=q) for ignored, data in gen: - # Meter columns are stored like this: - # "m_{counter_name}|{counter_type}|{counter_unit}" => "1" - # where 'm' is a prefix (m for meter), value is always set to 1 - meter = None - for m in data: - if m.startswith('f:m_'): - meter = m - break - if meter is None: + flatten_result, s, m, md = deserialize_entry(data) + if not m: continue - name, type, unit = meter[4:].split("!") + # Meter table may have only one "meter" and "source". That's why + # only first lists element is get in this method + name, type, unit = m[0].split("!") yield models.Meter( name=name, type=type, unit=unit, - resource_id=data['f:resource_id'], - project_id=data['f:project_id'], - source=data['f:source'], - user_id=data['f:user_id'], + resource_id=flatten_result['resource_id'], + project_id=flatten_result['project_id'], + source=s[0] if s else None, + user_id=flatten_result['user_id'], ) - @staticmethod - def _make_sample(data): - """Transform HBase fields to Sample model.""" - data = json.loads(data['f:message']) - data['timestamp'] = timeutils.parse_strtime(data['timestamp']) - data['recorded_at'] = timeutils.parse_strtime(data['recorded_at']) - return models.Sample(**data) - def get_samples(self, sample_filter, limit=None): """Return an iterable of models.Sample instances. :param sample_filter: Filter. :param limit: Maximum number of results to return. """ - meter_table = self.conn.table(self.METER_TABLE) q, start, stop = make_sample_query_from_filter( @@ -513,7 +454,9 @@ class Connection(base.Connection): break else: limit -= 1 - yield self._make_sample(meter) + d_meter = deserialize_entry(meter)[0] + d_meter['message']['recorded_at'] = d_meter['recorded_at'] + yield models.Sample(**d_meter['message']) @staticmethod def _update_meter_stats(stat, meter): @@ -525,9 +468,9 @@ class Connection(base.Connection): :param start_time: query start time :param period: length of the time bucket """ - vol = int(meter['f:counter_volume']) - ts = timeutils.parse_strtime(meter['f:timestamp']) - stat.unit = meter['f:counter_unit'] + vol = meter['counter_volume'] + ts = meter['timestamp'] + stat.unit = meter['counter_unit'] stat.min = min(vol, stat.min or vol) stat.max = max(vol, stat.max) stat.sum = vol + (stat.sum or 0) @@ -557,22 +500,21 @@ class Connection(base.Connection): meter_table = self.conn.table(self.METER_TABLE) q, start, stop = make_sample_query_from_filter(sample_filter) - meters = list(meter for (ignored, meter) in - meter_table.scan(filter=q, row_start=start, - row_stop=stop) - ) + meters = map(deserialize_entry, list(meter for (ignored, meter) in + meter_table.scan(filter=q, row_start=start, + row_stop=stop))) if sample_filter.start: start_time = sample_filter.start elif meters: - start_time = timeutils.parse_strtime(meters[-1]['f:timestamp']) + start_time = meters[-1][0]['timestamp'] else: start_time = None if sample_filter.end: end_time = sample_filter.end elif meters: - end_time = timeutils.parse_strtime(meters[0]['f:timestamp']) + end_time = meters[0][0]['timestamp'] else: end_time = None @@ -586,7 +528,7 @@ class Connection(base.Connection): # As our HBase meters are stored as newest-first, we need to iterate # in the reverse order for meter in meters[::-1]: - ts = timeutils.parse_strtime(meter['f:timestamp']) + ts = meter[0]['timestamp'] if period: offset = int(timeutils.delta_seconds( start_time, ts) / period) * period @@ -612,7 +554,7 @@ class Connection(base.Connection): duration_end=None, groupby=None) ) - self._update_meter_stats(results[-1], meter) + self._update_meter_stats(results[-1], meter[0]) return results @@ -666,7 +608,7 @@ class MTable(object): # Extract filter name and its arguments g = re.search("(.*)\((.*),?\)", f) fname = g.group(1).strip() - fargs = [s.strip().replace('\'', '').replace('\"', '') + fargs = [s.strip().replace('\'', '') for s in g.group(2).split(',')] m = getattr(self, fname) if callable(m): @@ -769,6 +711,7 @@ def make_timestamp_query(func, start=None, start_op=None, end=None, return start_row, end_row q = [] + # We dont need to dump here because get_start_end_rts returns strings if rts_start: q.append("SingleColumnValueFilter ('f', 'rts', <=, 'binary:%s')" % rts_start) @@ -805,11 +748,14 @@ def make_query(metaquery=None, **kwargs): column name in db """ q = [] - - for key, value in kwargs.iteritems(): + # Note: we use extended constructor for SingleColumnValueFilter here. + # It is explicitly specified that entry should not be returned if CF is not + # found in table. + for key, value in kwargs.items(): if value is not None: q.append("SingleColumnValueFilter " - "('f', '%s', =, 'binary:%s')" % (key, value)) + "('f', '%s', =, 'binary:%s', true, true)" % + (key, dump(value))) res_q = None if len(q): res_q = " AND ".join(q) @@ -818,8 +764,9 @@ def make_query(metaquery=None, **kwargs): meta_q = [] for k, v in metaquery.items(): meta_q.append( - "SingleColumnValueFilter ('f', '%s', =, 'binary:%s')" - % ('r_' + k, v)) + "SingleColumnValueFilter ('f', '%s', =, 'binary:%s', " + "true, true)" + % ('r_' + k, dump(v))) meta_q = " AND ".join(meta_q) # join query and metaquery if res_q is not None: @@ -827,7 +774,7 @@ def make_query(metaquery=None, **kwargs): else: res_q = meta_q # metaquery only - return res_q or "" + return res_q def make_sample_query_from_filter(sample_filter, require_meter=True): @@ -876,16 +823,6 @@ def _make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None): return start_row, end_row -def _load_hbase_list(d, prefix): - """Deserialise dict stored as HBase column family - """ - ret = [] - prefix = 'f:%s_' % prefix - for key in (k for k in d if k.startswith(prefix)): - ret.append(key[len(prefix):]) - return ret - - def _format_meter_reference(counter_name, counter_type, counter_unit): """Format reference to meter data. """ @@ -895,45 +832,105 @@ def _format_meter_reference(counter_name, counter_type, counter_unit): def _timestamp_from_record_tuple(record): """Extract timestamp from HBase tuple record """ - return timeutils.parse_strtime(record[1]['f:timestamp']) + return record[0]['timestamp'] def _resource_id_from_record_tuple(record): """Extract resource_id from HBase tuple record """ - return record[1]['f:resource_id'] + return record[0]['resource_id'] -#TODO(nprivalova): to be refactored, will be used everywhere in impl_hbase -# without additional ifs -def serialize_entry(entry_from_user): - result_dict = copy.copy(entry_from_user) - keys = result_dict.keys() - for key in keys: - val = result_dict[key] - if isinstance(val, datetime.datetime): - val = timeutils.strtime(val) - if not isinstance(val, basestring): - val = json.dumps(val) - result_dict['f:' + key] = val - del result_dict[key] - return result_dict +def deserialize_entry(entry, get_raw_meta=True): + """Return a list of flatten_result, sources, meters and metadata + flatten_result contains a dict of simple structures such as 'resource_id':1 + sources/meters are the lists of sources and meters correspondingly. + metadata is metadata dict. This dict may be returned as flattened if + get_raw_meta is False. + + :param entry: entry from HBase, without row name and timestamp + :param get_raw_meta: If true then raw metadata will be returned + If False metadata will be constructed from + 'f:r_metadata.' fields + """ + flatten_result = {} + sources = [] + meters = [] + metadata_flattened = {} + for k, v in entry.items(): + if k.startswith('f:s_'): + sources.append(k[4:]) + elif k.startswith('f:m_'): + meters.append(k[4:]) + elif k.startswith('f:r_metadata.'): + metadata_flattened[k[len('f:r_metadata.'):]] = load(v) + else: + flatten_result[k[2:]] = load(v) + if get_raw_meta: + metadata = flatten_result.get('metadata', {}) + else: + metadata = metadata_flattened + + return flatten_result, sources, meters, metadata -def deserialize_entry(stored_entry): - result_entry = copy.copy(stored_entry) - keys = result_entry.keys() - for key in keys: - val = result_entry[key] - try: - val = json.loads(val) - except ValueError: - pass - if "timestamp" in key and val: - val = timeutils.parse_strtime(val) - # There is no int in wsme models - if isinstance(val, (int, long, float)) and not isinstance(val, bool): - val = str(val) - result_entry[key[2:]] = val - del result_entry[key] - return result_entry +def serialize_entry(data={}, **kwargs): + """Return a dict that is ready to be stored to HBase + + :param data: dict to be serialized + :param kwargs: additional args + """ + entry_dict = copy.copy(data) + entry_dict.update(**kwargs) + + result = {} + for k, v in entry_dict.items(): + if k == 'sources': + # user and project tables may contain several sources and meters + # that's why we store it separately as pairs "source/meter name:1". + # Resource and meter table contain only one and it's possible + # to store pairs like "source/meter:source name/meter name". But to + # keep things simple it's possible to store all variants in all + # tables because it doesn't break logic and overhead is not too big + for source in v: + result['f:s_%s' % source] = dump('1') + if v: + result['f:source'] = dump(v[0]) + elif k == 'meters': + for meter in v: + result['f:m_%s' % meter] = dump('1') + elif k == 'metadata': + # keep raw metadata as well as flattened to provide + # capability with API v2. It will be flattened in another + # way on API level. But we need flattened too for quick filtering. + flattened_meta = dump_metadata(v) + for k, m in flattened_meta.items(): + result['f:r_metadata.' + k] = dump(m) + result['f:metadata'] = dump(v) + else: + result['f:' + k] = dump(v) + return result + + +def dump_metadata(meta): + resource_metadata = {} + for key, v in utils.dict_to_keyval(meta): + resource_metadata[key] = v + return resource_metadata + + +def dump(data): + return json.dumps(data, default=bson.json_util.default) + + +def load(data): + return json.loads(data, object_hook=object_hook) + + +# We don't want to have tzinfo in decoded json.This object_hook is +# overwritten json_util.object_hook for $date +def object_hook(dct): + if "$date" in dct: + dt = bson.json_util.object_hook(dct) + return dt.replace(tzinfo=None) + return bson.json_util.object_hook(dct)