Process ICMP type for iptables firewall

In current security group code, source_port_range_min
and source_port_range_max are used to specify icmp type
and code when security group rule protocol is icmp.
However, the code _port_arg in iptables_firewall called
by _convert_sgr_to_iptables_rules skips protocol icmp
when processing the arg. This happens to both ipv4 and
ipv6 icmp firewall rules.

This fix adds --icmp-type to iptables firewall rule when
icmp type is specified.

Closes-Bug: 1289088

Change-Id: Iebf109f246d47cffc26ab3c2cf113234a4b2cffe
This commit is contained in:
Xuhan Peng 2014-03-06 20:55:28 -05:00
parent fd7d4d75ba
commit e7e4335486
2 changed files with 91 additions and 2 deletions

View File

@ -325,10 +325,19 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
return iptables_rule
def _port_arg(self, direction, protocol, port_range_min, port_range_max):
if not (protocol in ['udp', 'tcp'] and port_range_min):
if (protocol not in ['udp', 'tcp', 'icmp', 'icmpv6']
or not port_range_min):
return []
if port_range_min == port_range_max:
if protocol in ['icmp', 'icmpv6']:
# Note(xuhanp): port_range_min/port_range_max represent
# icmp type/code when protocal is icmp or icmpv6
# icmp code can be 0 so we cannot use "if port_range_max" here
if port_range_max is not None:
return ['--%s-type' % protocol,
'%s/%s' % (port_range_min, port_range_max)]
return ['--%s-type' % protocol, '%s' % port_range_min]
elif port_range_min == port_range_max:
return ['--%s' % direction, '%s' % (port_range_min,)]
else:
return ['-m', 'multiport',

View File

@ -323,6 +323,46 @@ class IptablesFirewallTestCase(base.BaseTestCase):
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_egress_icmp_type(self):
prefix = FAKE_PREFIX['IPv4']
rule = {'ethertype': 'IPv4',
'direction': 'egress',
'protocol': 'icmp',
'source_port_range_min': 8,
'source_ip_prefix': prefix}
egress = call.add_rule(
'ofake_dev',
'-s %s -p icmp --icmp-type 8 -j RETURN' % prefix)
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_egress_icmp_type_name(self):
prefix = FAKE_PREFIX['IPv4']
rule = {'ethertype': 'IPv4',
'direction': 'egress',
'protocol': 'icmp',
'source_port_range_min': 'echo-request',
'source_ip_prefix': prefix}
egress = call.add_rule(
'ofake_dev',
'-s %s -p icmp --icmp-type echo-request -j RETURN' % prefix)
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_egress_icmp_type_code(self):
prefix = FAKE_PREFIX['IPv4']
rule = {'ethertype': 'IPv4',
'direction': 'egress',
'protocol': 'icmp',
'source_port_range_min': 8,
'source_port_range_max': 0,
'source_ip_prefix': prefix}
egress = call.add_rule(
'ofake_dev',
'-s %s -p icmp --icmp-type 8/0 -j RETURN' % prefix)
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv4_egress_tcp_port(self):
rule = {'ethertype': 'IPv4',
'direction': 'egress',
@ -621,6 +661,46 @@ class IptablesFirewallTestCase(base.BaseTestCase):
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_egress_icmp_type(self):
prefix = FAKE_PREFIX['IPv6']
rule = {'ethertype': 'IPv6',
'direction': 'egress',
'protocol': 'icmp',
'source_port_range_min': 8,
'source_ip_prefix': prefix}
egress = call.add_rule(
'ofake_dev',
'-s %s -p icmpv6 --icmpv6-type 8 -j RETURN' % prefix)
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_egress_icmp_type_name(self):
prefix = FAKE_PREFIX['IPv6']
rule = {'ethertype': 'IPv6',
'direction': 'egress',
'protocol': 'icmp',
'source_port_range_min': 'echo-request',
'source_ip_prefix': prefix}
egress = call.add_rule(
'ofake_dev',
'-s %s -p icmpv6 --icmpv6-type echo-request -j RETURN' % prefix)
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_egress_icmp_type_code(self):
prefix = FAKE_PREFIX['IPv6']
rule = {'ethertype': 'IPv6',
'direction': 'egress',
'protocol': 'icmp',
'source_port_range_min': 8,
'source_port_range_max': 0,
'source_ip_prefix': prefix}
egress = call.add_rule(
'ofake_dev',
'-s %s -p icmpv6 --icmpv6-type 8/0 -j RETURN' % prefix)
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def test_filter_ipv6_egress_tcp_port(self):
rule = {'ethertype': 'IPv6',
'direction': 'egress',