Fix the way how metadata is stored in HBase

The goal of this patch is to fix filtering on metadata.
In HBase we should use the same approach as in SQL now:
store raw metadata as well as flattened one.
When user sends get request on meters, samples or resources
it is possible not to fetch all the data but construct corresponding
filter based on flattened metadata values.
Unfortunatelly, it is not possible not to store raw metadata because
flattening on API level is done in a different way. This
improvement is postponed till API v3.

Change-Id: I1206535ab12037817175d5e79716b223594f246a
This commit is contained in:
Nadya Privalova 2014-01-23 13:23:56 +04:00
parent 2f9300f66c
commit 8bdc6d5421

View File

@ -173,13 +173,13 @@ class Connection(base.Connection):
resource_table = self.conn.table(self.RESOURCE_TABLE)
meter_table = self.conn.table(self.METER_TABLE)
# store metadata fields with prefix "r_"
# store metadata fields with prefix "r_" to make filtering on metadata
# faster
resource_metadata = {}
res_meta_copy = data['resource_metadata']
if res_meta_copy:
for key, v in utils.recursive_keypairs(res_meta_copy,
separator='.'):
resource_metadata['f:r_%s' % key] = unicode(v)
for key, v in utils.dict_to_keyval(res_meta_copy):
resource_metadata['f:r_metadata.%s' % key] = unicode(v)
# Make sure we know about the user and project
if data['user_id']:
@ -250,6 +250,10 @@ class Connection(base.Connection):
'f:resource_id': data['resource_id'],
'f:source': data['source'],
'f:recorded_at': recorded_at,
# keep raw metadata as well as flattened to provide
# capability with API v2. It will be flattened in another
# way on API level
'f:metadata': data.get('resource_metadata', '{}'),
# add in reversed_ts here for time range scan
'f:rts': str(rts)
}
@ -310,9 +314,6 @@ class Connection(base.Connection):
def make_resource(data, first_ts, last_ts):
"""Transform HBase fields to Resource model."""
# convert HBase metadata e.g. f:r_display_name to display_name
data['f:metadata'] = _metadata_from_document(data)
return models.Resource(
resource_id=data['f:resource_id'],
first_sample_timestamp=first_ts,
@ -332,6 +333,7 @@ class Connection(base.Connection):
start_op=start_timestamp_op,
end=end_timestamp,
end_op=end_timestamp_op,
metaquery=metaquery,
require_meter=False,
query_only=False)
LOG.debug(_("Query Meter table: %s") % q)
@ -351,20 +353,11 @@ class Connection(base.Connection):
latest_data = meter_rows[-1]
min_ts = timeutils.parse_strtime(meter_rows[0]['f:timestamp'])
max_ts = timeutils.parse_strtime(latest_data['f:timestamp'])
if metaquery:
for k, v in metaquery.iteritems():
if latest_data['f:r_' + k.split('.', 1)[1]] == v:
yield make_resource(
latest_data,
min_ts,
max_ts
)
else:
yield make_resource(
latest_data,
min_ts,
max_ts
)
yield make_resource(
latest_data,
min_ts,
max_ts
)
def get_meters(self, user=None, project=None, resource=None, source=None,
metaquery={}, pagination=None):
@ -380,26 +373,12 @@ class Connection(base.Connection):
if pagination:
raise NotImplementedError(_('Pagination not implemented'))
resource_table = self.conn.table(self.RESOURCE_TABLE)
q = make_query(user=user, project=project, resource=resource,
source=source, require_meter=False, query_only=True)
source=source, metaquery=metaquery,
require_meter=False, query_only=True)
LOG.debug(_("Query Resource table: %s") % q)
# handle metaquery
if metaquery:
meta_q = []
for k, v in metaquery.iteritems():
meta_q.append(
"SingleColumnValueFilter ('f', '%s', =, 'binary:%s')"
% ('r_' + k.split('.', 1)[1], v))
meta_q = " AND ".join(meta_q)
# join query and metaquery
if q is not None:
q += " AND " + meta_q
else:
q = meta_q # metaquery only
gen = resource_table.scan(filter=q)
for ignored, data in gen:
@ -444,42 +423,14 @@ class Connection(base.Connection):
q, start, stop = make_query_from_filter(sample_filter,
require_meter=False)
LOG.debug(_("Query Meter Table: %s") % q)
gen = meter_table.scan(filter=q, row_start=start, row_stop=stop)
for ignored, meter in gen:
# TODO(shengjie) put this implementation here because it's failing
# the test. bp hbase-meter-table-enhancement will address this
# properly.
# handle metaquery
metaquery = sample_filter.metaquery
# TODO(jd) implements using HBase capabilities
if limit == 0:
break
if metaquery:
for k, v in metaquery.iteritems():
message = json.loads(meter['f:message'])
metadata = message['resource_metadata']
keys = k.split('.')
# Support the dictionary type of metadata
for key in keys[1:]:
if key in metadata:
metadata = metadata[key]
else:
break
# NOTE (flwang) For multiple level searching, the matadata
# object will be drilled down to check if it's matched
# with the searched value.
if metadata != v:
break
if limit is not None:
if limit == 0:
break
else:
if limit:
limit -= 1
yield self._make_sample(meter)
else:
if limit:
limit -= 1
yield self._make_sample(meter)
yield self._make_sample(meter)
@staticmethod
def _update_meter_stats(stat, meter):
@ -718,8 +669,8 @@ def reverse_timestamp(dt):
def make_query(user=None, project=None, meter=None,
resource=None, source=None, start=None, start_op=None,
end=None, end_op=None, message_id=None, require_meter=True,
query_only=False):
end=None, end_op=None, metaquery=None, message_id=None,
require_meter=True, query_only=False):
"""Return a filter query string based on the selected parameters.
:param user: Optional user-id
@ -785,14 +736,27 @@ def make_query(user=None, project=None, meter=None,
q.append("SingleColumnValueFilter ('f', 'rts', >=, 'binary:%s')" %
rts_end)
sample_filter = None
if q:
sample_filter = " AND ".join(q)
res_q = None
if len(q):
res_q = " AND ".join(q)
if metaquery:
meta_q = []
for k, v in metaquery.items():
meta_q.append(
"SingleColumnValueFilter ('f', '%s', =, 'binary:%s')"
% ('r_' + k, v))
meta_q = " AND ".join(meta_q)
# join query and metaquery
if res_q is not None:
res_q += " AND " + meta_q
else:
res_q = meta_q # metaquery only
if query_only:
return sample_filter
return res_q
else:
return sample_filter, start_row, end_row
return res_q, start_row, end_row
def make_query_from_filter(sample_filter, require_meter=True):
@ -806,9 +770,8 @@ def make_query_from_filter(sample_filter, require_meter=True):
sample_filter.meter, sample_filter.resource,
sample_filter.source, sample_filter.start,
sample_filter.start_timestamp_op,
sample_filter.end,
sample_filter.end_timestamp_op,
sample_filter.message_id,
sample_filter.end, sample_filter.end_timestamp_op,
sample_filter.metaquery, sample_filter.message_id,
require_meter)
@ -840,14 +803,6 @@ def _format_meter_reference(counter_name, counter_type, counter_unit):
return "%s!%s!%s" % (counter_name, counter_type, counter_unit)
def _metadata_from_document(doc):
"""Extract resource metadata from HBase document using prefix specific
to HBase implementation.
"""
return dict(
(k[4:], v) for k, v in doc.iteritems() if k.startswith('f:r_'))
def _timestamp_from_record_tuple(record):
"""Extract timestamp from HBase tuple record
"""