Merge "validate and recommend the cidr"

This commit is contained in:
Jenkins 2013-07-09 13:01:38 +00:00 committed by Gerrit Code Review
commit 626c645fba
5 changed files with 47 additions and 36 deletions

View File

@ -244,14 +244,19 @@ def _validate_ip_address_or_none(data, valid_values=None):
def _validate_subnet(data, valid_values=None): def _validate_subnet(data, valid_values=None):
msg = None
try: try:
netaddr.IPNetwork(_validate_no_whitespace(data)) net = netaddr.IPNetwork(_validate_no_whitespace(data))
if len(data.split('/')) == 2: cidr = str(net.cidr)
if (cidr != data):
msg = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": data,
"cidr": cidr}
else:
return return
except Exception: except Exception:
pass
msg = _("'%s' is not a valid IP subnet") % data msg = _("'%s' is not a valid IP subnet") % data
if msg:
LOG.debug(msg) LOG.debug(msg)
return msg return msg

View File

@ -266,7 +266,7 @@ class TestAttributes(base.BaseTestCase):
[{'nexthop': '10.0.2.20', [{'nexthop': '10.0.2.20',
'destination': '100.0.0.0/8'}, 'destination': '100.0.0.0/8'},
{'nexthop': '10.0.2.20', {'nexthop': '10.0.2.20',
'destination': '100.0.0.1/8'}]] 'destination': '101.0.0.0/8'}]]
for host_routes in hostroute_pools: for host_routes in hostroute_pools:
msg = attributes._validate_hostroutes(host_routes, None) msg = attributes._validate_hostroutes(host_routes, None)
self.assertIsNone(msg) self.assertIsNone(msg)
@ -371,7 +371,7 @@ class TestAttributes(base.BaseTestCase):
self.assertIsNone(msg) self.assertIsNone(msg)
# Valid - IPv6 with final octets # Valid - IPv6 with final octets
cidr = "fe80::0/24" cidr = "fe80::/24"
msg = attributes._validate_subnet(cidr, msg = attributes._validate_subnet(cidr,
None) None)
self.assertIsNone(msg) self.assertIsNone(msg)
@ -380,21 +380,36 @@ class TestAttributes(base.BaseTestCase):
cidr = "10.0.2.0" cidr = "10.0.2.0"
msg = attributes._validate_subnet(cidr, msg = attributes._validate_subnet(cidr,
None) None)
error = "'%s' is not a valid IP subnet" % cidr error = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": cidr,
"cidr": "10.0.2.0/32"}
self.assertEqual(msg, error)
# Invalid - IPv4 with final octets
cidr = "192.168.1.1/24"
msg = attributes._validate_subnet(cidr,
None)
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": cidr,
"cidr": "192.168.1.0/24"}
self.assertEqual(msg, error) self.assertEqual(msg, error)
# Invalid - IPv6 without final octets, missing mask # Invalid - IPv6 without final octets, missing mask
cidr = "fe80::" cidr = "fe80::"
msg = attributes._validate_subnet(cidr, msg = attributes._validate_subnet(cidr,
None) None)
error = "'%s' is not a valid IP subnet" % cidr error = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": cidr,
"cidr": "fe80::/128"}
self.assertEqual(msg, error) self.assertEqual(msg, error)
# Invalid - IPv6 with final octets, missing mask # Invalid - IPv6 with final octets, missing mask
cidr = "fe80::0" cidr = "fe80::0"
msg = attributes._validate_subnet(cidr, msg = attributes._validate_subnet(cidr,
None) None)
error = "'%s' is not a valid IP subnet" % cidr error = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": cidr,
"cidr": "fe80::/128"}
self.assertEqual(msg, error) self.assertEqual(msg, error)
# Invalid - Address format error # Invalid - Address format error

View File

@ -1347,7 +1347,7 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
res = self._create_subnet(self.fmt, res = self._create_subnet(self.fmt,
tenant_id=tenant_id, tenant_id=tenant_id,
net_id=net_id, net_id=net_id,
cidr='2607:f0d0:1002:51::0/124', cidr='2607:f0d0:1002:51::/124',
ip_version=6, ip_version=6,
gateway_ip=ATTR_NOT_SPECIFIED) gateway_ip=ATTR_NOT_SPECIFIED)
subnet2 = self.deserialize(self.fmt, res) subnet2 = self.deserialize(self.fmt, res)
@ -1543,26 +1543,6 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
res = self._create_port(self.fmt, net_id=net_id, **kwargs) res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, 400) self.assertEqual(res.status_int, 400)
def test_fixed_ip_valid_ip_non_root_cidr(self):
with self.subnet(cidr='10.0.0.254/24') as subnet:
# Allocate specific IP
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.2'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, 201)
port = self.deserialize(self.fmt, res)
self._delete('ports', port['port']['id'])
def test_fixed_ip_invalid_ip_non_root_cidr(self):
with self.subnet(cidr='10.0.0.254/24') as subnet:
# Allocate specific IP
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.1.2'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, 400)
def test_requested_ips_only(self): def test_requested_ips_only(self):
with self.subnet() as subnet: with self.subnet() as subnet:
with self.port(subnet=subnet) as port: with self.port(subnet=subnet) as port:
@ -2555,6 +2535,17 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
res = subnet_req.get_response(self.api) res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, 403) self.assertEqual(res.status_int, 403)
def test_create_subnet_bad_cidr(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.5/24',
'ip_version': 4,
'tenant_id': network['network']['tenant_id']}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_subnet_bad_ip_version(self): def test_create_subnet_bad_ip_version(self):
with self.network() as network: with self.network() as network:
# Check bad IP version # Check bad IP version
@ -2800,7 +2791,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
def test_create_subnet_with_v6_allocation_pool(self): def test_create_subnet_with_v6_allocation_pool(self):
gateway_ip = 'fe80::1' gateway_ip = 'fe80::1'
cidr = 'fe80::0/80' cidr = 'fe80::/80'
allocation_pools = [{'start': 'fe80::2', allocation_pools = [{'start': 'fe80::2',
'end': 'fe80::ffff:fffa:ffff'}] 'end': 'fe80::ffff:fffa:ffff'}]
self._test_create_subnet(gateway_ip=gateway_ip, self._test_create_subnet(gateway_ip=gateway_ip,

View File

@ -27,7 +27,7 @@ from neutron.tests.unit import test_api_v2
_uuid = test_api_v2._uuid _uuid = test_api_v2._uuid
FAKE_PREFIX = {'IPv4': '10.0.0.0/24', FAKE_PREFIX = {'IPv4': '10.0.0.0/24',
'IPv6': 'fe80::0/48'} 'IPv6': 'fe80::/48'}
FAKE_IP = {'IPv4': '10.0.0.1', FAKE_IP = {'IPv4': '10.0.0.1',
'IPv6': 'fe80::1'} 'IPv6': 'fe80::1'}

View File

@ -1149,7 +1149,7 @@ class L3NatDBTestCase(L3NatTestCaseBase):
expected_code=exc.HTTPNotFound.code) expected_code=exc.HTTPNotFound.code)
def _test_floatingip_with_assoc_fails(self, plugin_class): def _test_floatingip_with_assoc_fails(self, plugin_class):
with self.subnet(cidr='200.0.0.1/24') as public_sub: with self.subnet(cidr='200.0.0.0/24') as public_sub:
self._set_net_external(public_sub['subnet']['network_id']) self._set_net_external(public_sub['subnet']['network_id'])
with self.port() as private_port: with self.port() as private_port:
with self.router() as r: with self.router() as r:
@ -1187,7 +1187,7 @@ class L3NatDBTestCase(L3NatTestCaseBase):
'neutron.db.l3_db.L3_NAT_db_mixin') 'neutron.db.l3_db.L3_NAT_db_mixin')
def _test_floatingip_with_ip_generation_failure(self, plugin_class): def _test_floatingip_with_ip_generation_failure(self, plugin_class):
with self.subnet(cidr='200.0.0.1/24') as public_sub: with self.subnet(cidr='200.0.0.0/24') as public_sub:
self._set_net_external(public_sub['subnet']['network_id']) self._set_net_external(public_sub['subnet']['network_id'])
with self.port() as private_port: with self.port() as private_port:
with self.router() as r: with self.router() as r: