From 2a3e25e383621590375b37cd1303d6fb3464abae Mon Sep 17 00:00:00 2001 From: Aaron Rosen Date: Tue, 8 Jan 2013 22:53:02 -0800 Subject: [PATCH] Adds port security api extension and base class Implements blueprint port-security-api-base-class This patch also updates the _create_network/port in the unit tests so that it does not remove false values from arg_list. Fixes bug 1097527 Change-Id: I22b55b0ed56c830995ffb491176c801c697abe6f --- etc/policy.json | 2 + quantum/db/portsecurity_db.py | 143 +++++++ quantum/db/securitygroups_db.py | 18 + quantum/extensions/portsecurity.py | 82 ++++ quantum/tests/unit/test_db_plugin.py | 22 +- .../tests/unit/test_extension_portsecurity.py | 366 ++++++++++++++++++ 6 files changed, 624 insertions(+), 9 deletions(-) create mode 100644 quantum/db/portsecurity_db.py create mode 100644 quantum/extensions/portsecurity.py create mode 100644 quantum/tests/unit/test_extension_portsecurity.py diff --git a/etc/policy.json b/etc/policy.json index d40741c000..fa7efe8121 100644 --- a/etc/policy.json +++ b/etc/policy.json @@ -38,9 +38,11 @@ "create_port": "", "create_port:mac_address": "rule:admin_or_network_owner", "create_port:fixed_ips": "rule:admin_or_network_owner", + "create_port:port_security_enabled": "rule:admin_or_network_owner", "get_port": "rule:admin_or_owner", "update_port": "rule:admin_or_owner", "update_port:fixed_ips": "rule:admin_or_network_owner", + "update_port:port_security_enabled": "rule:admin_or_network_owner", "delete_port": "rule:admin_or_owner", "extension:service_type:view_extended": "rule:admin_only", diff --git a/quantum/db/portsecurity_db.py b/quantum/db/portsecurity_db.py new file mode 100644 index 0000000000..c409990645 --- /dev/null +++ b/quantum/db/portsecurity_db.py @@ -0,0 +1,143 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 Nicira Networks, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Aaron Rosen, Nicira, Inc + +from sqlalchemy.orm import exc +import sqlalchemy as sa + +from quantum.db import model_base +from quantum.extensions import portsecurity as psec +from quantum.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + + +class PortSecurityBinding(model_base.BASEV2): + port_id = sa.Column(sa.String(36), + sa.ForeignKey('ports.id', ondelete="CASCADE"), + primary_key=True) + port_security_enabled = sa.Column(sa.Boolean(), nullable=False) + + +class NetworkSecurityBinding(model_base.BASEV2): + network_id = sa.Column(sa.String(36), + sa.ForeignKey('networks.id', ondelete="CASCADE"), + primary_key=True) + port_security_enabled = sa.Column(sa.Boolean(), nullable=False) + + +class PortSecurityDbMixin(object): + """Mixin class to add port security.""" + + def _process_network_create_port_security(self, context, network): + with context.session.begin(subtransactions=True): + db = NetworkSecurityBinding( + network_id=network['id'], + port_security_enabled=network[psec.PORTSECURITY]) + context.session.add(db) + return self._make_network_port_security_dict(db) + + def _extend_network_port_security_dict(self, context, network): + network[psec.PORTSECURITY] = self._get_network_security_binding( + context, network['id']) + + def _extend_port_port_security_dict(self, context, port): + port[psec.PORTSECURITY] = self._get_port_security_binding( + context, port['id']) + + def _get_network_security_binding(self, context, network_id): + try: + query = self._model_query(context, NetworkSecurityBinding) + binding = query.filter( + NetworkSecurityBinding.network_id == network_id).one() + except exc.NoResultFound: + raise psec.PortSecurityBindingNotFound() + return binding[psec.PORTSECURITY] + + def _get_port_security_binding(self, context, port_id): + try: + query = self._model_query(context, PortSecurityBinding) + binding = query.filter( + PortSecurityBinding.port_id == port_id).one() + except exc.NoResultFound: + raise psec.PortSecurityBindingNotFound() + return binding[psec.PORTSECURITY] + + def _update_port_security_binding(self, context, port_id, + port_security_enabled): + try: + query = self._model_query(context, PortSecurityBinding) + binding = query.filter( + PortSecurityBinding.port_id == port_id).one() + + binding.update({psec.PORTSECURITY: port_security_enabled}) + except exc.NoResultFound: + raise psec.PortSecurityBindingNotFound() + + def _update_network_security_binding(self, context, network_id, + port_security_enabled): + try: + query = self._model_query(context, NetworkSecurityBinding) + binding = query.filter( + NetworkSecurityBinding.network_id == network_id).one() + + binding.update({psec.PORTSECURITY: port_security_enabled}) + except exc.NoResultFound: + raise psec.PortSecurityBindingNotFound() + + def _make_network_port_security_dict(self, port_security, fields=None): + res = {'network_id': port_security['network_id'], + psec.PORTSECURITY: port_security[psec.PORTSECURITY]} + return self._fields(res, fields) + + def _determine_port_security_and_has_ip(self, context, port): + """Returns a tuple of (port_security_enabled, has_ip) where + port_security_enabled and has_ip are bools. Port_security is the + value assocated with the port if one is present otherwise the value + associated with the network is returned. has_ip is if the port is + associated with an ip or not. + """ + has_ip = self._ip_on_port(port) + # we don't apply security groups for dhcp, router + if (port.get('device_owner') and + port['device_owner'].startswith('network:')): + return (False, has_ip) + + if (psec.PORTSECURITY in port and + isinstance(port[psec.PORTSECURITY], bool)): + port_security_enabled = port[psec.PORTSECURITY] + else: + port_security_enabled = self._get_network_security_binding( + context, port['network_id']) + + return (port_security_enabled, has_ip) + + def _process_port_security_create(self, context, port): + with context.session.begin(subtransactions=True): + port_security_binding = PortSecurityBinding( + port_id=port['id'], + port_security_enabled=port[psec.PORTSECURITY]) + context.session.add(port_security_binding) + return self._make_port_security_dict(port_security_binding) + + def _make_port_security_dict(self, port, fields=None): + res = {'port_id': port['port_id'], + psec.PORTSECURITY: port[psec.PORTSECURITY]} + return self._fields(res, fields) + + def _ip_on_port(self, port): + return bool(port.get('fixed_ips')) diff --git a/quantum/db/securitygroups_db.py b/quantum/db/securitygroups_db.py index 50b35bba6d..6b22ca7d1b 100644 --- a/quantum/db/securitygroups_db.py +++ b/quantum/db/securitygroups_db.py @@ -484,3 +484,21 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase): else: sgids = [default_sg] port['port'][ext_sg.SECURITYGROUPS] = sgids + + def _check_update_deletes_security_groups(self, port): + """Return True if port has as a security group and it's value + is either [] or not is_attr_set, otherwise return False""" + if (ext_sg.SECURITYGROUPS in port['port'] and + not (attr.is_attr_set(port['port'][ext_sg.SECURITYGROUPS]) + and port['port'][ext_sg.SECURITYGROUPS] != [])): + return True + return False + + def _check_update_has_security_groups(self, port): + """Return True if port has as a security group and False if the + security_group field is is_attr_set or [].""" + if (ext_sg.SECURITYGROUPS in port['port'] and + (attr.is_attr_set(port['port'][ext_sg.SECURITYGROUPS]) and + port['port'][ext_sg.SECURITYGROUPS] != [])): + return True + return False diff --git a/quantum/extensions/portsecurity.py b/quantum/extensions/portsecurity.py new file mode 100644 index 0000000000..90e7ebfbd6 --- /dev/null +++ b/quantum/extensions/portsecurity.py @@ -0,0 +1,82 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 Nicira Networks, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Aaron Rosen, Nicira, Inc + + +from quantum.api.v2 import attributes +from quantum.common import exceptions as qexception + + +class PortSecurityPortHasSecurityGroup(qexception.InUse): + message = _("Port has security group associated. Cannot disable port " + "security or ip address until security group is removed") + + +class PortSecurityAndIPRequiredForSecurityGroups(qexception.InvalidInput): + message = _("Port security must be enabled and port must have an IP" + " address in order to use security groups.") + + +class PortSecurityBindingNotFound(qexception.InvalidExtenstionEnv): + message = _("Port does not have port security binding.") + +PORTSECURITY = 'port_security_enabled' +EXTENDED_ATTRIBUTES_2_0 = { + 'networks': { + PORTSECURITY: {'allow_post': True, 'allow_put': True, + 'convert_to': attributes.convert_to_boolean, + 'default': True, + 'is_visible': True}, + }, + 'ports': { + PORTSECURITY: {'allow_post': True, 'allow_put': True, + 'convert_to': attributes.convert_to_boolean, + 'default': attributes.ATTR_NOT_SPECIFIED, + 'is_visible': True}, + } +} + + +class Portsecurity(object): + """Extension class supporting port security + """ + + @classmethod + def get_name(cls): + return "Port Security" + + @classmethod + def get_alias(cls): + return "port-security" + + @classmethod + def get_description(cls): + return "Provides port security" + + @classmethod + def get_namespace(cls): + return "http://docs.openstack.org/ext/portsecurity/api/v1.0" + + @classmethod + def get_updated(cls): + return "2012-07-23T10:00:00-00:00" + + def get_extended_resources(self, version): + if version == "2.0": + return EXTENDED_ATTRIBUTES_2_0 + else: + return {} diff --git a/quantum/tests/unit/test_db_plugin.py b/quantum/tests/unit/test_db_plugin.py index fbbf77ca03..455856cae2 100644 --- a/quantum/tests/unit/test_db_plugin.py +++ b/quantum/tests/unit/test_db_plugin.py @@ -219,8 +219,8 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): 'tenant_id': self._tenant_id}} for arg in (('admin_state_up', 'tenant_id', 'shared') + (arg_list or ())): - # Arg must be present and not empty - if arg in kwargs and kwargs[arg]: + # Arg must be present + if arg in kwargs: data['network'][arg] = kwargs[arg] network_req = self.new_create_request('networks', data, fmt) if (kwargs.get('set_context') and 'tenant_id' in kwargs): @@ -279,16 +279,18 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): kwargs.update({'override': overrides}) return self._create_bulk(fmt, number, 'subnet', base_data, **kwargs) - def _create_port(self, fmt, net_id, expected_res_status=None, **kwargs): + def _create_port(self, fmt, net_id, expected_res_status=None, + arg_list=None, **kwargs): content_type = 'application/' + fmt data = {'port': {'network_id': net_id, 'tenant_id': self._tenant_id}} - for arg in ('admin_state_up', 'device_id', + for arg in (('admin_state_up', 'device_id', 'mac_address', 'name', 'fixed_ips', - 'tenant_id', 'device_owner', 'security_groups'): - # Arg must be present and not empty - if arg in kwargs and kwargs[arg]: + 'tenant_id', 'device_owner', 'security_groups') + + (arg_list or ())): + # Arg must be present + if arg in kwargs: data['port'][arg] = kwargs[arg] port_req = self.new_create_request('ports', data, fmt) if (kwargs.get('set_context') and 'tenant_id' in kwargs): @@ -469,7 +471,8 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): enable_dhcp=True, dns_nameservers=None, host_routes=None, - shared=None): + shared=None, + do_delete=True): with (self.network() if not network else dummy_context_func()) as network_to_use: if network: @@ -487,7 +490,8 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): try: yield subnet finally: - self._delete('subnets', subnet['subnet']['id']) + if do_delete: + self._delete('subnets', subnet['subnet']['id']) @contextlib.contextmanager def port(self, subnet=None, fmt='json', no_delete=False, diff --git a/quantum/tests/unit/test_extension_portsecurity.py b/quantum/tests/unit/test_extension_portsecurity.py new file mode 100644 index 0000000000..59907327da --- /dev/null +++ b/quantum/tests/unit/test_extension_portsecurity.py @@ -0,0 +1,366 @@ +# Copyright (c) 2012 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from quantum.api.v2 import attributes as attr +from quantum.common.test_lib import test_config +from quantum import context +from quantum.db import db_base_plugin_v2 +from quantum.db import portsecurity_db +from quantum.db import securitygroups_db +from quantum.extensions import securitygroup as ext_sg +from quantum.extensions import portsecurity as psec +from quantum.manager import QuantumManager +from quantum import policy +from quantum.tests.unit import test_db_plugin + +DB_PLUGIN_KLASS = ('quantum.tests.unit.test_extension_portsecurity.' + 'PortSecurityTestPlugin') + + +class PortSecurityTestCase(test_db_plugin.QuantumDbPluginV2TestCase): + def setUp(self, plugin=None): + super(PortSecurityTestCase, self).setUp() + + # Check if a plugin supports security groups + plugin_obj = QuantumManager.get_plugin() + self._skip_security_group = ('security-group' not in + plugin_obj.supported_extension_aliases) + + def tearDown(self): + super(PortSecurityTestCase, self).tearDown() + self._skip_security_group = None + + +class PortSecurityTestPlugin(db_base_plugin_v2.QuantumDbPluginV2, + securitygroups_db.SecurityGroupDbMixin, + portsecurity_db.PortSecurityDbMixin): + + """ Test plugin that implements necessary calls on create/delete port for + associating ports with security groups and port security. + """ + + supported_extension_aliases = ["security-group", "port-security"] + port_security_enabled_create = "create_port:port_security_enabled" + port_security_enabled_update = "update_port:port_security_enabled" + + def _enforce_set_auth(self, context, resource, action): + return policy.enforce(context, action, resource) + + def create_network(self, context, network): + tenant_id = self._get_tenant_id_for_create(context, network['network']) + self._ensure_default_security_group(context, tenant_id) + with context.session.begin(subtransactions=True): + quantum_db = super(PortSecurityTestPlugin, self).create_network( + context, network) + quantum_db.update(network['network']) + self._process_network_create_port_security( + context, quantum_db) + self._extend_network_port_security_dict(context, quantum_db) + return quantum_db + + def update_network(self, context, id, network): + with context.session.begin(subtransactions=True): + quantum_db = super(PortSecurityTestPlugin, self).update_network( + context, id, network) + if psec.PORTSECURITY in network['network']: + self._update_network_security_binding( + context, id, network['network'][psec.PORTSECURITY]) + self._extend_network_port_security_dict( + context, quantum_db) + return quantum_db + + def get_network(self, context, id, fields=None): + with context.session.begin(subtransactions=True): + net = super(PortSecurityTestPlugin, self).get_network( + context, id) + self._extend_network_port_security_dict(context, net) + return self._fields(net, fields) + + def create_port(self, context, port): + if attr.is_attr_set(port['port'][psec.PORTSECURITY]): + self._enforce_set_auth(context, port, + self.port_security_enabled_create) + p = port['port'] + with context.session.begin(subtransactions=True): + p[ext_sg.SECURITYGROUPS] = self._get_security_groups_on_port( + context, port) + quantum_db = super(PortSecurityTestPlugin, self).create_port( + context, port) + p.update(quantum_db) + + (port_security, has_ip) = self._determine_port_security_and_has_ip( + context, p) + p[psec.PORTSECURITY] = port_security + self._process_port_security_create(context, p) + + if (attr.is_attr_set(p.get(ext_sg.SECURITYGROUPS)) and + not (port_security and has_ip)): + raise psec.PortSecurityAndIPRequiredForSecurityGroups() + + # Port requires ip and port_security enabled for security group + if has_ip and port_security: + self._ensure_default_security_group_on_port(context, port) + + if (p.get(ext_sg.SECURITYGROUPS) and p[psec.PORTSECURITY]): + self._process_port_create_security_group( + context, p['id'], p[ext_sg.SECURITYGROUPS]) + + self._extend_port_dict_security_group(context, p) + self._extend_port_port_security_dict(context, p) + + return port['port'] + + def update_port(self, context, id, port): + self._enforce_set_auth(context, port, + self.port_security_enabled_update) + delete_security_groups = self._check_update_deletes_security_groups( + port) + has_security_groups = self._check_update_has_security_groups(port) + with context.session.begin(subtransactions=True): + ret_port = super(PortSecurityTestPlugin, self).update_port( + context, id, port) + # copy values over + ret_port.update(port['port']) + + # populate port_security setting + if psec.PORTSECURITY not in ret_port: + ret_port[psec.PORTSECURITY] = self._get_port_security_binding( + context, id) + has_ip = self._ip_on_port(ret_port) + # checks if security groups were updated adding/modifying + # security groups, port security is set and port has ip + if (has_security_groups and (not ret_port[psec.PORTSECURITY] + or not has_ip)): + raise psec.PortSecurityAndIPRequiredForSecurityGroups() + + # Port security/IP was updated off. Need to check that no security + # groups are on port. + if (ret_port[psec.PORTSECURITY] is not True or not has_ip): + if has_security_groups: + raise psec.PortSecurityAndIPRequiredForSecurityGroups() + + # get security groups on port + filters = {'port_id': [id]} + security_groups = (super(PortSecurityTestPlugin, self). + _get_port_security_group_bindings( + context, filters)) + if security_groups and not delete_security_groups: + raise psec.PortSecurityPortHasSecurityGroup() + + if (delete_security_groups or has_security_groups): + # delete the port binding and read it with the new rules. + self._delete_port_security_group_bindings(context, id) + sgids = self._get_security_groups_on_port(context, port) + self._process_port_create_security_group(context, id, sgids) + + if psec.PORTSECURITY in port['port']: + self._update_port_security_binding( + context, id, ret_port[psec.PORTSECURITY]) + + self._extend_port_port_security_dict(context, ret_port) + self._extend_port_dict_security_group(context, ret_port) + + return ret_port + + +class PortSecurityDBTestCase(PortSecurityTestCase): + def setUp(self, plugin=None): + test_config['plugin_name_v2'] = DB_PLUGIN_KLASS + super(PortSecurityDBTestCase, self).setUp() + + def tearDown(self): + del test_config['plugin_name_v2'] + super(PortSecurityDBTestCase, self).tearDown() + + +class TestPortSecurity(PortSecurityDBTestCase): + def test_create_network_with_portsecurity_mac(self): + res = self._create_network('json', 'net1', True) + net = self.deserialize('json', res) + self.assertEquals(net['network'][psec.PORTSECURITY], True) + + def test_create_network_with_portsecurity_false(self): + res = self._create_network('json', 'net1', True, + arg_list=('port_security_enabled',), + port_security_enabled=False) + net = self.deserialize('json', res) + self.assertEquals(net['network'][psec.PORTSECURITY], False) + + def test_updating_network_port_security(self): + res = self._create_network('json', 'net1', True, + port_security_enabled='True') + net = self.deserialize('json', res) + self.assertEquals(net['network'][psec.PORTSECURITY], True) + update_net = {'network': {psec.PORTSECURITY: False}} + req = self.new_update_request('networks', update_net, + net['network']['id']) + net = self.deserialize('json', req.get_response(self.api)) + self.assertEquals(net['network'][psec.PORTSECURITY], False) + req = self.new_show_request('networks', net['network']['id']) + net = self.deserialize('json', req.get_response(self.api)) + self.assertEquals(net['network'][psec.PORTSECURITY], False) + + def test_create_port_default_true(self): + with self.network() as net: + res = self._create_port('json', net['network']['id']) + port = self.deserialize('json', res) + self.assertEquals(port['port'][psec.PORTSECURITY], True) + self._delete('ports', port['port']['id']) + + def test_create_port_passing_true(self): + res = self._create_network('json', 'net1', True, + arg_list=('port_security_enabled',), + port_security_enabled=True) + net = self.deserialize('json', res) + res = self._create_port('json', net['network']['id']) + port = self.deserialize('json', res) + self.assertEquals(port['port'][psec.PORTSECURITY], True) + self._delete('ports', port['port']['id']) + + def test_create_port_on_port_security_false_network(self): + res = self._create_network('json', 'net1', True, + arg_list=('port_security_enabled',), + port_security_enabled=False) + net = self.deserialize('json', res) + res = self._create_port('json', net['network']['id']) + port = self.deserialize('json', res) + self.assertEquals(port['port'][psec.PORTSECURITY], False) + self._delete('ports', port['port']['id']) + + def test_create_port_security_overrides_network_value(self): + res = self._create_network('json', 'net1', True, + arg_list=('port_security_enabled',), + port_security_enabled=False) + net = self.deserialize('json', res) + res = self._create_port('json', net['network']['id'], + arg_list=('port_security_enabled',), + port_security_enabled=True) + port = self.deserialize('json', res) + self.assertEquals(port['port'][psec.PORTSECURITY], True) + self._delete('ports', port['port']['id']) + + def test_create_port_with_default_security_group(self): + if self._skip_security_group: + self.skipTest("Plugin does not support security groups") + with self.network() as net: + with self.subnet(network=net): + res = self._create_port('json', net['network']['id']) + port = self.deserialize('json', res) + self.assertEquals(port['port'][psec.PORTSECURITY], True) + self.assertEquals(len(port['port'][ext_sg.SECURITYGROUPS]), 1) + self._delete('ports', port['port']['id']) + + def test_update_port_security_off_with_security_group(self): + if self._skip_security_group: + self.skipTest("Plugin does not support security groups") + with self.network() as net: + with self.subnet(network=net): + res = self._create_port('json', net['network']['id']) + port = self.deserialize('json', res) + self.assertEquals(port['port'][psec.PORTSECURITY], True) + + update_port = {'port': {psec.PORTSECURITY: False}} + req = self.new_update_request('ports', update_port, + port['port']['id']) + res = req.get_response(self.api) + self.assertEqual(res.status_int, 409) + # remove security group on port + update_port = {'port': {ext_sg.SECURITYGROUPS: None}} + req = self.new_update_request('ports', update_port, + port['port']['id']) + + self.deserialize('json', req.get_response(self.api)) + self._delete('ports', port['port']['id']) + + def test_update_port_remove_port_security_security_group(self): + if self._skip_security_group: + self.skipTest("Plugin does not support security groups") + with self.network() as net: + with self.subnet(network=net): + res = self._create_port('json', net['network']['id'], + arg_list=('port_security_enabled',), + port_security_enabled=True) + port = self.deserialize('json', res) + self.assertEquals(port['port'][psec.PORTSECURITY], True) + + # remove security group on port + update_port = {'port': {ext_sg.SECURITYGROUPS: None, + psec.PORTSECURITY: False}} + req = self.new_update_request('ports', update_port, + port['port']['id']) + + port = self.deserialize('json', req.get_response(self.api)) + self.assertEquals(port['port'][psec.PORTSECURITY], False) + self.assertEquals(len(port['port'][ext_sg.SECURITYGROUPS]), 0) + self._delete('ports', port['port']['id']) + + def test_update_port_remove_port_security_security_group_readd(self): + if self._skip_security_group: + self.skipTest("Plugin does not support security groups") + with self.network() as net: + with self.subnet(network=net): + res = self._create_port('json', net['network']['id'], + arg_list=('port_security_enabled',), + port_security_enabled=True) + port = self.deserialize('json', res) + self.assertEquals(port['port'][psec.PORTSECURITY], True) + + # remove security group on port + update_port = {'port': {ext_sg.SECURITYGROUPS: None, + psec.PORTSECURITY: False}} + req = self.new_update_request('ports', update_port, + port['port']['id']) + self.deserialize('json', req.get_response(self.api)) + + sg_id = port['port'][ext_sg.SECURITYGROUPS] + update_port = {'port': {ext_sg.SECURITYGROUPS: [sg_id[0]], + psec.PORTSECURITY: True}} + + req = self.new_update_request('ports', update_port, + port['port']['id']) + + port = self.deserialize('json', req.get_response(self.api)) + self.assertEquals(port['port'][psec.PORTSECURITY], True) + self.assertEquals(len(port['port'][ext_sg.SECURITYGROUPS]), 1) + self._delete('ports', port['port']['id']) + + def test_create_port_security_off_shared_network(self): + with self.network(shared=True) as net: + with self.subnet(network=net): + res = self._create_port('json', net['network']['id'], + arg_list=('port_security_enabled',), + port_security_enabled=False, + tenant_id='not_network_owner', + set_context=True) + self.deserialize('json', res) + self.assertEqual(res.status_int, 403) + + def test_update_port_security_off_shared_network(self): + with self.network(shared=True, do_delete=False) as net: + with self.subnet(network=net, do_delete=False): + res = self._create_port('json', net['network']['id'], + tenant_id='not_network_owner', + set_context=True) + port = self.deserialize('json', res) + # remove security group on port + update_port = {'port': {ext_sg.SECURITYGROUPS: None, + psec.PORTSECURITY: False}} + req = self.new_update_request('ports', update_port, + port['port']['id']) + req.environ['quantum.context'] = context.Context( + '', 'not_network_owner') + res = req.get_response(self.api) + self.assertEqual(res.status_int, 403)