Load full contexts from request headers

Ensure full of the context attributes are read and honored when
policy rules are evaluated. This is required to support rules which
depend on attributes other than project_id, user_id and role. (This
means no rule for system scope has had worked in aodh till now)

Also make sure is_admin check honors the context_is_admin rule.

Change-Id: I4a57e5baf3edcbcd3f37a7d436226d33305fcbe8
This commit is contained in:
Takashi Kajinami 2024-09-21 15:03:57 +09:00
parent e52699f9eb
commit 2813c83e5b
5 changed files with 41 additions and 45 deletions

View File

@ -366,7 +366,7 @@ class Alarm(base.Base):
"maximum": max_actions}
raise base.ClientSideError(error)
limited = rbac.get_limited_to_project(pecan.request.headers,
limited = rbac.get_limited_to_project(pecan.request,
pecan.request.enforcer)
for action in actions:
@ -578,7 +578,8 @@ class AlarmController(rest.RestController):
auth_project = pecan.request.headers.get('X-Project-Id')
filters = {'alarm_id': self._id}
if not rbac.is_admin(pecan.request.headers):
is_admin = rbac.is_admin(pecan.request, pecan.request.enforcer)
if not is_admin:
filters['project_id'] = auth_project
alarms = pecan.request.storage.get_alarms(**filters)
@ -588,7 +589,7 @@ class AlarmController(rest.RestController):
alarm = alarms[0]
target = {'user_id': alarm.user_id,
'project_id': alarm.project_id}
rbac.enforce(rbac_directive, pecan.request.headers,
rbac.enforce(rbac_directive, pecan.request,
pecan.request.enforcer, target)
return alarm
@ -662,7 +663,7 @@ class AlarmController(rest.RestController):
data.alarm_id = self._id
user, project = rbac.get_limited_to(pecan.request.headers,
user, project = rbac.get_limited_to(pecan.request,
pecan.request.enforcer)
if user:
data.user_id = user
@ -727,7 +728,7 @@ class AlarmController(rest.RestController):
# allow history to be returned for deleted alarms, but scope changes
# returned to those carried out on behalf of the auth'd tenant, to
# avoid inappropriate cross-tenant visibility of alarm history
auth_project = rbac.get_limited_to_project(pecan.request.headers,
auth_project = rbac.get_limited_to_project(pecan.request,
pecan.request.enforcer)
conn = pecan.request.storage
kwargs = v2_utils.query_to_kwargs(
@ -813,14 +814,14 @@ class AlarmsController(rest.RestController):
:param data: an alarm within the request body.
"""
rbac.enforce('create_alarm', pecan.request.headers,
rbac.enforce('create_alarm', pecan.request,
pecan.request.enforcer, {})
conn = pecan.request.storage
now = timeutils.utcnow()
data.alarm_id = uuidutils.generate_uuid()
user_limit, project_limit = rbac.get_limited_to(pecan.request.headers,
user_limit, project_limit = rbac.get_limited_to(pecan.request,
pecan.request.enforcer)
def _set_ownership(aspect, owner_limitation, header):
@ -874,8 +875,8 @@ class AlarmsController(rest.RestController):
:param marker: The pagination query marker.
"""
target = rbac.target_from_segregation_rule(
pecan.request.headers, pecan.request.enforcer)
rbac.enforce('get_alarms', pecan.request.headers,
pecan.request, pecan.request.enforcer)
rbac.enforce('get_alarms', pecan.request,
pecan.request.enforcer, target)
q = q or []
@ -898,12 +899,12 @@ class AlarmsController(rest.RestController):
if 'all_projects' in keys:
if v2_utils.get_query_value(q, 'all_projects', 'boolean'):
rbac.enforce('get_alarms:all_projects', pecan.request.headers,
rbac.enforce('get_alarms:all_projects', pecan.request,
pecan.request.enforcer, target)
keys.remove('all_projects')
else:
project_id = pecan.request.headers.get('X-Project-Id')
is_admin = rbac.is_admin(pecan.request.headers)
is_admin = rbac.is_admin(pecan.request, pecan.request.enforcer)
if not v2_utils.is_field_exist(q, 'project_id'):
q.append(

View File

@ -273,7 +273,7 @@ class ValidatedComplexQuery(object):
"and <visibility_field>=<tenant's project_id>" clause to the query.
"""
authorized_project = rbac.get_limited_to_project(
pecan.request.headers, pecan.request.enforcer)
pecan.request, pecan.request.enforcer)
is_admin = authorized_project is None
if not is_admin:
self._restrict_to_project(authorized_project, visibility_field)
@ -354,8 +354,8 @@ class QueryAlarmHistoryController(rest.RestController):
:param body: Query rules for the alarm history to be returned.
"""
target = rbac.target_from_segregation_rule(
pecan.request.headers, pecan.request.enforcer)
rbac.enforce('query_alarm_history', pecan.request.headers,
pecan.request, pecan.request.enforcer)
rbac.enforce('query_alarm_history', pecan.request,
pecan.request.enforcer, target)
query = ValidatedComplexQuery(body,
@ -380,8 +380,8 @@ class QueryAlarmsController(rest.RestController):
"""
target = rbac.target_from_segregation_rule(
pecan.request.headers, pecan.request.enforcer)
rbac.enforce('query_alarm', pecan.request.headers,
pecan.request, pecan.request.enforcer)
rbac.enforce('query_alarm', pecan.request,
pecan.request.enforcer, target)
query = ValidatedComplexQuery(body,

View File

@ -48,7 +48,7 @@ class QuotasController(rest.RestController):
"""
request_project = pecan.request.headers.get('X-Project-Id')
project_id = project_id if project_id else request_project
is_admin = rbac.is_admin(pecan.request.headers)
is_admin = rbac.is_admin(pecan.request, pecan.request.enforcer)
if project_id != request_project and not is_admin:
raise base.ProjectNotAuthorized(project_id)
@ -68,7 +68,7 @@ class QuotasController(rest.RestController):
@wsme_pecan.wsexpose(Quotas, body=Quotas, status_code=201)
def post(self, body):
"""Create or update quota."""
rbac.enforce('update_quotas', pecan.request.headers,
rbac.enforce('update_quotas', pecan.request,
pecan.request.enforcer, {})
params = body.to_dict()
@ -86,6 +86,6 @@ class QuotasController(rest.RestController):
@wsme_pecan.wsexpose(None, str, status_code=204)
def delete(self, project_id):
"""Delete quotas for the given project."""
rbac.enforce('delete_quotas', pecan.request.headers,
rbac.enforce('delete_quotas', pecan.request,
pecan.request.enforcer, {})
pecan.request.storage.delete_quotas(project_id)

View File

@ -39,7 +39,7 @@ def get_auth_project(on_behalf_of=None):
# Hence, for null auth_project (indicating admin-ness) we check if
# the creating tenant differs from the tenant on whose behalf the
# alarm is being created
auth_project = rbac.get_limited_to_project(pecan.request.headers,
auth_project = rbac.get_limited_to_project(pecan.request,
pecan.request.enforcer)
created_by = pecan.request.headers.get('X-Project-Id')
is_admin = auth_project is None
@ -76,7 +76,7 @@ def sanitize_query(query, db_func, on_behalf_of=None):
def _verify_query_segregation(query, auth_project=None):
"""Ensure non-admin queries are not constrained to another project."""
auth_project = (auth_project or
rbac.get_limited_to_project(pecan.request.headers,
rbac.get_limited_to_project(pecan.request,
pecan.request.enforcer))
if not auth_project:

View File

@ -17,44 +17,40 @@
"""Access Control Lists (ACL's) control access the API server."""
from oslo_context import context
import pecan
def target_from_segregation_rule(headers, enforcer):
def target_from_segregation_rule(req, enforcer):
"""Return a target corresponding to an alarm returned by segregation rule
This allows to use project_id: in an oslo_policy rule for query/listing.
:param headers: HTTP headers dictionary
:param req: Webob Request object
:param enforcer: policy enforcer
:returns: target
"""
project_id = get_limited_to_project(headers, enforcer)
project_id = get_limited_to_project(req, enforcer)
if project_id is not None:
return {'project_id': project_id}
return {}
def enforce(policy_name, headers, enforcer, target):
def enforce(policy_name, req, enforcer, target):
"""Return the user and project the request should be limited to.
:param policy_name: the policy name to validate authz against.
:param headers: HTTP headers dictionary
:param req: Webob Request object
:param enforcer: policy enforcer
:param target: the alarm or "auto" to
"""
rule_method = "telemetry:" + policy_name
ctxt = context.RequestContext.from_environ(req.environ)
credentials = {
'roles': headers.get('X-Roles', "").split(","),
'user_id': headers.get('X-User-Id'),
'project_id': headers.get('X-Project-Id'),
}
if not enforcer.enforce(rule_method, target, credentials):
if not enforcer.enforce(rule_method, target, ctxt.to_dict()):
pecan.core.abort(status_code=403,
detail='RBAC Authorization Failed')
@ -62,10 +58,10 @@ def enforce(policy_name, headers, enforcer, target):
# TODO(fabiog): these methods are still used because the scoping part is really
# convoluted and difficult to separate out.
def get_limited_to(headers, enforcer):
def get_limited_to(req, enforcer):
"""Return the user and project the request should be limited to.
:param headers: HTTP headers dictionary
:param req: Webob Request object
:param enforcer: policy enforcer
:return: A tuple of (user, project), set to None if there's no limit on
one of these.
@ -76,29 +72,28 @@ def get_limited_to(headers, enforcer):
# creating more enhanced rbac. But for now we enforce the
# scoping of request to the project-id, so...
target = {}
credentials = {
'roles': headers.get('X-Roles', "").split(","),
}
ctxt = context.RequestContext.from_environ(req.environ)
# maintain backward compat with Juno and previous by using context_is_admin
# rule if the segregation rule (added in Kilo) is not defined
rules = enforcer.rules.keys()
rule_name = 'segregation' if 'segregation' in rules else 'context_is_admin'
if not enforcer.enforce(rule_name, target, credentials):
return headers.get('X-User-Id'), headers.get('X-Project-Id')
if not enforcer.enforce(rule_name, target, ctxt.to_dict()):
return ctxt.user_id, ctxt.project_id
return None, None
def get_limited_to_project(headers, enforcer):
def get_limited_to_project(req, enforcer):
"""Return the project the request should be limited to.
:param headers: HTTP headers dictionary
:param req: Webob Request object
:param enforcer: policy enforcer
:return: A project, or None if there's no limit on it.
"""
return get_limited_to(headers, enforcer)[1]
return get_limited_to(req, enforcer)[1]
def is_admin(headers):
return 'admin' in headers.get('X-Roles', "").split(",")
def is_admin(req, enforcer):
ctxt = context.RequestContext.from_environ(req.environ)
return enforcer.enforce('context_is_admin', {}, ctxt.to_dict())