[HBase] Refactor hbase.utils
This change implements a refactor for hbase.utils. Preparing of filters is shifted from make_events_query_from_filter to make_query method. Implicit using of comparison operators in filters has become explicit. Change-Id: Ib6ab7838c718c30a84dd5e59dcc986515dc54731
This commit is contained in:
parent
e60e5ccaa4
commit
2339036d8a
@ -135,9 +135,15 @@ class MTable(object):
|
||||
if op == '=':
|
||||
if column in data and data[column] == value:
|
||||
r[row] = data
|
||||
elif op == '<':
|
||||
if column in data and data[column] < value:
|
||||
r[row] = data
|
||||
elif op == '<=':
|
||||
if column in data and data[column] <= value:
|
||||
r[row] = data
|
||||
elif op == '>':
|
||||
if column in data and data[column] > value:
|
||||
r[row] = data
|
||||
elif op == '>=':
|
||||
if column in data and data[column] >= value:
|
||||
r[row] = data
|
||||
@ -218,7 +224,9 @@ class MTable(object):
|
||||
for key in data:
|
||||
if ((op == '=' and key.startswith(column)) or
|
||||
(op == '>=' and key >= column) or
|
||||
(op == '<=' and key <= column)):
|
||||
(op == '<=' and key <= column) or
|
||||
(op == '>' and key > column) or
|
||||
(op == '<' and key < column)):
|
||||
r_data[key] = data[key]
|
||||
else:
|
||||
raise NotImplementedError("In-memory QualifierFilter "
|
||||
|
@ -22,8 +22,11 @@ from ceilometer import utils
|
||||
|
||||
EVENT_TRAIT_TYPES = {'none': 0, 'string': 1, 'integer': 2, 'float': 3,
|
||||
'datetime': 4}
|
||||
OP_SIGN = {'eq': '=', 'lt': '<', 'le': '<=', 'ne': '!=', 'gt': '>',
|
||||
'ge': '>='}
|
||||
OP_SIGN = {'eq': '=', 'lt': '<', 'le': '<=', 'ne': '!=', 'gt': '>', 'ge': '>='}
|
||||
# We need this additional dictionary because we have reverted timestamp in
|
||||
# row-keys for stored metrics
|
||||
OP_SIGN_REV = {'eq': '=', 'lt': '>', 'le': '>=', 'ne': '!=', 'gt': '<',
|
||||
'ge': '<='}
|
||||
|
||||
|
||||
def _QualifierFilter(op, qualifier):
|
||||
@ -56,20 +59,13 @@ def make_events_query_from_filter(event_filter):
|
||||
Query is based on the selected parameter.
|
||||
:param event_filter: storage.EventFilter object.
|
||||
"""
|
||||
q = []
|
||||
res_q = None
|
||||
start = "%s" % (timestamp(event_filter.start_time, reverse=False)
|
||||
if event_filter.start_time else "")
|
||||
stop = "%s" % (timestamp(event_filter.end_time, reverse=False)
|
||||
if event_filter.end_time else "")
|
||||
if event_filter.event_type:
|
||||
q.append("SingleColumnValueFilter ('f', 'event_type', = , "
|
||||
"'binary:%s')" % dump(event_filter.event_type))
|
||||
if event_filter.message_id:
|
||||
q.append("RowFilter ( = , 'regexstring:\d*_%s')" %
|
||||
event_filter.message_id)
|
||||
if len(q):
|
||||
res_q = " AND ".join(q)
|
||||
kwargs = {'event_type': event_filter.event_type,
|
||||
'event_id': event_filter.message_id}
|
||||
res_q = make_query(**kwargs)
|
||||
|
||||
if event_filter.traits_filter:
|
||||
for trait_filter in event_filter.traits_filter:
|
||||
@ -95,20 +91,22 @@ def make_timestamp_query(func, start=None, start_op=None, end=None,
|
||||
:param func: a function that provide a format of row
|
||||
:param kwargs: kwargs for :param func
|
||||
"""
|
||||
rts_start, rts_end = get_start_end_rts(start, start_op, end, end_op)
|
||||
# We don't need to dump here because get_start_end_rts returns strings
|
||||
rts_start, rts_end = get_start_end_rts(start, end)
|
||||
start_row, end_row = func(rts_start, rts_end, **kwargs)
|
||||
|
||||
if bounds_only:
|
||||
return start_row, end_row
|
||||
|
||||
q = []
|
||||
# We don't need to dump here because get_start_end_rts returns strings
|
||||
start_op = start_op or 'ge'
|
||||
end_op = end_op or 'lt'
|
||||
if rts_start:
|
||||
q.append("SingleColumnValueFilter ('f', 'rts', <=, 'binary:%s')" %
|
||||
rts_start)
|
||||
q.append("SingleColumnValueFilter ('f', 'rts', %s, 'binary:%s')" %
|
||||
(OP_SIGN_REV[start_op], rts_start))
|
||||
if rts_end:
|
||||
q.append("SingleColumnValueFilter ('f', 'rts', >=, 'binary:%s')" %
|
||||
rts_end)
|
||||
q.append("SingleColumnValueFilter ('f', 'rts', %s, 'binary:%s')" %
|
||||
(OP_SIGN_REV[end_op], rts_end))
|
||||
|
||||
res_q = None
|
||||
if len(q):
|
||||
@ -117,17 +115,10 @@ def make_timestamp_query(func, start=None, start_op=None, end=None,
|
||||
return start_row, end_row, res_q
|
||||
|
||||
|
||||
def get_start_end_rts(start, start_op, end, end_op):
|
||||
|
||||
rts_start = str(timestamp(start) + 1) if start else ""
|
||||
rts_end = str(timestamp(end) + 1) if end else ""
|
||||
|
||||
# By default, we are using ge for lower bound and lt for upper bound
|
||||
if start_op == 'gt':
|
||||
rts_start = str(long(rts_start) - 2)
|
||||
if end_op == 'le':
|
||||
rts_end = str(long(rts_end) - 1)
|
||||
def get_start_end_rts(start, end):
|
||||
|
||||
rts_start = str(timestamp(start)) if start else ""
|
||||
rts_end = str(timestamp(end)) if end else ""
|
||||
return rts_start, rts_end
|
||||
|
||||
|
||||
@ -167,6 +158,8 @@ def make_query(metaquery=None, trait_query=None, **kwargs):
|
||||
(value, dump('1')))
|
||||
elif key == 'trait_type':
|
||||
q.append("ColumnPrefixFilter('%s')" % value)
|
||||
elif key == 'event_id':
|
||||
q.append("RowFilter ( = , 'regexstring:\d*_%s')" % value)
|
||||
else:
|
||||
q.append("SingleColumnValueFilter "
|
||||
"('f', '%s', =, 'binary:%s', true, true)" %
|
||||
@ -233,8 +226,7 @@ def make_sample_query_from_filter(sample_filter, require_meter=True):
|
||||
q = make_query(metaquery=sample_filter.metaquery, **kwargs)
|
||||
|
||||
if q:
|
||||
ts_query = (" AND " + ts_query) if ts_query else ""
|
||||
res_q = q + ts_query if ts_query else q
|
||||
res_q = q + " AND " + ts_query if ts_query else q
|
||||
else:
|
||||
res_q = ts_query if ts_query else None
|
||||
columns = get_meter_columns(metaquery=sample_filter.metaquery, **kwargs)
|
||||
@ -254,18 +246,18 @@ def make_meter_query_for_resource(start_timestamp, start_timestamp_op,
|
||||
:param source: source filter.
|
||||
:param query: a query string to concatenate with.
|
||||
"""
|
||||
start_rts, end_rts = get_start_end_rts(start_timestamp,
|
||||
start_timestamp_op,
|
||||
end_timestamp, end_timestamp_op)
|
||||
start_rts, end_rts = get_start_end_rts(start_timestamp, end_timestamp)
|
||||
mq = []
|
||||
start_op = start_timestamp_op or 'ge'
|
||||
end_op = end_timestamp_op or 'lt'
|
||||
|
||||
if start_rts:
|
||||
filter_value = start_rts + '+' + source if source else start_rts
|
||||
mq.append(_QualifierFilter("<=", filter_value))
|
||||
mq.append(_QualifierFilter(OP_SIGN_REV[start_op], filter_value))
|
||||
|
||||
if end_rts:
|
||||
filter_value = end_rts + '+' + source if source else end_rts
|
||||
mq.append(_QualifierFilter(">=", filter_value))
|
||||
mq.append(_QualifierFilter(OP_SIGN_REV[end_op], filter_value))
|
||||
|
||||
if mq:
|
||||
meter_q = " AND ".join(mq)
|
||||
|
Loading…
x
Reference in New Issue
Block a user