Iptables security group implementation for LinuxBridge

Implements bp quantum-security-groups-iptables-lb
- Added firewall driver
- Added iptables based firewall driver
- Implemented security groups for rpc support mixin classes

Change-Id: I974d2f1cae75ce4a55c2b5d820a0b42ff5661309
This commit is contained in:
Nachi Ueno 2012-11-07 11:00:53 -08:00 committed by Aaron Rosen
parent cb5f284f85
commit 185daea9bb
18 changed files with 3219 additions and 65 deletions

105
quantum/agent/firewall.py Normal file
View File

@ -0,0 +1,105 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2012, Nachi Ueno, NTT MCL, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import contextlib
class FirewallDriver(object):
""" Firewall Driver base class.
Defines methods that any driver providing security groups
and provider firewall functionality should implement.
Note port attribute should have information of security group ids and
security group rules.
the dict of port should have
device : interface name
fixed_ips: ips of the device
mac_address: mac_address of the device
security_groups: [sgid, sgid]
security_group_rules : [ rule, rule ]
the rule must contain ethertype and direction
the rule may contain security_group_id,
protocol, port_min, port_max
source_ip_prefix, source_port_min,
source_port_max, dest_ip_prefix,
Note: source_group_ip in REST API should be converted by this rule
if direction is ingress:
source_group_ip will be a soruce_prefix_ip
if direction is egress:
source_group_ip will be a dest_prefix_ip
Note: source_group_id in REST API should be converted by this rule
if direction is ingress:
source_group_id will be a list of soruce_prefix_ip
if direction is egress:
source_group_id will be a list of dest_prefix_ip
"""
__metaclass__ = abc.ABCMeta
def prepare_port_filter(self, port):
"""Prepare filters for the port.
This method should be called before the port is created.
"""
raise NotImplementedError()
def apply_port_filter(self, port):
"""Apply port filter.
Once this method returns, the port should be firewalled
appropriately. This method should as far as possible be a
no-op. It's vastly preferred to get everything set up in
prepare_port_filter.
"""
raise NotImplementedError()
def update_port_filter(self, port):
"""Refresh security group rules from data store
Gets called when an port gets added to or removed from
the security group the port is a member of or if the
group gains or looses a rule.
"""
raise NotImplementedError()
def remove_port_filter(self, port):
"""Stop filtering port"""
raise NotImplementedError()
def filter_defer_apply_on(self):
"""Defer application of filtering rule"""
pass
def filter_defer_apply_off(self):
"""Turn off deferral of rules and apply the rules now"""
pass
@property
def ports(self):
""" returns filterd ports"""
pass
@contextlib.contextmanager
def defer_apply(self):
"""defer apply context"""
self.filter_defer_apply_on()
try:
yield
finally:
self.filter_defer_apply_off()

View File

@ -0,0 +1,280 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2012, Nachi Ueno, NTT MCL, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from quantum.agent import firewall
from quantum.common import constants
from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
SG_CHAIN = 'sg-chain'
INGRESS_DIRECTION = 'ingress'
EGRESS_DIRECTION = 'egress'
CHAIN_NAME_PREFIX = {INGRESS_DIRECTION: 'i',
EGRESS_DIRECTION: 'o'}
IPTABLES_DIRECTION = {INGRESS_DIRECTION: 'physdev-out',
EGRESS_DIRECTION: 'physdev-in'}
class IptablesFirewallDriver(firewall.FirewallDriver):
"""Driver which enforces security groups through iptables rules."""
def __init__(self, iptables_manager):
self.iptables = iptables_manager
# list of port which has security group
self.filtered_ports = {}
self._add_fallback_chain_v4v6()
@property
def ports(self):
return self.filtered_ports
def prepare_port_filter(self, port):
LOG.debug(_("Preparing device (%s) filter"), port['device'])
self._remove_chains()
self.filtered_ports[port['device']] = port
# each security group has it own chains
self._setup_chains()
self.iptables.apply()
def update_port_filter(self, port):
LOG.debug(_("Updating device (%s) filter"), port['device'])
if not port['device'] in self.filtered_ports:
LOG.info(_('Attempted to update port filter which is not '
'filtered %s') % port['device'])
return
self._remove_chains()
self.filtered_ports[port['device']] = port
self._setup_chains()
self.iptables.apply()
def remove_port_filter(self, port):
LOG.debug(_("Removing device (%s) filter"), port['device'])
if not self.filtered_ports.get(port['device']):
LOG.info(_('Attempted to remove port filter which is not '
'filtered %r'), port)
return
self._remove_chains()
self.filtered_ports.pop(port['device'], None)
self._setup_chains()
self.iptables.apply()
def _setup_chains(self):
"""Setup ingress and egress chain for a port. """
self._add_chain_by_name_v4v6(SG_CHAIN)
for port in self.filtered_ports.values():
self._setup_chain(port, INGRESS_DIRECTION)
self._setup_chain(port, EGRESS_DIRECTION)
self.iptables.ipv4['filter'].add_rule(SG_CHAIN, '-j ACCEPT')
self.iptables.ipv6['filter'].add_rule(SG_CHAIN, '-j ACCEPT')
def _remove_chains(self):
"""Remove ingress and egress chain for a port"""
for port in self.filtered_ports.values():
self._remove_chain(port, INGRESS_DIRECTION)
self._remove_chain(port, EGRESS_DIRECTION)
self._remove_chain_by_name_v4v6(SG_CHAIN)
def _setup_chain(self, port, DIRECTION):
self._add_chain(port, DIRECTION)
self._add_rule_by_security_group(port, DIRECTION)
def _remove_chain(self, port, DIRECTION):
chain_name = self._port_chain_name(port, DIRECTION)
self._remove_chain_by_name_v4v6(chain_name)
def _add_fallback_chain_v4v6(self):
self.iptables.ipv4['filter'].add_chain('sg-fallback')
self.iptables.ipv4['filter'].add_rule('sg-fallback', '-j DROP')
self.iptables.ipv6['filter'].add_chain('sg-fallback')
self.iptables.ipv6['filter'].add_rule('sg-fallback', '-j DROP')
def _add_chain_by_name_v4v6(self, chain_name):
self.iptables.ipv6['filter'].add_chain(chain_name)
self.iptables.ipv4['filter'].add_chain(chain_name)
def _remove_chain_by_name_v4v6(self, chain_name):
self.iptables.ipv4['filter'].ensure_remove_chain(chain_name)
self.iptables.ipv6['filter'].ensure_remove_chain(chain_name)
def _add_rule_to_chain_v4v6(self, chain_name, ipv4_rules, ipv6_rules):
for rule in ipv4_rules:
self.iptables.ipv4['filter'].add_rule(chain_name, rule)
for rule in ipv6_rules:
self.iptables.ipv6['filter'].add_rule(chain_name, rule)
def _add_chain(self, port, direction):
chain_name = self._port_chain_name(port, direction)
self._add_chain_by_name_v4v6(chain_name)
# Note(nati) jump to the security group chain (SG_CHAIN)
# This is needed because the packet may much two rule in port
# if the two port is in the same host
# We accept the packet at the end of SG_CHAIN.
# jump to the security group chain
device = port['device']
jump_rule = ['-m physdev --physdev-is-bridged --%s '
'%s -j $%s' % (IPTABLES_DIRECTION[direction],
device,
SG_CHAIN)]
self._add_rule_to_chain_v4v6('FORWARD', jump_rule, jump_rule)
# jump to the chain based on the device
jump_rule = ['-m physdev --physdev-is-bridged --%s '
'%s -j $%s' % (IPTABLES_DIRECTION[direction],
device,
chain_name)]
self._add_rule_to_chain_v4v6(SG_CHAIN, jump_rule, jump_rule)
if direction == EGRESS_DIRECTION:
self._add_rule_to_chain_v4v6('INPUT', jump_rule, jump_rule)
def _split_sgr_by_ethertype(self, security_group_rules):
ipv4_sg_rules = []
ipv6_sg_rules = []
for rule in security_group_rules:
if rule.get('ethertype') == constants.IPv4:
ipv4_sg_rules.append(rule)
elif rule.get('ethertype') == constants.IPv6:
if rule.get('protocol') == 'icmp':
rule['protocol'] = 'icmpv6'
ipv6_sg_rules.append(rule)
return ipv4_sg_rules, ipv6_sg_rules
def _select_sgr_by_direction(self, port, direction):
return [rule
for rule in port.get('security_group_rules', [])
if rule['direction'] == direction]
def _arp_spoofing_rule(self, port):
return ['-m mac ! --mac-source %s -j DROP' % port['mac_address']]
def _ip_spoofing_rule(self, port, ipv4_rules, ipv6_rules):
#Note(nati) allow dhcp or RA packet
ipv4_rules += ['-p udp --sport 68 --dport 67 -j RETURN']
ipv6_rules += ['-p icmpv6 -j RETURN']
for ip in port['fixed_ips']:
if netaddr.IPAddress(ip).version == 4:
ipv4_rules += ['! -s %s -j DROP' % ip]
else:
ipv6_rules += ['! -s %s -j DROP' % ip]
def _drop_dhcp_rule(self):
#Note(nati) Drop dhcp packet from VM
return ['-p udp --sport 67 --dport 68 -j DROP']
def _add_rule_by_security_group(self, port, direction):
chain_name = self._port_chain_name(port, direction)
# select rules for current direction
security_group_rules = self._select_sgr_by_direction(port, direction)
# split groups by ip version
# for ipv4, iptables command is used
# for ipv6, iptables6 command is used
ipv4_sg_rules, ipv6_sg_rules = self._split_sgr_by_ethertype(
security_group_rules)
ipv4_iptables_rule = []
ipv6_iptables_rule = []
if direction == EGRESS_DIRECTION:
ipv4_iptables_rule += self._arp_spoofing_rule(port)
ipv6_iptables_rule += self._arp_spoofing_rule(port)
self._ip_spoofing_rule(port,
ipv4_iptables_rule,
ipv6_iptables_rule)
ipv4_iptables_rule += self._drop_dhcp_rule()
ipv4_iptables_rule += self._convert_sgr_to_iptables_rules(
ipv4_sg_rules)
ipv6_iptables_rule += self._convert_sgr_to_iptables_rules(
ipv6_sg_rules)
self._add_rule_to_chain_v4v6(chain_name,
ipv4_iptables_rule,
ipv6_iptables_rule)
def _convert_sgr_to_iptables_rules(self, security_group_rules):
iptables_rules = []
self._drop_invalid_packets(iptables_rules)
self._allow_established(iptables_rules)
for rule in security_group_rules:
args = ['-j RETURN']
args += self._protocol_arg(rule.get('protocol'))
args += self._port_arg('dport',
rule.get('protocol'),
rule.get('port_range_min'),
rule.get('port_range_max'))
args += self._port_arg('sport',
rule.get('protocol'),
rule.get('source_port_range_min'),
rule.get('source_port_range_max'))
args += self._ip_prefix_arg('s',
rule.get('source_ip_prefix'))
args += self._ip_prefix_arg('d',
rule.get('dest_ip_prefix'))
iptables_rules += [' '.join(args)]
iptables_rules += ['-j $sg-fallback']
return iptables_rules
def _drop_invalid_packets(self, iptables_rules):
# Always drop invalid packets
iptables_rules += ['-m state --state ' 'INVALID -j DROP']
return iptables_rules
def _allow_established(self, iptables_rules):
# Allow established connections
iptables_rules += ['-m state --state ESTABLISHED,RELATED -j RETURN']
return iptables_rules
def _protocol_arg(self, protocol):
if protocol:
return ['-p', protocol]
return []
def _port_arg(self, direction, protocol, port_range_min, port_range_max):
if not (protocol in ['udp', 'tcp'] and port_range_min):
return []
if port_range_min == port_range_max:
return ['--%s' % direction, '%s' % (port_range_min,)]
else:
return ['-m', 'multiport',
'--%ss' % direction,
'%s:%s' % (port_range_min, port_range_max)]
def _ip_prefix_arg(self, direction, ip_prefix):
#NOTE (nati) : source_group_id is converted to list of source_
# ip_prefix in server side
if ip_prefix:
return ['-%s' % direction, ip_prefix]
return []
def _port_chain_name(self, port, direction):
#Note (nati) make chain name short less than 28 char
# with extra prefix
# ( see comment in iptables_manager )
return '%s%s' % (CHAIN_NAME_PREFIX[direction],
port['device'][3:13])
def filter_defer_apply_on(self):
self.iptables.defer_apply_on()
def filter_defer_apply_off(self):
self.iptables.defer_apply_off()

View File

@ -25,6 +25,7 @@ import inspect
import os
from quantum.agent.linux import utils
from quantum.openstack.common import lockutils
from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
@ -91,6 +92,24 @@ class IptablesTable(object):
else:
self.unwrapped_chains.add(name)
def _select_chain_set(self, wrap):
if wrap:
return self.chains
else:
return self.unwrapped_chains
def ensure_remove_chain(self, name, wrap=True):
"""Ensure the chain is removed.
This removal "cascades". All rule in the chain are removed, as are
all rules in other chains that jump to it.
"""
chain_set = self._select_chain_set(wrap)
if name not in chain_set:
return
self.remove_chain(name, wrap)
def remove_chain(self, name, wrap=True):
"""Remove named chain.
@ -100,10 +119,7 @@ class IptablesTable(object):
If the chain is not found, this is merely logged.
"""
if wrap:
chain_set = self.chains
else:
chain_set = self.unwrapped_chains
chain_set = self._select_chain_set(wrap)
if name not in chain_set:
LOG.warn(('Attempted to remove chain %s which does not exist'),
@ -112,7 +128,6 @@ class IptablesTable(object):
chain_set.remove(name)
self.rules = filter(lambda r: r.chain != name, self.rules)
if wrap:
jump_snippet = '-j %s-%s' % (binary_name, name)
else:
@ -201,6 +216,7 @@ class IptablesManager(object):
self.use_ipv6 = use_ipv6
self.root_helper = root_helper
self.namespace = namespace
self.iptables_apply_deferred = False
self.ipv4 = {'filter': IptablesTable()}
self.ipv6 = {'filter': IptablesTable()}
@ -261,7 +277,21 @@ class IptablesManager(object):
self.ipv4['nat'].add_chain('float-snat')
self.ipv4['nat'].add_rule('snat', '-j $float-snat')
def defer_apply_on(self):
self.iptables_apply_deferred = True
def defer_apply_off(self):
self.iptables_apply_deferred = False
self._apply()
def apply(self):
if self.iptables_apply_deferred:
return
self._apply()
@lockutils.synchronized('iptables', 'quantum-', external=True)
def _apply(self):
"""Apply the current in-memory set of iptables rules.
This will blow away any rules left over from previous runs of the

View File

@ -0,0 +1,179 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2012, Nachi Ueno, NTT MCL, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from quantum.agent.linux import iptables_firewall
from quantum.agent.linux import iptables_manager
from quantum.common import topics
from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
SG_RPC_VERSION = "1.1"
class SecurityGroupServerRpcApiMixin(object):
"""A mix-in that enable SecurityGroup support in plugin rpc
"""
def security_group_rules_for_devices(self, context, devices):
LOG.debug(_("Get security group rules "
"for devices via rpc %r"), devices)
return self.call(context,
self.make_msg('security_group_rules_for_devices',
devices=devices),
version=SG_RPC_VERSION,
topic=self.topic)
class SecurityGroupAgentRpcCallbackMixin(object):
"""A mix-in that enable SecurityGroup agent
support in agent implementations.
"""
def security_groups_rule_updated(self, context, **kwargs):
""" callback for security group rule update
:param security_groups: list of updated security_groups
"""
security_groups = kwargs.get('security_groups', [])
LOG.debug(
_("Security group rule updated on remote: %s"), security_groups)
self.agent.security_groups_rule_updated(security_groups)
def security_groups_member_updated(self, context, **kwargs):
""" callback for security group member update
:param security_groups: list of updated security_groups
"""
security_groups = kwargs.get('security_groups', [])
LOG.debug(
_("Security group member updated on remote: %s"), security_groups)
self.agent.security_groups_member_updated(security_groups)
def security_groups_provider_updated(self, context, **kwargs):
""" callback for security group provider update
"""
LOG.debug(_("Provider rule updated"))
self.agent.security_groups_provider_updated()
class SecurityGroupAgentRpcMixin(object):
"""A mix-in that enable SecurityGroup agent
support in agent implementations.
"""
def init_firewall(self):
LOG.debug(_("Init firewall settings"))
ip_manager = iptables_manager.IptablesManager(
root_helper=self.root_helper,
use_ipv6=True)
self.firewall = iptables_firewall.IptablesFirewallDriver(ip_manager)
def prepare_devices_filter(self, device_ids):
if not device_ids:
return
LOG.info(_("Preparing filters for devices %s"), device_ids)
devices = self.plugin_rpc.security_group_rules_for_devices(
self.context, list(device_ids))
with self.firewall.defer_apply():
for device in devices.values():
self.firewall.prepare_port_filter(device)
def security_groups_rule_updated(self, security_groups):
LOG.info(_("Security group "
"rule updated %r"), security_groups)
self._security_group_updated(
security_groups,
'security_groups')
def security_groups_member_updated(self, security_groups):
LOG.info(_("Security group "
"member updated %r"), security_groups)
self._security_group_updated(
security_groups,
'security_group_source_groups')
def _security_group_updated(self, security_groups, attribute):
#check need update or not
for device in self.firewall.ports.values():
if set(device.get(attribute,
[])).intersection(
set(security_groups)):
self.refresh_firewall()
return
def security_groups_provider_updated(self):
LOG.info(_("Provider rule updated"))
self.refresh_firewall()
def remove_devices_filter(self, device_ids):
if not device_ids:
return
LOG.info(_("Remove device filter for %r"), device_ids)
with self.firewall.defer_apply():
for device_id in device_ids:
device = self.firewall.ports.get(device_id)
if not device:
continue
self.firewall.remove_port_filter(device)
def refresh_firewall(self):
LOG.info(_("Refresh firewall rules"))
device_ids = self.firewall.ports.keys()
if not device_ids:
return
devices = self.plugin_rpc.security_group_rules_for_devices(
self.context, device_ids)
with self.firewall.defer_apply():
for device in devices.values():
LOG.debug(_("Update port filter for %s"), device)
self.firewall.update_port_filter(device)
class SecurityGroupAgentRpcApiMixin(object):
def _get_security_group_topic(self):
return topics.get_topic_name(self.topic,
topics.SECURITY_GROUP,
topics.UPDATE)
def security_groups_rule_updated(self, context, security_groups):
""" notify rule updated security groups """
if not security_groups:
return
self.fanout_cast(context,
self.make_msg('security_groups_rule_updated',
security_groups=security_groups),
version=SG_RPC_VERSION,
topic=self._get_security_group_topic())
def security_groups_member_updated(self, context, security_groups):
""" notify member updated security groups """
if not security_groups:
return
self.fanout_cast(context,
self.make_msg('security_groups_member_updated',
security_groups=security_groups),
version=SG_RPC_VERSION,
topic=self._get_security_group_topic())
def security_groups_provider_updated(self, context):
""" notify provider updated security groups """
self.fanout_cast(context,
self.make_msg('security_groups_provider_updated'),
version=SG_RPC_VERSION,
topic=self._get_security_group_topic())

View File

@ -26,5 +26,10 @@ PORT_STATUS_ERROR = 'ERROR'
DEVICE_OWNER_ROUTER_INTF = "network:router_interface"
DEVICE_OWNER_ROUTER_GW = "network:router_gateway"
DEVICE_OWNER_FLOATINGIP = "network:floatingip"
DEVICE_OWNER_DHCP = "network:dhcp"
FLOATINGIP_KEY = '_floatingips'
INTERFACE_KEY = '_interfaces'
IPv4 = 'IPv4'
IPv6 = 'IPv6'

View File

@ -16,6 +16,7 @@
NETWORK = 'network'
SUBNET = 'subnet'
PORT = 'port'
SECURITY_GROUP = 'security_group'
CREATE = 'create'
DELETE = 'delete'

View File

@ -150,3 +150,15 @@ def parse_mappings(mapping_list, unique_values=True):
def get_hostname():
return socket.getfqdn()
def compare_elements(a, b):
""" compare elements if a and b have same elements
This method doesn't consider ordering
"""
if a is None:
a = []
if b is None:
b = []
return set(a) == set(b)

View File

@ -22,6 +22,8 @@ from sqlalchemy import orm
from sqlalchemy.orm import exc
from sqlalchemy.orm import scoped_session
from quantum.api.v2 import attributes as attr
from quantum.common import utils
from quantum.db import model_base
from quantum.db import models_v2
from quantum.extensions import securitygroup as ext_sg
@ -403,9 +405,10 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
def _extend_port_dict_security_group(self, context, port):
filters = {'port_id': [port['id']]}
fields = {'security_group_id': None}
port[ext_sg.SECURITYGROUP] = []
security_group_id = self._get_port_security_group_bindings(
context, filters, fields)
port[ext_sg.SECURITYGROUP] = []
for security_group_id in security_group_id:
port[ext_sg.SECURITYGROUP].append(
security_group_id['security_group_id'])
@ -413,7 +416,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
def _process_port_create_security_group(self, context, port_id,
security_group_id):
if not security_group_id:
if not attr.is_attr_set(security_group_id):
return
for security_group_id in security_group_id:
self._create_port_security_group_binding(context, port_id,
@ -445,8 +448,10 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
def _validate_security_groups_on_port(self, context, port):
p = port['port']
if not p.get(ext_sg.SECURITYGROUP):
if not attr.is_attr_set(p.get(ext_sg.SECURITYGROUP)):
return
if p.get('device_owner') and p['device_owner'].startswith('network:'):
raise ext_sg.SecurityGroupInvalidDeviceOwner()
valid_groups = self.get_security_groups(context, fields={'id': None})
valid_groups_set = set([x['id'] for x in valid_groups])
@ -455,3 +460,17 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
if invalid_sg_set:
msg = ' '.join(str(x) for x in invalid_sg_set)
raise ext_sg.SecurityGroupNotFound(id=msg)
def _ensure_default_security_group_on_port(self, context, port):
# we don't apply security groups for dhcp, router
if (port['port'].get('device_owner') and
port['port']['device_owner'].startswith('network:')):
return
tenant_id = self._get_tenant_id_for_create(context,
port['port'])
default_sg = self._ensure_default_security_group(context, tenant_id)
if attr.is_attr_set(port['port'].get(ext_sg.SECURITYGROUP)):
sgids = port['port'].get(ext_sg.SECURITYGROUP)
else:
sgids = [default_sg]
port['port'][ext_sg.SECURITYGROUP] = sgids

View File

@ -0,0 +1,259 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2012, Nachi Ueno, NTT MCL, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from quantum.common import constants as q_const
from quantum.db import models_v2
from quantum.db import securitygroups_db as sg_db
from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
IP_MASK = {q_const.IPv4: 32,
q_const.IPv6: 128}
DIRECTION_IP_PREFIX = {'ingress': 'source_ip_prefix',
'egress': 'dest_ip_prefix'}
class SecurityGroupServerRpcMixin(sg_db.SecurityGroupDbMixin):
def create_security_group_rule(self, context, security_group_rule):
bulk_rule = {'security_group_rules': [security_group_rule]}
rule = self.create_security_group_rule_bulk_native(context,
bulk_rule)[0]
sgids = [rule['security_group_id']]
self.notifier.security_groups_rule_updated(context, sgids)
return rule
def create_security_group_rule_bulk(self, context,
security_group_rule):
rules = super(SecurityGroupServerRpcMixin,
self).create_security_group_rule_bulk_native(
context, security_group_rule)
sgids = set([r['security_group_id'] for r in rules])
self.notifier.security_groups_rule_updated(context, list(sgids))
return rules
def delete_security_group_rule(self, context, sgrid):
rule = self.get_security_group_rule(context, sgrid)
super(SecurityGroupServerRpcMixin,
self).delete_security_group_rule(context, sgrid)
self.notifier.security_groups_rule_updated(context,
[rule['security_group_id']])
class SecurityGroupServerRpcCallbackMixin(object):
"""A mix-in that enable SecurityGroup agent
support in plugin implementations.
"""
def security_group_rules_for_devices(self, context, **kwargs):
""" return security group rules for each port
also convert source_group_id rule
to source_ip_prefix rule
:params devices: list of devices
:returns: port correspond to the devices with security group rules
"""
devices = kwargs.get('devices')
ports = {}
for device in devices:
port = self.get_port_from_device(device)
if not port:
continue
if port['device_owner'].startswith('network:'):
continue
ports[port['id']] = port
return self._security_group_rules_for_ports(context, ports)
def _select_rules_for_ports(self, context, ports):
if not ports:
return []
sg_binding_port = sg_db.SecurityGroupPortBinding.port_id
sg_binding_sgid = sg_db.SecurityGroupPortBinding.security_group_id
sgr_sgid = sg_db.SecurityGroupRule.security_group_id
query = context.session.query(sg_db.SecurityGroupPortBinding,
sg_db.SecurityGroupRule)
query = query.join(sg_db.SecurityGroupRule,
sgr_sgid == sg_binding_sgid)
query = query.filter(sg_binding_port.in_(ports.keys()))
return query.all()
def _select_ips_for_source_group(self, context, source_group_ids):
ips_by_group = {}
if not source_group_ids:
return ips_by_group
for source_group_id in source_group_ids:
ips_by_group[source_group_id] = []
ip_port = models_v2.IPAllocation.port_id
sg_binding_port = sg_db.SecurityGroupPortBinding.port_id
sg_binding_sgid = sg_db.SecurityGroupPortBinding.security_group_id
query = context.session.query(sg_binding_sgid,
models_v2.IPAllocation.ip_address)
query = query.join(models_v2.IPAllocation,
ip_port == sg_binding_port)
query = query.filter(sg_binding_sgid.in_(source_group_ids))
ip_in_db = query.all()
for security_group_id, ip_address in ip_in_db:
ips_by_group[security_group_id].append(ip_address)
return ips_by_group
def _select_source_group_ids(self, ports):
source_group_ids = []
for port in ports.values():
for rule in port.get('security_group_rules'):
source_group_id = rule.get('source_group_id')
if source_group_id:
source_group_ids.append(source_group_id)
return source_group_ids
def _select_network_ids(self, ports):
return set((port['network_id'] for port in ports.values()))
def _select_dhcp_ips_for_network_ids(self, context, network_ids):
if not network_ids:
return {}
query = context.session.query(models_v2.Port,
models_v2.IPAllocation.ip_address)
query = query.join(models_v2.IPAllocation)
query = query.filter(models_v2.Port.network_id.in_(network_ids))
owner = q_const.DEVICE_OWNER_DHCP
query = query.filter(models_v2.Port.device_owner == owner)
ips = {}
for network_id in network_ids:
ips[network_id] = []
for port, ip in query.all():
ips[port['network_id']].append(ip)
return ips
def _convert_source_group_id_to_ip_prefix(self, context, ports):
source_group_ids = self._select_source_group_ids(ports)
ips = self._select_ips_for_source_group(context, source_group_ids)
for port in ports.values():
updated_rule = []
for rule in port.get('security_group_rules'):
source_group_id = rule.get('source_group_id')
direction = rule.get('direction')
direction_ip_prefix = DIRECTION_IP_PREFIX[direction]
if not source_group_id:
updated_rule.append(rule)
continue
port['security_group_source_groups'].append(source_group_id)
base_rule = rule
for ip in ips[source_group_id]:
if ip in port.get('fixed_ips', []):
continue
ip_rule = base_rule.copy()
version = netaddr.IPAddress(ip).version
ethertype = 'IPv%s' % version
if base_rule['ethertype'] != ethertype:
continue
ip_rule[direction_ip_prefix] = "%s/%s" % (
ip, IP_MASK[ethertype])
updated_rule.append(ip_rule)
port['security_group_rules'] = updated_rule
return ports
def _add_default_egress_rule(self, port, ethertype, ips):
""" Adding default egress rule which allows all egress traffic. """
egress_rule = [r for r in port['security_group_rules']
if (r['direction'] == 'egress' and
r['ethertype'] == ethertype)]
if len(egress_rule) > 0:
return
for ip in port['fixed_ips']:
version = netaddr.IPAddress(ip).version
if "IPv%s" % version == ethertype:
default_egress_rule = {'direction': 'egress',
'ethertype': ethertype}
port['security_group_rules'].append(default_egress_rule)
return
def _add_ingress_dhcp_rule(self, port, ips):
dhcp_ips = ips.get(port['network_id'])
for dhcp_ip in dhcp_ips:
if not netaddr.IPAddress(dhcp_ip).version == 4:
return
dhcp_rule = {'direction': 'ingress',
'ethertype': q_const.IPv4,
'protocol': 'udp',
'port_range_min': 68,
'port_range_max': 68,
'source_port_range_min': 67,
'source_port_range_max': 67}
dhcp_rule['source_ip_prefix'] = "%s/%s" % (dhcp_ip,
IP_MASK[q_const.IPv4])
port['security_group_rules'].append(dhcp_rule)
def _add_ingress_ra_rule(self, port, ips):
ra_ips = ips.get(port['network_id'])
for ra_ip in ra_ips:
if not netaddr.IPAddress(ra_ip).version == 6:
return
ra_rule = {'direction': 'ingress',
'ethertype': q_const.IPv6,
'protocol': 'icmp'}
ra_rule['source_ip_prefix'] = "%s/%s" % (ra_ip,
IP_MASK[q_const.IPv6])
port['security_group_rules'].append(ra_rule)
def _apply_provider_rule(self, context, ports):
network_ids = self._select_network_ids(ports)
ips = self._select_dhcp_ips_for_network_ids(context, network_ids)
for port in ports.values():
self._add_default_egress_rule(port, q_const.IPv4, ips)
self._add_default_egress_rule(port, q_const.IPv6, ips)
self._add_ingress_ra_rule(port, ips)
self._add_ingress_dhcp_rule(port, ips)
def _security_group_rules_for_ports(self, context, ports):
rules_in_db = self._select_rules_for_ports(context, ports)
for (binding, rule_in_db) in rules_in_db:
port_id = binding['port_id']
port = ports[port_id]
direction = rule_in_db['direction']
rule_dict = {
'security_group_id': rule_in_db['security_group_id'],
'direction': direction,
'ethertype': rule_in_db['ethertype'],
}
for key in ('protocol', 'port_range_min', 'port_range_max',
'source_ip_prefix', 'source_group_id'):
if rule_in_db.get(key):
if key == 'source_ip_prefix' and direction == 'egress':
rule_dict['dest_ip_prefix'] = rule_in_db[key]
continue
rule_dict[key] = rule_in_db[key]
port['security_group_rules'].append(rule_dict)
self._apply_provider_rule(context, ports)
return self._convert_source_group_id_to_ip_prefix(context, ports)

View File

@ -40,6 +40,10 @@ class SecurityGroupInvalidPortValue(qexception.InvalidInput):
message = _("Invalid value for port %(port)s")
class SecurityGroupInvalidDeviceOwner(qexception.InvalidInput):
message = _("Security Group can't be applied to network ports.")
class SecurityGroupInUse(qexception.InUse):
message = _("Security Group %(id)s in use.")
@ -208,7 +212,7 @@ EXTENDED_ATTRIBUTES_2_0 = {
'ports': {SECURITYGROUP: {'allow_post': True,
'allow_put': True,
'is_visible': True,
'default': None}}}
'default': attr.ATTR_NOT_SPECIFIED}}}
security_group_quota_opts = [
cfg.IntOpt('quota_security_group',
default=10,

View File

@ -32,6 +32,7 @@ import pyudev
from quantum.agent.linux import ip_lib
from quantum.agent.linux import utils
from quantum.agent import rpc as agent_rpc
from quantum.agent import securitygroups_rpc as sg_rpc
from quantum.common import config as logging_config
from quantum.common import topics
from quantum.common import utils as q_utils
@ -388,14 +389,17 @@ class LinuxBridge:
LOG.debug(_("Done deleting subinterface %s"), interface)
class LinuxBridgeRpcCallbacks():
class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'
# history
# 1.1 Support Security Group RPC
RPC_API_VERSION = '1.1'
def __init__(self, context, linux_br):
def __init__(self, context, agent):
self.context = context
self.linux_br = linux_br
self.agent = agent
self.linux_br = agent.linux_br
def network_delete(self, context, **kwargs):
LOG.debug(_("network_delete received"))
@ -407,6 +411,9 @@ class LinuxBridgeRpcCallbacks():
def port_update(self, context, **kwargs):
LOG.debug(_("port_update received"))
port = kwargs.get('port')
if 'security_groups' in port:
self.agent.refresh_firewall()
if port['admin_state_up']:
vlan_id = kwargs.get('vlan_id')
physical_network = kwargs.get('physical_network')
@ -429,7 +436,12 @@ class LinuxBridgeRpcCallbacks():
return dispatcher.RpcDispatcher([self])
class LinuxBridgeQuantumAgentRPC:
class LinuxBridgePluginApi(agent_rpc.PluginApi,
sg_rpc.SecurityGroupServerRpcApiMixin):
pass
class LinuxBridgeQuantumAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
def __init__(self, interface_mappings, polling_interval,
root_helper):
@ -437,6 +449,7 @@ class LinuxBridgeQuantumAgentRPC:
self.root_helper = root_helper
self.setup_linux_bridge(interface_mappings)
self.setup_rpc(interface_mappings.values())
self.init_firewall()
def setup_rpc(self, physical_interfaces):
if physical_interfaces:
@ -453,17 +466,18 @@ class LinuxBridgeQuantumAgentRPC:
LOG.info(_("RPC agent_id: %s"), self.agent_id)
self.topic = topics.AGENT
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
self.plugin_rpc = LinuxBridgePluginApi(topics.PLUGIN)
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
self.callbacks = LinuxBridgeRpcCallbacks(self.context,
self.linux_br)
self)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE]]
[topics.NETWORK, topics.DELETE],
[topics.SECURITY_GROUP, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.topic,
consumers)
@ -515,6 +529,7 @@ class LinuxBridgeQuantumAgentRPC:
def treat_devices_added(self, devices):
resync = False
self.prepare_devices_filter(devices)
for device in devices:
LOG.debug(_("Port %s added"), device)
try:
@ -544,6 +559,7 @@ class LinuxBridgeQuantumAgentRPC:
def treat_devices_removed(self, devices):
resync = False
self.remove_devices_filter(devices)
for device in devices:
LOG.info(_("Attachment %s removed"), device)
try:
@ -597,10 +613,7 @@ class LinuxBridgeQuantumAgentRPC:
def main():
eventlet.monkey_patch()
cfg.CONF(args=sys.argv, project='quantum')
# (TODO) gary - swap with common logging
logging_config.setup_logging(cfg.CONF)
try:
interface_mappings = q_utils.parse_mappings(
cfg.CONF.LINUX_BRIDGE.physical_interface_mappings)

View File

@ -18,7 +18,9 @@ from sqlalchemy.orm import exc
from quantum.common import exceptions as q_exc
import quantum.db.api as db
from quantum import manager
from quantum.db import models_v2
from quantum.db import securitygroups_db as sg_db
from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging
from quantum.plugins.linuxbridge.common import config
@ -192,13 +194,28 @@ def get_port_from_device(device):
"""Get port from database"""
LOG.debug(_("get_port_from_device() called"))
session = db.get_session()
ports = session.query(models_v2.Port).all()
if not ports:
sg_binding_port = sg_db.SecurityGroupPortBinding.port_id
query = session.query(models_v2.Port,
sg_db.SecurityGroupPortBinding.security_group_id)
query = query.outerjoin(sg_db.SecurityGroupPortBinding,
models_v2.Port.id == sg_binding_port)
query = query.filter(models_v2.Port.id.startswith(device))
port_and_sgs = query.all()
if not port_and_sgs:
return
for port in ports:
if port['id'].startswith(device):
return port
return
port = port_and_sgs[0][0]
plugin = manager.QuantumManager.get_plugin()
port_dict = plugin._make_port_dict(port)
port_dict['security_groups'] = []
for port_in_db, sg_id in port_and_sgs:
if sg_id:
port_dict['security_groups'].append(sg_id)
port_dict['security_group_rules'] = []
port_dict['security_group_source_groups'] = []
port_dict['fixed_ips'] = [ip['ip_address']
for ip in port['fixed_ips']]
return port_dict
def set_port_status(port_id, status):

View File

@ -15,19 +15,23 @@
import sys
from quantum.agent import securitygroups_rpc as sg_rpc
from quantum.api.v2 import attributes
from quantum.common import constants as q_const
from quantum.common import exceptions as q_exc
from quantum.common import rpc as q_rpc
from quantum.common import topics
from quantum.common import utils
from quantum.db import api as db_api
from quantum.db import db_base_plugin_v2
from quantum.db import dhcp_rpc_base
from quantum.db import l3_db
from quantum.db import l3_rpc_base
from quantum.db import quota_db
from quantum.db import securitygroups_rpc_base as sg_db_rpc
from quantum.extensions import portbindings
from quantum.extensions import providernet as provider
from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
@ -41,11 +45,13 @@ LOG = logging.getLogger(__name__)
class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin):
l3_rpc_base.L3RpcCallbackMixin,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'
RPC_API_VERSION = '1.1'
# Device names start with "tap"
# history
# 1.1 Support Security Group RPC
TAP_PREFIX_LEN = 3
def create_rpc_dispatcher(self):
@ -56,13 +62,20 @@ class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
'''
return q_rpc.PluginRpcDispatcher([self])
@classmethod
def get_port_from_device(cls, device):
port = db.get_port_from_device(device[cls.TAP_PREFIX_LEN:])
if port:
port['device'] = device
return port
def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details"""
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
LOG.debug(_("Device %(device)s details requested from %(agent_id)s"),
locals())
port = db.get_port_from_device(device[self.TAP_PREFIX_LEN:])
port = self.get_port_from_device(device)
if port:
binding = db.get_network_binding(db_api.get_session(),
port['network_id'])
@ -86,7 +99,7 @@ class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
device = kwargs.get('device')
LOG.debug(_("Device %(device)s no longer exists on %(agent_id)s"),
locals())
port = db.get_port_from_device(device[self.TAP_PREFIX_LEN:])
port = self.get_port_from_device(device)
if port:
entry = {'device': device,
'exists': True}
@ -99,7 +112,8 @@ class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
return entry
class AgentNotifierApi(proxy.RpcProxy):
class AgentNotifierApi(proxy.RpcProxy,
sg_rpc.SecurityGroupAgentRpcApiMixin):
'''Agent side of the linux bridge rpc API.
API version history:
@ -112,6 +126,7 @@ class AgentNotifierApi(proxy.RpcProxy):
def __init__(self, topic):
super(AgentNotifierApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.topic = topic
self.topic_network_delete = topics.get_topic_name(topic,
topics.NETWORK,
topics.DELETE)
@ -135,7 +150,8 @@ class AgentNotifierApi(proxy.RpcProxy):
class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
l3_db.L3_NAT_db_mixin):
l3_db.L3_NAT_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin):
"""Implement the Quantum abstractions using Linux bridging.
A new VLAN is created for each network. An agent is relied upon
@ -157,7 +173,8 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
# is qualified by class
__native_bulk_support = True
supported_extension_aliases = ["provider", "router", "binding", "quotas"]
supported_extension_aliases = ["provider", "router", "binding", "quotas",
"security-group"]
network_view = "extension:provider_network:view"
network_set = "extension:provider_network:set"
@ -333,6 +350,11 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
session = context.session
with session.begin(subtransactions=True):
#set up default security groups
tenant_id = self._get_tenant_id_for_create(
context, network['network'])
self._ensure_default_security_group(context, tenant_id)
if not network_type:
# tenant network
network_type = self.tenant_network_type
@ -374,8 +396,7 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
session = context.session
with session.begin(subtransactions=True):
binding = db.get_network_binding(session, id)
result = super(LinuxBridgePluginV2, self).delete_network(context,
id)
super(LinuxBridgePluginV2, self).delete_network(context, id)
if binding.vlan_id != constants.LOCAL_VLAN_ID:
db.release_network(session, binding.physical_network,
binding.vlan_id, self.network_vlan_ranges)
@ -412,31 +433,70 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_BRIDGE
return port
def create_port(self, context, port):
port = super(LinuxBridgePluginV2, self).create_port(context, port)
return self._extend_port_dict_binding(context, port)
def get_port(self, context, id, fields=None):
port = super(LinuxBridgePluginV2, self).get_port(context, id, fields)
return self._fields(self._extend_port_dict_binding(context, port),
fields)
self._extend_port_dict_security_group(context, port)
self._extend_port_dict_binding(context, port),
return self._fields(port, fields)
def get_ports(self, context, filters=None, fields=None):
ports = super(LinuxBridgePluginV2, self).get_ports(context, filters,
fields)
#TODO(nati) filter by security group
for port in ports:
self._extend_port_dict_security_group(context, port)
return [self._fields(self._extend_port_dict_binding(context, port),
fields) for port in ports]
def create_port(self, context, port):
session = context.session
with session.begin(subtransactions=True):
self._ensure_default_security_group_on_port(context, port)
self._validate_security_groups_on_port(context, port)
sgids = port['port'].get(ext_sg.SECURITYGROUP)
port = super(LinuxBridgePluginV2,
self).create_port(context, port)
self._process_port_create_security_group(
context, port['id'], sgids)
self._extend_port_dict_security_group(context, port)
if port['device_owner'] == q_const.DEVICE_OWNER_DHCP:
self.notifier.security_groups_provider_updated(context)
else:
self.notifier.security_groups_member_updated(
context, port.get(ext_sg.SECURITYGROUP))
return self._extend_port_dict_binding(context, port)
def update_port(self, context, id, port):
original_port = super(LinuxBridgePluginV2, self).get_port(context,
id)
port = super(LinuxBridgePluginV2, self).update_port(context, id, port)
self._validate_security_groups_on_port(context, port)
original_port = self.get_port(context, id)
session = context.session
port_updated = False
with session.begin(subtransactions=True):
# delete the port binding and read it with the new rules
if ext_sg.SECURITYGROUP in port['port']:
self._delete_port_security_group_bindings(context, id)
self._process_port_create_security_group(
context,
id,
port['port'][ext_sg.SECURITYGROUP])
port_updated = True
port = super(LinuxBridgePluginV2, self).update_port(
context, id, port)
self._extend_port_dict_security_group(context, port)
if original_port['admin_state_up'] != port['admin_state_up']:
binding = db.get_network_binding(context.session,
port['network_id'])
self.notifier.port_update(context, port,
binding.physical_network,
binding.vlan_id)
port_updated = True
if (original_port['fixed_ips'] != port['fixed_ips'] or
not utils.compare_elements(
original_port.get(ext_sg.SECURITYGROUP),
port.get(ext_sg.SECURITYGROUP))):
self.notifier.security_groups_member_updated(
context, port.get(ext_sg.SECURITYGROUP))
if port_updated:
self._notify_port_updated(context, port)
return self._extend_port_dict_binding(context, port)
def delete_port(self, context, id, l3_port_check=True):
@ -445,5 +505,19 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
# and l3-router. If so, we should prevent deletion.
if l3_port_check:
self.prevent_l3_port_deletion(context, id)
self.disassociate_floatingips(context, id)
return super(LinuxBridgePluginV2, self).delete_port(context, id)
session = context.session
with session.begin(subtransactions=True):
self.disassociate_floatingips(context, id)
port = self.get_port(context, id)
self._delete_port_security_group_bindings(context, id)
super(LinuxBridgePluginV2, self).delete_port(context, id)
self.notifier.security_groups_member_updated(
context, port.get(ext_sg.SECURITYGROUP))
def _notify_port_updated(self, context, port):
binding = db.get_network_binding(context.session,
port['network_id'])
self.notifier.port_update(context, port,
binding.physical_network,
binding.vlan_id)

View File

@ -0,0 +1,137 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2012, Nachi Ueno, NTT MCL, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from mock import call
from quantum.api.v2 import attributes
from quantum.extensions import securitygroup as ext_sg
from quantum.plugins.linuxbridge.db import l2network_db_v2 as lb_db
from quantum.tests.unit import test_extension_security_group as test_sg
PLUGIN_NAME = ('quantum.plugins.linuxbridge.'
'lb_quantum_plugin.LinuxBridgePluginV2')
AGENT_NAME = ('quantum.plugins.linuxbridge.'
'agent.linuxbridg_quantum_agent.LinuxBridgeQuantumAgentRPC')
NOTIFIER = ('quantum.plugins.linuxbridge.'
'lb_quantum_plugin.AgentNotifierApi')
class LinuxBridgeSecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
_plugin_name = PLUGIN_NAME
def setUp(self, plugin=None):
self.addCleanup(mock.patch.stopall)
notifier_p = mock.patch(NOTIFIER)
notifier_cls = notifier_p.start()
self.notifier = mock.Mock()
notifier_cls.return_value = self.notifier
self._attribute_map_bk_ = {}
for item in attributes.RESOURCE_ATTRIBUTE_MAP:
self._attribute_map_bk_[item] = (attributes.
RESOURCE_ATTRIBUTE_MAP[item].
copy())
super(LinuxBridgeSecurityGroupsTestCase, self).setUp(PLUGIN_NAME)
def tearDown(self):
super(LinuxBridgeSecurityGroupsTestCase, self).tearDown()
attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk_
class TestLinuxBridgeSecurityGroups(LinuxBridgeSecurityGroupsTestCase,
test_sg.TestSecurityGroups):
def test_security_group_rule_updated(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
with self.security_group(name, description) as sg2:
security_group_id = sg['security_group']['id']
direction = "ingress"
source_group_id = sg2['security_group']['id']
protocol = 'tcp'
port_range_min = 88
port_range_max = 88
with self.security_group_rule(security_group_id, direction,
protocol, port_range_min,
port_range_max,
source_group_id=source_group_id
):
pass
self.notifier.assert_has_calls(
[call.security_groups_rule_updated(mock.ANY,
[security_group_id]),
call.security_groups_rule_updated(mock.ANY,
[security_group_id])])
def test_security_group_member_updated(self):
with self.network() as n:
with self.subnet(n):
with self.security_group() as sg:
security_group_id = sg['security_group']['id']
res = self._create_port('json', n['network']['id'])
port = self.deserialize('json', res)
data = {'port': {'fixed_ips': port['port']['fixed_ips'],
'name': port['port']['name'],
ext_sg.SECURITYGROUP:
[security_group_id]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertEquals(res['port'][ext_sg.SECURITYGROUP][0],
security_group_id)
self._delete('ports', port['port']['id'])
self.notifier.assert_has_calls(
[call.security_groups_member_updated(
mock.ANY, [mock.ANY]),
call.security_groups_member_updated(
mock.ANY, [security_group_id])])
class TestLinuxBridgeSecurityGroupsDB(LinuxBridgeSecurityGroupsTestCase):
def test_security_group_get_port_from_device(self):
with self.network() as n:
with self.subnet(n):
with self.security_group() as sg:
security_group_id = sg['security_group']['id']
res = self._create_port('json', n['network']['id'])
port = self.deserialize('json', res)
fixed_ips = port['port']['fixed_ips']
data = {'port': {'fixed_ips': fixed_ips,
'name': port['port']['name'],
ext_sg.SECURITYGROUP:
[security_group_id]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize('json', req.get_response(self.api))
port_id = res['port']['id']
device_id = port_id[:8]
port_dict = lb_db.get_port_from_device(device_id)
self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id],
port_dict[ext_sg.SECURITYGROUP])
self.assertEqual([], port_dict['security_group_rules'])
self.assertEqual([fixed_ips[0]['ip_address']],
port_dict['fixed_ips'])
self._delete('ports', port['port']['id'])
def test_security_group_get_port_from_device_with_no_port(self):
port_dict = lb_db.get_port_from_device('bad_device_id')
self.assertEqual(None, port_dict)

View File

@ -21,14 +21,15 @@ from quantum.manager import QuantumManager
from quantum.openstack.common import cfg
from quantum.tests.unit import test_db_plugin as test_plugin
PLUGIN_NAME = ('quantum.plugins.linuxbridge.'
'lb_quantum_plugin.LinuxBridgePluginV2')
class LinuxBridgePluginV2TestCase(test_plugin.QuantumDbPluginV2TestCase):
_plugin_name = ('quantum.plugins.linuxbridge.'
'lb_quantum_plugin.LinuxBridgePluginV2')
_plugin_name = PLUGIN_NAME
def setUp(self):
super(LinuxBridgePluginV2TestCase, self).setUp(self._plugin_name)
super(LinuxBridgePluginV2TestCase, self).setUp(PLUGIN_NAME)
class TestLinuxBridgeBasicGet(test_plugin.TestBasicGet,

View File

@ -77,6 +77,7 @@ class SecurityGroupsTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
data = {'security_group_rule': {'security_group_id': security_group_id,
'direction': direction,
'protocol': protocol,
'ethertype': ethertype,
'port_range_min': port_range_min,
'port_range_max': port_range_max,
'tenant_id': tenant_id,
@ -181,12 +182,13 @@ class SecurityGroupTestPlugin(db_base_plugin_v2.QuantumDbPluginV2,
def update_port(self, context, id, port):
session = context.session
with session.begin(subtransactions=True):
self._validate_security_groups_on_port(context, port)
# delete the port binding and read it with the new rules
self._delete_port_security_group_bindings(context, id)
self._process_port_create_security_group(context, id,
port['port'].get(
ext_sg.SECURITYGROUP))
if ext_sg.SECURITYGROUP in port['port']:
self._validate_security_groups_on_port(context, port)
# delete the port binding and read it with the new rules
self._delete_port_security_group_bindings(context, id)
self._process_port_create_security_group(context, id,
port['port'].get(
ext_sg.SECURITYGROUP))
port = super(SecurityGroupTestPlugin, self).update_port(
context, id, port)
self._extend_port_dict_security_group(context, port)
@ -204,7 +206,7 @@ class SecurityGroupDBTestCase(SecurityGroupsTestCase):
test_config['plugin_name_v2'] = DB_PLUGIN_KLASS
ext_mgr = SecurityGroupTestExtensionManager()
test_config['extension_manager'] = ext_mgr
super(SecurityGroupDBTestCase, self).setUp()
super(SecurityGroupDBTestCase, self).setUp(plugin)
class TestSecurityGroups(SecurityGroupDBTestCase):
@ -580,6 +582,17 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
res = self.deserialize('json', req.get_response(self.api))
self.assertEqual(res['port'][ext_sg.SECURITYGROUP][0],
sg['security_group']['id'])
# Test update port without security group
data = {'port': {'fixed_ips': port['port']['fixed_ips'],
'name': port['port']['name']}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertEqual(res['port'][ext_sg.SECURITYGROUP][0],
sg['security_group']['id'])
self._delete('ports', port['port']['id'])
def test_update_port_with_multiple_security_groups(self):
@ -606,12 +619,14 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
port = self.deserialize('json', res)
data = {'port': {'fixed_ips': port['port']['fixed_ips'],
'name': port['port']['name']}}
'name': port['port']['name'],
'security_groups': []}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertEqual(res['port'][ext_sg.SECURITYGROUP], [])
self.assertEqual(res['port'].get(ext_sg.SECURITYGROUP),
[])
self._delete('ports', port['port']['id'])
def test_create_port_with_bad_security_group(self):

View File

@ -0,0 +1,912 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2012, Nachi Ueno, NTT MCL, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from mock import call
import unittest2 as unittest
from quantum.agent.linux.iptables_firewall import IptablesFirewallDriver
from quantum.tests.unit import test_api_v2
_uuid = test_api_v2._uuid
FAKE_PREFIX = {'IPv4': '10.0.0.0/24',
'IPv6': 'fe80::0/48'}
FAKE_IP = {'IPv4': '10.0.0.1',
'IPv6': 'fe80::1'}
class IptablesFirewallTestCase(unittest.TestCase):
def setUp(self):
self.utils_exec_p = mock.patch(
'quantum.agent.linux.utils.execute')
self.utils_exec = self.utils_exec_p.start()
self.iptables_cls_p = mock.patch(
'quantum.agent.linux.iptables_manager.IptablesManager')
iptables_cls = self.iptables_cls_p.start()
self.iptables_inst = mock.Mock()
self.v4filter_inst = mock.Mock()
self.v6filter_inst = mock.Mock()
self.iptables_inst.ipv4 = {'filter': self.v4filter_inst}
self.iptables_inst.ipv6 = {'filter': self.v6filter_inst}
iptables_cls.return_value = self.iptables_inst
self.firewall = IptablesFirewallDriver(self.iptables_inst)
def tearDown(self):
self.iptables_cls_p.stop()
self.utils_exec_p.stop()
def _fake_port(self):
return {'device': 'tapfake_dev',
'mac_address': 'ff:ff:ff:ff',
'fixed_ips': [FAKE_IP['IPv4'],
FAKE_IP['IPv6']]}
def test_prepare_port_filter_with_no_sg(self):
port = self._fake_port()
self.firewall.prepare_port_filter(port)
calls = [call.add_chain('sg-fallback'),
call.add_rule('sg-fallback', '-j DROP'),
call.ensure_remove_chain('sg-chain'),
call.add_chain('sg-chain'),
call.add_chain('ifake_dev'),
call.add_rule('FORWARD',
'-m physdev --physdev-is-bridged '
'--physdev-out tapfake_dev '
'-j $sg-chain'),
call.add_rule('sg-chain',
'-m physdev --physdev-is-bridged '
'--physdev-out tapfake_dev '
'-j $ifake_dev'),
call.add_rule(
'ifake_dev', '-m state --state INVALID -j DROP'),
call.add_rule(
'ifake_dev',
'-m state --state ESTABLISHED,RELATED -j RETURN'),
call.add_rule('ifake_dev', '-j $sg-fallback'),
call.add_chain('ofake_dev'),
call.add_rule('FORWARD',
'-m physdev --physdev-is-bridged '
'--physdev-in tapfake_dev '
'-j $sg-chain'),
call.add_rule('sg-chain',
'-m physdev --physdev-is-bridged '
'--physdev-in tapfake_dev '
'-j $ofake_dev'),
call.add_rule('INPUT',
'-m physdev --physdev-is-bridged '
'--physdev-in tapfake_dev '
'-j $ofake_dev'),
call.add_rule(
'ofake_dev', '-m mac ! --mac-source ff:ff:ff:ff -j DROP'),
call.add_rule(
'ofake_dev',
'-p udp --sport 68 --dport 67 -j RETURN'),
call.add_rule('ofake_dev', '! -s 10.0.0.1 -j DROP'),
call.add_rule(
'ofake_dev',
'-p udp --sport 67 --dport 68 -j DROP'),
call.add_rule(
'ofake_dev', '-m state --state INVALID -j DROP'),
call.add_rule(
'ofake_dev',
'-m state --state ESTABLISHED,RELATED -j RETURN'),
call.add_rule('ofake_dev', '-j $sg-fallback'),
call.add_rule('sg-chain', '-j ACCEPT')]
self.v4filter_inst.assert_has_calls(calls)
def test_filter_ipv4_ingress(self):
rule = {'ethertype': 'IPv4',
'direction': 'ingress'}
ingress = call.add_rule('ifake_dev', '-j RETURN')
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_ingress_prefix(self):
prefix = FAKE_PREFIX['IPv4']
rule = {'ethertype': 'IPv4',
'direction': 'ingress',
'source_ip_prefix': prefix}
ingress = call.add_rule('ifake_dev', '-j RETURN -s %s' % prefix)
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_ingress_tcp(self):
rule = {'ethertype': 'IPv4',
'direction': 'ingress',
'protocol': 'tcp'}
ingress = call.add_rule('ifake_dev', '-j RETURN -p tcp')
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_ingress_tcp_prefix(self):
prefix = FAKE_PREFIX['IPv4']
rule = {'ethertype': 'IPv4',
'direction': 'ingress',
'protocol': 'tcp',
'source_ip_prefix': prefix}
ingress = call.add_rule('ifake_dev', '-j RETURN -p tcp -s %s' % prefix)
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_ingress_icmp(self):
rule = {'ethertype': 'IPv4',
'direction': 'ingress',
'protocol': 'icmp'}
ingress = call.add_rule('ifake_dev', '-j RETURN -p icmp')
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_ingress_icmp_prefix(self):
prefix = FAKE_PREFIX['IPv4']
rule = {'ethertype': 'IPv4',
'direction': 'ingress',
'protocol': 'icmp',
'source_ip_prefix': prefix}
ingress = call.add_rule(
'ifake_dev', '-j RETURN -p icmp -s %s' % prefix)
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_ingress_tcp_port(self):
rule = {'ethertype': 'IPv4',
'direction': 'ingress',
'protocol': 'tcp',
'port_range_min': 10,
'port_range_max': 10}
ingress = call.add_rule('ifake_dev', '-j RETURN -p tcp --dport 10')
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_ingress_tcp_mport(self):
rule = {'ethertype': 'IPv4',
'direction': 'ingress',
'protocol': 'tcp',
'port_range_min': 10,
'port_range_max': 100}
ingress = call.add_rule(
'ifake_dev',
'-j RETURN -p tcp -m multiport --dports 10:100')
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_ingress_tcp_mport_prefix(self):
prefix = FAKE_PREFIX['IPv4']
rule = {'ethertype': 'IPv4',
'direction': 'ingress',
'protocol': 'tcp',
'port_range_min': 10,
'port_range_max': 100,
'source_ip_prefix': prefix}
ingress = call.add_rule(
'ifake_dev',
'-j RETURN -p tcp -m multiport '
'--dports 10:100 -s %s' % prefix)
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_ingress_udp(self):
rule = {'ethertype': 'IPv4',
'direction': 'ingress',
'protocol': 'udp'}
ingress = call.add_rule('ifake_dev', '-j RETURN -p udp')
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_ingress_udp_prefix(self):
prefix = FAKE_PREFIX['IPv4']
rule = {'ethertype': 'IPv4',
'direction': 'ingress',
'protocol': 'udp',
'source_ip_prefix': prefix}
ingress = call.add_rule('ifake_dev', '-j RETURN -p udp -s %s' % prefix)
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_ingress_udp_port(self):
rule = {'ethertype': 'IPv4',
'direction': 'ingress',
'protocol': 'udp',
'port_range_min': 10,
'port_range_max': 10}
ingress = call.add_rule('ifake_dev', '-j RETURN -p udp --dport 10')
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_ingress_udp_mport(self):
rule = {'ethertype': 'IPv4',
'direction': 'ingress',
'protocol': 'udp',
'port_range_min': 10,
'port_range_max': 100}
ingress = call.add_rule(
'ifake_dev',
'-j RETURN -p udp -m multiport --dports 10:100')
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_ingress_udp_mport_prefix(self):
prefix = FAKE_PREFIX['IPv4']
rule = {'ethertype': 'IPv4',
'direction': 'ingress',
'protocol': 'udp',
'port_range_min': 10,
'port_range_max': 100,
'source_ip_prefix': prefix}
ingress = call.add_rule(
'ifake_dev',
'-j RETURN -p udp -m multiport '
'--dports 10:100 -s %s' % prefix)
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_egress(self):
rule = {'ethertype': 'IPv4',
'direction': 'egress'}
egress = call.add_rule('ofake_dev', '-j RETURN')
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_egress_prefix(self):
prefix = FAKE_PREFIX['IPv4']
rule = {'ethertype': 'IPv4',
'direction': 'egress',
'source_ip_prefix': prefix}
egress = call.add_rule('ofake_dev', '-j RETURN -s %s' % prefix)
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_egress_tcp(self):
rule = {'ethertype': 'IPv4',
'direction': 'egress',
'protocol': 'tcp'}
egress = call.add_rule('ofake_dev', '-j RETURN -p tcp')
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_egress_tcp_prefix(self):
prefix = FAKE_PREFIX['IPv4']
rule = {'ethertype': 'IPv4',
'direction': 'egress',
'protocol': 'tcp',
'source_ip_prefix': prefix}
egress = call.add_rule('ofake_dev', '-j RETURN -p tcp -s %s' % prefix)
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_egress_icmp(self):
rule = {'ethertype': 'IPv4',
'direction': 'egress',
'protocol': 'icmp'}
egress = call.add_rule('ofake_dev', '-j RETURN -p icmp')
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_egress_icmp_prefix(self):
prefix = FAKE_PREFIX['IPv4']
rule = {'ethertype': 'IPv4',
'direction': 'egress',
'protocol': 'icmp',
'source_ip_prefix': prefix}
egress = call.add_rule(
'ofake_dev', '-j RETURN -p icmp -s %s' % prefix)
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_egress_tcp_port(self):
rule = {'ethertype': 'IPv4',
'direction': 'egress',
'protocol': 'tcp',
'port_range_min': 10,
'port_range_max': 10}
egress = call.add_rule('ofake_dev', '-j RETURN -p tcp --dport 10')
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_egress_tcp_mport(self):
rule = {'ethertype': 'IPv4',
'direction': 'egress',
'protocol': 'tcp',
'port_range_min': 10,
'port_range_max': 100}
egress = call.add_rule(
'ofake_dev',
'-j RETURN -p tcp -m multiport --dports 10:100')
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_egress_tcp_mport_prefix(self):
prefix = FAKE_PREFIX['IPv4']
rule = {'ethertype': 'IPv4',
'direction': 'egress',
'protocol': 'tcp',
'port_range_min': 10,
'port_range_max': 100,
'source_ip_prefix': prefix}
egress = call.add_rule(
'ofake_dev',
'-j RETURN -p tcp -m multiport '
'--dports 10:100 -s %s' % prefix)
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_egress_udp(self):
rule = {'ethertype': 'IPv4',
'direction': 'egress',
'protocol': 'udp'}
egress = call.add_rule('ofake_dev', '-j RETURN -p udp')
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_egress_udp_prefix(self):
prefix = FAKE_PREFIX['IPv4']
rule = {'ethertype': 'IPv4',
'direction': 'egress',
'protocol': 'udp',
'source_ip_prefix': prefix}
egress = call.add_rule('ofake_dev', '-j RETURN -p udp -s %s' % prefix)
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_egress_udp_port(self):
rule = {'ethertype': 'IPv4',
'direction': 'egress',
'protocol': 'udp',
'port_range_min': 10,
'port_range_max': 10}
egress = call.add_rule('ofake_dev', '-j RETURN -p udp --dport 10')
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_egress_udp_mport(self):
rule = {'ethertype': 'IPv4',
'direction': 'egress',
'protocol': 'udp',
'port_range_min': 10,
'port_range_max': 100}
egress = call.add_rule(
'ofake_dev',
'-j RETURN -p udp -m multiport --dports 10:100')
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_egress_udp_mport_prefix(self):
prefix = FAKE_PREFIX['IPv4']
rule = {'ethertype': 'IPv4',
'direction': 'egress',
'protocol': 'udp',
'port_range_min': 10,
'port_range_max': 100,
'source_ip_prefix': prefix}
egress = call.add_rule(
'ofake_dev',
'-j RETURN -p udp -m multiport '
'--dports 10:100 -s %s' % prefix)
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_ingress(self):
rule = {'ethertype': 'IPv6',
'direction': 'ingress'}
ingress = call.add_rule('ifake_dev', '-j RETURN')
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_ingress_prefix(self):
prefix = FAKE_PREFIX['IPv6']
rule = {'ethertype': 'IPv6',
'direction': 'ingress',
'source_ip_prefix': prefix}
ingress = call.add_rule('ifake_dev', '-j RETURN -s %s' % prefix)
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_ingress_tcp(self):
rule = {'ethertype': 'IPv6',
'direction': 'ingress',
'protocol': 'tcp'}
ingress = call.add_rule('ifake_dev', '-j RETURN -p tcp')
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_ingress_tcp_prefix(self):
prefix = FAKE_PREFIX['IPv6']
rule = {'ethertype': 'IPv6',
'direction': 'ingress',
'protocol': 'tcp',
'source_ip_prefix': prefix}
ingress = call.add_rule('ifake_dev', '-j RETURN -p tcp -s %s' % prefix)
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_ingress_tcp_port(self):
rule = {'ethertype': 'IPv6',
'direction': 'ingress',
'protocol': 'tcp',
'port_range_min': 10,
'port_range_max': 10}
ingress = call.add_rule('ifake_dev', '-j RETURN -p tcp --dport 10')
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_ingress_icmp(self):
rule = {'ethertype': 'IPv6',
'direction': 'ingress',
'protocol': 'icmp'}
ingress = call.add_rule('ifake_dev', '-j RETURN -p icmpv6')
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_ingress_icmp_prefix(self):
prefix = FAKE_PREFIX['IPv6']
rule = {'ethertype': 'IPv6',
'direction': 'ingress',
'protocol': 'icmp',
'source_ip_prefix': prefix}
ingress = call.add_rule(
'ifake_dev', '-j RETURN -p icmpv6 -s %s' % prefix)
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_ingress_tcp_mport(self):
rule = {'ethertype': 'IPv6',
'direction': 'ingress',
'protocol': 'tcp',
'port_range_min': 10,
'port_range_max': 100}
ingress = call.add_rule(
'ifake_dev',
'-j RETURN -p tcp -m multiport --dports 10:100')
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_ingress_tcp_mport_prefix(self):
prefix = FAKE_PREFIX['IPv6']
rule = {'ethertype': 'IPv6',
'direction': 'ingress',
'protocol': 'tcp',
'port_range_min': 10,
'port_range_max': 100,
'source_ip_prefix': prefix}
ingress = call.add_rule(
'ifake_dev',
'-j RETURN -p tcp -m multiport '
'--dports 10:100 -s %s' % prefix)
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_ingress_udp(self):
rule = {'ethertype': 'IPv6',
'direction': 'ingress',
'protocol': 'udp'}
ingress = call.add_rule('ifake_dev', '-j RETURN -p udp')
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_ingress_udp_prefix(self):
prefix = FAKE_PREFIX['IPv6']
rule = {'ethertype': 'IPv6',
'direction': 'ingress',
'protocol': 'udp',
'source_ip_prefix': prefix}
ingress = call.add_rule('ifake_dev', '-j RETURN -p udp -s %s' % prefix)
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_ingress_udp_port(self):
rule = {'ethertype': 'IPv6',
'direction': 'ingress',
'protocol': 'udp',
'port_range_min': 10,
'port_range_max': 10}
ingress = call.add_rule('ifake_dev', '-j RETURN -p udp --dport 10')
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_ingress_udp_mport(self):
rule = {'ethertype': 'IPv6',
'direction': 'ingress',
'protocol': 'udp',
'port_range_min': 10,
'port_range_max': 100}
ingress = call.add_rule(
'ifake_dev',
'-j RETURN -p udp -m multiport --dports 10:100')
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_ingress_udp_mport_prefix(self):
prefix = FAKE_PREFIX['IPv6']
rule = {'ethertype': 'IPv6',
'direction': 'ingress',
'protocol': 'udp',
'port_range_min': 10,
'port_range_max': 100,
'source_ip_prefix': prefix}
ingress = call.add_rule(
'ifake_dev',
'-j RETURN -p udp -m multiport '
'--dports 10:100 -s %s' % prefix)
egress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_egress(self):
rule = {'ethertype': 'IPv6',
'direction': 'egress'}
egress = call.add_rule('ofake_dev', '-j RETURN')
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_egress_prefix(self):
prefix = FAKE_PREFIX['IPv6']
rule = {'ethertype': 'IPv6',
'direction': 'egress',
'source_ip_prefix': prefix}
egress = call.add_rule('ofake_dev', '-j RETURN -s %s' % prefix)
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_egress_tcp(self):
rule = {'ethertype': 'IPv6',
'direction': 'egress',
'protocol': 'tcp'}
egress = call.add_rule('ofake_dev', '-j RETURN -p tcp')
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_egress_tcp_prefix(self):
prefix = FAKE_PREFIX['IPv6']
rule = {'ethertype': 'IPv6',
'direction': 'egress',
'protocol': 'tcp',
'source_ip_prefix': prefix}
egress = call.add_rule('ofake_dev', '-j RETURN -p tcp -s %s' % prefix)
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_egress_icmp(self):
rule = {'ethertype': 'IPv6',
'direction': 'egress',
'protocol': 'icmp'}
egress = call.add_rule('ofake_dev', '-j RETURN -p icmpv6')
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_egress_icmp_prefix(self):
prefix = FAKE_PREFIX['IPv6']
rule = {'ethertype': 'IPv6',
'direction': 'egress',
'protocol': 'icmp',
'source_ip_prefix': prefix}
egress = call.add_rule(
'ofake_dev', '-j RETURN -p icmpv6 -s %s' % prefix)
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_egress_tcp_port(self):
rule = {'ethertype': 'IPv6',
'direction': 'egress',
'protocol': 'tcp',
'port_range_min': 10,
'port_range_max': 10}
egress = call.add_rule('ofake_dev', '-j RETURN -p tcp --dport 10')
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_egress_tcp_mport(self):
rule = {'ethertype': 'IPv6',
'direction': 'egress',
'protocol': 'tcp',
'port_range_min': 10,
'port_range_max': 100}
egress = call.add_rule(
'ofake_dev',
'-j RETURN -p tcp -m multiport --dports 10:100')
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_egress_tcp_mport_prefix(self):
prefix = FAKE_PREFIX['IPv6']
rule = {'ethertype': 'IPv6',
'direction': 'egress',
'protocol': 'tcp',
'port_range_min': 10,
'port_range_max': 100,
'source_ip_prefix': prefix}
egress = call.add_rule(
'ofake_dev',
'-j RETURN -p tcp -m multiport '
'--dports 10:100 -s %s' % prefix)
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_egress_udp(self):
rule = {'ethertype': 'IPv6',
'direction': 'egress',
'protocol': 'udp'}
egress = call.add_rule('ofake_dev', '-j RETURN -p udp')
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_egress_udp_prefix(self):
prefix = FAKE_PREFIX['IPv6']
rule = {'ethertype': 'IPv6',
'direction': 'egress',
'protocol': 'udp',
'source_ip_prefix': prefix}
egress = call.add_rule('ofake_dev', '-j RETURN -p udp -s %s' % prefix)
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_egress_udp_port(self):
rule = {'ethertype': 'IPv6',
'direction': 'egress',
'protocol': 'udp',
'port_range_min': 10,
'port_range_max': 10}
egress = call.add_rule('ofake_dev', '-j RETURN -p udp --dport 10')
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_egress_udp_mport(self):
rule = {'ethertype': 'IPv6',
'direction': 'egress',
'protocol': 'udp',
'port_range_min': 10,
'port_range_max': 100}
egress = call.add_rule(
'ofake_dev',
'-j RETURN -p udp -m multiport --dports 10:100')
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_egress_udp_mport_prefix(self):
prefix = FAKE_PREFIX['IPv6']
rule = {'ethertype': 'IPv6',
'direction': 'egress',
'protocol': 'udp',
'port_range_min': 10,
'port_range_max': 100,
'source_ip_prefix': prefix}
egress = call.add_rule(
'ofake_dev',
'-j RETURN -p udp -m multiport '
'--dports 10:100 -s %s' % prefix)
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def _test_prepare_port_filter(self,
rule,
ingress_expected_call=None,
egress_expected_call=None):
port = self._fake_port()
ethertype = rule['ethertype']
prefix = FAKE_IP[ethertype]
filter_inst = self.v4filter_inst
dhcp_rule = call.add_rule(
'ofake_dev',
'-p udp --sport 68 --dport 67 -j RETURN')
if ethertype == 'IPv6':
filter_inst = self.v6filter_inst
dhcp_rule = call.add_rule('ofake_dev', '-p icmpv6 -j RETURN')
sg = [rule]
port['security_group_rules'] = sg
self.firewall.prepare_port_filter(port)
calls = [call.add_chain('sg-fallback'),
call.add_rule('sg-fallback', '-j DROP'),
call.ensure_remove_chain('sg-chain'),
call.add_chain('sg-chain'),
call.add_chain('ifake_dev'),
call.add_rule('FORWARD',
'-m physdev --physdev-is-bridged '
'--physdev-out tapfake_dev '
'-j $sg-chain'),
call.add_rule('sg-chain',
'-m physdev --physdev-is-bridged '
'--physdev-out tapfake_dev '
'-j $ifake_dev'),
call.add_rule(
'ifake_dev', '-m state --state INVALID -j DROP'),
call.add_rule(
'ifake_dev',
'-m state --state ESTABLISHED,RELATED -j RETURN')]
if ingress_expected_call:
calls.append(ingress_expected_call)
calls += [call.add_rule('ifake_dev', '-j $sg-fallback'),
call.add_chain('ofake_dev'),
call.add_rule('FORWARD',
'-m physdev --physdev-is-bridged '
'--physdev-in tapfake_dev '
'-j $sg-chain'),
call.add_rule('sg-chain',
'-m physdev --physdev-is-bridged '
'--physdev-in tapfake_dev '
'-j $ofake_dev'),
call.add_rule('INPUT',
'-m physdev --physdev-is-bridged '
'--physdev-in tapfake_dev '
'-j $ofake_dev'),
call.add_rule(
'ofake_dev',
'-m mac ! --mac-source ff:ff:ff:ff -j DROP'),
dhcp_rule,
call.add_rule('ofake_dev', '! -s %s -j DROP' % prefix)]
if ethertype == 'IPv4':
calls.append(call.add_rule(
'ofake_dev',
'-p udp --sport 67 --dport 68 -j DROP'))
calls += [call.add_rule(
'ofake_dev', '-m state --state INVALID -j DROP'),
call.add_rule(
'ofake_dev',
'-m state --state ESTABLISHED,RELATED -j RETURN')]
if egress_expected_call:
calls.append(egress_expected_call)
calls += [call.add_rule('ofake_dev', '-j $sg-fallback'),
call.add_rule('sg-chain', '-j ACCEPT')]
filter_inst.assert_has_calls(calls)
def test_update_delete_port_filter(self):
port = self._fake_port()
port['security_group_rules'] = [{'ethertype': 'IPv4',
'direction': 'ingress'}]
self.firewall.prepare_port_filter(port)
port['security_group_rules'] = [{'ethertype': 'IPv4',
'direction': 'egress'}]
self.firewall.update_port_filter(port)
self.firewall.update_port_filter({'device': 'no-exist-device'})
self.firewall.remove_port_filter(port)
self.firewall.remove_port_filter({'device': 'no-exist-device'})
calls = [call.add_chain('sg-fallback'),
call.add_rule('sg-fallback', '-j DROP'),
call.ensure_remove_chain('sg-chain'),
call.add_chain('sg-chain'),
call.add_chain('ifake_dev'),
call.add_rule(
'FORWARD',
'-m physdev --physdev-is-bridged '
'--physdev-out tapfake_dev -j $sg-chain'),
call.add_rule(
'sg-chain',
'-m physdev --physdev-is-bridged '
'--physdev-out tapfake_dev -j $ifake_dev'),
call.add_rule(
'ifake_dev', '-m state --state INVALID -j DROP'),
call.add_rule(
'ifake_dev',
'-m state --state ESTABLISHED,RELATED -j RETURN'),
call.add_rule('ifake_dev', '-j RETURN'),
call.add_rule('ifake_dev', '-j $sg-fallback'),
call.add_chain('ofake_dev'),
call.add_rule(
'FORWARD',
'-m physdev --physdev-is-bridged '
'--physdev-in tapfake_dev -j $sg-chain'),
call.add_rule(
'sg-chain',
'-m physdev --physdev-is-bridged '
'--physdev-in tapfake_dev -j $ofake_dev'),
call.add_rule(
'INPUT',
'-m physdev --physdev-is-bridged '
'--physdev-in tapfake_dev -j $ofake_dev'),
call.add_rule(
'ofake_dev',
'-m mac ! --mac-source ff:ff:ff:ff -j DROP'),
call.add_rule(
'ofake_dev',
'-p udp --sport 68 --dport 67 -j RETURN'),
call.add_rule(
'ofake_dev',
'! -s 10.0.0.1 -j DROP'),
call.add_rule(
'ofake_dev',
'-p udp --sport 67 --dport 68 -j DROP'),
call.add_rule(
'ofake_dev', '-m state --state INVALID -j DROP'),
call.add_rule(
'ofake_dev',
'-m state --state ESTABLISHED,RELATED -j RETURN'),
call.add_rule('ofake_dev', '-j $sg-fallback'),
call.add_rule('sg-chain', '-j ACCEPT'),
call.ensure_remove_chain('ifake_dev'),
call.ensure_remove_chain('ofake_dev'),
call.ensure_remove_chain('sg-chain'),
call.add_chain('sg-chain'),
call.add_chain('ifake_dev'),
call.add_rule(
'FORWARD',
'-m physdev --physdev-is-bridged '
'--physdev-out tapfake_dev -j $sg-chain'),
call.add_rule(
'sg-chain',
'-m physdev --physdev-is-bridged '
'--physdev-out tapfake_dev -j $ifake_dev'),
call.add_rule(
'ifake_dev',
'-m state --state INVALID -j DROP'),
call.add_rule(
'ifake_dev',
'-m state --state ESTABLISHED,RELATED -j RETURN'),
call.add_rule('ifake_dev', '-j $sg-fallback'),
call.add_chain('ofake_dev'),
call.add_rule(
'FORWARD',
'-m physdev --physdev-is-bridged '
'--physdev-in tapfake_dev -j $sg-chain'),
call.add_rule(
'sg-chain',
'-m physdev --physdev-is-bridged '
'--physdev-in tapfake_dev -j $ofake_dev'),
call.add_rule(
'INPUT',
'-m physdev --physdev-is-bridged '
'--physdev-in tapfake_dev -j $ofake_dev'),
call.add_rule(
'ofake_dev',
'-m mac ! --mac-source ff:ff:ff:ff -j DROP'),
call.add_rule(
'ofake_dev', '-p udp --sport 68 --dport 67 -j RETURN'),
call.add_rule(
'ofake_dev', '! -s 10.0.0.1 -j DROP'),
call.add_rule(
'ofake_dev', '-p udp --sport 67 --dport 68 -j DROP'),
call.add_rule(
'ofake_dev', '-m state --state INVALID -j DROP'),
call.add_rule(
'ofake_dev',
'-m state --state ESTABLISHED,RELATED -j RETURN'),
call.add_rule('ofake_dev', '-j RETURN'),
call.add_rule('ofake_dev', '-j $sg-fallback'),
call.add_rule('sg-chain', '-j ACCEPT'),
call.ensure_remove_chain('ifake_dev'),
call.ensure_remove_chain('ofake_dev'),
call.ensure_remove_chain('sg-chain'),
call.add_chain('sg-chain')]
self.v4filter_inst.assert_has_calls(calls)
def test_remove_unknown_port(self):
port = self._fake_port()
self.firewall.remove_port_filter(port)
# checking no exception occures
self.v4filter_inst.assert_has_calls([])
def test_defer_apply(self):
with self.firewall.defer_apply():
pass
self.iptables_inst.assert_has_calls([call.defer_apply_on(),
call.defer_apply_off()])
def test_filter_defer_with_exception(self):
try:
with self.firewall.defer_apply():
raise Exception("same exception")
except:
pass
self.iptables_inst.assert_has_calls([call.defer_apply_on(),
call.defer_apply_off()])

File diff suppressed because it is too large Load Diff