From 6c7954ee9d67956c0b6df57aaa9f03a67f39c509 Mon Sep 17 00:00:00 2001 From: liu-sheng Date: Thu, 8 May 2014 14:43:45 +0800 Subject: [PATCH] Improve the timestamp validation of ceilometer API Currently, ceilometer API is lack of timestamp validation. Timestamp in query should be checked if it can be parsed to iso timestamp, and raise explicit exception if not. Change-Id: I02cb2f30b00bde46bf8a0e4e161a0e2ce9d44c09 Closes-bug: #1270394 --- ceilometer/api/controllers/v2.py | 34 ++++++++++++++------------- ceilometer/tests/api/v2/test_query.py | 12 ++++++++++ 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/ceilometer/api/controllers/v2.py b/ceilometer/api/controllers/v2.py index 3f8475fe8..540006535 100644 --- a/ceilometer/api/controllers/v2.py +++ b/ceilometer/api/controllers/v2.py @@ -590,8 +590,8 @@ def _get_query_timestamps(args=None): query_end: Final timestamp to use for query end_timestamp: end_timestamp parameter from request search_offset: search_offset parameter from request - """ + if args is None: return {'query_start': None, 'query_end': None, @@ -600,23 +600,25 @@ def _get_query_timestamps(args=None): 'search_offset': 0} search_offset = int(args.get('search_offset', 0)) + def _parse_timestamp(timestamp): + if not timestamp: + return None + try: + iso_timestamp = timeutils.parse_isotime(timestamp) + iso_timestamp = iso_timestamp.replace(tzinfo=None) + except ValueError: + raise wsme.exc.InvalidInput('timestamp', timestamp, + 'invalid timestamp format') + return iso_timestamp + start_timestamp = args.get('start_timestamp') - if start_timestamp: - start_timestamp = timeutils.parse_isotime(start_timestamp) - start_timestamp = start_timestamp.replace(tzinfo=None) - query_start = (start_timestamp - - datetime.timedelta(minutes=search_offset)) - else: - query_start = None - end_timestamp = args.get('end_timestamp') - if end_timestamp: - end_timestamp = timeutils.parse_isotime(end_timestamp) - end_timestamp = end_timestamp.replace(tzinfo=None) - query_end = end_timestamp + datetime.timedelta(minutes=search_offset) - else: - query_end = None - + start_timestamp = _parse_timestamp(start_timestamp) + end_timestamp = _parse_timestamp(end_timestamp) + query_start = start_timestamp - datetime.timedelta( + minutes=search_offset) if start_timestamp else None + query_end = end_timestamp + datetime.timedelta( + minutes=search_offset) if end_timestamp else None return {'query_start': query_start, 'query_end': query_end, 'start_timestamp': start_timestamp, diff --git a/ceilometer/tests/api/v2/test_query.py b/ceilometer/tests/api/v2/test_query.py index c23f5d384..5967e3b90 100644 --- a/ceilometer/tests/api/v2/test_query.py +++ b/ceilometer/tests/api/v2/test_query.py @@ -321,3 +321,15 @@ class TestQueryToKwArgs(tests_base.BaseTestCase): storage.SampleFilter.__init__) for o in ['user', 'project', 'resource']: self.assertEqual('fake_%s_id' % o, kwargs.get(o)) + + def test_timestamp_validation(self): + q = [api.Query(field='timestamp', + op='le', + value='123')] + + exc = self.assertRaises( + wsme.exc.InvalidInput, + api._query_to_kwargs, q, storage.SampleFilter.__init__) + expected_exc = wsme.exc.InvalidInput('timestamp', '123', + 'invalid timestamp format') + self.assertEqual(str(expected_exc), str(exc))