vmware-nsx/neutron/tests/unit/test_iptables_manager.py
Henry Gessau 4a90dcd85f Predictable iptables chains output order
This fixes the iptables unit tests that break with a randomized PYTHONHASHSEED
(see the bug report).

The chains for iptables are stored as sets to avoid duplicates. When they are
output by iptables_manager their order can therefore be unpredictable. This was
found hash seed 1016732220.

To fix this we:
 - Sort the chains output by iptables_manager
 - Update the unit tests to check for sorted chains

When multiple tables are processed, they can be processed in any order or
dumped in any order. Found with hash seed 3728666619.

To fix this we:
 - Traverse the tables in sorted order for dumping
 - Fix tests to allow for tables to be processed in any order

Note: There are several other unrelated unit tests that also break with a
randomized PYTHONHASHSEED, but they are not addressed here. They will be
addressed in separate patches.

Partial-bug: #1348818

Change-Id: Ic3f4cd85316c9fc2e78bc7f5e900cfac87baf39d
2014-08-26 16:38:59 -04:00

860 lines
38 KiB
Python

# Copyright 2012 Locaweb.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# @author: Juliano Martinez, Locaweb.
import inspect
import os
import mock
from neutron.agent.linux import iptables_manager
from neutron.tests import base
from neutron.tests import tools
IPTABLES_ARG = {'bn': iptables_manager.binary_name}
NAT_DUMP = ('# Generated by iptables_manager\n'
'*nat\n'
':neutron-postrouting-bottom - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-POSTROUTING - [0:0]\n'
':%(bn)s-PREROUTING - [0:0]\n'
':%(bn)s-float-snat - [0:0]\n'
':%(bn)s-snat - [0:0]\n'
'[0:0] -A PREROUTING -j %(bn)s-PREROUTING\n'
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
'[0:0] -A POSTROUTING -j %(bn)s-POSTROUTING\n'
'[0:0] -A POSTROUTING -j neutron-postrouting-bottom\n'
'[0:0] -A neutron-postrouting-bottom -j %(bn)s-snat\n'
'[0:0] -A %(bn)s-snat -j '
'%(bn)s-float-snat\n'
'COMMIT\n'
'# Completed by iptables_manager\n' % IPTABLES_ARG)
FILTER_DUMP = ('# Generated by iptables_manager\n'
'*filter\n'
':neutron-filter-top - [0:0]\n'
':%(bn)s-FORWARD - [0:0]\n'
':%(bn)s-INPUT - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-local - [0:0]\n'
'[0:0] -A FORWARD -j neutron-filter-top\n'
'[0:0] -A OUTPUT -j neutron-filter-top\n'
'[0:0] -A neutron-filter-top -j %(bn)s-local\n'
'[0:0] -A INPUT -j %(bn)s-INPUT\n'
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
'[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
'COMMIT\n'
'# Completed by iptables_manager\n' % IPTABLES_ARG)
class IptablesManagerStateFulTestCase(base.BaseTestCase):
def setUp(self):
super(IptablesManagerStateFulTestCase, self).setUp()
self.root_helper = 'sudo'
self.iptables = iptables_manager.IptablesManager(
root_helper=self.root_helper)
self.execute = mock.patch.object(self.iptables, "execute").start()
def test_binary_name(self):
self.assertEqual(iptables_manager.binary_name,
os.path.basename(inspect.stack()[-1][1])[:16])
def test_get_chain_name(self):
name = '0123456789' * 5
# 28 chars is the maximum length of iptables chain name.
self.assertEqual(iptables_manager.get_chain_name(name, wrap=False),
name[:28])
# 11 chars is the maximum length of chain name of iptable_manager
# if binary_name is prepended.
self.assertEqual(iptables_manager.get_chain_name(name, wrap=True),
name[:11])
def _extend_with_ip6tables_filter(self, expected_calls, filter_dump):
expected_calls.insert(2, (
mock.call(['ip6tables-save', '-c'],
root_helper=self.root_helper),
''))
expected_calls.insert(3, (
mock.call(['ip6tables-restore', '-c'],
process_input=filter_dump,
root_helper=self.root_helper),
None))
expected_calls.extend([
(mock.call(['ip6tables-save', '-c'],
root_helper=self.root_helper),
''),
(mock.call(['ip6tables-restore', '-c'],
process_input=filter_dump,
root_helper=self.root_helper),
None)])
def _test_add_and_remove_chain_custom_binary_name_helper(self, use_ipv6):
bn = ("abcdef" * 5)
self.iptables = iptables_manager.IptablesManager(
root_helper=self.root_helper,
binary_name=bn,
use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
iptables_args = {'bn': bn[:16]}
filter_dump = ('# Generated by iptables_manager\n'
'*filter\n'
':neutron-filter-top - [0:0]\n'
':%(bn)s-FORWARD - [0:0]\n'
':%(bn)s-INPUT - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-filter - [0:0]\n'
':%(bn)s-local - [0:0]\n'
'[0:0] -A FORWARD -j neutron-filter-top\n'
'[0:0] -A OUTPUT -j neutron-filter-top\n'
'[0:0] -A neutron-filter-top -j %(bn)s-local\n'
'[0:0] -A INPUT -j %(bn)s-INPUT\n'
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
'[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
'COMMIT\n'
'# Completed by iptables_manager\n' % iptables_args)
filter_dump_ipv6 = ('# Generated by iptables_manager\n'
'*filter\n'
':neutron-filter-top - [0:0]\n'
':%(bn)s-FORWARD - [0:0]\n'
':%(bn)s-INPUT - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-local - [0:0]\n'
'[0:0] -A FORWARD -j neutron-filter-top\n'
'[0:0] -A OUTPUT -j neutron-filter-top\n'
'[0:0] -A neutron-filter-top -j %(bn)s-local\n'
'[0:0] -A INPUT -j %(bn)s-INPUT\n'
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
'[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
'COMMIT\n'
'# Completed by iptables_manager\n' %
iptables_args)
filter_dump_mod = ('# Generated by iptables_manager\n'
'*filter\n'
':neutron-filter-top - [0:0]\n'
':%(bn)s-FORWARD - [0:0]\n'
':%(bn)s-INPUT - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-filter - [0:0]\n'
':%(bn)s-local - [0:0]\n'
'[0:0] -A FORWARD -j neutron-filter-top\n'
'[0:0] -A OUTPUT -j neutron-filter-top\n'
'[0:0] -A neutron-filter-top -j %(bn)s-local\n'
'[0:0] -A INPUT -j %(bn)s-INPUT\n'
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
'[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
'COMMIT\n'
'# Completed by iptables_manager\n'
% iptables_args)
nat_dump = ('# Generated by iptables_manager\n'
'*nat\n'
':neutron-postrouting-bottom - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-POSTROUTING - [0:0]\n'
':%(bn)s-PREROUTING - [0:0]\n'
':%(bn)s-float-snat - [0:0]\n'
':%(bn)s-snat - [0:0]\n'
'[0:0] -A PREROUTING -j %(bn)s-PREROUTING\n'
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
'[0:0] -A POSTROUTING -j %(bn)s-POSTROUTING\n'
'[0:0] -A POSTROUTING -j neutron-postrouting-bottom\n'
'[0:0] -A neutron-postrouting-bottom -j %(bn)s-snat\n'
'[0:0] -A %(bn)s-snat -j '
'%(bn)s-float-snat\n'
'COMMIT\n'
'# Completed by iptables_manager\n' % iptables_args)
expected_calls_and_values = [
(mock.call(['iptables-save', '-c'],
root_helper=self.root_helper),
''),
(mock.call(['iptables-restore', '-c'],
process_input=nat_dump + filter_dump_mod,
root_helper=self.root_helper),
None),
(mock.call(['iptables-save', '-c'],
root_helper=self.root_helper),
''),
(mock.call(['iptables-restore', '-c'],
process_input=nat_dump + filter_dump,
root_helper=self.root_helper),
None),
]
if use_ipv6:
self._extend_with_ip6tables_filter(expected_calls_and_values,
filter_dump_ipv6)
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['filter'].add_chain('filter')
self.iptables.apply()
self.iptables.ipv4['filter'].empty_chain('filter')
self.iptables.apply()
tools.verify_mock_calls(self.execute, expected_calls_and_values)
def test_add_and_remove_chain_custom_binary_name(self):
self._test_add_and_remove_chain_custom_binary_name_helper(False)
def test_add_and_remove_chain_custom_binary_name_with_ipv6(self):
self._test_add_and_remove_chain_custom_binary_name_helper(True)
def _test_empty_chain_custom_binary_name_helper(self, use_ipv6):
bn = ("abcdef" * 5)[:16]
self.iptables = iptables_manager.IptablesManager(
root_helper=self.root_helper,
binary_name=bn,
use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
iptables_args = {'bn': bn}
filter_dump = ('# Generated by iptables_manager\n'
'*filter\n'
':neutron-filter-top - [0:0]\n'
':%(bn)s-FORWARD - [0:0]\n'
':%(bn)s-INPUT - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-local - [0:0]\n'
'[0:0] -A FORWARD -j neutron-filter-top\n'
'[0:0] -A OUTPUT -j neutron-filter-top\n'
'[0:0] -A neutron-filter-top -j %(bn)s-local\n'
'[0:0] -A INPUT -j %(bn)s-INPUT\n'
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
'[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
'COMMIT\n'
'# Completed by iptables_manager\n' % iptables_args)
filter_dump_mod = ('# Generated by iptables_manager\n'
'*filter\n'
':neutron-filter-top - [0:0]\n'
':%(bn)s-FORWARD - [0:0]\n'
':%(bn)s-INPUT - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-filter - [0:0]\n'
':%(bn)s-local - [0:0]\n'
'[0:0] -A FORWARD -j neutron-filter-top\n'
'[0:0] -A OUTPUT -j neutron-filter-top\n'
'[0:0] -A neutron-filter-top -j %(bn)s-local\n'
'[0:0] -A INPUT -j %(bn)s-INPUT\n'
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
'[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
'[0:0] -A %(bn)s-filter -s 0/0 -d 192.168.0.2\n'
'COMMIT\n'
'# Completed by iptables_manager\n'
% iptables_args)
nat_dump = ('# Generated by iptables_manager\n'
'*nat\n'
':neutron-postrouting-bottom - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-POSTROUTING - [0:0]\n'
':%(bn)s-PREROUTING - [0:0]\n'
':%(bn)s-float-snat - [0:0]\n'
':%(bn)s-snat - [0:0]\n'
'[0:0] -A PREROUTING -j %(bn)s-PREROUTING\n'
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
'[0:0] -A POSTROUTING -j %(bn)s-POSTROUTING\n'
'[0:0] -A POSTROUTING -j neutron-postrouting-bottom\n'
'[0:0] -A neutron-postrouting-bottom -j %(bn)s-snat\n'
'[0:0] -A %(bn)s-snat -j '
'%(bn)s-float-snat\n'
'COMMIT\n'
'# Completed by iptables_manager\n' % iptables_args)
expected_calls_and_values = [
(mock.call(['iptables-save', '-c'],
root_helper=self.root_helper),
''),
(mock.call(['iptables-restore', '-c'],
process_input=nat_dump + filter_dump_mod,
root_helper=self.root_helper),
None),
(mock.call(['iptables-save', '-c'],
root_helper=self.root_helper),
''),
(mock.call(['iptables-restore', '-c'],
process_input=nat_dump + filter_dump,
root_helper=self.root_helper),
None),
]
if use_ipv6:
self._extend_with_ip6tables_filter(expected_calls_and_values,
filter_dump)
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['filter'].add_chain('filter')
self.iptables.ipv4['filter'].add_rule('filter',
'-s 0/0 -d 192.168.0.2')
self.iptables.apply()
self.iptables.ipv4['filter'].remove_chain('filter')
self.iptables.apply()
tools.verify_mock_calls(self.execute, expected_calls_and_values)
def test_empty_chain_custom_binary_name(self):
self._test_empty_chain_custom_binary_name_helper(False)
def test_empty_chain_custom_binary_name_with_ipv6(self):
self._test_empty_chain_custom_binary_name_helper(True)
def _test_add_and_remove_chain_helper(self, use_ipv6):
self.iptables = iptables_manager.IptablesManager(
root_helper=self.root_helper,
use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
filter_dump_mod = ('# Generated by iptables_manager\n'
'*filter\n'
':neutron-filter-top - [0:0]\n'
':%(bn)s-FORWARD - [0:0]\n'
':%(bn)s-INPUT - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-filter - [0:0]\n'
':%(bn)s-local - [0:0]\n'
'[0:0] -A FORWARD -j neutron-filter-top\n'
'[0:0] -A OUTPUT -j neutron-filter-top\n'
'[0:0] -A neutron-filter-top -j %(bn)s-local\n'
'[0:0] -A INPUT -j %(bn)s-INPUT\n'
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
'[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
'COMMIT\n'
'# Completed by iptables_manager\n'
% IPTABLES_ARG)
expected_calls_and_values = [
(mock.call(['iptables-save', '-c'],
root_helper=self.root_helper),
''),
(mock.call(['iptables-restore', '-c'],
process_input=NAT_DUMP + filter_dump_mod,
root_helper=self.root_helper),
None),
(mock.call(['iptables-save', '-c'],
root_helper=self.root_helper),
''),
(mock.call(['iptables-restore', '-c'],
process_input=NAT_DUMP + FILTER_DUMP,
root_helper=self.root_helper),
None),
]
if use_ipv6:
self._extend_with_ip6tables_filter(expected_calls_and_values,
FILTER_DUMP)
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['filter'].add_chain('filter')
self.iptables.apply()
self.iptables.ipv4['filter'].remove_chain('filter')
self.iptables.apply()
tools.verify_mock_calls(self.execute, expected_calls_and_values)
def test_add_and_remove_chain(self):
self._test_add_and_remove_chain_helper(False)
def test_add_and_remove_chain_with_ipv6(self):
self._test_add_and_remove_chain_helper(True)
def _test_add_filter_rule_helper(self, use_ipv6):
self.iptables = iptables_manager.IptablesManager(
root_helper=self.root_helper,
use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
filter_dump_mod = ('# Generated by iptables_manager\n'
'*filter\n'
':neutron-filter-top - [0:0]\n'
':%(bn)s-FORWARD - [0:0]\n'
':%(bn)s-INPUT - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-filter - [0:0]\n'
':%(bn)s-local - [0:0]\n'
'[0:0] -A FORWARD -j neutron-filter-top\n'
'[0:0] -A OUTPUT -j neutron-filter-top\n'
'[0:0] -A neutron-filter-top -j %(bn)s-local\n'
'[0:0] -A INPUT -j %(bn)s-INPUT\n'
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
'[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
'[0:0] -A %(bn)s-filter -j DROP\n'
'[0:0] -A %(bn)s-INPUT -s 0/0 -d 192.168.0.2 -j '
'%(bn)s-filter\n'
'COMMIT\n'
'# Completed by iptables_manager\n'
% IPTABLES_ARG)
expected_calls_and_values = [
(mock.call(['iptables-save', '-c'],
root_helper=self.root_helper),
''),
(mock.call(['iptables-restore', '-c'],
process_input=NAT_DUMP + filter_dump_mod,
root_helper=self.root_helper),
None),
(mock.call(['iptables-save', '-c'],
root_helper=self.root_helper),
''),
(mock.call(['iptables-restore', '-c'],
process_input=NAT_DUMP + FILTER_DUMP,
root_helper=self.root_helper
),
None),
]
if use_ipv6:
self._extend_with_ip6tables_filter(expected_calls_and_values,
FILTER_DUMP)
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['filter'].add_chain('filter')
self.iptables.ipv4['filter'].add_rule('filter', '-j DROP')
self.iptables.ipv4['filter'].add_rule('INPUT',
'-s 0/0 -d 192.168.0.2 -j'
' %(bn)s-filter' % IPTABLES_ARG)
self.iptables.apply()
self.iptables.ipv4['filter'].remove_rule('filter', '-j DROP')
self.iptables.ipv4['filter'].remove_rule('INPUT',
'-s 0/0 -d 192.168.0.2 -j'
' %(bn)s-filter'
% IPTABLES_ARG)
self.iptables.ipv4['filter'].remove_chain('filter')
self.iptables.apply()
tools.verify_mock_calls(self.execute, expected_calls_and_values)
def test_add_filter_rule(self):
self._test_add_filter_rule_helper(False)
def test_add_filter_rule_with_ipv6(self):
self._test_add_filter_rule_helper(True)
def _test_rule_with_wrap_target_helper(self, use_ipv6):
self.iptables = iptables_manager.IptablesManager(
root_helper=self.root_helper,
use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
name = '0123456789' * 5
wrap = "%s-%s" % (iptables_manager.binary_name,
iptables_manager.get_chain_name(name))
iptables_args = {'bn': iptables_manager.binary_name,
'wrap': wrap}
filter_dump_mod = ('# Generated by iptables_manager\n'
'*filter\n'
':neutron-filter-top - [0:0]\n'
':%(wrap)s - [0:0]\n'
':%(bn)s-FORWARD - [0:0]\n'
':%(bn)s-INPUT - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-local - [0:0]\n'
'[0:0] -A FORWARD -j neutron-filter-top\n'
'[0:0] -A OUTPUT -j neutron-filter-top\n'
'[0:0] -A neutron-filter-top -j %(bn)s-local\n'
'[0:0] -A INPUT -j %(bn)s-INPUT\n'
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
'[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
'[0:0] -A %(bn)s-INPUT -s 0/0 -d 192.168.0.2 -j '
'%(wrap)s\n'
'COMMIT\n'
'# Completed by iptables_manager\n'
% iptables_args)
expected_calls_and_values = [
(mock.call(['iptables-save', '-c'],
root_helper=self.root_helper),
''),
(mock.call(['iptables-restore', '-c'],
process_input=NAT_DUMP + filter_dump_mod,
root_helper=self.root_helper),
None),
(mock.call(['iptables-save', '-c'],
root_helper=self.root_helper),
''),
(mock.call(['iptables-restore', '-c'],
process_input=NAT_DUMP + FILTER_DUMP,
root_helper=self.root_helper),
None),
]
if use_ipv6:
self._extend_with_ip6tables_filter(expected_calls_and_values,
FILTER_DUMP)
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['filter'].add_chain(name)
self.iptables.ipv4['filter'].add_rule('INPUT',
'-s 0/0 -d 192.168.0.2 -j'
' $%s' % name)
self.iptables.apply()
self.iptables.ipv4['filter'].remove_rule('INPUT',
'-s 0/0 -d 192.168.0.2 -j'
' $%s' % name)
self.iptables.ipv4['filter'].remove_chain(name)
self.iptables.apply()
tools.verify_mock_calls(self.execute, expected_calls_and_values)
def test_rule_with_wrap_target(self):
self._test_rule_with_wrap_target_helper(False)
def test_rule_with_wrap_target_with_ipv6(self):
self._test_rule_with_wrap_target_helper(True)
def _test_add_nat_rule_helper(self, use_ipv6):
self.iptables = iptables_manager.IptablesManager(
root_helper=self.root_helper,
use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
nat_dump = ('# Generated by iptables_manager\n'
'*nat\n'
':neutron-postrouting-bottom - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-POSTROUTING - [0:0]\n'
':%(bn)s-PREROUTING - [0:0]\n'
':%(bn)s-float-snat - [0:0]\n'
':%(bn)s-snat - [0:0]\n'
'[0:0] -A PREROUTING -j %(bn)s-PREROUTING\n'
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
'[0:0] -A POSTROUTING -j %(bn)s-POSTROUTING\n'
'[0:0] -A POSTROUTING -j neutron-postrouting-bottom\n'
'[0:0] -A neutron-postrouting-bottom -j %(bn)s-snat\n'
'[0:0] -A %(bn)s-snat -j %(bn)s-float-snat\n'
'COMMIT\n'
'# Completed by iptables_manager\n'
% IPTABLES_ARG)
nat_dump_mod = ('# Generated by iptables_manager\n'
'*nat\n'
':neutron-postrouting-bottom - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-POSTROUTING - [0:0]\n'
':%(bn)s-PREROUTING - [0:0]\n'
':%(bn)s-float-snat - [0:0]\n'
':%(bn)s-nat - [0:0]\n'
':%(bn)s-snat - [0:0]\n'
'[0:0] -A PREROUTING -j %(bn)s-PREROUTING\n'
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
'[0:0] -A POSTROUTING -j %(bn)s-POSTROUTING\n'
'[0:0] -A POSTROUTING -j neutron-postrouting-bottom\n'
'[0:0] -A neutron-postrouting-bottom -j %(bn)s-snat\n'
'[0:0] -A %(bn)s-snat -j %(bn)s-float-snat\n'
'[0:0] -A %(bn)s-PREROUTING -d 192.168.0.3 -j '
'%(bn)s-nat\n'
'[0:0] -A %(bn)s-nat -p tcp --dport 8080 -j '
'REDIRECT --to-port 80\n'
'COMMIT\n'
'# Completed by iptables_manager\n'
% IPTABLES_ARG)
expected_calls_and_values = [
(mock.call(['iptables-save', '-c'],
root_helper=self.root_helper),
''),
(mock.call(['iptables-restore', '-c'],
process_input=nat_dump_mod + FILTER_DUMP,
root_helper=self.root_helper),
None),
(mock.call(['iptables-save', '-c'],
root_helper=self.root_helper),
''),
(mock.call(['iptables-restore', '-c'],
process_input=nat_dump + FILTER_DUMP,
root_helper=self.root_helper),
None),
]
if use_ipv6:
self._extend_with_ip6tables_filter(expected_calls_and_values,
FILTER_DUMP)
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['nat'].add_chain('nat')
self.iptables.ipv4['nat'].add_rule('PREROUTING',
'-d 192.168.0.3 -j '
'%(bn)s-nat' % IPTABLES_ARG)
self.iptables.ipv4['nat'].add_rule('nat',
'-p tcp --dport 8080' +
' -j REDIRECT --to-port 80')
self.iptables.apply()
self.iptables.ipv4['nat'].remove_rule('nat',
'-p tcp --dport 8080 -j'
' REDIRECT --to-port 80')
self.iptables.ipv4['nat'].remove_rule('PREROUTING',
'-d 192.168.0.3 -j '
'%(bn)s-nat' % IPTABLES_ARG)
self.iptables.ipv4['nat'].remove_chain('nat')
self.iptables.apply()
tools.verify_mock_calls(self.execute, expected_calls_and_values)
def test_add_nat_rule(self):
self._test_add_nat_rule_helper(False)
def test_add_nat_rule_with_ipv6(self):
self._test_add_nat_rule_helper(True)
def test_add_rule_to_a_nonexistent_chain(self):
self.assertRaises(LookupError, self.iptables.ipv4['filter'].add_rule,
'nonexistent', '-j DROP')
def test_remove_nonexistent_chain(self):
with mock.patch.object(iptables_manager, "LOG") as log:
self.iptables.ipv4['filter'].remove_chain('nonexistent')
log.warn.assert_called_once_with(
'Attempted to remove chain %s which does not exist',
'nonexistent')
def test_remove_nonexistent_rule(self):
with mock.patch.object(iptables_manager, "LOG") as log:
self.iptables.ipv4['filter'].remove_rule('nonexistent', '-j DROP')
log.warn.assert_called_once_with(
'Tried to remove rule that was not there: '
'%(chain)r %(rule)r %(wrap)r %(top)r',
{'wrap': True, 'top': False, 'rule': '-j DROP',
'chain': 'nonexistent'})
def test_iptables_failure_with_no_failing_line_number(self):
with mock.patch.object(iptables_manager, "LOG") as log:
# generate Runtime errors on iptables-restore calls
def iptables_restore_failer(*args, **kwargs):
if 'iptables-restore' in args[0]:
self.input_lines = kwargs['process_input'].split('\n')
# don't provide a specific failure message so all lines
# are logged
raise RuntimeError()
return FILTER_DUMP
self.execute.side_effect = iptables_restore_failer
# _apply_synchronized calls iptables-restore so it should raise
# a RuntimeError
self.assertRaises(RuntimeError,
self.iptables._apply_synchronized)
# The RuntimeError should have triggered a log of the input to the
# process that it failed to execute. Verify by comparing the log
# call to the 'process_input' arg given to the failed iptables-restore
# call.
# Failure without a specific line number in the error should cause
# all lines to be logged with numbers.
logged = ['%7d. %s' % (n, l)
for n, l in enumerate(self.input_lines, 1)]
log.error.assert_called_once_with(_(
'IPTablesManager.apply failed to apply the '
'following set of iptables rules:\n%s'),
'\n'.join(logged)
)
def test_iptables_failure_on_specific_line(self):
with mock.patch.object(iptables_manager, "LOG") as log:
# generate Runtime errors on iptables-restore calls
def iptables_restore_failer(*args, **kwargs):
if 'iptables-restore' in args[0]:
self.input_lines = kwargs['process_input'].split('\n')
# pretend line 11 failed
msg = ("Exit code: 1\nStdout: ''\n"
"Stderr: 'iptables-restore: line 11 failed\n'")
raise RuntimeError(msg)
return FILTER_DUMP
self.execute.side_effect = iptables_restore_failer
# _apply_synchronized calls iptables-restore so it should raise
# a RuntimeError
self.assertRaises(RuntimeError,
self.iptables._apply_synchronized)
# The RuntimeError should have triggered a log of the input to the
# process that it failed to execute. Verify by comparing the log
# call to the 'process_input' arg given to the failed iptables-restore
# call.
# Line 11 of the input was marked as failing so lines (11 - context)
# to (11 + context) should be logged
ctx = iptables_manager.IPTABLES_ERROR_LINES_OF_CONTEXT
log_start = max(0, 11 - ctx)
log_end = 11 + ctx
logged = ['%7d. %s' % (n, l)
for n, l in enumerate(self.input_lines[log_start:log_end],
log_start + 1)]
log.error.assert_called_once_with(_(
'IPTablesManager.apply failed to apply the '
'following set of iptables rules:\n%s'),
'\n'.join(logged)
)
def test_get_traffic_counters_chain_notexists(self):
with mock.patch.object(iptables_manager, "LOG") as log:
acc = self.iptables.get_traffic_counters('chain1')
self.assertIsNone(acc)
self.assertEqual(0, self.execute.call_count)
log.warn.assert_called_once_with(
'Attempted to get traffic counters of chain %s which '
'does not exist', 'chain1')
def _test_get_traffic_counters_helper(self, use_ipv6):
self.iptables = iptables_manager.IptablesManager(
root_helper=self.root_helper,
use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
exp_packets = 800
exp_bytes = 131802
iptables_dump = (
'Chain OUTPUT (policy ACCEPT 400 packets, 65901 bytes)\n'
' pkts bytes target prot opt in out source'
' destination \n'
' 400 65901 chain1 all -- * * 0.0.0.0/0'
' 0.0.0.0/0 \n'
' 400 65901 chain2 all -- * * 0.0.0.0/0'
' 0.0.0.0/0 \n')
expected_calls_and_values = [
(mock.call(['iptables', '-t', 'filter', '-L', 'OUTPUT',
'-n', '-v', '-x'],
root_helper=self.root_helper),
iptables_dump),
(mock.call(['iptables', '-t', 'nat', '-L', 'OUTPUT', '-n',
'-v', '-x'],
root_helper=self.root_helper),
''),
]
if use_ipv6:
expected_calls_and_values.append(
(mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
'-n', '-v', '-x'],
root_helper=self.root_helper),
iptables_dump))
exp_packets *= 2
exp_bytes *= 2
tools.setup_mock_calls(self.execute, expected_calls_and_values)
acc = self.iptables.get_traffic_counters('OUTPUT')
self.assertEqual(acc['pkts'], exp_packets)
self.assertEqual(acc['bytes'], exp_bytes)
tools.verify_mock_calls(self.execute, expected_calls_and_values,
any_order=True)
def test_get_traffic_counters(self):
self._test_get_traffic_counters_helper(False)
def test_get_traffic_counters_with_ipv6(self):
self._test_get_traffic_counters_helper(True)
def _test_get_traffic_counters_with_zero_helper(self, use_ipv6):
self.iptables = iptables_manager.IptablesManager(
root_helper=self.root_helper,
use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
exp_packets = 800
exp_bytes = 131802
iptables_dump = (
'Chain OUTPUT (policy ACCEPT 400 packets, 65901 bytes)\n'
' pkts bytes target prot opt in out source'
' destination \n'
' 400 65901 chain1 all -- * * 0.0.0.0/0'
' 0.0.0.0/0 \n'
' 400 65901 chain2 all -- * * 0.0.0.0/0'
' 0.0.0.0/0 \n')
expected_calls_and_values = [
(mock.call(['iptables', '-t', 'filter', '-L', 'OUTPUT',
'-n', '-v', '-x', '-Z'],
root_helper=self.root_helper),
iptables_dump),
(mock.call(['iptables', '-t', 'nat', '-L', 'OUTPUT', '-n',
'-v', '-x', '-Z'],
root_helper=self.root_helper),
'')
]
if use_ipv6:
expected_calls_and_values.append(
(mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
'-n', '-v', '-x', '-Z'],
root_helper=self.root_helper),
iptables_dump))
exp_packets *= 2
exp_bytes *= 2
tools.setup_mock_calls(self.execute, expected_calls_and_values)
acc = self.iptables.get_traffic_counters('OUTPUT', zero=True)
self.assertEqual(acc['pkts'], exp_packets)
self.assertEqual(acc['bytes'], exp_bytes)
tools.verify_mock_calls(self.execute, expected_calls_and_values,
any_order=True)
def test_get_traffic_counters_with_zero(self):
self._test_get_traffic_counters_with_zero_helper(False)
def test_get_traffic_counters_with_zero_with_ipv6(self):
self._test_get_traffic_counters_with_zero_helper(True)
def _test_find_last_entry(self, find_str):
filter_list = [':neutron-filter-top - [0:0]',
':%(bn)s-FORWARD - [0:0]',
':%(bn)s-INPUT - [0:0]',
':%(bn)s-local - [0:0]',
':%(wrap)s - [0:0]',
':%(bn)s-OUTPUT - [0:0]',
'[0:0] -A FORWARD -j neutron-filter-top',
'[0:0] -A OUTPUT -j neutron-filter-top'
% IPTABLES_ARG]
return self.iptables._find_last_entry(filter_list, find_str)
def test_find_last_entry_old_dup(self):
find_str = 'neutron-filter-top'
match_str = '[0:0] -A OUTPUT -j neutron-filter-top'
ret_str = self._test_find_last_entry(find_str)
self.assertEqual(ret_str, match_str)
def test_find_last_entry_none(self):
find_str = 'neutron-filter-NOTFOUND'
ret_str = self._test_find_last_entry(find_str)
self.assertIsNone(ret_str)
class IptablesManagerStateLessTestCase(base.BaseTestCase):
def setUp(self):
super(IptablesManagerStateLessTestCase, self).setUp()
self.iptables = (iptables_manager.IptablesManager(state_less=True))
def test_nat_not_found(self):
self.assertNotIn('nat', self.iptables.ipv4)