# Copyright (c) 2015 VMware, Inc. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import mock from neutron.extensions import securitygroup as ext_sg from neutron.tests.unit.extensions import test_securitygroup as test_ext_sg from vmware_nsx.nsxlib.v3 import dfw_api as firewall from vmware_nsx.nsxlib.v3 import security from vmware_nsx.plugins.nsx_v3 import plugin as nsx_plugin from vmware_nsx.tests.unit.nsx_v3 import test_plugin as test_nsxv3 from vmware_nsx.tests.unit.nsxlib.v3 import nsxlib_testcase # Pool of fake ns-groups uuids NSG_IDS = ['11111111-1111-1111-1111-111111111111', '22222222-2222-2222-2222-222222222222', '33333333-3333-3333-3333-333333333333', '44444444-4444-4444-4444-444444444444', '55555555-5555-5555-5555-555555555555'] def _mock_create_and_list_nsgroups(test_method): nsgroups = [] def _create_nsgroup_mock(name, desc, tags): nsgroup = {'id': NSG_IDS[len(nsgroups)], 'display_name': name, 'desc': desc, 'tags': tags} nsgroups.append(nsgroup) return nsgroup def wrap(*args, **kwargs): with mock.patch.object(nsx_plugin.security.firewall, 'create_nsgroup') as create_nsgroup_mock: create_nsgroup_mock.side_effect = _create_nsgroup_mock with mock.patch.object(nsx_plugin.security.firewall, 'list_nsgroups') as list_nsgroups_mock: list_nsgroups_mock.side_effect = lambda: nsgroups test_method(*args, **kwargs) return wrap class TestSecurityGroups(test_nsxv3.NsxV3PluginTestCaseMixin, test_ext_sg.TestSecurityGroups): @_mock_create_and_list_nsgroups @mock.patch.object(firewall, 'remove_nsgroup_member') @mock.patch.object(firewall, 'add_nsgroup_member') def test_create_port_with_multiple_security_groups(self, add_member_mock, remove_member_mock): super(TestSecurityGroups, self).test_create_port_with_multiple_security_groups() # The first nsgroup is associated with the default secgroup, which is # not added to this port. calls = [mock.call(NSG_IDS[1], firewall.LOGICAL_PORT, mock.ANY), mock.call(NSG_IDS[2], firewall.LOGICAL_PORT, mock.ANY)] add_member_mock.assert_has_calls(calls, any_order=True) @_mock_create_and_list_nsgroups @mock.patch.object(firewall, 'remove_nsgroup_member') @mock.patch.object(firewall, 'add_nsgroup_member') def test_update_port_with_multiple_security_groups(self, add_member_mock, remove_member_mock): super(TestSecurityGroups, self).test_update_port_with_multiple_security_groups() calls = [mock.call(NSG_IDS[0], firewall.LOGICAL_PORT, mock.ANY), mock.call(NSG_IDS[1], firewall.LOGICAL_PORT, mock.ANY), mock.call(NSG_IDS[2], firewall.LOGICAL_PORT, mock.ANY)] add_member_mock.assert_has_calls(calls, any_order=True) remove_member_mock.assert_called_with( NSG_IDS[0], firewall.LOGICAL_PORT, mock.ANY) @_mock_create_and_list_nsgroups @mock.patch.object(firewall, 'remove_nsgroup_member') @mock.patch.object(firewall, 'add_nsgroup_member') def test_update_port_remove_security_group_empty_list(self, add_member_mock, remove_member_mock): super(TestSecurityGroups, self).test_update_port_remove_security_group_empty_list() add_member_mock.assert_called_with( NSG_IDS[1], firewall.LOGICAL_PORT, mock.ANY) remove_member_mock.assert_called_with( NSG_IDS[1], firewall.LOGICAL_PORT, mock.ANY) @_mock_create_and_list_nsgroups @mock.patch.object(firewall, 'add_nsgroup_member') def test_create_port_with_full_security_group(self, add_member_mock): def _add_member_mock(nsgroup, target_type, target_id): if nsgroup in NSG_IDS: raise firewall.NSGroupIsFull(nsgroup_id=nsgroup) add_member_mock.side_effect = _add_member_mock with self.network() as net: with self.subnet(net): res = self._create_port(self.fmt, net['network']['id']) res_body = self.deserialize(self.fmt, res) self.assertEqual(500, res.status_int) self.assertEqual('SecurityGroupMaximumCapacityReached', res_body['NeutronError']['type']) @_mock_create_and_list_nsgroups @mock.patch.object(firewall, 'remove_nsgroup_member') @mock.patch.object(firewall, 'add_nsgroup_member') def test_update_port_with_full_security_group(self, add_member_mock, remove_member_mock): def _add_member_mock(nsgroup, target_type, target_id): if nsgroup == NSG_IDS[2]: raise firewall.NSGroupIsFull(nsgroup_id=nsgroup) add_member_mock.side_effect = _add_member_mock with self.port() as port: with self.security_group() as sg1: with self.security_group() as sg2: data = {'port': {ext_sg.SECURITYGROUPS: [sg1['security_group']['id'], sg2['security_group']['id']]}} req = self.new_update_request( 'ports', data, port['port']['id']) res = req.get_response(self.api) res_body = self.deserialize(self.fmt, res) self.assertEqual(500, res.status_int) self.assertEqual('SecurityGroupMaximumCapacityReached', res_body['NeutronError']['type']) # Because the update has failed we excpect that the plugin will try to # revert any changes in the NSGroups - It is required to remove the # lport from any NSGroups which it was added to during that call. calls = [mock.call(NSG_IDS[1], firewall.LOGICAL_PORT, mock.ANY), mock.call(NSG_IDS[2], firewall.LOGICAL_PORT, mock.ANY)] remove_member_mock.assert_has_calls(calls, any_order=True) def test_create_security_group_rule_icmpv6_legacy_protocol_name(self): self.skipTest('not supported') class TestNSGroupManager(nsxlib_testcase.NsxLibTestCase): """ This test suite is responsible for unittesting of class vmware_nsx.nsxlib.v3.security.NSGroupManager. """ @_mock_create_and_list_nsgroups def test_first_initialization(self): size = 5 cont_manager = security.NSGroupManager(size) nested_groups = cont_manager.nested_groups self.assertEqual({i: NSG_IDS[i] for i in range(size)}, nested_groups) @_mock_create_and_list_nsgroups def test_reconfigure_number_of_nested_groups(self): # We need to test that when changing the number of nested groups then # the NSGroupManager picks the ones which were previously created # and create the ones which are missing, which also verifies that it # also recognizes existing nested groups. size = 2 # Creates 2 nested groups. security.NSGroupManager(size) size = 5 # Creates another 3 nested groups. nested_groups = security.NSGroupManager(size).nested_groups self.assertEqual({i: NSG_IDS[i] for i in range(size)}, nested_groups) @_mock_create_and_list_nsgroups @mock.patch.object(firewall, 'remove_nsgroup_member') @mock.patch.object(firewall, 'add_nsgroup_member') def test_add_and_remove_nsgroups(self, add_member_mock, remove_member_mock): # We verify that when adding a new nsgroup the properly placed # according to its id and the number of nested groups. size = 5 cont_manager = security.NSGroupManager(size) nsgroup_id = 'nsgroup_id' with mock.patch.object(cont_manager, '_hash_uuid', return_value=7): cont_manager.add_nsgroup(nsgroup_id) cont_manager.remove_nsgroup(nsgroup_id) # There are 5 nested groups, the hash function will return 7, therefore # we expect that the nsgroup will be placed in the 3rd group. add_member_mock.assert_called_once_with( NSG_IDS[2], firewall.NSGROUP, nsgroup_id) remove_member_mock.assert_called_once_with( NSG_IDS[2], firewall.NSGROUP, nsgroup_id, verify=True) @_mock_create_and_list_nsgroups @mock.patch.object(firewall, 'remove_nsgroup_member') @mock.patch.object(firewall, 'add_nsgroup_member') def test_when_nested_group_is_full(self, add_member_mock, remove_member_mock): def _add_member_mock(nsgroup, target_type, target_id): if nsgroup == NSG_IDS[2]: raise firewall.NSGroupIsFull(nsgroup_id=nsgroup) def _remove_member_mock(nsgroup, target_type, target_id, verify=False): if nsgroup == NSG_IDS[2]: raise firewall.NSGroupMemberNotFound() add_member_mock.side_effect = _add_member_mock remove_member_mock.side_effect = _remove_member_mock size = 5 cont_manager = security.NSGroupManager(size) nsgroup_id = 'nsgroup_id' with mock.patch.object(cont_manager, '_hash_uuid', return_value=7): cont_manager.add_nsgroup(nsgroup_id) cont_manager.remove_nsgroup(nsgroup_id) # Trying to add nsgroup to the nested group at index 2 will raise # NSGroupIsFull exception, we expect that the nsgroup will be added to # the nested group at index 3. calls = [mock.call(NSG_IDS[2], firewall.NSGROUP, nsgroup_id), mock.call(NSG_IDS[3], firewall.NSGROUP, nsgroup_id)] add_member_mock.assert_has_calls(calls) # Since the nsgroup was added to the nested group at index 3, it will # fail to remove it from the group at index 2, and then will try to # remove it from the group at index 3. calls = [ mock.call( NSG_IDS[2], firewall.NSGROUP, nsgroup_id, verify=True), mock.call( NSG_IDS[3], firewall.NSGROUP, nsgroup_id, verify=True)] remove_member_mock.assert_has_calls(calls) @_mock_create_and_list_nsgroups @mock.patch.object(firewall, 'remove_nsgroup_member') @mock.patch.object(firewall, 'add_nsgroup_member') def initialize_with_absent_nested_groups(self, add_member_mock, remove_member_mock): size = 3 cont_manager = security.NSGroupManager(size) # list_nsgroups will return nested group 1 and 3, but not group 2. with mock.patch.object(firewall, 'list_nsgroups_mock') as list_nsgroups_mock: list_nsgroups_mock = lambda: list_nsgroups_mock()[::2] # invoking the initialization process again, it should process # groups 1 and 3 and create group 2. cont_manager = security.NSGroupManager(size) self.assertEqual({1: NSG_IDS[0], 2: NSG_IDS[3], 3: NSG_IDS[2]}, cont_manager.nested_groups) @_mock_create_and_list_nsgroups def test_suggest_nested_group(self): size = 5 cont_manager = security.NSGroupManager(size) # We expect that the first suggested index is 2 expected_suggested_groups = NSG_IDS[2:5] + NSG_IDS[:2] suggest_group = lambda: cont_manager._suggest_nested_group('fake-id') with mock.patch.object(cont_manager, '_hash_uuid', return_value=7): for i, suggested in enumerate(suggest_group()): self.assertEqual(expected_suggested_groups[i], suggested)