Alarms support in HBase Part 2
Refactor query creation: split it into two separate methods, one for building the filter query and one for determining the start and stop rows. Partially implements bp hbase-alarming. Change-Id: I2a2a728597d92c27b86e4a93dd0f151b6f52403b
parent c1d3ffa2ca
commit b4bb537e90
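In rough terms, the new shape is: make_query() builds only the column filter string, while make_timestamp_query() resolves the start and stop row keys through a row-formatting callback. Below is a simplified, self-contained sketch of that split, not the driver code itself; the helper names and the "<id>_<reversed timestamp>" row-key form follow the diff, everything else is illustrative.

# Simplified, self-contained sketch of the split (not the driver code itself):
# one helper builds only the HBase filter string, the other determines only
# the scan bounds from "<id>_<reversed timestamp>" row keys.

def make_query(**kwargs):
    # Build a SingleColumnValueFilter clause per non-None column=value pair.
    clauses = ["SingleColumnValueFilter ('f', '%s', =, 'binary:%s')" % (k, v)
               for k, v in kwargs.items() if v is not None]
    return " AND ".join(clauses) if clauses else None


def make_timestamp_query(row_func, rts_start="", rts_end="", **kwargs):
    # Delegate row-key formatting to the supplied callback, nothing else.
    return row_func(rts_start, rts_end, **kwargs)


def _general_rowkey_scan(rts_start, rts_end, some_id=None):
    # Row keys look like "<some_id>_<rts>"; chr(127) caps an open-ended scan.
    if some_id is None:
        return None, None
    rts_start = rts_start or chr(127)
    return "%s_%s" % (some_id, rts_end), "%s_%s" % (some_id, rts_start)


if __name__ == "__main__":
    # Filter string and row bounds are now produced independently.
    print(make_query(alarm_id="alarm-1", user_id="user-1", project_id=None))
    print(make_timestamp_query(_general_rowkey_scan, some_id="alarm-1"))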
@@ -33,6 +33,7 @@ from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log
from ceilometer.openstack.common import network_utils
from ceilometer.openstack.common import timeutils
from ceilometer import storage
from ceilometer.storage import base
from ceilometer.storage import models
from ceilometer import utils
@@ -203,10 +204,9 @@ class Connection(base.Connection):
        #TODO(nprivalova): to be refactored
        if enabled is not None:
            enabled = json.dumps(enabled)
        q = make_query(alarm_id=alarm_id, name=name, enabled=enabled,
                       user_id=user, project_id=project)

        q = make_query(require_meter=False, query_only=True,
                       alarm_id=alarm_id, name=name,
                       enabled=enabled, user_id=user, project_id=project)
        gen = alarm_table.scan(filter=q)
        for ignored, data in gen:
            stored_alarm = deserialize_entry(data)
@@ -218,26 +218,17 @@ class Connection(base.Connection):
                          end_timestamp=None, end_timestamp_op=None):
        alarm_history_table = self.conn.table(self.ALARM_HISTORY_TABLE)

        q = make_query(start=start_timestamp,
                       start_op=start_timestamp_op,
                       end=end_timestamp,
                       end_op=end_timestamp_op,
                       require_meter=False, query_only=True,
                       include_rts_in_filters=False, alarm_id=alarm_id,
                       on_behalf_of=on_behalf_of, type=type, user_id=user,
                       project_id=project)
        #TODO(nprivalova): refactor make_query to move this stuff there
        rts_start = str(reverse_timestamp(start_timestamp) + 1) \
            if start_timestamp else ""
        rts_end = str(reverse_timestamp(end_timestamp) + 1) \
            if end_timestamp else ""
        if start_timestamp_op == 'gt':
            rts_start = str(long(rts_start) - 2)
        if end_timestamp_op == 'le':
            rts_end = str(long(rts_end) - 1)
        q = make_query(alarm_id=alarm_id, on_behalf_of=on_behalf_of, type=type,
                       user_id=user, project_id=project)

        gen = alarm_history_table.scan(filter=q, row_start=rts_end,
                                       row_stop=rts_start)
        start_row, end_row = make_timestamp_query(
            _make_general_rowkey_scan,
            start=start_timestamp, start_op=start_timestamp_op,
            end=end_timestamp, end_op=end_timestamp_op, bounds_only=True,
            some_id=alarm_id)

        gen = alarm_history_table.scan(filter=q, row_start=start_row,
                                       row_stop=end_row)
        for ignored, data in gen:
            stored_entry = deserialize_entry(data)
            # It is needed to return 'details' field as string
@@ -420,17 +411,15 @@ class Connection(base.Connection):
        )
        meter_table = self.conn.table(self.METER_TABLE)

        q, start_row, stop_row = make_query(user_id=user,
                                            project_id=project,
                                            source=source,
                                            resource_id=resource,
                                            start=start_timestamp,
                                            start_op=start_timestamp_op,
                                            end=end_timestamp,
                                            end_op=end_timestamp_op,
                                            metaquery=metaquery,
                                            require_meter=False,
                                            query_only=False)
        sample_filter = storage.SampleFilter(
            user=user, project=project,
            start=start_timestamp, start_timestamp_op=start_timestamp_op,
            end=end_timestamp, end_timestamp_op=end_timestamp_op,
            resource=resource, source=source, metaquery=metaquery)

        q, start_row, stop_row = make_sample_query_from_filter(
            sample_filter, require_meter=False)

        LOG.debug(_("Query Meter table: %s") % q)
        meters = meter_table.scan(filter=q, row_start=start_row,
                                  row_stop=stop_row)
@@ -469,9 +458,8 @@ class Connection(base.Connection):
        if pagination:
            raise NotImplementedError(_('Pagination not implemented'))
        resource_table = self.conn.table(self.RESOURCE_TABLE)
        q = make_query(user_id=user, project_id=project, resource_id=resource,
                       source=source, metaquery=metaquery,
                       require_meter=False, query_only=True)
        q = make_query(metaquery=metaquery, user_id=user, project_id=project,
                       resource_id=resource, source=source)
        LOG.debug(_("Query Resource table: %s") % q)

        gen = resource_table.scan(filter=q)
@@ -515,8 +503,8 @@ class Connection(base.Connection):

        meter_table = self.conn.table(self.METER_TABLE)

        q, start, stop = make_query_from_filter(sample_filter,
                                                require_meter=False)
        q, start, stop = make_sample_query_from_filter(
            sample_filter, require_meter=False)
        LOG.debug(_("Query Meter Table: %s") % q)
        gen = meter_table.scan(filter=q, row_start=start, row_stop=stop)
        for ignored, meter in gen:
@@ -568,9 +556,7 @@ class Connection(base.Connection):
            raise NotImplementedError("Group by not implemented.")

        meter_table = self.conn.table(self.METER_TABLE)

        q, start, stop = make_query_from_filter(sample_filter)

        q, start, stop = make_sample_query_from_filter(sample_filter)
        meters = list(meter for (ignored, meter) in
                      meter_table.scan(filter=q, row_start=start,
                                       row_stop=stop)
@@ -764,30 +750,41 @@ def reverse_timestamp(dt):
    return 0x7fffffffffffffff - ts


def make_query(meter=None, start=None, start_op=None,
               end=None, end_op=None, metaquery=None,
               require_meter=True, query_only=False,
               include_rts_in_filters=True, **kwargs):
    """Return a filter query string based on the selected parameters.

    :param meter: Optional counter-name
def make_timestamp_query(func, start=None, start_op=None, end=None,
                         end_op=None, bounds_only=False, **kwargs):
    """Return a filter start and stop row for filtering and a query
    which based on the fact that CF-name is 'rts'
    :param start: Optional start timestamp
    :param start_op: Optional start timestamp operator, like gt, ge
    :param end: Optional end timestamp
    :param end_op: Optional end timestamp operator, like lt, le
    :param require_meter: If true and the filter does not have a meter,
                          raise an error.
    :param query_only: If true only returns the filter query,
                       otherwise also returns start and stop rowkeys
    :param bounds_only: if True than query will not be returned
    :param func: a function that provide a format of row
    :param kwargs: kwargs for :param func
    """
    rts_start, rts_end = get_start_end_rts(start, start_op, end, end_op)
    start_row, end_row = func(rts_start, rts_end, **kwargs)

    if bounds_only:
        return start_row, end_row

    q = []
    if rts_start:
        q.append("SingleColumnValueFilter ('f', 'rts', <=, 'binary:%s')" %
                 rts_start)
    if rts_end:
        q.append("SingleColumnValueFilter ('f', 'rts', >=, 'binary:%s')" %
                 rts_end)

    for key, value in kwargs.iteritems():
        if value is not None:
            q.append("SingleColumnValueFilter "
                     "('f', '%s', =, 'binary:%s')" % (key, value))
    res_q = None
    if len(q):
        res_q = " AND ".join(q)

    return start_row, end_row, res_q


def get_start_end_rts(start, start_op, end, end_op):

    start_row, end_row = "", ""
    rts_start = str(reverse_timestamp(start) + 1) if start else ""
    rts_end = str(reverse_timestamp(end) + 1) if end else ""

@@ -797,26 +794,22 @@ def make_query(meter=None, start=None, start_op=None,
    if end_op == 'le':
        rts_end = str(long(rts_end) - 1)

    # when start_time and end_time is provided,
    # if it's filtered by meter,
    # rowkey will be used in the query;
    # else it's non meter filter query(e.g. project_id, user_id etc),
    # SingleColumnValueFilter against rts will be appended to the query
    # query other tables should have no start and end passed in
    if meter:
        start_row, end_row = _make_rowkey_scan(meter, rts_start, rts_end)
        q.append("SingleColumnValueFilter "
                 "('f', 'counter_name', =, 'binary:%s')" % meter)
    elif require_meter:
        raise RuntimeError('Missing required meter specifier')
    elif include_rts_in_filters:
        if rts_start:
            q.append("SingleColumnValueFilter ('f', 'rts', <=, 'binary:%s')" %
                     rts_start)
        if rts_end:
            q.append("SingleColumnValueFilter ('f', 'rts', >=, 'binary:%s')" %
                     rts_end)
    return rts_start, rts_end


def make_query(metaquery=None, **kwargs):
    """Return a filter query string based on the selected parameters.

    :param metaquery: optional metaquery dict
    :param kwargs: key-value pairs to filter on. Key should be a real
     column name in db
    """
    q = []

    for key, value in kwargs.iteritems():
        if value is not None:
            q.append("SingleColumnValueFilter "
                     "('f', '%s', =, 'binary:%s')" % (key, value))
    res_q = None
    if len(q):
        res_q = " AND ".join(q)
@@ -834,41 +827,51 @@ def make_query(meter=None, start=None, start_op=None,
        else:
            res_q = meta_q  # metaquery only

    if query_only:
        return res_q
    else:
        return res_q, start_row, end_row
    return res_q or ""


def make_query_from_filter(sample_filter, require_meter=True):
def make_sample_query_from_filter(sample_filter, require_meter=True):
    """Return a query dictionary based on the settings in the filter.

    :param sample_filter: SampleFilter instance
    :param require_meter: If true and the filter does not have a meter,
                          raise an error.
    """
    return make_query(user_id=sample_filter.user,
                      project_id=sample_filter.project,
                      meter=sample_filter.meter,
                      resource_id=sample_filter.resource,
                      source=sample_filter.source,
                      start=sample_filter.start,
                      start_op=sample_filter.start_timestamp_op,
                      end=sample_filter.end,
                      end_op=sample_filter.end_timestamp_op,
                      metaquery=sample_filter.metaquery,
                      message_id=sample_filter.message_id,
                      require_meter=require_meter)
    meter = sample_filter.meter
    if not meter and require_meter:
        raise RuntimeError('Missing required meter specifier')
    start_row, end_row, ts_query = make_timestamp_query(
        _make_general_rowkey_scan,
        start=sample_filter.start, start_op=sample_filter.start_timestamp_op,
        end=sample_filter.end, end_op=sample_filter.end_timestamp_op,
        some_id=meter)

    q = make_query(metaquery=sample_filter.metaquery,
                   user_id=sample_filter.user,
                   project_id=sample_filter.project,
                   counter_name=meter,
                   resource_id=sample_filter.resource,
                   source=sample_filter.source,
                   message_id=sample_filter.message_id)

    if q:
        ts_query = (" AND " + ts_query) if ts_query else ""
        res_q = q + ts_query if ts_query else q
    else:
        res_q = ts_query if ts_query else None
    return res_q, start_row, end_row


def _make_rowkey_scan(meter, rts_start=None, rts_end=None):
    """If it's meter filter without start and end,
    start_row = meter while end_row = meter + MAX_BYTE
def _make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None):
    """If it's filter on some_id without start and end,
    start_row = some_id while end_row = some_id + MAX_BYTE
    """
    if some_id is None:
        return None, None
    if not rts_start:
        rts_start = chr(127)
    end_row = "%s_%s" % (meter, rts_start)
    start_row = "%s_%s" % (meter, rts_end)
    end_row = "%s_%s" % (some_id, rts_start)
    start_row = "%s_%s" % (some_id, rts_end)

    return start_row, end_row
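For context, filter strings and row bounds of the shape these helpers return feed straight into happybase scans, which is how the driver's conn.table(...).scan(filter=..., row_start=..., row_stop=...) calls above consume them. A hedged sketch of that consumption follows; the host, table name, and sample values are assumptions, not taken from the commit.

import happybase

# Illustrative only: connection settings and table name are assumptions.
connection = happybase.Connection('localhost')
meter_table = connection.table('meter')

# A filter string of the form make_query()/make_sample_query_from_filter()
# produces, plus row-key bounds in the "<meter>_<reversed timestamp>" form.
q = ("SingleColumnValueFilter ('f', 'counter_name', =, 'binary:cpu_util') AND "
     "SingleColumnValueFilter ('f', 'project_id', =, 'binary:project-1')")
start_row = 'cpu_util_'
end_row = 'cpu_util_\x7f'

for row_key, columns in meter_table.scan(filter=q, row_start=start_row,
                                         row_stop=end_row):
    print(row_key, sorted(columns))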