Merge "Add IP version check for IP address fields."

This commit is contained in:
Jenkins 2012-09-09 05:08:48 +00:00 committed by Gerrit Code Review
commit a2452fc87f
3 changed files with 171 additions and 15 deletions

View File

@ -294,8 +294,8 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
pool_id = allocation_pool['id']
break
if not pool_id:
error_message = ("No allocation pool found for "
"ip address:%s" % ip_address)
error_message = _("No allocation pool found for "
"ip address:%s" % ip_address)
raise q_exc.InvalidInput(error_message=error_message)
# Two requests will be done on the database. The first will be to
# search if an entry starts with ip_address + 1 (r1). The second
@ -709,7 +709,7 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
pool_2=r_range,
subnet_cidr=subnet_cidr)
def _validate_host_route(self, route):
def _validate_host_route(self, route, ip_version):
try:
netaddr.IPNetwork(route['destination'])
netaddr.IPAddress(route['nexthop'])
@ -718,8 +718,11 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
raise q_exc.InvalidInput(error_message=err_msg)
except ValueError:
# netaddr.IPAddress would raise this
err_msg = ("invalid route: %s" % (str(route)))
err_msg = _("invalid route: %s") % str(route)
raise q_exc.InvalidInput(error_message=err_msg)
self._validate_ip_version(ip_version, route['nexthop'], 'nexthop')
self._validate_ip_version(ip_version, route['destination'],
'destination')
def _allocate_pools_for_subnet(self, context, subnet):
"""Create IP allocation pools for a given subnet
@ -912,9 +915,30 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
def create_subnet_bulk(self, context, subnets):
return self._create_bulk('subnet', context, subnets)
def _validate_ip_version(self, ip_version, addr, name):
"""Check IP field of a subnet match specified ip version"""
ip = netaddr.IPNetwork(addr)
if ip.version != ip_version:
msg = _("%(name)s '%(addr)s' does not match "
"the ip_version '%(ip_version)s'") % locals()
raise q_exc.InvalidInput(error_message=msg)
def _validate_subnet(self, s):
"""a subroutine to validate a subnet spec"""
# check if the number of DNS nameserver exceeds the quota
"""Validate a subnet spec"""
# The method requires the subnet spec 's' has 'ip_version' field.
# If 's' dict does not have 'ip_version' field in an API call
# (e.g., update_subnet()), you need to set 'ip_version' field
# before calling this method.
ip_ver = s['ip_version']
if 'cidr' in s:
self._validate_ip_version(ip_ver, s['cidr'], 'cidr')
if ('gateway_ip' in s and
s['gateway_ip'] and
s['gateway_ip'] != attributes.ATTR_NOT_SPECIFIED):
self._validate_ip_version(ip_ver, s['gateway_ip'], 'gateway_ip')
if 'dns_nameservers' in s and \
s['dns_nameservers'] != attributes.ATTR_NOT_SPECIFIED:
if len(s['dns_nameservers']) > cfg.CONF.max_dns_nameservers:
@ -927,8 +951,8 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
except Exception:
raise q_exc.InvalidInput(
error_message=("error parsing dns address %s" % dns))
self._validate_ip_version(ip_ver, dns, 'dns_nameserver')
# check if the number of host routes exceeds the quota
if 'host_routes' in s and \
s['host_routes'] != attributes.ATTR_NOT_SPECIFIED:
if len(s['host_routes']) > cfg.CONF.max_subnet_host_routes:
@ -937,7 +961,7 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
quota=cfg.CONF.max_subnet_host_routes)
# check if the routes are all valid
for rt in s['host_routes']:
self._validate_host_route(rt)
self._validate_host_route(rt, ip_ver)
def create_subnet(self, context, subnet):
s = subnet['subnet']
@ -998,6 +1022,9 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
gratuitous DHCP offers"""
s = subnet['subnet']
# Fill 'ip_version' field with the current value since
# _validate_subnet() expects subnet spec has 'ip_version' field.
s['ip_version'] = self._get_subnet(context, id).ip_version
self._validate_subnet(s)
with context.session.begin(subtransactions=True):

View File

@ -253,11 +253,8 @@ class MetaQuantumPluginV2Test(unittest.TestCase):
subnet_in_db2 = self.plugin.get_subnet(self.context, subnet2_ret['id'])
subnet_in_db3 = self.plugin.get_subnet(self.context, subnet3_ret['id'])
subnet1['subnet']['ip_version'] = 6
subnet1['subnet']['allocation_pools'].pop()
subnet2['subnet']['ip_version'] = 6
subnet2['subnet']['allocation_pools'].pop()
subnet3['subnet']['ip_version'] = 6
subnet3['subnet']['allocation_pools'].pop()
self.plugin.update_subnet(self.context,
@ -270,9 +267,9 @@ class MetaQuantumPluginV2Test(unittest.TestCase):
subnet_in_db2 = self.plugin.get_subnet(self.context, subnet2_ret['id'])
subnet_in_db3 = self.plugin.get_subnet(self.context, subnet3_ret['id'])
self.assertEqual(6, subnet_in_db1['ip_version'])
self.assertEqual(6, subnet_in_db2['ip_version'])
self.assertEqual(6, subnet_in_db3['ip_version'])
self.assertEqual(4, subnet_in_db1['ip_version'])
self.assertEqual(4, subnet_in_db2['ip_version'])
self.assertEqual(4, subnet_in_db3['ip_version'])
self.plugin.delete_subnet(self.context, subnet1_ret['id'])
self.plugin.delete_subnet(self.context, subnet2_ret['id'])

View File

@ -1950,7 +1950,7 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
allocation_pools = [{'start': 'fe80::2',
'end': 'fe80::ffff:fffa:ffff'}]
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
cidr=cidr, ip_version=6,
allocation_pools=allocation_pools)
def test_create_subnet_with_large_allocation_pool(self):
@ -2032,6 +2032,85 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
shared=True)
self.assertEquals(ctx_manager.exception.code, 422)
def test_create_subnet_inconsistent_ipv6_cidrv4(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 6,
'tenant_id': network['network']['tenant_id']}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_create_subnet_inconsistent_ipv4_cidrv6(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': 'fe80::0/80',
'ip_version': 4,
'tenant_id': network['network']['tenant_id']}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_create_subnet_inconsistent_ipv4_gatewayv6(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 4,
'gateway_ip': 'fe80::1',
'tenant_id': network['network']['tenant_id']}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_create_subnet_inconsistent_ipv6_gatewayv4(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': 'fe80::0/80',
'ip_version': 6,
'gateway_ip': '192.168.0.1',
'tenant_id': network['network']['tenant_id']}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_create_subnet_inconsistent_ipv6_dns_v4(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': 'fe80::0/80',
'ip_version': 6,
'dns_nameservers': ['192.168.0.1'],
'tenant_id': network['network']['tenant_id']}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_create_subnet_inconsistent_ipv4_hostroute_dst_v6(self):
host_routes = [{'destination': 'fe80::0/48',
'nexthop': '10.0.2.20'}]
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 4,
'host_routes': host_routes,
'tenant_id': network['network']['tenant_id']}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_create_subnet_inconsistent_ipv4_hostroute_np_v6(self):
host_routes = [{'destination': '172.16.0.0/24',
'nexthop': 'fe80::1'}]
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 4,
'host_routes': host_routes,
'tenant_id': network['network']['tenant_id']}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_update_subnet(self):
with self.subnet() as subnet:
data = {'subnet': {'gateway_ip': '11.0.0.1'}}
@ -2050,6 +2129,59 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
res = req.get_response(self.api)
self.assertEqual(res.status_int, 422)
def test_update_subnet_inconsistent_ipv4_gatewayv6(self):
with self.network() as network:
with self.subnet(network=network) as subnet:
data = {'subnet': {'gateway_ip': 'fe80::1'}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_update_subnet_inconsistent_ipv6_gatewayv4(self):
with self.network() as network:
with self.subnet(network=network,
ip_version=6, cidr='fe80::/48') as subnet:
data = {'subnet': {'gateway_ip': '10.1.1.1'}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_update_subnet_inconsistent_ipv4_dns_v6(self):
dns_nameservers = ['fe80::1']
with self.network() as network:
with self.subnet(network=network) as subnet:
data = {'subnet': {'dns_nameservers': dns_nameservers}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_update_subnet_inconsistent_ipv6_hostroute_dst_v4(self):
host_routes = [{'destination': 'fe80::0/48',
'nexthop': '10.0.2.20'}]
with self.network() as network:
with self.subnet(network=network,
ip_version=6, cidr='fe80::/48') as subnet:
data = {'subnet': {'host_routes': host_routes}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_update_subnet_inconsistent_ipv6_hostroute_np_v4(self):
host_routes = [{'destination': '172.16.0.0/24',
'nexthop': 'fe80::1'}]
with self.network() as network:
with self.subnet(network=network,
ip_version=6, cidr='fe80::/48') as subnet:
data = {'subnet': {'host_routes': host_routes}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_show_subnet(self):
with self.network() as network:
with self.subnet(network=network) as subnet: