diff --git a/neutron/agent/linux/iptables_firewall.py b/neutron/agent/linux/iptables_firewall.py index b39c23e65e..9c8f283662 100644 --- a/neutron/agent/linux/iptables_firewall.py +++ b/neutron/agent/linux/iptables_firewall.py @@ -325,10 +325,19 @@ class IptablesFirewallDriver(firewall.FirewallDriver): return iptables_rule def _port_arg(self, direction, protocol, port_range_min, port_range_max): - if not (protocol in ['udp', 'tcp'] and port_range_min): + if (protocol not in ['udp', 'tcp', 'icmp', 'icmpv6'] + or not port_range_min): return [] - if port_range_min == port_range_max: + if protocol in ['icmp', 'icmpv6']: + # Note(xuhanp): port_range_min/port_range_max represent + # icmp type/code when protocal is icmp or icmpv6 + # icmp code can be 0 so we cannot use "if port_range_max" here + if port_range_max is not None: + return ['--%s-type' % protocol, + '%s/%s' % (port_range_min, port_range_max)] + return ['--%s-type' % protocol, '%s' % port_range_min] + elif port_range_min == port_range_max: return ['--%s' % direction, '%s' % (port_range_min,)] else: return ['-m', 'multiport', diff --git a/neutron/tests/unit/test_iptables_firewall.py b/neutron/tests/unit/test_iptables_firewall.py index b65ee780f5..e64080356c 100644 --- a/neutron/tests/unit/test_iptables_firewall.py +++ b/neutron/tests/unit/test_iptables_firewall.py @@ -323,6 +323,46 @@ class IptablesFirewallTestCase(base.BaseTestCase): ingress = None self._test_prepare_port_filter(rule, ingress, egress) + def test_filter_ipv4_egress_icmp_type(self): + prefix = FAKE_PREFIX['IPv4'] + rule = {'ethertype': 'IPv4', + 'direction': 'egress', + 'protocol': 'icmp', + 'source_port_range_min': 8, + 'source_ip_prefix': prefix} + egress = call.add_rule( + 'ofake_dev', + '-s %s -p icmp --icmp-type 8 -j RETURN' % prefix) + ingress = None + self._test_prepare_port_filter(rule, ingress, egress) + + def test_filter_ipv4_egress_icmp_type_name(self): + prefix = FAKE_PREFIX['IPv4'] + rule = {'ethertype': 'IPv4', + 'direction': 'egress', + 'protocol': 'icmp', + 'source_port_range_min': 'echo-request', + 'source_ip_prefix': prefix} + egress = call.add_rule( + 'ofake_dev', + '-s %s -p icmp --icmp-type echo-request -j RETURN' % prefix) + ingress = None + self._test_prepare_port_filter(rule, ingress, egress) + + def test_filter_ipv4_egress_icmp_type_code(self): + prefix = FAKE_PREFIX['IPv4'] + rule = {'ethertype': 'IPv4', + 'direction': 'egress', + 'protocol': 'icmp', + 'source_port_range_min': 8, + 'source_port_range_max': 0, + 'source_ip_prefix': prefix} + egress = call.add_rule( + 'ofake_dev', + '-s %s -p icmp --icmp-type 8/0 -j RETURN' % prefix) + ingress = None + self._test_prepare_port_filter(rule, ingress, egress) + def test_filter_ipv4_egress_tcp_port(self): rule = {'ethertype': 'IPv4', 'direction': 'egress', @@ -621,6 +661,46 @@ class IptablesFirewallTestCase(base.BaseTestCase): ingress = None self._test_prepare_port_filter(rule, ingress, egress) + def test_filter_ipv6_egress_icmp_type(self): + prefix = FAKE_PREFIX['IPv6'] + rule = {'ethertype': 'IPv6', + 'direction': 'egress', + 'protocol': 'icmp', + 'source_port_range_min': 8, + 'source_ip_prefix': prefix} + egress = call.add_rule( + 'ofake_dev', + '-s %s -p icmpv6 --icmpv6-type 8 -j RETURN' % prefix) + ingress = None + self._test_prepare_port_filter(rule, ingress, egress) + + def test_filter_ipv6_egress_icmp_type_name(self): + prefix = FAKE_PREFIX['IPv6'] + rule = {'ethertype': 'IPv6', + 'direction': 'egress', + 'protocol': 'icmp', + 'source_port_range_min': 'echo-request', + 'source_ip_prefix': prefix} + egress = call.add_rule( + 'ofake_dev', + '-s %s -p icmpv6 --icmpv6-type echo-request -j RETURN' % prefix) + ingress = None + self._test_prepare_port_filter(rule, ingress, egress) + + def test_filter_ipv6_egress_icmp_type_code(self): + prefix = FAKE_PREFIX['IPv6'] + rule = {'ethertype': 'IPv6', + 'direction': 'egress', + 'protocol': 'icmp', + 'source_port_range_min': 8, + 'source_port_range_max': 0, + 'source_ip_prefix': prefix} + egress = call.add_rule( + 'ofake_dev', + '-s %s -p icmpv6 --icmpv6-type 8/0 -j RETURN' % prefix) + ingress = None + self._test_prepare_port_filter(rule, ingress, egress) + def test_filter_ipv6_egress_tcp_port(self): rule = {'ethertype': 'IPv6', 'direction': 'egress',