From bc26f404910091abe929a77b0191a88675c02af1 Mon Sep 17 00:00:00 2001 From: Aaron Rosen Date: Tue, 23 Aug 2016 17:17:54 -0400 Subject: [PATCH] Fix provider sg delete by non admin and non admin rule change This patch restricts the deleting of an provider security group only to the admin thus preventing the tenant from deleting it. It also prevents a non admin user from adding or deleting rules from this group. NOTE: we are using the following policy.json entry to prevent the creation of a provider security group by a normal tenant: "create_security_group:provider": "rule:admin_only" Change-Id: Ie195225654b0c7cd8cfb715691c5a3bb4c8ee13d --- vmware_nsx/db/extended_security_group.py | 16 +++- .../extensions/providersecuritygroup.py | 5 ++ vmware_nsx/plugins/nsx_v/plugin.py | 4 + vmware_nsx/plugins/nsx_v3/plugin.py | 5 +- .../test_provider_security_groups.py | 78 ++++++++++++++++++- 5 files changed, 102 insertions(+), 6 deletions(-) diff --git a/vmware_nsx/db/extended_security_group.py b/vmware_nsx/db/extended_security_group.py index a5e6dd369b..9df96ae800 100644 --- a/vmware_nsx/db/extended_security_group.py +++ b/vmware_nsx/db/extended_security_group.py @@ -16,6 +16,7 @@ from oslo_utils import uuidutils import sqlalchemy as sa from sqlalchemy import orm +from sqlalchemy.orm import exc from neutron.api.v2 import attributes from neutron.common import utils as n_utils @@ -94,9 +95,12 @@ class ExtendedSecurityGroupPropertiesMixin(object): def _get_security_group_properties(self, context, security_group_id): with context.session.begin(subtransactions=True): - prop = context.session.query( - NsxExtendedSecurityGroupProperties).filter_by( - security_group_id=security_group_id).one() + try: + prop = context.session.query( + NsxExtendedSecurityGroupProperties).filter_by( + security_group_id=security_group_id).one() + except exc.NoResultFound: + raise ext_sg.SecurityGroupNotFound(id=security_group_id) return prop def _process_security_group_properties_update(self, context, @@ -238,6 +242,12 @@ class ExtendedSecurityGroupPropertiesMixin(object): context, updated_port, updated_port[provider_sg.PROVIDER_SECURITYGROUPS]) + def _prevent_non_admin_delete_provider_sg(self, context, sg_id): + # Only someone who is an admin is allowed to delete this. + if not context.is_admin and self._is_provider_security_group(context, + sg_id): + raise provider_sg.ProviderSecurityGroupDeleteNotAdmin(id=sg_id) + def _extend_security_group_with_properties(self, sg_res, sg_db): if sg_db.ext_properties: sg_res[sg_logging.LOGGING] = sg_db.ext_properties.logging diff --git a/vmware_nsx/extensions/providersecuritygroup.py b/vmware_nsx/extensions/providersecuritygroup.py index 600afc144e..13c632ce30 100644 --- a/vmware_nsx/extensions/providersecuritygroup.py +++ b/vmware_nsx/extensions/providersecuritygroup.py @@ -61,6 +61,11 @@ class DefaultSecurityGroupIsNotProvider(nexception.InvalidInput): "security-group.") +class ProviderSecurityGroupDeleteNotAdmin(nexception.NotAuthorized): + message = _("Security group %(id)s is a provider security group and " + "requires an admin to delete it.") + + class Providersecuritygroup(extensions.ExtensionDescriptor): """Provider security-group extension.""" diff --git a/vmware_nsx/plugins/nsx_v/plugin.py b/vmware_nsx/plugins/nsx_v/plugin.py index 2846db93cb..7b28d3e2cf 100644 --- a/vmware_nsx/plugins/nsx_v/plugin.py +++ b/vmware_nsx/plugins/nsx_v/plugin.py @@ -2943,6 +2943,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin, def delete_security_group(self, context, id): """Delete a security group.""" + self._prevent_non_admin_delete_provider_sg(context, id) try: # Find nsx rule sections section_uri = self._get_section_uri(context.session, id) @@ -3042,6 +3043,8 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin, """ sg_rules = security_group_rules['security_group_rules'] sg_id = sg_rules[0]['security_group_rule']['security_group_id'] + self._prevent_non_admin_delete_provider_sg(context, sg_id) + ruleids = set() nsx_rules = [] @@ -3102,6 +3105,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin, """Delete a security group rule.""" rule_db = self._get_security_group_rule(context, id) security_group_id = rule_db['security_group_id'] + self._prevent_non_admin_delete_provider_sg(context, security_group_id) # Get the nsx rule from neutron DB and delete it nsx_rule_id = nsxv_db.get_nsx_rule_id(context.session, id) diff --git a/vmware_nsx/plugins/nsx_v3/plugin.py b/vmware_nsx/plugins/nsx_v3/plugin.py index 3c4cad7ac2..1760f8aef3 100644 --- a/vmware_nsx/plugins/nsx_v3/plugin.py +++ b/vmware_nsx/plugins/nsx_v3/plugin.py @@ -2884,6 +2884,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, return secgroup_res def delete_security_group(self, context, id): + self._prevent_non_admin_delete_provider_sg(context, id) nsgroup_id, section_id = self.nsxlib.get_sg_mappings( context.session, id) super(NsxV3Plugin, self).delete_security_group(context, id) @@ -2918,8 +2919,9 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, # NOTE(arosen): here are are assuming that all of the security # group rules being added are part of the same security # group. We should be validating that this is the case though... - sg_id = sg_rules[0]['security_group_rule']['security_group_id'] + self._prevent_non_admin_delete_provider_sg(context, sg_id) + security_group = self.get_security_group( context, sg_id) action = firewall.ALLOW @@ -2947,6 +2949,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, def delete_security_group_rule(self, context, id): rule_db = self._get_security_group_rule(context, id) sg_id = rule_db['security_group_id'] + self._prevent_non_admin_delete_provider_sg(context, sg_id) _, section_id = self.nsxlib.get_sg_mappings(context.session, sg_id) fw_rule_id = nsx_db.get_sg_rule_mapping(context.session, id) self.nsxlib.delete_rule(section_id, fw_rule_id) diff --git a/vmware_nsx/tests/unit/extensions/test_provider_security_groups.py b/vmware_nsx/tests/unit/extensions/test_provider_security_groups.py index 1ba66c757a..9378c526a8 100644 --- a/vmware_nsx/tests/unit/extensions/test_provider_security_groups.py +++ b/vmware_nsx/tests/unit/extensions/test_provider_security_groups.py @@ -96,6 +96,25 @@ class ProviderSecurityGroupTestPlugin( context, port, original_port, updated_port) return self.get_port(context, id) + def delete_security_group(self, context, id): + self._prevent_non_admin_delete_provider_sg(context, id) + super(ProviderSecurityGroupTestPlugin, + self).delete_security_group(context, id) + + def delete_security_group_rule(self, context, id): + rule_db = self._get_security_group_rule(context, id) + sg_id = rule_db['security_group_id'] + self._prevent_non_admin_delete_provider_sg(context, sg_id) + return super(ProviderSecurityGroupTestPlugin, + self).delete_security_group_rule(context, id) + + def create_security_group_rule(self, context, security_group_rule): + id = security_group_rule['security_group_rule']['security_group_id'] + self._prevent_non_admin_delete_provider_sg(context, id) + return super(ProviderSecurityGroupTestPlugin, + self).create_security_group_rule(context, + security_group_rule) + class ProviderSecurityGroupExtTestCase( test_securitygroup.SecurityGroupDBTestCase): @@ -243,9 +262,64 @@ class ProviderSecurityGroupExtTestCase( port['port']['provider_security_groups']) self.assertEqual([sg_id], port['port']['security_groups']) + def test_non_admin_cannot_delete_provider_sg_and_admin_can(self): + provider_secgroup = self._create_provider_security_group() + pvd_sg_id = provider_secgroup['security_group']['id'] -class TestNSXv3ProviderSecurityGrp(ProviderSecurityGroupExtTestCase, - test_nsxv3_plugin.NsxV3PluginTestCaseMixin): + # Try deleting the request as the normal tenant returns forbidden + # as a tenant is not allowed to delete this. + ctx = context.Context('', self._tenant_id) + self._delete('security-groups', pvd_sg_id, + expected_code=webob.exc.HTTPForbidden.code, + neutron_context=ctx) + # can be deleted though as admin + self._delete('security-groups', pvd_sg_id, + expected_code=webob.exc.HTTPNoContent.code) + + def test_non_admin_cannot_delete_provider_sg_rule(self): + provider_secgroup = self._create_provider_security_group() + pvd_sg_id = provider_secgroup['security_group']['id'] + + data = {'security_group_rule': {'security_group_id': pvd_sg_id, + 'direction': 'ingress', + 'protocol': 'tcp', + 'ethertype': 'IPv4', + 'tenant_id': self._tenant_id}} + + req = self.new_create_request('security-group-rules', data) + res = self.deserialize(self.fmt, req.get_response(self.ext_api)) + + sg_rule_id = res['security_group_rule']['id'] + + # Try deleting the request as the normal tenant returns forbidden + # as a tenant is not allowed to delete this. + ctx = context.Context('', self._tenant_id) + self._delete('security-group-rules', sg_rule_id, + expected_code=webob.exc.HTTPForbidden.code, + neutron_context=ctx) + # can be deleted though as admin + self._delete('security-group-rules', sg_rule_id, + expected_code=webob.exc.HTTPNoContent.code) + + def test_non_admin_cannot_add_provider_sg_rule(self): + provider_secgroup = self._create_provider_security_group() + pvd_sg_id = provider_secgroup['security_group']['id'] + + data = {'security_group_rule': {'security_group_id': pvd_sg_id, + 'direction': 'ingress', + 'protocol': 'tcp', + 'ethertype': 'IPv4', + 'tenant_id': self._tenant_id}} + + req = self.new_create_request( + 'security-group-rules', data) + req.environ['neutron.context'] = context.Context('', self._tenant_id) + res = req.get_response(self.ext_api) + self.assertEqual(webob.exc.HTTPForbidden.code, res.status_int) + + +class TestNSXv3ProviderSecurityGrp(test_nsxv3_plugin.NsxV3PluginTestCaseMixin, + ProviderSecurityGroupExtTestCase): pass