Merge "Install SNAT rules for ipv4 only"
This commit is contained in:
commit
e4f0ad3aa6
@ -449,7 +449,9 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
|
||||
namespace=ri.ns_name,
|
||||
prefix=INTERNAL_DEV_PREFIX)
|
||||
|
||||
internal_cidrs = [p['ip_cidr'] for p in ri.internal_ports]
|
||||
# Get IPv4 only internal CIDRs
|
||||
internal_cidrs = [p['ip_cidr'] for p in ri.internal_ports
|
||||
if netaddr.IPNetwork(p['ip_cidr']).version == 4]
|
||||
# TODO(salv-orlando): RouterInfo would be a better place for
|
||||
# this logic too
|
||||
ex_gw_port_id = (ex_gw_port and ex_gw_port['id'] or
|
||||
@ -528,11 +530,16 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
|
||||
# And add them back if the action if add_rules
|
||||
if action == 'add_rules' and ex_gw_port:
|
||||
# ex_gw_port should not be None in this case
|
||||
ex_gw_ip = ex_gw_port['fixed_ips'][0]['ip_address']
|
||||
for rule in self.external_gateway_nat_rules(ex_gw_ip,
|
||||
internal_cidrs,
|
||||
interface_name):
|
||||
ri.iptables_manager.ipv4['nat'].add_rule(*rule)
|
||||
# NAT rules are added only if ex_gw_port has an IPv4 address
|
||||
for ip_addr in ex_gw_port['fixed_ips']:
|
||||
ex_gw_ip = ip_addr['ip_address']
|
||||
if netaddr.IPAddress(ex_gw_ip).version == 4:
|
||||
rules = self.external_gateway_nat_rules(ex_gw_ip,
|
||||
internal_cidrs,
|
||||
interface_name)
|
||||
for rule in rules:
|
||||
ri.iptables_manager.ipv4['nat'].add_rule(*rule)
|
||||
break
|
||||
ri.iptables_manager.apply()
|
||||
|
||||
def process_router_floating_ip_nat_rules(self, ri):
|
||||
|
@ -17,6 +17,7 @@ import contextlib
|
||||
import copy
|
||||
|
||||
import mock
|
||||
import netaddr
|
||||
from oslo.config import cfg
|
||||
from testtools import matchers
|
||||
|
||||
@ -333,24 +334,37 @@ class TestBasicRouterOperations(base.BaseTestCase):
|
||||
'! -i %s ! -o %s -m conntrack ! --ctstate DNAT -j ACCEPT' %
|
||||
(interface_name, interface_name)]
|
||||
for source_cidr in source_cidrs:
|
||||
value_dict = {'source_cidr': source_cidr,
|
||||
'source_nat_ip': source_nat_ip}
|
||||
expected_rules.append('-s %(source_cidr)s -j SNAT --to-source '
|
||||
'%(source_nat_ip)s' % value_dict)
|
||||
# Create SNAT rules for IPv4 only
|
||||
if (netaddr.IPNetwork(source_cidr).version == 4 and
|
||||
netaddr.IPNetwork(source_nat_ip).version == 4):
|
||||
value_dict = {'source_cidr': source_cidr,
|
||||
'source_nat_ip': source_nat_ip}
|
||||
expected_rules.append('-s %(source_cidr)s -j SNAT --to-source '
|
||||
'%(source_nat_ip)s' % value_dict)
|
||||
for r in rules:
|
||||
if negate:
|
||||
self.assertNotIn(r.rule, expected_rules)
|
||||
else:
|
||||
self.assertIn(r.rule, expected_rules)
|
||||
|
||||
def _prepare_router_data(self, enable_snat=None, num_internal_ports=1):
|
||||
def _prepare_router_data(self, ip_version=4,
|
||||
enable_snat=None, num_internal_ports=1):
|
||||
if ip_version == 4:
|
||||
ip_addr = '19.4.4.4'
|
||||
cidr = '19.4.4.0/24'
|
||||
gateway_ip = '19.4.4.1'
|
||||
elif ip_version == 6:
|
||||
ip_addr = 'fd00::4'
|
||||
cidr = 'fd00::/64'
|
||||
gateway_ip = 'fd00::1'
|
||||
|
||||
router_id = _uuid()
|
||||
ex_gw_port = {'id': _uuid(),
|
||||
'network_id': _uuid(),
|
||||
'fixed_ips': [{'ip_address': '19.4.4.4',
|
||||
'fixed_ips': [{'ip_address': ip_addr,
|
||||
'subnet_id': _uuid()}],
|
||||
'subnet': {'cidr': '19.4.4.0/24',
|
||||
'gateway_ip': '19.4.4.1'}}
|
||||
'subnet': {'cidr': cidr,
|
||||
'gateway_ip': gateway_ip}}
|
||||
int_ports = []
|
||||
for i in range(num_internal_ports):
|
||||
int_ports.append({'id': _uuid(),
|
||||
@ -638,6 +652,99 @@ class TestBasicRouterOperations(base.BaseTestCase):
|
||||
# send_arp is called both times process_router is called
|
||||
self.assertEqual(self.send_arp.call_count, 2)
|
||||
|
||||
def test_process_ipv6_only_gw(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
router = self._prepare_router_data(ip_version=6)
|
||||
# Get NAT rules without the gw_port
|
||||
gw_port = router['gw_port']
|
||||
router['gw_port'] = None
|
||||
ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
|
||||
self.conf.use_namespaces, router=router)
|
||||
agent.external_gateway_added = mock.Mock()
|
||||
agent.process_router(ri)
|
||||
orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
|
||||
|
||||
# Get NAT rules with the gw_port
|
||||
router['gw_port'] = gw_port
|
||||
ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
|
||||
self.conf.use_namespaces, router=router)
|
||||
with mock.patch.object(
|
||||
agent,
|
||||
'external_gateway_nat_rules') as external_gateway_nat_rules:
|
||||
agent.process_router(ri)
|
||||
new_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
|
||||
|
||||
# There should be no change with the NAT rules
|
||||
self.assertFalse(external_gateway_nat_rules.called)
|
||||
self.assertEqual(orig_nat_rules, new_nat_rules)
|
||||
|
||||
def test_process_router_ipv6_interface_added(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
router = self._prepare_router_data()
|
||||
ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
|
||||
self.conf.use_namespaces, router=router)
|
||||
agent.external_gateway_added = mock.Mock()
|
||||
# Process with NAT
|
||||
agent.process_router(ri)
|
||||
orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
|
||||
# Add an IPv6 interface and reprocess
|
||||
router[l3_constants.INTERFACE_KEY].append(
|
||||
{'id': _uuid(),
|
||||
'network_id': _uuid(),
|
||||
'admin_state_up': True,
|
||||
'fixed_ips': [{'ip_address': 'fd00::2',
|
||||
'subnet_id': _uuid()}],
|
||||
'mac_address': 'ca:fe:de:ad:be:ef',
|
||||
'subnet': {'cidr': 'fd00::/64',
|
||||
'gateway_ip': 'fd00::1'}})
|
||||
# Reassign the router object to RouterInfo
|
||||
ri.router = router
|
||||
agent.process_router(ri)
|
||||
# For some reason set logic does not work well with
|
||||
# IpTablesRule instances
|
||||
nat_rules_delta = [r for r in ri.iptables_manager.ipv4['nat'].rules
|
||||
if r not in orig_nat_rules]
|
||||
self.assertFalse(nat_rules_delta)
|
||||
|
||||
def test_process_router_ipv6v4_interface_added(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
router = self._prepare_router_data()
|
||||
ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
|
||||
self.conf.use_namespaces, router=router)
|
||||
agent.external_gateway_added = mock.Mock()
|
||||
# Process with NAT
|
||||
agent.process_router(ri)
|
||||
orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
|
||||
# Add an IPv4 and IPv6 interface and reprocess
|
||||
router[l3_constants.INTERFACE_KEY].append(
|
||||
{'id': _uuid(),
|
||||
'network_id': _uuid(),
|
||||
'admin_state_up': True,
|
||||
'fixed_ips': [{'ip_address': '35.4.1.4',
|
||||
'subnet_id': _uuid()}],
|
||||
'mac_address': 'ca:fe:de:ad:be:ef',
|
||||
'subnet': {'cidr': '35.4.1.0/24',
|
||||
'gateway_ip': '35.4.1.1'}})
|
||||
|
||||
router[l3_constants.INTERFACE_KEY].append(
|
||||
{'id': _uuid(),
|
||||
'network_id': _uuid(),
|
||||
'admin_state_up': True,
|
||||
'fixed_ips': [{'ip_address': 'fd00::2',
|
||||
'subnet_id': _uuid()}],
|
||||
'mac_address': 'ca:fe:de:ad:be:ef',
|
||||
'subnet': {'cidr': 'fd00::/64',
|
||||
'gateway_ip': 'fd00::1'}})
|
||||
# Reassign the router object to RouterInfo
|
||||
ri.router = router
|
||||
agent.process_router(ri)
|
||||
# For some reason set logic does not work well with
|
||||
# IpTablesRule instances
|
||||
nat_rules_delta = [r for r in ri.iptables_manager.ipv4['nat'].rules
|
||||
if r not in orig_nat_rules]
|
||||
self.assertEqual(1, len(nat_rules_delta))
|
||||
self._verify_snat_rules(nat_rules_delta, router)
|
||||
|
||||
def test_process_router_interface_removed(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
router = self._prepare_router_data(num_internal_ports=2)
|
||||
|
Loading…
x
Reference in New Issue
Block a user