Merge "NEC plugin: PFC packet fitler support"

This commit is contained in:
Jenkins 2014-03-05 14:22:11 +00:00 committed by Gerrit Code Review
commit 5e44c8452c
17 changed files with 1129 additions and 122 deletions

View File

@ -161,6 +161,12 @@ def _validate_mac_address(data, valid_values=None):
return msg
def _validate_mac_address_or_none(data, valid_values=None):
if data is None:
return
return _validate_mac_address(data, valid_values)
def _validate_ip_address(data, valid_values=None):
try:
netaddr.IPAddress(_validate_no_whitespace(data))
@ -315,6 +321,12 @@ def _validate_subnet_list(data, valid_values=None):
return msg
def _validate_subnet_or_none(data, valid_values=None):
if data is None:
return
return _validate_subnet(data, valid_values)
def _validate_regex(data, valid_values=None):
try:
if re.match(valid_values, data):
@ -327,6 +339,12 @@ def _validate_regex(data, valid_values=None):
return msg
def _validate_regex_or_none(data, valid_values=None):
if data is None:
return
return _validate_regex(data, valid_values)
def _validate_uuid(data, valid_values=None):
if not uuidutils.is_uuid_like(data):
msg = _("'%s' is not a valid UUID") % data
@ -534,10 +552,12 @@ validators = {'type:dict': _validate_dict,
'type:ip_address_or_none': _validate_ip_address_or_none,
'type:ip_pools': _validate_ip_pools,
'type:mac_address': _validate_mac_address,
'type:mac_address_or_none': _validate_mac_address_or_none,
'type:nameservers': _validate_nameservers,
'type:non_negative': _validate_non_negative,
'type:range': _validate_range,
'type:regex': _validate_regex,
'type:regex_or_none': _validate_regex_or_none,
'type:string': _validate_string,
'type:string_or_none': _validate_string_or_none,
'type:not_empty_string': _validate_not_empty_string,
@ -545,6 +565,7 @@ validators = {'type:dict': _validate_dict,
_validate_not_empty_string_or_none,
'type:subnet': _validate_subnet,
'type:subnet_list': _validate_subnet_list,
'type:subnet_or_none': _validate_subnet_or_none,
'type:uuid': _validate_uuid,
'type:uuid_or_none': _validate_uuid_or_none,
'type:uuid_list': _validate_uuid_list,

View File

@ -15,6 +15,8 @@
# under the License.
# @author: Ryota MIBU
# TODO(amotoki): bug 1287432: Rename quantum_id column in ID mapping tables.
import sqlalchemy as sa
from neutron.db import api as db
@ -202,6 +204,21 @@ def del_portinfo(session, id):
"port_id: %s"), id)
def get_active_ports_on_ofc(context, network_id, port_id=None):
"""Retrieve ports on OFC on a given network.
It returns a list of tuple (neutron port_id, OFC id).
"""
query = context.session.query(nmodels.OFCPortMapping)
query = query.join(models_v2.Port,
nmodels.OFCPortMapping.quantum_id == models_v2.Port.id)
query = query.filter(models_v2.Port.network_id == network_id)
if port_id:
query = query.filter(nmodels.OFCPortMapping.quantum_id == port_id)
return [(p['quantum_id'], p['ofc_id']) for p in query]
def get_port_from_device(port_id):
"""Get port from database."""
LOG.debug(_("get_port_with_securitygroups() called:port_id=%s"), port_id)

View File

@ -20,19 +20,18 @@ from sqlalchemy import orm
from sqlalchemy.orm import exc as sa_exc
from neutron.api.v2 import attributes
from neutron.common import exceptions
from neutron.db import model_base
from neutron.db import models_v2
from neutron.openstack.common import uuidutils
from neutron.plugins.nec.db import models as nmodels
from neutron.plugins.nec.extensions import packetfilter as ext_pf
PF_STATUS_ACTIVE = 'ACTIVE'
PF_STATUS_DOWN = 'DOWN'
PF_STATUS_ERROR = 'ERROR'
class PacketFilterNotFound(exceptions.NotFound):
message = _("PacketFilter %(id)s could not be found")
INT_FIELDS = ('eth_type', 'src_port', 'dst_port')
class PacketFilter(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
@ -80,14 +79,15 @@ class PacketFilterDbMixin(object):
'action': pf_entry['action'],
'priority': pf_entry['priority'],
'in_port': pf_entry['in_port'],
'src_mac': pf_entry['src_mac'],
'dst_mac': pf_entry['dst_mac'],
'eth_type': pf_entry['eth_type'],
'src_cidr': pf_entry['src_cidr'],
'dst_cidr': pf_entry['dst_cidr'],
'protocol': pf_entry['protocol'],
'src_port': pf_entry['src_port'],
'dst_port': pf_entry['dst_port'],
# "or None" ensure the filed is None if empty
'src_mac': pf_entry['src_mac'] or None,
'dst_mac': pf_entry['dst_mac'] or None,
'eth_type': pf_entry['eth_type'] or None,
'src_cidr': pf_entry['src_cidr'] or None,
'dst_cidr': pf_entry['dst_cidr'] or None,
'protocol': pf_entry['protocol'] or None,
'src_port': pf_entry['src_port'] or None,
'dst_port': pf_entry['dst_port'] or None,
'admin_state_up': pf_entry['admin_state_up'],
'status': pf_entry['status']}
return self._fields(res, fields)
@ -96,7 +96,7 @@ class PacketFilterDbMixin(object):
try:
pf_entry = self._get_by_id(context, PacketFilter, id)
except sa_exc.NoResultFound:
raise PacketFilterNotFound(id=id)
raise ext_pf.PacketFilterNotFound(id=id)
return pf_entry
def get_packet_filter(self, context, id, fields=None):
@ -110,6 +110,39 @@ class PacketFilterDbMixin(object):
filters=filters,
fields=fields)
def _replace_unspecified_field(self, params, key):
if not attributes.is_attr_set(params[key]):
if key == 'in_port':
params[key] = None
elif key in INT_FIELDS:
# Integer field
params[key] = 0
else:
params[key] = ''
def _get_eth_type_for_protocol(self, protocol):
if protocol.upper() in ("ICMP", "TCP", "UDP"):
return 0x800
elif protocol.upper() == "ARP":
return 0x806
def _set_eth_type_from_protocol(self, filter_dict):
if filter_dict.get('protocol'):
eth_type = self._get_eth_type_for_protocol(filter_dict['protocol'])
if eth_type:
filter_dict['eth_type'] = eth_type
def _check_eth_type_and_protocol(self, new_filter, current_filter):
if 'protocol' in new_filter or 'eth_type' not in new_filter:
return
eth_type = self._get_eth_type_for_protocol(current_filter['protocol'])
if not eth_type:
return
if eth_type != new_filter['eth_type']:
raise ext_pf.PacketFilterEtherTypeProtocolMismatch(
eth_type=hex(new_filter['eth_type']),
protocol=current_filter['protocol'])
def create_packet_filter(self, context, packet_filter):
pf_dict = packet_filter['packet_filter']
tenant_id = self._get_tenant_id_for_create(context, pf_dict)
@ -138,12 +171,9 @@ class PacketFilterDbMixin(object):
'src_port': pf_dict['src_port'],
'dst_port': pf_dict['dst_port'],
'protocol': pf_dict['protocol']}
for key, default in params.items():
if params[key] == attributes.ATTR_NOT_SPECIFIED:
if key == 'in_port':
params[key] = None
else:
params[key] = ''
for key in params:
self._replace_unspecified_field(params, key)
self._set_eth_type_from_protocol(params)
with context.session.begin(subtransactions=True):
pf_entry = PacketFilter(**params)
@ -152,13 +182,40 @@ class PacketFilterDbMixin(object):
return self._make_packet_filter_dict(pf_entry)
def update_packet_filter(self, context, id, packet_filter):
pf = packet_filter['packet_filter']
params = packet_filter['packet_filter']
for key in params:
self._replace_unspecified_field(params, key)
self._set_eth_type_from_protocol(params)
with context.session.begin(subtransactions=True):
pf_entry = self._get_packet_filter(context, id)
pf_entry.update(pf)
self._check_eth_type_and_protocol(params, pf_entry)
pf_entry.update(params)
return self._make_packet_filter_dict(pf_entry)
def delete_packet_filter(self, context, id):
with context.session.begin(subtransactions=True):
pf_entry = self._get_packet_filter(context, id)
context.session.delete(pf_entry)
def get_packet_filters_for_port(self, context, port):
"""Retrieve packet filters on OFC on a given port.
It returns a list of tuple (neutron filter_id, OFC id).
"""
# TODO(amotoki): bug 1287432
# Rename quantum_id column in ID mapping tables.
query = (context.session.query(nmodels.OFCFilterMapping)
.join(PacketFilter,
nmodels.OFCFilterMapping.quantum_id == PacketFilter.id)
.filter(PacketFilter.admin_state_up == True))
network_id = port['network_id']
net_pf_query = (query.filter(PacketFilter.network_id == network_id)
.filter(PacketFilter.in_port == None))
net_filters = [(pf['quantum_id'], pf['ofc_id']) for pf in net_pf_query]
port_pf_query = query.filter(PacketFilter.in_port == port['id'])
port_filters = [(pf['quantum_id'], pf['ofc_id'])
for pf in port_pf_query]
return net_filters + port_filters

View File

@ -26,10 +26,11 @@ DRIVER_LIST = {
'trema_port': DRIVER_PATH % "trema.TremaPortBaseDriver",
'trema_portmac': DRIVER_PATH % "trema.TremaPortMACBaseDriver",
'trema_mac': DRIVER_PATH % "trema.TremaMACBaseDriver",
'pfc': DRIVER_PATH % "pfc.PFCV4Driver",
'pfc': DRIVER_PATH % "pfc.PFCV51Driver",
'pfc_v3': DRIVER_PATH % "pfc.PFCV3Driver",
'pfc_v4': DRIVER_PATH % "pfc.PFCV4Driver",
'pfc_v5': DRIVER_PATH % "pfc.PFCV5Driver",
'pfc_v51': DRIVER_PATH % "pfc.PFCV51Driver",
}

View File

@ -19,11 +19,23 @@
import re
import uuid
import netaddr
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as qexc
from neutron.common import log as call_log
from neutron import manager
from neutron.plugins.nec.common import ofc_client
from neutron.plugins.nec.db import api as ndb
from neutron.plugins.nec.extensions import packetfilter as ext_pf
from neutron.plugins.nec import ofc_driver_base
class InvalidOFCIdFormat(qexc.NeutronException):
message = _("OFC %(resource)s ID has an invalid format: %(ofc_id)s")
class PFCDriverBase(ofc_driver_base.OFCDriverBase):
"""Base Class for PDC Drivers.
@ -35,6 +47,12 @@ class PFCDriverBase(ofc_driver_base.OFCDriverBase):
router_supported = False
match_ofc_network_id = re.compile(
"^/tenants/(?P<tenant_id>[^/]+)/networks/(?P<network_id>[^/]+)$")
match_ofc_port_id = re.compile(
"^/tenants/(?P<tenant_id>[^/]+)/networks/(?P<network_id>[^/]+)"
"/ports/(?P<port_id>[^/]+)$")
def __init__(self, conf_ofc):
self.client = ofc_client.OFCClient(host=conf_ofc.host,
port=conf_ofc.port,
@ -76,8 +94,18 @@ class PFCDriverBase(ofc_driver_base.OFCDriverBase):
return self._generate_pfc_str(desc)[:127]
def _extract_ofc_network_id(self, ofc_network_id):
# ofc_network_id : /tenants/<tenant-id>/networks/<network-id>
return ofc_network_id.split('/')[4]
match = self.match_ofc_network_id.match(ofc_network_id)
if match:
return match.group('network_id')
raise InvalidOFCIdFormat(resource='network', ofc_id=ofc_network_id)
def _extract_ofc_port_id(self, ofc_port_id):
match = self.match_ofc_port_id.match(ofc_port_id)
if match:
return {'tenant': match.group('tenant_id'),
'network': match.group('network_id'),
'port': match.group('port_id')}
raise InvalidOFCIdFormat(resource='port', ofc_id=ofc_port_id)
def create_tenant(self, description, tenant_id=None):
ofc_tenant_id = self._generate_pfc_id(tenant_id)
@ -100,11 +128,14 @@ class PFCDriverBase(ofc_driver_base.OFCDriverBase):
return self.client.delete(ofc_network_id)
def create_port(self, ofc_network_id, portinfo,
port_id=None):
port_id=None, filters=None):
path = "%s/ports" % ofc_network_id
body = {'datapath_id': portinfo.datapath_id,
'port': str(portinfo.port_no),
'vid': str(portinfo.vlan_id)}
if self.filter_supported() and filters:
body['filters'] = [self._extract_ofc_filter_id(pf[1])
for pf in filters]
res = self.client.post(path, body=body)
ofc_port_id = res['id']
return path + '/' + ofc_port_id
@ -142,6 +173,154 @@ class PFCDriverBase(ofc_driver_base.OFCDriverBase):
return '%(network)s/ports/%(port)s' % params
class PFCFilterDriverMixin(object):
"""PFC PacketFilter Driver Mixin."""
filters_path = "/filters"
filter_path = "/filters/%s"
# PFC specific constants
MIN_PRIORITY = 1
MAX_PRIORITY = 32766
CREATE_ONLY_FIELDS = ['action', 'priority']
PFC_ALLOW_ACTION = "pass"
PFC_DROP_ACTION = "drop"
match_ofc_filter_id = re.compile("^/filters/(?P<filter_id>[^/]+)$")
@classmethod
def filter_supported(cls):
return True
def _set_param(self, filter_dict, body, key, create, convert_to=None):
if key in filter_dict:
if filter_dict[key]:
if convert_to:
body[key] = convert_to(filter_dict[key])
else:
body[key] = filter_dict[key]
elif not create:
body[key] = ""
def _generate_body(self, filter_dict, apply_ports=None, create=True):
body = {}
if create:
# action : pass, drop (mandatory)
if filter_dict['action'].lower() in ext_pf.ALLOW_ACTIONS:
body['action'] = self.PFC_ALLOW_ACTION
else:
body['action'] = self.PFC_DROP_ACTION
# priority : mandatory
body['priority'] = filter_dict['priority']
for key in ['src_mac', 'dst_mac', 'src_port', 'dst_port']:
self._set_param(filter_dict, body, key, create)
for key in ['src_cidr', 'dst_cidr']:
# CIDR must contain netmask even if it is an address.
convert_to = lambda x: str(netaddr.IPNetwork(x))
self._set_param(filter_dict, body, key, create, convert_to)
# protocol : decimal (0-255)
if 'protocol' in filter_dict:
if (not filter_dict['protocol'] or
# In the case of ARP, ip_proto should be set to wildcard.
# eth_type is set during adding an entry to DB layer.
filter_dict['protocol'].lower() == ext_pf.PROTO_NAME_ARP):
if not create:
body['protocol'] = ""
elif filter_dict['protocol'].lower() == constants.PROTO_NAME_ICMP:
body['protocol'] = constants.PROTO_NUM_ICMP
elif filter_dict['protocol'].lower() == constants.PROTO_NAME_TCP:
body['protocol'] = constants.PROTO_NUM_TCP
elif filter_dict['protocol'].lower() == constants.PROTO_NAME_UDP:
body['protocol'] = constants.PROTO_NUM_UDP
else:
body['protocol'] = int(filter_dict['protocol'], 0)
# eth_type : hex (0x0-0xFFFF)
self._set_param(filter_dict, body, 'eth_type', create, hex)
# apply_ports
if apply_ports:
# each element of apply_ports is a tuple of (neutron_id, ofc_id),
body['apply_ports'] = []
for p in apply_ports:
try:
body['apply_ports'].append(self._extract_ofc_port_id(p[1]))
except InvalidOFCIdFormat:
pass
return body
def _validate_filter_common(self, filter_dict):
# Currently PFC support only IPv4 CIDR.
for field in ['src_cidr', 'dst_cidr']:
if (not filter_dict.get(field) or
filter_dict[field] == attributes.ATTR_NOT_SPECIFIED):
continue
net = netaddr.IPNetwork(filter_dict[field])
if net.version != 4:
raise ext_pf.PacketFilterIpVersionNonSupported(
version=net.version, field=field, value=filter_dict[field])
if ('priority' in filter_dict and
not (self.MIN_PRIORITY <= filter_dict['priority']
<= self.MAX_PRIORITY)):
raise ext_pf.PacketFilterInvalidPriority(
min=self.MIN_PRIORITY, max=self.MAX_PRIORITY)
def _validate_duplicate_priority(self, context, filter_dict):
plugin = manager.NeutronManager.get_plugin()
filters = {'network_id': [filter_dict['network_id']],
'priority': [filter_dict['priority']]}
ret = plugin.get_packet_filters(context, filters=filters,
fields=['id'])
if ret:
raise ext_pf.PacketFilterDuplicatedPriority(
priority=filter_dict['priority'])
def validate_filter_create(self, context, filter_dict):
self._validate_filter_common(filter_dict)
self._validate_duplicate_priority(context, filter_dict)
def validate_filter_update(self, context, filter_dict):
for field in self.CREATE_ONLY_FIELDS:
if field in filter_dict:
raise ext_pf.PacketFilterUpdateNotSupported(field=field)
self._validate_filter_common(filter_dict)
@call_log.log
def create_filter(self, ofc_network_id, filter_dict,
portinfo=None, filter_id=None, apply_ports=None):
body = self._generate_body(filter_dict, apply_ports, create=True)
res = self.client.post(self.filters_path, body=body)
# filter_id passed from a caller is not used.
# ofc_filter_id is generated by PFC because the prefix of
# filter_id has special meaning and it is internally used.
ofc_filter_id = res['id']
return self.filter_path % ofc_filter_id
@call_log.log
def update_filter(self, ofc_filter_id, filter_dict):
body = self._generate_body(filter_dict, create=False)
self.client.put(ofc_filter_id, body)
@call_log.log
def delete_filter(self, ofc_filter_id):
return self.client.delete(ofc_filter_id)
def _extract_ofc_filter_id(self, ofc_filter_id):
match = self.match_ofc_filter_id.match(ofc_filter_id)
if match:
return match.group('filter_id')
raise InvalidOFCIdFormat(resource='filter', ofc_id=ofc_filter_id)
def convert_ofc_filter_id(self, context, ofc_filter_id):
# PFC Packet Filter is supported after the format of mapping tables
# are changed, so it is enough just to return ofc_filter_id
return ofc_filter_id
class PFCRouterDriverMixin(object):
router_supported = True
@ -218,3 +397,7 @@ class PFCV4Driver(PFCDriverBase):
class PFCV5Driver(PFCRouterDriverMixin, PFCDriverBase):
pass
class PFCV51Driver(PFCFilterDriverMixin, PFCV5Driver):
pass

View File

@ -86,7 +86,7 @@ class TremaFilterDriverMixin(object):
return True
def create_filter(self, ofc_network_id, filter_dict,
portinfo=None, filter_id=None):
portinfo=None, filter_id=None, apply_ports=None):
if filter_dict['action'].upper() in ["ACCEPT", "ALLOW"]:
ofc_action = "ALLOW"
elif filter_dict['action'].upper() in ["DROP", "DENY"]:
@ -125,27 +125,29 @@ class TremaFilterDriverMixin(object):
ofp_wildcards.append("nw_dst:32")
if filter_dict['protocol']:
if filter_dict['protocol'].upper() in "ICMP":
if filter_dict['protocol'].upper() == "ICMP":
body['dl_type'] = "0x800"
body['nw_proto'] = hex(1)
elif filter_dict['protocol'].upper() in "TCP":
elif filter_dict['protocol'].upper() == "TCP":
body['dl_type'] = "0x800"
body['nw_proto'] = hex(6)
elif filter_dict['protocol'].upper() in "UDP":
elif filter_dict['protocol'].upper() == "UDP":
body['dl_type'] = "0x800"
body['nw_proto'] = hex(17)
elif filter_dict['protocol'].upper() in "ARP":
elif filter_dict['protocol'].upper() == "ARP":
body['dl_type'] = "0x806"
ofp_wildcards.append("nw_proto")
else:
body['nw_proto'] = filter_dict['protocol']
if filter_dict['eth_type']:
body['dl_type'] = filter_dict['eth_type']
else:
ofp_wildcards.append("dl_type")
else:
ofp_wildcards.append("nw_proto")
if 'dl_type' in body:
pass
elif filter_dict['eth_type']:
body['dl_type'] = filter_dict['eth_type']
else:
ofp_wildcards.append("dl_type")
ofp_wildcards.append("nw_proto")
if filter_dict['src_port']:
body['tp_src'] = hex(filter_dict['src_port'])
@ -185,7 +187,7 @@ class TremaPortBaseDriver(TremaDriverBase, TremaFilterDriverMixin):
port_path = "%(network)s/ports/%(port)s"
def create_port(self, ofc_network_id, portinfo,
port_id=None):
port_id=None, filters=None):
ofc_port_id = port_id or uuidutils.generate_uuid()
path = self.ports_path % {'network': ofc_network_id}
body = {'id': ofc_port_id,
@ -224,7 +226,8 @@ class TremaPortMACBaseDriver(TremaDriverBase, TremaFilterDriverMixin):
attachments_path = "%(network)s/ports/%(port)s/attachments"
attachment_path = "%(network)s/ports/%(port)s/attachments/%(attachment)s"
def create_port(self, ofc_network_id, portinfo, port_id=None):
def create_port(self, ofc_network_id, portinfo, port_id=None,
filters=None):
#NOTE: This Driver create slices with Port-MAC Based bindings on Trema
# Sliceable. It's REST API requires Port Based binding before you
# define Port-MAC Based binding.
@ -282,7 +285,8 @@ class TremaMACBaseDriver(TremaDriverBase):
def filter_supported(cls):
return False
def create_port(self, ofc_network_id, portinfo, port_id=None):
def create_port(self, ofc_network_id, portinfo, port_id=None,
filters=None):
ofc_port_id = port_id or uuidutils.generate_uuid()
path = self.attachments_path % {'network': ofc_network_id}
body = {'id': ofc_port_id, 'mac': portinfo.mac}

View File

@ -23,6 +23,7 @@ from oslo.config import cfg
from neutron.api import extensions
from neutron.api.v2 import attributes
from neutron.api.v2 import base
from neutron.common import constants
from neutron.common import exceptions
from neutron.manager import NeutronManager
from neutron import quota
@ -37,7 +38,35 @@ quota_packet_filter_opts = [
cfg.CONF.register_opts(quota_packet_filter_opts, 'QUOTAS')
def convert_to_int(data):
class PacketFilterNotFound(exceptions.NotFound):
message = _("PacketFilter %(id)s could not be found")
class PacketFilterIpVersionNonSupported(exceptions.BadRequest):
message = _("IP version %(version)s is not supported for %(field)s "
"(%(value)s is specified)")
class PacketFilterInvalidPriority(exceptions.BadRequest):
message = _("Packet Filter priority should be %(min)s-%(max)s (included)")
class PacketFilterUpdateNotSupported(exceptions.BadRequest):
message = _("%(field)s field cannot be updated")
class PacketFilterDuplicatedPriority(exceptions.BadRequest):
message = _("The backend does not support duplicated priority. "
"Priority %(priority)s is in use")
class PacketFilterEtherTypeProtocolMismatch(exceptions.Conflict):
message = _("Ether Type '%(eth_type)s' conflicts with protocol "
"'%(protocol)s'. Update or clear protocol before "
"changing ether type.")
def convert_to_int_dec_and_hex(data):
try:
return int(data, 0)
except (ValueError, TypeError):
@ -49,12 +78,27 @@ def convert_to_int(data):
raise exceptions.InvalidInput(error_message=msg)
def convert_to_int_or_none(data):
if data is None:
return
return convert_to_int_dec_and_hex(data)
PROTO_NAME_ARP = 'arp'
SUPPORTED_PROTOCOLS = [constants.PROTO_NAME_ICMP,
constants.PROTO_NAME_TCP,
constants.PROTO_NAME_UDP,
PROTO_NAME_ARP]
ALLOW_ACTIONS = ['allow', 'accept']
DROP_ACTIONS = ['drop', 'deny']
SUPPORTED_ACTIONS = ALLOW_ACTIONS + DROP_ACTIONS
ALIAS = 'packet-filter'
RESOURCE = 'packet_filter'
COLLECTION = 'packet_filters'
PACKET_FILTER_ACTION_REGEX = '(?i)^(allow|accept|drop|deny)$'
PACKET_FILTER_PROTOCOL_REGEX = (
'(?i)^(icmp|tcp|udp|arp|0x[0-9a-fA-F]+|[0-9]+|)$')
PACKET_FILTER_ACTION_REGEX = '(?i)^(%s)$' % '|'.join(SUPPORTED_ACTIONS)
PACKET_FILTER_PROTOCOL_REGEX = ('(?i)^(%s|0x[0-9a-fA-F]+|[0-9]+|)$' %
'|'.join(SUPPORTED_PROTOCOLS))
PACKET_FILTER_ATTR_PARAMS = {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
@ -79,7 +123,7 @@ PACKET_FILTER_ATTR_PARAMS = {
'validate': {'type:regex': PACKET_FILTER_ACTION_REGEX},
'is_visible': True},
'priority': {'allow_post': True, 'allow_put': True,
'convert_to': convert_to_int,
'convert_to': convert_to_int_dec_and_hex,
'is_visible': True},
'in_port': {'allow_post': True, 'allow_put': False,
'default': attributes.ATTR_NOT_SPECIFIED,
@ -87,35 +131,36 @@ PACKET_FILTER_ATTR_PARAMS = {
'is_visible': True},
'src_mac': {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
'validate': {'type:mac_address': None},
'validate': {'type:mac_address_or_none': None},
'is_visible': True},
'dst_mac': {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
'validate': {'type:mac_address': None},
'validate': {'type:mac_address_or_none': None},
'is_visible': True},
'eth_type': {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
'convert_to': convert_to_int,
'convert_to': convert_to_int_or_none,
'is_visible': True},
'src_cidr': {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
'validate': {'type:subnet': None},
'validate': {'type:subnet_or_none': None},
'is_visible': True},
'dst_cidr': {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
'validate': {'type:subnet': None},
'validate': {'type:subnet_or_none': None},
'is_visible': True},
'protocol': {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
'validate': {'type:regex': PACKET_FILTER_PROTOCOL_REGEX},
'validate': {'type:regex_or_none':
PACKET_FILTER_PROTOCOL_REGEX},
'is_visible': True},
'src_port': {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
'convert_to': convert_to_int,
'convert_to': convert_to_int_or_none,
'is_visible': True},
'dst_port': {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
'convert_to': convert_to_int,
'convert_to': convert_to_int_or_none,
'is_visible': True},
}
PACKET_FILTER_ATTR_MAP = {COLLECTION: PACKET_FILTER_ATTR_PARAMS}

View File

@ -104,7 +104,7 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
def __init__(self):
super(NECPluginV2, self).__init__()
self.ofc = ofc_manager.OFCManager()
self.ofc = ofc_manager.OFCManager(self)
self.base_binding_dict = self._get_base_binding_dict()
portbindings_base.register_port_dict_function()

View File

@ -74,7 +74,7 @@ class OFCDriverBase(object):
@abstractmethod
def create_port(self, ofc_network_id, portinfo,
port_id=None):
port_id=None, filters=None):
"""Create a new port on specified network at OFC.
:param ofc_network_id: a OFC tenant ID in which a new port belongs.
@ -89,6 +89,8 @@ class OFCDriverBase(object):
If a port is identified in combination with a network or
a tenant, such information should be included in the ID.
:param filters: A list of packet filter associated with the port.
Each element is a tuple (neutron ID, OFC ID)
:returns: ID of the port created at OpenFlow Controller.
:raises: neutron.plugin.nec.common.exceptions.OFCException

View File

@ -39,8 +39,9 @@ class OFCManager(object):
of the switch. An ID named as 'ofc_*' is used to identify resource on OFC.
"""
def __init__(self):
def __init__(self, plugin):
self.driver = drivers.get_driver(config.OFC.driver)(config.OFC)
self.plugin = plugin
def _get_ofc_id(self, context, resource, neutron_id):
return ndb.get_ofc_id_lookup_both(context.session,
@ -107,7 +108,15 @@ class OFCManager(object):
if not portinfo:
raise nexc.PortInfoNotFound(id=port_id)
ofc_port_id = self.driver.create_port(ofc_net_id, portinfo, port_id)
# Associate packet filters
filters = self.plugin.get_packet_filters_for_port(context, port)
if filters is not None:
params = {'filters': filters}
else:
params = {}
ofc_port_id = self.driver.create_port(ofc_net_id, portinfo, port_id,
**params)
self._add_ofc_item(context, "ofc_port", port_id, ofc_port_id)
def exists_ofc_port(self, context, port_id):
@ -132,10 +141,19 @@ class OFCManager(object):
if not portinfo:
raise nexc.PortInfoNotFound(id=in_port_id)
# Collect ports to be associated with the filter
apply_ports = ndb.get_active_ports_on_ofc(
context, filter_dict['network_id'], in_port_id)
ofc_pf_id = self.driver.create_filter(ofc_net_id,
filter_dict, portinfo, filter_id)
filter_dict, portinfo, filter_id,
apply_ports)
self._add_ofc_item(context, "ofc_packet_filter", filter_id, ofc_pf_id)
def update_ofc_packet_filter(self, context, filter_id, filter_dict):
ofc_pf_id = self._get_ofc_id(context, "ofc_packet_filter", filter_id)
ofc_pf_id = self.driver.convert_ofc_filter_id(context, ofc_pf_id)
self.driver.update_filter(ofc_pf_id, filter_dict)
def exists_ofc_packet_filter(self, context, filter_id):
return self._exists_ofc_item(context, "ofc_packet_filter", filter_id)

View File

@ -15,6 +15,7 @@
# under the License.
# @author: Ryota MIBU
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
from neutron.plugins.nec.common import config
from neutron.plugins.nec.common import exceptions as nexc
@ -46,6 +47,9 @@ class PacketFilterMixin(pf_db.PacketFilterDbMixin):
LOG.debug(_("create_packet_filter() called, packet_filter=%s ."),
packet_filter)
if hasattr(self.ofc.driver, 'validate_filter_create'):
pf = packet_filter['packet_filter']
self.ofc.driver.validate_filter_create(context, pf)
pf = super(PacketFilterMixin, self).create_packet_filter(
context, packet_filter)
@ -60,6 +64,10 @@ class PacketFilterMixin(pf_db.PacketFilterDbMixin):
"id=%(id)s packet_filter=%(packet_filter)s ."),
{'id': id, 'packet_filter': packet_filter})
pf_data = packet_filter['packet_filter']
if hasattr(self.ofc.driver, 'validate_filter_update'):
self.ofc.driver.validate_filter_update(context, pf_data)
# validate ownership
pf_old = self.get_packet_filter(context, id)
@ -67,19 +75,72 @@ class PacketFilterMixin(pf_db.PacketFilterDbMixin):
context, id, packet_filter)
def _packet_filter_changed(old_pf, new_pf):
LOG.debug('old_pf=%(old_pf)s, new_pf=%(new_pf)s',
{'old_pf': old_pf, 'new_pf': new_pf})
# When the status is ERROR, force sync to OFC.
if old_pf['status'] == pf_db.PF_STATUS_ERROR:
LOG.debug('update_packet_filter: Force filter update '
'because the previous status is ERROR.')
return True
for key in new_pf:
if key not in ('id', 'name', 'tenant_id', 'network_id',
'in_port', 'status'):
if old_pf[key] != new_pf[key]:
return True
if key in ('id', 'name', 'tenant_id', 'network_id',
'in_port', 'status'):
continue
if old_pf[key] != new_pf[key]:
return True
return False
if _packet_filter_changed(pf_old, pf):
pf = self.deactivate_packet_filter(context, pf)
pf = self.activate_packet_filter_if_ready(context, pf)
if hasattr(self.ofc.driver, 'update_filter'):
# admin_state is changed
if pf_old['admin_state_up'] != pf['admin_state_up']:
LOG.debug('update_packet_filter: admin_state '
'is changed to %s', pf['admin_state_up'])
if pf['admin_state_up']:
self.activate_packet_filter_if_ready(context, pf)
else:
self.deactivate_packet_filter(context, pf)
elif pf['admin_state_up']:
LOG.debug('update_packet_filter: admin_state is '
'unchanged (True)')
if self.ofc.exists_ofc_packet_filter(context, id):
pf = self._update_packet_filter(context, pf, pf_data)
else:
pf = self.activate_packet_filter_if_ready(context, pf)
else:
LOG.debug('update_packet_filter: admin_state is unchanged '
'(False). No need to update OFC filter.')
else:
pf = self.deactivate_packet_filter(context, pf)
pf = self.activate_packet_filter_if_ready(context, pf)
return pf
def _update_packet_filter(self, context, new_pf, pf_data):
pf_id = new_pf['id']
prev_status = new_pf['status']
try:
# If previous status is ERROR, try to sync all attributes.
pf = new_pf if prev_status == pf_db.PF_STATUS_ERROR else pf_data
self.ofc.update_ofc_packet_filter(context, pf_id, pf)
new_status = pf_db.PF_STATUS_ACTIVE
if new_status != prev_status:
self._update_resource_status(context, "packet_filter",
pf_id, new_status)
new_pf['status'] = new_status
return new_pf
except Exception as exc:
with excutils.save_and_reraise_exception():
if (isinstance(exc, nexc.OFCException) or
isinstance(exc, nexc.OFCConsistencyBroken)):
LOG.error(_("Failed to create packet_filter id=%(id)s on "
"OFC: %(exc)s"),
{'id': pf_id, 'exc': exc})
new_status = pf_db.PF_STATUS_ERROR
if new_status != prev_status:
self._update_resource_status(context, "packet_filter",
pf_id, new_status)
def delete_packet_filter(self, context, id):
"""Deactivate and delete packet_filter."""
LOG.debug(_("delete_packet_filter() called, id=%s ."), id)
@ -129,7 +190,7 @@ class PacketFilterMixin(pf_db.PacketFilterDbMixin):
pf_status = pf_db.PF_STATUS_ACTIVE
except (nexc.OFCException, nexc.OFCMappingNotFound) as exc:
LOG.error(_("Failed to create packet_filter id=%(id)s on "
"OFC: %(exc)s"), {'id': pf_id, 'exc': str(exc)})
"OFC: %(exc)s"), {'id': pf_id, 'exc': exc})
pf_status = pf_db.PF_STATUS_ERROR
if pf_status != current:
@ -155,7 +216,7 @@ class PacketFilterMixin(pf_db.PacketFilterDbMixin):
pf_status = pf_db.PF_STATUS_DOWN
except (nexc.OFCException, nexc.OFCMappingNotFound) as exc:
LOG.error(_("Failed to delete packet_filter id=%(id)s from "
"OFC: %(exc)s"), {'id': pf_id, 'exc': str(exc)})
"OFC: %(exc)s"), {'id': pf_id, 'exc': exc})
pf_status = pf_db.PF_STATUS_ERROR
else:
LOG.debug(_("deactivate_packet_filter(): skip, "
@ -187,3 +248,8 @@ class PacketFilterMixin(pf_db.PacketFilterDbMixin):
pfs = self.get_packet_filters(context, filters=filters)
for pf in pfs:
self.deactivate_packet_filter(context, pf)
def get_packet_filters_for_port(self, context, port):
if self.packet_filter_enabled:
return super(PacketFilterMixin,
self).get_packet_filters_for_port(context, port)

View File

@ -116,7 +116,7 @@ class StubOFCDriver(ofc_driver_base.OFCDriverBase):
LOG.debug(_('delete_network: SUCCEED'))
@call_log.log
def create_port(self, ofc_network_id, info, port_id=None):
def create_port(self, ofc_network_id, info, port_id=None, filters=None):
ofc_id = "ofc-" + port_id[:-4]
if self.autocheck:
if ofc_network_id not in self.ofc_network_dict:
@ -127,6 +127,8 @@ class StubOFCDriver(ofc_driver_base.OFCDriverBase):
% ofc_id)
self.ofc_port_dict[ofc_id] = {'network_id': ofc_network_id,
'port_id': port_id}
if filters:
self.ofc_port_dict[ofc_id]['filters'] = filters
return ofc_id
@call_log.log
@ -144,7 +146,7 @@ class StubOFCDriver(ofc_driver_base.OFCDriverBase):
return True
def create_filter(self, ofc_network_id, filter_dict,
portinfo=None, filter_id=None):
portinfo=None, filter_id=None, apply_ports=None):
return "ofc-" + filter_id[:-4]
def delete_filter(self, ofc_filter_id):

View File

@ -49,7 +49,9 @@ class OFCManagerTestBase(base.BaseTestCase):
driver = "neutron.tests.unit.nec.stub_ofc_driver.StubOFCDriver"
config.CONF.set_override('driver', driver, 'OFC')
self.addCleanup(ndb.clear_db)
self.ofc = ofc_manager.OFCManager()
self.plugin = mock.Mock()
self.plugin.get_packet_filters_for_port.return_value = None
self.ofc = ofc_manager.OFCManager(self.plugin)
# NOTE: enable_autocheck() is a feature of StubOFCDriver
self.ofc.driver.enable_autocheck()
self.ctx = context.get_admin_context()
@ -124,19 +126,34 @@ class OFCManagerTest(OFCManagerTestBase):
get_portinfo.return_value = fake_portinfo
return get_portinfo
def testg_create_ofc_port(self):
"""test create ofc_port."""
def _test_create_ofc_port(self, with_filter=False):
t, n, p, f, none = self.get_random_params()
self.ofc.create_ofc_tenant(self.ctx, t)
self.ofc.create_ofc_network(self.ctx, t, n)
self.assertFalse(ndb.get_ofc_item(self.ctx.session, 'ofc_port', p))
get_portinfo = self._mock_get_portinfo(p)
port = {'tenant_id': t, 'network_id': n}
if with_filter:
_filters = ['filter1', 'filter2']
self.plugin.get_packet_filters_for_port.return_value = _filters
self.ofc.create_ofc_port(self.ctx, p, port)
self.assertTrue(ndb.get_ofc_item(self.ctx.session, 'ofc_port', p))
port = ndb.get_ofc_item(self.ctx.session, 'ofc_port', p)
self.assertEqual(port.ofc_id, "ofc-" + p[:-4])
get_portinfo.assert_called_once_with(mock.ANY, p)
portval = self.ofc.driver.ofc_port_dict[port.ofc_id]
if with_filter:
self.assertEqual(_filters, portval['filters'])
else:
self.assertFalse('filters' in portval)
def testg_create_ofc_port(self):
"""test create ofc_port."""
self._test_create_ofc_port(with_filter=False)
def testg_create_ofc_port_with_filters(self):
"""test create ofc_port."""
self._test_create_ofc_port(with_filter=True)
def testh_exists_ofc_port(self):
"""test exists_ofc_port."""

View File

@ -21,7 +21,7 @@ import webob.exc
from neutron.api.v2 import attributes
from neutron import context
from neutron.plugins.nec.common import exceptions as nexc
from neutron.plugins.nec.extensions import packetfilter
from neutron.plugins.nec.extensions import packetfilter as ext_pf
from neutron.tests.unit.nec import test_nec_plugin
from neutron.tests.unit import test_db_plugin as test_plugin
@ -35,7 +35,7 @@ enable_packet_filter = True
"""
class PacketfilterExtensionManager(packetfilter.Packetfilter):
class PacketfilterExtensionManager(ext_pf.Packetfilter):
@classmethod
def get_resources(cls):
@ -44,17 +44,17 @@ class PacketfilterExtensionManager(packetfilter.Packetfilter):
# initialize the main API router which extends
# the global attribute map
attributes.RESOURCE_ATTRIBUTE_MAP.update(
{'packet_filters': packetfilter.PACKET_FILTER_ATTR_MAP})
{'packet_filters': ext_pf.PACKET_FILTER_ATTR_MAP})
return super(PacketfilterExtensionManager, cls).get_resources()
class TestNecPluginPacketFilter(test_nec_plugin.NecPluginV2TestCase):
class TestNecPluginPacketFilterBase(test_nec_plugin.NecPluginV2TestCase):
_nec_ini = NEC_PLUGIN_PF_INI
def setUp(self):
ext_mgr = PacketfilterExtensionManager()
super(TestNecPluginPacketFilter, self).setUp(ext_mgr=ext_mgr)
super(TestNecPluginPacketFilterBase, self).setUp(ext_mgr=ext_mgr)
def _create_packet_filter(self, fmt, net_id, expected_res_status=None,
arg_list=None, **kwargs):
@ -127,6 +127,17 @@ class TestNecPluginPacketFilter(test_nec_plugin.NecPluginV2TestCase):
if do_delete:
self._delete('packet_filters', pf['packet_filter']['id'])
class TestNecPluginPacketFilter(TestNecPluginPacketFilterBase):
def setUp(self):
super(TestNecPluginPacketFilter, self).setUp()
# Remove attributes explicitly from mock object to check
# a case where there are no update_filter and validate_*.
del self.ofc.driver.update_filter
del self.ofc.driver.validate_filter_create
del self.ofc.driver.validate_filter_update
def test_list_packet_filters(self):
self._list('packet_filters')
@ -179,6 +190,34 @@ class TestNecPluginPacketFilter(test_nec_plugin.NecPluginV2TestCase):
self.assertEqual(self.ofc.create_ofc_packet_filter.call_count, 1)
self.assertEqual(self.ofc.delete_ofc_packet_filter.call_count, 1)
def _test_create_pf_with_protocol(self, protocol, expected_eth_type):
with self.packet_filter_on_network(protocol=protocol) as pf:
pf_data = pf['packet_filter']
self.assertEqual(protocol, pf_data['protocol'])
self.assertEqual(expected_eth_type, pf_data['eth_type'])
def test_create_pf_with_protocol_tcp(self):
self._test_create_pf_with_protocol('TCP', 0x800)
def test_create_pf_with_protocol_udp(self):
self._test_create_pf_with_protocol('UDP', 0x800)
def test_create_pf_with_protocol_icmp(self):
self._test_create_pf_with_protocol('ICMP', 0x800)
def test_create_pf_with_protocol_arp(self):
self._test_create_pf_with_protocol('ARP', 0x806)
def test_create_pf_with_inconsistent_protocol_and_eth_type(self):
with self.packet_filter_on_network(protocol='TCP') as pf:
pf_data = pf['packet_filter']
pf_id = pf_data['id']
self.assertEqual('TCP', pf_data['protocol'])
self.assertEqual(0x800, pf_data['eth_type'])
data = {'packet_filter': {'eth_type': 0x806}}
self._update('packet_filters', pf_id, data,
expected_code=409)
def test_create_pf_with_invalid_priority(self):
with self.network() as net:
net_id = net['network']['id']
@ -186,7 +225,6 @@ class TestNecPluginPacketFilter(test_nec_plugin.NecPluginV2TestCase):
self._create_packet_filter(self.fmt, net_id,
webob.exc.HTTPBadRequest.code,
**kwargs)
self.assertFalse(self.ofc.create_ofc_packet_filter.called)
def test_create_pf_with_ofc_creation_failure(self):
@ -200,8 +238,8 @@ class TestNecPluginPacketFilter(test_nec_plugin.NecPluginV2TestCase):
self.ofc.set_raise_exc('create_ofc_packet_filter', None)
# Retry deactivate packet_filter.
data = {'packet_filter': {'priority': 1000}}
# Retry activate packet_filter (even if there is no change).
data = {'packet_filter': {}}
self._update('packet_filters', pf_id, data)
pf_ref = self._show('packet_filters', pf_id)
@ -243,7 +281,36 @@ class TestNecPluginPacketFilter(test_nec_plugin.NecPluginV2TestCase):
# convert string to int.
kwargs.update({'priority': 102, 'eth_type': 2048,
'src_port': 35001, 'dst_port': 22})
'src_port': 35001, 'dst_port': 22,
'in_port': None})
self.assertEqual(pf_id, pf_ref['packet_filter']['id'])
for key in kwargs:
self.assertEqual(kwargs[key], pf_ref['packet_filter'][key])
def test_show_pf_on_network_with_wildcards(self):
kwargs = {
'name': 'test-pf-net',
'admin_state_up': False,
'action': 'DENY',
'priority': '102',
}
with self.packet_filter_on_network(**kwargs) as pf:
pf_id = pf['packet_filter']['id']
pf_ref = self._show('packet_filters', pf_id)
# convert string to int.
kwargs.update({'priority': 102,
'in_port': None,
'src_mac': None,
'dst_mac': None,
'eth_type': None,
'src_cidr': None,
'dst_cidr': None,
'protocol': None,
'src_port': None,
'dst_port': None})
self.assertEqual(pf_id, pf_ref['packet_filter']['id'])
for key in kwargs:
@ -270,9 +337,12 @@ class TestNecPluginPacketFilter(test_nec_plugin.NecPluginV2TestCase):
# convert string to int.
kwargs.update({'priority': 103, 'eth_type': 2048,
'src_port': u'', 'dst_port': 80})
'dst_port': 80,
# wildcard field is None in a response.
'src_port': None})
self.assertEqual(pf_id, pf_ref['packet_filter']['id'])
self.assertTrue(pf_ref['packet_filter']['in_port'])
for key in kwargs:
self.assertEqual(kwargs[key], pf_ref['packet_filter'][key])
@ -497,3 +567,132 @@ class TestNecPluginPacketFilter(test_nec_plugin.NecPluginV2TestCase):
self._update('ports', in_port_id, data)
self.assertEqual(self.ofc.create_ofc_packet_filter.call_count, 1)
self.assertEqual(self.ofc.delete_ofc_packet_filter.call_count, 0)
class TestNecPluginPacketFilterWithValidate(TestNecPluginPacketFilterBase):
def setUp(self):
super(TestNecPluginPacketFilterWithValidate, self).setUp()
# Remove attributes explicitly from mock object to check
# a case where there are no update_filter.
del self.ofc.driver.update_filter
self.validate_create = self.ofc.driver.validate_filter_create
self.validate_update = self.ofc.driver.validate_filter_update
def test_create_pf_on_network(self):
with self.packet_filter_on_network() as pf:
pf_id = pf['packet_filter']['id']
self.assertEqual(pf['packet_filter']['status'], 'ACTIVE')
ctx = mock.ANY
pf_dict = mock.ANY
expected = [
mock.call.driver.validate_filter_create(ctx, pf_dict),
mock.call.exists_ofc_packet_filter(ctx, pf_id),
mock.call.create_ofc_packet_filter(ctx, pf_id, pf_dict),
mock.call.exists_ofc_packet_filter(ctx, pf_id),
mock.call.delete_ofc_packet_filter(ctx, pf_id),
]
self.ofc.assert_has_calls(expected)
self.assertEqual(self.ofc.create_ofc_packet_filter.call_count, 1)
self.assertEqual(self.ofc.delete_ofc_packet_filter.call_count, 1)
def test_update_pf_on_network(self):
ctx = mock.ANY
pf_dict = mock.ANY
with self.packet_filter_on_network(admin_state_up=False) as pf:
pf_id = pf['packet_filter']['id']
self.assertFalse(self.ofc.create_ofc_packet_filter.called)
data = {'packet_filter': {'admin_state_up': True}}
self._update('packet_filters', pf_id, data)
self.ofc.create_ofc_packet_filter.assert_called_once_with(
ctx, pf_id, pf_dict)
self.ofc.driver.validate_filter_update.assert_called_once_with(
ctx, data['packet_filter'])
self.assertFalse(self.ofc.delete_ofc_packet_filter.called)
data = {'packet_filter': {'admin_state_up': False}}
self._update('packet_filters', pf_id, data)
self.ofc.delete_ofc_packet_filter.assert_called_once_with(
ctx, pf_id)
self.assertEqual(
2, self.ofc.driver.validate_filter_update.call_count)
def test_create_pf_on_network_with_validation_error(self):
self.validate_create.side_effect = ext_pf.PacketFilterInvalidPriority(
min=1, max=65535)
with self.network() as net:
net_id = net['network']['id']
e = self.assertRaises(webob.exc.HTTPClientError,
self._make_packet_filter,
self.fmt, net_id, expected_res_status=400)
self.assertEqual(400, e.status_int)
def test_update_pf_on_network_with_validation_error(self):
self.validate_update.side_effect = (
ext_pf.PacketFilterUpdateNotSupported(field='priority'))
with self.packet_filter_on_network() as pf:
pf_id = pf['packet_filter']['id']
pf_ref = self._show('packet_filters', pf_id)
self.assertEqual(pf_ref['packet_filter']['status'], 'ACTIVE')
data = {'packet_filter': {'priority': 1000}}
self._update('packet_filters', pf_id, data,
expected_code=400)
class TestNecPluginPacketFilterWithFilterUpdate(TestNecPluginPacketFilterBase):
def setUp(self):
super(TestNecPluginPacketFilterWithFilterUpdate, self).setUp()
# Remove attributes explicitly from mock object to check
# a case where there are no update_filter and validate_*.
del self.ofc.driver.validate_filter_create
del self.ofc.driver.validate_filter_update
def test_update_pf_toggle_admin_state(self):
ctx = mock.ANY
pf_dict = mock.ANY
with self.packet_filter_on_network(admin_state_up=False) as pf:
pf_id = pf['packet_filter']['id']
self.assertFalse(self.ofc.create_ofc_packet_filter.called)
data = {'packet_filter': {'admin_state_up': True}}
self._update('packet_filters', pf_id, data)
self.ofc.create_ofc_packet_filter.assert_called_once_with(
ctx, pf_id, pf_dict)
self.assertFalse(self.ofc.delete_ofc_packet_filter.called)
data = {'packet_filter': {'admin_state_up': False}}
self._update('packet_filters', pf_id, data)
self.ofc.delete_ofc_packet_filter.assert_called_once_with(
ctx, pf_id)
def test_update_pf_change_field(self):
ctx = mock.ANY
with self.packet_filter_on_network(admin_state_up=True) as pf:
pf_id = pf['packet_filter']['id']
self.assertTrue(self.ofc.create_ofc_packet_filter.called)
data = {'packet_filter': {'src_mac': '12:34:56:78:9a:bc'}}
self._update('packet_filters', pf_id, data)
self.ofc.update_ofc_packet_filter.assert_called_once_with(
ctx, pf_id, data['packet_filter'])
self.assertEqual(1, self.ofc.update_ofc_packet_filter.call_count)
self.assertFalse(self.ofc.delete_ofc_packet_filter.called)
data = {'packet_filter': {'admin_state_up': False}}
self._update('packet_filters', pf_id, data)
self.ofc.delete_ofc_packet_filter.assert_called_once_with(
ctx, pf_id)
data = {'packet_filter': {'src_mac': '11:22:33:44:55:66'}}
self._update('packet_filters', pf_id, data)
self.assertEqual(1, self.ofc.update_ofc_packet_filter.call_count)
data = {'packet_filter': {'admin_state_up': True}}
self._update('packet_filters', pf_id, data)
data = {'packet_filter': {'src_mac': '66:55:44:33:22:11'}}
self._update('packet_filters', pf_id, data)
self.assertEqual(2, self.ofc.update_ofc_packet_filter.call_count)

View File

@ -17,15 +17,19 @@
import random
import string
import uuid
import mock
import netaddr
from neutron.common import constants
from neutron.openstack.common import uuidutils
from neutron.plugins.nec.common import ofc_client as ofc
from neutron.plugins.nec.db import api as ndb
from neutron.plugins.nec.db import models as nmodels
from neutron.plugins.nec import drivers
from neutron.plugins.nec.drivers import pfc
from neutron.plugins.nec.extensions import packetfilter as ext_pf
from neutron.tests import base
@ -46,6 +50,7 @@ def _ofc(id):
class PFCDriverTestBase(base.BaseTestCase):
driver = 'neutron.plugins.nec.drivers.pfc.PFCDriverBase'
filter_supported = False
def setUp(self):
super(PFCDriverTestBase, self).setUp()
@ -129,7 +134,7 @@ class PFCDriverTestBase(base.BaseTestCase):
self.driver.delete_network(net_path)
self.do_request.assert_called_once_with("DELETE", net_path)
def testg_create_port(self):
def _test_create_port(self, call_filters_arg=None, send_filters_arg=None):
t, n, p = self.get_ofc_item_random_params()
net_path = "/tenants/%s/networks/%s" % (_ofc(t), _ofc(n))
@ -139,13 +144,27 @@ class PFCDriverTestBase(base.BaseTestCase):
body = {'datapath_id': p.datapath_id,
'port': str(p.port_no),
'vid': str(p.vlan_id)}
if send_filters_arg is not None:
body['filters'] = send_filters_arg
port = {'id': _ofc(p.id)}
self.do_request.return_value = port
ret = self.driver.create_port(net_path, p, p.id)
if call_filters_arg is not None:
ret = self.driver.create_port(net_path, p, p.id, call_filters_arg)
else:
ret = self.driver.create_port(net_path, p, p.id)
self.do_request.assert_called_once_with("POST", post_path, body=body)
self.assertEqual(ret, port_path)
def testg_create_port(self):
self._test_create_port()
def test_create_port_with_filters_argument(self):
# If no filter support, 'filters' argument is passed to OFC.
# Note that it will be overridden in a test class with filter support.
self._test_create_port(call_filters_arg=['dummy'],
send_filters_arg=None)
def testh_delete_port(self):
t, n, p = self.get_ofc_item_random_params()
@ -156,11 +175,32 @@ class PFCDriverTestBase(base.BaseTestCase):
self.do_request.assert_called_once_with("DELETE", port_path)
def test_filter_supported(self):
self.assertFalse(self.driver.filter_supported())
self.assertEqual(self.filter_supported, self.driver.filter_supported())
class PFCDriverBaseTest(PFCDriverTestBase):
pass
def test_extract_ofc_network_id(self):
network_id = '/tenants/tenant-a/networks/network-a'
self.assertEqual('network-a',
self.driver._extract_ofc_network_id(network_id))
def test_extract_ofc_network_id_failure(self):
network_id = '/tenants/tenant-a/networks/network-a/dummy'
self.assertRaises(pfc.InvalidOFCIdFormat,
self.driver._extract_ofc_network_id, network_id)
def test_extract_ofc_port_id(self):
port_id = '/tenants/tenant-a/networks/network-a/ports/port-a'
self.assertEqual({'tenant': 'tenant-a',
'network': 'network-a',
'port': 'port-a'},
self.driver._extract_ofc_port_id(port_id))
def test_extract_ofc_port_id_failure(self):
port_id = '/tenants/tenant-a/dummy/network-a/ports/port-a'
self.assertRaises(pfc.InvalidOFCIdFormat,
self.driver._extract_ofc_port_id, port_id)
class PFCV3DriverTest(PFCDriverTestBase):
@ -327,6 +367,302 @@ class PFCV5DriverTest(PFCDriverTestBase):
self.assertEqual(data['routes'], expected)
class PFCFilterDriverTestMixin:
def _test_create_filter(self, filter_dict=None, filter_post=None,
apply_ports=None):
t, n, p = self.get_ofc_item_random_params()
filter_id = uuidutils.generate_uuid()
f = {'priority': 123, 'action': "ACCEPT"}
if filter_dict:
f.update(filter_dict)
net_path = "/networks/%s" % n
body = {'action': 'pass', 'priority': 123}
if filter_post:
body.update(filter_post)
self.do_request.return_value = {'id': filter_id}
if apply_ports is not None:
ret = self.driver.create_filter(net_path, f, p,
apply_ports=apply_ports)
else:
ret = self.driver.create_filter(net_path, f, p)
self.do_request.assert_called_once_with("POST", "/filters",
body=body)
self.assertEqual(ret, '/filters/%s' % filter_id)
def test_create_filter_accept(self):
self._test_create_filter(filter_dict={'action': 'ACCEPT'})
def test_create_filter_allow(self):
self._test_create_filter(filter_dict={'action': 'ALLOW'})
def test_create_filter_deny(self):
self._test_create_filter(filter_dict={'action': 'DENY'},
filter_post={'action': 'drop'})
def test_create_filter_drop(self):
self._test_create_filter(filter_dict={'action': 'DROP'},
filter_post={'action': 'drop'})
def test_create_filter_empty_field_not_post(self):
filter_dict = {'src_mac': '', 'src_cidr': '', 'src_port': 0,
'dst_mac': '', 'dst_cidr': '', 'dst_port': 0,
'protocol': '', 'eth_type': 0}
filter_post = {}
self._test_create_filter(filter_dict=filter_dict,
filter_post=filter_post)
def test_create_filter_none_field_not_post(self):
filter_dict = {'src_mac': None, 'src_cidr': None, 'src_port': None,
'dst_mac': None, 'dst_cidr': None, 'dst_port': None,
'protocol': None, 'eth_type': None}
filter_post = {}
self._test_create_filter(filter_dict=filter_dict,
filter_post=filter_post)
def test_create_filter_all_fields(self):
filter_dict = {'src_mac': '11:22:33:44:55:66',
'dst_mac': '77:88:99:aa:bb:cc',
'src_cidr': '192.168.3.0/24',
'dst_cidr': '10.11.240.0/20',
'src_port': 12345,
'dst_port': 23456,
'protocol': '0x10',
'eth_type': 0x800}
filter_post = filter_dict.copy()
filter_post['protocol'] = 16
filter_post['eth_type'] = '0x800'
self._test_create_filter(filter_dict=filter_dict,
filter_post=filter_post)
def test_create_filter_cidr_ip_addr_32(self):
filter_dict = {'src_cidr': '192.168.3.1',
'dst_cidr': '10.11.240.2'}
filter_post = {'src_cidr': '192.168.3.1/32',
'dst_cidr': '10.11.240.2/32'}
self._test_create_filter(filter_dict=filter_dict,
filter_post=filter_post)
def test_create_filter_proto_tcp(self):
filter_dict = {'protocol': 'TCP'}
filter_post = {'protocol': constants.PROTO_NUM_TCP}
self._test_create_filter(filter_dict=filter_dict,
filter_post=filter_post)
def test_create_filter_proto_udp(self):
filter_dict = {'protocol': 'UDP'}
filter_post = {'protocol': constants.PROTO_NUM_UDP}
self._test_create_filter(filter_dict=filter_dict,
filter_post=filter_post)
def test_create_filter_proto_icmp(self):
filter_dict = {'protocol': 'ICMP'}
filter_post = {'protocol': constants.PROTO_NUM_ICMP}
self._test_create_filter(filter_dict=filter_dict,
filter_post=filter_post)
def test_create_filter_proto_arp_not_proto_post(self):
filter_dict = {'protocol': 'ARP'}
filter_post = {}
self._test_create_filter(filter_dict=filter_dict,
filter_post=filter_post)
def test_create_filter_apply_ports(self):
apply_ports = [
('p1', '/tenants/tenant-1/networks/network-1/ports/port-1'),
('p2', '/tenants/tenant-2/networks/network-2/ports/port-2')]
filter_post = {'apply_ports': [
{'tenant': 'tenant-1', 'network': 'network-1', 'port': 'port-1'},
{'tenant': 'tenant-2', 'network': 'network-2', 'port': 'port-2'}
]}
self._test_create_filter(filter_dict={}, apply_ports=apply_ports,
filter_post=filter_post)
def _test_update_filter(self, filter_dict=None, filter_post=None):
filter_id = uuidutils.generate_uuid()
ofc_filter_id = '/filters/%s' % filter_id
self.driver.update_filter(ofc_filter_id, filter_dict)
self.do_request.assert_called_once_with("PUT", ofc_filter_id,
body=filter_post)
def test_update_filter_empty_fields(self):
filter_dict = {'src_mac': '', 'src_cidr': '', 'src_port': 0,
'dst_mac': '', 'dst_cidr': '', 'dst_port': 0,
'protocol': '', 'eth_type': 0}
filter_post = {'src_mac': '', 'src_cidr': '', 'src_port': '',
'dst_mac': '', 'dst_cidr': '', 'dst_port': '',
'protocol': '', 'eth_type': ''}
self._test_update_filter(filter_dict=filter_dict,
filter_post=filter_post)
def test_update_filter_none_fields(self):
filter_dict = {'src_mac': None, 'src_cidr': None, 'src_port': None,
'dst_mac': None, 'dst_cidr': None, 'dst_port': None,
'protocol': None, 'eth_type': None}
filter_post = {'src_mac': '', 'src_cidr': '', 'src_port': '',
'dst_mac': '', 'dst_cidr': '', 'dst_port': '',
'protocol': '', 'eth_type': ''}
self._test_update_filter(filter_dict=filter_dict,
filter_post=filter_post)
def test_update_filter_all_fields(self):
filter_dict = {'src_mac': '11:22:33:44:55:66',
'dst_mac': '77:88:99:aa:bb:cc',
'src_cidr': '192.168.3.0/24',
'dst_cidr': '10.11.240.0/20',
'src_port': 12345,
'dst_port': 23456,
'protocol': '0x10',
'eth_type': 0x800}
filter_post = filter_dict.copy()
filter_post['protocol'] = 16
filter_post['eth_type'] = '0x800'
self._test_update_filter(filter_dict=filter_dict,
filter_post=filter_post)
def test_update_filter_cidr_ip_addr_32(self):
filter_dict = {'src_cidr': '192.168.3.1',
'dst_cidr': '10.11.240.2'}
filter_post = {'src_cidr': '192.168.3.1/32',
'dst_cidr': '10.11.240.2/32'}
self._test_update_filter(filter_dict=filter_dict,
filter_post=filter_post)
def test_update_filter_proto_tcp(self):
filter_dict = {'protocol': 'TCP'}
filter_post = {'protocol': constants.PROTO_NUM_TCP}
self._test_update_filter(filter_dict=filter_dict,
filter_post=filter_post)
def test_update_filter_proto_udp(self):
filter_dict = {'protocol': 'UDP'}
filter_post = {'protocol': constants.PROTO_NUM_UDP}
self._test_update_filter(filter_dict=filter_dict,
filter_post=filter_post)
def test_update_filter_proto_icmp(self):
filter_dict = {'protocol': 'ICMP'}
filter_post = {'protocol': constants.PROTO_NUM_ICMP}
self._test_update_filter(filter_dict=filter_dict,
filter_post=filter_post)
def test_update_filter_proto_arp_post_empty(self):
filter_dict = {'protocol': 'ARP'}
filter_post = {'protocol': ''}
self._test_update_filter(filter_dict=filter_dict,
filter_post=filter_post)
def test_delete_filter(self):
t, n, p = self.get_ofc_item_random_params()
f_path = "/filters/%s" % uuidutils.generate_uuid()
self.driver.delete_filter(f_path)
self.do_request.assert_called_once_with("DELETE", f_path)
def _test_validate_filter_duplicate_priority(self, method, found_dup):
with mock.patch('neutron.manager.NeutronManager'
'.get_plugin') as get_plugin:
plugin = get_plugin.return_value
if found_dup:
plugin.get_packet_filters.return_value = ['found']
else:
plugin.get_packet_filters.return_value = []
network_id = str(uuid.uuid4())
filter_dict = {'network_id': network_id,
'priority': 12}
if found_dup:
self.assertRaises(ext_pf.PacketFilterDuplicatedPriority,
method, 'context', filter_dict)
else:
self.assertIsNone(method('context', filter_dict))
plugin.get_packet_filters.assert_called_once_with(
'context',
filters={'network_id': [network_id],
'priority': [12]},
fields=['id'])
def test_validate_filter_create_no_duplicate_priority(self):
self._test_validate_filter_duplicate_priority(
self.driver.validate_filter_create,
found_dup=False)
def test_validate_filter_create_duplicate_priority(self):
self._test_validate_filter_duplicate_priority(
self.driver.validate_filter_create,
found_dup=True)
def test_validate_filter_update_action_raises_error(self):
filter_dict = {'action': 'ALLOW'}
self.assertRaises(ext_pf.PacketFilterUpdateNotSupported,
self.driver.validate_filter_update,
'context', filter_dict)
def test_validate_filter_update_priority_raises_error(self):
filter_dict = {'priority': '13'}
self.assertRaises(ext_pf.PacketFilterUpdateNotSupported,
self.driver.validate_filter_update,
'context', filter_dict)
def _test_validate_filter_ipv6_not_supported(self, field, create=True):
if create:
filter_dict = {'network_id': 'net1', 'priority': 12}
method = self.driver.validate_filter_create
else:
filter_dict = {}
method = self.driver.validate_filter_update
filter_dict[field] = 'fe80::1'
self.assertRaises(ext_pf.PacketFilterIpVersionNonSupported,
method, 'context', filter_dict)
filter_dict[field] = '10.56.3.3'
self.assertIsNone(method('context', filter_dict))
def test_validate_filter_create_ipv6_not_supported(self):
with mock.patch('neutron.manager.NeutronManager'
'.get_plugin') as get_plugin:
plugin = get_plugin.return_value
plugin.get_packet_filters.return_value = []
self._test_validate_filter_ipv6_not_supported(
'src_cidr', create=True)
self._test_validate_filter_ipv6_not_supported(
'dst_cidr', create=True)
def test_validate_filter_update_ipv6_not_supported(self):
self._test_validate_filter_ipv6_not_supported('src_cidr', create=False)
self._test_validate_filter_ipv6_not_supported('dst_cidr', create=False)
def _test_validate_filter_priority_range_one(self, method, priority, ok):
filter_dict = {'priority': priority, 'network_id': 'net1'}
if ok:
self.assertIsNone(method('context', filter_dict))
else:
self.assertRaises(ext_pf.PacketFilterInvalidPriority,
method, 'context', filter_dict)
def test_validate_filter_create_priority_range(self):
with mock.patch('neutron.manager.NeutronManager'
'.get_plugin') as get_plugin:
plugin = get_plugin.return_value
plugin.get_packet_filters.return_value = []
method = self.driver.validate_filter_create
self._test_validate_filter_priority_range_one(method, 0, False)
self._test_validate_filter_priority_range_one(method, 1, True)
self._test_validate_filter_priority_range_one(method, 32766, True)
self._test_validate_filter_priority_range_one(method, 32767, False)
class PFCV51DriverTest(PFCFilterDriverTestMixin, PFCV5DriverTest):
driver = 'pfc_v51'
filter_supported = True
def test_create_port_with_filters_argument(self):
self._test_create_port(
call_filters_arg=[('neutron-id-1', '/filters/filter-1'),
('neutron-id-2', '/filters/filter-2')],
send_filters_arg=['filter-1', 'filter-2'])
class PFCDriverStringTest(base.BaseTestCase):
driver = 'neutron.plugins.nec.drivers.pfc.PFCDriverBase'

View File

@ -241,17 +241,30 @@ class TremaFilterDriverTest(TremaDriverTestBase):
ofp_wildcards = ["%s:32" % _f if _f in ['nw_src', 'nw_dst'] else _f
for _f in all_wildcards_ofp if _f not in body]
body['ofp_wildcards'] = ','.join(ofp_wildcards)
body['ofp_wildcards'] = set(ofp_wildcards)
non_ofp_wildcards = [_f for _f in all_wildcards_non_ofp
if _f not in body]
if non_ofp_wildcards:
body['wildcards'] = ','.join(non_ofp_wildcards)
body['wildcards'] = set(non_ofp_wildcards)
ret = self.driver.create_filter(net_path, f, p, f['id'])
self.do_request.assert_called_once_with("POST", "/filters", body=body)
# The content of 'body' is checked below.
self.do_request.assert_called_once_with("POST", "/filters",
body=mock.ANY)
self.assertEqual(ret, '/filters/%s' % f['id'])
# ofp_wildcards and wildcards in body are comma-separated
# string but the order of elements are not considered,
# so we check these fields as set.
actual_body = self.do_request.call_args[1]['body']
if 'ofp_wildcards' in actual_body:
ofp_wildcards = actual_body['ofp_wildcards'].split(',')
actual_body['ofp_wildcards'] = set(ofp_wildcards)
if 'wildcards' in actual_body:
actual_body['wildcards'] = set(actual_body['wildcards'].split(','))
self.assertEqual(body, actual_body)
def test_create_filter_accept(self):
self._test_create_filter(filter_dict={'action': 'ACCEPT'})

View File

@ -169,15 +169,29 @@ class TestAttributes(base.BaseTestCase):
self.assertEqual(msg,
"'10' is too large - must be no larger than '9'")
def test_validate_mac_address(self):
def _test_validate_mac_address(self, validator, allow_none=False):
mac_addr = "ff:16:3e:4f:00:00"
msg = attributes._validate_mac_address(mac_addr)
msg = validator(mac_addr)
self.assertIsNone(msg)
mac_addr = "ffa:16:3e:4f:00:00"
msg = attributes._validate_mac_address(mac_addr)
msg = validator(mac_addr)
self.assertEqual(msg, "'%s' is not a valid MAC address" % mac_addr)
mac_addr = None
msg = validator(mac_addr)
if allow_none:
self.assertIsNone(msg)
else:
self.assertEqual(msg, "'None' is not a valid MAC address")
def test_validate_mac_address(self):
self._test_validate_mac_address(attributes._validate_mac_address)
def test_validate_mac_address_or_none(self):
self._test_validate_mac_address(
attributes._validate_mac_address_or_none, allow_none=True)
def test_validate_ip_address(self):
ip_addr = '1.1.1.1'
msg = attributes._validate_ip_address(ip_addr)
@ -396,59 +410,50 @@ class TestAttributes(base.BaseTestCase):
attributes.MAC_PATTERN)
self.assertIsNotNone(msg)
def test_validate_subnet(self):
def _test_validate_subnet(self, validator, allow_none=False):
# Valid - IPv4
cidr = "10.0.2.0/24"
msg = attributes._validate_subnet(cidr,
None)
msg = validator(cidr, None)
self.assertIsNone(msg)
# Valid - IPv6 without final octets
cidr = "fe80::/24"
msg = attributes._validate_subnet(cidr,
None)
msg = validator(cidr, None)
self.assertIsNone(msg)
# Valid - IPv6 with final octets
cidr = "fe80::/24"
msg = attributes._validate_subnet(cidr,
None)
msg = validator(cidr, None)
self.assertIsNone(msg)
# Valid - uncompressed ipv6 address
cidr = "fe80:0:0:0:0:0:0:0/128"
msg = attributes._validate_subnet(cidr,
None)
msg = validator(cidr, None)
self.assertIsNone(msg)
# Valid - ipv6 address with multiple consecutive zero
cidr = "2001:0db8:0:0:1::1/128"
msg = attributes._validate_subnet(cidr,
None)
msg = validator(cidr, None)
self.assertIsNone(msg)
# Valid - ipv6 address with multiple consecutive zero
cidr = "2001:0db8::1:0:0:1/128"
msg = attributes._validate_subnet(cidr,
None)
msg = validator(cidr, None)
self.assertIsNone(msg)
# Valid - ipv6 address with multiple consecutive zero
cidr = "2001::0:1:0:0:1100/120"
msg = attributes._validate_subnet(cidr,
None)
msg = validator(cidr, None)
self.assertIsNone(msg)
# Valid - abbreviated ipv4 address
cidr = "10/24"
msg = attributes._validate_subnet(cidr,
None)
msg = validator(cidr, None)
self.assertIsNone(msg)
# Invalid - IPv4 missing mask
cidr = "10.0.2.0"
msg = attributes._validate_subnet(cidr,
None)
msg = validator(cidr, None)
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": cidr,
"cidr": "10.0.2.0/32"}
@ -456,8 +461,7 @@ class TestAttributes(base.BaseTestCase):
# Invalid - IPv4 with final octets
cidr = "192.168.1.1/24"
msg = attributes._validate_subnet(cidr,
None)
msg = validator(cidr, None)
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": cidr,
"cidr": "192.168.1.0/24"}
@ -465,8 +469,7 @@ class TestAttributes(base.BaseTestCase):
# Invalid - IPv6 without final octets, missing mask
cidr = "fe80::"
msg = attributes._validate_subnet(cidr,
None)
msg = validator(cidr, None)
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": cidr,
"cidr": "fe80::/128"}
@ -474,8 +477,7 @@ class TestAttributes(base.BaseTestCase):
# Invalid - IPv6 with final octets, missing mask
cidr = "fe80::0"
msg = attributes._validate_subnet(cidr,
None)
msg = validator(cidr, None)
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": cidr,
"cidr": "fe80::/128"}
@ -483,30 +485,54 @@ class TestAttributes(base.BaseTestCase):
# Invalid - Address format error
cidr = 'invalid'
msg = attributes._validate_subnet(cidr,
None)
msg = validator(cidr, None)
error = "'%s' is not a valid IP subnet" % cidr
self.assertEqual(msg, error)
def test_validate_regex(self):
cidr = None
msg = validator(cidr, None)
if allow_none:
self.assertIsNone(msg)
else:
error = "'%s' is not a valid IP subnet" % cidr
self.assertEqual(msg, error)
def test_validate_subnet(self):
self._test_validate_subnet(attributes._validate_subnet)
def test_validate_subnet_or_none(self):
self._test_validate_subnet(attributes._validate_subnet_or_none,
allow_none=True)
def _test_validate_regex(self, validator, allow_none=False):
pattern = '[hc]at'
data = None
msg = attributes._validate_regex(data, pattern)
self.assertEqual(msg, "'%s' is not a valid input" % data)
msg = validator(data, pattern)
if allow_none:
self.assertIsNone(msg)
else:
self.assertEqual(msg, "'None' is not a valid input")
data = 'bat'
msg = attributes._validate_regex(data, pattern)
msg = validator(data, pattern)
self.assertEqual(msg, "'%s' is not a valid input" % data)
data = 'hat'
msg = attributes._validate_regex(data, pattern)
msg = validator(data, pattern)
self.assertIsNone(msg)
data = 'cat'
msg = attributes._validate_regex(data, pattern)
msg = validator(data, pattern)
self.assertIsNone(msg)
def test_validate_regex(self):
self._test_validate_regex(attributes._validate_regex)
def test_validate_regex_or_none(self):
self._test_validate_regex(attributes._validate_regex_or_none,
allow_none=True)
def test_validate_uuid(self):
msg = attributes._validate_uuid('garbage')
self.assertEqual(msg, "'garbage' is not a valid UUID")