validate and recommend the cidr

Bug #1195974

It is hard to validate the CIDR typed in by user,
the simple way is to recognize only one and recommend it
if user's input is not the one.

Change-Id: Ic8defe30a43a5ae69c3f737094f866b36bb68f59
This commit is contained in:
Yong Sheng Gong 2013-07-07 21:20:36 +08:00
parent 370d97fedd
commit 4d66005043
5 changed files with 47 additions and 36 deletions

View File

@ -244,15 +244,20 @@ def _validate_ip_address_or_none(data, valid_values=None):
def _validate_subnet(data, valid_values=None): def _validate_subnet(data, valid_values=None):
msg = None
try: try:
netaddr.IPNetwork(_validate_no_whitespace(data)) net = netaddr.IPNetwork(_validate_no_whitespace(data))
if len(data.split('/')) == 2: cidr = str(net.cidr)
if (cidr != data):
msg = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": data,
"cidr": cidr}
else:
return return
except Exception: except Exception:
pass msg = _("'%s' is not a valid IP subnet") % data
if msg:
msg = _("'%s' is not a valid IP subnet") % data LOG.debug(msg)
LOG.debug(msg)
return msg return msg

View File

@ -266,7 +266,7 @@ class TestAttributes(base.BaseTestCase):
[{'nexthop': '10.0.2.20', [{'nexthop': '10.0.2.20',
'destination': '100.0.0.0/8'}, 'destination': '100.0.0.0/8'},
{'nexthop': '10.0.2.20', {'nexthop': '10.0.2.20',
'destination': '100.0.0.1/8'}]] 'destination': '101.0.0.0/8'}]]
for host_routes in hostroute_pools: for host_routes in hostroute_pools:
msg = attributes._validate_hostroutes(host_routes, None) msg = attributes._validate_hostroutes(host_routes, None)
self.assertIsNone(msg) self.assertIsNone(msg)
@ -371,7 +371,7 @@ class TestAttributes(base.BaseTestCase):
self.assertIsNone(msg) self.assertIsNone(msg)
# Valid - IPv6 with final octets # Valid - IPv6 with final octets
cidr = "fe80::0/24" cidr = "fe80::/24"
msg = attributes._validate_subnet(cidr, msg = attributes._validate_subnet(cidr,
None) None)
self.assertIsNone(msg) self.assertIsNone(msg)
@ -380,21 +380,36 @@ class TestAttributes(base.BaseTestCase):
cidr = "10.0.2.0" cidr = "10.0.2.0"
msg = attributes._validate_subnet(cidr, msg = attributes._validate_subnet(cidr,
None) None)
error = "'%s' is not a valid IP subnet" % cidr error = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": cidr,
"cidr": "10.0.2.0/32"}
self.assertEqual(msg, error)
# Invalid - IPv4 with final octets
cidr = "192.168.1.1/24"
msg = attributes._validate_subnet(cidr,
None)
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": cidr,
"cidr": "192.168.1.0/24"}
self.assertEqual(msg, error) self.assertEqual(msg, error)
# Invalid - IPv6 without final octets, missing mask # Invalid - IPv6 without final octets, missing mask
cidr = "fe80::" cidr = "fe80::"
msg = attributes._validate_subnet(cidr, msg = attributes._validate_subnet(cidr,
None) None)
error = "'%s' is not a valid IP subnet" % cidr error = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": cidr,
"cidr": "fe80::/128"}
self.assertEqual(msg, error) self.assertEqual(msg, error)
# Invalid - IPv6 with final octets, missing mask # Invalid - IPv6 with final octets, missing mask
cidr = "fe80::0" cidr = "fe80::0"
msg = attributes._validate_subnet(cidr, msg = attributes._validate_subnet(cidr,
None) None)
error = "'%s' is not a valid IP subnet" % cidr error = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": cidr,
"cidr": "fe80::/128"}
self.assertEqual(msg, error) self.assertEqual(msg, error)
# Invalid - Address format error # Invalid - Address format error

View File

@ -1347,7 +1347,7 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
res = self._create_subnet(self.fmt, res = self._create_subnet(self.fmt,
tenant_id=tenant_id, tenant_id=tenant_id,
net_id=net_id, net_id=net_id,
cidr='2607:f0d0:1002:51::0/124', cidr='2607:f0d0:1002:51::/124',
ip_version=6, ip_version=6,
gateway_ip=ATTR_NOT_SPECIFIED) gateway_ip=ATTR_NOT_SPECIFIED)
subnet2 = self.deserialize(self.fmt, res) subnet2 = self.deserialize(self.fmt, res)
@ -1543,26 +1543,6 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
res = self._create_port(self.fmt, net_id=net_id, **kwargs) res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, 400) self.assertEqual(res.status_int, 400)
def test_fixed_ip_valid_ip_non_root_cidr(self):
with self.subnet(cidr='10.0.0.254/24') as subnet:
# Allocate specific IP
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.2'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, 201)
port = self.deserialize(self.fmt, res)
self._delete('ports', port['port']['id'])
def test_fixed_ip_invalid_ip_non_root_cidr(self):
with self.subnet(cidr='10.0.0.254/24') as subnet:
# Allocate specific IP
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.1.2'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, 400)
def test_requested_ips_only(self): def test_requested_ips_only(self):
with self.subnet() as subnet: with self.subnet() as subnet:
with self.port(subnet=subnet) as port: with self.port(subnet=subnet) as port:
@ -2555,6 +2535,17 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
res = subnet_req.get_response(self.api) res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, 403) self.assertEqual(res.status_int, 403)
def test_create_subnet_bad_cidr(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.5/24',
'ip_version': 4,
'tenant_id': network['network']['tenant_id']}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_subnet_bad_ip_version(self): def test_create_subnet_bad_ip_version(self):
with self.network() as network: with self.network() as network:
# Check bad IP version # Check bad IP version
@ -2800,7 +2791,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
def test_create_subnet_with_v6_allocation_pool(self): def test_create_subnet_with_v6_allocation_pool(self):
gateway_ip = 'fe80::1' gateway_ip = 'fe80::1'
cidr = 'fe80::0/80' cidr = 'fe80::/80'
allocation_pools = [{'start': 'fe80::2', allocation_pools = [{'start': 'fe80::2',
'end': 'fe80::ffff:fffa:ffff'}] 'end': 'fe80::ffff:fffa:ffff'}]
self._test_create_subnet(gateway_ip=gateway_ip, self._test_create_subnet(gateway_ip=gateway_ip,

View File

@ -27,7 +27,7 @@ from neutron.tests.unit import test_api_v2
_uuid = test_api_v2._uuid _uuid = test_api_v2._uuid
FAKE_PREFIX = {'IPv4': '10.0.0.0/24', FAKE_PREFIX = {'IPv4': '10.0.0.0/24',
'IPv6': 'fe80::0/48'} 'IPv6': 'fe80::/48'}
FAKE_IP = {'IPv4': '10.0.0.1', FAKE_IP = {'IPv4': '10.0.0.1',
'IPv6': 'fe80::1'} 'IPv6': 'fe80::1'}

View File

@ -1149,7 +1149,7 @@ class L3NatDBTestCase(L3NatTestCaseBase):
expected_code=exc.HTTPNotFound.code) expected_code=exc.HTTPNotFound.code)
def _test_floatingip_with_assoc_fails(self, plugin_class): def _test_floatingip_with_assoc_fails(self, plugin_class):
with self.subnet(cidr='200.0.0.1/24') as public_sub: with self.subnet(cidr='200.0.0.0/24') as public_sub:
self._set_net_external(public_sub['subnet']['network_id']) self._set_net_external(public_sub['subnet']['network_id'])
with self.port() as private_port: with self.port() as private_port:
with self.router() as r: with self.router() as r:
@ -1187,7 +1187,7 @@ class L3NatDBTestCase(L3NatTestCaseBase):
'neutron.db.l3_db.L3_NAT_db_mixin') 'neutron.db.l3_db.L3_NAT_db_mixin')
def _test_floatingip_with_ip_generation_failure(self, plugin_class): def _test_floatingip_with_ip_generation_failure(self, plugin_class):
with self.subnet(cidr='200.0.0.1/24') as public_sub: with self.subnet(cidr='200.0.0.0/24') as public_sub:
self._set_net_external(public_sub['subnet']['network_id']) self._set_net_external(public_sub['subnet']['network_id'])
with self.port() as private_port: with self.port() as private_port:
with self.router() as r: with self.router() as r: