Validate CIDR given as ip-prefix in security-group-rule-create
There was no validation for the provided ip prefix. This just adds a simple parse using netaddr and explodes with appropriate message. Also makes sure ip prefix _is_ cidr (192.168.1.1-->192.168.1.1/32). Validation occurs at the attribute level (API model) as well as at the db level, where the ethertype is validated against the ip_prefix address type. Unit test cases added - bad prefix, unmasked prefix and incorrect ethertype. Also adds attribute test cases for the added convert_ip_prefix_to_cidr method Change-Id: I71fb8c887963a122a5bd8cfdda800026c1cd3954 Closes-Bug: 1255338
This commit is contained in:
parent
7cfe65f099
commit
c147d88261
@ -319,3 +319,7 @@ class DuplicatedExtension(NeutronException):
|
||||
class DeviceIDNotOwnedByTenant(Conflict):
|
||||
message = _("The following device_id %(device_id)s is not owned by your "
|
||||
"tenant or matches another tenants router.")
|
||||
|
||||
|
||||
class InvalidCIDR(BadRequest):
|
||||
message = _("Invalid CIDR %(input)s given as IP prefix")
|
||||
|
@ -12,6 +12,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import netaddr
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy import orm
|
||||
from sqlalchemy.orm import exc
|
||||
@ -331,6 +332,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
|
||||
new_rules.add(rule['security_group_id'])
|
||||
|
||||
self._validate_port_range(rule)
|
||||
self._validate_ip_prefix(rule)
|
||||
|
||||
if rule['remote_ip_prefix'] and rule['remote_group_id']:
|
||||
raise ext_sg.SecurityGroupRemoteGroupAndRemoteIpPrefix()
|
||||
@ -411,6 +413,24 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
|
||||
if (i['security_group_rule'] == db_rule):
|
||||
raise ext_sg.SecurityGroupRuleExists(id=id)
|
||||
|
||||
def _validate_ip_prefix(self, rule):
|
||||
"""Check that a valid cidr was specified as remote_ip_prefix
|
||||
|
||||
No need to check that it is in fact an IP address as this is already
|
||||
validated by attribute validators.
|
||||
Check that rule ethertype is consistent with remote_ip_prefix ip type.
|
||||
Add mask to ip_prefix if absent (192.168.1.10 -> 192.168.1.10/32).
|
||||
"""
|
||||
input_prefix = rule['remote_ip_prefix']
|
||||
if input_prefix:
|
||||
addr = netaddr.IPNetwork(input_prefix)
|
||||
# set input_prefix to always include the netmask:
|
||||
rule['remote_ip_prefix'] = str(addr)
|
||||
# check consistency of ethertype with addr version
|
||||
if rule['ethertype'] != "IPv%d" % (addr.version):
|
||||
raise ext_sg.SecurityGroupRuleParameterConflict(
|
||||
ethertype=rule['ethertype'], cidr=input_prefix)
|
||||
|
||||
def get_security_group_rules(self, context, filters=None, fields=None,
|
||||
sorts=None, limit=None, marker=None,
|
||||
page_reverse=False):
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
from abc import ABCMeta
|
||||
from abc import abstractmethod
|
||||
import netaddr
|
||||
|
||||
from oslo.config import cfg
|
||||
import six
|
||||
@ -106,6 +107,10 @@ class SecurityGroupRuleExists(qexception.InUse):
|
||||
message = _("Security group rule already exists. Group id is %(id)s.")
|
||||
|
||||
|
||||
class SecurityGroupRuleParameterConflict(qexception.InvalidInput):
|
||||
message = _("Conflicting value ethertype %(ethertype)s for CIDR %(cidr)s")
|
||||
|
||||
|
||||
def convert_protocol(value):
|
||||
if value is None:
|
||||
return
|
||||
@ -156,6 +161,16 @@ def convert_to_uuid_list_or_none(value_list):
|
||||
return value_list
|
||||
|
||||
|
||||
def convert_ip_prefix_to_cidr(ip_prefix):
|
||||
if not ip_prefix:
|
||||
return
|
||||
try:
|
||||
cidr = netaddr.IPNetwork(ip_prefix)
|
||||
return str(cidr)
|
||||
except (TypeError, netaddr.AddrFormatError):
|
||||
raise qexception.InvalidCIDR(input=ip_prefix)
|
||||
|
||||
|
||||
def _validate_name_not_default(data, valid_values=None):
|
||||
if data == "default":
|
||||
raise SecurityGroupDefaultAlreadyExists()
|
||||
@ -211,7 +226,8 @@ RESOURCE_ATTRIBUTE_MAP = {
|
||||
'convert_to': convert_ethertype_to_case_insensitive,
|
||||
'validate': {'type:values': sg_supported_ethertypes}},
|
||||
'remote_ip_prefix': {'allow_post': True, 'allow_put': False,
|
||||
'default': None, 'is_visible': True},
|
||||
'default': None, 'is_visible': True,
|
||||
'convert_to': convert_ip_prefix_to_cidr},
|
||||
'tenant_id': {'allow_post': True, 'allow_put': False,
|
||||
'required_by_policy': True,
|
||||
'is_visible': True},
|
||||
|
@ -21,10 +21,12 @@ import webob.exc
|
||||
|
||||
from neutron.api.v2 import attributes as attr
|
||||
from neutron.common import constants as const
|
||||
from neutron.common import exceptions as n_exc
|
||||
from neutron import context
|
||||
from neutron.db import db_base_plugin_v2
|
||||
from neutron.db import securitygroups_db
|
||||
from neutron.extensions import securitygroup as ext_sg
|
||||
from neutron.tests import base
|
||||
from neutron.tests.unit import test_db_plugin
|
||||
|
||||
DB_PLUGIN_KLASS = ('neutron.tests.unit.test_extension_security_group.'
|
||||
@ -404,6 +406,70 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||
|
||||
def test_create_security_group_rule_invalid_ip_prefix(self):
|
||||
name = 'webservers'
|
||||
description = 'my webservers'
|
||||
for bad_prefix in ['bad_ip', 256, "2001:db8:a::123/129", '172.30./24']:
|
||||
with self.security_group(name, description) as sg:
|
||||
sg_id = sg['security_group']['id']
|
||||
remote_ip_prefix = bad_prefix
|
||||
rule = self._build_security_group_rule(
|
||||
sg_id,
|
||||
'ingress',
|
||||
const.PROTO_NAME_TCP,
|
||||
'22', '22',
|
||||
remote_ip_prefix)
|
||||
res = self._create_security_group_rule(self.fmt, rule)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||
|
||||
def test_create_security_group_rule_invalid_ethertype_for_prefix(self):
|
||||
name = 'webservers'
|
||||
description = 'my webservers'
|
||||
test_addr = {'192.168.1.1/24': 'ipv4', '192.168.1.1/24': 'IPv6',
|
||||
'2001:db8:1234::/48': 'ipv6',
|
||||
'2001:db8:1234::/48': 'IPv4'}
|
||||
for prefix, ether in test_addr.iteritems():
|
||||
with self.security_group(name, description) as sg:
|
||||
sg_id = sg['security_group']['id']
|
||||
ethertype = ether
|
||||
remote_ip_prefix = prefix
|
||||
rule = self._build_security_group_rule(
|
||||
sg_id,
|
||||
'ingress',
|
||||
const.PROTO_NAME_TCP,
|
||||
'22', '22',
|
||||
remote_ip_prefix,
|
||||
None,
|
||||
None,
|
||||
ethertype)
|
||||
res = self._create_security_group_rule(self.fmt, rule)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||
|
||||
def test_create_security_group_rule_with_unmasked_prefix(self):
|
||||
name = 'webservers'
|
||||
description = 'my webservers'
|
||||
addr = {'10.1.2.3': {'mask': '32', 'ethertype': 'IPv4'},
|
||||
'fe80::2677:3ff:fe7d:4c': {'mask': '128', 'ethertype': 'IPv6'}}
|
||||
for ip in addr:
|
||||
with self.security_group(name, description) as sg:
|
||||
sg_id = sg['security_group']['id']
|
||||
ethertype = addr[ip]['ethertype']
|
||||
remote_ip_prefix = ip
|
||||
rule = self._build_security_group_rule(
|
||||
sg_id,
|
||||
'ingress',
|
||||
const.PROTO_NAME_TCP,
|
||||
'22', '22',
|
||||
remote_ip_prefix,
|
||||
None,
|
||||
None,
|
||||
ethertype)
|
||||
res = self._create_security_group_rule(self.fmt, rule)
|
||||
self.assertEqual(res.status_int, 201)
|
||||
res_sg = self.deserialize(self.fmt, res)
|
||||
prefix = res_sg['security_group_rule']['remote_ip_prefix']
|
||||
self.assertEqual(prefix, '%s/%s' % (ip, addr[ip]['mask']))
|
||||
|
||||
def test_create_security_group_rule_tcp_protocol_as_number(self):
|
||||
name = 'webservers'
|
||||
description = 'my webservers'
|
||||
@ -1348,5 +1414,25 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||
|
||||
|
||||
class TestConvertIPPrefixToCIDR(base.BaseTestCase):
|
||||
|
||||
def test_convert_bad_ip_prefix_to_cidr(self):
|
||||
for val in ['bad_ip', 256, "2001:db8:a::123/129"]:
|
||||
self.assertRaises(n_exc.InvalidCIDR,
|
||||
ext_sg.convert_ip_prefix_to_cidr, val)
|
||||
self.assertIsNone(ext_sg.convert_ip_prefix_to_cidr(None))
|
||||
|
||||
def test_convert_ip_prefix_no_netmask_to_cidr(self):
|
||||
addr = {'10.1.2.3': '32', 'fe80::2677:3ff:fe7d:4c': '128'}
|
||||
for k, v in addr.iteritems():
|
||||
self.assertEqual(ext_sg.convert_ip_prefix_to_cidr(k),
|
||||
'%s/%s' % (k, v))
|
||||
|
||||
def test_convert_ip_prefix_with_netmask_to_cidr(self):
|
||||
addresses = ['10.1.0.0/16', '10.1.2.3/32', '2001:db8:1234::/48']
|
||||
for addr in addresses:
|
||||
self.assertEqual(ext_sg.convert_ip_prefix_to_cidr(addr), addr)
|
||||
|
||||
|
||||
class TestSecurityGroupsXML(TestSecurityGroups):
|
||||
fmt = 'xml'
|
||||
|
@ -42,7 +42,7 @@ from neutron.tests.unit import test_extension_security_group as test_sg
|
||||
|
||||
|
||||
FAKE_PREFIX = {const.IPv4: '10.0.0.0/24',
|
||||
const.IPv6: '2001:0db8::/64'}
|
||||
const.IPv6: '2001:db8::/64'}
|
||||
FAKE_IP = {const.IPv4: '10.0.0.1',
|
||||
const.IPv6: 'fe80::1',
|
||||
'IPv6_GLOBAL': '2001:0db8::1',
|
||||
|
Loading…
Reference in New Issue
Block a user