Filter query op:gt does not work as expected

At present, when using timestamp to filter resource or sample data, ceilometer
is using gte for start_timestamp and lt for end_timestamp.
This fix will pass the timestamp operator in the request to the storage level
to support gt operator for the start_timestamp and le for the end_stamp.

The changes will be applied to the get_resources and get_samples methods of
hbase, mongodb and sqlalchemy.

Change-Id: I3fb7651b588400df804ab8bb522084e766744f96
Fixes: Bug 1193926
This commit is contained in:
xingzhou 2013-07-02 03:02:44 -04:00
parent ef17d89e08
commit 0c841abf26
8 changed files with 206 additions and 45 deletions

View File

@ -171,16 +171,12 @@ def _query_to_kwargs(query, db_func):
metaquery = {}
for i in query:
if i.field == 'timestamp':
# FIXME(dhellmann): This logic is not consistent with the
# way the timestamps are treated inside the mongo driver
# (the end timestamp is always tested using $lt). We
# should just pass a single timestamp through to the
# storage layer with the operator and let the storage
# layer use that operator.
if i.op in ('lt', 'le'):
stamp['end_timestamp'] = i.value
stamp['end_timestamp_op'] = i.op
elif i.op in ('gt', 'ge'):
stamp['start_timestamp'] = i.value
stamp['start_timestamp_op'] = i.op
else:
LOG.warn('_query_to_kwargs ignoring %r unexpected op %r"' %
(i.field, i.op))
@ -209,6 +205,10 @@ def _query_to_kwargs(query, db_func):
else:
raise wsme.exc.UnknownArgument('timestamp',
"not valid for this resource")
if 'start_timestamp_op' in stamp:
kwargs['start_timestamp_op'] = stamp['start_timestamp_op']
if 'end_timestamp_op' in stamp:
kwargs['end_timestamp_op'] = stamp['end_timestamp_op']
if trans:
for k in trans:

View File

@ -81,19 +81,26 @@ class SampleFilter(object):
:param user: The sample owner.
:param project: The sample project.
:param start: Earliest timestamp to include.
:param end: Only include samples with timestamp less than this.
:param start: Earliest time point in the request.
:param start_timestamp_op: Earliest timestamp operation in the request.
:param end: Latest time point in the request.
:param end_timestamp_op: Latest timestamp operation in the request.
:param resource: Optional filter for resource id.
:param meter: Optional filter for meter type using the meter name.
:param source: Optional source filter.
:param metaquery: Optional filter on the metadata
"""
def __init__(self, user=None, project=None, start=None, end=None,
resource=None, meter=None, source=None, metaquery={}):
def __init__(self, user=None, project=None,
start=None, start_timestamp_op=None,
end=None, end_timestamp_op=None,
resource=None, meter=None,
source=None, metaquery={}):
self.user = user
self.project = project
self.start = utils.sanitize_timestamp(start)
self.start_timestamp_op = start_timestamp_op
self.end = utils.sanitize_timestamp(end)
self.end_timestamp_op = end_timestamp_op
self.resource = resource
self.meter = meter
self.source = source

View File

@ -98,7 +98,8 @@ class Connection(object):
@abc.abstractmethod
def get_resources(self, user=None, project=None, source=None,
start_timestamp=None, end_timestamp=None,
start_timestamp=None, start_timestamp_op=None,
end_timestamp=None, end_timestamp_op=None,
metaquery={}, resource=None):
"""Return an iterable of models.Resource instances containing
resource information.
@ -107,7 +108,9 @@ class Connection(object):
:param project: Optional ID for project that owns the resource.
:param source: Optional source filter.
:param start_timestamp: Optional modified timestamp start range.
:param start_timestamp_op: Optional timestamp start range operation.
:param end_timestamp: Optional modified timestamp end range.
:param end_timestamp_op: Optional timestamp end range operation.
:param metaquery: Optional dict with metadata to match on.
:param resource: Optional resource filter.
"""

View File

@ -278,7 +278,8 @@ class Connection(base.Connection):
return (key for key, ignored in self.project.scan(**scan_args))
def get_resources(self, user=None, project=None, source=None,
start_timestamp=None, end_timestamp=None,
start_timestamp=None, start_timestamp_op=None,
end_timestamp=None, end_timestamp_op=None,
metaquery={}):
"""Return an iterable of models.Resource instances
@ -286,7 +287,9 @@ class Connection(base.Connection):
:param project: Optional ID for project that owns the resource.
:param source: Optional source filter.
:param start_timestamp: Optional modified timestamp start range.
:param start_timestamp_op: Optional start time operator, like ge, gt.
:param end_timestamp: Optional modified timestamp end range.
:param end_timestamp_op: Optional end time operator, like lt, le.
:param metaquery: Optional dict with metadata to match on.
"""
def make_resource(data):
@ -313,7 +316,9 @@ class Connection(base.Connection):
project=project,
source=source,
start=start_timestamp,
start_op=start_timestamp_op,
end=end_timestamp,
end_op=end_timestamp_op,
require_meter=False,
query_only=False)
LOG.debug("Query Meter table: %s" % q)
@ -692,8 +697,8 @@ def reverse_timestamp(dt):
def make_query(user=None, project=None, meter=None,
resource=None, source=None, start=None, end=None,
require_meter=True, query_only=False):
resource=None, source=None, start=None, start_op=None,
end=None, end_op=None, require_meter=True, query_only=False):
"""Return a filter query string based on the selected parameters.
:param user: Optional user-id
@ -702,7 +707,9 @@ def make_query(user=None, project=None, meter=None,
:param resource: Optional resource-id
:param source: Optional source-id
:param start: Optional start timestamp
:param start_op: Optional start timestamp operator, like gt, ge
:param end: Optional end timestamp
:param end_op: Optional end timestamp operator, like lt, le
:param require_meter: If true and the filter does not have a meter,
raise an error.
:param query_only: If true only returns the filter query,
@ -727,6 +734,12 @@ def make_query(user=None, project=None, meter=None,
rts_start = str(reverse_timestamp(start) + 1) if start else ""
rts_end = str(reverse_timestamp(end) + 1) if end else ""
#By default, we are using ge for lower bound and lt for upper bound
if start_op == 'gt':
rts_start = str(long(rts_start) - 2)
if end_op == 'le':
rts_end = str(long(rts_end) - 1)
# when start_time and end_time is provided,
# if it's filtered by meter,
# rowkey will be used in the query;
@ -765,7 +778,10 @@ def make_query_from_filter(sample_filter, require_meter=True):
return make_query(sample_filter.user, sample_filter.project,
sample_filter.meter, sample_filter.resource,
sample_filter.source, sample_filter.start,
sample_filter.end, require_meter)
sample_filter.start_timestamp_op,
sample_filter.end,
sample_filter.end_timestamp_op,
require_meter)
def _make_rowkey_scan(meter, rts_start=None, rts_end=None):

View File

@ -77,7 +77,8 @@ class Connection(base.Connection):
return []
def get_resources(self, user=None, project=None, source=None,
start_timestamp=None, end_timestamp=None,
start_timestamp=None, start_timestamp_op=None,
end_timestamp=None, end_timestamp_op=None,
metaquery={}, resource=None):
"""Return an iterable of dictionaries containing resource information.
@ -93,7 +94,9 @@ class Connection(base.Connection):
:param project: Optional ID for project that owns the resource.
:param source: Optional source filter.
:param start_timestamp: Optional modified timestamp start range.
:param start_timestamp_op: Optional start time operator, like gt, ge.
:param end_timestamp: Optional modified timestamp end range.
:param end_timestamp_op: Optional end time operator, like lt, le.
:param metaquery: Optional dict with metadata to match on.
:param resource: Optional resource filter.
"""

View File

@ -93,17 +93,28 @@ class MongoDBStorage(base.StorageEngine):
return Connection(conf)
def make_timestamp_range(start, end):
"""Given two possible datetimes, create the query
document to find timestamps within that range
using $gte for the lower bound and $lt for the
def make_timestamp_range(start, end,
start_timestamp_op=None, end_timestamp_op=None):
"""Given two possible datetimes and their operations, create the query
document to find timestamps within that range.
By default, using $gte for the lower bound and $lt for the
upper bound.
"""
ts_range = {}
if start:
ts_range['$gte'] = start
if start_timestamp_op == 'gt':
start_timestamp_op = '$gt'
else:
start_timestamp_op = '$gte'
ts_range[start_timestamp_op] = start
if end:
ts_range['$lt'] = end
if end_timestamp_op == 'le':
end_timestamp_op = '$lte'
else:
end_timestamp_op = '$lt'
ts_range[end_timestamp_op] = end
return ts_range
@ -126,7 +137,9 @@ def make_query_from_filter(sample_filter, require_meter=True):
elif require_meter:
raise RuntimeError('Missing required meter specifier')
ts_range = make_timestamp_range(sample_filter.start, sample_filter.end)
ts_range = make_timestamp_range(sample_filter.start, sample_filter.end,
sample_filter.start_timestamp_op,
sample_filter.end_timestamp_op)
if ts_range:
q['timestamp'] = ts_range
@ -355,7 +368,8 @@ class Connection(base.Connection):
return sorted(self.db.project.find(q).distinct('_id'))
def get_resources(self, user=None, project=None, source=None,
start_timestamp=None, end_timestamp=None,
start_timestamp=None, start_timestamp_op=None,
end_timestamp=None, end_timestamp_op=None,
metaquery={}, resource=None):
"""Return an iterable of models.Resource instances
@ -363,7 +377,9 @@ class Connection(base.Connection):
:param project: Optional ID for project that owns the resource.
:param source: Optional source filter.
:param start_timestamp: Optional modified timestamp start range.
:param start_timestamp_op: Optional start time operator, like gt, ge.
:param end_timestamp: Optional modified timestamp end range.
:param end_timestamp_op: Optional end time operator, like lt, le.
:param metaquery: Optional dict with metadata to match on.
:param resource: Optional resource filter.
"""
@ -388,7 +404,9 @@ class Connection(base.Connection):
# Look for resources matching the above criteria and with
# samples in the time range we care about, then change the
# resource query to return just those resources by id.
ts_range = make_timestamp_range(start_timestamp, end_timestamp)
ts_range = make_timestamp_range(start_timestamp, end_timestamp,
start_timestamp_op,
end_timestamp_op)
if ts_range:
q['timestamp'] = ts_range

View File

@ -119,10 +119,16 @@ def make_query_from_filter(query, sample_filter, require_meter=True):
query = query.filter(Meter.sources.any(id=sample_filter.source))
if sample_filter.start:
ts_start = sample_filter.start
query = query.filter(Meter.timestamp >= ts_start)
if sample_filter.start_timestamp_op == 'gt':
query = query.filter(Meter.timestamp > ts_start)
else:
query = query.filter(Meter.timestamp >= ts_start)
if sample_filter.end:
ts_end = sample_filter.end
query = query.filter(Meter.timestamp < ts_end)
if sample_filter.end_timestamp_op == 'le':
query = query.filter(Meter.timestamp <= ts_end)
else:
query = query.filter(Meter.timestamp < ts_end)
if sample_filter.user:
query = query.filter_by(user_id=sample_filter.user)
if sample_filter.project:
@ -240,7 +246,8 @@ class Connection(base.Connection):
@staticmethod
def get_resources(user=None, project=None, source=None,
start_timestamp=None, end_timestamp=None,
start_timestamp=None, start_timestamp_op=None,
end_timestamp=None, end_timestamp_op=None,
metaquery={}, resource=None):
"""Return an iterable of api_models.Resource instances
@ -248,7 +255,9 @@ class Connection(base.Connection):
:param project: Optional ID for project that owns the resource.
:param source: Optional source filter.
:param start_timestamp: Optional modified timestamp start range.
:param start_timestamp_op: Optonal start time operator, like gt, ge.
:param end_timestamp: Optional modified timestamp end range.
:param end_timestamp_op: Optional end time operator, like lt, le.
:param metaquery: Optional dict with metadata to match on.
:param resource: Optional resource filter.
"""
@ -259,9 +268,15 @@ class Connection(base.Connection):
if source is not None:
query = query.filter(Meter.sources.any(id=source))
if start_timestamp:
query = query.filter(Meter.timestamp >= start_timestamp)
if start_timestamp_op == 'gt':
query = query.filter(Meter.timestamp > start_timestamp)
else:
query = query.filter(Meter.timestamp >= start_timestamp)
if end_timestamp:
query = query.filter(Meter.timestamp < end_timestamp)
if end_timestamp_op == 'le':
query = query.filter(Meter.timestamp <= end_timestamp)
else:
query = query.filter(Meter.timestamp < end_timestamp)
if project is not None:
query = query.filter(Meter.project_id == project)
if resource is not None:

View File

@ -193,28 +193,81 @@ class ResourceTest(DBTestBase):
def test_get_resources_start_timestamp(self):
timestamp = datetime.datetime(2012, 7, 2, 10, 42)
resources = list(self.conn.get_resources(start_timestamp=timestamp))
resource_ids = [r.resource_id for r in resources]
expected = set(['resource-id-2', 'resource-id-3', 'resource-id-4',
'resource-id-6', 'resource-id-8'])
resources = list(self.conn.get_resources(start_timestamp=timestamp))
resource_ids = [r.resource_id for r in resources]
self.assertEqual(set(resource_ids), expected)
resources = list(self.conn.get_resources(start_timestamp=timestamp,
start_timestamp_op='ge'))
resource_ids = [r.resource_id for r in resources]
self.assertEqual(set(resource_ids), expected)
resources = list(self.conn.get_resources(start_timestamp=timestamp,
start_timestamp_op='gt'))
resource_ids = [r.resource_id for r in resources]
expected.remove('resource-id-2')
self.assertEqual(set(resource_ids), expected)
def test_get_resources_end_timestamp(self):
timestamp = datetime.datetime(2012, 7, 2, 10, 42)
resources = list(self.conn.get_resources(end_timestamp=timestamp))
resource_ids = [r.resource_id for r in resources]
expected = set(['resource-id', 'resource-id-alternate',
'resource-id-5', 'resource-id-7'])
resources = list(self.conn.get_resources(end_timestamp=timestamp))
resource_ids = [r.resource_id for r in resources]
self.assertEqual(set(resource_ids), expected)
resources = list(self.conn.get_resources(end_timestamp=timestamp,
end_timestamp_op='lt'))
resource_ids = [r.resource_id for r in resources]
self.assertEqual(set(resource_ids), expected)
resources = list(self.conn.get_resources(end_timestamp=timestamp,
end_timestamp_op='le'))
resource_ids = [r.resource_id for r in resources]
expected.add('resource-id-2')
self.assertEqual(set(resource_ids), expected)
def test_get_resources_both_timestamps(self):
start_ts = datetime.datetime(2012, 7, 2, 10, 42)
end_ts = datetime.datetime(2012, 7, 2, 10, 43)
resources = list(self.conn.get_resources(start_timestamp=start_ts,
end_timestamp=end_ts))
resource_ids = [r.resource_id for r in resources]
assert set(resource_ids) == set(['resource-id-2'])
resources = list(self.conn.get_resources(start_timestamp=start_ts,
end_timestamp=end_ts,
start_timestamp_op='ge',
end_timestamp_op='lt'))
resource_ids = [r.resource_id for r in resources]
assert set(resource_ids) == set(['resource-id-2'])
resources = list(self.conn.get_resources(start_timestamp=start_ts,
end_timestamp=end_ts,
start_timestamp_op='gt',
end_timestamp_op='lt'))
resource_ids = [r.resource_id for r in resources]
assert len(resource_ids) == 0
resources = list(self.conn.get_resources(start_timestamp=start_ts,
end_timestamp=end_ts,
start_timestamp_op='gt',
end_timestamp_op='le'))
resource_ids = [r.resource_id for r in resources]
assert set(resource_ids) == set(['resource-id-3'])
resources = list(self.conn.get_resources(start_timestamp=start_ts,
end_timestamp=end_ts,
start_timestamp_op='ge',
end_timestamp_op='le'))
resource_ids = [r.resource_id for r in resources]
assert set(resource_ids) == set(['resource-id-2', 'resource-id-3'])
def test_get_resources_by_source(self):
resources = list(self.conn.get_resources(source='test-1'))
assert len(resources) == 1
@ -353,33 +406,79 @@ class RawSampleTest(DBTestBase):
self.assertTrue(got_not_imp)
def test_get_samples_by_start_time(self):
timestamp = datetime.datetime(2012, 7, 2, 10, 41)
f = storage.SampleFilter(
user='user-id',
start=datetime.datetime(2012, 7, 2, 10, 41),
start=timestamp,
)
results = list(self.conn.get_samples(f))
assert len(results) == 1
assert results[0].timestamp == datetime.datetime(2012, 7, 2, 10, 41)
assert results[0].timestamp == timestamp
f.start_timestamp_op = 'ge'
results = list(self.conn.get_samples(f))
assert len(results) == 1
assert results[0].timestamp == timestamp
f.start_timestamp_op = 'gt'
results = list(self.conn.get_samples(f))
assert len(results) == 0
def test_get_samples_by_end_time(self):
timestamp = datetime.datetime(2012, 7, 2, 10, 40)
f = storage.SampleFilter(
user='user-id',
end=datetime.datetime(2012, 7, 2, 10, 41),
end=timestamp,
)
results = list(self.conn.get_samples(f))
length = len(results)
assert length == 1
assert results[0].timestamp == datetime.datetime(2012, 7, 2, 10, 40)
assert len(results) == 0
f.end_timestamp_op = 'lt'
results = list(self.conn.get_samples(f))
assert len(results) == 0
f.end_timestamp_op = 'le'
results = list(self.conn.get_samples(f))
assert len(results) == 1
assert results[0].timestamp == timestamp
def test_get_samples_by_both_times(self):
start_ts = datetime.datetime(2012, 7, 2, 10, 42)
end_ts = datetime.datetime(2012, 7, 2, 10, 43)
f = storage.SampleFilter(
start=datetime.datetime(2012, 7, 2, 10, 42),
end=datetime.datetime(2012, 7, 2, 10, 43),
start=start_ts,
end=end_ts,
)
results = list(self.conn.get_samples(f))
length = len(results)
assert length == 1
assert results[0].timestamp == datetime.datetime(2012, 7, 2, 10, 42)
assert len(results) == 1
assert results[0].timestamp == start_ts
f.start_timestamp_op = 'gt'
f.end_timestamp_op = 'lt'
results = list(self.conn.get_samples(f))
assert len(results) == 0
f.start_timestamp_op = 'ge'
f.end_timestamp_op = 'lt'
results = list(self.conn.get_samples(f))
assert len(results) == 1
assert results[0].timestamp == start_ts
f.start_timestamp_op = 'gt'
f.end_timestamp_op = 'le'
results = list(self.conn.get_samples(f))
assert len(results) == 1
assert results[0].timestamp == end_ts
f.start_timestamp_op = 'ge'
f.end_timestamp_op = 'le'
results = list(self.conn.get_samples(f))
assert len(results) == 2
assert results[0].timestamp == end_ts
assert results[1].timestamp == start_ts
def test_get_samples_by_name(self):
f = storage.SampleFilter(user='user-id', meter='no-such-meter')