Add functional testing to ipset_manager

Add functional testing to the ipset_manager module to verify
it works as expected in combination with iptables matching.

Implements:  blueprint add-ipset-to-security

Change-Id: Iec791ec30f87f6c00805f1d52c23b84aa7bc19de
This commit is contained in:
Miguel Angel Ajo 2014-09-09 15:22:03 +02:00
parent 35d3ee2ac3
commit ba51dc0665
5 changed files with 205 additions and 49 deletions

View File

@ -18,9 +18,10 @@ from neutron.common import utils
class IpsetManager(object): class IpsetManager(object):
"""Wrapper for ipset.""" """Wrapper for ipset."""
def __init__(self, execute=None, root_helper=None): def __init__(self, execute=None, root_helper=None, namespace=None):
self.execute = execute or linux_utils.execute self.execute = execute or linux_utils.execute
self.root_helper = root_helper self.root_helper = root_helper
self.namespace = namespace
@utils.synchronized('ipset', external=True) @utils.synchronized('ipset', external=True)
def create_ipset_chain(self, chain_name, ethertype): def create_ipset_chain(self, chain_name, ethertype):
@ -57,7 +58,13 @@ class IpsetManager(object):
def _apply(self, cmd, input=None): def _apply(self, cmd, input=None):
input = '\n'.join(input) if input else None input = '\n'.join(input) if input else None
self.execute(cmd, root_helper=self.root_helper, process_input=input) cmd_ns = []
if self.namespace:
cmd_ns.extend(['ip', 'netns', 'exec', self.namespace])
cmd_ns.extend(cmd)
self.execute(cmd_ns,
root_helper=self.root_helper,
process_input=input)
def _get_ipset_chain_type(self, ethertype): def _get_ipset_chain_type(self, ethertype):
return 'inet6' if ethertype == 'IPv6' else 'inet' return 'inet6' if ethertype == 'IPv6' else 'inet'

View File

@ -14,13 +14,17 @@
import random import random
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ovs_lib from neutron.agent.linux import ovs_lib
from neutron.agent.linux import utils from neutron.agent.linux import utils
from neutron.common import constants as n_const from neutron.common import constants as n_const
from neutron.openstack.common import uuidutils
from neutron.tests.functional.agent.linux import pinger
from neutron.tests.functional import base as functional_base from neutron.tests.functional import base as functional_base
BR_PREFIX = 'test-br' BR_PREFIX = 'test-br'
ICMP_BLOCK_RULE = '-p icmp -j DROP'
class BaseLinuxTestCase(functional_base.BaseSudoTestCase): class BaseLinuxTestCase(functional_base.BaseSudoTestCase):
@ -64,3 +68,53 @@ class BaseOVSLinuxTestCase(BaseLinuxTestCase):
br = self.create_resource(br_prefix, self.ovs.add_bridge) br = self.create_resource(br_prefix, self.ovs.add_bridge)
self.addCleanup(br.destroy) self.addCleanup(br.destroy)
return br return br
class BaseIPVethTestCase(BaseLinuxTestCase):
SRC_ADDRESS = '192.168.0.1'
DST_ADDRESS = '192.168.0.2'
BROADCAST_ADDRESS = '192.168.0.255'
SRC_VETH = 'source'
DST_VETH = 'destination'
def setUp(self):
super(BaseIPVethTestCase, self).setUp()
self.check_sudo_enabled()
self.pinger = pinger.Pinger(self)
@staticmethod
def _set_ip_up(device, cidr, broadcast, ip_version=4):
device.addr.add(ip_version=ip_version, cidr=cidr, broadcast=broadcast)
device.link.set_up()
def _create_namespace(self):
ip_cmd = ip_lib.IPWrapper(self.root_helper)
name = "func-%s" % uuidutils.generate_uuid()
namespace = ip_cmd.ensure_namespace(name)
self.addCleanup(namespace.netns.delete, namespace.namespace)
return namespace
def prepare_veth_pairs(self, src_addr=None,
dst_addr=None,
broadcast_addr=None,
src_ns=None, dst_ns=None,
src_veth=None,
dst_veth=None):
src_addr = src_addr or self.SRC_ADDRESS
dst_addr = dst_addr or self.DST_ADDRESS
broadcast_addr = broadcast_addr or self.BROADCAST_ADDRESS
src_veth = src_veth or self.SRC_VETH
dst_veth = dst_veth or self.DST_VETH
src_ns = src_ns or self._create_namespace()
dst_ns = dst_ns or self._create_namespace()
src_veth, dst_veth = src_ns.add_veth(src_veth,
dst_veth,
dst_ns.namespace)
self._set_ip_up(src_veth, '%s/24' % src_addr, broadcast_addr)
self._set_ip_up(dst_veth, '%s/24' % dst_addr, broadcast_addr)
return src_ns, dst_ns

View File

@ -0,0 +1,42 @@
# Copyright (c) 2014 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class Pinger(object):
def __init__(self, testcase, timeout=1, max_attempts=1):
self.testcase = testcase
self._timeout = timeout
self._max_attempts = max_attempts
def _ping_destination(self, src_namespace, dest_address):
src_namespace.netns.execute(['ping', '-c', self._max_attempts,
'-W', self._timeout, dest_address])
def assert_ping_from_ns(self, src_ns, dst_ip):
try:
self._ping_destination(src_ns, dst_ip)
except RuntimeError:
self.testcase.fail("destination ip %(dst_ip)s is not replying "
"to ping from namespace %(src_ns)s" %
{'src_ns': src_ns.namespace, 'dst_ip': dst_ip})
def assert_no_ping_from_ns(self, src_ns, dst_ip):
try:
self._ping_destination(src_ns, dst_ip)
self.testcase.fail("destination ip %(dst_ip)s is replying to ping"
"from namespace %(src_ns)s, but it shouldn't" %
{'src_ns': src_ns.namespace, 'dst_ip': dst_ip})
except RuntimeError:
pass

View File

@ -0,0 +1,92 @@
# Copyright (c) 2014 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.agent.linux import ipset_manager
from neutron.agent.linux import iptables_manager
from neutron.tests.functional.agent.linux import base
IPSET_CHAIN = 'test-chain'
IPSET_ETHERTYPE = 'IPv4'
ICMP_ACCEPT_RULE = '-p icmp -m set --match-set %s src -j ACCEPT' % IPSET_CHAIN
UNRELATED_IP = '1.1.1.1'
class IpsetBase(base.BaseIPVethTestCase):
def setUp(self):
super(IpsetBase, self).setUp()
self.src_ns, self.dst_ns = self.prepare_veth_pairs()
self.ipset = self._create_ipset_manager_and_chain(self.dst_ns,
IPSET_CHAIN)
self.dst_iptables = iptables_manager.IptablesManager(
root_helper=self.root_helper,
namespace=self.dst_ns.namespace)
self._add_iptables_ipset_rules(self.dst_iptables)
def _create_ipset_manager_and_chain(self, dst_ns, chain_name):
ipset = ipset_manager.IpsetManager(
root_helper=self.root_helper,
namespace=dst_ns.namespace)
ipset.create_ipset_chain(chain_name, IPSET_ETHERTYPE)
return ipset
@staticmethod
def _remove_iptables_ipset_rules(iptables_manager):
iptables_manager.ipv4['filter'].remove_rule('INPUT', ICMP_ACCEPT_RULE)
iptables_manager.apply()
@staticmethod
def _add_iptables_ipset_rules(iptables_manager):
iptables_manager.ipv4['filter'].add_rule('INPUT', ICMP_ACCEPT_RULE)
iptables_manager.ipv4['filter'].add_rule('INPUT', base.ICMP_BLOCK_RULE)
iptables_manager.apply()
class IpsetManagerTestCase(IpsetBase):
def test_add_member_allows_ping(self):
self.pinger.assert_no_ping_from_ns(self.src_ns, self.DST_ADDRESS)
self.ipset.add_member_to_ipset_chain(IPSET_CHAIN, self.SRC_ADDRESS)
self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
def test_del_member_denies_ping(self):
self.ipset.add_member_to_ipset_chain(IPSET_CHAIN, self.SRC_ADDRESS)
self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
self.ipset.del_ipset_chain_member(IPSET_CHAIN, self.SRC_ADDRESS)
self.pinger.assert_no_ping_from_ns(self.src_ns, self.DST_ADDRESS)
def test_refresh_ipset_allows_ping(self):
self.ipset.refresh_ipset_chain_by_name(IPSET_CHAIN, [UNRELATED_IP],
IPSET_ETHERTYPE)
self.pinger.assert_no_ping_from_ns(self.src_ns, self.DST_ADDRESS)
self.ipset.refresh_ipset_chain_by_name(
IPSET_CHAIN, [UNRELATED_IP, self.SRC_ADDRESS], IPSET_ETHERTYPE)
self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
self.ipset.refresh_ipset_chain_by_name(
IPSET_CHAIN, [self.SRC_ADDRESS, UNRELATED_IP], IPSET_ETHERTYPE)
self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
def test_destroy_ipset_chain(self):
self.assertRaises(RuntimeError,
self.ipset.destroy_ipset_chain_by_name, IPSET_CHAIN)
self._remove_iptables_ipset_rules(self.dst_iptables)
self.ipset.destroy_ipset_chain_by_name(IPSET_CHAIN)

View File

@ -13,64 +13,25 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager from neutron.agent.linux import iptables_manager
from neutron.openstack.common import uuidutils
from neutron.tests.functional.agent.linux import base from neutron.tests.functional.agent.linux import base
ICMP_BLOCK_RULE = '-p icmp -j DROP'
SRC_VETH_NAME = 'source'
DEST_VETH_NAME = 'destination'
class IptablesManagerTestCase(base.BaseIPVethTestCase):
class IpBase(base.BaseLinuxTestCase):
SRC_ADDRESS = '192.168.0.1'
DST_ADDRESS = '192.168.0.2'
@staticmethod
def _set_ip_up(device, cidr, broadcast='192.168.0.255', ip_version=4):
device.addr.add(ip_version=ip_version, cidr=cidr, broadcast=broadcast)
device.link.set_up()
@staticmethod
def _ping_destination(src_namespace, dest_address, attempts=3):
src_namespace.netns.execute(['ping', '-c', attempts, dest_address])
def _create_namespace(self):
ip_cmd = ip_lib.IPWrapper(self.root_helper)
name = "func-%s" % uuidutils.generate_uuid()
namespace = ip_cmd.ensure_namespace(name)
self.addCleanup(namespace.netns.delete, namespace.namespace)
return namespace
def _prepare_veth_pairs(self):
src_ns = self._create_namespace()
dst_ns = self._create_namespace()
src_veth, dst_veth = src_ns.add_veth(SRC_VETH_NAME,
DEST_VETH_NAME,
dst_ns.namespace)
self._set_ip_up(src_veth, '%s/24' % self.SRC_ADDRESS)
self._set_ip_up(dst_veth, '%s/24' % self.DST_ADDRESS)
return src_ns, dst_ns
class IptablesManagerTestCase(IpBase):
def setUp(self): def setUp(self):
super(IptablesManagerTestCase, self).setUp() super(IptablesManagerTestCase, self).setUp()
self.check_sudo_enabled() self.src_ns, self.dst_ns = self.prepare_veth_pairs()
self.src_ns, self.dst_ns = self._prepare_veth_pairs()
self.iptables = iptables_manager.IptablesManager( self.iptables = iptables_manager.IptablesManager(
root_helper=self.root_helper, root_helper=self.root_helper,
namespace=self.dst_ns.namespace) namespace=self.dst_ns.namespace)
def test_icmp(self): def test_icmp(self):
self._ping_destination(self.src_ns, self.DST_ADDRESS) self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
self.iptables.ipv4['filter'].add_rule('INPUT', ICMP_BLOCK_RULE) self.iptables.ipv4['filter'].add_rule('INPUT', base.ICMP_BLOCK_RULE)
self.iptables.apply() self.iptables.apply()
self.assertRaises(RuntimeError, self._ping_destination, self.src_ns, self.pinger.assert_no_ping_from_ns(self.src_ns, self.DST_ADDRESS)
self.DST_ADDRESS) self.iptables.ipv4['filter'].remove_rule('INPUT',
self.iptables.ipv4['filter'].remove_rule('INPUT', ICMP_BLOCK_RULE) base.ICMP_BLOCK_RULE)
self.iptables.apply() self.iptables.apply()
self._ping_destination(self.src_ns, self.DST_ADDRESS) self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)