Adds port security api extension and base class
Implements blueprint port-security-api-base-class This patch also updates the _create_network/port in the unit tests so that it does not remove false values from arg_list. Fixes bug 1097527 Change-Id: I22b55b0ed56c830995ffb491176c801c697abe6f
This commit is contained in:
parent
200f9d53be
commit
2a3e25e383
@ -38,9 +38,11 @@
|
||||
"create_port": "",
|
||||
"create_port:mac_address": "rule:admin_or_network_owner",
|
||||
"create_port:fixed_ips": "rule:admin_or_network_owner",
|
||||
"create_port:port_security_enabled": "rule:admin_or_network_owner",
|
||||
"get_port": "rule:admin_or_owner",
|
||||
"update_port": "rule:admin_or_owner",
|
||||
"update_port:fixed_ips": "rule:admin_or_network_owner",
|
||||
"update_port:port_security_enabled": "rule:admin_or_network_owner",
|
||||
"delete_port": "rule:admin_or_owner",
|
||||
|
||||
"extension:service_type:view_extended": "rule:admin_only",
|
||||
|
143
quantum/db/portsecurity_db.py
Normal file
143
quantum/db/portsecurity_db.py
Normal file
@ -0,0 +1,143 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 Nicira Networks, Inc. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Aaron Rosen, Nicira, Inc
|
||||
|
||||
from sqlalchemy.orm import exc
|
||||
import sqlalchemy as sa
|
||||
|
||||
from quantum.db import model_base
|
||||
from quantum.extensions import portsecurity as psec
|
||||
from quantum.openstack.common import log as logging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PortSecurityBinding(model_base.BASEV2):
|
||||
port_id = sa.Column(sa.String(36),
|
||||
sa.ForeignKey('ports.id', ondelete="CASCADE"),
|
||||
primary_key=True)
|
||||
port_security_enabled = sa.Column(sa.Boolean(), nullable=False)
|
||||
|
||||
|
||||
class NetworkSecurityBinding(model_base.BASEV2):
|
||||
network_id = sa.Column(sa.String(36),
|
||||
sa.ForeignKey('networks.id', ondelete="CASCADE"),
|
||||
primary_key=True)
|
||||
port_security_enabled = sa.Column(sa.Boolean(), nullable=False)
|
||||
|
||||
|
||||
class PortSecurityDbMixin(object):
|
||||
"""Mixin class to add port security."""
|
||||
|
||||
def _process_network_create_port_security(self, context, network):
|
||||
with context.session.begin(subtransactions=True):
|
||||
db = NetworkSecurityBinding(
|
||||
network_id=network['id'],
|
||||
port_security_enabled=network[psec.PORTSECURITY])
|
||||
context.session.add(db)
|
||||
return self._make_network_port_security_dict(db)
|
||||
|
||||
def _extend_network_port_security_dict(self, context, network):
|
||||
network[psec.PORTSECURITY] = self._get_network_security_binding(
|
||||
context, network['id'])
|
||||
|
||||
def _extend_port_port_security_dict(self, context, port):
|
||||
port[psec.PORTSECURITY] = self._get_port_security_binding(
|
||||
context, port['id'])
|
||||
|
||||
def _get_network_security_binding(self, context, network_id):
|
||||
try:
|
||||
query = self._model_query(context, NetworkSecurityBinding)
|
||||
binding = query.filter(
|
||||
NetworkSecurityBinding.network_id == network_id).one()
|
||||
except exc.NoResultFound:
|
||||
raise psec.PortSecurityBindingNotFound()
|
||||
return binding[psec.PORTSECURITY]
|
||||
|
||||
def _get_port_security_binding(self, context, port_id):
|
||||
try:
|
||||
query = self._model_query(context, PortSecurityBinding)
|
||||
binding = query.filter(
|
||||
PortSecurityBinding.port_id == port_id).one()
|
||||
except exc.NoResultFound:
|
||||
raise psec.PortSecurityBindingNotFound()
|
||||
return binding[psec.PORTSECURITY]
|
||||
|
||||
def _update_port_security_binding(self, context, port_id,
|
||||
port_security_enabled):
|
||||
try:
|
||||
query = self._model_query(context, PortSecurityBinding)
|
||||
binding = query.filter(
|
||||
PortSecurityBinding.port_id == port_id).one()
|
||||
|
||||
binding.update({psec.PORTSECURITY: port_security_enabled})
|
||||
except exc.NoResultFound:
|
||||
raise psec.PortSecurityBindingNotFound()
|
||||
|
||||
def _update_network_security_binding(self, context, network_id,
|
||||
port_security_enabled):
|
||||
try:
|
||||
query = self._model_query(context, NetworkSecurityBinding)
|
||||
binding = query.filter(
|
||||
NetworkSecurityBinding.network_id == network_id).one()
|
||||
|
||||
binding.update({psec.PORTSECURITY: port_security_enabled})
|
||||
except exc.NoResultFound:
|
||||
raise psec.PortSecurityBindingNotFound()
|
||||
|
||||
def _make_network_port_security_dict(self, port_security, fields=None):
|
||||
res = {'network_id': port_security['network_id'],
|
||||
psec.PORTSECURITY: port_security[psec.PORTSECURITY]}
|
||||
return self._fields(res, fields)
|
||||
|
||||
def _determine_port_security_and_has_ip(self, context, port):
|
||||
"""Returns a tuple of (port_security_enabled, has_ip) where
|
||||
port_security_enabled and has_ip are bools. Port_security is the
|
||||
value assocated with the port if one is present otherwise the value
|
||||
associated with the network is returned. has_ip is if the port is
|
||||
associated with an ip or not.
|
||||
"""
|
||||
has_ip = self._ip_on_port(port)
|
||||
# we don't apply security groups for dhcp, router
|
||||
if (port.get('device_owner') and
|
||||
port['device_owner'].startswith('network:')):
|
||||
return (False, has_ip)
|
||||
|
||||
if (psec.PORTSECURITY in port and
|
||||
isinstance(port[psec.PORTSECURITY], bool)):
|
||||
port_security_enabled = port[psec.PORTSECURITY]
|
||||
else:
|
||||
port_security_enabled = self._get_network_security_binding(
|
||||
context, port['network_id'])
|
||||
|
||||
return (port_security_enabled, has_ip)
|
||||
|
||||
def _process_port_security_create(self, context, port):
|
||||
with context.session.begin(subtransactions=True):
|
||||
port_security_binding = PortSecurityBinding(
|
||||
port_id=port['id'],
|
||||
port_security_enabled=port[psec.PORTSECURITY])
|
||||
context.session.add(port_security_binding)
|
||||
return self._make_port_security_dict(port_security_binding)
|
||||
|
||||
def _make_port_security_dict(self, port, fields=None):
|
||||
res = {'port_id': port['port_id'],
|
||||
psec.PORTSECURITY: port[psec.PORTSECURITY]}
|
||||
return self._fields(res, fields)
|
||||
|
||||
def _ip_on_port(self, port):
|
||||
return bool(port.get('fixed_ips'))
|
@ -484,3 +484,21 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
|
||||
else:
|
||||
sgids = [default_sg]
|
||||
port['port'][ext_sg.SECURITYGROUPS] = sgids
|
||||
|
||||
def _check_update_deletes_security_groups(self, port):
|
||||
"""Return True if port has as a security group and it's value
|
||||
is either [] or not is_attr_set, otherwise return False"""
|
||||
if (ext_sg.SECURITYGROUPS in port['port'] and
|
||||
not (attr.is_attr_set(port['port'][ext_sg.SECURITYGROUPS])
|
||||
and port['port'][ext_sg.SECURITYGROUPS] != [])):
|
||||
return True
|
||||
return False
|
||||
|
||||
def _check_update_has_security_groups(self, port):
|
||||
"""Return True if port has as a security group and False if the
|
||||
security_group field is is_attr_set or []."""
|
||||
if (ext_sg.SECURITYGROUPS in port['port'] and
|
||||
(attr.is_attr_set(port['port'][ext_sg.SECURITYGROUPS]) and
|
||||
port['port'][ext_sg.SECURITYGROUPS] != [])):
|
||||
return True
|
||||
return False
|
||||
|
82
quantum/extensions/portsecurity.py
Normal file
82
quantum/extensions/portsecurity.py
Normal file
@ -0,0 +1,82 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 Nicira Networks, Inc. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Aaron Rosen, Nicira, Inc
|
||||
|
||||
|
||||
from quantum.api.v2 import attributes
|
||||
from quantum.common import exceptions as qexception
|
||||
|
||||
|
||||
class PortSecurityPortHasSecurityGroup(qexception.InUse):
|
||||
message = _("Port has security group associated. Cannot disable port "
|
||||
"security or ip address until security group is removed")
|
||||
|
||||
|
||||
class PortSecurityAndIPRequiredForSecurityGroups(qexception.InvalidInput):
|
||||
message = _("Port security must be enabled and port must have an IP"
|
||||
" address in order to use security groups.")
|
||||
|
||||
|
||||
class PortSecurityBindingNotFound(qexception.InvalidExtenstionEnv):
|
||||
message = _("Port does not have port security binding.")
|
||||
|
||||
PORTSECURITY = 'port_security_enabled'
|
||||
EXTENDED_ATTRIBUTES_2_0 = {
|
||||
'networks': {
|
||||
PORTSECURITY: {'allow_post': True, 'allow_put': True,
|
||||
'convert_to': attributes.convert_to_boolean,
|
||||
'default': True,
|
||||
'is_visible': True},
|
||||
},
|
||||
'ports': {
|
||||
PORTSECURITY: {'allow_post': True, 'allow_put': True,
|
||||
'convert_to': attributes.convert_to_boolean,
|
||||
'default': attributes.ATTR_NOT_SPECIFIED,
|
||||
'is_visible': True},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class Portsecurity(object):
|
||||
"""Extension class supporting port security
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def get_name(cls):
|
||||
return "Port Security"
|
||||
|
||||
@classmethod
|
||||
def get_alias(cls):
|
||||
return "port-security"
|
||||
|
||||
@classmethod
|
||||
def get_description(cls):
|
||||
return "Provides port security"
|
||||
|
||||
@classmethod
|
||||
def get_namespace(cls):
|
||||
return "http://docs.openstack.org/ext/portsecurity/api/v1.0"
|
||||
|
||||
@classmethod
|
||||
def get_updated(cls):
|
||||
return "2012-07-23T10:00:00-00:00"
|
||||
|
||||
def get_extended_resources(self, version):
|
||||
if version == "2.0":
|
||||
return EXTENDED_ATTRIBUTES_2_0
|
||||
else:
|
||||
return {}
|
@ -219,8 +219,8 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
|
||||
'tenant_id': self._tenant_id}}
|
||||
for arg in (('admin_state_up', 'tenant_id', 'shared') +
|
||||
(arg_list or ())):
|
||||
# Arg must be present and not empty
|
||||
if arg in kwargs and kwargs[arg]:
|
||||
# Arg must be present
|
||||
if arg in kwargs:
|
||||
data['network'][arg] = kwargs[arg]
|
||||
network_req = self.new_create_request('networks', data, fmt)
|
||||
if (kwargs.get('set_context') and 'tenant_id' in kwargs):
|
||||
@ -279,16 +279,18 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
|
||||
kwargs.update({'override': overrides})
|
||||
return self._create_bulk(fmt, number, 'subnet', base_data, **kwargs)
|
||||
|
||||
def _create_port(self, fmt, net_id, expected_res_status=None, **kwargs):
|
||||
def _create_port(self, fmt, net_id, expected_res_status=None,
|
||||
arg_list=None, **kwargs):
|
||||
content_type = 'application/' + fmt
|
||||
data = {'port': {'network_id': net_id,
|
||||
'tenant_id': self._tenant_id}}
|
||||
|
||||
for arg in ('admin_state_up', 'device_id',
|
||||
for arg in (('admin_state_up', 'device_id',
|
||||
'mac_address', 'name', 'fixed_ips',
|
||||
'tenant_id', 'device_owner', 'security_groups'):
|
||||
# Arg must be present and not empty
|
||||
if arg in kwargs and kwargs[arg]:
|
||||
'tenant_id', 'device_owner', 'security_groups') +
|
||||
(arg_list or ())):
|
||||
# Arg must be present
|
||||
if arg in kwargs:
|
||||
data['port'][arg] = kwargs[arg]
|
||||
port_req = self.new_create_request('ports', data, fmt)
|
||||
if (kwargs.get('set_context') and 'tenant_id' in kwargs):
|
||||
@ -469,7 +471,8 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
|
||||
enable_dhcp=True,
|
||||
dns_nameservers=None,
|
||||
host_routes=None,
|
||||
shared=None):
|
||||
shared=None,
|
||||
do_delete=True):
|
||||
with (self.network() if not network
|
||||
else dummy_context_func()) as network_to_use:
|
||||
if network:
|
||||
@ -487,6 +490,7 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
|
||||
try:
|
||||
yield subnet
|
||||
finally:
|
||||
if do_delete:
|
||||
self._delete('subnets', subnet['subnet']['id'])
|
||||
|
||||
@contextlib.contextmanager
|
||||
|
366
quantum/tests/unit/test_extension_portsecurity.py
Normal file
366
quantum/tests/unit/test_extension_portsecurity.py
Normal file
@ -0,0 +1,366 @@
|
||||
# Copyright (c) 2012 OpenStack, LLC.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from quantum.api.v2 import attributes as attr
|
||||
from quantum.common.test_lib import test_config
|
||||
from quantum import context
|
||||
from quantum.db import db_base_plugin_v2
|
||||
from quantum.db import portsecurity_db
|
||||
from quantum.db import securitygroups_db
|
||||
from quantum.extensions import securitygroup as ext_sg
|
||||
from quantum.extensions import portsecurity as psec
|
||||
from quantum.manager import QuantumManager
|
||||
from quantum import policy
|
||||
from quantum.tests.unit import test_db_plugin
|
||||
|
||||
DB_PLUGIN_KLASS = ('quantum.tests.unit.test_extension_portsecurity.'
|
||||
'PortSecurityTestPlugin')
|
||||
|
||||
|
||||
class PortSecurityTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
|
||||
def setUp(self, plugin=None):
|
||||
super(PortSecurityTestCase, self).setUp()
|
||||
|
||||
# Check if a plugin supports security groups
|
||||
plugin_obj = QuantumManager.get_plugin()
|
||||
self._skip_security_group = ('security-group' not in
|
||||
plugin_obj.supported_extension_aliases)
|
||||
|
||||
def tearDown(self):
|
||||
super(PortSecurityTestCase, self).tearDown()
|
||||
self._skip_security_group = None
|
||||
|
||||
|
||||
class PortSecurityTestPlugin(db_base_plugin_v2.QuantumDbPluginV2,
|
||||
securitygroups_db.SecurityGroupDbMixin,
|
||||
portsecurity_db.PortSecurityDbMixin):
|
||||
|
||||
""" Test plugin that implements necessary calls on create/delete port for
|
||||
associating ports with security groups and port security.
|
||||
"""
|
||||
|
||||
supported_extension_aliases = ["security-group", "port-security"]
|
||||
port_security_enabled_create = "create_port:port_security_enabled"
|
||||
port_security_enabled_update = "update_port:port_security_enabled"
|
||||
|
||||
def _enforce_set_auth(self, context, resource, action):
|
||||
return policy.enforce(context, action, resource)
|
||||
|
||||
def create_network(self, context, network):
|
||||
tenant_id = self._get_tenant_id_for_create(context, network['network'])
|
||||
self._ensure_default_security_group(context, tenant_id)
|
||||
with context.session.begin(subtransactions=True):
|
||||
quantum_db = super(PortSecurityTestPlugin, self).create_network(
|
||||
context, network)
|
||||
quantum_db.update(network['network'])
|
||||
self._process_network_create_port_security(
|
||||
context, quantum_db)
|
||||
self._extend_network_port_security_dict(context, quantum_db)
|
||||
return quantum_db
|
||||
|
||||
def update_network(self, context, id, network):
|
||||
with context.session.begin(subtransactions=True):
|
||||
quantum_db = super(PortSecurityTestPlugin, self).update_network(
|
||||
context, id, network)
|
||||
if psec.PORTSECURITY in network['network']:
|
||||
self._update_network_security_binding(
|
||||
context, id, network['network'][psec.PORTSECURITY])
|
||||
self._extend_network_port_security_dict(
|
||||
context, quantum_db)
|
||||
return quantum_db
|
||||
|
||||
def get_network(self, context, id, fields=None):
|
||||
with context.session.begin(subtransactions=True):
|
||||
net = super(PortSecurityTestPlugin, self).get_network(
|
||||
context, id)
|
||||
self._extend_network_port_security_dict(context, net)
|
||||
return self._fields(net, fields)
|
||||
|
||||
def create_port(self, context, port):
|
||||
if attr.is_attr_set(port['port'][psec.PORTSECURITY]):
|
||||
self._enforce_set_auth(context, port,
|
||||
self.port_security_enabled_create)
|
||||
p = port['port']
|
||||
with context.session.begin(subtransactions=True):
|
||||
p[ext_sg.SECURITYGROUPS] = self._get_security_groups_on_port(
|
||||
context, port)
|
||||
quantum_db = super(PortSecurityTestPlugin, self).create_port(
|
||||
context, port)
|
||||
p.update(quantum_db)
|
||||
|
||||
(port_security, has_ip) = self._determine_port_security_and_has_ip(
|
||||
context, p)
|
||||
p[psec.PORTSECURITY] = port_security
|
||||
self._process_port_security_create(context, p)
|
||||
|
||||
if (attr.is_attr_set(p.get(ext_sg.SECURITYGROUPS)) and
|
||||
not (port_security and has_ip)):
|
||||
raise psec.PortSecurityAndIPRequiredForSecurityGroups()
|
||||
|
||||
# Port requires ip and port_security enabled for security group
|
||||
if has_ip and port_security:
|
||||
self._ensure_default_security_group_on_port(context, port)
|
||||
|
||||
if (p.get(ext_sg.SECURITYGROUPS) and p[psec.PORTSECURITY]):
|
||||
self._process_port_create_security_group(
|
||||
context, p['id'], p[ext_sg.SECURITYGROUPS])
|
||||
|
||||
self._extend_port_dict_security_group(context, p)
|
||||
self._extend_port_port_security_dict(context, p)
|
||||
|
||||
return port['port']
|
||||
|
||||
def update_port(self, context, id, port):
|
||||
self._enforce_set_auth(context, port,
|
||||
self.port_security_enabled_update)
|
||||
delete_security_groups = self._check_update_deletes_security_groups(
|
||||
port)
|
||||
has_security_groups = self._check_update_has_security_groups(port)
|
||||
with context.session.begin(subtransactions=True):
|
||||
ret_port = super(PortSecurityTestPlugin, self).update_port(
|
||||
context, id, port)
|
||||
# copy values over
|
||||
ret_port.update(port['port'])
|
||||
|
||||
# populate port_security setting
|
||||
if psec.PORTSECURITY not in ret_port:
|
||||
ret_port[psec.PORTSECURITY] = self._get_port_security_binding(
|
||||
context, id)
|
||||
has_ip = self._ip_on_port(ret_port)
|
||||
# checks if security groups were updated adding/modifying
|
||||
# security groups, port security is set and port has ip
|
||||
if (has_security_groups and (not ret_port[psec.PORTSECURITY]
|
||||
or not has_ip)):
|
||||
raise psec.PortSecurityAndIPRequiredForSecurityGroups()
|
||||
|
||||
# Port security/IP was updated off. Need to check that no security
|
||||
# groups are on port.
|
||||
if (ret_port[psec.PORTSECURITY] is not True or not has_ip):
|
||||
if has_security_groups:
|
||||
raise psec.PortSecurityAndIPRequiredForSecurityGroups()
|
||||
|
||||
# get security groups on port
|
||||
filters = {'port_id': [id]}
|
||||
security_groups = (super(PortSecurityTestPlugin, self).
|
||||
_get_port_security_group_bindings(
|
||||
context, filters))
|
||||
if security_groups and not delete_security_groups:
|
||||
raise psec.PortSecurityPortHasSecurityGroup()
|
||||
|
||||
if (delete_security_groups or has_security_groups):
|
||||
# delete the port binding and read it with the new rules.
|
||||
self._delete_port_security_group_bindings(context, id)
|
||||
sgids = self._get_security_groups_on_port(context, port)
|
||||
self._process_port_create_security_group(context, id, sgids)
|
||||
|
||||
if psec.PORTSECURITY in port['port']:
|
||||
self._update_port_security_binding(
|
||||
context, id, ret_port[psec.PORTSECURITY])
|
||||
|
||||
self._extend_port_port_security_dict(context, ret_port)
|
||||
self._extend_port_dict_security_group(context, ret_port)
|
||||
|
||||
return ret_port
|
||||
|
||||
|
||||
class PortSecurityDBTestCase(PortSecurityTestCase):
|
||||
def setUp(self, plugin=None):
|
||||
test_config['plugin_name_v2'] = DB_PLUGIN_KLASS
|
||||
super(PortSecurityDBTestCase, self).setUp()
|
||||
|
||||
def tearDown(self):
|
||||
del test_config['plugin_name_v2']
|
||||
super(PortSecurityDBTestCase, self).tearDown()
|
||||
|
||||
|
||||
class TestPortSecurity(PortSecurityDBTestCase):
|
||||
def test_create_network_with_portsecurity_mac(self):
|
||||
res = self._create_network('json', 'net1', True)
|
||||
net = self.deserialize('json', res)
|
||||
self.assertEquals(net['network'][psec.PORTSECURITY], True)
|
||||
|
||||
def test_create_network_with_portsecurity_false(self):
|
||||
res = self._create_network('json', 'net1', True,
|
||||
arg_list=('port_security_enabled',),
|
||||
port_security_enabled=False)
|
||||
net = self.deserialize('json', res)
|
||||
self.assertEquals(net['network'][psec.PORTSECURITY], False)
|
||||
|
||||
def test_updating_network_port_security(self):
|
||||
res = self._create_network('json', 'net1', True,
|
||||
port_security_enabled='True')
|
||||
net = self.deserialize('json', res)
|
||||
self.assertEquals(net['network'][psec.PORTSECURITY], True)
|
||||
update_net = {'network': {psec.PORTSECURITY: False}}
|
||||
req = self.new_update_request('networks', update_net,
|
||||
net['network']['id'])
|
||||
net = self.deserialize('json', req.get_response(self.api))
|
||||
self.assertEquals(net['network'][psec.PORTSECURITY], False)
|
||||
req = self.new_show_request('networks', net['network']['id'])
|
||||
net = self.deserialize('json', req.get_response(self.api))
|
||||
self.assertEquals(net['network'][psec.PORTSECURITY], False)
|
||||
|
||||
def test_create_port_default_true(self):
|
||||
with self.network() as net:
|
||||
res = self._create_port('json', net['network']['id'])
|
||||
port = self.deserialize('json', res)
|
||||
self.assertEquals(port['port'][psec.PORTSECURITY], True)
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
def test_create_port_passing_true(self):
|
||||
res = self._create_network('json', 'net1', True,
|
||||
arg_list=('port_security_enabled',),
|
||||
port_security_enabled=True)
|
||||
net = self.deserialize('json', res)
|
||||
res = self._create_port('json', net['network']['id'])
|
||||
port = self.deserialize('json', res)
|
||||
self.assertEquals(port['port'][psec.PORTSECURITY], True)
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
def test_create_port_on_port_security_false_network(self):
|
||||
res = self._create_network('json', 'net1', True,
|
||||
arg_list=('port_security_enabled',),
|
||||
port_security_enabled=False)
|
||||
net = self.deserialize('json', res)
|
||||
res = self._create_port('json', net['network']['id'])
|
||||
port = self.deserialize('json', res)
|
||||
self.assertEquals(port['port'][psec.PORTSECURITY], False)
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
def test_create_port_security_overrides_network_value(self):
|
||||
res = self._create_network('json', 'net1', True,
|
||||
arg_list=('port_security_enabled',),
|
||||
port_security_enabled=False)
|
||||
net = self.deserialize('json', res)
|
||||
res = self._create_port('json', net['network']['id'],
|
||||
arg_list=('port_security_enabled',),
|
||||
port_security_enabled=True)
|
||||
port = self.deserialize('json', res)
|
||||
self.assertEquals(port['port'][psec.PORTSECURITY], True)
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
def test_create_port_with_default_security_group(self):
|
||||
if self._skip_security_group:
|
||||
self.skipTest("Plugin does not support security groups")
|
||||
with self.network() as net:
|
||||
with self.subnet(network=net):
|
||||
res = self._create_port('json', net['network']['id'])
|
||||
port = self.deserialize('json', res)
|
||||
self.assertEquals(port['port'][psec.PORTSECURITY], True)
|
||||
self.assertEquals(len(port['port'][ext_sg.SECURITYGROUPS]), 1)
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
def test_update_port_security_off_with_security_group(self):
|
||||
if self._skip_security_group:
|
||||
self.skipTest("Plugin does not support security groups")
|
||||
with self.network() as net:
|
||||
with self.subnet(network=net):
|
||||
res = self._create_port('json', net['network']['id'])
|
||||
port = self.deserialize('json', res)
|
||||
self.assertEquals(port['port'][psec.PORTSECURITY], True)
|
||||
|
||||
update_port = {'port': {psec.PORTSECURITY: False}}
|
||||
req = self.new_update_request('ports', update_port,
|
||||
port['port']['id'])
|
||||
res = req.get_response(self.api)
|
||||
self.assertEqual(res.status_int, 409)
|
||||
# remove security group on port
|
||||
update_port = {'port': {ext_sg.SECURITYGROUPS: None}}
|
||||
req = self.new_update_request('ports', update_port,
|
||||
port['port']['id'])
|
||||
|
||||
self.deserialize('json', req.get_response(self.api))
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
def test_update_port_remove_port_security_security_group(self):
|
||||
if self._skip_security_group:
|
||||
self.skipTest("Plugin does not support security groups")
|
||||
with self.network() as net:
|
||||
with self.subnet(network=net):
|
||||
res = self._create_port('json', net['network']['id'],
|
||||
arg_list=('port_security_enabled',),
|
||||
port_security_enabled=True)
|
||||
port = self.deserialize('json', res)
|
||||
self.assertEquals(port['port'][psec.PORTSECURITY], True)
|
||||
|
||||
# remove security group on port
|
||||
update_port = {'port': {ext_sg.SECURITYGROUPS: None,
|
||||
psec.PORTSECURITY: False}}
|
||||
req = self.new_update_request('ports', update_port,
|
||||
port['port']['id'])
|
||||
|
||||
port = self.deserialize('json', req.get_response(self.api))
|
||||
self.assertEquals(port['port'][psec.PORTSECURITY], False)
|
||||
self.assertEquals(len(port['port'][ext_sg.SECURITYGROUPS]), 0)
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
def test_update_port_remove_port_security_security_group_readd(self):
|
||||
if self._skip_security_group:
|
||||
self.skipTest("Plugin does not support security groups")
|
||||
with self.network() as net:
|
||||
with self.subnet(network=net):
|
||||
res = self._create_port('json', net['network']['id'],
|
||||
arg_list=('port_security_enabled',),
|
||||
port_security_enabled=True)
|
||||
port = self.deserialize('json', res)
|
||||
self.assertEquals(port['port'][psec.PORTSECURITY], True)
|
||||
|
||||
# remove security group on port
|
||||
update_port = {'port': {ext_sg.SECURITYGROUPS: None,
|
||||
psec.PORTSECURITY: False}}
|
||||
req = self.new_update_request('ports', update_port,
|
||||
port['port']['id'])
|
||||
self.deserialize('json', req.get_response(self.api))
|
||||
|
||||
sg_id = port['port'][ext_sg.SECURITYGROUPS]
|
||||
update_port = {'port': {ext_sg.SECURITYGROUPS: [sg_id[0]],
|
||||
psec.PORTSECURITY: True}}
|
||||
|
||||
req = self.new_update_request('ports', update_port,
|
||||
port['port']['id'])
|
||||
|
||||
port = self.deserialize('json', req.get_response(self.api))
|
||||
self.assertEquals(port['port'][psec.PORTSECURITY], True)
|
||||
self.assertEquals(len(port['port'][ext_sg.SECURITYGROUPS]), 1)
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
def test_create_port_security_off_shared_network(self):
|
||||
with self.network(shared=True) as net:
|
||||
with self.subnet(network=net):
|
||||
res = self._create_port('json', net['network']['id'],
|
||||
arg_list=('port_security_enabled',),
|
||||
port_security_enabled=False,
|
||||
tenant_id='not_network_owner',
|
||||
set_context=True)
|
||||
self.deserialize('json', res)
|
||||
self.assertEqual(res.status_int, 403)
|
||||
|
||||
def test_update_port_security_off_shared_network(self):
|
||||
with self.network(shared=True, do_delete=False) as net:
|
||||
with self.subnet(network=net, do_delete=False):
|
||||
res = self._create_port('json', net['network']['id'],
|
||||
tenant_id='not_network_owner',
|
||||
set_context=True)
|
||||
port = self.deserialize('json', res)
|
||||
# remove security group on port
|
||||
update_port = {'port': {ext_sg.SECURITYGROUPS: None,
|
||||
psec.PORTSECURITY: False}}
|
||||
req = self.new_update_request('ports', update_port,
|
||||
port['port']['id'])
|
||||
req.environ['quantum.context'] = context.Context(
|
||||
'', 'not_network_owner')
|
||||
res = req.get_response(self.api)
|
||||
self.assertEqual(res.status_int, 403)
|
Loading…
Reference in New Issue
Block a user