vmware-nsx/quantum/tests/unit/test_iptables_manager.py
Akihiro MOTOKI 5b8ff27d98 Ensure max length of iptables chain name w/o prefix is up to 11 chars.
The maximum length of a Linux iptables chain name must be less than or
equal to 28 characters. In iptables_manager, a binary_name of up to 16 chars
is used as a prefix and a '-' follows it, so a chain name passed to
iptables_manager must be less than 12 characters long. Accordingly,
MAX_CHAIN_LEN should be changed from 28 to 12. Also, this commit
introduces a method to get a chain name with a valid length.

Since the iptables_firewall module constructs a rule by directly using
a chain name, iptables_firewall must also take care of the length.

Fixes bug #1133833

Change-Id: I6157d519f3cb91ec32dc6a92eae45439b8717b2d
2013-03-01 23:50:55 +09:00

305 lines
15 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Locaweb.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# @author: Juliano Martinez, Locaweb.
import inspect
import os
import mox
import testtools
from quantum.agent.linux import iptables_manager
class IptablesManagerStateFulTestCase(testtools.TestCase):
    """Exercise a stateful IptablesManager whose ``execute`` is mocked.

    The rule-manipulation tests first record, in order, every
    ``iptables-save`` / ``iptables-restore`` call they expect, switch the
    mock to replay mode, drive the manager, and finally verify that all
    recorded calls were made.
    """

    def setUp(self):
        """Create the manager under test and stub out its ``execute``."""
        super(IptablesManagerStateFulTestCase, self).setUp()
        self.mox = mox.Mox()
        self.root_helper = 'sudo'
        self.iptables = iptables_manager.IptablesManager(
            root_helper=self.root_helper)
        self.mox.StubOutWithMock(self.iptables, "execute")
        # Registered as a cleanup so the stubs are removed after each test.
        self.addCleanup(self.mox.UnsetStubs)

    def _expect_table_reload(self, table, rules):
        """Record an ``iptables-save`` of *table* followed by a restore.

        The save is stubbed to return an empty dump; the restore is
        expected to be called with *rules* as ``process_input``.
        """
        self.iptables.execute(['iptables-save', '-t', table],
                              root_helper=self.root_helper).AndReturn('')
        self.iptables.execute(['iptables-restore'],
                              process_input=rules,
                              root_helper=self.root_helper).AndReturn(None)

    def test_binary_name(self):
        """binary_name is the outermost frame's file name, cut to 16 chars."""
        outermost_frame = inspect.stack()[-1]
        expected = os.path.basename(outermost_frame[1])[:16]
        self.assertEqual(iptables_manager.binary_name, expected)

    # NOTE: the method name keeps its historical spelling ("chanin") so
    # that existing test IDs stay stable.
    def test_get_chanin_name(self):
        """get_chain_name() truncates names to the permitted length."""
        name = '0123456789' * 5
        # 28 chars is the maximum length of iptables chain name.
        unwrapped = iptables_manager.get_chain_name(name, wrap=False)
        self.assertEqual(unwrapped, name[:28])
        # 11 chars is the maximum length of chain name of iptables_manager
        # if binary_name is prepended.
        wrapped = iptables_manager.get_chain_name(name, wrap=True)
        self.assertEqual(wrapped, name[:11])

    def test_add_and_remove_chain(self):
        """A chain added to 'filter' shows up in, then leaves, the restore."""
        prefix = iptables_manager.binary_name
        nat_rules = (
            ':%s-OUTPUT - [0:0]\n'
            ':%s-snat - [0:0]\n'
            ':%s-PREROUTING - [0:0]\n'
            ':%s-float-snat - [0:0]\n'
            ':%s-POSTROUTING - [0:0]\n'
            ':quantum-postrouting-bottom - [0:0]\n'
            '-A PREROUTING -j %s-PREROUTING\n'
            '-A OUTPUT -j %s-OUTPUT\n'
            '-A POSTROUTING -j %s-POSTROUTING\n'
            '-A POSTROUTING -j quantum-postrouting-bottom\n'
            '-A quantum-postrouting-bottom -j %s-snat\n'
            '-A %s-snat -j %s-float-snat\n'
        ) % ((prefix,) * 11)
        filter_with_chain = (
            ':%s-FORWARD - [0:0]\n'
            ':%s-INPUT - [0:0]\n'
            ':%s-local - [0:0]\n'
            ':%s-filter - [0:0]\n'
            ':%s-OUTPUT - [0:0]\n'
            ':quantum-filter-top - [0:0]\n'
            '-A FORWARD -j quantum-filter-top\n'
            '-A OUTPUT -j quantum-filter-top\n'
            '-A quantum-filter-top -j %s-local\n'
            '-A INPUT -j %s-INPUT\n'
            '-A OUTPUT -j %s-OUTPUT\n'
            '-A FORWARD -j %s-FORWARD\n'
        ) % ((prefix,) * 9)
        filter_without_chain = (
            ':%s-FORWARD - [0:0]\n'
            ':%s-INPUT - [0:0]\n'
            ':%s-local - [0:0]\n'
            ':%s-OUTPUT - [0:0]\n'
            ':quantum-filter-top - [0:0]\n'
            '-A FORWARD -j quantum-filter-top\n'
            '-A OUTPUT -j quantum-filter-top\n'
            '-A quantum-filter-top -j %s-local\n'
            '-A INPUT -j %s-INPUT\n'
            '-A OUTPUT -j %s-OUTPUT\n'
            '-A FORWARD -j %s-FORWARD\n'
        ) % ((prefix,) * 8)

        # Expectations for the first apply(): the extra chain is present.
        self._expect_table_reload('filter', filter_with_chain)
        self._expect_table_reload('nat', nat_rules)
        # Expectations for the second apply(): the extra chain is gone.
        self._expect_table_reload('filter', filter_without_chain)
        self._expect_table_reload('nat', nat_rules)
        self.mox.ReplayAll()

        filter_table = self.iptables.ipv4['filter']
        filter_table.add_chain('filter')
        self.iptables.apply()

        filter_table.remove_chain('filter')
        self.iptables.apply()

        self.mox.VerifyAll()

    def test_add_filter_rule(self):
        """Rules added to 'filter' are restored, then dropped on removal."""
        prefix = iptables_manager.binary_name
        nat_rules = (
            ':%s-OUTPUT - [0:0]\n'
            ':%s-snat - [0:0]\n'
            ':%s-PREROUTING - [0:0]\n'
            ':%s-float-snat - [0:0]\n'
            ':%s-POSTROUTING - [0:0]\n'
            ':quantum-postrouting-bottom - [0:0]\n'
            '-A PREROUTING -j %s-PREROUTING\n'
            '-A OUTPUT -j %s-OUTPUT\n'
            '-A POSTROUTING -j %s-POSTROUTING\n'
            '-A POSTROUTING -j quantum-postrouting-bottom\n'
            '-A quantum-postrouting-bottom -j %s-snat\n'
            '-A %s-snat -j %s-float-snat\n'
        ) % ((prefix,) * 11)
        filter_with_rules = (
            ':%s-FORWARD - [0:0]\n'
            ':%s-INPUT - [0:0]\n'
            ':%s-local - [0:0]\n'
            ':%s-filter - [0:0]\n'
            ':%s-OUTPUT - [0:0]\n'
            ':quantum-filter-top - [0:0]\n'
            '-A FORWARD -j quantum-filter-top\n'
            '-A OUTPUT -j quantum-filter-top\n'
            '-A quantum-filter-top -j %s-local\n'
            '-A INPUT -j %s-INPUT\n'
            '-A OUTPUT -j %s-OUTPUT\n'
            '-A FORWARD -j %s-FORWARD\n'
            '-A %s-filter -j DROP\n'
            '-A %s-INPUT -s 0/0 -d 192.168.0.2 -j %s-filter\n'
        ) % ((prefix,) * 12)
        filter_without_rules = (
            ':%s-FORWARD - [0:0]\n'
            ':%s-INPUT - [0:0]\n'
            ':%s-local - [0:0]\n'
            ':%s-OUTPUT - [0:0]\n'
            ':quantum-filter-top - [0:0]\n'
            '-A FORWARD -j quantum-filter-top\n'
            '-A OUTPUT -j quantum-filter-top\n'
            '-A quantum-filter-top -j %s-local\n'
            '-A INPUT -j %s-INPUT\n'
            '-A OUTPUT -j %s-OUTPUT\n'
            '-A FORWARD -j %s-FORWARD\n'
        ) % ((prefix,) * 8)

        # Expectations for the first apply(): chain and both rules present.
        self._expect_table_reload('filter', filter_with_rules)
        self._expect_table_reload('nat', nat_rules)
        # Expectations for the second apply(): everything removed again.
        self._expect_table_reload('filter', filter_without_rules)
        self._expect_table_reload('nat', nat_rules)
        self.mox.ReplayAll()

        filter_table = self.iptables.ipv4['filter']
        jump_rule = '-s 0/0 -d 192.168.0.2 -j %s-filter' % prefix

        filter_table.add_chain('filter')
        filter_table.add_rule('filter', '-j DROP')
        filter_table.add_rule('INPUT', jump_rule)
        self.iptables.apply()

        filter_table.remove_rule('filter', '-j DROP')
        filter_table.remove_rule('INPUT', jump_rule)
        filter_table.remove_chain('filter')
        self.iptables.apply()

        self.mox.VerifyAll()

    def test_add_nat_rule(self):
        """Rules added to 'nat' are restored, then dropped on removal."""
        prefix = iptables_manager.binary_name
        filter_rules = (
            ':%s-FORWARD - [0:0]\n'
            ':%s-INPUT - [0:0]\n'
            ':%s-local - [0:0]\n'
            ':%s-OUTPUT - [0:0]\n'
            ':quantum-filter-top - [0:0]\n'
            '-A FORWARD -j quantum-filter-top\n'
            '-A OUTPUT -j quantum-filter-top\n'
            '-A quantum-filter-top -j %s-local\n'
            '-A INPUT -j %s-INPUT\n'
            '-A OUTPUT -j %s-OUTPUT\n'
            '-A FORWARD -j %s-FORWARD\n'
        ) % ((prefix,) * 8)
        nat_with_rules = (
            ':%s-float-snat - [0:0]\n'
            ':%s-POSTROUTING - [0:0]\n'
            ':%s-PREROUTING - [0:0]\n'
            ':%s-nat - [0:0]\n'
            ':%s-OUTPUT - [0:0]\n'
            ':%s-snat - [0:0]\n'
            ':quantum-postrouting-bottom - [0:0]\n'
            '-A PREROUTING -j %s-PREROUTING\n'
            '-A OUTPUT -j %s-OUTPUT\n'
            '-A POSTROUTING -j %s-POSTROUTING\n'
            '-A POSTROUTING -j quantum-postrouting-bottom\n'
            '-A quantum-postrouting-bottom -j %s-snat\n'
            '-A %s-snat -j %s-float-snat\n'
            '-A %s-PREROUTING -d 192.168.0.3 -j %s-nat\n'
            '-A %s-nat -p tcp --dport 8080 -j REDIRECT --to-port 80\n'
        ) % ((prefix,) * 15)
        nat_without_rules = (
            ':%s-float-snat - [0:0]\n'
            ':%s-POSTROUTING - [0:0]\n'
            ':%s-PREROUTING - [0:0]\n'
            ':%s-OUTPUT - [0:0]\n'
            ':%s-snat - [0:0]\n'
            ':quantum-postrouting-bottom - [0:0]\n'
            '-A PREROUTING -j %s-PREROUTING\n'
            '-A OUTPUT -j %s-OUTPUT\n'
            '-A POSTROUTING -j %s-POSTROUTING\n'
            '-A POSTROUTING -j quantum-postrouting-bottom\n'
            '-A quantum-postrouting-bottom -j %s-snat\n'
            '-A %s-snat -j %s-float-snat\n'
        ) % ((prefix,) * 11)

        # Expectations for the first apply(): chain and both rules present.
        self._expect_table_reload('filter', filter_rules)
        self._expect_table_reload('nat', nat_with_rules)
        # Expectations for the second apply(): everything removed again.
        self._expect_table_reload('filter', filter_rules)
        self._expect_table_reload('nat', nat_without_rules)
        self.mox.ReplayAll()

        nat_table = self.iptables.ipv4['nat']
        jump_rule = '-d 192.168.0.3 -j %s-nat' % prefix
        redirect_rule = '-p tcp --dport 8080 -j REDIRECT --to-port 80'

        nat_table.add_chain('nat')
        nat_table.add_rule('PREROUTING', jump_rule)
        nat_table.add_rule('nat', redirect_rule)
        self.iptables.apply()

        nat_table.remove_rule('nat', redirect_rule)
        nat_table.remove_rule('PREROUTING', jump_rule)
        nat_table.remove_chain('nat')
        self.iptables.apply()

        self.mox.VerifyAll()

    def test_add_rule_to_a_nonexistent_chain(self):
        """add_rule() on an unknown chain raises LookupError."""
        filter_table = self.iptables.ipv4['filter']
        self.assertRaises(LookupError, filter_table.add_rule,
                          'nonexistent', '-j DROP')

    def test_remove_nonexistent_chain(self):
        """remove_chain() on an unknown chain logs the expected warning."""
        self.mox.StubOutWithMock(iptables_manager, "LOG")
        expected_message = ('Attempted to remove chain %s which does '
                            'not exist')
        iptables_manager.LOG.warn(expected_message, 'nonexistent')
        self.mox.ReplayAll()

        self.iptables.ipv4['filter'].remove_chain('nonexistent')

        self.mox.VerifyAll()

    def test_remove_nonexistent_rule(self):
        """remove_rule() for a rule never added logs the expected warning."""
        self.mox.StubOutWithMock(iptables_manager, "LOG")
        expected_message = ('Tried to remove rule that was not there: '
                            '%(chain)r %(rule)r %(wrap)r %(top)r')
        expected_params = {
            'wrap': True,
            'top': False,
            'rule': '-j DROP',
            'chain': 'nonexistent',
        }
        iptables_manager.LOG.warn(expected_message, expected_params)
        self.mox.ReplayAll()

        self.iptables.ipv4['filter'].remove_rule('nonexistent', '-j DROP')

        self.mox.VerifyAll()
class IptablesManagerStateLessTestCase(testtools.TestCase):
    """Checks on an IptablesManager constructed with ``state_less=True``."""

    def setUp(self):
        """Build the stateless manager under test."""
        super(IptablesManagerStateLessTestCase, self).setUp()
        self.iptables = iptables_manager.IptablesManager(state_less=True)

    def test_nat_not_found(self):
        """A stateless manager has no 'nat' entry in its ``ipv4`` tables."""
        # assertNotIn reports the container's contents on failure, whereas
        # assertFalse on a pre-evaluated boolean only says "True is not
        # false".
        self.assertNotIn('nat', self.iptables.ipv4)