diff --git a/ceilometer/api/controllers/v2.py b/ceilometer/api/controllers/v2.py index dbe870825..8eca01ef8 100644 --- a/ceilometer/api/controllers/v2.py +++ b/ceilometer/api/controllers/v2.py @@ -325,11 +325,13 @@ def _verify_query_segregation(query, auth_project=None): '''Ensure non-admin queries are not constrained to another project.''' auth_project = (auth_project or acl.get_limited_to_project(pecan.request.headers)) - if auth_project: - for q in query: - if q.field == 'project_id' and (auth_project != q.value or - q.op != 'eq'): - raise ProjectNotAuthorized(q.value) + + if not auth_project: + return + + for q in query: + if q.field in ('project', 'project_id') and auth_project != q.value: + raise ProjectNotAuthorized(q.value) def _validate_query(query, db_func, internal_keys=None, diff --git a/ceilometer/tests/api/v2/test_alarm_scenarios.py b/ceilometer/tests/api/v2/test_alarm_scenarios.py index aa3481edf..e7d0cba1f 100644 --- a/ceilometer/tests/api/v2/test_alarm_scenarios.py +++ b/ceilometer/tests/api/v2/test_alarm_scenarios.py @@ -266,6 +266,55 @@ class TestAlarms(FunctionalTest, self.assertEqual(alarms[0]['alarm_id'], one['alarm_id']) self.assertEqual(alarms[0]['repeat_actions'], one['repeat_actions']) + def test_get_alarm_project_filter_wrong_op_normal_user(self): + project = self.auth_headers['X-Project-Id'] + + def _test(field, op): + response = self.get_json('/alarms', + q=[{'field': field, + 'op': op, + 'value': project}], + expect_errors=True, + status=400, + headers=self.auth_headers) + faultstring = ('Invalid input for field/attribute op. ' + 'Value: \'%(op)s\'. unimplemented operator ' + 'for %(field)s' % {'field': field, 'op': op}) + self.assertEqual(faultstring, + response.json['error_message']['faultstring']) + + _test('project', 'ne') + _test('project_id', 'ne') + + def test_get_alarm_project_filter_normal_user(self): + project = self.auth_headers['X-Project-Id'] + + def _test(field): + alarms = self.get_json('/alarms', + q=[{'field': field, + 'op': 'eq', + 'value': project}]) + self.assertEqual(4, len(alarms)) + + _test('project') + _test('project_id') + + def test_get_alarm_other_project_normal_user(self): + def _test(field): + response = self.get_json('/alarms', + q=[{'field': field, + 'op': 'eq', + 'value': 'other-project'}], + expect_errors=True, + status=401, + headers=self.auth_headers) + faultstring = 'Not Authorized to access project other-project' + self.assertEqual(faultstring, + response.json['error_message']['faultstring']) + + _test('project') + _test('project_id') + def test_post_alarm_wsme_workaround(self): jsons = { 'type': {