vmware-nsx/neutron/tests/unit/oneconvergence/test_security_group.py
Xuhan Peng 67998705a0 Permit ICMPv6 RAs only from known routers
Currently, ingress ICMPv6 RAs are permitted from any IP by
default so that VMs can accept ICMPv6 RAs from the provider network.
As a result, a VM can accept RAs from an attacker VM and configure
a network prefix specified by the attacker VM.

Remove permitting ICMPv6 RAs from any IPs and add security rule
to only permit ICMPv6 RA from:

1. If the port's subnet is configured with an ipv6_ra_mode value
(i.e. the value is slaac, dhcpv6-stateful, or dhcpv6-stateless), the RA
is sent by dnsmasq controlled by OpenStack. In this case,
allow RAs from the link-local address of the gateway port (if the
gateway port is created).

2. If the subnet's gateway port is not managed by OpenStack, allow
the ICMPv6 RA sent from the subnet gateway IP if it's a link local
address. The administrator needs to configure the gateway IP as
link local address in this case to make the RA rule work.

Change-Id: I1d5c7aaa8e4cf057204eb746c0faab2c70409a94
Closes-Bug: 1262759
2014-04-02 16:24:17 +08:00

157 lines
6.8 KiB
Python

# Copyright 2014 OneConvergence, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Kedar Kulkarni, One Convergence, Inc.
import uuid
import mock
from neutron.api.v2 import attributes
from neutron.extensions import securitygroup as ext_sg
from neutron import manager
from neutron.plugins.oneconvergence import plugin as nvsd_plugin
from neutron.tests.unit import test_extension_security_group as test_sg
from neutron.tests.unit import test_security_groups_rpc as test_sg_rpc
# Dotted import path of the plugin class under test.
PLUGIN_NAME = 'neutron.plugins.oneconvergence.plugin.OneConvergencePluginV2'
# Dotted path of the agent notifier class; used as a mock.patch target.
AGENTNOTIFIER = (
    'neutron.plugins.oneconvergence.plugin.'
    'NVSDPluginV2AgentNotifierApi'
)
# Dotted path of the dummy NVSD API class shipped with the unit tests.
DUMMY_NVSD_LIB = 'neutron.tests.unit.oneconvergence.dummynvsdlib.NVSDApi'
class OneConvergenceSecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
    """Security-group DB test case wired to the One Convergence plugin.

    The plugin's ``oneconvergence_init`` and its agent notifier class are
    replaced with mocks for the duration of the test.
    """

    _plugin_name = PLUGIN_NAME

    def setUp(self):
        """Mock the plugin's external pieces, then run the base setUp.

        A per-resource copy of ``attributes.RESOURCE_ATTRIBUTE_MAP`` is
        saved beforehand so that ``tearDown`` can put it back.
        """

        def fake_oneconvergence_init(plugin):
            # Stand-in for the plugin's own init: attach a mock NVSD
            # library whose create_network() returns a dict with a freshly
            # generated UUID string on every call.
            plugin.nvsdlib = mock.Mock()
            plugin.nvsdlib.create_network.side_effect = (
                lambda *args, **kwargs: {'id': str(uuid.uuid4())})

        test_sg_rpc.set_firewall_driver(test_sg_rpc.FIREWALL_HYBRID_DRIVER)

        # Instantiating the (patched) notifier class yields self.notifier.
        # NOTE(review): this patch is started but not stopped here;
        # presumably a base-class cleanup stops it -- confirm.
        patched_notifier_cls = mock.patch(AGENTNOTIFIER).start()
        patched_notifier_cls.return_value = self.notifier = mock.Mock()

        # Copy each resource's attribute dict (one level deep) so the
        # module-level map can be reinstated in tearDown.
        self._attribute_map_bk_ = dict(
            (resource, attr_info.copy())
            for resource, attr_info
            in attributes.RESOURCE_ATTRIBUTE_MAP.items())

        init_patcher = mock.patch.object(nvsd_plugin.OneConvergencePluginV2,
                                         'oneconvergence_init',
                                         new=fake_oneconvergence_init)
        # The replacement init is only active while the base setUp runs.
        with init_patcher:
            super(OneConvergenceSecurityGroupsTestCase,
                  self).setUp(PLUGIN_NAME)

    def tearDown(self):
        """Run the base tearDown, then reinstate the saved attribute map."""
        super(OneConvergenceSecurityGroupsTestCase, self).tearDown()
        attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk_
class TestOneConvergenceSGServerRpcCallBack(
        OneConvergenceSecurityGroupsTestCase,
        test_sg_rpc.SGServerRpcCallBackMixinTestCase):
    """Security-group server RPC callback tests for the NVSD plugin.

    Each IPv6 test defined here is skipped; presumably these override
    tests inherited from the RPC callback mixin.
    """

    # Reason reported for every skipped IPv6 test in this class.
    _IPV6_SKIP_REASON = "NVSD Plugin does not support IPV6."

    def test_security_group_rules_for_devices_ipv6_egress(self):
        self.skipTest(self._IPV6_SKIP_REASON)

    def test_security_group_rules_for_devices_ipv6_ingress(self):
        self.skipTest(self._IPV6_SKIP_REASON)

    def test_security_group_rules_for_devices_ipv6_source_group(self):
        self.skipTest(self._IPV6_SKIP_REASON)

    def test_security_group_ra_rules_for_devices_ipv6_gateway_global(self):
        self.skipTest(self._IPV6_SKIP_REASON)

    def test_security_group_ra_rules_for_devices_ipv6_gateway_lla(self):
        self.skipTest(self._IPV6_SKIP_REASON)

    def test_security_group_ra_rules_for_devices_ipv6_no_gateway_port(self):
        self.skipTest(self._IPV6_SKIP_REASON)

    def test_security_group_rule_for_device_ipv6_multi_router_interfaces(self):
        self.skipTest(self._IPV6_SKIP_REASON)
class TestOneConvergenceSGServerRpcCallBackXML(
        OneConvergenceSecurityGroupsTestCase,
        test_sg_rpc.SGServerRpcCallBackMixinTestCaseXML):
    """XML flavour of the security-group server RPC callback tests.

    Each IPv6 test defined here is skipped; presumably these override
    tests inherited from the XML RPC callback mixin.
    """

    # Reason reported for every skipped IPv6 test in this class.
    _IPV6_SKIP_REASON = "NVSD Plugin does not support IPV6."

    def test_security_group_rules_for_devices_ipv6_egress(self):
        self.skipTest(self._IPV6_SKIP_REASON)

    def test_security_group_rules_for_devices_ipv6_ingress(self):
        self.skipTest(self._IPV6_SKIP_REASON)

    def test_security_group_rules_for_devices_ipv6_source_group(self):
        self.skipTest(self._IPV6_SKIP_REASON)

    def test_security_group_ra_rules_for_devices_ipv6_gateway_global(self):
        self.skipTest(self._IPV6_SKIP_REASON)

    def test_security_group_ra_rules_for_devices_ipv6_gateway_lla(self):
        self.skipTest(self._IPV6_SKIP_REASON)

    def test_security_group_ra_rules_for_devices_ipv6_no_gateway_port(self):
        self.skipTest(self._IPV6_SKIP_REASON)

    def test_security_group_rule_for_device_ipv6_multi_router_interfaces(self):
        self.skipTest(self._IPV6_SKIP_REASON)
class TestOneConvergenceSecurityGroups(OneConvergenceSecurityGroupsTestCase,
                                       test_sg.TestSecurityGroups,
                                       test_sg_rpc.SGNotificationTestMixin):
    """Security-group tests run against the One Convergence plugin."""

    def test_security_group_get_port_from_device(self):
        # Create a port, attach a security group to it through an update
        # request, then check what the plugin callbacks report for it.
        with self.network() as net:
            with self.subnet(net):
                with self.security_group() as group:
                    sg_id = group['security_group']['id']
                    create_res = self._create_port(self.fmt,
                                                   net['network']['id'])
                    created = self.deserialize(self.fmt, create_res)['port']
                    fixed_ips = created['fixed_ips']

                    update_body = {
                        'port': {
                            'fixed_ips': fixed_ips,
                            'name': created['name'],
                            ext_sg.SECURITYGROUPS: [sg_id],
                        },
                    }
                    update_req = self.new_update_request('ports',
                                                         update_body,
                                                         created['id'])
                    update_res = update_req.get_response(self.api)
                    updated = self.deserialize(self.fmt, update_res)
                    port_id = updated['port']['id']

                    callbacks = manager.NeutronManager.get_plugin().callbacks
                    port_dict = callbacks.get_port_from_device(port_id)

                    self.assertEqual(port_id, port_dict['id'])
                    self.assertEqual([sg_id],
                                     port_dict[ext_sg.SECURITYGROUPS])
                    self.assertEqual([], port_dict['security_group_rules'])
                    # fixed_ips comes back as a list of bare addresses.
                    self.assertEqual([fixed_ips[0]['ip_address']],
                                     port_dict['fixed_ips'])
                    self._delete('ports', port_id)

    def test_security_group_get_port_from_device_with_no_port(self):
        # The lookup with the id 'bad_device_id' is expected to return None.
        callbacks = manager.NeutronManager.get_plugin().callbacks
        self.assertIsNone(callbacks.get_port_from_device('bad_device_id'))
class TestOneConvergenceSecurityGroupsXML(TestOneConvergenceSecurityGroups):
    # Re-runs the parent's tests with the format set to 'xml'; the parent's
    # tests pass self.fmt to _create_port() and deserialize().
    fmt = 'xml'