Merge "NSX|V3+P: restrict associate floatingip to router interface/DHCP ports"

This commit is contained in:
Zuul 2019-05-26 09:37:12 +00:00 committed by Gerrit Code Review
commit 4b3d0e9446
5 changed files with 148 additions and 6 deletions

View File

@ -415,6 +415,17 @@ class NsxPluginBase(db_base_plugin_v2.NeutronDbPluginV2,
"Default security group already created", tenant_id)
return self._get_default_sg_id(context, tenant_id)
def _assert_on_assoc_floatingip_to_special_ports(self, fip, internal_port):
"""Do not allow attaching fip to dedicated ports"""
port_id = fip.get('port_id')
dev_owner = internal_port.get('device_owner', '')
if (port_id and dev_owner and
(dev_owner in constants.ROUTER_INTERFACE_OWNERS or
dev_owner == constants.DEVICE_OWNER_DHCP)):
msg = _('Associating floatingip to %s port is '
'restricted') % dev_owner
raise n_exc.BadRequest(resource='floatingip', msg=msg)
def get_housekeeper(self, context, name, fields=None):
# run the job in readonly mode and get the results
self.housekeeper.run(context, name, readonly=True)

View File

@ -2024,11 +2024,18 @@ class NsxPolicyPlugin(nsx_plugin_common.NsxPluginV3Base):
nat_rule_id=self._get_fip_dnat_rule_id(fip_id))
def create_floatingip(self, context, floatingip):
# First do some validations
fip_data = floatingip['floatingip']
port_id = fip_data.get('port_id')
if port_id:
port_data = self.get_port(context, port_id)
self._assert_on_assoc_floatingip_to_special_ports(
fip_data, port_data)
new_fip = super(NsxPolicyPlugin, self).create_floatingip(
context, floatingip, initial_status=(
const.FLOATINGIP_STATUS_ACTIVE
if floatingip['floatingip']['port_id']
else const.FLOATINGIP_STATUS_DOWN))
if port_id else const.FLOATINGIP_STATUS_DOWN))
router_id = new_fip['router_id']
if not router_id:
return new_fip
@ -2053,10 +2060,18 @@ class NsxPolicyPlugin(nsx_plugin_common.NsxPluginV3Base):
super(NsxPolicyPlugin, self).delete_floatingip(context, fip_id)
def update_floatingip(self, context, fip_id, floatingip):
fip_data = floatingip['floatingip']
old_fip = self.get_floatingip(context, fip_id)
new_status = (const.FLOATINGIP_STATUS_ACTIVE
if floatingip['floatingip'].get('port_id')
if fip_data.get('port_id')
else const.FLOATINGIP_STATUS_DOWN)
updated_port_id = fip_data.get('port_id')
if updated_port_id:
updated_port_data = self.get_port(context, updated_port_id)
self._assert_on_assoc_floatingip_to_special_ports(
fip_data, updated_port_data)
new_fip = super(NsxPolicyPlugin, self).update_floatingip(
context, fip_id, floatingip)
router_id = new_fip['router_id']

View File

@ -2932,13 +2932,21 @@ class NsxV3Plugin(nsx_plugin_common.NsxPluginV3Base,
else const.FLOATINGIP_STATUS_DOWN))
def create_floatingip(self, context, floatingip):
# First do some validations
fip_data = floatingip['floatingip']
port_id = fip_data.get('port_id')
if port_id:
port_data = self.get_port(context, port_id)
self._assert_on_assoc_floatingip_to_special_ports(
fip_data, port_data)
# create the neutron fip
new_fip = self._create_floating_ip_wrapper(context, floatingip)
router_id = new_fip['router_id']
if not router_id:
return new_fip
port_id = floatingip['floatingip']['port_id']
if port_id:
port_data = self.get_port(context, port_id)
device_owner = port_data.get('device_owner')
fip_address = new_fip['floating_ip_address']
if (device_owner == const.DEVICE_OWNER_LOADBALANCERV2 or
@ -3000,11 +3008,19 @@ class NsxV3Plugin(nsx_plugin_common.NsxPluginV3Base,
super(NsxV3Plugin, self).delete_floatingip(context, fip_id)
def update_floatingip(self, context, fip_id, floatingip):
fip_data = floatingip['floatingip']
old_fip = self.get_floatingip(context, fip_id)
old_port_id = old_fip['port_id']
new_status = (const.FLOATINGIP_STATUS_ACTIVE
if floatingip['floatingip'].get('port_id')
if fip_data.get('port_id')
else const.FLOATINGIP_STATUS_DOWN)
updated_port_id = fip_data.get('port_id')
if updated_port_id:
updated_port_data = self.get_port(context, updated_port_id)
self._assert_on_assoc_floatingip_to_special_ports(
fip_data, updated_port_data)
new_fip = super(NsxV3Plugin, self).update_floatingip(
context, fip_id, floatingip)
router_id = new_fip['router_id']

View File

@ -2117,3 +2117,53 @@ class NsxPTestL3NatTestCase(NsxPTestL3NatTest,
def test_router_add_iface_ipv6_ext_ra_subnet_returns_400(self):
self.skipTest('DHCPv6 not supported')
def test_create_floatingip_invalid_fixed_ipv6_address_returns_400(self):
self.skipTest('Failed because of illegal port id')
def test_create_floatingip_with_router_interface_device_owner_fail(self):
# This tests that an error is raised when trying to assign a router
# interface port with floatingip.
with self.subnet(cidr='30.0.0.0/24', gateway_ip=None) as private_sub:
with self.port(
subnet=private_sub,
device_owner=constants.DEVICE_OWNER_ROUTER_INTF) as p:
port_id = p['port']['id']
with self.router() as r:
self._router_interface_action('add', r['router']['id'],
None, port_id)
with self.external_network() as public_net,\
self.subnet(network=public_net, cidr='12.0.0.0/24',
enable_dhcp=False) as public_sub:
self._add_external_gateway_to_router(
r['router']['id'],
public_sub['subnet']['network_id'])
self._make_floatingip(
self.fmt, public_sub['subnet']['network_id'],
port_id=port_id,
http_status=exc.HTTPBadRequest.code)
def test_assign_floatingip_to_router_interface_device_owner_fail(self):
# This tests that an error is raised when trying to assign a router
# interface port with floatingip.
with self.subnet(cidr='30.0.0.0/24', gateway_ip=None) as private_sub:
with self.port(
subnet=private_sub,
device_owner=constants.DEVICE_OWNER_ROUTER_INTF) as p:
port_id = p['port']['id']
with self.router() as r:
self._router_interface_action('add', r['router']['id'],
None, port_id)
with self.external_network() as public_net,\
self.subnet(network=public_net, cidr='12.0.0.0/24',
enable_dhcp=False) as public_sub:
self._add_external_gateway_to_router(
r['router']['id'],
public_sub['subnet']['network_id'])
fip = self._make_floatingip(self.fmt, public_sub[
'subnet']['network_id'])
self._update('floatingips', fip['floatingip'][
'id'], {'floatingip': {'port_id': port_id}},
expected_code=exc.HTTPBadRequest.code)

View File

@ -3161,6 +3161,56 @@ class TestL3NatTestCase(L3NatTest,
expected_code=exc.HTTPBadRequest.code)
self.assertIn('NeutronError', res)
def test_create_floatingip_invalid_fixed_ipv6_address_returns_400(self):
self.skipTest('Failed because of illegal port id')
def test_create_floatingip_with_router_interface_device_owner_fail(self):
# This tests that an error is raised when trying to assign a router
# interface port with floatingip.
with self.subnet(cidr='30.0.0.0/24', gateway_ip=None) as private_sub:
with self.port(
subnet=private_sub,
device_owner=constants.DEVICE_OWNER_ROUTER_INTF) as p:
port_id = p['port']['id']
with self.router() as r:
self._router_interface_action('add', r['router']['id'],
None, port_id)
with self.external_network() as public_net,\
self.subnet(
network=public_net, cidr='12.0.0.0/24') as public_sub:
self._add_external_gateway_to_router(
r['router']['id'],
public_sub['subnet']['network_id'])
self._make_floatingip(
self.fmt, public_sub['subnet']['network_id'],
port_id=port_id,
http_status=exc.HTTPBadRequest.code)
def test_assign_floatingip_to_router_interface_device_owner_fail(self):
# This tests that an error is raised when trying to assign a router
# interface port with floatingip.
with self.subnet(cidr='30.0.0.0/24', gateway_ip=None) as private_sub:
with self.port(
subnet=private_sub,
device_owner=constants.DEVICE_OWNER_ROUTER_INTF) as p:
port_id = p['port']['id']
with self.router() as r:
self._router_interface_action('add', r['router']['id'],
None, port_id)
with self.external_network() as public_net,\
self.subnet(
network=public_net, cidr='12.0.0.0/24') as public_sub:
self._add_external_gateway_to_router(
r['router']['id'],
public_sub['subnet']['network_id'])
fip = self._make_floatingip(self.fmt, public_sub[
'subnet']['network_id'])
self._update('floatingips', fip['floatingip'][
'id'], {'floatingip': {'port_id': port_id}},
expected_code=exc.HTTPBadRequest.code)
class ExtGwModeTestCase(test_ext_gw_mode.ExtGwModeIntTestCase,
L3NatTest):