
Now that we are python3 only, we should move to using the built in version of mock that supports all of our testing needs and remove the dependency on the "mock" package. Also see commit: Ifcaf1c21bea0ec3c35278e49cecc90a101a82113 Change-Id: I58da980351fe14357c210c02eb167a6c0af9d09e
255 lines
11 KiB
Python
255 lines
11 KiB
Python
# Copyright 2016 VMware, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from unittest import mock
|
|
|
|
from oslo_config import cfg
|
|
import webob.exc
|
|
|
|
from neutron.extensions import securitygroup as ext_sg
|
|
from neutron.tests.unit.api import test_extensions
|
|
from neutron.tests.unit.db import test_db_base_plugin_v2
|
|
from neutron.tests.unit.extensions import test_securitygroup
|
|
from neutron_lib import constants
|
|
from neutron_lib import context
|
|
from neutron_lib import exceptions as n_exc
|
|
|
|
from vmware_nsx.extensions import nsxpolicy
|
|
from vmware_nsx.extensions import securitygrouplogging as ext_logging
|
|
from vmware_nsx.extensions import securitygrouppolicy as ext_policy
|
|
from vmware_nsx.tests.unit.nsx_v import test_plugin
|
|
from vmware_nsx.tests.unit.nsx_v.vshield import fake_vcns
|
|
|
|
PLUGIN_NAME = 'vmware_nsx.plugin.NsxVPlugin'
|
|
|
|
|
|
class SecGroupPolicyExtensionTestCase(
|
|
test_plugin.NsxVPluginV2TestCase,
|
|
test_securitygroup.SecurityGroupDBTestCase):
|
|
def setUp(self, plugin=PLUGIN_NAME, ext_mgr=None):
|
|
cfg.CONF.set_override('use_nsx_policies', True, group='nsxv')
|
|
cfg.CONF.set_override('default_policy_id', 'policy-1', group='nsxv')
|
|
# This feature is enabled only since 6.2
|
|
with mock.patch.object(fake_vcns.FakeVcns,
|
|
'get_version',
|
|
return_value="6.2.3"):
|
|
super(SecGroupPolicyExtensionTestCase, self).setUp(
|
|
plugin=plugin, ext_mgr=ext_mgr)
|
|
self._tenant_id = test_db_base_plugin_v2.TEST_TENANT_ID
|
|
# add policy & logging security group attribute
|
|
ext_sg.Securitygroup().update_attributes_map(
|
|
ext_policy.RESOURCE_ATTRIBUTE_MAP)
|
|
ext_sg.Securitygroup().update_attributes_map(
|
|
ext_logging.RESOURCE_ATTRIBUTE_MAP)
|
|
|
|
def _create_secgroup_with_policy(self, policy_id, description=None,
|
|
logging=False):
|
|
body = {'security_group':
|
|
{'name': 'sg-policy',
|
|
'tenant_id': self._tenant_id,
|
|
'policy': policy_id,
|
|
'description': description if description else '',
|
|
'logging': logging}}
|
|
return self._create_security_group_response(self.fmt, body)
|
|
|
|
def _get_secgroup_with_policy(self):
|
|
policy_id = 'policy-5'
|
|
res = self._create_secgroup_with_policy(policy_id)
|
|
return self.deserialize(self.fmt, res)
|
|
|
|
def test_secgroup_create_with_policy(self):
|
|
policy_id = 'policy-5'
|
|
res = self._create_secgroup_with_policy(policy_id)
|
|
sg = self.deserialize(self.fmt, res)
|
|
self.assertEqual(policy_id, sg['security_group']['policy'])
|
|
self.assertEqual('dummy', sg['security_group']['description'])
|
|
|
|
def test_secgroup_create_with_policyand_desc(self):
|
|
policy_id = 'policy-5'
|
|
desc = 'test'
|
|
res = self._create_secgroup_with_policy(policy_id, description=desc)
|
|
sg = self.deserialize(self.fmt, res)
|
|
self.assertEqual(policy_id, sg['security_group']['policy'])
|
|
self.assertEqual(desc, sg['security_group']['description'])
|
|
|
|
def test_secgroup_create_without_policy(self):
|
|
res = self._create_secgroup_with_policy(None)
|
|
self.assertEqual(400, res.status_int)
|
|
|
|
def test_secgroup_create_with_illegal_policy(self):
|
|
policy_id = 'bad-policy'
|
|
with mock.patch(PLUGIN_NAME + '.get_nsx_policy',
|
|
side_effect=n_exc.ObjectNotFound(id=policy_id)):
|
|
res = self._create_secgroup_with_policy(policy_id)
|
|
self.assertEqual(400, res.status_int)
|
|
|
|
def test_secgroup_create_with_policy_and_logging(self):
|
|
# We do not support policy & logging together
|
|
policy_id = 'policy-5'
|
|
res = self._create_secgroup_with_policy(policy_id, logging=True)
|
|
self.assertEqual(400, res.status_int)
|
|
|
|
def test_secgroup_update_with_policy(self):
|
|
# Test that updating the policy is allowed
|
|
old_policy = 'policy-5'
|
|
new_policy = 'policy-6'
|
|
res = self._create_secgroup_with_policy(old_policy)
|
|
sg = self.deserialize(self.fmt, res)
|
|
data = {'security_group': {'policy': new_policy}}
|
|
req = self.new_update_request('security-groups', data,
|
|
sg['security_group']['id'])
|
|
updated_sg = self.deserialize(self.fmt, req.get_response(self.ext_api))
|
|
self.assertEqual(new_policy, updated_sg['security_group']['policy'])
|
|
# Verify the same result in 'get'
|
|
req = self.new_show_request('security-groups',
|
|
sg['security_group']['id'])
|
|
shown_sg = self.deserialize(self.fmt, req.get_response(self.ext_api))
|
|
self.assertEqual(new_policy, shown_sg['security_group']['policy'])
|
|
|
|
def test_secgroup_update_no_policy_change(self):
|
|
# Test updating without changing the policy
|
|
old_policy = 'policy-5'
|
|
desc = 'abc'
|
|
res = self._create_secgroup_with_policy(old_policy)
|
|
sg = self.deserialize(self.fmt, res)
|
|
data = {'security_group': {'description': desc}}
|
|
req = self.new_update_request('security-groups', data,
|
|
sg['security_group']['id'])
|
|
updated_sg = self.deserialize(self.fmt, req.get_response(self.ext_api))
|
|
self.assertEqual(old_policy, updated_sg['security_group']['policy'])
|
|
self.assertEqual(desc, updated_sg['security_group']['description'])
|
|
|
|
def test_secgroup_update_remove_policy(self):
|
|
# removing the policy is not allowed
|
|
sg = self._get_secgroup_with_policy()
|
|
data = {'security_group': {'policy': None}}
|
|
req = self.new_update_request('security-groups', data,
|
|
sg['security_group']['id'])
|
|
res = req.get_response(self.ext_api)
|
|
self.assertEqual(400, res.status_int)
|
|
|
|
def test_secgroup_update_add_logging(self):
|
|
# We do not support policy & logging together
|
|
sg = self._get_secgroup_with_policy()
|
|
data = {'security_group': {'logging': True}}
|
|
req = self.new_update_request('security-groups', data,
|
|
sg['security_group']['id'])
|
|
res = req.get_response(self.ext_api)
|
|
self.assertEqual(400, res.status_int)
|
|
|
|
def test_non_admin_cannot_delete_policy_sg_and_admin_can(self):
|
|
sg = self._get_secgroup_with_policy()
|
|
sg_id = sg['security_group']['id']
|
|
|
|
# Try deleting the request as a normal user returns forbidden
|
|
# as a tenant is not allowed to delete this.
|
|
ctx = context.Context('', self._tenant_id)
|
|
self._delete('security-groups', sg_id,
|
|
expected_code=webob.exc.HTTPForbidden.code,
|
|
neutron_context=ctx)
|
|
# can be deleted though as admin
|
|
self._delete('security-groups', sg_id,
|
|
expected_code=webob.exc.HTTPNoContent.code)
|
|
|
|
def test_create_rule(self):
|
|
sg = self._get_secgroup_with_policy()
|
|
rule = self._build_security_group_rule(
|
|
sg['security_group']['id'], 'ingress',
|
|
constants.PROTO_NAME_TCP, '22', '22')
|
|
res = self._create_security_group_rule(self.fmt, rule)
|
|
self.deserialize(self.fmt, res)
|
|
self.assertEqual(400, res.status_int)
|
|
|
|
|
|
class SecGroupPolicyExtensionTestCaseWithRules(
|
|
SecGroupPolicyExtensionTestCase):
|
|
|
|
def setUp(self, plugin=PLUGIN_NAME, ext_mgr=None):
|
|
cfg.CONF.set_override('allow_tenant_rules_with_policy',
|
|
True, group='nsxv')
|
|
super(SecGroupPolicyExtensionTestCaseWithRules, self).setUp(
|
|
plugin=plugin, ext_mgr=ext_mgr)
|
|
|
|
def test_secgroup_create_without_policy(self):
|
|
# in case allow_tenant_rules_with_policy is True, it is allowed to
|
|
# create a regular sg
|
|
desc = 'test'
|
|
res = self._create_secgroup_with_policy(None, description=desc)
|
|
sg = self.deserialize(self.fmt, res)
|
|
self.assertIsNone(sg['security_group']['policy'])
|
|
self.assertEqual(desc, sg['security_group']['description'])
|
|
|
|
def test_secgroup_create_without_policy_update_policy(self):
|
|
# Create a regular security group. adding the policy later should fail
|
|
res = self._create_secgroup_with_policy(None)
|
|
sg = self.deserialize(self.fmt, res)
|
|
data = {'security_group': {'policy': 'policy-1'}}
|
|
req = self.new_update_request('security-groups', data,
|
|
sg['security_group']['id'])
|
|
res = req.get_response(self.ext_api)
|
|
self.assertEqual(400, res.status_int)
|
|
|
|
def test_secgroup_create_without_policy_and_rule(self):
|
|
# Test that regular security groups can have rules
|
|
res = self._create_secgroup_with_policy(None)
|
|
sg = self.deserialize(self.fmt, res)
|
|
self.assertIsNone(sg['security_group']['policy'])
|
|
|
|
rule = self._build_security_group_rule(
|
|
sg['security_group']['id'], 'ingress',
|
|
constants.PROTO_NAME_TCP, '22', '22')
|
|
res = self._create_security_group_rule(self.fmt, rule)
|
|
rule_data = self.deserialize(self.fmt, res)
|
|
self.assertEqual(
|
|
sg['security_group']['id'],
|
|
rule_data['security_group_rule']['security_group_id'])
|
|
|
|
|
|
class NsxPolExtensionManager(object):
|
|
|
|
def get_resources(self):
|
|
return nsxpolicy.Nsxpolicy.get_resources()
|
|
|
|
def get_actions(self):
|
|
return []
|
|
|
|
def get_request_extensions(self):
|
|
return []
|
|
|
|
|
|
class TestNsxPolicies(test_plugin.NsxVPluginV2TestCase):
|
|
|
|
def setUp(self, plugin=None):
|
|
super(TestNsxPolicies, self).setUp()
|
|
ext_mgr = NsxPolExtensionManager()
|
|
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
|
|
|
|
def test_get_policy(self):
|
|
id = 'policy-1'
|
|
req = self.new_show_request('nsx-policies', id)
|
|
res = self.deserialize(
|
|
self.fmt, req.get_response(self.ext_api)
|
|
)
|
|
policy = res['nsx_policy']
|
|
self.assertEqual(id, policy['id'])
|
|
|
|
def test_list_policies(self):
|
|
req = self.new_list_request('nsx-policies')
|
|
res = self.deserialize(
|
|
self.fmt, req.get_response(self.ext_api)
|
|
)
|
|
self.assertIn('nsx_policies', res)
|
|
# the fake_vcns api returns 3 policies
|
|
self.assertEqual(3, len(res['nsx_policies']))
|