Merge "Relax IPAM validation for port IPs"

This commit is contained in:
Zuul 2021-05-10 18:12:00 +00:00 committed by Gerrit Code Review
commit 8b6611d4ca
3 changed files with 156 additions and 53 deletions

View File

@ -627,30 +627,88 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
if not validators.is_attr_set(fixed_ip_list):
return
msg = _('Exceeded maximum amount of fixed ips per port and ip version')
if len(fixed_ip_list) > 2:
raise n_exc.InvalidInput(error_message=msg)
if len(fixed_ip_list) < 2:
# Do not perform additional checks in the trivial case where there is a
# single fixed IP
if len(fixed_ip_list) <= 1:
return
def get_fixed_ip_version(i):
if 'ip_address' in fixed_ip_list[i]:
return netaddr.IPAddress(
fixed_ip_list[i]['ip_address']).version
if 'subnet_id' in fixed_ip_list[i]:
subnet = self.get_subnet(context.elevated(),
fixed_ip_list[i]['subnet_id'])
return subnet['ip_version']
return None
# Validation needed
err_msg = _("Exceeded maximum amount of fixed IPs "
"per port and IP version")
ip1_err_msg = ("Subnet %s is IPv6 or has DHCP enabled. IP %s "
"already allocated for this port. Cannot allocate %s")
ip2_err_msg = ("Subnet %s is IPv6 or has DHCP enabled. Can only "
"allocate a single IP per subnet. Requested %d")
ipver1 = get_fixed_ip_version(0)
ipver2 = get_fixed_ip_version(1)
if ipver1 and ipver2 and ipver1 != ipver2:
# Build a map of fixed IPs per subnet id
subnet_fixed_ip_dict = {}
fixed_ips_no_subnet = []
for item in fixed_ip_list:
if item.get('subnet_id'):
subnet_id = item['subnet_id']
subnet_fixed_ips = subnet_fixed_ip_dict.setdefault(
subnet_id, [])
subnet_fixed_ips.append(item.get('ip_address', 'UNSPECIFIED'))
else:
# Log something so we know we know this entry will be
# processed differently
LOG.debug("Fixed IP entry %s has no subnet ID", item)
if item.get('ip_address'):
fixed_ips_no_subnet.append(item['ip_address'])
if len(subnet_fixed_ip_dict) > 2:
raise n_exc.InvalidInput(error_message=err_msg)
# This works as a cache, fetch from DB only the subnets we need
# and reuse them -> optimize DB access
subnet_map = {}
def infer_fixed_ip_version(subnet_id, fixed_ips):
# Infer from IPs
for ip_address in fixed_ips:
if ip_address != 'UNSPECIFIED':
return (subnet_id,
netaddr.IPAddress(ip_address).version)
# Infer from subnet
subnet = subnet_map.setdefault(
subnet_id, self.get_subnet(context.elevated(), subnet_id))
return (subnet_id, subnet['ip_version'])
# If we are here we have at most 2 subnets, and must have different IP
# version
ip_versions = [infer_fixed_ip_version(subnet_id, fixed_ips) for
(subnet_id, fixed_ips) in subnet_fixed_ip_dict.items()]
for item in zip(ip_versions, ip_versions):
if item[0][0] != item[1][0] and item[0][1] == item[1][1]:
# Not good! Different subnet but same IP version
# One fixed IP is allowed for each IP version
return
raise n_exc.InvalidInput(error_message=err_msg)
raise n_exc.InvalidInput(error_message=msg)
def validate_subnet_dhcp_and_ipv6_state(subnet_id, fixed_ips):
subnet = subnet_map.setdefault(
subnet_id, self.get_subnet(context.elevated(), subnet_id))
if ((subnet['enable_dhcp'] or subnet.get('ip_version') == 6) and
len(fixed_ips) > 1):
raise n_exc.InvalidInput(
error_message=ip2_err_msg %
(subnet_id, len(fixed_ips)))
if ((subnet['enable_dhcp'] or subnet.get('ip_version') == 6) and
fixed_ips):
cidr = netaddr.IPNetwork(subnet['cidr'])
for ip in fixed_ips_no_subnet:
# netaddr converts string IP into IPAddress
if ip in cidr:
raise n_exc.InvalidInput(
error_message=ip1_err_msg %
(subnet_id, ",".join(fixed_ips), ip))
# Validate if any "unbound" IP would fall on a subnet which has DHCP
# enabled and other IPs. Also ensure no more than one IP for any
# DHCP subnet is requested. fixed_ip attribute does not have patch
# behaviour, so all requested IP allocations must be included in it
for subnet_id, fixed_ips in subnet_fixed_ip_dict.items():
validate_subnet_dhcp_and_ipv6_state(subnet_id, fixed_ips)
def _get_subnets_for_fixed_ips_on_port(self, context, port_data):
# get the subnet id from the fixed ips of the port
@ -1696,10 +1754,11 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
LOG.error("Unable to delete DHCP server mapping for "
"network %s", network_id)
def _filter_ipv4_dhcp_fixed_ips(self, context, fixed_ips):
def _filter_dhcp_fixed_ips(self, context, fixed_ips, version=None):
ips = []
for fixed_ip in fixed_ips:
if netaddr.IPNetwork(fixed_ip['ip_address']).version != 4:
if (version and
netaddr.IPNetwork(fixed_ip['ip_address']).version != version):
continue
with db_api.CONTEXT_READER.using(context):
subnet = self.get_subnet(context, fixed_ip['subnet_id'])
@ -1707,6 +1766,9 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
ips.append(fixed_ip)
return ips
def _filter_ipv4_dhcp_fixed_ips(self, context, fixed_ips):
return self._filter_dhcp_fixed_ips(context, fixed_ips, 4)
def _add_port_mp_dhcp_binding(self, context, port):
if not utils.is_port_dhcp_configurable(port):
return

View File

@ -17,9 +17,12 @@ import contextlib
import decorator
from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin
import netaddr
from neutron_lib import constants
from webob import exc
class FixExternalNetBaseTest(object):
@ -400,8 +403,75 @@ class NsxV3TestPorts(test_plugin.TestPortsV2):
def test_create_port_anticipating_allocation(self):
self.skipTest('Multiple fixed ips on a port are not supported')
def test_create_port_additional_ip(self):
"""Test that creation of port with additional IP fails."""
with self.subnet() as subnet:
data = {'port': {'network_id': subnet['subnet']['network_id'],
'tenant_id': subnet['subnet']['tenant_id'],
'fixed_ips': [{'subnet_id':
subnet['subnet']['id']},
{'subnet_id':
subnet['subnet']['id']}]}}
port_req = self.new_create_request('ports', data)
res = port_req.get_response(self.api)
self.assertEqual(exc.HTTPBadRequest.code,
res.status_int)
def test_create_port_additional_ip_no_dhcp(self):
"""Creation of port with additional IP succeeds with DHCP off."""
with self.subnet(enable_dhcp=False) as subnet:
data = {'port': {'network_id': subnet['subnet']['network_id'],
'tenant_id': subnet['subnet']['tenant_id'],
'fixed_ips': [{'subnet_id':
subnet['subnet']['id']},
{'subnet_id':
subnet['subnet']['id']}]}}
port_req = self.new_create_request('ports', data)
res = port_req.get_response(self.api)
port = self.deserialize(self.fmt, res)
ips = port['port']['fixed_ips']
self.assertEqual(2, len(ips))
self.assertNotEqual(ips[0]['ip_address'],
ips[1]['ip_address'])
network_ip_net = netaddr.IPNetwork(subnet['subnet']['cidr'])
self.assertIn(ips[0]['ip_address'], network_ip_net)
self.assertIn(ips[1]['ip_address'], network_ip_net)
def test_update_port_add_additional_ip(self):
self.skipTest('Multiple fixed ips on a port are not supported')
"""Test update of port with additional IP fails."""
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
data = {'port': {'admin_state_up': False,
'fixed_ips': [{'subnet_id':
subnet['subnet']['id']},
{'subnet_id':
subnet['subnet']['id']}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = req.get_response(self.api)
self.assertEqual(exc.HTTPBadRequest.code,
res.status_int)
def test_update_port_add_additional_ip_no_dhcp(self):
"""Test update of port with additional IP succeeds if DHCP is off."""
with self.subnet(enable_dhcp=False) as subnet:
with self.port(subnet=subnet) as port:
data = {'port': {'admin_state_up': False,
'fixed_ips': [{'subnet_id':
subnet['subnet']['id']},
{'subnet_id':
subnet['subnet']['id']}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = req.get_response(self.api)
port = self.deserialize(self.fmt, res)
ips = port['port']['fixed_ips']
self.assertEqual(2, len(ips))
self.assertNotEqual(ips[0]['ip_address'],
ips[1]['ip_address'])
network_ip_net = netaddr.IPNetwork(subnet['subnet']['cidr'])
self.assertIn(ips[0]['ip_address'], network_ip_net)
self.assertIn(ips[1]['ip_address'], network_ip_net)
def test_delete_network_port_exists_owned_by_network_race(self):
self.skipTest('Skip need to address in future')

View File

@ -1141,7 +1141,7 @@ class TestPortsV2(common_v3.NsxV3SubnetMixin,
'fixed_ips': []}
}
port = self.plugin.create_port(self.ctx, data)
data['port']['fixed_ips'] = '10.0.0.1'
data['port']['fixed_ips'] = [{'ip_address': '10.0.0.1'}]
self.assertRaises(
n_exc.InvalidInput,
self.plugin.update_port, self.ctx, port['id'], data)
@ -1589,35 +1589,6 @@ class TestPortsV2(common_v3.NsxV3SubnetMixin,
created_port = self.plugin.create_port(self.ctx, data)
self.assertEqual('fake_device', created_port['device_id'])
def test_update_port_add_additional_ip(self):
"""Test update of port with additional IP fails."""
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
data = {'port': {'admin_state_up': False,
'fixed_ips': [{'subnet_id':
subnet['subnet']['id']},
{'subnet_id':
subnet['subnet']['id']}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = req.get_response(self.api)
self.assertEqual(exc.HTTPBadRequest.code,
res.status_int)
def test_create_port_additional_ip(self):
"""Test that creation of port with additional IP fails."""
with self.subnet() as subnet:
data = {'port': {'network_id': subnet['subnet']['network_id'],
'tenant_id': subnet['subnet']['tenant_id'],
'fixed_ips': [{'subnet_id':
subnet['subnet']['id']},
{'subnet_id':
subnet['subnet']['id']}]}}
port_req = self.new_create_request('ports', data)
res = port_req.get_response(self.api)
self.assertEqual(exc.HTTPBadRequest.code,
res.status_int)
def test_create_port_with_switching_profiles(self):
"""Tests that nsx ports get the configures switching profiles"""
self.plugin = directory.get_plugin()