validate and recommend the cidr
Bug #1195974 It is hard to validate the CIDR typed in by user, the simple way is to recognize only one and recommend it if user's input is not the one. Change-Id: Ic8defe30a43a5ae69c3f737094f866b36bb68f59
This commit is contained in:
parent
31c7bdac68
commit
53a66b299f
@ -244,15 +244,20 @@ def _validate_ip_address_or_none(data, valid_values=None):
|
|||||||
|
|
||||||
|
|
||||||
def _validate_subnet(data, valid_values=None):
|
def _validate_subnet(data, valid_values=None):
|
||||||
|
msg = None
|
||||||
try:
|
try:
|
||||||
netaddr.IPNetwork(_validate_no_whitespace(data))
|
net = netaddr.IPNetwork(_validate_no_whitespace(data))
|
||||||
if len(data.split('/')) == 2:
|
cidr = str(net.cidr)
|
||||||
|
if (cidr != data):
|
||||||
|
msg = _("'%(data)s' isn't a recognized IP subnet cidr,"
|
||||||
|
" '%(cidr)s' is recommended") % {"data": data,
|
||||||
|
"cidr": cidr}
|
||||||
|
else:
|
||||||
return
|
return
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
msg = _("'%s' is not a valid IP subnet") % data
|
||||||
|
if msg:
|
||||||
msg = _("'%s' is not a valid IP subnet") % data
|
LOG.debug(msg)
|
||||||
LOG.debug(msg)
|
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
|
|
||||||
|
@ -266,7 +266,7 @@ class TestAttributes(base.BaseTestCase):
|
|||||||
[{'nexthop': '10.0.2.20',
|
[{'nexthop': '10.0.2.20',
|
||||||
'destination': '100.0.0.0/8'},
|
'destination': '100.0.0.0/8'},
|
||||||
{'nexthop': '10.0.2.20',
|
{'nexthop': '10.0.2.20',
|
||||||
'destination': '100.0.0.1/8'}]]
|
'destination': '101.0.0.0/8'}]]
|
||||||
for host_routes in hostroute_pools:
|
for host_routes in hostroute_pools:
|
||||||
msg = attributes._validate_hostroutes(host_routes, None)
|
msg = attributes._validate_hostroutes(host_routes, None)
|
||||||
self.assertIsNone(msg)
|
self.assertIsNone(msg)
|
||||||
@ -371,7 +371,7 @@ class TestAttributes(base.BaseTestCase):
|
|||||||
self.assertIsNone(msg)
|
self.assertIsNone(msg)
|
||||||
|
|
||||||
# Valid - IPv6 with final octets
|
# Valid - IPv6 with final octets
|
||||||
cidr = "fe80::0/24"
|
cidr = "fe80::/24"
|
||||||
msg = attributes._validate_subnet(cidr,
|
msg = attributes._validate_subnet(cidr,
|
||||||
None)
|
None)
|
||||||
self.assertIsNone(msg)
|
self.assertIsNone(msg)
|
||||||
@ -380,21 +380,36 @@ class TestAttributes(base.BaseTestCase):
|
|||||||
cidr = "10.0.2.0"
|
cidr = "10.0.2.0"
|
||||||
msg = attributes._validate_subnet(cidr,
|
msg = attributes._validate_subnet(cidr,
|
||||||
None)
|
None)
|
||||||
error = "'%s' is not a valid IP subnet" % cidr
|
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
|
||||||
|
" '%(cidr)s' is recommended") % {"data": cidr,
|
||||||
|
"cidr": "10.0.2.0/32"}
|
||||||
|
self.assertEqual(msg, error)
|
||||||
|
|
||||||
|
# Invalid - IPv4 with final octets
|
||||||
|
cidr = "192.168.1.1/24"
|
||||||
|
msg = attributes._validate_subnet(cidr,
|
||||||
|
None)
|
||||||
|
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
|
||||||
|
" '%(cidr)s' is recommended") % {"data": cidr,
|
||||||
|
"cidr": "192.168.1.0/24"}
|
||||||
self.assertEqual(msg, error)
|
self.assertEqual(msg, error)
|
||||||
|
|
||||||
# Invalid - IPv6 without final octets, missing mask
|
# Invalid - IPv6 without final octets, missing mask
|
||||||
cidr = "fe80::"
|
cidr = "fe80::"
|
||||||
msg = attributes._validate_subnet(cidr,
|
msg = attributes._validate_subnet(cidr,
|
||||||
None)
|
None)
|
||||||
error = "'%s' is not a valid IP subnet" % cidr
|
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
|
||||||
|
" '%(cidr)s' is recommended") % {"data": cidr,
|
||||||
|
"cidr": "fe80::/128"}
|
||||||
self.assertEqual(msg, error)
|
self.assertEqual(msg, error)
|
||||||
|
|
||||||
# Invalid - IPv6 with final octets, missing mask
|
# Invalid - IPv6 with final octets, missing mask
|
||||||
cidr = "fe80::0"
|
cidr = "fe80::0"
|
||||||
msg = attributes._validate_subnet(cidr,
|
msg = attributes._validate_subnet(cidr,
|
||||||
None)
|
None)
|
||||||
error = "'%s' is not a valid IP subnet" % cidr
|
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
|
||||||
|
" '%(cidr)s' is recommended") % {"data": cidr,
|
||||||
|
"cidr": "fe80::/128"}
|
||||||
self.assertEqual(msg, error)
|
self.assertEqual(msg, error)
|
||||||
|
|
||||||
# Invalid - Address format error
|
# Invalid - Address format error
|
||||||
|
@ -1347,7 +1347,7 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
|
|||||||
res = self._create_subnet(self.fmt,
|
res = self._create_subnet(self.fmt,
|
||||||
tenant_id=tenant_id,
|
tenant_id=tenant_id,
|
||||||
net_id=net_id,
|
net_id=net_id,
|
||||||
cidr='2607:f0d0:1002:51::0/124',
|
cidr='2607:f0d0:1002:51::/124',
|
||||||
ip_version=6,
|
ip_version=6,
|
||||||
gateway_ip=ATTR_NOT_SPECIFIED)
|
gateway_ip=ATTR_NOT_SPECIFIED)
|
||||||
subnet2 = self.deserialize(self.fmt, res)
|
subnet2 = self.deserialize(self.fmt, res)
|
||||||
@ -1543,26 +1543,6 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
|
|||||||
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
|
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
|
||||||
self.assertEqual(res.status_int, 400)
|
self.assertEqual(res.status_int, 400)
|
||||||
|
|
||||||
def test_fixed_ip_valid_ip_non_root_cidr(self):
|
|
||||||
with self.subnet(cidr='10.0.0.254/24') as subnet:
|
|
||||||
# Allocate specific IP
|
|
||||||
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
|
|
||||||
'ip_address': '10.0.0.2'}]}
|
|
||||||
net_id = subnet['subnet']['network_id']
|
|
||||||
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
|
|
||||||
self.assertEqual(res.status_int, 201)
|
|
||||||
port = self.deserialize(self.fmt, res)
|
|
||||||
self._delete('ports', port['port']['id'])
|
|
||||||
|
|
||||||
def test_fixed_ip_invalid_ip_non_root_cidr(self):
|
|
||||||
with self.subnet(cidr='10.0.0.254/24') as subnet:
|
|
||||||
# Allocate specific IP
|
|
||||||
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
|
|
||||||
'ip_address': '10.0.1.2'}]}
|
|
||||||
net_id = subnet['subnet']['network_id']
|
|
||||||
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
|
|
||||||
self.assertEqual(res.status_int, 400)
|
|
||||||
|
|
||||||
def test_requested_ips_only(self):
|
def test_requested_ips_only(self):
|
||||||
with self.subnet() as subnet:
|
with self.subnet() as subnet:
|
||||||
with self.port(subnet=subnet) as port:
|
with self.port(subnet=subnet) as port:
|
||||||
@ -2555,6 +2535,17 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
|
|||||||
res = subnet_req.get_response(self.api)
|
res = subnet_req.get_response(self.api)
|
||||||
self.assertEqual(res.status_int, 403)
|
self.assertEqual(res.status_int, 403)
|
||||||
|
|
||||||
|
def test_create_subnet_bad_cidr(self):
|
||||||
|
with self.network() as network:
|
||||||
|
data = {'subnet': {'network_id': network['network']['id'],
|
||||||
|
'cidr': '10.0.2.5/24',
|
||||||
|
'ip_version': 4,
|
||||||
|
'tenant_id': network['network']['tenant_id']}}
|
||||||
|
|
||||||
|
subnet_req = self.new_create_request('subnets', data)
|
||||||
|
res = subnet_req.get_response(self.api)
|
||||||
|
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||||
|
|
||||||
def test_create_subnet_bad_ip_version(self):
|
def test_create_subnet_bad_ip_version(self):
|
||||||
with self.network() as network:
|
with self.network() as network:
|
||||||
# Check bad IP version
|
# Check bad IP version
|
||||||
@ -2800,7 +2791,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
|
|||||||
|
|
||||||
def test_create_subnet_with_v6_allocation_pool(self):
|
def test_create_subnet_with_v6_allocation_pool(self):
|
||||||
gateway_ip = 'fe80::1'
|
gateway_ip = 'fe80::1'
|
||||||
cidr = 'fe80::0/80'
|
cidr = 'fe80::/80'
|
||||||
allocation_pools = [{'start': 'fe80::2',
|
allocation_pools = [{'start': 'fe80::2',
|
||||||
'end': 'fe80::ffff:fffa:ffff'}]
|
'end': 'fe80::ffff:fffa:ffff'}]
|
||||||
self._test_create_subnet(gateway_ip=gateway_ip,
|
self._test_create_subnet(gateway_ip=gateway_ip,
|
||||||
|
@ -27,7 +27,7 @@ from neutron.tests.unit import test_api_v2
|
|||||||
|
|
||||||
_uuid = test_api_v2._uuid
|
_uuid = test_api_v2._uuid
|
||||||
FAKE_PREFIX = {'IPv4': '10.0.0.0/24',
|
FAKE_PREFIX = {'IPv4': '10.0.0.0/24',
|
||||||
'IPv6': 'fe80::0/48'}
|
'IPv6': 'fe80::/48'}
|
||||||
FAKE_IP = {'IPv4': '10.0.0.1',
|
FAKE_IP = {'IPv4': '10.0.0.1',
|
||||||
'IPv6': 'fe80::1'}
|
'IPv6': 'fe80::1'}
|
||||||
|
|
||||||
|
@ -1149,7 +1149,7 @@ class L3NatDBTestCase(L3NatTestCaseBase):
|
|||||||
expected_code=exc.HTTPNotFound.code)
|
expected_code=exc.HTTPNotFound.code)
|
||||||
|
|
||||||
def _test_floatingip_with_assoc_fails(self, plugin_class):
|
def _test_floatingip_with_assoc_fails(self, plugin_class):
|
||||||
with self.subnet(cidr='200.0.0.1/24') as public_sub:
|
with self.subnet(cidr='200.0.0.0/24') as public_sub:
|
||||||
self._set_net_external(public_sub['subnet']['network_id'])
|
self._set_net_external(public_sub['subnet']['network_id'])
|
||||||
with self.port() as private_port:
|
with self.port() as private_port:
|
||||||
with self.router() as r:
|
with self.router() as r:
|
||||||
@ -1187,7 +1187,7 @@ class L3NatDBTestCase(L3NatTestCaseBase):
|
|||||||
'neutron.db.l3_db.L3_NAT_db_mixin')
|
'neutron.db.l3_db.L3_NAT_db_mixin')
|
||||||
|
|
||||||
def _test_floatingip_with_ip_generation_failure(self, plugin_class):
|
def _test_floatingip_with_ip_generation_failure(self, plugin_class):
|
||||||
with self.subnet(cidr='200.0.0.1/24') as public_sub:
|
with self.subnet(cidr='200.0.0.0/24') as public_sub:
|
||||||
self._set_net_external(public_sub['subnet']['network_id'])
|
self._set_net_external(public_sub['subnet']['network_id'])
|
||||||
with self.port() as private_port:
|
with self.port() as private_port:
|
||||||
with self.router() as r:
|
with self.router() as r:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user