Fix v2 API policy checks when keystone is in use.

bug 1022032.  There were significant errors in how
the policy framework was being invoked, but existing
tests did now show them, since they always invoked the
API with no keystone context.  This patch fixes those
issues and add a significant amount of test coverage
simulating API calls with keystone enabled.

As part of this patch, we also needed to add a tenant-id
attribute to the subnet object.  Furthermore, we
changed the API validation code to prevent tenant-id from
being changed with a PUT, since changing it after creation
could invalidate some of the create checks we're planning
on adding (e.g., that a port is being created on a network
owned by the same tenant).

Change-Id: I6da61f0d4ac3b259aa0afcd42cfa8e6ec1a9b035
This commit is contained in:
Dan Wendlandt 2012-07-08 12:34:22 -07:00
parent 02f7d25f49
commit 6e8c9eeaf0
6 changed files with 138 additions and 28 deletions

View File

@ -184,7 +184,7 @@ class Controller(object):
action = "delete_%s" % self._resource
# Check authz
obj = self._item(request, id)
obj = self._item(request, id)[self._resource]
try:
policy.enforce(request.context, action, obj)
except exceptions.PolicyNotAuthorized:
@ -201,7 +201,7 @@ class Controller(object):
action = "update_%s" % self._resource
# Check authz
orig_obj = self._item(request, id)
orig_obj = self._item(request, id)[self._resource]
try:
policy.enforce(request.context, action, orig_obj)
except exceptions.PolicyNotAuthorized:
@ -216,9 +216,6 @@ class Controller(object):
def _populate_tenant_id(self, context, res_dict, is_create):
if self._resource not in ['network', 'port']:
return
if (('tenant_id' in res_dict and
res_dict['tenant_id'] != context.tenant_id and
not context.is_admin)):

View File

@ -57,7 +57,7 @@ RESOURCE_ATTRIBUTE_MAP = {
'admin_state_up': {'allow_post': True, 'allow_put': True,
'default': True},
'status': {'allow_post': False, 'allow_put': False},
'tenant_id': {'allow_post': True, 'allow_put': True},
'tenant_id': {'allow_post': True, 'allow_put': False},
},
'ports': {
'id': {'allow_post': False, 'allow_put': False},
@ -71,7 +71,7 @@ RESOURCE_ATTRIBUTE_MAP = {
'host_routes': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED},
'device_id': {'allow_post': True, 'allow_put': True, 'default': ''},
'tenant_id': {'allow_post': True, 'allow_put': True},
'tenant_id': {'allow_post': True, 'allow_put': False},
},
'subnets': {
'id': {'allow_post': False, 'allow_put': False},
@ -87,6 +87,7 @@ RESOURCE_ATTRIBUTE_MAP = {
'default': ATTR_NOT_SPECIFIED},
'additional_host_routes': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED},
'tenant_id': {'allow_post': True, 'allow_put': False},
}
}

View File

@ -64,8 +64,8 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
query = context.session.query(model)
# NOTE(jkoelker) non-admin queries are scoped to their tenant_id
if not context.is_admin and hasattr(model.tenant_id):
query = query.filter(tenant_id=context.tenant_id)
if not context.is_admin and hasattr(model, 'tenant_id'):
query = query.filter(model.tenant_id == context.tenant_id)
return query
@ -614,6 +614,7 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
def _make_subnet_dict(self, subnet, fields=None):
res = {'id': subnet['id'],
'tenant_id': subnet['tenant_id'],
'network_id': subnet['network_id'],
'ip_version': subnet['ip_version'],
'cidr': subnet['cidr'],
@ -687,10 +688,12 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
if s['gateway_ip'] == api_router.ATTR_NOT_SPECIFIED:
s['gateway_ip'] = str(netaddr.IPAddress(net.first + 1))
tenant_id = self._get_tenant_id_for_create(context, s)
with context.session.begin():
network = self._get_network(context, s["network_id"])
self._validate_subnet_cidr(network, s['cidr'])
subnet = models_v2.Subnet(network_id=s['network_id'],
subnet = models_v2.Subnet(tenant_id=tenant_id,
network_id=s['network_id'],
ip_version=s['ip_version'],
cidr=s['cidr'],
gateway_ip=s['gateway_ip'])

View File

@ -97,7 +97,7 @@ class Port(model_base.BASEV2, HasId, HasTenant):
device_id = sa.Column(sa.String(255), nullable=False)
class Subnet(model_base.BASEV2, HasId):
class Subnet(model_base.BASEV2, HasId, HasTenant):
"""Represents a quantum subnet.
When a subnet is created the first and last entries will be created. These

View File

@ -379,24 +379,44 @@ class APIv2TestCase(unittest.TestCase):
# Note: since all resources use the same controller and validation
# logic, we actually get really good coverage from testing just networks.
class JSONV2TestCase(APIv2TestCase):
def test_list(self):
def _test_list(self, req_tenant_id, real_tenant_id):
env = {}
if req_tenant_id:
env = {'quantum.context': context.Context('', req_tenant_id)}
input_dict = {'id': str(uuid.uuid4()),
'name': 'net1',
'admin_state_up': True,
'status': "ACTIVE",
'tenant_id': str(uuid.uuid4()),
'tenant_id': real_tenant_id,
'subnets': []}
return_value = [input_dict]
instance = self.plugin.return_value
instance.get_networks.return_value = return_value
res = self.api.get(_get_path('networks'))
res = self.api.get(_get_path('networks'), extra_environ=env)
self.assertTrue('networks' in res.json)
self.assertEqual(len(res.json['networks']), 1)
output_dict = res.json['networks'][0]
self.assertEqual(len(input_dict), len(output_dict))
for k, v in input_dict.iteritems():
self.assertEqual(v, output_dict[k])
if not req_tenant_id or req_tenant_id == real_tenant_id:
# expect full list returned
self.assertEqual(len(res.json['networks']), 1)
output_dict = res.json['networks'][0]
self.assertEqual(len(input_dict), len(output_dict))
for k, v in input_dict.iteritems():
self.assertEqual(v, output_dict[k])
else:
# expect no results
self.assertEqual(len(res.json['networks']), 0)
def test_list_noauth(self):
self._test_list(None, _uuid())
def test_list_keystone(self):
tenant_id = _uuid()
self._test_list(tenant_id, tenant_id)
def test_list_keystone_bad(self):
tenant_id = _uuid()
self._test_list(tenant_id + "bad", tenant_id)
def test_create(self):
net_id = _uuid()
@ -579,24 +599,90 @@ class JSONV2TestCase(APIv2TestCase):
self.api.get(_get_path('networks', id=str(uuid.uuid4())))
def test_delete(self):
def _test_delete(self, req_tenant_id, real_tenant_id, expected_code,
expect_errors=False):
env = {}
if req_tenant_id:
env = {'quantum.context': context.Context('', req_tenant_id)}
instance = self.plugin.return_value
instance.get_network.return_value = {'tenant_id': real_tenant_id}
instance.delete_network.return_value = None
res = self.api.delete(_get_path('networks', id=str(uuid.uuid4())))
self.assertEqual(res.status_int, exc.HTTPNoContent.code)
res = self.api.delete(_get_path('networks', id=str(uuid.uuid4())),
extra_environ=env, expect_errors=expect_errors)
self.assertEqual(res.status_int, expected_code)
def test_update(self):
def test_delete_noauth(self):
self._test_delete(None, _uuid(), exc.HTTPNoContent.code)
def test_delete_keystone(self):
tenant_id = _uuid()
self._test_delete(tenant_id, tenant_id, exc.HTTPNoContent.code)
def test_delete_keystone_bad_tenant(self):
tenant_id = _uuid()
self._test_delete(tenant_id + "bad", tenant_id,
exc.HTTPNotFound.code, expect_errors=True)
def _test_get(self, req_tenant_id, real_tenant_id, expected_code,
expect_errors=False):
env = {}
if req_tenant_id:
env = {'quantum.context': context.Context('', req_tenant_id)}
data = {'tenant_id': real_tenant_id}
instance = self.plugin.return_value
instance.get_network.return_value = data
res = self.api.get(_get_path('networks',
id=str(uuid.uuid4())),
extra_environ=env,
expect_errors=expect_errors)
self.assertEqual(res.status_int, expected_code)
def test_get_noauth(self):
self._test_get(None, _uuid(), 200)
def test_get_keystone(self):
tenant_id = _uuid()
self._test_get(tenant_id, tenant_id, 200)
def test_get_keystone_bad_tenant(self):
tenant_id = _uuid()
self._test_get(tenant_id + "bad", tenant_id,
exc.HTTPNotFound.code, expect_errors=True)
def _test_update(self, req_tenant_id, real_tenant_id, expected_code,
expect_errors=False):
env = {}
if req_tenant_id:
env = {'quantum.context': context.Context('', req_tenant_id)}
# leave out 'name' field intentionally
data = {'network': {'admin_state_up': True}}
return_value = {'subnets': []}
return_value.update(data['network'].copy())
instance = self.plugin.return_value
instance.get_network.return_value = {'tenant_id': real_tenant_id}
instance.update_network.return_value = return_value
self.api.put_json(_get_path('networks',
id=str(uuid.uuid4())), data)
res = self.api.put_json(_get_path('networks',
id=str(uuid.uuid4())),
data,
extra_environ=env,
expect_errors=expect_errors)
self.assertEqual(res.status_int, expected_code)
def test_update_noauth(self):
self._test_update(None, _uuid(), 200)
def test_update_keystone(self):
tenant_id = _uuid()
self._test_update(tenant_id, tenant_id, 200)
def test_update_keystone_bad_tenant(self):
tenant_id = _uuid()
self._test_update(tenant_id + "bad", tenant_id,
exc.HTTPNotFound.code, expect_errors=True)
def test_update_readonly_field(self):
data = {'network': {'status': "NANANA"}}

View File

@ -25,6 +25,7 @@ import quantum
from quantum.api.v2.router import APIRouter
from quantum.common import config
from quantum.common import exceptions as q_exc
from quantum import context
from quantum.db import api as db
from quantum.openstack.common import cfg
from quantum.tests.unit.testlib_api import create_request
@ -112,9 +113,10 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
network_req = self.new_create_request('networks', data, fmt)
return network_req.get_response(self.api)
def _create_subnet(self, fmt, net_id, gateway_ip, cidr,
def _create_subnet(self, fmt, tenant_id, net_id, gateway_ip, cidr,
allocation_pools=None, ip_version=4):
data = {'subnet': {'network_id': net_id,
data = {'subnet': {'tenant_id': tenant_id,
'network_id': net_id,
'cidr': cidr,
'ip_version': ip_version}}
if gateway_ip:
@ -140,6 +142,7 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
def _make_subnet(self, fmt, network, gateway, cidr,
allocation_pools=None, ip_version=4):
res = self._create_subnet(fmt,
network['network']['tenant_id'],
network['network']['id'],
gateway,
cidr,
@ -211,6 +214,25 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
self._delete('ports', port['port']['id'])
class TestBasicGet(QuantumDbPluginV2TestCase):
def test_single_get_admin(self):
plugin = quantum.db.db_base_plugin_v2.QuantumDbPluginV2()
with self.network() as network:
net_id = network['network']['id']
ctx = context.get_admin_context()
n = plugin._get_network(ctx, net_id)
self.assertEqual(net_id, n.id)
def test_single_get_tenant(self):
plugin = quantum.db.db_base_plugin_v2.QuantumDbPluginV2()
with self.network() as network:
net_id = network['network']['id']
ctx = context.get_admin_context()
n = plugin._get_network(ctx, net_id)
self.assertEqual(net_id, n.id)
class TestV2HTTPResponse(QuantumDbPluginV2TestCase):
def test_create_returns_201(self):
res = self._create_network('json', 'net2', True)
@ -476,8 +498,9 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
fmt = 'json'
with self.subnet() as subnet:
# Get a IPv4 and IPv6 address
tenant_id = subnet['subnet']['tenant_id']
net_id = subnet['subnet']['network_id']
res = self._create_subnet(fmt, net_id=net_id,
res = self._create_subnet(fmt, tenant_id, net_id=net_id,
cidr='2607:f0d0:1002:51::0/124',
ip_version=6, gateway_ip=None)
subnet2 = self.deserialize(fmt, res)