Implementation of blueprint ip-validation

First draft. Added a forms.Field wrapper for IPAddress.
Implemented IPv4 and IPv6 checks, subnet mask range,
optional mask range limitation.

As far as I see now, there is only 1 place in Dashboard
to accept IP fields as input - the Security rules.
I've tried to input IPv6 rule and it was accepted.
The previous version of the code doesn't accept
IPv6, only IPv4. I am not sure if IPv6 should be
accepted here. It however works.

Patch set 3: Now using netaddr library(used also by nova),
which provides support for validation of IP addresses.
Using this library, now the IPField can support more
ways to enter an IP - like short versions:
10/8 - for all 10.xxx.xxx.xxx
192.168/16 - for all 192.168.xxx.xxx

Regarding IPy library - it performs some strict
subnet validation, which will not accept cidr like this:
192.168.1.1/20
because the only mask that matches this IP is 32.
IPy doesn't allow broader masks. But my assumption is
that the operators should take the responsibility for
the data they enter. At least this CIDR is valid after all.

Change-Id: Ie497fe65fde3af25a18109a182ab78255ad7ec60
This commit is contained in:
Tihomir Trifonov 2012-05-09 16:43:17 +03:00
parent 856983fbcd
commit cb48cebefc
5 changed files with 227 additions and 21 deletions

View File

@ -30,8 +30,8 @@ from novaclient import exceptions as novaclient_exceptions
from horizon import api
from horizon import exceptions
from horizon import forms
from horizon.utils.validators import validate_ipv4_cidr
from horizon.utils.validators import validate_port_range
from horizon.utils import fields
LOG = logging.getLogger(__name__)
@ -82,12 +82,13 @@ class AddRule(forms.SelfHandlingForm):
validators=[validate_port_range])
source_group = forms.ChoiceField(label=_('Source Group'), required=False)
cidr = forms.CharField(label=_("CIDR"),
cidr = fields.IPField(label=_("CIDR"),
required=False,
initial="0.0.0.0/0",
help_text=_("Classless Inter-Domain Routing "
"(e.g. 192.168.0.0/24)"),
validators=[validate_ipv4_cidr])
version=fields.IPv4 | fields.IPv6,
mask=True)
security_group_id = forms.IntegerField(widget=forms.HiddenInput())

View File

@ -16,7 +16,8 @@
from horizon import test
from horizon.utils import validators
from django.core.exceptions import ValidationError
from horizon.utils import fields
class ValidatorsTests(test.TestCase):
@ -27,16 +28,144 @@ class ValidatorsTests(test.TestCase):
"10.144.11.107/4",
"255.255.255.255/0",
"0.1.2.3/16",
"0.0.0.0/32")
BAD_CIDRS = ("255.255.255.256",
"256.255.255.255",
"1.2.3.4.5",
"0.0.0.0",
"0.0.0.0/32",
# short form
"128.0/16",
"128/4")
BAD_CIDRS = ("255.255.255.256\\",
"256.255.255.255$",
"1.2.3.4.5/41",
"0.0.0.0/99",
"127.0.0.1/",
"127.0.0.1/33",
"127.0.0.1/-1",
"127.0.0.1/100")
"127.0.0.1/100",
# some valid IPv6 addresses
"fe80::204:61ff:254.157.241.86/4",
"fe80::204:61ff:254.157.241.86/0",
"2001:0DB8::CD30:0:0:0:0/60",
"2001:0DB8::CD30:0/90")
ip = fields.IPField(mask=True, version=fields.IPv4)
for cidr in GOOD_CIDRS:
self.assertTrue(validators.ipv4_cidr_re.match(cidr))
self.assertIsNone(ip.validate(cidr))
for cidr in BAD_CIDRS:
self.assertFalse(validators.ipv4_cidr_re.match(cidr))
self.assertRaises(ValidationError, ip.validate, cidr)
def test_validate_ipv6_cidr(self):
GOOD_CIDRS = ("::ffff:0:0/56",
"2001:0db8::1428:57ab/17",
"FEC0::/10",
"fe80::204:61ff:254.157.241.86/4",
"fe80::204:61ff:254.157.241.86/0",
"2001:0DB8::CD30:0:0:0:0/60",
"2001:0DB8::CD30:0/90",
"::1/128")
BAD_CIDRS = ("1111:2222:3333:4444:::/",
"::2222:3333:4444:5555:6666:7777:8888:\\",
":1111:2222:3333:4444::6666:1.2.3.4/1000",
"1111:2222::4444:5555:6666::8888@",
"1111:2222::4444:5555:6666:8888/",
"::ffff:0:0/129",
"1.2.3.4:1111:2222::5555//22",
"fe80::204:61ff:254.157.241.86/200",
# some valid IPv4 addresses
"10.144.11.107/4",
"255.255.255.255/0",
"0.1.2.3/16")
ip = fields.IPField(mask=True, version=fields.IPv6)
for cidr in GOOD_CIDRS:
self.assertIsNone(ip.validate(cidr))
for cidr in BAD_CIDRS:
self.assertRaises(ValidationError, ip.validate, cidr)
def test_validate_mixed_cidr(self):
GOOD_CIDRS = ("::ffff:0:0/56",
"2001:0db8::1428:57ab/17",
"FEC0::/10",
"fe80::204:61ff:254.157.241.86/4",
"fe80::204:61ff:254.157.241.86/0",
"2001:0DB8::CD30:0:0:0:0/60",
"0.0.0.0/16",
"10.144.11.107/4",
"255.255.255.255/0",
"0.1.2.3/16",
# short form
"128.0/16",
"10/4")
BAD_CIDRS = ("1111:2222:3333:4444::://",
"::2222:3333:4444:5555:6666:7777:8888:",
":1111:2222:3333:4444::6666:1.2.3.4/1/1",
"1111:2222::4444:5555:6666::8888\\2",
"1111:2222::4444:5555:6666:8888/",
"1111:2222::4444:5555:6666::8888/130",
"127.0.0.1/",
"127.0.0.1/33",
"127.0.0.1/-1")
ip = fields.IPField(mask=True, version=fields.IPv4 | fields.IPv6)
for cidr in GOOD_CIDRS:
self.assertIsNone(ip.validate(cidr))
for cidr in BAD_CIDRS:
self.assertRaises(ValidationError, ip.validate, cidr)
def test_validate_IPs(self):
GOOD_IPS_V4 = ("0.0.0.0",
"10.144.11.107",
"169.144.11.107",
"172.100.11.107",
"255.255.255.255",
"0.1.2.3")
GOOD_IPS_V6 = ("",
"::ffff:0:0",
"2001:0db8::1428:57ab",
"FEC0::",
"fe80::204:61ff:254.157.241.86",
"fe80::204:61ff:254.157.241.86",
"2001:0DB8::CD30:0:0:0:0")
BAD_IPS_V4 = ("1111:2222:3333:4444:::",
"::2222:3333:4444:5555:6666:7777:8888:",
":1111:2222:3333:4444::6666:1.2.3.4",
"1111:2222::4444:5555:6666::8888",
"1111:2222::4444:5555:6666:8888/",
"1111:2222::4444:5555:6666::8888/130",
"127.0.0.1/",
"127.0.0.1/33",
"127.0.0.1/-1")
BAD_IPS_V6 = ("1111:2222:3333:4444:::",
"::2222:3333:4444:5555:6666:7777:8888:",
":1111:2222:3333:4444::6666:1.2.3.4",
"1111:2222::4444:5555:6666::8888",
"1111:2222::4444:5555:6666:8888/",
"1111:2222::4444:5555:6666::8888/130")
ipv4 = fields.IPField(required=True, version=fields.IPv4)
ipv6 = fields.IPField(required=False, version=fields.IPv6)
ipmixed = fields.IPField(required=False,
version=fields.IPv4 | fields.IPv6)
for ip_addr in GOOD_IPS_V4:
self.assertIsNone(ipv4.validate(ip_addr))
self.assertIsNone(ipmixed.validate(ip_addr))
for ip_addr in GOOD_IPS_V6:
self.assertIsNone(ipv6.validate(ip_addr))
self.assertIsNone(ipmixed.validate(ip_addr))
for ip_addr in BAD_IPS_V4:
self.assertRaises(ValidationError, ipv4.validate, ip_addr)
self.assertRaises(ValidationError, ipmixed.validate, ip_addr)
for ip_addr in BAD_IPS_V6:
self.assertRaises(ValidationError, ipv6.validate, ip_addr)
self.assertRaises(ValidationError, ipmixed.validate, ip_addr)
self.assertRaises(ValidationError, ipv4.validate, "") # required=True
iprange = fields.IPField(required=False,
mask=True,
mask_range_from=10,
version=fields.IPv4 | fields.IPv6)
self.assertRaises(ValidationError, iprange.validate,
"fe80::204:61ff:254.157.241.86/6")
self.assertRaises(ValidationError, iprange.validate,
"169.144.11.107/8")
self.assertIsNone(iprange.validate("fe80::204:61ff:254.157.241.86/36"))
self.assertIsNone(iprange.validate("169.144.11.107/18"))

84
horizon/utils/fields.py Normal file
View File

@ -0,0 +1,84 @@
import re
import netaddr
from django.core.exceptions import ValidationError
from django.forms import forms
from django.utils.translation import ugettext as _
ip_allowed_symbols_re = re.compile(r'^[a-fA-F0-9:/\.]+$')
IPv4 = 1
IPv6 = 2
class IPField(forms.Field):
"""
Form field for entering IP/range values, with validation.
Supports IPv4/IPv6 in the format:
.. xxx.xxx.xxx.xxx
.. xxx.xxx.xxx.xxx/zz
.. ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff
.. ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/zz
and all compressed forms. Also the short forms
are supported:
xxx/yy
xxx.xxx/yy
.. attribute:: version
Specifies which IP version to validate,
valid values are 1 (fields.IPv4), 2 (fields.IPv6) or
both - 3 (fields.IPv4 | fields.IPv6).
Defaults to IPv4 (1)
.. attribute:: mask
Boolean flag to validate subnet masks along with IP address.
E.g: 10.0.0.1/32
.. attribute:: mask_range_from
Subnet range limitation, e.g. 16
That means the input mask will be checked to be in the range
16:max_value. Useful to limit the subnet ranges
to A/B/C-class networks.
"""
invalid_format_message = _("Incorrect format for IP address")
invalid_version_message = _("Invalid version for IP address")
invalid_mask_message = _("Invalid subnet mask")
max_v4_mask = 32
max_v6_mask = 128
def __init__(self, *args, **kwargs):
self.mask = kwargs.pop("mask", None)
self.min_mask = kwargs.pop("mask_range_from", 0)
self.version = kwargs.pop('version', IPv4)
super(IPField, self).__init__(*args, **kwargs)
def validate(self, value):
super(IPField, self).validate(value)
if not value and not self.required:
return
try:
if self.mask:
self.ip = netaddr.IPNetwork(value)
else:
self.ip = netaddr.IPAddress(value)
except:
raise ValidationError(self.invalid_format_message)
if not any([self.version & IPv4 > 0 and self.ip.version == 4,
self.version & IPv6 > 0 and self.ip.version == 6]):
raise ValidationError(self.invalid_version_message)
if self.mask:
if self.ip.version == 4 and \
not self.min_mask <= self.ip.prefixlen <= self.max_v4_mask:
raise ValidationError(self.invalid_mask_message)
if self.ip.version == 6 and \
not self.min_mask <= self.ip.prefixlen <= self.max_v6_mask:
raise ValidationError(self.invalid_mask_message)
def clean(self, value):
super(IPField, self).clean(value)
return str(getattr(self, "ip", ""))

View File

@ -14,19 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import re
from django.conf import settings
from django.core import validators
from django.core.exceptions import ValidationError
from django.utils.translation import ugettext as _
ipv4_cidr_re = re.compile(r'^(25[0-5]|2[0-4]\d|[0-1]?\d?\d)' # 0-255
'(\.(25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}' # 3x .0-255
'/(3[0-2]|[1-2]?\d)$') # /0-32
validate_ipv4_cidr = validators.RegexValidator(ipv4_cidr_re)
horizon_config = getattr(settings, "HORIZON_CONFIG", {})
password_config = horizon_config.get("password_validator", {})

View File

@ -8,6 +8,7 @@ pep8
pylint
distribute>=0.6.24
selenium
netaddr
# Docs Requirements
sphinx