Do the same auth checks in the v2 API as in the v1 API

- Don't limit access to admin.
- Reuse the get_limit_to_project() function from v1/acl.py

This requires some patches to wsme to fix the Response status (so we
can set it to a 401). We don't have this yet, so I am posting an
interim patch. Once we have the next version of wsme I'll fix the
status_code that is returned (see the TODO comments).

Change-Id: Ib39f045b3f9c4ff1f851e13d528fd117aed45c34
This commit is contained in:
Angus Salkeld 2013-05-10 21:19:02 +10:00
parent c9065364d4
commit d88a3097ae
8 changed files with 237 additions and 59 deletions

View File

@ -46,11 +46,7 @@ def install(app, conf):
conf=dict(conf.get(OPT_GROUP_NAME)))
class AdminAuthHook(hooks.PecanHook):
"""Verify that the user has admin rights
"""
def before(self, state):
headers = state.request.headers
if not policy.check_is_admin(headers.get('X-Roles', "").split(",")):
raise exc.HTTPUnauthorized()
def get_limited_to_project(headers):
"""Return the tenant the request should be limited to."""
if not policy.check_is_admin(headers.get('X-Roles', "").split(",")):
return headers.get('X-Tenant-Id')

View File

@ -53,9 +53,6 @@ def setup_app(pecan_config=None, extra_hooks=None):
if not pecan_config:
pecan_config = get_pecan_config()
if pecan_config.app.enable_acl:
app_hooks.append(acl.AdminAuthHook())
pecan.configuration.set_config(dict(pecan_config), overwrite=True)
app = pecan.make_app(

View File

@ -44,6 +44,8 @@ from ceilometer.openstack.common import timeutils
from ceilometer import counter
from ceilometer import pipeline
from ceilometer import storage
from ceilometer.api import acl
LOG = log.getLogger(__name__)
@ -126,8 +128,39 @@ class Query(_Base):
)
def _sanitize_query(q):
'''
Check the query to see if:
1) the request is comming from admin - then allow full visibility
2) non-admin - make sure that the query includes the requester's
project.
'''
auth_project = acl.get_limited_to_project(pecan.request.headers)
if auth_project:
proj_q = [i for i in q if i.field == 'project_id']
for i in proj_q:
if auth_project != i.value or i.op != 'eq':
# TODO(asalkeld) in the next version of wsme (0.5b3+)
# activate this code to be able to return the correct
# status code (also update api/v2/test_acl.py).
#return wsme.api.Response([return_type()],
# status_code=401)
errstr = 'Not Authorized to access project %s %s' % (i.op,
i.value)
raise wsme.exc.ClientSideError(errstr)
if not proj_q:
# The user is restricted, but they didn't specify a project
# so add it for them.
q.append(Query(field='project_id',
op='eq',
value=auth_project))
return q
def _query_to_kwargs(query, db_func):
# TODO(dhellmann): This function needs tests of its own.
query = _sanitize_query(query)
valid_keys = inspect.getargspec(db_func)[0]
if 'self' in valid_keys:
valid_keys.remove('self')
@ -474,11 +507,18 @@ class MeterController(rest.RestController):
samples = [Sample(**b) for b in body]
now = timeutils.utcnow()
auth_project = acl.get_limited_to_project(pecan.request.headers)
source = get_consistent_source()
for s in samples:
if self._id != s.counter_name:
raise wsme.exc.InvalidInput('counter_name', s.counter_name,
'should be %s' % self._id)
if auth_project and auth_project != s.project_id:
# non admin user trying to cross post to another project_id
auth_msg = 'can not post samples to other projects'
raise wsme.exc.InvalidInput('project_id', s.project_id,
auth_msg)
if s.timestamp is None or s.timestamp is wsme.Unset:
s.timestamp = now
s.source = '%s:%s' % (s.project_id, source)
@ -511,7 +551,6 @@ class MeterController(rest.RestController):
:param q: Filter rules for the data to be returned.
:param period: Returned result will be an array of statistics for a
period long of that number of seconds.
"""
kwargs = _query_to_kwargs(q, storage.SampleFilter.__init__)
kwargs['meter'] = self._id
@ -650,8 +689,9 @@ class ResourcesController(rest.RestController):
:param resource_id: The UUID of the resource.
"""
authorized_project = acl.get_limited_to_project(pecan.request.headers)
r = list(pecan.request.storage_conn.get_resources(
resource=resource_id))[0]
resource=resource_id, project=authorized_project))[0]
return Resource.from_db_and_links(r,
self._resource_links(resource_id))
@ -820,9 +860,9 @@ class AlarmsController(rest.RestController):
def delete(self, alarm_id):
"""Delete an alarm"""
conn = pecan.request.storage_conn
project_id = pecan.request.headers.get('X-Project-Id')
auth_project = acl.get_limited_to_project(pecan.request.headers)
alarms = list(conn.get_alarms(alarm_id=alarm_id,
project=project_id))
project=auth_project))
if len(alarms) < 1:
raise wsme.exc.ClientSideError(_("Unknown alarm"))
@ -831,7 +871,10 @@ class AlarmsController(rest.RestController):
@wsme_pecan.wsexpose(Alarm, wtypes.text)
def get_one(self, alarm_id):
"""Return one alarm"""
alarms = list(pecan.request.storage_conn.get_alarms(alarm_id=alarm_id))
conn = pecan.request.storage_conn
auth_project = acl.get_limited_to_project(pecan.request.headers)
alarms = list(conn.get_alarms(alarm_id=alarm_id,
project=auth_project))
if len(alarms) < 1:
raise wsme.exc.ClientSideError(_("Unknown alarm"))

View File

@ -1,26 +0,0 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Handle the ACL to acces the API server."""
from ceilometer import policy
def get_limited_to_project(headers):
"""Return the tenant the request should be limited to."""
if not policy.check_is_admin(headers.get('X-Roles', "").split(",")):
return headers.get('X-Tenant-Id')

View File

@ -94,7 +94,7 @@ from ceilometer.openstack.common import timeutils
from ceilometer import storage
from ceilometer.api.v1 import acl
from ceilometer.api import acl
LOG = log.getLogger(__name__)

View File

@ -20,12 +20,14 @@
import datetime
from oslo.config import cfg
from ceilometer import counter
from ceilometer.api import acl
from ceilometer import policy
from ceilometer.publisher import meter
from .base import FunctionalTest
VALID_TOKEN = '4562138218392831'
VALID_TOKEN2 = '4562138218392832'
class FakeMemcache(object):
@ -48,6 +50,19 @@ class FakeMemcache(object):
{'name': 'admin'},
]},
}}, dt.strftime("%s"))
if key == "tokens/%s" % VALID_TOKEN2:
dt = datetime.datetime.now() + datetime.timedelta(minutes=5)
return ({'access': {
'token': {'id': VALID_TOKEN2},
'user': {
'id': 'user_id2',
'name': 'user-good',
'tenantId': 'project-good',
'tenantName': 'goodies',
'roles': [
{'name': 'Member'},
]},
}}, dt.strftime("%s"))
def set(self, key, value, time=None):
self.set_value = value
@ -60,6 +75,34 @@ class TestAPIACL(FunctionalTest):
super(TestAPIACL, self).setUp()
self.environ = {'fake.cache': FakeMemcache()}
for cnt in [
counter.Counter(
'meter.test',
'cumulative',
'',
1,
'user-good',
'project-good',
'resource-good',
timestamp=datetime.datetime(2012, 7, 2, 10, 40),
resource_metadata={'display_name': 'test-server',
'tag': 'self.counter'}),
counter.Counter(
'meter.mine',
'gauge',
'',
1,
'user-fred',
'project-good',
'resource-56',
timestamp=datetime.datetime(2012, 7, 2, 10, 43),
resource_metadata={'display_name': 'test-server',
'tag': 'self.counter4'})]:
msg = meter.meter_message_from_counter(cnt,
cfg.CONF.metering_secret,
'test_source')
self.conn.record_metering_data(msg)
def get_json(self, path, expect_errors=False, headers=None,
q=[], **params):
return super(TestAPIACL, self).get_json(path,
@ -94,7 +137,7 @@ class TestAPIACL(FunctionalTest):
# the incoming token, which we aren't providing.
#
# def test_authenticated_wrong_tenant(self):
# response = self.get_json('/sources',
# response = self.get_json('/meters',
# expect_errors=True,
# headers={
# "X-Roles": "admin",
@ -104,13 +147,60 @@ class TestAPIACL(FunctionalTest):
# self.assertEqual(response.status_int, 401)
def test_authenticated(self):
response = self.get_json('/meters',
expect_errors=True,
headers={
"X-Auth-Token": VALID_TOKEN,
"X-Roles": "admin",
"X-Tenant-Name": "admin",
"X-Tenant-Id":
"bc23a9d531064583ace8f67dad60f6bb",
})
self.assertEqual(response.status_int, 200)
data = self.get_json('/meters',
headers={"X-Auth-Token": VALID_TOKEN,
"X-Roles": "admin",
"X-Tenant-Name": "admin",
"X-Tenant-Id":
"bc23a9d531064583ace8f67dad60f6bb",
})
ids = set(r['resource_id'] for r in data)
self.assertEquals(set(['resource-good', 'resource-56']), ids)
def test_with_non_admin_missing_project_query(self):
data = self.get_json('/meters',
headers={"X-Roles": "Member",
"X-Auth-Token": VALID_TOKEN2,
"X-Tenant-Id": "project-good"})
ids = set(r['resource_id'] for r in data)
self.assertEquals(set(['resource-good', 'resource-56']), ids)
def test_with_non_admin(self):
data = self.get_json('/meters',
headers={"X-Roles": "Member",
"X-Auth-Token": VALID_TOKEN2,
"X-Tenant-Id": "project-good"},
q=[{'field': 'project_id',
'value': 'project-good',
}])
ids = set(r['resource_id'] for r in data)
self.assertEquals(set(['resource-good', 'resource-56']), ids)
def test_non_admin_wrong_project(self):
data = self.get_json('/meters',
expect_errors=True,
headers={"X-Roles": "Member",
"X-Auth-Token": VALID_TOKEN2,
"X-Tenant-Id": "project-good"},
q=[{'field': 'project_id',
'value': 'project-wrong',
}])
#TODO(asalkeld) revert this with wsme-0.5b3+
# self.assertEqual(data.status_int, 401)
self.assertEqual(data.status_int, 400)
def test_non_admin_two_projects(self):
data = self.get_json('/meters',
expect_errors=True,
headers={"X-Roles": "Member",
"X-Auth-Token": VALID_TOKEN2,
"X-Tenant-Id": "project-good"},
q=[{'field': 'project_id',
'value': 'project-good',
},
{'field': 'project_id',
'value': 'project-naughty',
}])
#TODO(asalkeld) revert this with wsme-0.5b3+
# self.assertEqual(data.status_int, 401)
self.assertEqual(data.status_int, 400)

View File

@ -290,6 +290,58 @@ class TestListResources(FunctionalTest):
ids = [r['resource_id'] for r in data]
self.assertEquals(['resource-id'], ids)
def test_with_user_non_admin(self):
counter1 = counter.Counter(
'instance',
'cumulative',
'',
1,
'user-id2',
'project-id2',
'resource-id-alternate',
timestamp=datetime.datetime(2012, 7, 2, 10, 41),
resource_metadata={'display_name': 'test-server',
'tag': 'self.counter1',
}
)
msg2 = meter.meter_message_from_counter(counter1,
cfg.CONF.metering_secret,
'not-test',
)
self.conn.record_metering_data(msg2)
data = self.get_json('/resources',
headers={"X-Roles": "Member",
"X-Tenant-Id": "project-id2"})
ids = set(r['resource_id'] for r in data)
self.assertEquals(set(['resource-id-alternate']), ids)
def test_with_user_wrong_tenant(self):
counter1 = counter.Counter(
'instance',
'cumulative',
'',
1,
'user-id2',
'project-id2',
'resource-id-alternate',
timestamp=datetime.datetime(2012, 7, 2, 10, 41),
resource_metadata={'display_name': 'test-server',
'tag': 'self.counter1',
}
)
msg2 = meter.meter_message_from_counter(counter1,
cfg.CONF.metering_secret,
'not-test',
)
self.conn.record_metering_data(msg2)
data = self.get_json('/resources',
headers={"X-Roles": "Member",
"X-Tenant-Id": "project-wrong"})
ids = set(r['resource_id'] for r in data)
self.assertEquals(set(), ids)
def test_metadata(self):
counter1 = counter.Counter(
'instance',

View File

@ -60,10 +60,29 @@ class TestPostSamples(FunctionalTest):
self.assertEquals(s1, data.json)
def test_wrong_counter_name(self):
def test_one(self):
s1 = [{'counter_name': 'apples',
'counter_type': 'gauge',
'counter_unit': 'instance',
'counter_volume': 1,
'resource_id': 'bd9431c1-8d69-4ad3-803a-8d4a6b89fd36',
'project_id': '35b17138-b364-4e6a-a131-8f3099c5be68',
'user_id': 'efd87807-12d2-4b38-9c70-5f5c2ac427ff',
'resource_metadata': {'name1': 'value1',
'name2': 'value2'}}]
data = self.post_json('/meters/apples/', s1)
# timestamp not given so it is generated.
s1[0]['timestamp'] = data.json[0]['timestamp']
# source is generated if not provided.
s1[0]['source'] = '%s:openstack' % s1[0]['project_id']
self.assertEquals(s1, data.json)
def test_wrong_project_id(self):
'''
do not accept cross posting samples to different meters
i.e. my_counter_name != wrong
do not accept cross posting samples to different projects
'''
s1 = [{'counter_name': 'my_counter_name',
'counter_type': 'gauge',
@ -76,7 +95,14 @@ class TestPostSamples(FunctionalTest):
'resource_metadata': {'name1': 'value1',
'name2': 'value2'}}]
data = self.post_json('/meters/wrong/', s1, expect_errors=True)
data = self.post_json('/meters/my_counter_name/', s1,
expect_errors=True,
headers={
"X-Roles": "Member",
"X-Tenant-Name": "lu-tenant",
"X-Tenant-Id":
"bc23a9d531064583ace8f67dad60f6bb",
})
self.assertEquals(data.status_int, 400)