Merge "[Hbase] Add column for source filter in _get_meter_samples"
This commit is contained in:
commit
a31b137539
@ -185,17 +185,39 @@ def make_query(metaquery=None, trait_query=None, **kwargs):
|
||||
return res_q
|
||||
|
||||
|
||||
def get_meter_columns(metaquery, **kwargs):
|
||||
"""Return a list of required columns in meter table to be scanned .
|
||||
def get_meter_columns(metaquery=None, need_timestamp=False, **kwargs):
|
||||
"""Return a list of required columns in meter table to be scanned.
|
||||
|
||||
SingleColumnFilter has 'columns' filter that should be used to determine
|
||||
what columns we are interested in. But if we want to use 'filter' and
|
||||
'columns' together we have to include columns we are filtering by
|
||||
to columns list.
|
||||
|
||||
Please see an example: If we make scan with filter
|
||||
"SingleColumnValueFilter ('f', 's_test-1', =, 'binary:\"1\"')"
|
||||
and columns ['f:rts'], the output will be always empty
|
||||
because only 'rts' will be returned and filter will be applied
|
||||
to this data so 's_test-1' cannot be find.
|
||||
To make this request correct it should be fixed as follows:
|
||||
filter = "SingleColumnValueFilter ('f', 's_test-1', =, 'binary:\"1\"')",
|
||||
columns = ['f:rts','f:s_test-1']}
|
||||
|
||||
:param metaquery: optional metaquery dict
|
||||
:param need_timestamp: flag, which defines the need for timestamp columns
|
||||
:param kwargs: key-value pairs to filter on. Key should be a real
|
||||
column name in db
|
||||
"""
|
||||
columns = ['f:message', 'f:recorded_at']
|
||||
columns.extend("f:%s" % k for k, v in kwargs.items() if v)
|
||||
columns.extend("f:%s" % k for k, v in kwargs.items()
|
||||
if v is not None)
|
||||
if metaquery:
|
||||
columns.extend("f:r_%s" % k for k, v in metaquery.items() if v)
|
||||
columns.extend("f:r_%s" % k for k, v in metaquery.items()
|
||||
if v is not None)
|
||||
source = kwargs.get('source')
|
||||
if source:
|
||||
columns.append("f:s_%s" % source)
|
||||
if need_timestamp:
|
||||
columns.extend(['f:rts', 'f:timestamp'])
|
||||
return columns
|
||||
|
||||
|
||||
@ -215,7 +237,6 @@ def make_sample_query_from_filter(sample_filter, require_meter=True):
|
||||
start=sample_filter.start, start_op=sample_filter.start_timestamp_op,
|
||||
end=sample_filter.end, end_op=sample_filter.end_timestamp_op,
|
||||
some_id=meter)
|
||||
|
||||
kwargs = dict(user_id=sample_filter.user,
|
||||
project_id=sample_filter.project,
|
||||
counter_name=meter,
|
||||
@ -229,7 +250,10 @@ def make_sample_query_from_filter(sample_filter, require_meter=True):
|
||||
res_q = q + " AND " + ts_query if ts_query else q
|
||||
else:
|
||||
res_q = ts_query if ts_query else None
|
||||
columns = get_meter_columns(metaquery=sample_filter.metaquery, **kwargs)
|
||||
|
||||
need_timestamp = (sample_filter.start or sample_filter.end) is not None
|
||||
columns = get_meter_columns(metaquery=sample_filter.metaquery,
|
||||
need_timestamp=need_timestamp, **kwargs)
|
||||
return res_q, start_row, end_row, columns
|
||||
|
||||
|
||||
|
@ -380,7 +380,7 @@ class Connection(base.Connection):
|
||||
(sample_filter, require_meter=False))
|
||||
LOG.debug(_("Query Meter Table: %s") % q)
|
||||
gen = meter_table.scan(filter=q, row_start=start, row_stop=stop,
|
||||
limit=limit)
|
||||
limit=limit, columns=columns)
|
||||
for ignored, meter in gen:
|
||||
d_meter = hbase_utils.deserialize_entry(meter)[0]
|
||||
d_meter['message']['recorded_at'] = d_meter['recorded_at']
|
||||
|
Loading…
x
Reference in New Issue
Block a user