Merge "Fix the way how metadata is stored in HBase"
This commit is contained in:
commit
827401f30e
@ -173,13 +173,13 @@ class Connection(base.Connection):
|
||||
resource_table = self.conn.table(self.RESOURCE_TABLE)
|
||||
meter_table = self.conn.table(self.METER_TABLE)
|
||||
|
||||
# store metadata fields with prefix "r_"
|
||||
# store metadata fields with prefix "r_" to make filtering on metadata
|
||||
# faster
|
||||
resource_metadata = {}
|
||||
res_meta_copy = data['resource_metadata']
|
||||
if res_meta_copy:
|
||||
for key, v in utils.recursive_keypairs(res_meta_copy,
|
||||
separator='.'):
|
||||
resource_metadata['f:r_%s' % key] = unicode(v)
|
||||
for key, v in utils.dict_to_keyval(res_meta_copy):
|
||||
resource_metadata['f:r_metadata.%s' % key] = unicode(v)
|
||||
|
||||
# Make sure we know about the user and project
|
||||
if data['user_id']:
|
||||
@ -250,6 +250,10 @@ class Connection(base.Connection):
|
||||
'f:resource_id': data['resource_id'],
|
||||
'f:source': data['source'],
|
||||
'f:recorded_at': recorded_at,
|
||||
# keep raw metadata as well as flattened to provide
|
||||
# capability with API v2. It will be flattened in another
|
||||
# way on API level
|
||||
'f:metadata': data.get('resource_metadata', '{}'),
|
||||
# add in reversed_ts here for time range scan
|
||||
'f:rts': str(rts)
|
||||
}
|
||||
@ -310,9 +314,6 @@ class Connection(base.Connection):
|
||||
|
||||
def make_resource(data, first_ts, last_ts):
|
||||
"""Transform HBase fields to Resource model."""
|
||||
# convert HBase metadata e.g. f:r_display_name to display_name
|
||||
data['f:metadata'] = _metadata_from_document(data)
|
||||
|
||||
return models.Resource(
|
||||
resource_id=data['f:resource_id'],
|
||||
first_sample_timestamp=first_ts,
|
||||
@ -332,6 +333,7 @@ class Connection(base.Connection):
|
||||
start_op=start_timestamp_op,
|
||||
end=end_timestamp,
|
||||
end_op=end_timestamp_op,
|
||||
metaquery=metaquery,
|
||||
require_meter=False,
|
||||
query_only=False)
|
||||
LOG.debug(_("Query Meter table: %s") % q)
|
||||
@ -351,20 +353,11 @@ class Connection(base.Connection):
|
||||
latest_data = meter_rows[-1]
|
||||
min_ts = timeutils.parse_strtime(meter_rows[0]['f:timestamp'])
|
||||
max_ts = timeutils.parse_strtime(latest_data['f:timestamp'])
|
||||
if metaquery:
|
||||
for k, v in metaquery.iteritems():
|
||||
if latest_data['f:r_' + k.split('.', 1)[1]] == v:
|
||||
yield make_resource(
|
||||
latest_data,
|
||||
min_ts,
|
||||
max_ts
|
||||
)
|
||||
else:
|
||||
yield make_resource(
|
||||
latest_data,
|
||||
min_ts,
|
||||
max_ts
|
||||
)
|
||||
yield make_resource(
|
||||
latest_data,
|
||||
min_ts,
|
||||
max_ts
|
||||
)
|
||||
|
||||
def get_meters(self, user=None, project=None, resource=None, source=None,
|
||||
metaquery={}, pagination=None):
|
||||
@ -380,26 +373,12 @@ class Connection(base.Connection):
|
||||
|
||||
if pagination:
|
||||
raise NotImplementedError(_('Pagination not implemented'))
|
||||
|
||||
resource_table = self.conn.table(self.RESOURCE_TABLE)
|
||||
q = make_query(user=user, project=project, resource=resource,
|
||||
source=source, require_meter=False, query_only=True)
|
||||
source=source, metaquery=metaquery,
|
||||
require_meter=False, query_only=True)
|
||||
LOG.debug(_("Query Resource table: %s") % q)
|
||||
|
||||
# handle metaquery
|
||||
if metaquery:
|
||||
meta_q = []
|
||||
for k, v in metaquery.iteritems():
|
||||
meta_q.append(
|
||||
"SingleColumnValueFilter ('f', '%s', =, 'binary:%s')"
|
||||
% ('r_' + k.split('.', 1)[1], v))
|
||||
meta_q = " AND ".join(meta_q)
|
||||
# join query and metaquery
|
||||
if q is not None:
|
||||
q += " AND " + meta_q
|
||||
else:
|
||||
q = meta_q # metaquery only
|
||||
|
||||
gen = resource_table.scan(filter=q)
|
||||
|
||||
for ignored, data in gen:
|
||||
@ -444,42 +423,14 @@ class Connection(base.Connection):
|
||||
q, start, stop = make_query_from_filter(sample_filter,
|
||||
require_meter=False)
|
||||
LOG.debug(_("Query Meter Table: %s") % q)
|
||||
|
||||
gen = meter_table.scan(filter=q, row_start=start, row_stop=stop)
|
||||
|
||||
for ignored, meter in gen:
|
||||
# TODO(shengjie) put this implementation here because it's failing
|
||||
# the test. bp hbase-meter-table-enhancement will address this
|
||||
# properly.
|
||||
# handle metaquery
|
||||
metaquery = sample_filter.metaquery
|
||||
# TODO(jd) implements using HBase capabilities
|
||||
if limit == 0:
|
||||
break
|
||||
if metaquery:
|
||||
for k, v in metaquery.iteritems():
|
||||
message = json.loads(meter['f:message'])
|
||||
metadata = message['resource_metadata']
|
||||
keys = k.split('.')
|
||||
# Support the dictionary type of metadata
|
||||
for key in keys[1:]:
|
||||
if key in metadata:
|
||||
metadata = metadata[key]
|
||||
else:
|
||||
break
|
||||
# NOTE (flwang) For multiple level searching, the matadata
|
||||
# object will be drilled down to check if it's matched
|
||||
# with the searched value.
|
||||
if metadata != v:
|
||||
break
|
||||
if limit is not None:
|
||||
if limit == 0:
|
||||
break
|
||||
else:
|
||||
if limit:
|
||||
limit -= 1
|
||||
yield self._make_sample(meter)
|
||||
else:
|
||||
if limit:
|
||||
limit -= 1
|
||||
yield self._make_sample(meter)
|
||||
yield self._make_sample(meter)
|
||||
|
||||
@staticmethod
|
||||
def _update_meter_stats(stat, meter):
|
||||
@ -718,8 +669,8 @@ def reverse_timestamp(dt):
|
||||
|
||||
def make_query(user=None, project=None, meter=None,
|
||||
resource=None, source=None, start=None, start_op=None,
|
||||
end=None, end_op=None, message_id=None, require_meter=True,
|
||||
query_only=False):
|
||||
end=None, end_op=None, metaquery=None, message_id=None,
|
||||
require_meter=True, query_only=False):
|
||||
"""Return a filter query string based on the selected parameters.
|
||||
|
||||
:param user: Optional user-id
|
||||
@ -785,14 +736,27 @@ def make_query(user=None, project=None, meter=None,
|
||||
q.append("SingleColumnValueFilter ('f', 'rts', >=, 'binary:%s')" %
|
||||
rts_end)
|
||||
|
||||
sample_filter = None
|
||||
if q:
|
||||
sample_filter = " AND ".join(q)
|
||||
res_q = None
|
||||
if len(q):
|
||||
res_q = " AND ".join(q)
|
||||
|
||||
if metaquery:
|
||||
meta_q = []
|
||||
for k, v in metaquery.items():
|
||||
meta_q.append(
|
||||
"SingleColumnValueFilter ('f', '%s', =, 'binary:%s')"
|
||||
% ('r_' + k, v))
|
||||
meta_q = " AND ".join(meta_q)
|
||||
# join query and metaquery
|
||||
if res_q is not None:
|
||||
res_q += " AND " + meta_q
|
||||
else:
|
||||
res_q = meta_q # metaquery only
|
||||
|
||||
if query_only:
|
||||
return sample_filter
|
||||
return res_q
|
||||
else:
|
||||
return sample_filter, start_row, end_row
|
||||
return res_q, start_row, end_row
|
||||
|
||||
|
||||
def make_query_from_filter(sample_filter, require_meter=True):
|
||||
@ -806,9 +770,8 @@ def make_query_from_filter(sample_filter, require_meter=True):
|
||||
sample_filter.meter, sample_filter.resource,
|
||||
sample_filter.source, sample_filter.start,
|
||||
sample_filter.start_timestamp_op,
|
||||
sample_filter.end,
|
||||
sample_filter.end_timestamp_op,
|
||||
sample_filter.message_id,
|
||||
sample_filter.end, sample_filter.end_timestamp_op,
|
||||
sample_filter.metaquery, sample_filter.message_id,
|
||||
require_meter)
|
||||
|
||||
|
||||
@ -840,14 +803,6 @@ def _format_meter_reference(counter_name, counter_type, counter_unit):
|
||||
return "%s!%s!%s" % (counter_name, counter_type, counter_unit)
|
||||
|
||||
|
||||
def _metadata_from_document(doc):
|
||||
"""Extract resource metadata from HBase document using prefix specific
|
||||
to HBase implementation.
|
||||
"""
|
||||
return dict(
|
||||
(k[4:], v) for k, v in doc.iteritems() if k.startswith('f:r_'))
|
||||
|
||||
|
||||
def _timestamp_from_record_tuple(record):
|
||||
"""Extract timestamp from HBase tuple record
|
||||
"""
|
||||
|
Loading…
x
Reference in New Issue
Block a user