# Copyright (c) 2014 VMware, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. # from neutron.common import exceptions from neutron.plugins.nicira.nsxlib import secgroup as secgrouplib from neutron.plugins.nicira import nvplib from neutron.tests.unit.vmware.nsxlib import base class SecurityProfileTestCase(base.NsxlibTestCase): def test_create_and_get_security_profile(self): sec_prof = secgrouplib.create_security_profile( self.fake_cluster, 'pippo', {'name': 'test'}) sec_prof_res = nvplib.do_request( secgrouplib.HTTP_GET, nvplib._build_uri_path('security-profile', resource_id=sec_prof['uuid']), cluster=self.fake_cluster) self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid']) # Check for builtin rules self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 1) self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 2) def test_create_and_get_default_security_profile(self): sec_prof = secgrouplib.create_security_profile( self.fake_cluster, 'pippo', {'name': 'default'}) sec_prof_res = nvplib.do_request( nvplib.HTTP_GET, nvplib._build_uri_path('security-profile', resource_id=sec_prof['uuid']), cluster=self.fake_cluster) self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid']) # Check for builtin rules self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 3) self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 2) def test_update_security_profile_rules(self): sec_prof = secgrouplib.create_security_profile( self.fake_cluster, 'pippo', {'name': 'test'}) ingress_rule = {'ethertype': 'IPv4'} egress_rule = {'ethertype': 'IPv4', 'profile_uuid': 'xyz'} new_rules = {'logical_port_egress_rules': [egress_rule], 'logical_port_ingress_rules': [ingress_rule]} secgrouplib.update_security_group_rules( self.fake_cluster, sec_prof['uuid'], new_rules) sec_prof_res = nvplib.do_request( nvplib.HTTP_GET, nvplib._build_uri_path('security-profile', resource_id=sec_prof['uuid']), cluster=self.fake_cluster) self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid']) # Check for builtin rules self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 2) self.assertIn(egress_rule, sec_prof_res['logical_port_egress_rules']) self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 1) self.assertIn(ingress_rule, sec_prof_res['logical_port_ingress_rules']) def test_update_security_profile_rules_noingress(self): sec_prof = secgrouplib.create_security_profile( self.fake_cluster, 'pippo', {'name': 'test'}) hidden_ingress_rule = {'ethertype': 'IPv4', 'ip_prefix': '127.0.0.1/32'} egress_rule = {'ethertype': 'IPv4', 'profile_uuid': 'xyz'} new_rules = {'logical_port_egress_rules': [egress_rule], 'logical_port_ingress_rules': []} secgrouplib.update_security_group_rules( self.fake_cluster, sec_prof['uuid'], new_rules) sec_prof_res = nvplib.do_request( nvplib.HTTP_GET, nvplib._build_uri_path('security-profile', resource_id=sec_prof['uuid']), cluster=self.fake_cluster) self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid']) # Check for builtin rules self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 2) self.assertIn(egress_rule, sec_prof_res['logical_port_egress_rules']) self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 1) self.assertIn(hidden_ingress_rule, sec_prof_res['logical_port_ingress_rules']) def test_update_non_existing_securityprofile_raises(self): self.assertRaises(exceptions.NeutronException, secgrouplib.update_security_group_rules, self.fake_cluster, 'whatever', {'logical_port_egress_rules': [], 'logical_port_ingress_rules': []}) def test_delete_security_profile(self): sec_prof = secgrouplib.create_security_profile( self.fake_cluster, 'pippo', {'name': 'test'}) secgrouplib.delete_security_profile( self.fake_cluster, sec_prof['uuid']) self.assertRaises(exceptions.NotFound, nvplib.do_request, nvplib.HTTP_GET, nvplib._build_uri_path( 'security-profile', resource_id=sec_prof['uuid']), cluster=self.fake_cluster) def test_delete_non_existing_securityprofile_raises(self): self.assertRaises(exceptions.NeutronException, secgrouplib.delete_security_profile, self.fake_cluster, 'whatever')