Merge "Improve performance of api requests with hbase scan"
This commit is contained in:
commit
2b562814cc
@ -334,7 +334,7 @@ class Connection(base.Connection):
|
||||
start=start_timestamp, start_timestamp_op=start_timestamp_op,
|
||||
end=end_timestamp, end_timestamp_op=end_timestamp_op,
|
||||
resource=resource, source=source, metaquery=metaquery)
|
||||
q, start_row, stop_row = make_sample_query_from_filter(
|
||||
q, start_row, stop_row, __ = make_sample_query_from_filter(
|
||||
sample_filter, require_meter=False)
|
||||
with self.conn_pool.connection() as conn:
|
||||
meter_table = conn.table(self.METER_TABLE)
|
||||
@ -425,17 +425,18 @@ class Connection(base.Connection):
|
||||
"""
|
||||
with self.conn_pool.connection() as conn:
|
||||
meter_table = conn.table(self.METER_TABLE)
|
||||
|
||||
q, start, stop = make_sample_query_from_filter(
|
||||
q, start, stop, columns = make_sample_query_from_filter(
|
||||
sample_filter, require_meter=False)
|
||||
LOG.debug(_("Query Meter Table: %s") % q)
|
||||
gen = meter_table.scan(filter=q, row_start=start, row_stop=stop)
|
||||
gen = meter_table.scan(filter=q, row_start=start, row_stop=stop,
|
||||
columns=columns)
|
||||
for ignored, meter in gen:
|
||||
if limit is not None:
|
||||
if limit == 0:
|
||||
break
|
||||
else:
|
||||
limit -= 1
|
||||
|
||||
d_meter = deserialize_entry(meter)[0]
|
||||
d_meter['message']['recorded_at'] = d_meter['recorded_at']
|
||||
yield models.Sample(**d_meter['message'])
|
||||
@ -486,10 +487,14 @@ class Connection(base.Connection):
|
||||
|
||||
with self.conn_pool.connection() as conn:
|
||||
meter_table = conn.table(self.METER_TABLE)
|
||||
q, start, stop = make_sample_query_from_filter(sample_filter)
|
||||
q, start, stop, columns = make_sample_query_from_filter(
|
||||
sample_filter)
|
||||
# These fields are used in statistics' calculating
|
||||
columns.extend(['f:timestamp', 'f:counter_volume',
|
||||
'f:counter_unit'])
|
||||
meters = map(deserialize_entry, list(meter for (ignored, meter) in
|
||||
meter_table.scan(filter=q, row_start=start,
|
||||
row_stop=stop)))
|
||||
row_stop=stop, columns=columns)))
|
||||
|
||||
if sample_filter.start:
|
||||
start_time = sample_filter.start
|
||||
@ -591,7 +596,7 @@ class MTable(object):
|
||||
if key in columns:
|
||||
ret[row] = data
|
||||
rows = ret
|
||||
elif filter:
|
||||
if filter:
|
||||
# TODO(jdanjou): we should really parse this properly,
|
||||
# but at the moment we are only going to support AND here
|
||||
filters = filter.split('AND')
|
||||
@ -788,6 +793,20 @@ def make_query(metaquery=None, **kwargs):
|
||||
return res_q
|
||||
|
||||
|
||||
def _get_meter_columns(metaquery, **kwargs):
|
||||
"""Return a list of required columns in meter table to be scanned .
|
||||
|
||||
:param metaquery: optional metaquery dict
|
||||
:param kwargs: key-value pairs to filter on. Key should be a real
|
||||
column name in db
|
||||
"""
|
||||
columns = ['f:message', 'f:recorded_at']
|
||||
columns.extend(["f:%s" % k for k, v in kwargs.items() if v])
|
||||
if metaquery:
|
||||
columns.extend(["f:r_%s" % k for k, v in metaquery.items() if v])
|
||||
return columns
|
||||
|
||||
|
||||
def make_sample_query_from_filter(sample_filter, require_meter=True):
|
||||
"""Return a query dictionary based on the settings in the filter.
|
||||
|
||||
@ -795,6 +814,7 @@ def make_sample_query_from_filter(sample_filter, require_meter=True):
|
||||
:param require_meter: If true and the filter does not have a meter,
|
||||
raise an error.
|
||||
"""
|
||||
|
||||
meter = sample_filter.meter
|
||||
if not meter and require_meter:
|
||||
raise RuntimeError('Missing required meter specifier')
|
||||
@ -804,20 +824,22 @@ def make_sample_query_from_filter(sample_filter, require_meter=True):
|
||||
end=sample_filter.end, end_op=sample_filter.end_timestamp_op,
|
||||
some_id=meter)
|
||||
|
||||
q = make_query(metaquery=sample_filter.metaquery,
|
||||
user_id=sample_filter.user,
|
||||
project_id=sample_filter.project,
|
||||
counter_name=meter,
|
||||
resource_id=sample_filter.resource,
|
||||
source=sample_filter.source,
|
||||
message_id=sample_filter.message_id)
|
||||
kwargs = dict(user_id=sample_filter.user,
|
||||
project_id=sample_filter.project,
|
||||
counter_name=meter,
|
||||
resource_id=sample_filter.resource,
|
||||
source=sample_filter.source,
|
||||
message_id=sample_filter.message_id)
|
||||
|
||||
q = make_query(metaquery=sample_filter.metaquery, **kwargs)
|
||||
|
||||
if q:
|
||||
ts_query = (" AND " + ts_query) if ts_query else ""
|
||||
res_q = q + ts_query if ts_query else q
|
||||
else:
|
||||
res_q = ts_query if ts_query else None
|
||||
return res_q, start_row, end_row
|
||||
columns = _get_meter_columns(metaquery=sample_filter.metaquery, **kwargs)
|
||||
return res_q, start_row, end_row, columns
|
||||
|
||||
|
||||
def _make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None):
|
||||
|
Loading…
Reference in New Issue
Block a user