From 4d66005043550e8dceb5efeb97b303804c8b3f7c Mon Sep 17 00:00:00 2001 From: Yong Sheng Gong Date: Sun, 7 Jul 2013 21:20:36 +0800 Subject: [PATCH] validate and recommend the cidr Bug #1195974 It is hard to validate the CIDR typed in by user, the simple way is to recognize only one and recommend it if user's input is not the one. Change-Id: Ic8defe30a43a5ae69c3f737094f866b36bb68f59 --- neutron/api/v2/attributes.py | 17 ++++++---- neutron/tests/unit/test_attributes.py | 25 +++++++++++--- neutron/tests/unit/test_db_plugin.py | 35 ++++++++------------ neutron/tests/unit/test_iptables_firewall.py | 2 +- neutron/tests/unit/test_l3_plugin.py | 4 +-- 5 files changed, 47 insertions(+), 36 deletions(-) diff --git a/neutron/api/v2/attributes.py b/neutron/api/v2/attributes.py index 3b988699a5..68f2fa5b77 100644 --- a/neutron/api/v2/attributes.py +++ b/neutron/api/v2/attributes.py @@ -244,15 +244,20 @@ def _validate_ip_address_or_none(data, valid_values=None): def _validate_subnet(data, valid_values=None): + msg = None try: - netaddr.IPNetwork(_validate_no_whitespace(data)) - if len(data.split('/')) == 2: + net = netaddr.IPNetwork(_validate_no_whitespace(data)) + cidr = str(net.cidr) + if (cidr != data): + msg = _("'%(data)s' isn't a recognized IP subnet cidr," + " '%(cidr)s' is recommended") % {"data": data, + "cidr": cidr} + else: return except Exception: - pass - - msg = _("'%s' is not a valid IP subnet") % data - LOG.debug(msg) + msg = _("'%s' is not a valid IP subnet") % data + if msg: + LOG.debug(msg) return msg diff --git a/neutron/tests/unit/test_attributes.py b/neutron/tests/unit/test_attributes.py index d02fa61d39..49b57e8198 100644 --- a/neutron/tests/unit/test_attributes.py +++ b/neutron/tests/unit/test_attributes.py @@ -266,7 +266,7 @@ class TestAttributes(base.BaseTestCase): [{'nexthop': '10.0.2.20', 'destination': '100.0.0.0/8'}, {'nexthop': '10.0.2.20', - 'destination': '100.0.0.1/8'}]] + 'destination': '101.0.0.0/8'}]] for host_routes in hostroute_pools: msg = attributes._validate_hostroutes(host_routes, None) self.assertIsNone(msg) @@ -371,7 +371,7 @@ class TestAttributes(base.BaseTestCase): self.assertIsNone(msg) # Valid - IPv6 with final octets - cidr = "fe80::0/24" + cidr = "fe80::/24" msg = attributes._validate_subnet(cidr, None) self.assertIsNone(msg) @@ -380,21 +380,36 @@ class TestAttributes(base.BaseTestCase): cidr = "10.0.2.0" msg = attributes._validate_subnet(cidr, None) - error = "'%s' is not a valid IP subnet" % cidr + error = _("'%(data)s' isn't a recognized IP subnet cidr," + " '%(cidr)s' is recommended") % {"data": cidr, + "cidr": "10.0.2.0/32"} + self.assertEqual(msg, error) + + # Invalid - IPv4 with final octets + cidr = "192.168.1.1/24" + msg = attributes._validate_subnet(cidr, + None) + error = _("'%(data)s' isn't a recognized IP subnet cidr," + " '%(cidr)s' is recommended") % {"data": cidr, + "cidr": "192.168.1.0/24"} self.assertEqual(msg, error) # Invalid - IPv6 without final octets, missing mask cidr = "fe80::" msg = attributes._validate_subnet(cidr, None) - error = "'%s' is not a valid IP subnet" % cidr + error = _("'%(data)s' isn't a recognized IP subnet cidr," + " '%(cidr)s' is recommended") % {"data": cidr, + "cidr": "fe80::/128"} self.assertEqual(msg, error) # Invalid - IPv6 with final octets, missing mask cidr = "fe80::0" msg = attributes._validate_subnet(cidr, None) - error = "'%s' is not a valid IP subnet" % cidr + error = _("'%(data)s' isn't a recognized IP subnet cidr," + " '%(cidr)s' is recommended") % {"data": cidr, + "cidr": "fe80::/128"} self.assertEqual(msg, error) # Invalid - Address format error diff --git a/neutron/tests/unit/test_db_plugin.py b/neutron/tests/unit/test_db_plugin.py index e8ad031ce3..4df9200d71 100644 --- a/neutron/tests/unit/test_db_plugin.py +++ b/neutron/tests/unit/test_db_plugin.py @@ -1347,7 +1347,7 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s res = self._create_subnet(self.fmt, tenant_id=tenant_id, net_id=net_id, - cidr='2607:f0d0:1002:51::0/124', + cidr='2607:f0d0:1002:51::/124', ip_version=6, gateway_ip=ATTR_NOT_SPECIFIED) subnet2 = self.deserialize(self.fmt, res) @@ -1543,26 +1543,6 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s res = self._create_port(self.fmt, net_id=net_id, **kwargs) self.assertEqual(res.status_int, 400) - def test_fixed_ip_valid_ip_non_root_cidr(self): - with self.subnet(cidr='10.0.0.254/24') as subnet: - # Allocate specific IP - kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'], - 'ip_address': '10.0.0.2'}]} - net_id = subnet['subnet']['network_id'] - res = self._create_port(self.fmt, net_id=net_id, **kwargs) - self.assertEqual(res.status_int, 201) - port = self.deserialize(self.fmt, res) - self._delete('ports', port['port']['id']) - - def test_fixed_ip_invalid_ip_non_root_cidr(self): - with self.subnet(cidr='10.0.0.254/24') as subnet: - # Allocate specific IP - kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'], - 'ip_address': '10.0.1.2'}]} - net_id = subnet['subnet']['network_id'] - res = self._create_port(self.fmt, net_id=net_id, **kwargs) - self.assertEqual(res.status_int, 400) - def test_requested_ips_only(self): with self.subnet() as subnet: with self.port(subnet=subnet) as port: @@ -2555,6 +2535,17 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase): res = subnet_req.get_response(self.api) self.assertEqual(res.status_int, 403) + def test_create_subnet_bad_cidr(self): + with self.network() as network: + data = {'subnet': {'network_id': network['network']['id'], + 'cidr': '10.0.2.5/24', + 'ip_version': 4, + 'tenant_id': network['network']['tenant_id']}} + + subnet_req = self.new_create_request('subnets', data) + res = subnet_req.get_response(self.api) + self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) + def test_create_subnet_bad_ip_version(self): with self.network() as network: # Check bad IP version @@ -2800,7 +2791,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase): def test_create_subnet_with_v6_allocation_pool(self): gateway_ip = 'fe80::1' - cidr = 'fe80::0/80' + cidr = 'fe80::/80' allocation_pools = [{'start': 'fe80::2', 'end': 'fe80::ffff:fffa:ffff'}] self._test_create_subnet(gateway_ip=gateway_ip, diff --git a/neutron/tests/unit/test_iptables_firewall.py b/neutron/tests/unit/test_iptables_firewall.py index 508af20691..12132a27e3 100644 --- a/neutron/tests/unit/test_iptables_firewall.py +++ b/neutron/tests/unit/test_iptables_firewall.py @@ -27,7 +27,7 @@ from neutron.tests.unit import test_api_v2 _uuid = test_api_v2._uuid FAKE_PREFIX = {'IPv4': '10.0.0.0/24', - 'IPv6': 'fe80::0/48'} + 'IPv6': 'fe80::/48'} FAKE_IP = {'IPv4': '10.0.0.1', 'IPv6': 'fe80::1'} diff --git a/neutron/tests/unit/test_l3_plugin.py b/neutron/tests/unit/test_l3_plugin.py index dbd48dc39f..105b9f2f49 100644 --- a/neutron/tests/unit/test_l3_plugin.py +++ b/neutron/tests/unit/test_l3_plugin.py @@ -1149,7 +1149,7 @@ class L3NatDBTestCase(L3NatTestCaseBase): expected_code=exc.HTTPNotFound.code) def _test_floatingip_with_assoc_fails(self, plugin_class): - with self.subnet(cidr='200.0.0.1/24') as public_sub: + with self.subnet(cidr='200.0.0.0/24') as public_sub: self._set_net_external(public_sub['subnet']['network_id']) with self.port() as private_port: with self.router() as r: @@ -1187,7 +1187,7 @@ class L3NatDBTestCase(L3NatTestCaseBase): 'neutron.db.l3_db.L3_NAT_db_mixin') def _test_floatingip_with_ip_generation_failure(self, plugin_class): - with self.subnet(cidr='200.0.0.1/24') as public_sub: + with self.subnet(cidr='200.0.0.0/24') as public_sub: self._set_net_external(public_sub['subnet']['network_id']) with self.port() as private_port: with self.router() as r: