Verify user/project ID for alarm created by non-admin user

Fixes bug 1297677

Previously, when a non-admin user created an alarm on behalf of
another user, the ownership of the alarm silently reverted to the
requestor's identity.

Now, we check for this case and fail with 401 Not Authorized when
a non-admin user attempts to create an alarm explicitly associated
with another identity.

Change-Id: I8f372b1523012990b8f1d48b03cf9ed9cef65c60
This commit is contained in:
Eoghan Glynn 2014-03-26 11:13:23 +00:00
parent 7381b6355f
commit e538f84cfe
2 changed files with 105 additions and 11 deletions

View File

@ -272,9 +272,10 @@ class Query(_Base):
class ProjectNotAuthorized(ClientSideError):
def __init__(self, id):
def __init__(self, id, aspect='project'):
params = dict(aspect=aspect, id=id)
super(ProjectNotAuthorized, self).__init__(
_("Not Authorized to access project %s") % id,
_("Not Authorized to access %(aspect)s %(id)s") % params,
status_code=401)
@ -2037,15 +2038,24 @@ class AlarmsController(rest.RestController):
now = timeutils.utcnow()
data.alarm_id = str(uuid.uuid4())
user, project = acl.get_limited_to(pecan.request.headers)
if user:
data.user_id = user
elif data.user_id == wtypes.Unset:
data.user_id = pecan.request.headers.get('X-User-Id')
if project:
data.project_id = project
elif data.project_id == wtypes.Unset:
data.project_id = pecan.request.headers.get('X-Project-Id')
user_limit, project_limit = acl.get_limited_to(pecan.request.headers)
def _set_ownership(aspect, owner_limitation, header):
attr = '%s_id' % aspect
requested_owner = getattr(data, attr)
explicit_owner = requested_owner != wtypes.Unset
caller = pecan.request.headers.get(header)
if (owner_limitation and explicit_owner
and requested_owner != caller):
raise ProjectNotAuthorized(requested_owner, aspect)
actual_owner = (owner_limitation or
requested_owner if explicit_owner else caller)
setattr(data, attr, actual_owner)
_set_ownership('user', user_limit, 'X-User-Id')
_set_ownership('project', project_limit, 'X-Project-Id')
data.timestamp = now
data.state_timestamp = now

View File

@ -861,6 +861,90 @@ class TestAlarms(FunctionalTest,
self.auth_headers['X-Project-Id'])
self._verify_alarm(json, alarms[0], 'added_alarm')
@staticmethod
def _alarm_representation_owned_by(identifiers):
json = {
'name': 'added_alarm',
'enabled': False,
'type': 'threshold',
'ok_actions': ['http://something/ok'],
'threshold_rule': {
'meter_name': 'ameter',
'query': [{'field': 'metadata.field',
'op': 'eq',
'value': '5',
'type': 'string'}],
'comparison_operator': 'le',
'statistic': 'count',
'threshold': 50,
'evaluation_periods': 3,
'period': 180,
}
}
for aspect, id in identifiers.iteritems():
json['%s_id' % aspect] = id
return json
def _do_test_post_alarm_as_nonadmin_on_behalf_of_another(self,
identifiers):
"""Test that posting an alarm as non-admin on behalf of another
user/project fails with an explicit 401 instead of reverting
to the requestor's identity.
"""
json = self._alarm_representation_owned_by(identifiers)
headers = {}
headers.update(self.auth_headers)
headers['X-Roles'] = 'demo'
resp = self.post_json('/alarms', params=json, status=401,
headers=headers)
aspect = 'user' if 'user' in identifiers else 'project'
params = dict(aspect=aspect, id=identifiers[aspect])
self.assertEqual("Not Authorized to access %(aspect)s %(id)s" % params,
jsonutils.loads(resp.body)['error_message']
['faultstring'])
def test_post_alarm_as_nonadmin_on_behalf_of_another_user(self):
identifiers = dict(user='auseridthatisnotmine')
self._do_test_post_alarm_as_nonadmin_on_behalf_of_another(identifiers)
def test_post_alarm_as_nonadmin_on_behalf_of_another_project(self):
identifiers = dict(project='aprojectidthatisnotmine')
self._do_test_post_alarm_as_nonadmin_on_behalf_of_another(identifiers)
def test_post_alarm_as_nonadmin_on_behalf_of_another_creds(self):
identifiers = dict(user='auseridthatisnotmine',
project='aprojectidthatisnotmine')
self._do_test_post_alarm_as_nonadmin_on_behalf_of_another(identifiers)
def _do_test_post_alarm_as_nonadmin_on_behalf_of_self(self, identifiers):
"""Test posting an alarm as non-admin on behalf of own user/project
creates alarm associated with the requestor's identity.
"""
json = self._alarm_representation_owned_by(identifiers)
headers = {}
headers.update(self.auth_headers)
headers['X-Roles'] = 'demo'
self.post_json('/alarms', params=json, status=201, headers=headers)
alarms = list(self.conn.get_alarms(enabled=False))
self.assertEqual(1, len(alarms))
self.assertEqual(alarms[0].user_id,
self.auth_headers['X-User-Id'])
self.assertEqual(alarms[0].project_id,
self.auth_headers['X-Project-Id'])
def test_post_alarm_as_nonadmin_on_behalf_of_own_user(self):
identifiers = dict(user=self.auth_headers['X-User-Id'])
self._do_test_post_alarm_as_nonadmin_on_behalf_of_self(identifiers)
def test_post_alarm_as_nonadmin_on_behalf_of_own_project(self):
identifiers = dict(project=self.auth_headers['X-Project-Id'])
self._do_test_post_alarm_as_nonadmin_on_behalf_of_self(identifiers)
def test_post_alarm_as_nonadmin_on_behalf_of_own_creds(self):
identifiers = dict(user=self.auth_headers['X-User-Id'],
project=self.auth_headers['X-Project-Id'])
self._do_test_post_alarm_as_nonadmin_on_behalf_of_self(identifiers)
def test_post_alarm_combination(self):
json = {
'enabled': False,