From cb48cebefc277912c870eaee76fe0466308abed4 Mon Sep 17 00:00:00 2001 From: Tihomir Trifonov Date: Wed, 9 May 2012 16:43:17 +0300 Subject: [PATCH] Implementation of blueprint ip-validation First draft. Added a forms.Field wrapper for IPAddress. Implemented IPv4 and IPv6 checks, subnet mask range, optional mask range limitation. As far as I see now, there is only 1 place in Dashboard to accept IP fields as input - the Security rules. I've tried to input IPv6 rule and it was accepted. The previous version of the code doesn't accept IPv6, only IPv4. I am not sure if IPv6 should be accepted here. It however works. Patch set 3: Now using netaddr library(used also by nova), which provides support for validation of IP addresses. Using this library, now the IPField can support more ways to enter an IP - like short versions: 10/8 - for all 10.xxx.xxx.xxx 192.168/16 - for all 192.168.xxx.xxx Regarding IPy library - it performs some strict subnet validation, which will not accept cidr like this: 192.168.1.1/20 because the only mask that matches this IP is 32. IPy doesn't allow broader masks. But my assumption is that the operators should take the responsibility for the data they enter. At least this CIDR is valid after all. Change-Id: Ie497fe65fde3af25a18109a182ab78255ad7ec60 --- .../security_groups/forms.py | 7 +- horizon/tests/utils_tests.py | 147 ++++++++++++++++-- horizon/utils/fields.py | 84 ++++++++++ horizon/utils/validators.py | 9 -- tools/test-requires | 1 + 5 files changed, 227 insertions(+), 21 deletions(-) create mode 100644 horizon/utils/fields.py diff --git a/horizon/dashboards/nova/access_and_security/security_groups/forms.py b/horizon/dashboards/nova/access_and_security/security_groups/forms.py index f516cf315..f991fc9b6 100644 --- a/horizon/dashboards/nova/access_and_security/security_groups/forms.py +++ b/horizon/dashboards/nova/access_and_security/security_groups/forms.py @@ -30,8 +30,8 @@ from novaclient import exceptions as novaclient_exceptions from horizon import api from horizon import exceptions from horizon import forms -from horizon.utils.validators import validate_ipv4_cidr from horizon.utils.validators import validate_port_range +from horizon.utils import fields LOG = logging.getLogger(__name__) @@ -82,12 +82,13 @@ class AddRule(forms.SelfHandlingForm): validators=[validate_port_range]) source_group = forms.ChoiceField(label=_('Source Group'), required=False) - cidr = forms.CharField(label=_("CIDR"), + cidr = fields.IPField(label=_("CIDR"), required=False, initial="0.0.0.0/0", help_text=_("Classless Inter-Domain Routing " "(e.g. 192.168.0.0/24)"), - validators=[validate_ipv4_cidr]) + version=fields.IPv4 | fields.IPv6, + mask=True) security_group_id = forms.IntegerField(widget=forms.HiddenInput()) diff --git a/horizon/tests/utils_tests.py b/horizon/tests/utils_tests.py index aa09aa256..c7588dbc0 100644 --- a/horizon/tests/utils_tests.py +++ b/horizon/tests/utils_tests.py @@ -16,7 +16,8 @@ from horizon import test -from horizon.utils import validators +from django.core.exceptions import ValidationError +from horizon.utils import fields class ValidatorsTests(test.TestCase): @@ -27,16 +28,144 @@ class ValidatorsTests(test.TestCase): "10.144.11.107/4", "255.255.255.255/0", "0.1.2.3/16", - "0.0.0.0/32") - BAD_CIDRS = ("255.255.255.256", - "256.255.255.255", - "1.2.3.4.5", - "0.0.0.0", + "0.0.0.0/32", + # short form + "128.0/16", + "128/4") + BAD_CIDRS = ("255.255.255.256\\", + "256.255.255.255$", + "1.2.3.4.5/41", + "0.0.0.0/99", "127.0.0.1/", "127.0.0.1/33", "127.0.0.1/-1", - "127.0.0.1/100") + "127.0.0.1/100", + # some valid IPv6 addresses + "fe80::204:61ff:254.157.241.86/4", + "fe80::204:61ff:254.157.241.86/0", + "2001:0DB8::CD30:0:0:0:0/60", + "2001:0DB8::CD30:0/90") + ip = fields.IPField(mask=True, version=fields.IPv4) for cidr in GOOD_CIDRS: - self.assertTrue(validators.ipv4_cidr_re.match(cidr)) + self.assertIsNone(ip.validate(cidr)) for cidr in BAD_CIDRS: - self.assertFalse(validators.ipv4_cidr_re.match(cidr)) + self.assertRaises(ValidationError, ip.validate, cidr) + + def test_validate_ipv6_cidr(self): + GOOD_CIDRS = ("::ffff:0:0/56", + "2001:0db8::1428:57ab/17", + "FEC0::/10", + "fe80::204:61ff:254.157.241.86/4", + "fe80::204:61ff:254.157.241.86/0", + "2001:0DB8::CD30:0:0:0:0/60", + "2001:0DB8::CD30:0/90", + "::1/128") + BAD_CIDRS = ("1111:2222:3333:4444:::/", + "::2222:3333:4444:5555:6666:7777:8888:\\", + ":1111:2222:3333:4444::6666:1.2.3.4/1000", + "1111:2222::4444:5555:6666::8888@", + "1111:2222::4444:5555:6666:8888/", + "::ffff:0:0/129", + "1.2.3.4:1111:2222::5555//22", + "fe80::204:61ff:254.157.241.86/200", + # some valid IPv4 addresses + "10.144.11.107/4", + "255.255.255.255/0", + "0.1.2.3/16") + ip = fields.IPField(mask=True, version=fields.IPv6) + for cidr in GOOD_CIDRS: + self.assertIsNone(ip.validate(cidr)) + for cidr in BAD_CIDRS: + self.assertRaises(ValidationError, ip.validate, cidr) + + def test_validate_mixed_cidr(self): + GOOD_CIDRS = ("::ffff:0:0/56", + "2001:0db8::1428:57ab/17", + "FEC0::/10", + "fe80::204:61ff:254.157.241.86/4", + "fe80::204:61ff:254.157.241.86/0", + "2001:0DB8::CD30:0:0:0:0/60", + "0.0.0.0/16", + "10.144.11.107/4", + "255.255.255.255/0", + "0.1.2.3/16", + # short form + "128.0/16", + "10/4") + BAD_CIDRS = ("1111:2222:3333:4444::://", + "::2222:3333:4444:5555:6666:7777:8888:", + ":1111:2222:3333:4444::6666:1.2.3.4/1/1", + "1111:2222::4444:5555:6666::8888\\2", + "1111:2222::4444:5555:6666:8888/", + "1111:2222::4444:5555:6666::8888/130", + "127.0.0.1/", + "127.0.0.1/33", + "127.0.0.1/-1") + ip = fields.IPField(mask=True, version=fields.IPv4 | fields.IPv6) + for cidr in GOOD_CIDRS: + self.assertIsNone(ip.validate(cidr)) + for cidr in BAD_CIDRS: + self.assertRaises(ValidationError, ip.validate, cidr) + + def test_validate_IPs(self): + GOOD_IPS_V4 = ("0.0.0.0", + "10.144.11.107", + "169.144.11.107", + "172.100.11.107", + "255.255.255.255", + "0.1.2.3") + GOOD_IPS_V6 = ("", + "::ffff:0:0", + "2001:0db8::1428:57ab", + "FEC0::", + "fe80::204:61ff:254.157.241.86", + "fe80::204:61ff:254.157.241.86", + "2001:0DB8::CD30:0:0:0:0") + BAD_IPS_V4 = ("1111:2222:3333:4444:::", + "::2222:3333:4444:5555:6666:7777:8888:", + ":1111:2222:3333:4444::6666:1.2.3.4", + "1111:2222::4444:5555:6666::8888", + "1111:2222::4444:5555:6666:8888/", + "1111:2222::4444:5555:6666::8888/130", + "127.0.0.1/", + "127.0.0.1/33", + "127.0.0.1/-1") + BAD_IPS_V6 = ("1111:2222:3333:4444:::", + "::2222:3333:4444:5555:6666:7777:8888:", + ":1111:2222:3333:4444::6666:1.2.3.4", + "1111:2222::4444:5555:6666::8888", + "1111:2222::4444:5555:6666:8888/", + "1111:2222::4444:5555:6666::8888/130") + ipv4 = fields.IPField(required=True, version=fields.IPv4) + ipv6 = fields.IPField(required=False, version=fields.IPv6) + ipmixed = fields.IPField(required=False, + version=fields.IPv4 | fields.IPv6) + + for ip_addr in GOOD_IPS_V4: + self.assertIsNone(ipv4.validate(ip_addr)) + self.assertIsNone(ipmixed.validate(ip_addr)) + + for ip_addr in GOOD_IPS_V6: + self.assertIsNone(ipv6.validate(ip_addr)) + self.assertIsNone(ipmixed.validate(ip_addr)) + + for ip_addr in BAD_IPS_V4: + self.assertRaises(ValidationError, ipv4.validate, ip_addr) + self.assertRaises(ValidationError, ipmixed.validate, ip_addr) + + for ip_addr in BAD_IPS_V6: + self.assertRaises(ValidationError, ipv6.validate, ip_addr) + self.assertRaises(ValidationError, ipmixed.validate, ip_addr) + + self.assertRaises(ValidationError, ipv4.validate, "") # required=True + + iprange = fields.IPField(required=False, + mask=True, + mask_range_from=10, + version=fields.IPv4 | fields.IPv6) + self.assertRaises(ValidationError, iprange.validate, + "fe80::204:61ff:254.157.241.86/6") + self.assertRaises(ValidationError, iprange.validate, + "169.144.11.107/8") + self.assertIsNone(iprange.validate("fe80::204:61ff:254.157.241.86/36")) + self.assertIsNone(iprange.validate("169.144.11.107/18")) diff --git a/horizon/utils/fields.py b/horizon/utils/fields.py new file mode 100644 index 000000000..7230fb6f8 --- /dev/null +++ b/horizon/utils/fields.py @@ -0,0 +1,84 @@ +import re +import netaddr +from django.core.exceptions import ValidationError +from django.forms import forms +from django.utils.translation import ugettext as _ + +ip_allowed_symbols_re = re.compile(r'^[a-fA-F0-9:/\.]+$') +IPv4 = 1 +IPv6 = 2 + + +class IPField(forms.Field): + """ + Form field for entering IP/range values, with validation. + Supports IPv4/IPv6 in the format: + .. xxx.xxx.xxx.xxx + .. xxx.xxx.xxx.xxx/zz + .. ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff + .. ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/zz + and all compressed forms. Also the short forms + are supported: + xxx/yy + xxx.xxx/yy + + .. attribute:: version + + Specifies which IP version to validate, + valid values are 1 (fields.IPv4), 2 (fields.IPv6) or + both - 3 (fields.IPv4 | fields.IPv6). + Defaults to IPv4 (1) + + .. attribute:: mask + + Boolean flag to validate subnet masks along with IP address. + E.g: 10.0.0.1/32 + + .. attribute:: mask_range_from + Subnet range limitation, e.g. 16 + That means the input mask will be checked to be in the range + 16:max_value. Useful to limit the subnet ranges + to A/B/C-class networks. + """ + invalid_format_message = _("Incorrect format for IP address") + invalid_version_message = _("Invalid version for IP address") + invalid_mask_message = _("Invalid subnet mask") + max_v4_mask = 32 + max_v6_mask = 128 + + def __init__(self, *args, **kwargs): + self.mask = kwargs.pop("mask", None) + self.min_mask = kwargs.pop("mask_range_from", 0) + self.version = kwargs.pop('version', IPv4) + + super(IPField, self).__init__(*args, **kwargs) + + def validate(self, value): + super(IPField, self).validate(value) + if not value and not self.required: + return + + try: + if self.mask: + self.ip = netaddr.IPNetwork(value) + else: + self.ip = netaddr.IPAddress(value) + except: + raise ValidationError(self.invalid_format_message) + + if not any([self.version & IPv4 > 0 and self.ip.version == 4, + self.version & IPv6 > 0 and self.ip.version == 6]): + raise ValidationError(self.invalid_version_message) + + if self.mask: + if self.ip.version == 4 and \ + not self.min_mask <= self.ip.prefixlen <= self.max_v4_mask: + raise ValidationError(self.invalid_mask_message) + + if self.ip.version == 6 and \ + not self.min_mask <= self.ip.prefixlen <= self.max_v6_mask: + raise ValidationError(self.invalid_mask_message) + + def clean(self, value): + super(IPField, self).clean(value) + return str(getattr(self, "ip", "")) diff --git a/horizon/utils/validators.py b/horizon/utils/validators.py index 1e63824a2..8ea1350ce 100644 --- a/horizon/utils/validators.py +++ b/horizon/utils/validators.py @@ -14,19 +14,10 @@ # License for the specific language governing permissions and limitations # under the License. -import re - from django.conf import settings -from django.core import validators from django.core.exceptions import ValidationError from django.utils.translation import ugettext as _ -ipv4_cidr_re = re.compile(r'^(25[0-5]|2[0-4]\d|[0-1]?\d?\d)' # 0-255 - '(\.(25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}' # 3x .0-255 - '/(3[0-2]|[1-2]?\d)$') # /0-32 - - -validate_ipv4_cidr = validators.RegexValidator(ipv4_cidr_re) horizon_config = getattr(settings, "HORIZON_CONFIG", {}) password_config = horizon_config.get("password_validator", {}) diff --git a/tools/test-requires b/tools/test-requires index 116ff8d6e..c33108bf9 100644 --- a/tools/test-requires +++ b/tools/test-requires @@ -8,6 +8,7 @@ pep8 pylint distribute>=0.6.24 selenium +netaddr # Docs Requirements sphinx