Merge "Added policy checks for add interface and remove interface"
This commit is contained in:
commit
a6a4272806
@ -12,6 +12,8 @@
|
||||
|
||||
"extension:router:view": [["rule:regular_user"]],
|
||||
"extension:router:set": [["rule:admin_only"]],
|
||||
"extension:router:add_router_interface": [["rule:admin_only"]],
|
||||
"extension:router:remove_router_interface": [["rule:admin_or_owner"]],
|
||||
|
||||
"subnets:private:read": [["rule:admin_or_owner"]],
|
||||
"subnets:private:write": [["rule:admin_or_owner"]],
|
||||
|
@ -274,12 +274,19 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
|
||||
pass
|
||||
|
||||
def add_router_interface(self, context, router_id, interface_info):
|
||||
# make sure router exists - will raise if not
|
||||
self._get_router(context, router_id)
|
||||
# make sure router exists
|
||||
router = self._get_router(context, router_id)
|
||||
if not interface_info:
|
||||
msg = "Either subnet_id or port_id must be specified"
|
||||
raise q_exc.BadRequest(resource='router', msg=msg)
|
||||
|
||||
try:
|
||||
policy.enforce(context,
|
||||
"extension:router:add_router_interface",
|
||||
self._make_router_dict(router))
|
||||
except q_exc.PolicyNotAuthorized:
|
||||
raise l3.RouterNotFound(router_id=router_id)
|
||||
|
||||
if 'port_id' in interface_info:
|
||||
if 'subnet_id' in interface_info:
|
||||
msg = "cannot specify both subnet-id and port-id"
|
||||
@ -327,6 +334,12 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
|
||||
def remove_router_interface(self, context, router_id, interface_info):
|
||||
# make sure router exists
|
||||
router = self._get_router(context, router_id)
|
||||
try:
|
||||
policy.enforce(context,
|
||||
"extension:router:remove_router_interface",
|
||||
self._make_router_dict(router))
|
||||
except q_exc.PolicyNotAuthorized:
|
||||
raise l3.RouterNotFound(router_id=router_id)
|
||||
|
||||
if not interface_info:
|
||||
msg = "Either subnet_id or port_id must be specified"
|
||||
|
@ -329,8 +329,9 @@ class L3NatDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
|
||||
return self.deserialize('json', res)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def router(self, name='router1', admin_status_up=True, fmt='json'):
|
||||
res = self._create_router(fmt, _uuid(), name=name,
|
||||
def router(self, name='router1', admin_status_up=True,
|
||||
fmt='json', tenant_id=_uuid()):
|
||||
res = self._create_router(fmt, tenant_id, name=name,
|
||||
admin_state_up=admin_status_up)
|
||||
router = self.deserialize(fmt, res)
|
||||
yield router
|
||||
@ -387,6 +388,41 @@ class L3NatDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
|
||||
body = self._show('ports', r_port_id,
|
||||
expected_code=exc.HTTPNotFound.code)
|
||||
|
||||
def test_router_add_interface_subnet_with_bad_tenant(self):
|
||||
with mock.patch('quantum.context.Context.to_dict') as tdict:
|
||||
tenant_id = _uuid()
|
||||
admin_context = {'roles': ['admin']}
|
||||
tenant_context = {'tenant_id': 'bad_tenant',
|
||||
'roles': []}
|
||||
tdict.return_value = admin_context
|
||||
with self.router(tenant_id=tenant_id) as r:
|
||||
with self.network(tenant_id=tenant_id) as n:
|
||||
with self.subnet(network=n) as s:
|
||||
tdict.return_value = tenant_context
|
||||
err_code = exc.HTTPNotFound.code
|
||||
self._router_interface_action('add',
|
||||
r['router']['id'],
|
||||
s['subnet']['id'],
|
||||
None,
|
||||
err_code)
|
||||
tdict.return_value = admin_context
|
||||
body = self._router_interface_action('add',
|
||||
r['router']['id'],
|
||||
s['subnet']['id'],
|
||||
None)
|
||||
self.assertTrue('port_id' in body)
|
||||
tdict.return_value = tenant_context
|
||||
self._router_interface_action('remove',
|
||||
r['router']['id'],
|
||||
s['subnet']['id'],
|
||||
None,
|
||||
err_code)
|
||||
tdict.return_value = admin_context
|
||||
body = self._router_interface_action('remove',
|
||||
r['router']['id'],
|
||||
s['subnet']['id'],
|
||||
None)
|
||||
|
||||
def test_router_add_interface_port(self):
|
||||
with self.router() as r:
|
||||
with self.port(no_delete=True) as p:
|
||||
@ -407,6 +443,42 @@ class L3NatDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
|
||||
None,
|
||||
p['port']['id'])
|
||||
|
||||
def test_router_add_interface_port_bad_tenant(self):
|
||||
with mock.patch('quantum.context.Context.to_dict') as tdict:
|
||||
tenant_id = _uuid()
|
||||
admin_context = {'roles': ['admin']}
|
||||
tenant_context = {'tenant_id': 'bad_tenant',
|
||||
'roles': []}
|
||||
tdict.return_value = admin_context
|
||||
with self.router() as r:
|
||||
with self.port(no_delete=True) as p:
|
||||
tdict.return_value = tenant_context
|
||||
err_code = exc.HTTPNotFound.code
|
||||
body = self._router_interface_action('add',
|
||||
r['router']['id'],
|
||||
None,
|
||||
p['port']['id'],
|
||||
err_code)
|
||||
tdict.return_value = admin_context
|
||||
body = self._router_interface_action('add',
|
||||
r['router']['id'],
|
||||
None,
|
||||
p['port']['id'])
|
||||
|
||||
tdict.return_value = tenant_context
|
||||
# clean-up
|
||||
self._router_interface_action('remove',
|
||||
r['router']['id'],
|
||||
None,
|
||||
p['port']['id'],
|
||||
err_code)
|
||||
|
||||
tdict.return_value = admin_context
|
||||
self._router_interface_action('remove',
|
||||
r['router']['id'],
|
||||
None,
|
||||
p['port']['id'])
|
||||
|
||||
def test_router_add_interface_dup_subnet1(self):
|
||||
with self.router() as r:
|
||||
with self.subnet() as s:
|
||||
|
Loading…
Reference in New Issue
Block a user