Merge "Ensure ip6tables are used only if ipv6 is enabled in kernel"
This commit is contained in:
commit
2783e0c40c
@ -33,6 +33,7 @@ from neutron.agent.linux import ra
|
||||
from neutron.agent import rpc as agent_rpc
|
||||
from neutron.common import config as common_config
|
||||
from neutron.common import constants as l3_constants
|
||||
from neutron.common import ipv6_utils
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron.common import topics
|
||||
from neutron.common import utils as common_utils
|
||||
@ -238,7 +239,8 @@ class LinkLocalAllocator(object):
|
||||
|
||||
class RouterInfo(object):
|
||||
|
||||
def __init__(self, router_id, root_helper, use_namespaces, router):
|
||||
def __init__(self, router_id, root_helper, use_namespaces, router,
|
||||
use_ipv6=False):
|
||||
self.router_id = router_id
|
||||
self.ex_gw_port = None
|
||||
self._snat_enabled = None
|
||||
@ -254,7 +256,7 @@ class RouterInfo(object):
|
||||
self.ns_name = NS_PREFIX + router_id if use_namespaces else None
|
||||
self.iptables_manager = iptables_manager.IptablesManager(
|
||||
root_helper=root_helper,
|
||||
#FIXME(danwent): use_ipv6=True,
|
||||
use_ipv6=use_ipv6,
|
||||
namespace=self.ns_name)
|
||||
self.routes = []
|
||||
# DVR Data
|
||||
@ -544,6 +546,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
|
||||
super(L3NATAgent, self).__init__(conf=self.conf)
|
||||
|
||||
self.target_ex_net_id = None
|
||||
self.use_ipv6 = ipv6_utils.is_enabled()
|
||||
|
||||
def _check_config_params(self):
|
||||
"""Check items in configuration files.
|
||||
@ -716,7 +719,8 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
|
||||
|
||||
def _router_added(self, router_id, router):
|
||||
ri = RouterInfo(router_id, self.root_helper,
|
||||
self.conf.use_namespaces, router)
|
||||
self.conf.use_namespaces, router,
|
||||
use_ipv6=self.use_ipv6)
|
||||
self.router_info[router_id] = ri
|
||||
if self.conf.use_namespaces:
|
||||
self._create_router_namespace(ri)
|
||||
@ -1186,11 +1190,10 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
|
||||
SNAT_INT_DEV_PREFIX)
|
||||
self._external_gateway_added(ri, ex_gw_port, gw_interface_name,
|
||||
snat_ns_name, preserve_ips=[])
|
||||
ri.snat_iptables_manager = (
|
||||
iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper, namespace=snat_ns_name
|
||||
)
|
||||
)
|
||||
ri.snat_iptables_manager = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper,
|
||||
namespace=snat_ns_name,
|
||||
use_ipv6=self.use_ipv6)
|
||||
|
||||
def external_gateway_added(self, ri, ex_gw_port, interface_name):
|
||||
if ri.router['distributed']:
|
||||
|
@ -19,6 +19,7 @@ from oslo.config import cfg
|
||||
from neutron.agent import firewall
|
||||
from neutron.agent.linux import iptables_manager
|
||||
from neutron.common import constants
|
||||
from neutron.common import ipv6_utils
|
||||
from neutron.openstack.common import log as logging
|
||||
|
||||
|
||||
@ -41,7 +42,7 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
|
||||
def __init__(self):
|
||||
self.iptables = iptables_manager.IptablesManager(
|
||||
root_helper=cfg.CONF.AGENT.root_helper,
|
||||
use_ipv6=True)
|
||||
use_ipv6=ipv6_utils.is_enabled())
|
||||
# list of port which has security group
|
||||
self.filtered_ports = {}
|
||||
self._add_fallback_chain_v4v6()
|
||||
|
@ -625,8 +625,10 @@ class IptablesManager(object):
|
||||
cmd_tables = [('iptables', key) for key, table in self.ipv4.items()
|
||||
if name in table._select_chain_set(wrap)]
|
||||
|
||||
cmd_tables += [('ip6tables', key) for key, table in self.ipv6.items()
|
||||
if name in table._select_chain_set(wrap)]
|
||||
if self.use_ipv6:
|
||||
cmd_tables += [('ip6tables', key)
|
||||
for key, table in self.ipv6.items()
|
||||
if name in table._select_chain_set(wrap)]
|
||||
|
||||
return cmd_tables
|
||||
|
||||
|
@ -20,6 +20,9 @@ IPv6-related utilities and helper functions.
|
||||
import netaddr
|
||||
|
||||
|
||||
_IS_IPV6_ENABLED = None
|
||||
|
||||
|
||||
def get_ipv6_addr_by_EUI64(prefix, mac):
|
||||
# Check if the prefix is IPv4 address
|
||||
isIPv4 = netaddr.valid_ipv4(prefix)
|
||||
@ -37,3 +40,14 @@ def get_ipv6_addr_by_EUI64(prefix, mac):
|
||||
except TypeError:
|
||||
raise TypeError(_('Bad prefix type for generate IPv6 address by '
|
||||
'EUI-64: %s') % prefix)
|
||||
|
||||
|
||||
def is_enabled():
|
||||
global _IS_IPV6_ENABLED
|
||||
|
||||
if _IS_IPV6_ENABLED is None:
|
||||
disabled_ipv6_path = "/proc/sys/net/ipv6/conf/default/disable_ipv6"
|
||||
with open(disabled_ipv6_path, 'r') as f:
|
||||
disabled = f.read().strip()
|
||||
_IS_IPV6_ENABLED = disabled == "0"
|
||||
return _IS_IPV6_ENABLED
|
||||
|
@ -20,6 +20,7 @@ from neutron.agent.common import config
|
||||
from neutron.agent.linux import interface
|
||||
from neutron.agent.linux import iptables_manager
|
||||
from neutron.common import constants as constants
|
||||
from neutron.common import ipv6_utils
|
||||
from neutron.common import log
|
||||
from neutron.openstack.common import importutils
|
||||
from neutron.openstack.common import log as logging
|
||||
@ -74,7 +75,8 @@ class RouterWithMetering(object):
|
||||
self.iptables_manager = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper,
|
||||
namespace=self.ns_name,
|
||||
binary_name=WRAP_NAME)
|
||||
binary_name=WRAP_NAME,
|
||||
use_ipv6=ipv6_utils.is_enabled())
|
||||
self.metering_labels = {}
|
||||
|
||||
|
||||
|
@ -65,7 +65,8 @@ class IptablesDriverTestCase(base.BaseTestCase):
|
||||
|
||||
self.iptables_cls.assert_called_with(root_helper='fake_sudo',
|
||||
namespace=mock.ANY,
|
||||
binary_name=mock.ANY)
|
||||
binary_name=mock.ANY,
|
||||
use_ipv6=mock.ANY)
|
||||
|
||||
def test_add_metering_label(self):
|
||||
routers = [{'_metering_labels': [
|
||||
|
@ -67,8 +67,8 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(IptablesManagerStateFulTestCase, self).setUp()
|
||||
self.root_helper = 'sudo'
|
||||
self.iptables = (iptables_manager.
|
||||
IptablesManager(root_helper=self.root_helper))
|
||||
self.iptables = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper)
|
||||
self.execute = mock.patch.object(self.iptables, "execute").start()
|
||||
|
||||
def test_binary_name(self):
|
||||
@ -85,12 +85,32 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
||||
self.assertEqual(iptables_manager.get_chain_name(name, wrap=True),
|
||||
name[:11])
|
||||
|
||||
def test_add_and_remove_chain_custom_binary_name(self):
|
||||
def _extend_with_ip6tables_filter(self, expected_calls, filter_dump):
|
||||
expected_calls.insert(2, (
|
||||
mock.call(['ip6tables-save', '-c'],
|
||||
root_helper=self.root_helper),
|
||||
''))
|
||||
expected_calls.insert(3, (
|
||||
mock.call(['ip6tables-restore', '-c'],
|
||||
process_input=filter_dump,
|
||||
root_helper=self.root_helper),
|
||||
None))
|
||||
expected_calls.extend([
|
||||
(mock.call(['ip6tables-save', '-c'],
|
||||
root_helper=self.root_helper),
|
||||
''),
|
||||
(mock.call(['ip6tables-restore', '-c'],
|
||||
process_input=filter_dump,
|
||||
root_helper=self.root_helper),
|
||||
None)])
|
||||
|
||||
def _test_add_and_remove_chain_custom_binary_name_helper(self, use_ipv6):
|
||||
bn = ("abcdef" * 5)
|
||||
|
||||
self.iptables = (iptables_manager.
|
||||
IptablesManager(root_helper=self.root_helper,
|
||||
binary_name=bn))
|
||||
self.iptables = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper,
|
||||
binary_name=bn,
|
||||
use_ipv6=use_ipv6)
|
||||
self.execute = mock.patch.object(self.iptables, "execute").start()
|
||||
|
||||
iptables_args = {'bn': bn[:16]}
|
||||
@ -112,6 +132,23 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
||||
'COMMIT\n'
|
||||
'# Completed by iptables_manager\n' % iptables_args)
|
||||
|
||||
filter_dump_ipv6 = ('# Generated by iptables_manager\n'
|
||||
'*filter\n'
|
||||
':neutron-filter-top - [0:0]\n'
|
||||
':%(bn)s-FORWARD - [0:0]\n'
|
||||
':%(bn)s-INPUT - [0:0]\n'
|
||||
':%(bn)s-local - [0:0]\n'
|
||||
':%(bn)s-OUTPUT - [0:0]\n'
|
||||
'[0:0] -A FORWARD -j neutron-filter-top\n'
|
||||
'[0:0] -A OUTPUT -j neutron-filter-top\n'
|
||||
'[0:0] -A neutron-filter-top -j %(bn)s-local\n'
|
||||
'[0:0] -A INPUT -j %(bn)s-INPUT\n'
|
||||
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
|
||||
'[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
|
||||
'COMMIT\n'
|
||||
'# Completed by iptables_manager\n' %
|
||||
iptables_args)
|
||||
|
||||
filter_dump_mod = ('# Generated by iptables_manager\n'
|
||||
'*filter\n'
|
||||
':neutron-filter-top - [0:0]\n'
|
||||
@ -164,6 +201,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
||||
root_helper=self.root_helper),
|
||||
None),
|
||||
]
|
||||
if use_ipv6:
|
||||
self._extend_with_ip6tables_filter(expected_calls_and_values,
|
||||
filter_dump_ipv6)
|
||||
|
||||
tools.setup_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
self.iptables.ipv4['filter'].add_chain('filter')
|
||||
@ -174,12 +215,19 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
||||
|
||||
tools.verify_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
def test_empty_chain_custom_binary_name(self):
|
||||
def test_add_and_remove_chain_custom_binary_name(self):
|
||||
self._test_add_and_remove_chain_custom_binary_name_helper(False)
|
||||
|
||||
def test_add_and_remove_chain_custom_binary_name_with_ipv6(self):
|
||||
self._test_add_and_remove_chain_custom_binary_name_helper(True)
|
||||
|
||||
def _test_empty_chain_custom_binary_name_helper(self, use_ipv6):
|
||||
bn = ("abcdef" * 5)[:16]
|
||||
|
||||
self.iptables = (iptables_manager.
|
||||
IptablesManager(root_helper=self.root_helper,
|
||||
binary_name=bn))
|
||||
self.iptables = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper,
|
||||
binary_name=bn,
|
||||
use_ipv6=use_ipv6)
|
||||
self.execute = mock.patch.object(self.iptables, "execute").start()
|
||||
|
||||
iptables_args = {'bn': bn}
|
||||
@ -253,6 +301,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
||||
root_helper=self.root_helper),
|
||||
None),
|
||||
]
|
||||
if use_ipv6:
|
||||
self._extend_with_ip6tables_filter(expected_calls_and_values,
|
||||
filter_dump)
|
||||
|
||||
tools.setup_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
self.iptables.ipv4['filter'].add_chain('filter')
|
||||
@ -265,7 +317,18 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
||||
|
||||
tools.verify_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
def test_add_and_remove_chain(self):
|
||||
def test_empty_chain_custom_binary_name(self):
|
||||
self._test_empty_chain_custom_binary_name_helper(False)
|
||||
|
||||
def test_empty_chain_custom_binary_name_with_ipv6(self):
|
||||
self._test_empty_chain_custom_binary_name_helper(True)
|
||||
|
||||
def _test_add_and_remove_chain_helper(self, use_ipv6):
|
||||
self.iptables = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper,
|
||||
use_ipv6=use_ipv6)
|
||||
self.execute = mock.patch.object(self.iptables, "execute").start()
|
||||
|
||||
filter_dump_mod = ('# Generated by iptables_manager\n'
|
||||
'*filter\n'
|
||||
':neutron-filter-top - [0:0]\n'
|
||||
@ -300,6 +363,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
||||
root_helper=self.root_helper),
|
||||
None),
|
||||
]
|
||||
if use_ipv6:
|
||||
self._extend_with_ip6tables_filter(expected_calls_and_values,
|
||||
FILTER_DUMP)
|
||||
|
||||
tools.setup_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
self.iptables.ipv4['filter'].add_chain('filter')
|
||||
@ -310,7 +377,18 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
||||
|
||||
tools.verify_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
def test_add_filter_rule(self):
|
||||
def test_add_and_remove_chain(self):
|
||||
self._test_add_and_remove_chain_helper(False)
|
||||
|
||||
def test_add_and_remove_chain_with_ipv6(self):
|
||||
self._test_add_and_remove_chain_helper(True)
|
||||
|
||||
def _test_add_filter_rule_helper(self, use_ipv6):
|
||||
self.iptables = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper,
|
||||
use_ipv6=use_ipv6)
|
||||
self.execute = mock.patch.object(self.iptables, "execute").start()
|
||||
|
||||
filter_dump_mod = ('# Generated by iptables_manager\n'
|
||||
'*filter\n'
|
||||
':neutron-filter-top - [0:0]\n'
|
||||
@ -349,6 +427,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
||||
),
|
||||
None),
|
||||
]
|
||||
if use_ipv6:
|
||||
self._extend_with_ip6tables_filter(expected_calls_and_values,
|
||||
FILTER_DUMP)
|
||||
|
||||
tools.setup_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
self.iptables.ipv4['filter'].add_chain('filter')
|
||||
@ -369,7 +451,18 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
||||
|
||||
tools.verify_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
def test_rule_with_wrap_target(self):
|
||||
def test_add_filter_rule(self):
|
||||
self._test_add_filter_rule_helper(False)
|
||||
|
||||
def test_add_filter_rule_with_ipv6(self):
|
||||
self._test_add_filter_rule_helper(True)
|
||||
|
||||
def _test_rule_with_wrap_target_helper(self, use_ipv6):
|
||||
self.iptables = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper,
|
||||
use_ipv6=use_ipv6)
|
||||
self.execute = mock.patch.object(self.iptables, "execute").start()
|
||||
|
||||
name = '0123456789' * 5
|
||||
wrap = "%s-%s" % (iptables_manager.binary_name,
|
||||
iptables_manager.get_chain_name(name))
|
||||
@ -413,6 +506,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
||||
root_helper=self.root_helper),
|
||||
None),
|
||||
]
|
||||
if use_ipv6:
|
||||
self._extend_with_ip6tables_filter(expected_calls_and_values,
|
||||
FILTER_DUMP)
|
||||
|
||||
tools.setup_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
self.iptables.ipv4['filter'].add_chain(name)
|
||||
@ -430,7 +527,18 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
||||
|
||||
tools.verify_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
def test_add_nat_rule(self):
|
||||
def test_rule_with_wrap_target(self):
|
||||
self._test_rule_with_wrap_target_helper(False)
|
||||
|
||||
def test_rule_with_wrap_target_with_ipv6(self):
|
||||
self._test_rule_with_wrap_target_helper(True)
|
||||
|
||||
def _test_add_nat_rule_helper(self, use_ipv6):
|
||||
self.iptables = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper,
|
||||
use_ipv6=use_ipv6)
|
||||
self.execute = mock.patch.object(self.iptables, "execute").start()
|
||||
|
||||
nat_dump = ('# Generated by iptables_manager\n'
|
||||
'*nat\n'
|
||||
':neutron-postrouting-bottom - [0:0]\n'
|
||||
@ -488,6 +596,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
||||
root_helper=self.root_helper),
|
||||
None),
|
||||
]
|
||||
if use_ipv6:
|
||||
self._extend_with_ip6tables_filter(expected_calls_and_values,
|
||||
FILTER_DUMP)
|
||||
|
||||
tools.setup_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
self.iptables.ipv4['nat'].add_chain('nat')
|
||||
@ -512,6 +624,12 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
||||
|
||||
tools.verify_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
def test_add_nat_rule(self):
|
||||
self._test_add_nat_rule_helper(False)
|
||||
|
||||
def test_add_nat_rule_with_ipv6(self):
|
||||
self._test_add_nat_rule_helper(True)
|
||||
|
||||
def test_add_rule_to_a_nonexistent_chain(self):
|
||||
self.assertRaises(LookupError, self.iptables.ipv4['filter'].add_rule,
|
||||
'nonexistent', '-j DROP')
|
||||
@ -604,7 +722,14 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
||||
'Attempted to get traffic counters of chain %s which '
|
||||
'does not exist', 'chain1')
|
||||
|
||||
def test_get_traffic_counters(self):
|
||||
def _test_get_traffic_counters_helper(self, use_ipv6):
|
||||
self.iptables = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper,
|
||||
use_ipv6=use_ipv6)
|
||||
self.execute = mock.patch.object(self.iptables, "execute").start()
|
||||
exp_packets = 800
|
||||
exp_bytes = 131802
|
||||
|
||||
iptables_dump = (
|
||||
'Chain OUTPUT (policy ACCEPT 400 packets, 65901 bytes)\n'
|
||||
' pkts bytes target prot opt in out source'
|
||||
@ -623,20 +748,38 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
||||
'-v', '-x'],
|
||||
root_helper=self.root_helper),
|
||||
''),
|
||||
(mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
|
||||
'-n', '-v', '-x'],
|
||||
root_helper=self.root_helper),
|
||||
iptables_dump),
|
||||
]
|
||||
if use_ipv6:
|
||||
expected_calls_and_values.append(
|
||||
(mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
|
||||
'-n', '-v', '-x'],
|
||||
root_helper=self.root_helper),
|
||||
iptables_dump))
|
||||
exp_packets *= 2
|
||||
exp_bytes *= 2
|
||||
|
||||
tools.setup_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
acc = self.iptables.get_traffic_counters('OUTPUT')
|
||||
self.assertEqual(acc['pkts'], 1600)
|
||||
self.assertEqual(acc['bytes'], 263604)
|
||||
self.assertEqual(acc['pkts'], exp_packets)
|
||||
self.assertEqual(acc['bytes'], exp_bytes)
|
||||
|
||||
tools.verify_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
def test_get_traffic_counters_with_zero(self):
|
||||
def test_get_traffic_counters(self):
|
||||
self._test_get_traffic_counters_helper(False)
|
||||
|
||||
def test_get_traffic_counters_with_ipv6(self):
|
||||
self._test_get_traffic_counters_helper(True)
|
||||
|
||||
def _test_get_traffic_counters_with_zero_helper(self, use_ipv6):
|
||||
self.iptables = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper,
|
||||
use_ipv6=use_ipv6)
|
||||
self.execute = mock.patch.object(self.iptables, "execute").start()
|
||||
exp_packets = 800
|
||||
exp_bytes = 131802
|
||||
|
||||
iptables_dump = (
|
||||
'Chain OUTPUT (policy ACCEPT 400 packets, 65901 bytes)\n'
|
||||
' pkts bytes target prot opt in out source'
|
||||
@ -654,20 +797,31 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
||||
(mock.call(['iptables', '-t', 'nat', '-L', 'OUTPUT', '-n',
|
||||
'-v', '-x', '-Z'],
|
||||
root_helper=self.root_helper),
|
||||
''),
|
||||
(mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
|
||||
'-n', '-v', '-x', '-Z'],
|
||||
root_helper=self.root_helper),
|
||||
iptables_dump),
|
||||
'')
|
||||
]
|
||||
if use_ipv6:
|
||||
expected_calls_and_values.append(
|
||||
(mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
|
||||
'-n', '-v', '-x', '-Z'],
|
||||
root_helper=self.root_helper),
|
||||
iptables_dump))
|
||||
exp_packets *= 2
|
||||
exp_bytes *= 2
|
||||
|
||||
tools.setup_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
acc = self.iptables.get_traffic_counters('OUTPUT', zero=True)
|
||||
self.assertEqual(acc['pkts'], 1600)
|
||||
self.assertEqual(acc['bytes'], 263604)
|
||||
self.assertEqual(acc['pkts'], exp_packets)
|
||||
self.assertEqual(acc['bytes'], exp_bytes)
|
||||
|
||||
tools.verify_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
def test_get_traffic_counters_with_zero(self):
|
||||
self._test_get_traffic_counters_with_zero_helper(False)
|
||||
|
||||
def test_get_traffic_counters_with_zero_with_ipv6(self):
|
||||
self._test_get_traffic_counters_with_zero_helper(True)
|
||||
|
||||
def _test_find_last_entry(self, find_str):
|
||||
filter_list = [':neutron-filter-top - [0:0]',
|
||||
':%(bn)s-FORWARD - [0:0]',
|
||||
|
@ -13,6 +13,8 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from neutron.common import ipv6_utils
|
||||
from neutron.tests import base
|
||||
|
||||
@ -48,3 +50,29 @@ class IPv6byEUI64TestCase(base.BaseTestCase):
|
||||
prefix = 123
|
||||
self.assertRaises(TypeError, lambda:
|
||||
ipv6_utils.get_ipv6_addr_by_EUI64(prefix, mac))
|
||||
|
||||
|
||||
class TestIsEnabled(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestIsEnabled, self).setUp()
|
||||
ipv6_utils._IS_IPV6_ENABLED = None
|
||||
mock_open = mock.patch("__builtin__.open").start()
|
||||
self.mock_read = mock_open.return_value.__enter__.return_value.read
|
||||
|
||||
def test_enabled(self):
|
||||
self.mock_read.return_value = "0"
|
||||
enabled = ipv6_utils.is_enabled()
|
||||
self.assertTrue(enabled)
|
||||
|
||||
def test_disabled(self):
|
||||
self.mock_read.return_value = "1"
|
||||
enabled = ipv6_utils.is_enabled()
|
||||
self.assertFalse(enabled)
|
||||
|
||||
def test_memoize(self):
|
||||
self.mock_read.return_value = "0"
|
||||
ipv6_utils.is_enabled()
|
||||
enabled = ipv6_utils.is_enabled()
|
||||
self.assertTrue(enabled)
|
||||
self.mock_read.assert_called_once_with()
|
||||
|
@ -1821,6 +1821,9 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
|
||||
self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
|
||||
|
||||
self.iptables = self.agent.firewall.iptables
|
||||
# TODO(jlibosva) Get rid of mocking iptables execute and mock out
|
||||
# firewall instead
|
||||
self.iptables.use_ipv6 = True
|
||||
self.iptables_execute = mock.patch.object(self.iptables,
|
||||
"execute").start()
|
||||
self.iptables_execute_return_values = []
|
||||
|
Loading…
x
Reference in New Issue
Block a user