diff --git a/ceilometer/api/controllers/v2.py b/ceilometer/api/controllers/v2.py index e0a4c0839..5058c2454 100644 --- a/ceilometer/api/controllers/v2.py +++ b/ceilometer/api/controllers/v2.py @@ -272,9 +272,10 @@ class Query(_Base): class ProjectNotAuthorized(ClientSideError): - def __init__(self, id): + def __init__(self, id, aspect='project'): + params = dict(aspect=aspect, id=id) super(ProjectNotAuthorized, self).__init__( - _("Not Authorized to access project %s") % id, + _("Not Authorized to access %(aspect)s %(id)s") % params, status_code=401) @@ -2037,15 +2038,24 @@ class AlarmsController(rest.RestController): now = timeutils.utcnow() data.alarm_id = str(uuid.uuid4()) - user, project = acl.get_limited_to(pecan.request.headers) - if user: - data.user_id = user - elif data.user_id == wtypes.Unset: - data.user_id = pecan.request.headers.get('X-User-Id') - if project: - data.project_id = project - elif data.project_id == wtypes.Unset: - data.project_id = pecan.request.headers.get('X-Project-Id') + user_limit, project_limit = acl.get_limited_to(pecan.request.headers) + + def _set_ownership(aspect, owner_limitation, header): + attr = '%s_id' % aspect + requested_owner = getattr(data, attr) + explicit_owner = requested_owner != wtypes.Unset + caller = pecan.request.headers.get(header) + if (owner_limitation and explicit_owner + and requested_owner != caller): + raise ProjectNotAuthorized(requested_owner, aspect) + + actual_owner = (owner_limitation or + requested_owner if explicit_owner else caller) + setattr(data, attr, actual_owner) + + _set_ownership('user', user_limit, 'X-User-Id') + _set_ownership('project', project_limit, 'X-Project-Id') + data.timestamp = now data.state_timestamp = now diff --git a/ceilometer/tests/api/v2/test_alarm_scenarios.py b/ceilometer/tests/api/v2/test_alarm_scenarios.py index 4758c492b..6693dbc72 100644 --- a/ceilometer/tests/api/v2/test_alarm_scenarios.py +++ b/ceilometer/tests/api/v2/test_alarm_scenarios.py @@ -857,6 +857,90 @@ class TestAlarms(FunctionalTest, alarms[0].project_id) self._verify_alarm(json, alarms[0], 'added_alarm') + @staticmethod + def _alarm_representation_owned_by(identifiers): + json = { + 'name': 'added_alarm', + 'enabled': False, + 'type': 'threshold', + 'ok_actions': ['http://something/ok'], + 'threshold_rule': { + 'meter_name': 'ameter', + 'query': [{'field': 'metadata.field', + 'op': 'eq', + 'value': '5', + 'type': 'string'}], + 'comparison_operator': 'le', + 'statistic': 'count', + 'threshold': 50, + 'evaluation_periods': 3, + 'period': 180, + } + } + for aspect, id in identifiers.iteritems(): + json['%s_id' % aspect] = id + return json + + def _do_test_post_alarm_as_nonadmin_on_behalf_of_another(self, + identifiers): + """Test that posting an alarm as non-admin on behalf of another + user/project fails with an explicit 401 instead of reverting + to the requestor's identity. + """ + json = self._alarm_representation_owned_by(identifiers) + headers = {} + headers.update(self.auth_headers) + headers['X-Roles'] = 'demo' + resp = self.post_json('/alarms', params=json, status=401, + headers=headers) + aspect = 'user' if 'user' in identifiers else 'project' + params = dict(aspect=aspect, id=identifiers[aspect]) + self.assertEqual("Not Authorized to access %(aspect)s %(id)s" % params, + jsonutils.loads(resp.body)['error_message'] + ['faultstring']) + + def test_post_alarm_as_nonadmin_on_behalf_of_another_user(self): + identifiers = dict(user='auseridthatisnotmine') + self._do_test_post_alarm_as_nonadmin_on_behalf_of_another(identifiers) + + def test_post_alarm_as_nonadmin_on_behalf_of_another_project(self): + identifiers = dict(project='aprojectidthatisnotmine') + self._do_test_post_alarm_as_nonadmin_on_behalf_of_another(identifiers) + + def test_post_alarm_as_nonadmin_on_behalf_of_another_creds(self): + identifiers = dict(user='auseridthatisnotmine', + project='aprojectidthatisnotmine') + self._do_test_post_alarm_as_nonadmin_on_behalf_of_another(identifiers) + + def _do_test_post_alarm_as_nonadmin_on_behalf_of_self(self, identifiers): + """Test posting an alarm as non-admin on behalf of own user/project + creates alarm associated with the requestor's identity. + """ + json = self._alarm_representation_owned_by(identifiers) + headers = {} + headers.update(self.auth_headers) + headers['X-Roles'] = 'demo' + self.post_json('/alarms', params=json, status=201, headers=headers) + alarms = list(self.conn.get_alarms(enabled=False)) + self.assertEqual(1, len(alarms)) + self.assertEqual(alarms[0].user_id, + self.auth_headers['X-User-Id']) + self.assertEqual(alarms[0].project_id, + self.auth_headers['X-Project-Id']) + + def test_post_alarm_as_nonadmin_on_behalf_of_own_user(self): + identifiers = dict(user=self.auth_headers['X-User-Id']) + self._do_test_post_alarm_as_nonadmin_on_behalf_of_self(identifiers) + + def test_post_alarm_as_nonadmin_on_behalf_of_own_project(self): + identifiers = dict(project=self.auth_headers['X-Project-Id']) + self._do_test_post_alarm_as_nonadmin_on_behalf_of_self(identifiers) + + def test_post_alarm_as_nonadmin_on_behalf_of_own_creds(self): + identifiers = dict(user=self.auth_headers['X-User-Id'], + project=self.auth_headers['X-Project-Id']) + self._do_test_post_alarm_as_nonadmin_on_behalf_of_self(identifiers) + def test_post_alarm_combination(self): json = { 'enabled': False,