diff --git a/ceilometer/storage/impl_hbase.py b/ceilometer/storage/impl_hbase.py index 1e41232db..5834fe4e8 100644 --- a/ceilometer/storage/impl_hbase.py +++ b/ceilometer/storage/impl_hbase.py @@ -173,13 +173,13 @@ class Connection(base.Connection): resource_table = self.conn.table(self.RESOURCE_TABLE) meter_table = self.conn.table(self.METER_TABLE) - # store metadata fields with prefix "r_" + # store metadata fields with prefix "r_" to make filtering on metadata + # faster resource_metadata = {} res_meta_copy = data['resource_metadata'] if res_meta_copy: - for key, v in utils.recursive_keypairs(res_meta_copy, - separator='.'): - resource_metadata['f:r_%s' % key] = unicode(v) + for key, v in utils.dict_to_keyval(res_meta_copy): + resource_metadata['f:r_metadata.%s' % key] = unicode(v) # Make sure we know about the user and project if data['user_id']: @@ -250,6 +250,10 @@ class Connection(base.Connection): 'f:resource_id': data['resource_id'], 'f:source': data['source'], 'f:recorded_at': recorded_at, + # keep raw metadata as well as flattened to provide + # capability with API v2. It will be flattened in another + # way on API level + 'f:metadata': data.get('resource_metadata', '{}'), # add in reversed_ts here for time range scan 'f:rts': str(rts) } @@ -310,9 +314,6 @@ class Connection(base.Connection): def make_resource(data, first_ts, last_ts): """Transform HBase fields to Resource model.""" - # convert HBase metadata e.g. f:r_display_name to display_name - data['f:metadata'] = _metadata_from_document(data) - return models.Resource( resource_id=data['f:resource_id'], first_sample_timestamp=first_ts, @@ -332,6 +333,7 @@ class Connection(base.Connection): start_op=start_timestamp_op, end=end_timestamp, end_op=end_timestamp_op, + metaquery=metaquery, require_meter=False, query_only=False) LOG.debug(_("Query Meter table: %s") % q) @@ -351,20 +353,11 @@ class Connection(base.Connection): latest_data = meter_rows[-1] min_ts = timeutils.parse_strtime(meter_rows[0]['f:timestamp']) max_ts = timeutils.parse_strtime(latest_data['f:timestamp']) - if metaquery: - for k, v in metaquery.iteritems(): - if latest_data['f:r_' + k.split('.', 1)[1]] == v: - yield make_resource( - latest_data, - min_ts, - max_ts - ) - else: - yield make_resource( - latest_data, - min_ts, - max_ts - ) + yield make_resource( + latest_data, + min_ts, + max_ts + ) def get_meters(self, user=None, project=None, resource=None, source=None, metaquery={}, pagination=None): @@ -380,26 +373,12 @@ class Connection(base.Connection): if pagination: raise NotImplementedError(_('Pagination not implemented')) - resource_table = self.conn.table(self.RESOURCE_TABLE) q = make_query(user=user, project=project, resource=resource, - source=source, require_meter=False, query_only=True) + source=source, metaquery=metaquery, + require_meter=False, query_only=True) LOG.debug(_("Query Resource table: %s") % q) - # handle metaquery - if metaquery: - meta_q = [] - for k, v in metaquery.iteritems(): - meta_q.append( - "SingleColumnValueFilter ('f', '%s', =, 'binary:%s')" - % ('r_' + k.split('.', 1)[1], v)) - meta_q = " AND ".join(meta_q) - # join query and metaquery - if q is not None: - q += " AND " + meta_q - else: - q = meta_q # metaquery only - gen = resource_table.scan(filter=q) for ignored, data in gen: @@ -444,42 +423,14 @@ class Connection(base.Connection): q, start, stop = make_query_from_filter(sample_filter, require_meter=False) LOG.debug(_("Query Meter Table: %s") % q) - gen = meter_table.scan(filter=q, row_start=start, row_stop=stop) - for ignored, meter in gen: - # TODO(shengjie) put this implementation here because it's failing - # the test. bp hbase-meter-table-enhancement will address this - # properly. - # handle metaquery - metaquery = sample_filter.metaquery - # TODO(jd) implements using HBase capabilities - if limit == 0: - break - if metaquery: - for k, v in metaquery.iteritems(): - message = json.loads(meter['f:message']) - metadata = message['resource_metadata'] - keys = k.split('.') - # Support the dictionary type of metadata - for key in keys[1:]: - if key in metadata: - metadata = metadata[key] - else: - break - # NOTE (flwang) For multiple level searching, the matadata - # object will be drilled down to check if it's matched - # with the searched value. - if metadata != v: - break + if limit is not None: + if limit == 0: + break else: - if limit: - limit -= 1 - yield self._make_sample(meter) - else: - if limit: limit -= 1 - yield self._make_sample(meter) + yield self._make_sample(meter) @staticmethod def _update_meter_stats(stat, meter): @@ -718,8 +669,8 @@ def reverse_timestamp(dt): def make_query(user=None, project=None, meter=None, resource=None, source=None, start=None, start_op=None, - end=None, end_op=None, message_id=None, require_meter=True, - query_only=False): + end=None, end_op=None, metaquery=None, message_id=None, + require_meter=True, query_only=False): """Return a filter query string based on the selected parameters. :param user: Optional user-id @@ -785,14 +736,27 @@ def make_query(user=None, project=None, meter=None, q.append("SingleColumnValueFilter ('f', 'rts', >=, 'binary:%s')" % rts_end) - sample_filter = None - if q: - sample_filter = " AND ".join(q) + res_q = None + if len(q): + res_q = " AND ".join(q) + + if metaquery: + meta_q = [] + for k, v in metaquery.items(): + meta_q.append( + "SingleColumnValueFilter ('f', '%s', =, 'binary:%s')" + % ('r_' + k, v)) + meta_q = " AND ".join(meta_q) + # join query and metaquery + if res_q is not None: + res_q += " AND " + meta_q + else: + res_q = meta_q # metaquery only if query_only: - return sample_filter + return res_q else: - return sample_filter, start_row, end_row + return res_q, start_row, end_row def make_query_from_filter(sample_filter, require_meter=True): @@ -806,9 +770,8 @@ def make_query_from_filter(sample_filter, require_meter=True): sample_filter.meter, sample_filter.resource, sample_filter.source, sample_filter.start, sample_filter.start_timestamp_op, - sample_filter.end, - sample_filter.end_timestamp_op, - sample_filter.message_id, + sample_filter.end, sample_filter.end_timestamp_op, + sample_filter.metaquery, sample_filter.message_id, require_meter) @@ -840,14 +803,6 @@ def _format_meter_reference(counter_name, counter_type, counter_unit): return "%s!%s!%s" % (counter_name, counter_type, counter_unit) -def _metadata_from_document(doc): - """Extract resource metadata from HBase document using prefix specific - to HBase implementation. - """ - return dict( - (k[4:], v) for k, v in doc.iteritems() if k.startswith('f:r_')) - - def _timestamp_from_record_tuple(record): """Extract timestamp from HBase tuple record """