# vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright (c) 2012 OpenStack, LLC. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the 'License'); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import netaddr import re from quantum.common import constants from quantum.common import exceptions as q_exc from quantum.openstack.common import log as logging from quantum.openstack.common import uuidutils LOG = logging.getLogger(__name__) ATTR_NOT_SPECIFIED = object() # Defining a constant to avoid repeating string literal in several modules SHARED = 'shared' def _verify_dict_keys(expected_keys, target_dict, strict=True): """ Allows to verify keys in a dictionary. :param expected_keys: A list of keys expected to be present. :param target_dict: The dictionary which should be verified. :param strict: Specifies whether additional keys are allowed to be present. :return: True, if keys in the dictionary correspond to the specification. """ if not isinstance(target_dict, dict): msg = (_("Invalid input. '%(target_dict)s' must be a dictionary " "with keys: %(expected_keys)s") % dict(target_dict=target_dict, expected_keys=expected_keys)) return msg expected_keys = set(expected_keys) provided_keys = set(target_dict.keys()) predicate = expected_keys.__eq__ if strict else expected_keys.issubset if not predicate(provided_keys): msg = (_("Validation of dictionary's keys failed." "Expected keys: %(expected_keys)s " "Provided keys: %(provided_keys)s") % locals()) return msg def is_attr_set(attribute): return not (attribute is None or attribute is ATTR_NOT_SPECIFIED) def _validate_values(data, valid_values=None): if data not in valid_values: msg = (_("'%(data)s' is not in %(valid_values)s") % dict(data=data, valid_values=valid_values)) LOG.debug(msg) return msg def _validate_string(data, max_len=None): if not isinstance(data, basestring): msg = _("'%s' is not a valid string") % data LOG.debug(msg) return msg if max_len is not None and len(data) > max_len: msg = (_("'%(data)s' exceeds maximum length of %(max_len)s") % dict(data=data, max_len=max_len)) LOG.debug(msg) return msg def _validate_range(data, valid_values=None): min_value = valid_values[0] max_value = valid_values[1] if not min_value <= data <= max_value: msg = _("'%(data)s' is not in range %(min_value)s through " "%(max_value)s") % dict(data=data, min_value=min_value, max_value=max_value) LOG.debug(msg) return msg def _validate_mac_address(data, valid_values=None): try: netaddr.EUI(data) except Exception: msg = _("'%s' is not a valid MAC address") % data LOG.debug(msg) return msg def _validate_ip_address(data, valid_values=None): try: netaddr.IPAddress(data) except Exception: msg = _("'%s' is not a valid IP address") % data LOG.debug(msg) return msg def _validate_ip_pools(data, valid_values=None): """Validate that start and end IP addresses are present In addition to this the IP addresses will also be validated """ if not isinstance(data, list): msg = _("Invalid data format for IP pool: '%s'") % data LOG.debug(msg) return msg expected_keys = ['start', 'end'] for ip_pool in data: msg = _verify_dict_keys(expected_keys, ip_pool) if msg: LOG.debug(msg) return msg for k in expected_keys: msg = _validate_ip_address(ip_pool[k]) if msg: LOG.debug(msg) return msg def _validate_fixed_ips(data, valid_values=None): if not isinstance(data, list): msg = _("Invalid data format for fixed IP: '%s'") % data LOG.debug(msg) return msg ips = [] for fixed_ip in data: if not isinstance(fixed_ip, dict): msg = _("Invalid data format for fixed IP: '%s'") % fixed_ip LOG.debug(msg) return msg if 'ip_address' in fixed_ip: # Ensure that duplicate entries are not set - just checking IP # suffices. Duplicate subnet_id's are legitimate. fixed_ip_address = fixed_ip['ip_address'] if fixed_ip_address in ips: msg = _("Duplicate IP address '%s'") % fixed_ip_address else: msg = _validate_ip_address(fixed_ip_address) if msg: LOG.debug(msg) return msg ips.append(fixed_ip_address) if 'subnet_id' in fixed_ip: msg = _validate_uuid(fixed_ip['subnet_id']) if msg: LOG.debug(msg) return msg def _validate_nameservers(data, valid_values=None): if not hasattr(data, '__iter__'): msg = _("Invalid data format for nameserver: '%s'") % data LOG.debug(msg) return msg ips = [] for ip in data: msg = _validate_ip_address(ip) if msg: # This may be a hostname msg = _validate_regex(ip, HOSTNAME_PATTERN) if msg: msg = _("'%s' is not a valid nameserver") % ip LOG.debug(msg) return msg if ip in ips: msg = _("Duplicate nameserver '%s'") % ip LOG.debug(msg) return msg ips.append(ip) def _validate_hostroutes(data, valid_values=None): if not isinstance(data, list): msg = _("Invalid data format for hostroute: '%s'") % data LOG.debug(msg) return msg expected_keys = ['destination', 'nexthop'] hostroutes = [] for hostroute in data: msg = _verify_dict_keys(expected_keys, hostroute) if msg: LOG.debug(msg) return msg msg = _validate_subnet(hostroute['destination']) if msg: LOG.debug(msg) return msg msg = _validate_ip_address(hostroute['nexthop']) if msg: LOG.debug(msg) return msg if hostroute in hostroutes: msg = _("Duplicate hostroute '%s'") % hostroute LOG.debug(msg) return msg hostroutes.append(hostroute) def _validate_ip_address_or_none(data, valid_values=None): if data is None: return None return _validate_ip_address(data, valid_values) def _validate_subnet(data, valid_values=None): try: netaddr.IPNetwork(data) if len(data.split('/')) == 2: return except Exception: pass msg = _("'%s' is not a valid IP subnet") % data LOG.debug(msg) return msg def _validate_regex(data, valid_values=None): try: if re.match(valid_values, data): return except TypeError: pass msg = _("'%s' is not a valid input") % data LOG.debug(msg) return msg def _validate_uuid(data, valid_values=None): if not uuidutils.is_uuid_like(data): msg = _("'%s' is not a valid UUID") % data LOG.debug(msg) return msg def _validate_uuid_or_none(data, valid_values=None): if data is not None: return _validate_uuid(data) def _validate_uuid_list(data, valid_values=None): if not isinstance(data, list): msg = _("'%s' is not a list") % data LOG.debug(msg) return msg for item in data: msg = _validate_uuid(item) if msg: LOG.debug(msg) return msg if len(set(data)) != len(data): msg = _("Duplicate items in the list: '%s'") % ', '.join(data) LOG.debug(msg) return msg def _validate_dict(data, key_specs=None): if not isinstance(data, dict): msg = _("'%s' is not a dictionary") % data LOG.debug(msg) return msg # Do not perform any further validation, if no constraints are supplied if not key_specs: return # Check whether all required keys are present required_keys = [key for key, spec in key_specs.iteritems() if spec.get('required')] if required_keys: msg = _verify_dict_keys(required_keys, data, False) if msg: LOG.debug(msg) return msg # Perform validation of all values according to the specifications. for key, key_validator in [(k, v) for k, v in key_specs.iteritems() if k in data]: for val_name in [n for n in key_validator.iterkeys() if n.startswith('type:')]: # Check whether specified validator exists. if val_name not in validators: msg = _("Validator '%s' does not exist.") % val_name LOG.debug(msg) return msg val_func = validators[val_name] val_params = key_validator[val_name] msg = val_func(data.get(key), val_params) if msg: LOG.debug(msg) return msg def _validate_dict_or_none(data, key_specs=None): if data is not None: return _validate_dict(data, key_specs) def _validate_dict_or_empty(data, key_specs=None): if data != {}: return _validate_dict(data, key_specs) def _validate_non_negative(data, valid_values=None): try: data = int(data) except (ValueError, TypeError): msg = _("'%s' is not an integer") % data LOG.debug(msg) return msg if data < 0: msg = _("'%s' should be non-negative") % data LOG.debug(msg) return msg def convert_to_boolean(data): if isinstance(data, basestring): val = data.lower() if val == "true" or val == "1": return True if val == "false" or val == "0": return False elif isinstance(data, bool): return data elif isinstance(data, int): if data == 0: return False elif data == 1: return True msg = _("'%s' cannot be converted to boolean") % data raise q_exc.InvalidInput(error_message=msg) def convert_to_int(data): try: return int(data) except (ValueError, TypeError): msg = _("'%s' is not a integer") % data raise q_exc.InvalidInput(error_message=msg) def convert_kvp_str_to_list(data): """Convert a value of the form 'key=value' to ['key', 'value']. :raises: q_exc.InvalidInput if any of the strings are malformed (e.g. do not contain a key). """ kvp = [x.strip() for x in data.split('=', 1)] if len(kvp) == 2 and kvp[0]: return kvp msg = _("'%s' is not of the form =[value]") % data raise q_exc.InvalidInput(error_message=msg) def convert_kvp_list_to_dict(kvp_list): """Convert a list of 'key=value' strings to a dict. :raises: q_exc.InvalidInput if any of the strings are malformed (e.g. do not contain a key) or if any of the keys appear more than once. """ if kvp_list == ['True']: # No values were provided (i.e. '--flag-name') return {} kvp_map = {} for kvp_str in kvp_list: key, value = convert_kvp_str_to_list(kvp_str) kvp_map.setdefault(key, set()) kvp_map[key].add(value) return dict((x, list(y)) for x, y in kvp_map.iteritems()) def convert_none_to_empty_list(value): return [] if value is None else value def convert_none_to_empty_dict(value): return {} if value is None else value def convert_to_list(data): if data is None: return [] elif hasattr(data, '__iter__'): return list(data) else: return [data] HOSTNAME_PATTERN = ("(?=^.{1,254}$)(^(?:(?!\d+\.|-)[a-zA-Z0-9_\-]" "{1,63}(?