Ensure that HTTP 400 codes are returned for invalid input

Fixes bug 1062046

A number of additional validation functions were added. They
do basic validations.

Change-Id: I0fc207e996f11b50fdaa4d80075ae5704cba7003
This commit is contained in:
Gary Kotton 2012-10-08 05:58:00 +00:00
parent 20a63af370
commit 6a0db7180b
6 changed files with 464 additions and 41 deletions

View File

@ -36,7 +36,7 @@ def _validate_boolean(data, valid_values=None):
if data in [True, False]: if data in [True, False]:
return return
else: else:
msg = _("%s is not boolean") % data msg = _("'%s' is not boolean") % data
LOG.debug("validate_boolean: %s", msg) LOG.debug("validate_boolean: %s", msg)
return msg return msg
@ -51,6 +51,19 @@ def _validate_values(data, valid_values=None):
return msg return msg
def _validate_string(data, max_len=None):
if not isinstance(data, basestring):
msg = _("'%s' is not a valid string") % data
LOG.debug("validate_string: %s", msg)
return msg
if max_len is not None:
if len(data) > max_len:
msg = _("'%(data)s' exceeds maximum length of "
"%(max_len)s.") % locals()
LOG.debug("validate_string: %s", msg)
return msg
def _validate_range(data, valid_values=None): def _validate_range(data, valid_values=None):
min_value = valid_values[0] min_value = valid_values[0]
max_value = valid_values[1] max_value = valid_values[1]
@ -69,7 +82,7 @@ def _validate_mac_address(data, valid_values=None):
netaddr.EUI(data) netaddr.EUI(data)
return return
except Exception: except Exception:
msg = _("%s is not a valid MAC address") % data msg = _("'%s' is not a valid MAC address") % data
LOG.debug("validate_mac_address: %s", msg) LOG.debug("validate_mac_address: %s", msg)
return msg return msg
@ -79,11 +92,131 @@ def _validate_ip_address(data, valid_values=None):
netaddr.IPAddress(data) netaddr.IPAddress(data)
return return
except Exception: except Exception:
msg = _("%s is not a valid IP address") % data msg = _("'%s' is not a valid IP address") % data
LOG.debug("validate_ip_address: %s", msg) LOG.debug("validate_ip_address: %s", msg)
return msg return msg
def _validate_ip_pools(data, valid_values=None):
"""Validate that start and end IP addresses are present
In addition to this the IP addresses will also be validated"""
if not isinstance(data, list):
msg = _("'%s' in not a valid IP pool") % data
LOG.debug("validate_ip_pools: %s", msg)
return msg
expected_keys = set(['start', 'end'])
try:
for ip_pool in data:
if set(ip_pool.keys()) != expected_keys:
msg = _("Expected keys not found. Expected: %s "
"Provided: %s") % (expected_keys, ip_pool.keys())
LOG.debug("validate_ip_pools: %s", msg)
return msg
for k in expected_keys:
msg = _validate_ip_address(ip_pool[k])
if msg:
LOG.debug("validate_ip_pools: %s", msg)
return msg
except KeyError, e:
args = {'key_name': e.message, 'ip_pool': ip_pool}
msg = _("Invalid input. Required key: '%(key_name)s' "
"missing from %(ip_pool)s.") % args
LOG.debug("validate_ip_pools: %s", msg)
return msg
except TypeError, e:
msg = _("Invalid input. Pool %s must be a dictionary.") % ip_pool
LOG.debug("validate_ip_pools: %s", msg)
return msg
except Exception:
msg = _("'%s' in not a valid IP pool") % data
LOG.debug("validate_ip_pools: %s", msg)
return msg
def _validate_fixed_ips(data, valid_values=None):
if not isinstance(data, list):
msg = _("'%s' in not a valid fixed IP") % data
LOG.debug("validate_fixed_ips: %s", msg)
return msg
ips = []
try:
for fixed_ip in data:
if 'ip_address' in fixed_ip:
msg = _validate_ip_address(fixed_ip['ip_address'])
if msg:
LOG.debug("validate_fixed_ips: %s", msg)
return msg
if 'subnet_id' in fixed_ip:
msg = _validate_regex(fixed_ip['subnet_id'], UUID_PATTERN)
if msg:
LOG.debug("validate_fixed_ips: %s", msg)
return msg
# Ensure that duplicate entries are not set - just checking IP
# suffices. Duplicate subnet_id's are legitimate.
if 'ip_address' in fixed_ip:
if fixed_ip['ip_address'] in ips:
msg = _("Duplicate entry %s") % fixed_ip
LOG.debug("validate_fixed_ips: %s", msg)
return msg
ips.append(fixed_ip['ip_address'])
except Exception:
msg = _("'%s' in not a valid fixed IP") % data
LOG.debug("validate_fixed_ips: %s", msg)
return msg
def _validate_nameservers(data, valid_values=None):
if not hasattr(data, '__iter__'):
msg = _("'%s' in not a valid nameserver") % data
LOG.debug("validate_nameservers: %s", msg)
return msg
ips = set()
for ip in data:
msg = _validate_ip_address(ip)
if msg:
# This may be a hostname
msg = _validate_regex(ip, HOSTNAME_PATTERN)
if msg:
msg = _("'%s' in not a valid nameserver") % ip
LOG.debug("validate_nameservers: %s", msg)
return msg
if ip in ips:
msg = _("Duplicate nameserver %s") % ip
LOG.debug("validate_nameservers: %s", msg)
return msg
ips.add(ip)
def _validate_hostroutes(data, valid_values=None):
if not isinstance(data, list):
msg = _("'%s' in not a valid hostroute") % data
LOG.debug("validate_hostroutes: %s", msg)
return msg
hostroutes = []
try:
for hostroute in data:
msg = _validate_subnet(hostroute['destination'])
if msg:
LOG.debug("validate_hostroutes: %s", msg)
return msg
msg = _validate_ip_address(hostroute['nexthop'])
if msg:
LOG.debug("validate_hostroutes: %s", msg)
return msg
if hostroute in hostroutes:
msg = _("Duplicate hostroute %s") % hostroute
LOG.debug("validate_hostroutes: %s", msg)
if msg:
return msg
hostroutes.append(hostroute)
except:
msg = _("'%s' in not a valid hostroute") % data
LOG.debug("validate_hostroutes: %s", msg)
return msg
def _validate_ip_address_or_none(data, valid_values=None): def _validate_ip_address_or_none(data, valid_values=None):
if data is None: if data is None:
return None return None
@ -98,19 +231,21 @@ def _validate_subnet(data, valid_values=None):
except Exception: except Exception:
pass pass
msg = _("%s is not a valid IP subnet") % data msg = _("'%s' is not a valid IP subnet") % data
LOG.debug("validate_subnet: %s", msg) LOG.debug("validate_subnet: %s", msg)
return msg return msg
def _validate_regex(data, valid_values=None): def _validate_regex(data, valid_values=None):
match = re.match(valid_values, data) try:
if match: if re.match(valid_values, data):
return return
else: except TypeError:
msg = _("%s is not valid") % data pass
LOG.debug("validate_regex: %s", msg)
return msg msg = _("'%s' is not valid input") % data
LOG.debug("validate_regex: %s", msg)
return msg
def _validate_uuid(data, valid_values=None): def _validate_uuid(data, valid_values=None):
@ -131,15 +266,23 @@ def convert_to_boolean(data):
return True return True
else: else:
return False return False
except ValueError, TypeError: except (ValueError, TypeError):
if (data == "True" or data == "true"): if (data == "True" or data == "true"):
return True return True
if (data == "False" or data == "false"): if (data == "False" or data == "false"):
return False return False
msg = _("%s is not boolean") % data msg = _("'%s' is not boolean") % data
raise q_exc.InvalidInput(error_message=msg) raise q_exc.InvalidInput(error_message=msg)
def convert_to_int(data):
try:
return int(data)
except (ValueError, TypeError):
msg = _("'%s' is not a integer") % data
raise q_exc.InvalidInput(error_message=msg)
def convert_kvp_str_to_list(data): def convert_kvp_str_to_list(data):
"""Convert a value of the form 'key=value' to ['key', 'value']. """Convert a value of the form 'key=value' to ['key', 'value'].
@ -170,6 +313,8 @@ def convert_kvp_list_to_dict(kvp_list):
kvp_map[key].add(value) kvp_map[key].add(value)
return dict((x, list(y)) for x, y in kvp_map.iteritems()) return dict((x, list(y)) for x, y in kvp_map.iteritems())
HOSTNAME_PATTERN = ("(?=^.{1,254}$)(^(?:(?!\d+\.|-)[a-zA-Z0-9_\-]"
"{1,63}(?<!-)\.?)+(?:[a-zA-Z]{2,})$)")
HEX_ELEM = '[0-9A-Fa-f]' HEX_ELEM = '[0-9A-Fa-f]'
UUID_PATTERN = '-'.join([HEX_ELEM + '{8}', HEX_ELEM + '{4}', UUID_PATTERN = '-'.join([HEX_ELEM + '{8}', HEX_ELEM + '{4}',
@ -181,6 +326,11 @@ MAC_PATTERN = "^%s[aceACE02468](:%s{2}){5}$" % (HEX_ELEM, HEX_ELEM)
# Dictionary that maintains a list of validation functions # Dictionary that maintains a list of validation functions
validators = {'type:boolean': _validate_boolean, validators = {'type:boolean': _validate_boolean,
'type:values': _validate_values,
'type:string': _validate_string,
'type:range': _validate_range,
'type:mac_address': _validate_mac_address,
'type:fixed_ips': _validate_fixed_ips,
'type:ip_address': _validate_ip_address, 'type:ip_address': _validate_ip_address,
'type:ip_address_or_none': _validate_ip_address_or_none, 'type:ip_address_or_none': _validate_ip_address_or_none,
'type:mac_address': _validate_mac_address, 'type:mac_address': _validate_mac_address,
@ -188,7 +338,10 @@ validators = {'type:boolean': _validate_boolean,
'type:regex': _validate_regex, 'type:regex': _validate_regex,
'type:subnet': _validate_subnet, 'type:subnet': _validate_subnet,
'type:uuid': _validate_uuid, 'type:uuid': _validate_uuid,
'type:values': _validate_values} 'type:regex': _validate_regex,
'type:ip_pools': _validate_ip_pools,
'type:hostroutes': _validate_hostroutes,
'type:nameservers': _validate_nameservers}
# Note: a default of ATTR_NOT_SPECIFIED indicates that an # Note: a default of ATTR_NOT_SPECIFIED indicates that an
# attribute is not required, but will be generated by the plugin # attribute is not required, but will be generated by the plugin
@ -219,6 +372,7 @@ RESOURCE_ATTRIBUTE_MAP = {
'validate': {'type:uuid': None}, 'validate': {'type:uuid': None},
'is_visible': True}, 'is_visible': True},
'name': {'allow_post': True, 'allow_put': True, 'name': {'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},
'default': '', 'is_visible': True}, 'default': '', 'is_visible': True},
'subnets': {'allow_post': False, 'allow_put': False, 'subnets': {'allow_post': False, 'allow_put': False,
'default': [], 'default': [],
@ -231,6 +385,7 @@ RESOURCE_ATTRIBUTE_MAP = {
'status': {'allow_post': False, 'allow_put': False, 'status': {'allow_post': False, 'allow_put': False,
'is_visible': True}, 'is_visible': True},
'tenant_id': {'allow_post': True, 'allow_put': False, 'tenant_id': {'allow_post': True, 'allow_put': False,
'validate': {'type:string': None},
'required_by_policy': True, 'required_by_policy': True,
'is_visible': True}, 'is_visible': True},
SHARED: {'allow_post': True, SHARED: {'allow_post': True,
@ -247,6 +402,7 @@ RESOURCE_ATTRIBUTE_MAP = {
'validate': {'type:uuid': None}, 'validate': {'type:uuid': None},
'is_visible': True}, 'is_visible': True},
'name': {'allow_post': True, 'allow_put': True, 'default': '', 'name': {'allow_post': True, 'allow_put': True, 'default': '',
'validate': {'type:string': None},
'is_visible': True}, 'is_visible': True},
'network_id': {'allow_post': True, 'allow_put': False, 'network_id': {'allow_post': True, 'allow_put': False,
'required_by_policy': True, 'required_by_policy': True,
@ -265,15 +421,19 @@ RESOURCE_ATTRIBUTE_MAP = {
'fixed_ips': {'allow_post': True, 'allow_put': True, 'fixed_ips': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED, 'default': ATTR_NOT_SPECIFIED,
'convert_list_to': convert_kvp_list_to_dict, 'convert_list_to': convert_kvp_list_to_dict,
'validate': {'type:fixed_ips': None},
'enforce_policy': True, 'enforce_policy': True,
'is_visible': True}, 'is_visible': True},
'device_id': {'allow_post': True, 'allow_put': True, 'device_id': {'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},
'default': '', 'default': '',
'is_visible': True}, 'is_visible': True},
'device_owner': {'allow_post': True, 'allow_put': True, 'device_owner': {'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},
'default': '', 'default': '',
'is_visible': True}, 'is_visible': True},
'tenant_id': {'allow_post': True, 'allow_put': False, 'tenant_id': {'allow_post': True, 'allow_put': False,
'validate': {'type:string': None},
'required_by_policy': True, 'required_by_policy': True,
'is_visible': True}, 'is_visible': True},
'status': {'allow_post': False, 'allow_put': False, 'status': {'allow_post': False, 'allow_put': False,
@ -284,9 +444,10 @@ RESOURCE_ATTRIBUTE_MAP = {
'validate': {'type:uuid': None}, 'validate': {'type:uuid': None},
'is_visible': True}, 'is_visible': True},
'name': {'allow_post': True, 'allow_put': True, 'default': '', 'name': {'allow_post': True, 'allow_put': True, 'default': '',
'validate': {'type:string': None},
'is_visible': True}, 'is_visible': True},
'ip_version': {'allow_post': True, 'allow_put': False, 'ip_version': {'allow_post': True, 'allow_put': False,
'convert_to': int, 'convert_to': convert_to_int,
'validate': {'type:values': [4, 6]}, 'validate': {'type:values': [4, 6]},
'is_visible': True}, 'is_visible': True},
'network_id': {'allow_post': True, 'allow_put': False, 'network_id': {'allow_post': True, 'allow_put': False,
@ -303,14 +464,18 @@ RESOURCE_ATTRIBUTE_MAP = {
#TODO(salvatore-orlando): Enable PUT on allocation_pools #TODO(salvatore-orlando): Enable PUT on allocation_pools
'allocation_pools': {'allow_post': True, 'allow_put': False, 'allocation_pools': {'allow_post': True, 'allow_put': False,
'default': ATTR_NOT_SPECIFIED, 'default': ATTR_NOT_SPECIFIED,
'validate': {'type:ip_pools': None},
'is_visible': True}, 'is_visible': True},
'dns_nameservers': {'allow_post': True, 'allow_put': True, 'dns_nameservers': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED, 'default': ATTR_NOT_SPECIFIED,
'validate': {'type:nameservers': None},
'is_visible': True}, 'is_visible': True},
'host_routes': {'allow_post': True, 'allow_put': True, 'host_routes': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED, 'default': ATTR_NOT_SPECIFIED,
'validate': {'type:hostroutes': None},
'is_visible': True}, 'is_visible': True},
'tenant_id': {'allow_post': True, 'allow_put': False, 'tenant_id': {'allow_post': True, 'allow_put': False,
'validate': {'type:string': None},
'required_by_policy': True, 'required_by_policy': True,
'is_visible': True}, 'is_visible': True},
'enable_dhcp': {'allow_post': True, 'allow_put': True, 'enable_dhcp': {'allow_post': True, 'allow_put': True,

View File

@ -15,6 +15,7 @@
import socket import socket
import netaddr
import webob.exc import webob.exc
from quantum.api.v2 import attributes from quantum.api.v2 import attributes
@ -44,7 +45,10 @@ FAULT_MAP = {exceptions.NotFound: webob.exc.HTTPNotFound,
exceptions.HostRoutesExhausted: webob.exc.HTTPBadRequest, exceptions.HostRoutesExhausted: webob.exc.HTTPBadRequest,
exceptions.DNSNameServersExhausted: webob.exc.HTTPBadRequest, exceptions.DNSNameServersExhausted: webob.exc.HTTPBadRequest,
# Some plugins enforce policies as well # Some plugins enforce policies as well
exceptions.PolicyNotAuthorized: webob.exc.HTTPForbidden exceptions.PolicyNotAuthorized: webob.exc.HTTPForbidden,
netaddr.AddrFormatError: webob.exc.HTTPBadRequest,
AttributeError: webob.exc.HTTPBadRequest,
ValueError: webob.exc.HTTPBadRequest,
} }
QUOTAS = quota.QUOTAS QUOTAS = quota.QUOTAS

View File

@ -17,6 +17,7 @@
Utility methods for working with WSGI servers redux Utility methods for working with WSGI servers redux
""" """
import netaddr
import webob import webob
import webob.dec import webob.dec
import webob.exc import webob.exc
@ -93,7 +94,9 @@ def Resource(controller, faults=None, deserializers=None, serializers=None):
method = getattr(controller, action) method = getattr(controller, action)
result = method(request=request, **args) result = method(request=request, **args)
except exceptions.QuantumException as e: except (ValueError, AttributeError,
exceptions.QuantumException,
netaddr.AddrFormatError) as e:
LOG.exception('%s failed' % action) LOG.exception('%s failed' % action)
body = serializer({'QuantumError': str(e)}) body = serializer({'QuantumError': str(e)})
kwargs = {'body': body, 'content_type': content_type} kwargs = {'body': body, 'content_type': content_type}

View File

@ -955,8 +955,8 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
s['gateway_ip'] != attributes.ATTR_NOT_SPECIFIED): s['gateway_ip'] != attributes.ATTR_NOT_SPECIFIED):
self._validate_ip_version(ip_ver, s['gateway_ip'], 'gateway_ip') self._validate_ip_version(ip_ver, s['gateway_ip'], 'gateway_ip')
if 'dns_nameservers' in s and \ if ('dns_nameservers' in s and
s['dns_nameservers'] != attributes.ATTR_NOT_SPECIFIED: s['dns_nameservers'] != attributes.ATTR_NOT_SPECIFIED):
if len(s['dns_nameservers']) > cfg.CONF.max_dns_nameservers: if len(s['dns_nameservers']) > cfg.CONF.max_dns_nameservers:
raise q_exc.DNSNameServersExhausted( raise q_exc.DNSNameServersExhausted(
subnet_id=id, subnet_id=id,
@ -969,8 +969,8 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
error_message=("error parsing dns address %s" % dns)) error_message=("error parsing dns address %s" % dns))
self._validate_ip_version(ip_ver, dns, 'dns_nameserver') self._validate_ip_version(ip_ver, dns, 'dns_nameserver')
if 'host_routes' in s and \ if ('host_routes' in s and
s['host_routes'] != attributes.ATTR_NOT_SPECIFIED: s['host_routes'] != attributes.ATTR_NOT_SPECIFIED):
if len(s['host_routes']) > cfg.CONF.max_subnet_host_routes: if len(s['host_routes']) > cfg.CONF.max_subnet_host_routes:
raise q_exc.HostRoutesExhausted( raise q_exc.HostRoutesExhausted(
subnet_id=id, subnet_id=id,

View File

@ -23,6 +23,90 @@ from quantum.common import exceptions as q_exc
class TestAttributes(unittest2.TestCase): class TestAttributes(unittest2.TestCase):
def test_strings(self):
msg = attributes._validate_string(None, None)
self.assertEquals(msg, "'None' is not a valid string")
msg = attributes._validate_string("OK", None)
self.assertEquals(msg, None)
msg = attributes._validate_string("123456789", 9)
self.assertIsNone(msg)
msg = attributes._validate_string("1234567890", 9)
self.assertIsNotNone(msg)
def test_ip_pools(self):
pools = [[{'end': '10.0.0.254'}],
[{'start': '10.0.0.254'}],
[{'start': '1000.0.0.254',
'end': '1.1.1.1'}],
[{'start': '10.0.0.2', 'end': '10.0.0.254',
'forza': 'juve'}],
[{'start': '10.0.0.2', 'end': '10.0.0.254'},
{'end': '10.0.0.254'}],
None]
for pool in pools:
msg = attributes._validate_ip_pools(pool, None)
self.assertIsNotNone(msg)
pools = [[{'end': '10.0.0.254', 'start': '10.0.0.2'},
{'start': '11.0.0.2', 'end': '11.1.1.1'}],
[{'start': '11.0.0.2', 'end': '11.0.0.100'}]]
for pool in pools:
msg = attributes._validate_ip_pools(pool, None)
self.assertIsNone(msg)
def test_fixed_ips(self):
fixed_ips = [[{'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
'ip_address': '1111.1.1.1'}],
[{'subnet_id': 'invalid'}],
None,
[{'subnet_id': '00000000-0fff-ffff-ffff-000000000000',
'ip_address': '1.1.1.1'},
{'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
'ip_address': '1.1.1.1'}],
[{'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
'ip_address': '1.1.1.1'},
{'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
'ip_address': '1.1.1.1'}]]
for fixed in fixed_ips:
msg = attributes._validate_fixed_ips(fixed, None)
self.assertIsNotNone(msg)
def test_nameservers(self):
ns_pools = [['1.1.1.2', '1.1.1.2'],
['www.hostname.com', 'www.hostname.com'],
['77.hostname.com'],
['1000.0.0.1'],
None]
for ns in ns_pools:
msg = attributes._validate_nameservers(ns, None)
self.assertIsNotNone(msg)
ns_pools = [['100.0.0.2'],
['www.hostname.com'],
['www.great.marathons.to.travel'],
['valid'],
['www.internal.hostname.com']]
for ns in ns_pools:
msg = attributes._validate_nameservers(ns, None)
self.assertIsNone(msg)
def test_hostroutes(self):
hostroute_pools = [[{'destination': '100.0.0.0/24'}],
[{'nexthop': '10.0.2.20'}],
[{'nexthop': '10.0.2.20',
'destination': '100.0.0.0/8'},
{'nexthop': '10.0.2.20',
'destination': '100.0.0.0/8'}],
None]
for host_routes in hostroute_pools:
msg = attributes._validate_hostroutes(host_routes, None)
self.assertIsNotNone(msg)
def test_mac_addresses(self): def test_mac_addresses(self):
# Valid - 3 octets # Valid - 3 octets
base_mac = "fa:16:3e:00:00:00" base_mac = "fa:16:3e:00:00:00"
@ -40,50 +124,43 @@ class TestAttributes(unittest2.TestCase):
base_mac = "01:16:3e:4f:00:00" base_mac = "01:16:3e:4f:00:00"
msg = attributes._validate_regex(base_mac, msg = attributes._validate_regex(base_mac,
attributes.MAC_PATTERN) attributes.MAC_PATTERN)
error = '%s is not valid' % base_mac self.assertIsNotNone(msg)
self.assertEquals(msg, error)
# Invalid - invalid format # Invalid - invalid format
base_mac = "a:16:3e:4f:00:00" base_mac = "a:16:3e:4f:00:00"
msg = attributes._validate_regex(base_mac, msg = attributes._validate_regex(base_mac,
attributes.MAC_PATTERN) attributes.MAC_PATTERN)
error = '%s is not valid' % base_mac self.assertIsNotNone(msg)
self.assertEquals(msg, error)
# Invalid - invalid format # Invalid - invalid format
base_mac = "ffa:16:3e:4f:00:00" base_mac = "ffa:16:3e:4f:00:00"
msg = attributes._validate_regex(base_mac, msg = attributes._validate_regex(base_mac,
attributes.MAC_PATTERN) attributes.MAC_PATTERN)
error = '%s is not valid' % base_mac self.assertIsNotNone(msg)
self.assertEquals(msg, error)
# Invalid - invalid format # Invalid - invalid format
base_mac = "01163e4f0000" base_mac = "01163e4f0000"
msg = attributes._validate_regex(base_mac, msg = attributes._validate_regex(base_mac,
attributes.MAC_PATTERN) attributes.MAC_PATTERN)
error = '%s is not valid' % base_mac self.assertIsNotNone(msg)
self.assertEquals(msg, error)
# Invalid - invalid format # Invalid - invalid format
base_mac = "01-16-3e-4f-00-00" base_mac = "01-16-3e-4f-00-00"
msg = attributes._validate_regex(base_mac, msg = attributes._validate_regex(base_mac,
attributes.MAC_PATTERN) attributes.MAC_PATTERN)
error = '%s is not valid' % base_mac self.assertIsNotNone(msg)
self.assertEquals(msg, error)
# Invalid - invalid format # Invalid - invalid format
base_mac = "00:16:3:f:00:00" base_mac = "00:16:3:f:00:00"
msg = attributes._validate_regex(base_mac, msg = attributes._validate_regex(base_mac,
attributes.MAC_PATTERN) attributes.MAC_PATTERN)
error = '%s is not valid' % base_mac self.assertIsNotNone(msg)
self.assertEquals(msg, error)
# Invalid - invalid format # Invalid - invalid format
base_mac = "12:3:4:5:67:89ab" base_mac = "12:3:4:5:67:89ab"
msg = attributes._validate_regex(base_mac, msg = attributes._validate_regex(base_mac,
attributes.MAC_PATTERN) attributes.MAC_PATTERN)
error = '%s is not valid' % base_mac self.assertIsNotNone(msg)
self.assertEquals(msg, error)
def test_cidr(self): def test_cidr(self):
# Valid - IPv4 # Valid - IPv4
@ -108,21 +185,21 @@ class TestAttributes(unittest2.TestCase):
cidr = "10.0.2.0" cidr = "10.0.2.0"
msg = attributes._validate_subnet(cidr, msg = attributes._validate_subnet(cidr,
None) None)
error = "%s is not a valid IP subnet" % cidr error = "'%s' is not a valid IP subnet" % cidr
self.assertEquals(msg, error) self.assertEquals(msg, error)
# Invalid - IPv6 without final octets, missing mask # Invalid - IPv6 without final octets, missing mask
cidr = "fe80::" cidr = "fe80::"
msg = attributes._validate_subnet(cidr, msg = attributes._validate_subnet(cidr,
None) None)
error = "%s is not a valid IP subnet" % cidr error = "'%s' is not a valid IP subnet" % cidr
self.assertEquals(msg, error) self.assertEquals(msg, error)
# Invalid - IPv6 with final octets, missing mask # Invalid - IPv6 with final octets, missing mask
cidr = "fe80::0" cidr = "fe80::0"
msg = attributes._validate_subnet(cidr, msg = attributes._validate_subnet(cidr,
None) None)
error = "%s is not a valid IP subnet" % cidr error = "'%s' is not a valid IP subnet" % cidr
self.assertEquals(msg, error) self.assertEquals(msg, error)

View File

@ -352,12 +352,12 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
def _do_side_effect(self, patched_plugin, orig, *args, **kwargs): def _do_side_effect(self, patched_plugin, orig, *args, **kwargs):
""" Invoked by test cases for injecting failures in plugin """ """ Invoked by test cases for injecting failures in plugin """
def second_call(*args, **kwargs): def second_call(*args, **kwargs):
raise Exception('boom') raise AttributeError
patched_plugin.side_effect = second_call patched_plugin.side_effect = second_call
return orig(*args, **kwargs) return orig(*args, **kwargs)
def _validate_behavior_on_bulk_failure(self, res, collection): def _validate_behavior_on_bulk_failure(self, res, collection):
self.assertEqual(res.status_int, 500) self.assertEqual(res.status_int, 400)
req = self.new_list_request(collection) req = self.new_list_request(collection)
res = req.get_response(self.api) res = req.get_response(self.api)
self.assertEquals(res.status_int, 200) self.assertEquals(res.status_int, 200)
@ -559,6 +559,14 @@ class TestV2HTTPResponse(QuantumDbPluginV2TestCase):
res = req.get_response(self.api) res = req.get_response(self.api)
self.assertEquals(res.status_int, 200) self.assertEquals(res.status_int, 200)
def test_update_invalid_json_400(self):
with self.network() as net:
req = self.new_update_request('networks',
'{{"name": "aaa"}}',
net['network']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_bad_route_404(self): def test_bad_route_404(self):
req = self.new_list_request('doohickeys') req = self.new_list_request('doohickeys')
res = req.get_response(self.api) res = req.get_response(self.api)
@ -808,6 +816,13 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
self.assertEqual(res['port']['admin_state_up'], self.assertEqual(res['port']['admin_state_up'],
data['port']['admin_state_up']) data['port']['admin_state_up'])
def test_update_device_id_null(self):
with self.port() as port:
data = {'port': {'device_id': None}}
req = self.new_update_request('ports', data, port['port']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_delete_network_if_port_exists(self): def test_delete_network_if_port_exists(self):
fmt = 'json' fmt = 'json'
with self.port() as port: with self.port() as port:
@ -1028,6 +1043,19 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
res = self._create_port(fmt, net_id=net_id, **kwargs) res = self._create_port(fmt, net_id=net_id, **kwargs)
self.assertEquals(res.status_int, 400) self.assertEquals(res.status_int, 400)
def test_overlapping_subnets(self):
fmt = 'json'
with self.subnet() as subnet:
tenant_id = subnet['subnet']['tenant_id']
net_id = subnet['subnet']['network_id']
res = self._create_subnet(fmt,
tenant_id=tenant_id,
net_id=net_id,
cidr='10.0.0.225/28',
ip_version=4,
gateway_ip=ATTR_NOT_SPECIFIED)
self.assertEquals(res.status_int, 400)
def test_requested_subnet_id_v4_and_v6(self): def test_requested_subnet_id_v4_and_v6(self):
fmt = 'json' fmt = 'json'
with self.subnet() as subnet: with self.subnet() as subnet:
@ -1158,6 +1186,17 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id']) self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
self._delete('ports', port2['port']['id']) self._delete('ports', port2['port']['id'])
def test_invalid_ip(self):
fmt = 'json'
with self.subnet() as subnet:
# Allocate specific IP
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
'ip_address': '1011.0.0.5'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(fmt, net_id=net_id, **kwargs)
port = self.deserialize(fmt, res)
self.assertEquals(res.status_int, 400)
def test_requested_split(self): def test_requested_split(self):
fmt = 'json' fmt = 'json'
with self.subnet() as subnet: with self.subnet() as subnet:
@ -1205,7 +1244,29 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
net_id = subnet['subnet']['network_id'] net_id = subnet['subnet']['network_id']
res = self._create_port(fmt, net_id=net_id, **kwargs) res = self._create_port(fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(fmt, res) port2 = self.deserialize(fmt, res)
self.assertEquals(res.status_int, 500) self.assertEquals(res.status_int, 400)
def test_fixed_ip_invalid_subnet_id(self):
fmt = 'json'
with self.subnet() as subnet:
# Allocate specific IP
kwargs = {"fixed_ips": [{'subnet_id': 'i am invalid',
'ip_address': '10.0.0.5'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(fmt, res)
self.assertEquals(res.status_int, 400)
def test_fixed_ip_invalid_ip(self):
fmt = 'json'
with self.subnet() as subnet:
# Allocate specific IP
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.55555'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(fmt, res)
self.assertEquals(res.status_int, 400)
def test_requested_ips_only(self): def test_requested_ips_only(self):
fmt = 'json' fmt = 'json'
@ -1962,6 +2023,119 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
res = subnet_req.get_response(self.api) res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 403) self.assertEquals(res.status_int, 403)
def test_create_subnet_bad_ip_version(self):
with self.network() as network:
# Check bad IP version
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 'abc',
'tenant_id': network['network']['tenant_id'],
'gateway_ip': '10.0.2.1'}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_create_subnet_bad_ip_version_null(self):
with self.network() as network:
# Check bad IP version
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': None,
'tenant_id': network['network']['tenant_id'],
'gateway_ip': '10.0.2.1'}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_create_subnet_bad_uuid(self):
with self.network() as network:
# Check invalid UUID
data = {'subnet': {'network_id': None,
'cidr': '10.0.2.0/24',
'ip_version': 4,
'tenant_id': network['network']['tenant_id'],
'gateway_ip': '10.0.2.1'}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_create_subnet_bad_boolean(self):
with self.network() as network:
# Check invalid boolean
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': '4',
'enable_dhcp': None,
'tenant_id': network['network']['tenant_id'],
'gateway_ip': '10.0.2.1'}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_create_subnet_bad_pools(self):
with self.network() as network:
# Check allocation pools
allocation_pools = [[{'end': '10.0.0.254'}],
[{'start': '10.0.0.254'}],
[{'start': '1000.0.0.254'}],
[{'start': '10.0.0.2', 'end': '10.0.0.254'},
{'end': '10.0.0.254'}],
None,
[{'start': '10.0.0.2', 'end': '10.0.0.3'},
{'start': '10.0.0.2', 'end': '10.0.0.3'}]]
tenant_id = network['network']['tenant_id']
for pool in allocation_pools:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': '4',
'tenant_id': tenant_id,
'gateway_ip': '10.0.2.1',
'allocation_pools': pool}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_create_subnet_bad_nameserver(self):
with self.network() as network:
# Check nameservers
nameserver_pools = [['1100.0.0.2'],
['1.1.1.2', '1.1000.1.3'],
['1.1.1.2', '1.1.1.2'],
None]
tenant_id = network['network']['tenant_id']
for nameservers in nameserver_pools:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': '4',
'tenant_id': tenant_id,
'gateway_ip': '10.0.2.1',
'dns_nameservers': nameservers}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_create_subnet_bad_hostroutes(self):
with self.network() as network:
# Check hostroutes
hostroute_pools = [[{'destination': '100.0.0.0/24'}],
[{'nexthop': '10.0.2.20'}],
[{'nexthop': '10.0.2.20',
'destination': '100.0.0.0/8'},
{'nexthop': '10.0.2.20',
'destination': '100.0.0.0/8'}],
None]
tenant_id = network['network']['tenant_id']
for hostroutes in hostroute_pools:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': '4',
'tenant_id': tenant_id,
'gateway_ip': '10.0.2.1',
'host_routes': hostroutes}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_create_subnet_defaults(self): def test_create_subnet_defaults(self):
gateway = '10.0.0.1' gateway = '10.0.0.1'
cidr = '10.0.0.0/24' cidr = '10.0.0.0/24'