Improve the timestamp validation of ceilometer API
Currently, ceilometer API is lack of timestamp validation. Timestamp in query should be checked if it can be parsed to iso timestamp, and raise explicit exception if not. Change-Id: I02cb2f30b00bde46bf8a0e4e161a0e2ce9d44c09 Closes-bug: #1270394
This commit is contained in:
parent
3263c39f9e
commit
6c7954ee9d
@ -590,8 +590,8 @@ def _get_query_timestamps(args=None):
|
||||
query_end: Final timestamp to use for query
|
||||
end_timestamp: end_timestamp parameter from request
|
||||
search_offset: search_offset parameter from request
|
||||
|
||||
"""
|
||||
|
||||
if args is None:
|
||||
return {'query_start': None,
|
||||
'query_end': None,
|
||||
@ -600,23 +600,25 @@ def _get_query_timestamps(args=None):
|
||||
'search_offset': 0}
|
||||
search_offset = int(args.get('search_offset', 0))
|
||||
|
||||
def _parse_timestamp(timestamp):
|
||||
if not timestamp:
|
||||
return None
|
||||
try:
|
||||
iso_timestamp = timeutils.parse_isotime(timestamp)
|
||||
iso_timestamp = iso_timestamp.replace(tzinfo=None)
|
||||
except ValueError:
|
||||
raise wsme.exc.InvalidInput('timestamp', timestamp,
|
||||
'invalid timestamp format')
|
||||
return iso_timestamp
|
||||
|
||||
start_timestamp = args.get('start_timestamp')
|
||||
if start_timestamp:
|
||||
start_timestamp = timeutils.parse_isotime(start_timestamp)
|
||||
start_timestamp = start_timestamp.replace(tzinfo=None)
|
||||
query_start = (start_timestamp -
|
||||
datetime.timedelta(minutes=search_offset))
|
||||
else:
|
||||
query_start = None
|
||||
|
||||
end_timestamp = args.get('end_timestamp')
|
||||
if end_timestamp:
|
||||
end_timestamp = timeutils.parse_isotime(end_timestamp)
|
||||
end_timestamp = end_timestamp.replace(tzinfo=None)
|
||||
query_end = end_timestamp + datetime.timedelta(minutes=search_offset)
|
||||
else:
|
||||
query_end = None
|
||||
|
||||
start_timestamp = _parse_timestamp(start_timestamp)
|
||||
end_timestamp = _parse_timestamp(end_timestamp)
|
||||
query_start = start_timestamp - datetime.timedelta(
|
||||
minutes=search_offset) if start_timestamp else None
|
||||
query_end = end_timestamp + datetime.timedelta(
|
||||
minutes=search_offset) if end_timestamp else None
|
||||
return {'query_start': query_start,
|
||||
'query_end': query_end,
|
||||
'start_timestamp': start_timestamp,
|
||||
|
@ -321,3 +321,15 @@ class TestQueryToKwArgs(tests_base.BaseTestCase):
|
||||
storage.SampleFilter.__init__)
|
||||
for o in ['user', 'project', 'resource']:
|
||||
self.assertEqual('fake_%s_id' % o, kwargs.get(o))
|
||||
|
||||
def test_timestamp_validation(self):
|
||||
q = [api.Query(field='timestamp',
|
||||
op='le',
|
||||
value='123')]
|
||||
|
||||
exc = self.assertRaises(
|
||||
wsme.exc.InvalidInput,
|
||||
api._query_to_kwargs, q, storage.SampleFilter.__init__)
|
||||
expected_exc = wsme.exc.InvalidInput('timestamp', '123',
|
||||
'invalid timestamp format')
|
||||
self.assertEqual(str(expected_exc), str(exc))
|
||||
|
Loading…
x
Reference in New Issue
Block a user