diff --git a/ceilometer/storage/impl_hbase.py b/ceilometer/storage/impl_hbase.py index 5cb904c22..11ae133ca 100644 --- a/ceilometer/storage/impl_hbase.py +++ b/ceilometer/storage/impl_hbase.py @@ -33,6 +33,7 @@ from ceilometer.openstack.common.gettextutils import _ # noqa from ceilometer.openstack.common import log from ceilometer.openstack.common import network_utils from ceilometer.openstack.common import timeutils +from ceilometer import storage from ceilometer.storage import base from ceilometer.storage import models from ceilometer import utils @@ -203,10 +204,9 @@ class Connection(base.Connection): #TODO(nprivalova): to be refactored if enabled is not None: enabled = json.dumps(enabled) + q = make_query(alarm_id=alarm_id, name=name, enabled=enabled, + user_id=user, project_id=project) - q = make_query(require_meter=False, query_only=True, - alarm_id=alarm_id, name=name, - enabled=enabled, user_id=user, project_id=project) gen = alarm_table.scan(filter=q) for ignored, data in gen: stored_alarm = deserialize_entry(data) @@ -218,26 +218,17 @@ class Connection(base.Connection): end_timestamp=None, end_timestamp_op=None): alarm_history_table = self.conn.table(self.ALARM_HISTORY_TABLE) - q = make_query(start=start_timestamp, - start_op=start_timestamp_op, - end=end_timestamp, - end_op=end_timestamp_op, - require_meter=False, query_only=True, - include_rts_in_filters=False, alarm_id=alarm_id, - on_behalf_of=on_behalf_of, type=type, user_id=user, - project_id=project) - #TODO(nprivalova): refactor make_query to move this stuff there - rts_start = str(reverse_timestamp(start_timestamp) + 1) \ - if start_timestamp else "" - rts_end = str(reverse_timestamp(end_timestamp) + 1) \ - if end_timestamp else "" - if start_timestamp_op == 'gt': - rts_start = str(long(rts_start) - 2) - if end_timestamp_op == 'le': - rts_end = str(long(rts_end) - 1) + q = make_query(alarm_id=alarm_id, on_behalf_of=on_behalf_of, type=type, + user_id=user, project_id=project) - gen = alarm_history_table.scan(filter=q, row_start=rts_end, - row_stop=rts_start) + start_row, end_row = make_timestamp_query( + _make_general_rowkey_scan, + start=start_timestamp, start_op=start_timestamp_op, + end=end_timestamp, end_op=end_timestamp_op, bounds_only=True, + some_id=alarm_id) + + gen = alarm_history_table.scan(filter=q, row_start=start_row, + row_stop=end_row) for ignored, data in gen: stored_entry = deserialize_entry(data) # It is needed to return 'details' field as string @@ -420,17 +411,15 @@ class Connection(base.Connection): ) meter_table = self.conn.table(self.METER_TABLE) - q, start_row, stop_row = make_query(user_id=user, - project_id=project, - source=source, - resource_id=resource, - start=start_timestamp, - start_op=start_timestamp_op, - end=end_timestamp, - end_op=end_timestamp_op, - metaquery=metaquery, - require_meter=False, - query_only=False) + sample_filter = storage.SampleFilter( + user=user, project=project, + start=start_timestamp, start_timestamp_op=start_timestamp_op, + end=end_timestamp, end_timestamp_op=end_timestamp_op, + resource=resource, source=source, metaquery=metaquery) + + q, start_row, stop_row = make_sample_query_from_filter( + sample_filter, require_meter=False) + LOG.debug(_("Query Meter table: %s") % q) meters = meter_table.scan(filter=q, row_start=start_row, row_stop=stop_row) @@ -469,9 +458,8 @@ class Connection(base.Connection): if pagination: raise NotImplementedError(_('Pagination not implemented')) resource_table = self.conn.table(self.RESOURCE_TABLE) - q = make_query(user_id=user, project_id=project, resource_id=resource, - source=source, metaquery=metaquery, - require_meter=False, query_only=True) + q = make_query(metaquery=metaquery, user_id=user, project_id=project, + resource_id=resource, source=source) LOG.debug(_("Query Resource table: %s") % q) gen = resource_table.scan(filter=q) @@ -515,8 +503,8 @@ class Connection(base.Connection): meter_table = self.conn.table(self.METER_TABLE) - q, start, stop = make_query_from_filter(sample_filter, - require_meter=False) + q, start, stop = make_sample_query_from_filter( + sample_filter, require_meter=False) LOG.debug(_("Query Meter Table: %s") % q) gen = meter_table.scan(filter=q, row_start=start, row_stop=stop) for ignored, meter in gen: @@ -568,9 +556,7 @@ class Connection(base.Connection): raise NotImplementedError("Group by not implemented.") meter_table = self.conn.table(self.METER_TABLE) - - q, start, stop = make_query_from_filter(sample_filter) - + q, start, stop = make_sample_query_from_filter(sample_filter) meters = list(meter for (ignored, meter) in meter_table.scan(filter=q, row_start=start, row_stop=stop) @@ -764,30 +750,41 @@ def reverse_timestamp(dt): return 0x7fffffffffffffff - ts -def make_query(meter=None, start=None, start_op=None, - end=None, end_op=None, metaquery=None, - require_meter=True, query_only=False, - include_rts_in_filters=True, **kwargs): - """Return a filter query string based on the selected parameters. - - :param meter: Optional counter-name +def make_timestamp_query(func, start=None, start_op=None, end=None, + end_op=None, bounds_only=False, **kwargs): + """Return a filter start and stop row for filtering and a query + which based on the fact that CF-name is 'rts' :param start: Optional start timestamp :param start_op: Optional start timestamp operator, like gt, ge :param end: Optional end timestamp :param end_op: Optional end timestamp operator, like lt, le - :param require_meter: If true and the filter does not have a meter, - raise an error. - :param query_only: If true only returns the filter query, - otherwise also returns start and stop rowkeys + :param bounds_only: if True than query will not be returned + :param func: a function that provide a format of row + :param kwargs: kwargs for :param func """ + rts_start, rts_end = get_start_end_rts(start, start_op, end, end_op) + start_row, end_row = func(rts_start, rts_end, **kwargs) + + if bounds_only: + return start_row, end_row + q = [] + if rts_start: + q.append("SingleColumnValueFilter ('f', 'rts', <=, 'binary:%s')" % + rts_start) + if rts_end: + q.append("SingleColumnValueFilter ('f', 'rts', >=, 'binary:%s')" % + rts_end) - for key, value in kwargs.iteritems(): - if value is not None: - q.append("SingleColumnValueFilter " - "('f', '%s', =, 'binary:%s')" % (key, value)) + res_q = None + if len(q): + res_q = " AND ".join(q) + + return start_row, end_row, res_q + + +def get_start_end_rts(start, start_op, end, end_op): - start_row, end_row = "", "" rts_start = str(reverse_timestamp(start) + 1) if start else "" rts_end = str(reverse_timestamp(end) + 1) if end else "" @@ -797,26 +794,22 @@ def make_query(meter=None, start=None, start_op=None, if end_op == 'le': rts_end = str(long(rts_end) - 1) - # when start_time and end_time is provided, - # if it's filtered by meter, - # rowkey will be used in the query; - # else it's non meter filter query(e.g. project_id, user_id etc), - # SingleColumnValueFilter against rts will be appended to the query - # query other tables should have no start and end passed in - if meter: - start_row, end_row = _make_rowkey_scan(meter, rts_start, rts_end) - q.append("SingleColumnValueFilter " - "('f', 'counter_name', =, 'binary:%s')" % meter) - elif require_meter: - raise RuntimeError('Missing required meter specifier') - elif include_rts_in_filters: - if rts_start: - q.append("SingleColumnValueFilter ('f', 'rts', <=, 'binary:%s')" % - rts_start) - if rts_end: - q.append("SingleColumnValueFilter ('f', 'rts', >=, 'binary:%s')" % - rts_end) + return rts_start, rts_end + +def make_query(metaquery=None, **kwargs): + """Return a filter query string based on the selected parameters. + + :param metaquery: optional metaquery dict + :param kwargs: key-value pairs to filter on. Key should be a real + column name in db + """ + q = [] + + for key, value in kwargs.iteritems(): + if value is not None: + q.append("SingleColumnValueFilter " + "('f', '%s', =, 'binary:%s')" % (key, value)) res_q = None if len(q): res_q = " AND ".join(q) @@ -834,41 +827,51 @@ def make_query(meter=None, start=None, start_op=None, else: res_q = meta_q # metaquery only - if query_only: - return res_q - else: - return res_q, start_row, end_row + return res_q or "" -def make_query_from_filter(sample_filter, require_meter=True): +def make_sample_query_from_filter(sample_filter, require_meter=True): """Return a query dictionary based on the settings in the filter. :param sample_filter: SampleFilter instance :param require_meter: If true and the filter does not have a meter, raise an error. """ - return make_query(user_id=sample_filter.user, - project_id=sample_filter.project, - meter=sample_filter.meter, - resource_id=sample_filter.resource, - source=sample_filter.source, - start=sample_filter.start, - start_op=sample_filter.start_timestamp_op, - end=sample_filter.end, - end_op=sample_filter.end_timestamp_op, - metaquery=sample_filter.metaquery, - message_id=sample_filter.message_id, - require_meter=require_meter) + meter = sample_filter.meter + if not meter and require_meter: + raise RuntimeError('Missing required meter specifier') + start_row, end_row, ts_query = make_timestamp_query( + _make_general_rowkey_scan, + start=sample_filter.start, start_op=sample_filter.start_timestamp_op, + end=sample_filter.end, end_op=sample_filter.end_timestamp_op, + some_id=meter) + + q = make_query(metaquery=sample_filter.metaquery, + user_id=sample_filter.user, + project_id=sample_filter.project, + counter_name=meter, + resource_id=sample_filter.resource, + source=sample_filter.source, + message_id=sample_filter.message_id) + + if q: + ts_query = (" AND " + ts_query) if ts_query else "" + res_q = q + ts_query if ts_query else q + else: + res_q = ts_query if ts_query else None + return res_q, start_row, end_row -def _make_rowkey_scan(meter, rts_start=None, rts_end=None): - """If it's meter filter without start and end, - start_row = meter while end_row = meter + MAX_BYTE +def _make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None): + """If it's filter on some_id without start and end, + start_row = some_id while end_row = some_id + MAX_BYTE """ + if some_id is None: + return None, None if not rts_start: rts_start = chr(127) - end_row = "%s_%s" % (meter, rts_start) - start_row = "%s_%s" % (meter, rts_end) + end_row = "%s_%s" % (some_id, rts_start) + start_row = "%s_%s" % (some_id, rts_end) return start_row, end_row