diff --git a/ceilometer/storage/impl_hbase.py b/ceilometer/storage/impl_hbase.py
index 11ae133ca..3afa30021 100644
--- a/ceilometer/storage/impl_hbase.py
+++ b/ceilometer/storage/impl_hbase.py
@@ -27,6 +27,7 @@
 import os
 import re
 import six.moves.urllib.parse as urlparse
+import bson.json_util
 import happybase
 
 from ceilometer.openstack.common.gettextutils import _  # noqa
@@ -48,22 +49,36 @@ class HBaseStorage(base.StorageEngine):
     - user
       - { _id: user id
-          source: [ array of source ids reporting for the user ]
+          s_source_name: each source reported for the user is stored with the
+                         prefix 's_'; the value of each entry is '1'
+          source: this field contains the first source reported for the user.
+                  This data is not used, but is stored to simplify the impl
          }
     - project
       - { _id: project id
-          source: [ array of source ids reporting for the project ]
+          s_source_name: the same as for users
+          source: the same as for users
          }
     - meter
-      - the raw incoming data
+      - {_id_reversed_ts: the row key is constructed this way for efficient
+         filtering
+         parsed_info_from_incoming_data: e.g. counter_name, counter_type
+         resource_metadata: raw metadata for the corresponding resource
+         r_metadata_name: flattened metadata for the corresponding resource
+         message: raw incoming data
+         recorded_at: when the sample has been recorded
+         source: source for the sample
+         }
     - resource
       - the metadata for resources
       - { _id: uuid of resource,
-          metadata: metadata dictionaries
+          metadata: raw metadata dictionaries
+          r_metadata: flattened metadata for quick filtering
          timestamp: datetime of last update
          user_id: uuid
          project_id: uuid
          meter: [ array of {counter_name: string, counter_type: string} ]
+         source: source of the resource
          }
     - alarm
       - the raw incoming alarm data
@@ -184,7 +199,7 @@ class Connection(base.Connection):
         alarm_to_store = serialize_entry(alarm.as_dict())
         alarm_table.put(_id, alarm_to_store)
 
-        stored_alarm = deserialize_entry(alarm_table.row(_id))
+        stored_alarm = deserialize_entry(alarm_table.row(_id))[0]
         return models.Alarm(**stored_alarm)
 
     create_alarm = update_alarm
@@ -201,15 +216,12 @@ class Connection(base.Connection):
 
         alarm_table = self.conn.table(self.ALARM_TABLE)
 
-        #TODO(nprivalova): to be refactored
-        if enabled is not None:
-            enabled = json.dumps(enabled)
         q = make_query(alarm_id=alarm_id, name=name, enabled=enabled,
                        user_id=user, project_id=project)
         gen = alarm_table.scan(filter=q)
         for ignored, data in gen:
-            stored_alarm = deserialize_entry(data)
+            stored_alarm = deserialize_entry(data)[0]
             yield models.Alarm(**stored_alarm)
 
     def get_alarm_changes(self, alarm_id, on_behalf_of,
@@ -230,11 +242,7 @@ class Connection(base.Connection):
         gen = alarm_history_table.scan(filter=q, row_start=start_row,
                                        row_stop=end_row)
         for ignored, data in gen:
-            stored_entry = deserialize_entry(data)
-            # It is needed to return 'details' field as string
-            detail = stored_entry['detail']
-            if detail:
-                stored_entry['detail'] = json.dumps(detail)
+            stored_entry = deserialize_entry(data)[0]
            yield models.AlarmChange(**stored_entry)
 
     def record_alarm_change(self, alarm_change):
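For reference, the column layout described by the updated docstring can be sketched as a plain dict. This is an illustrative, standalone snippet with made-up values; sketch_serialize is not part of this patch (the real serialize_entry/deserialize_entry helpers are added at the bottom of this change):

import json


def sketch_serialize(sources, meters, metadata, **simple_fields):
    row = {}
    for s in sources:                      # one 's_<source>' column per source
        row['f:s_%s' % s] = json.dumps('1')
    if sources:                            # first source kept for convenience
        row['f:source'] = json.dumps(sources[0])
    for m in meters:                       # one 'm_<meter>' column per meter
        row['f:m_%s' % m] = json.dumps('1')
    for key, value in metadata.items():    # flattened copy for filtering
        row['f:r_metadata.%s' % key] = json.dumps(value)
    row['f:metadata'] = json.dumps(metadata)  # raw copy for the API layer
    for key, value in simple_fields.items():
        row['f:%s' % key] = json.dumps(value)
    return row


print(sketch_serialize(['openstack'], ['cpu!cumulative!ns'],
                       {'display_name': 'vm-1'},
                       resource_id='res-1', user_id='u-1', project_id='p-1'))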
@@ -259,52 +267,31 @@ class Connection(base.Connection):
         resource_table = self.conn.table(self.RESOURCE_TABLE)
         meter_table = self.conn.table(self.METER_TABLE)
 
-        # store metadata fields with prefix "r_" to make filtering on metadata
-        # faster
-        resource_metadata = {}
-        res_meta_copy = data['resource_metadata']
-        if res_meta_copy:
-            for key, v in utils.dict_to_keyval(res_meta_copy):
-                resource_metadata['f:r_metadata.%s' % key] = unicode(v)
-
         # Make sure we know about the user and project
-        if data['user_id']:
-            user = user_table.row(data['user_id'])
-            sources = _load_hbase_list(user, 's')
-            # Update if source is new
-            if data['source'] not in sources:
-                user['f:s_%s' % data['source']] = "1"
-                user_table.put(data['user_id'], user)
-
-        project = project_table.row(data['project_id'])
-        sources = _load_hbase_list(project, 's')
-        # Update if source is new
-        if data['source'] not in sources:
-            project['f:s_%s' % data['source']] = "1"
-            project_table.put(data['project_id'], project)
-
-        rts = reverse_timestamp(data['timestamp'])
-
-        resource = resource_table.row(data['resource_id'])
+        self._update_sources(user_table, data['user_id'], data['source'])
+        self._update_sources(project_table, data['project_id'],
+                             data['source'])
 
+        # Get metadata from the incoming sample data
+        resource_metadata = data.get('resource_metadata', {})
+        # Determine the name of the new meter
         new_meter = _format_meter_reference(
             data['counter_name'], data['counter_type'], data['counter_unit'])
 
-        new_resource = {'f:resource_id': data['resource_id'],
-                        'f:project_id': data['project_id'],
-                        'f:user_id': data['user_id'],
-                        'f:source': data["source"],
-                        # store meters with prefix "m_"
-                        'f:m_%s' % new_meter: "1"
-                        }
-        new_resource.update(resource_metadata)
+        flatten_result, sources, meters, metadata = \
+            deserialize_entry(resource_table.row(data['resource_id']))
 
         # Update if resource has new information
-        if new_resource != resource:
-            meters = _load_hbase_list(resource, 'm')
-            if new_meter not in meters:
-                new_resource['f:m_%s' % new_meter] = "1"
-
-            resource_table.put(data['resource_id'], new_resource)
+        if (data['source'] not in sources) or (new_meter not in meters) or (
+                metadata != resource_metadata):
+            resource_table.put(data['resource_id'],
+                               serialize_entry(
+                                   **{'sources': [data['source']],
+                                      'meters': [new_meter],
+                                      'metadata': resource_metadata,
+                                      'resource_id': data['resource_id'],
+                                      'project_id': data['project_id'],
+                                      'user_id': data['user_id']}))
 
         # Rowkey consists of reversed timestamp, meter and an md5 of
         # user+resource+project for purposes of uniqueness
@@ -314,45 +301,20 @@ class Connection(base.Connection):
         # We use reverse timestamps in rowkeys as they are sorted
         # alphabetically.
+        rts = reverse_timestamp(data['timestamp'])
         row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest())
-
-        recorded_at = timeutils.utcnow()
-
-        # Convert timestamp to string as json.dumps won't
-        ts = timeutils.strtime(data['timestamp'])
-        recorded_at_ts = timeutils.strtime(recorded_at)
-
-        record = {'f:timestamp': ts,
-                  'f:counter_name': data['counter_name'],
-                  'f:counter_type': data['counter_type'],
-                  'f:counter_volume': str(data['counter_volume']),
-                  'f:counter_unit': data['counter_unit'],
-                  # TODO(shengjie) consider using QualifierFilter
-                  # keep dimensions as column qualifier for quicker look up
-                  # TODO(shengjie) extra dimensions need to be added as CQ
-                  'f:user_id': data['user_id'],
-                  'f:project_id': data['project_id'],
-                  'f:message_id': data['message_id'],
-                  'f:resource_id': data['resource_id'],
-                  'f:source': data['source'],
-                  'f:recorded_at': recorded_at,
-                  # keep raw metadata as well as flattened to provide
-                  # capability with API v2. It will be flattened in another
-                  # way on API level
-                  'f:metadata': data.get('resource_metadata', '{}'),
-                  # add in reversed_ts here for time range scan
-                  'f:rts': str(rts)
-                  }
-        # Need to record resource_metadata for more robust filtering.
-        record.update(resource_metadata)
-        # Don't want to be changing the original data object.
-        data = copy.copy(data)
-        data['timestamp'] = ts
-        data['recorded_at'] = recorded_at_ts
-        # Save original meter.
-        record['f:message'] = json.dumps(data)
+        record = serialize_entry(data, **{'metadata': resource_metadata,
+                                          'rts': rts,
+                                          'message': data,
+                                          'recorded_at': timeutils.utcnow()})
         meter_table.put(row, record)
 
+    def _update_sources(self, table, id, source):
+        user, sources, _, _ = deserialize_entry(table.row(id))
+        if source not in sources:
+            sources.append(source)
+            table.put(id, serialize_entry(user, **{'sources': sources}))
+
     def get_users(self, source=None):
         """Return an iterable of user id strings.
 
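The reversed timestamp in the row key above is what keeps the meter table sorted newest-first under HBase's lexicographic ordering. A rough, self-contained sketch of the idea; the helper below only approximates the module's real reverse_timestamp (its epoch and scale are assumptions), and the md5 input values are invented:

import datetime
import hashlib


def reverse_timestamp(dt, epoch=datetime.datetime(1970, 1, 1)):
    # Assumption: subtract the timestamp from a large constant so that
    # newer samples produce numerically smaller (earlier-sorting) keys.
    ts = int((dt - epoch).total_seconds() * 1000000)
    return 0x7fffffffffffffff - ts


m = hashlib.md5()
m.update('user-1resource-1project-1'.encode('utf-8'))
row = "%s_%d_%s" % ('cpu', reverse_timestamp(datetime.datetime(2014, 1, 15)),
                    m.hexdigest())
print(row)  # cpu_<reversed timestamp>_<md5 hexdigest>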
@@ -394,21 +356,9 @@ class Connection(base.Connection):
         :param resource: Optional resource filter.
         :param pagination: Optional pagination query.
         """
-
         if pagination:
             raise NotImplementedError(_('Pagination not implemented'))
 
-        def make_resource(data, first_ts, last_ts):
-            """Transform HBase fields to Resource model."""
-            return models.Resource(
-                resource_id=data['f:resource_id'],
-                first_sample_timestamp=first_ts,
-                last_sample_timestamp=last_ts,
-                project_id=data['f:project_id'],
-                source=data['f:source'],
-                user_id=data['f:user_id'],
-                metadata=data['f:metadata'],
-            )
         meter_table = self.conn.table(self.METER_TABLE)
 
         sample_filter = storage.SampleFilter(
@@ -416,31 +366,36 @@ class Connection(base.Connection):
             start=start_timestamp, start_timestamp_op=start_timestamp_op,
             end=end_timestamp, end_timestamp_op=end_timestamp_op,
             resource=resource, source=source, metaquery=metaquery)
-
         q, start_row, stop_row = make_sample_query_from_filter(
             sample_filter, require_meter=False)
 
         LOG.debug(_("Query Meter table: %s") % q)
         meters = meter_table.scan(filter=q, row_start=start_row,
                                   row_stop=stop_row)
+        d_meters = []
+        for i, m in meters:
+            d_meters.append(deserialize_entry(m))
 
         # We have to sort on resource_id before we can group by it. According
         # to the itertools documentation a new group is generated when the
         # value of the key function changes (it breaks there).
-        meters = sorted(meters, key=_resource_id_from_record_tuple)
-
+        meters = sorted(d_meters, key=_resource_id_from_record_tuple)
         for resource_id, r_meters in itertools.groupby(
                 meters, key=_resource_id_from_record_tuple):
-            meter_rows = [data[1] for data in sorted(
+            # We need the deserialized entry (data[0]) and metadata (data[3])
+            meter_rows = [(data[0], data[3]) for data in sorted(
                 r_meters, key=_timestamp_from_record_tuple)]
-
             latest_data = meter_rows[-1]
-            min_ts = timeutils.parse_strtime(meter_rows[0]['f:timestamp'])
-            max_ts = timeutils.parse_strtime(latest_data['f:timestamp'])
-            yield make_resource(
-                latest_data,
-                min_ts,
-                max_ts
+            min_ts = meter_rows[0][0]['timestamp']
+            max_ts = latest_data[0]['timestamp']
+            yield models.Resource(
+                resource_id=resource_id,
+                first_sample_timestamp=min_ts,
+                last_sample_timestamp=max_ts,
+                project_id=latest_data[0]['project_id'],
+                source=latest_data[0]['source'],
+                user_id=latest_data[0]['user_id'],
+                metadata=latest_data[1],
             )
 
     def get_meters(self, user=None, project=None, resource=None, source=None,
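get_resources() now works on the 4-tuples returned by deserialize_entry() rather than raw 'f:...' columns. A toy illustration of the grouping logic with fabricated records (field values are assumptions, not real data):

import datetime
import itertools

d_meters = [
    ({'resource_id': 'res-1', 'timestamp': datetime.datetime(2014, 1, 2)},
     ['openstack'], ['cpu!cumulative!ns'], {'display_name': 'vm-1'}),
    ({'resource_id': 'res-1', 'timestamp': datetime.datetime(2014, 1, 1)},
     ['openstack'], ['cpu!cumulative!ns'], {'display_name': 'vm-1'}),
]

# Sort by resource_id so itertools.groupby sees each resource exactly once.
d_meters.sort(key=lambda r: r[0]['resource_id'])
for resource_id, rows in itertools.groupby(d_meters,
                                           key=lambda r: r[0]['resource_id']):
    rows = sorted(rows, key=lambda r: r[0]['timestamp'])
    print("%s first=%s last=%s" % (resource_id,
                                   rows[0][0]['timestamp'],
                                   rows[-1][0]['timestamp']))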
@@ -465,42 +420,28 @@ class Connection(base.Connection):
         gen = resource_table.scan(filter=q)
 
         for ignored, data in gen:
-            # Meter columns are stored like this:
-            # "m_{counter_name}|{counter_type}|{counter_unit}" => "1"
-            # where 'm' is a prefix (m for meter), value is always set to 1
-            meter = None
-            for m in data:
-                if m.startswith('f:m_'):
-                    meter = m
-                    break
-            if meter is None:
+            flatten_result, s, m, md = deserialize_entry(data)
+            if not m:
                 continue
-            name, type, unit = meter[4:].split("!")
+            # The meter table may contain only one "meter" and one "source";
+            # that is why only the first element of each list is used here
+            name, type, unit = m[0].split("!")
             yield models.Meter(
                 name=name,
                 type=type,
                 unit=unit,
-                resource_id=data['f:resource_id'],
-                project_id=data['f:project_id'],
-                source=data['f:source'],
-                user_id=data['f:user_id'],
+                resource_id=flatten_result['resource_id'],
+                project_id=flatten_result['project_id'],
+                source=s[0] if s else None,
+                user_id=flatten_result['user_id'],
             )
 
-    @staticmethod
-    def _make_sample(data):
-        """Transform HBase fields to Sample model."""
-        data = json.loads(data['f:message'])
-        data['timestamp'] = timeutils.parse_strtime(data['timestamp'])
-        data['recorded_at'] = timeutils.parse_strtime(data['recorded_at'])
-        return models.Sample(**data)
-
     def get_samples(self, sample_filter, limit=None):
         """Return an iterable of models.Sample instances.
 
         :param sample_filter: Filter.
         :param limit: Maximum number of results to return.
         """
-
         meter_table = self.conn.table(self.METER_TABLE)
 
         q, start, stop = make_sample_query_from_filter(
@@ -513,7 +454,9 @@ class Connection(base.Connection):
                 break
             else:
                 limit -= 1
-            yield self._make_sample(meter)
+            d_meter = deserialize_entry(meter)[0]
+            d_meter['message']['recorded_at'] = d_meter['recorded_at']
+            yield models.Sample(**d_meter['message'])
 
     @staticmethod
     def _update_meter_stats(stat, meter):
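The get_samples() read path above rebuilds a Sample straight from the stored 'message' column and only patches in recorded_at. A standalone sketch with fabricated values and a plain dict standing in for models.Sample:

import datetime

# What deserialize_entry(meter)[0] might look like (values are made up).
d_meter = {
    'counter_name': 'cpu',
    'counter_volume': 42,
    'recorded_at': datetime.datetime(2014, 1, 1, 12, 0, 5),
    'message': {'counter_name': 'cpu',
                'counter_volume': 42,
                'timestamp': datetime.datetime(2014, 1, 1, 12, 0, 0)},
}

d_meter['message']['recorded_at'] = d_meter['recorded_at']
sample = dict(**d_meter['message'])  # stands in for models.Sample(**...)
print("%s %s" % (sample['timestamp'], sample['recorded_at']))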
""" - meter_table = self.conn.table(self.METER_TABLE) q, start, stop = make_sample_query_from_filter( @@ -513,7 +454,9 @@ class Connection(base.Connection): break else: limit -= 1 - yield self._make_sample(meter) + d_meter = deserialize_entry(meter)[0] + d_meter['message']['recorded_at'] = d_meter['recorded_at'] + yield models.Sample(**d_meter['message']) @staticmethod def _update_meter_stats(stat, meter): @@ -525,9 +468,9 @@ class Connection(base.Connection): :param start_time: query start time :param period: length of the time bucket """ - vol = int(meter['f:counter_volume']) - ts = timeutils.parse_strtime(meter['f:timestamp']) - stat.unit = meter['f:counter_unit'] + vol = meter['counter_volume'] + ts = meter['timestamp'] + stat.unit = meter['counter_unit'] stat.min = min(vol, stat.min or vol) stat.max = max(vol, stat.max) stat.sum = vol + (stat.sum or 0) @@ -557,22 +500,21 @@ class Connection(base.Connection): meter_table = self.conn.table(self.METER_TABLE) q, start, stop = make_sample_query_from_filter(sample_filter) - meters = list(meter for (ignored, meter) in - meter_table.scan(filter=q, row_start=start, - row_stop=stop) - ) + meters = map(deserialize_entry, list(meter for (ignored, meter) in + meter_table.scan(filter=q, row_start=start, + row_stop=stop))) if sample_filter.start: start_time = sample_filter.start elif meters: - start_time = timeutils.parse_strtime(meters[-1]['f:timestamp']) + start_time = meters[-1][0]['timestamp'] else: start_time = None if sample_filter.end: end_time = sample_filter.end elif meters: - end_time = timeutils.parse_strtime(meters[0]['f:timestamp']) + end_time = meters[0][0]['timestamp'] else: end_time = None @@ -586,7 +528,7 @@ class Connection(base.Connection): # As our HBase meters are stored as newest-first, we need to iterate # in the reverse order for meter in meters[::-1]: - ts = timeutils.parse_strtime(meter['f:timestamp']) + ts = meter[0]['timestamp'] if period: offset = int(timeutils.delta_seconds( start_time, ts) / period) * period @@ -612,7 +554,7 @@ class Connection(base.Connection): duration_end=None, groupby=None) ) - self._update_meter_stats(results[-1], meter) + self._update_meter_stats(results[-1], meter[0]) return results @@ -666,7 +608,7 @@ class MTable(object): # Extract filter name and its arguments g = re.search("(.*)\((.*),?\)", f) fname = g.group(1).strip() - fargs = [s.strip().replace('\'', '').replace('\"', '') + fargs = [s.strip().replace('\'', '') for s in g.group(2).split(',')] m = getattr(self, fname) if callable(m): @@ -769,6 +711,7 @@ def make_timestamp_query(func, start=None, start_op=None, end=None, return start_row, end_row q = [] + # We dont need to dump here because get_start_end_rts returns strings if rts_start: q.append("SingleColumnValueFilter ('f', 'rts', <=, 'binary:%s')" % rts_start) @@ -805,11 +748,14 @@ def make_query(metaquery=None, **kwargs): column name in db """ q = [] - - for key, value in kwargs.iteritems(): + # Note: we use extended constructor for SingleColumnValueFilter here. + # It is explicitly specified that entry should not be returned if CF is not + # found in table. 
@@ -805,11 +748,14 @@ def make_query(metaquery=None, **kwargs):
                    column name in db
     """
     q = []
-
-    for key, value in kwargs.iteritems():
+    # Note: we use the extended constructor for SingleColumnValueFilter here.
+    # It explicitly specifies that an entry should not be returned if the
+    # column is not found in the table.
+    for key, value in kwargs.items():
         if value is not None:
             q.append("SingleColumnValueFilter "
-                     "('f', '%s', =, 'binary:%s')" % (key, value))
+                     "('f', '%s', =, 'binary:%s', true, true)" %
+                     (key, dump(value)))
     res_q = None
     if len(q):
         res_q = " AND ".join(q)
@@ -818,8 +764,9 @@ def make_query(metaquery=None, **kwargs):
         meta_q = []
         for k, v in metaquery.items():
             meta_q.append(
-                "SingleColumnValueFilter ('f', '%s', =, 'binary:%s')"
-                % ('r_' + k, v))
+                "SingleColumnValueFilter ('f', '%s', =, 'binary:%s', "
+                "true, true)"
+                % ('r_' + k, dump(v)))
         meta_q = " AND ".join(meta_q)
         # join query and metaquery
         if res_q is not None:
@@ -827,7 +774,7 @@ def make_query(metaquery=None, **kwargs):
         else:
             res_q = meta_q  # metaquery only
 
-    return res_q or ""
+    return res_q
 
 
 def make_sample_query_from_filter(sample_filter, require_meter=True):
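A minimal sketch of the filter string make_query() now emits, assuming dump() is the json.dumps-based helper defined later in this patch. The two trailing booleans of the extended SingleColumnValueFilter constructor (filterIfMissing, latestVersionOnly) make HBase drop rows that lack the column instead of passing them through:

import json


def sketch_make_query(**kwargs):
    q = ["SingleColumnValueFilter ('f', '%s', =, 'binary:%s', true, true)" %
         (key, json.dumps(value))
         for key, value in kwargs.items() if value is not None]
    return " AND ".join(q) if q else None


# None-valued arguments are skipped, mirroring make_query() above.
print(sketch_make_query(resource_id='res-1', user_id=None))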
@@ -876,16 +823,6 @@ def _make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None):
     return start_row, end_row
 
 
-def _load_hbase_list(d, prefix):
-    """Deserialise dict stored as HBase column family
-    """
-    ret = []
-    prefix = 'f:%s_' % prefix
-    for key in (k for k in d if k.startswith(prefix)):
-        ret.append(key[len(prefix):])
-    return ret
-
-
 def _format_meter_reference(counter_name, counter_type, counter_unit):
     """Format reference to meter data.
     """
@@ -895,45 +832,105 @@ def _format_meter_reference(counter_name, counter_type, counter_unit):
 def _timestamp_from_record_tuple(record):
     """Extract timestamp from HBase tuple record
     """
-    return timeutils.parse_strtime(record[1]['f:timestamp'])
+    return record[0]['timestamp']
 
 
 def _resource_id_from_record_tuple(record):
     """Extract resource_id from HBase tuple record
     """
-    return record[1]['f:resource_id']
+    return record[0]['resource_id']
 
 
-#TODO(nprivalova): to be refactored, will be used everywhere in impl_hbase
-# without additional ifs
-def serialize_entry(entry_from_user):
-    result_dict = copy.copy(entry_from_user)
-    keys = result_dict.keys()
-    for key in keys:
-        val = result_dict[key]
-        if isinstance(val, datetime.datetime):
-            val = timeutils.strtime(val)
-        if not isinstance(val, basestring):
-            val = json.dumps(val)
-        result_dict['f:' + key] = val
-        del result_dict[key]
-    return result_dict
+def deserialize_entry(entry, get_raw_meta=True):
+    """Return a tuple of (flatten_result, sources, meters, metadata).
+
+    flatten_result contains a dict of simple structures such as
+    'resource_id': 1; sources and meters are the lists of sources and meters
+    respectively. metadata is a metadata dict; it is returned flattened if
+    get_raw_meta is False.
+
+    :param entry: entry from HBase, without row name and timestamp
+    :param get_raw_meta: If True, raw metadata is returned;
+                         if False, metadata is reconstructed from the
+                         'f:r_metadata.' fields
+    """
+    flatten_result = {}
+    sources = []
+    meters = []
+    metadata_flattened = {}
+    for k, v in entry.items():
+        if k.startswith('f:s_'):
+            sources.append(k[4:])
+        elif k.startswith('f:m_'):
+            meters.append(k[4:])
+        elif k.startswith('f:r_metadata.'):
+            metadata_flattened[k[len('f:r_metadata.'):]] = load(v)
+        else:
+            flatten_result[k[2:]] = load(v)
+    if get_raw_meta:
+        metadata = flatten_result.get('metadata', {})
+    else:
+        metadata = metadata_flattened
+
+    return flatten_result, sources, meters, metadata
 
 
-def deserialize_entry(stored_entry):
-    result_entry = copy.copy(stored_entry)
-    keys = result_entry.keys()
-    for key in keys:
-        val = result_entry[key]
-        try:
-            val = json.loads(val)
-        except ValueError:
-            pass
-        if "timestamp" in key and val:
-            val = timeutils.parse_strtime(val)
-        # There is no int in wsme models
-        if isinstance(val, (int, long, float)) and not isinstance(val, bool):
-            val = str(val)
-        result_entry[key[2:]] = val
-        del result_entry[key]
-    return result_entry
+def serialize_entry(data={}, **kwargs):
+    """Return a dict that is ready to be stored to HBase
+
+    :param data: dict to be serialized
+    :param kwargs: additional args
+    """
+    entry_dict = copy.copy(data)
+    entry_dict.update(**kwargs)
+
+    result = {}
+    for k, v in entry_dict.items():
+        if k == 'sources':
+            # The user and project tables may contain several sources and
+            # meters, so they are stored as separate "s_<name>"/"m_<name>"
+            # columns with the value '1'. The resource and meter tables
+            # contain only one of each, so a single "source"/"meter" column
+            # would suffice there; to keep things simple the same layout is
+            # used in all tables, as it doesn't break logic and costs little.
+            for source in v:
+                result['f:s_%s' % source] = dump('1')
+            if v:
+                result['f:source'] = dump(v[0])
+        elif k == 'meters':
+            for meter in v:
+                result['f:m_%s' % meter] = dump('1')
+        elif k == 'metadata':
+            # Keep raw metadata as well as flattened to provide
+            # compatibility with API v2. It will be flattened in another
+            # way on the API level, but we need it flattened for filtering.
+            flattened_meta = dump_metadata(v)
+            for k, m in flattened_meta.items():
+                result['f:r_metadata.' + k] = dump(m)
+            result['f:metadata'] = dump(v)
+        else:
+            result['f:' + k] = dump(v)
+    return result
+
+
+def dump_metadata(meta):
+    resource_metadata = {}
+    for key, v in utils.dict_to_keyval(meta):
+        resource_metadata[key] = v
+    return resource_metadata
+
+
+def dump(data):
+    return json.dumps(data, default=bson.json_util.default)
+
+
+def load(data):
+    return json.loads(data, object_hook=object_hook)
+
+
+# We don't want tzinfo in the decoded JSON. This object_hook overrides
+# json_util.object_hook for '$date' entries.
+def object_hook(dct):
+    if "$date" in dct:
+        dt = bson.json_util.object_hook(dct)
+        return dt.replace(tzinfo=None)
+    return bson.json_util.object_hook(dct)
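As a closing illustration of why bson.json_util is now imported: the dump()/load() pair round-trips datetimes through BSON's extended JSON, and the custom object_hook strips the tzinfo attached on decode. A small sketch (requires the bson package that ships with pymongo; _object_hook mirrors the object_hook defined above):

import datetime
import json

import bson.json_util


def _object_hook(dct):
    # Drop the timezone info that bson.json_util attaches to decoded dates.
    if "$date" in dct:
        return bson.json_util.object_hook(dct).replace(tzinfo=None)
    return bson.json_util.object_hook(dct)


original = {'timestamp': datetime.datetime(2014, 1, 1, 12, 0, 0)}
encoded = json.dumps(original, default=bson.json_util.default)
decoded = json.loads(encoded, object_hook=_object_hook)
assert decoded['timestamp'] == original['timestamp']
print(encoded)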