Relax IPAM validation for port IPs

Allow for setting multiple IPs from the same subnet, only if
the subnet is v4 and DHCP is disabled for the subnet.

Still make sure at most 1 IPv4 and 1 IPv6 subnet can be
configured.

Change-Id: I1113a24fa8cc09892bc89917d50c64f6a72c0dab
This commit is contained in:
Salvatore Orlando 2021-05-03 07:31:18 -07:00 committed by Salvatore
parent ea2d0a120c
commit 607afed94d
3 changed files with 156 additions and 53 deletions

View File

@ -627,30 +627,88 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
if not validators.is_attr_set(fixed_ip_list):
return
msg = _('Exceeded maximum amount of fixed ips per port and ip version')
if len(fixed_ip_list) > 2:
raise n_exc.InvalidInput(error_message=msg)
if len(fixed_ip_list) < 2:
# Do not perform additional checks in the trivial case where there is a
# single fixed IP
if len(fixed_ip_list) <= 1:
return
def get_fixed_ip_version(i):
if 'ip_address' in fixed_ip_list[i]:
return netaddr.IPAddress(
fixed_ip_list[i]['ip_address']).version
if 'subnet_id' in fixed_ip_list[i]:
subnet = self.get_subnet(context.elevated(),
fixed_ip_list[i]['subnet_id'])
return subnet['ip_version']
return None
# Validation needed
err_msg = _("Exceeded maximum amount of fixed IPs "
"per port and IP version")
ip1_err_msg = ("Subnet %s is IPv6 or has DHCP enabled. IP %s "
"already allocated for this port. Cannot allocate %s")
ip2_err_msg = ("Subnet %s is IPv6 or has DHCP enabled. Can only "
"allocate a single IP per subnet. Requested %d")
ipver1 = get_fixed_ip_version(0)
ipver2 = get_fixed_ip_version(1)
if ipver1 and ipver2 and ipver1 != ipver2:
# Build a map of fixed IPs per subnet id
subnet_fixed_ip_dict = {}
fixed_ips_no_subnet = []
for item in fixed_ip_list:
if item.get('subnet_id'):
subnet_id = item['subnet_id']
subnet_fixed_ips = subnet_fixed_ip_dict.setdefault(
subnet_id, [])
subnet_fixed_ips.append(item.get('ip_address', 'UNSPECIFIED'))
else:
# Log something so we know we know this entry will be
# processed differently
LOG.debug("Fixed IP entry %s has no subnet ID", item)
if item.get('ip_address'):
fixed_ips_no_subnet.append(item['ip_address'])
if len(subnet_fixed_ip_dict) > 2:
raise n_exc.InvalidInput(error_message=err_msg)
# This works as a cache, fetch from DB only the subnets we need
# and reuse them -> optimize DB access
subnet_map = {}
def infer_fixed_ip_version(subnet_id, fixed_ips):
# Infer from IPs
for ip_address in fixed_ips:
if ip_address != 'UNSPECIFIED':
return (subnet_id,
netaddr.IPAddress(ip_address).version)
# Infer from subnet
subnet = subnet_map.setdefault(
subnet_id, self.get_subnet(context.elevated(), subnet_id))
return (subnet_id, subnet['ip_version'])
# If we are here we have at most 2 subnets, and must have different IP
# version
ip_versions = [infer_fixed_ip_version(subnet_id, fixed_ips) for
(subnet_id, fixed_ips) in subnet_fixed_ip_dict.items()]
for item in zip(ip_versions, ip_versions):
if item[0][0] != item[1][0] and item[0][1] == item[1][1]:
# Not good! Different subnet but same IP version
# One fixed IP is allowed for each IP version
return
raise n_exc.InvalidInput(error_message=err_msg)
raise n_exc.InvalidInput(error_message=msg)
def validate_subnet_dhcp_and_ipv6_state(subnet_id, fixed_ips):
subnet = subnet_map.setdefault(
subnet_id, self.get_subnet(context.elevated(), subnet_id))
if ((subnet['enable_dhcp'] or subnet.get('ip_version') == 6) and
len(fixed_ips) > 1):
raise n_exc.InvalidInput(
error_message=ip2_err_msg %
(subnet_id, len(fixed_ips)))
if ((subnet['enable_dhcp'] or subnet.get('ip_version') == 6) and
fixed_ips):
cidr = netaddr.IPNetwork(subnet['cidr'])
for ip in fixed_ips_no_subnet:
# netaddr converts string IP into IPAddress
if ip in cidr:
raise n_exc.InvalidInput(
error_message=ip1_err_msg %
(subnet_id, ",".join(fixed_ips), ip))
# Validate if any "unbound" IP would fall on a subnet which has DHCP
# enabled and other IPs. Also ensure no more than one IP for any
# DHCP subnet is requested. fixed_ip attribute does not have patch
# behaviour, so all requested IP allocations must be included in it
for subnet_id, fixed_ips in subnet_fixed_ip_dict.items():
validate_subnet_dhcp_and_ipv6_state(subnet_id, fixed_ips)
def _get_subnets_for_fixed_ips_on_port(self, context, port_data):
# get the subnet id from the fixed ips of the port
@ -1696,10 +1754,11 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
LOG.error("Unable to delete DHCP server mapping for "
"network %s", network_id)
def _filter_ipv4_dhcp_fixed_ips(self, context, fixed_ips):
def _filter_dhcp_fixed_ips(self, context, fixed_ips, version=None):
ips = []
for fixed_ip in fixed_ips:
if netaddr.IPNetwork(fixed_ip['ip_address']).version != 4:
if (version and
netaddr.IPNetwork(fixed_ip['ip_address']).version != version):
continue
with db_api.CONTEXT_READER.using(context):
subnet = self.get_subnet(context, fixed_ip['subnet_id'])
@ -1707,6 +1766,9 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
ips.append(fixed_ip)
return ips
def _filter_ipv4_dhcp_fixed_ips(self, context, fixed_ips):
return self._filter_dhcp_fixed_ips(context, fixed_ips, 4)
def _add_port_mp_dhcp_binding(self, context, port):
if not utils.is_port_dhcp_configurable(port):
return

View File

@ -17,9 +17,12 @@ import contextlib
import decorator
from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin
import netaddr
from neutron_lib import constants
from webob import exc
class FixExternalNetBaseTest(object):
@ -400,8 +403,75 @@ class NsxV3TestPorts(test_plugin.TestPortsV2):
def test_create_port_anticipating_allocation(self):
self.skipTest('Multiple fixed ips on a port are not supported')
def test_create_port_additional_ip(self):
"""Test that creation of port with additional IP fails."""
with self.subnet() as subnet:
data = {'port': {'network_id': subnet['subnet']['network_id'],
'tenant_id': subnet['subnet']['tenant_id'],
'fixed_ips': [{'subnet_id':
subnet['subnet']['id']},
{'subnet_id':
subnet['subnet']['id']}]}}
port_req = self.new_create_request('ports', data)
res = port_req.get_response(self.api)
self.assertEqual(exc.HTTPBadRequest.code,
res.status_int)
def test_create_port_additional_ip_no_dhcp(self):
"""Creation of port with additional IP succeeds with DHCP off."""
with self.subnet(enable_dhcp=False) as subnet:
data = {'port': {'network_id': subnet['subnet']['network_id'],
'tenant_id': subnet['subnet']['tenant_id'],
'fixed_ips': [{'subnet_id':
subnet['subnet']['id']},
{'subnet_id':
subnet['subnet']['id']}]}}
port_req = self.new_create_request('ports', data)
res = port_req.get_response(self.api)
port = self.deserialize(self.fmt, res)
ips = port['port']['fixed_ips']
self.assertEqual(2, len(ips))
self.assertNotEqual(ips[0]['ip_address'],
ips[1]['ip_address'])
network_ip_net = netaddr.IPNetwork(subnet['subnet']['cidr'])
self.assertIn(ips[0]['ip_address'], network_ip_net)
self.assertIn(ips[1]['ip_address'], network_ip_net)
def test_update_port_add_additional_ip(self):
self.skipTest('Multiple fixed ips on a port are not supported')
"""Test update of port with additional IP fails."""
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
data = {'port': {'admin_state_up': False,
'fixed_ips': [{'subnet_id':
subnet['subnet']['id']},
{'subnet_id':
subnet['subnet']['id']}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = req.get_response(self.api)
self.assertEqual(exc.HTTPBadRequest.code,
res.status_int)
def test_update_port_add_additional_ip_no_dhcp(self):
"""Test update of port with additional IP succeeds if DHCP is off."""
with self.subnet(enable_dhcp=False) as subnet:
with self.port(subnet=subnet) as port:
data = {'port': {'admin_state_up': False,
'fixed_ips': [{'subnet_id':
subnet['subnet']['id']},
{'subnet_id':
subnet['subnet']['id']}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = req.get_response(self.api)
port = self.deserialize(self.fmt, res)
ips = port['port']['fixed_ips']
self.assertEqual(2, len(ips))
self.assertNotEqual(ips[0]['ip_address'],
ips[1]['ip_address'])
network_ip_net = netaddr.IPNetwork(subnet['subnet']['cidr'])
self.assertIn(ips[0]['ip_address'], network_ip_net)
self.assertIn(ips[1]['ip_address'], network_ip_net)
def test_delete_network_port_exists_owned_by_network_race(self):
self.skipTest('Skip need to address in future')

View File

@ -1141,7 +1141,7 @@ class TestPortsV2(common_v3.NsxV3SubnetMixin,
'fixed_ips': []}
}
port = self.plugin.create_port(self.ctx, data)
data['port']['fixed_ips'] = '10.0.0.1'
data['port']['fixed_ips'] = [{'ip_address': '10.0.0.1'}]
self.assertRaises(
n_exc.InvalidInput,
self.plugin.update_port, self.ctx, port['id'], data)
@ -1589,35 +1589,6 @@ class TestPortsV2(common_v3.NsxV3SubnetMixin,
created_port = self.plugin.create_port(self.ctx, data)
self.assertEqual('fake_device', created_port['device_id'])
def test_update_port_add_additional_ip(self):
"""Test update of port with additional IP fails."""
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
data = {'port': {'admin_state_up': False,
'fixed_ips': [{'subnet_id':
subnet['subnet']['id']},
{'subnet_id':
subnet['subnet']['id']}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = req.get_response(self.api)
self.assertEqual(exc.HTTPBadRequest.code,
res.status_int)
def test_create_port_additional_ip(self):
"""Test that creation of port with additional IP fails."""
with self.subnet() as subnet:
data = {'port': {'network_id': subnet['subnet']['network_id'],
'tenant_id': subnet['subnet']['tenant_id'],
'fixed_ips': [{'subnet_id':
subnet['subnet']['id']},
{'subnet_id':
subnet['subnet']['id']}]}}
port_req = self.new_create_request('ports', data)
res = port_req.get_response(self.api)
self.assertEqual(exc.HTTPBadRequest.code,
res.status_int)
def test_create_port_with_switching_profiles(self):
"""Tests that nsx ports get the configures switching profiles"""
self.plugin = directory.get_plugin()