NSX|V3: Remove support for non dynamic creteria

Dynamic createria for security groups are supported since NSX 1.1
Commit Iae39a89b762786e4f05aa61aa0db634941806d41 broke this code
but since it is no longer in use this patch removes it.

Change-Id: I1ff5174d03c0e53796054a14a1f0f0ad5c6cceea
This commit is contained in:
asarfaty 2020-03-05 10:24:27 +02:00
parent ea65f8c4ef
commit f1837f6766
3 changed files with 18 additions and 229 deletions

View File

@ -316,19 +316,6 @@ def get_nsx_security_group_id(session, neutron_id, moref=False):
return None
def get_nsx_security_group_ids(session, neutron_ids):
"""Return list of ids of a security groups in the NSX backend.
"""
filters = {'neutron_id': neutron_ids}
like_filters = None
query = session.query(nsx_models.NeutronNsxSecurityGroupMapping)
mappings = _apply_filters_to_query(
query, nsx_models.NeutronNsxSecurityGroupMapping,
filters, like_filters).all()
return [mapping['nsx_id'] for mapping in mappings
if mapping['nsx_id'] is not None]
def _delete_by_neutron_id(session, model, neutron_id):
return session.query(model).filter_by(neutron_id=neutron_id).delete()

View File

@ -1300,8 +1300,7 @@ class NsxV3Plugin(nsx_plugin_common.NsxPluginV3Base,
else:
add_to_exclude_list = True
elif self.nsxlib.feature_supported(
nsxlib_consts.FEATURE_DYNAMIC_CRITERIA):
else:
# If port has no security-groups then we don't need to add any
# security criteria tag.
if port_data[ext_sg.SECURITYGROUPS]:
@ -1502,16 +1501,6 @@ class NsxV3Plugin(nsx_plugin_common.NsxPluginV3Base,
LOG.warning(err_msg)
raise n_exc.InvalidInput(error_message=err_msg)
def _update_lport_with_security_groups(self, context, lport_id,
original, updated):
# translate the neutron sg ids to nsx ids, and call nsxlib
nsx_origial = nsx_db.get_nsx_security_group_ids(context.session,
original)
nsx_updated = nsx_db.get_nsx_security_group_ids(context.session,
updated)
self.nsxlib.ns_group.update_lport_nsgroups(
lport_id, nsx_origial, nsx_updated)
def _disable_ens_portsec(self, port_data):
if (cfg.CONF.nsx_v3.disable_port_security_for_ens and
not self._ens_psec_supported()):
@ -1610,29 +1599,6 @@ class NsxV3Plugin(nsx_plugin_common.NsxPluginV3Base,
{'id': neutron_db['id'], 'e': e})
self._cleanup_port(context, neutron_db['id'], None)
if not self.nsxlib.feature_supported(
nsxlib_consts.FEATURE_DYNAMIC_CRITERIA):
try:
self._update_lport_with_security_groups(
context, lport['id'], [], sgids or [])
except Exception as e:
with excutils.save_and_reraise_exception(reraise=False):
LOG.debug("Couldn't associate port %s with "
"one or more security-groups, reverting "
"logical-port creation (%s).",
port_data['id'], lport['id'])
self._cleanup_port(
context, neutron_db['id'], lport['id'])
# NOTE(arosen): this is to translate between nsxlib
# exceptions and the plugin exceptions. This should be
# later refactored.
if (e.__class__ is
nsx_lib_exc.SecurityGroupMaximumCapacityReached):
raise nsx_exc.SecurityGroupMaximumCapacityReached(
err_msg=e.msg)
else:
raise e
try:
net_id = self._get_network_nsx_id(
context, port_data['network_id'])
@ -1709,11 +1675,6 @@ class NsxV3Plugin(nsx_plugin_common.NsxPluginV3Base,
_net_id, nsx_port_id = nsx_db.get_nsx_switch_and_port_id(
context.session, port_id)
self.nsxlib.logical_port.delete(nsx_port_id)
if not self.nsxlib.feature_supported(
nsxlib_consts.FEATURE_DYNAMIC_CRITERIA):
self._update_lport_with_security_groups(
context, nsx_port_id,
port.get(ext_sg.SECURITYGROUPS, []), [])
if (not self.nsxlib.feature_supported(
nsxlib_consts.FEATURE_EXCLUDE_PORT_BY_TAG) and
self._is_excluded_port(port.get('device_owner'),
@ -1809,8 +1770,6 @@ class NsxV3Plugin(nsx_plugin_common.NsxPluginV3Base,
fs.remove_member_from_fw_exclude_list(
lport_id, nsxlib_consts.TARGET_TYPE_LOGICAL_PORT)
if self.nsxlib.feature_supported(
nsxlib_consts.FEATURE_DYNAMIC_CRITERIA):
tags_update += self.nsxlib.ns_group.get_lport_tags(
updated_port.get(ext_sg.SECURITYGROUPS, []) +
updated_port.get(provider_sg.PROVIDER_SECURITYGROUPS, []))
@ -1824,13 +1783,6 @@ class NsxV3Plugin(nsx_plugin_common.NsxPluginV3Base,
nsxlib_consts.FEATURE_EXCLUDE_PORT_BY_TAG):
tags_update.append({'scope': security.PORT_SG_SCOPE,
'tag': nsxlib_consts.EXCLUDE_PORT})
else:
self._update_lport_with_security_groups(
context, lport_id,
original_port.get(ext_sg.SECURITYGROUPS, []) +
original_port.get(provider_sg.PROVIDER_SECURITYGROUPS, []),
updated_port.get(ext_sg.SECURITYGROUPS, []) +
updated_port.get(provider_sg.PROVIDER_SECURITYGROUPS, []))
# Add availability zone profiles first (so that specific profiles will
# override them)
@ -3103,13 +3055,9 @@ class NsxV3Plugin(nsx_plugin_common.NsxPluginV3Base,
project_name=secgroup['tenant_id'])
name = self.nsxlib.ns_group.get_name(secgroup)
if self.nsxlib.feature_supported(
nsxlib_consts.FEATURE_DYNAMIC_CRITERIA):
tag_expression = (
self.nsxlib.ns_group.get_port_tag_expression(
security.PORT_SG_SCOPE, secgroup['id']))
else:
tag_expression = None
ns_group = self.nsxlib.ns_group.create(
name, secgroup['description'], tags, tag_expression)

View File

@ -14,13 +14,10 @@
# under the License.
import mock
from neutron.extensions import securitygroup as ext_sg
from neutron.tests.unit.extensions import test_securitygroup as test_ext_sg
from vmware_nsx.tests.unit.nsx_v3 import test_plugin as test_nsxv3
from vmware_nsxlib import v3 as nsxlib
from vmware_nsxlib.v3 import exceptions as nsxlib_exc
from vmware_nsxlib.v3 import nsx_constants as consts
from webob import exc
@ -93,6 +90,7 @@ class TestSecurityGroups(test_nsxv3.NsxV3PluginTestCaseMixin,
name = 'webservers'
description = 'my webservers'
fail = False
with mock.patch("vmware_nsxlib.v3.security.NsxLibFirewallSection."
"create_section_rules",
@ -101,151 +99,7 @@ class TestSecurityGroups(test_nsxv3.NsxV3PluginTestCaseMixin,
with self.security_group(name, description):
# This should not succeed
# (assertRaises would not work with generators)
self.assertTrue(False)
self.assertTrue(fail)
except exc.HTTPClientError:
pass
class TestSecurityGroupsNoDynamicCriteria(test_nsxv3.NsxV3PluginTestCaseMixin,
test_ext_sg.TestSecurityGroups):
def setUp(self):
super(TestSecurityGroupsNoDynamicCriteria, self).setUp()
mock_nsx_version = mock.patch.object(
nsxlib.NsxLib, 'feature_supported', return_value=False)
mock_nsx_version.start()
self._patchers.append(mock_nsx_version)
def test_create_security_group_rule_icmp_with_type_and_code(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
direction = "ingress"
remote_ip_prefix = "10.0.0.0/24"
protocol = "icmp"
# port_range_min (ICMP type) is greater than port_range_max
# (ICMP code) in order to confirm min <= max port check is
# not called for ICMP.
port_range_min = 14
port_range_max = None
keys = [('remote_ip_prefix', remote_ip_prefix),
('security_group_id', security_group_id),
('direction', direction),
('protocol', protocol),
('port_range_min', port_range_min),
('port_range_max', port_range_max)]
with self.security_group_rule(security_group_id, direction,
protocol, port_range_min,
port_range_max,
remote_ip_prefix) as rule:
for k, v, in keys:
self.assertEqual(rule['security_group_rule'][k], v)
@_mock_create_and_list_nsgroups
@mock.patch('vmware_nsxlib.v3.security.NsxLibNsGroup.remove_member')
@mock.patch('vmware_nsxlib.v3.security.NsxLibNsGroup.add_members')
def test_create_port_with_multiple_security_groups(self,
add_member_mock,
remove_member_mock):
super(TestSecurityGroupsNoDynamicCriteria,
self).test_create_port_with_multiple_security_groups()
# The first nsgroup is associated with the default secgroup, which is
# not added to this port.
calls = [mock.call(NSG_IDS[1],
consts.TARGET_TYPE_LOGICAL_PORT, mock.ANY),
mock.call(NSG_IDS[2],
consts.TARGET_TYPE_LOGICAL_PORT, mock.ANY)]
add_member_mock.assert_has_calls(calls, any_order=True)
@_mock_create_and_list_nsgroups
@mock.patch('vmware_nsxlib.v3.security.NsxLibNsGroup.remove_member')
@mock.patch('vmware_nsxlib.v3.security.NsxLibNsGroup.add_members')
def test_update_port_with_multiple_security_groups(self,
add_member_mock,
remove_member_mock):
super(TestSecurityGroupsNoDynamicCriteria,
self).test_update_port_with_multiple_security_groups()
calls = [mock.call(NSG_IDS[0],
consts.TARGET_TYPE_LOGICAL_PORT, mock.ANY),
mock.call(NSG_IDS[1],
consts.TARGET_TYPE_LOGICAL_PORT, mock.ANY),
mock.call(NSG_IDS[2],
consts.TARGET_TYPE_LOGICAL_PORT, mock.ANY)]
add_member_mock.assert_has_calls(calls, any_order=True)
remove_member_mock.assert_called_with(
NSG_IDS[0], consts.TARGET_TYPE_LOGICAL_PORT, mock.ANY)
@_mock_create_and_list_nsgroups
@mock.patch('vmware_nsxlib.v3.security.NsxLibNsGroup.remove_member')
@mock.patch('vmware_nsxlib.v3.security.NsxLibNsGroup.add_members')
def test_update_port_remove_security_group_empty_list(self,
add_member_mock,
remove_member_mock):
super(TestSecurityGroupsNoDynamicCriteria,
self).test_update_port_remove_security_group_empty_list()
add_member_mock.assert_called_with(
NSG_IDS[1], consts.TARGET_TYPE_LOGICAL_PORT, mock.ANY)
remove_member_mock.assert_called_with(
NSG_IDS[1], consts.TARGET_TYPE_LOGICAL_PORT, mock.ANY)
@_mock_create_and_list_nsgroups
@mock.patch('vmware_nsxlib.v3.security.NsxLibNsGroup.add_members')
def test_create_port_with_full_security_group(self, add_member_mock):
def _add_member_mock(nsgroup, target_type, target_id):
if nsgroup in NSG_IDS:
raise nsxlib_exc.NSGroupIsFull(nsgroup_id=nsgroup)
add_member_mock.side_effect = _add_member_mock
with self.network() as net:
with self.subnet(net):
res = self._create_port(self.fmt, net['network']['id'])
res_body = self.deserialize(self.fmt, res)
self.assertEqual(400, res.status_int)
self.assertEqual('SecurityGroupMaximumCapacityReached',
res_body['NeutronError']['type'])
@_mock_create_and_list_nsgroups
@mock.patch('vmware_nsxlib.v3.security.NsxLibNsGroup.remove_member')
@mock.patch('vmware_nsxlib.v3.security.NsxLibNsGroup.add_members')
def test_update_port_with_full_security_group(self,
add_member_mock,
remove_member_mock):
def _add_member_mock(nsgroup, target_type, target_id):
if nsgroup == NSG_IDS[2]:
raise nsxlib_exc.NSGroupIsFull(nsgroup_id=nsgroup)
add_member_mock.side_effect = _add_member_mock
with self.port() as port:
with self.security_group() as sg1:
with self.security_group() as sg2:
data = {'port': {ext_sg.SECURITYGROUPS:
[sg1['security_group']['id'],
sg2['security_group']['id']]}}
req = self.new_update_request(
'ports', data, port['port']['id'])
res = req.get_response(self.api)
res_body = self.deserialize(self.fmt, res)
self.assertEqual(400, res.status_int)
self.assertEqual('SecurityGroupMaximumCapacityReached',
res_body['NeutronError']['type'])
# Because the update has failed we excpect that the plugin will try to
# revert any changes in the NSGroups - It is required to remove the
# lport from any NSGroups which it was added to during that call.
calls = [mock.call(NSG_IDS[1],
consts.TARGET_TYPE_LOGICAL_PORT, mock.ANY),
mock.call(NSG_IDS[2],
consts.TARGET_TYPE_LOGICAL_PORT, mock.ANY)]
remove_member_mock.assert_has_calls(calls, any_order=True)
def test_create_security_group_rule_icmpv6_legacy_protocol_name(self):
self.skipTest('not supported')