Add coverage tests

As a first step, add the coverage job as non-voting with a low threshold,
and exclude the older plugins from measurement.
Also remove some unused code, relocate tests-only code, and add
some unit tests to improve coverage.

Change-Id: Ib7af0b5de49e1a0ee2927b01f2a5f71acf633fb5
asarfaty 2020-04-07 09:08:29 +02:00
parent 18260971fb
commit 98b2832e4d
13 changed files with 205 additions and 219 deletions

View File

@@ -1,7 +1,7 @@
[run]
branch = True
source = neutron
omit = neutron/tests/*,neutron/openstack/*
source = vmware_nsx
omit = vmware_nsx/tests/*,vmware_nsx/*dvs*,vmware_nsx/api_replay/*,vmware_nsx/dhcp_meta/*,vmware_nsx/nsxlib/*,vmware_nsx/*lsn*,vmware_nsx/*tv*,vmware_nsx/api_client/*,vmware_nsx/common/profile*,vmware_nsx/shell/nsx_instance_if_migrate*,vmware_nsx/plugins/nsx_v/vshield/vcns.*,vmware_nsx/db/migration/alembic_migrations/*
[report]
ignore_errors = True
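
The omit list above excludes the older plugin code paths named in the commit message from measurement. coverage.py treats these as fnmatch-style file patterns; below is a minimal sketch of how such patterns classify files, using hypothetical paths (only the patterns themselves come from this change):

from fnmatch import fnmatch

omit = ['vmware_nsx/tests/*', 'vmware_nsx/*dvs*', 'vmware_nsx/nsxlib/*']
paths = ['vmware_nsx/plugins/nsx_p/plugin.py',   # measured
         'vmware_nsx/nsxlib/mh/switch.py',       # omitted by nsxlib/*
         'vmware_nsx/tests/unit/test_utils.py']  # omitted by tests/*
for path in paths:
    excluded = any(fnmatch(path, pattern) for pattern in omit)
    print(path, 'omitted' if excluded else 'measured')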

View File

@@ -4,6 +4,7 @@
- check-requirements
- openstack-python3-ussuri-jobs-neutron
- openstack-python3-ussuri-jobs
- openstack-cover-jobs
check:
jobs:
- vmware-tox-lower-constraints
@@ -54,6 +55,19 @@
- openstack/neutron-vpnaas
- x/tap-as-a-service
- openstack/octavia
- openstack-tox-cover:
timeout: 5400
required-projects:
- openstack/neutron
- openstack/networking-l2gw
- openstack/networking-sfc
- x/vmware-nsxlib
- openstack/neutron-fwaas
- openstack/neutron-dynamic-routing
- openstack/neutron-vpnaas
- x/tap-as-a-service
- openstack/octavia
voting: false
gate:
queue: vmware-nsx
jobs:

tox.ini (15 changed lines)
View File

@@ -129,9 +129,20 @@ whitelist_externals =
commands = bandit -r vmware_nsx -n 5 -ll
[testenv:cover]
basepython = python3.6
envdir = {toxworkdir}/shared
setenv = {[testenv]setenv}
{[testenv:common]setenv}
PYTHON=coverage run --source vmware_nsx --parallel-mode
commands =
python setup.py testr --coverage --testr-args='{posargs}'
coverage report
{[testenv:dev]commands}
coverage erase
stestr run {posargs}
stestr slowest
coverage combine
coverage report --fail-under=65 --skip-covered
coverage html -d cover
coverage xml -o cover/coverage.xml
[testenv:docs]
deps = -c{env:UPPER_CONSTRAINTS_FILE:https://opendev.org/openstack/requirements/raw/branch/master/upper-constraints.txt}
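
The rewritten cover environment erases stale data, runs the suite under stestr with per-process coverage (--parallel-mode), combines the data files, and only then gates on the total. A minimal sketch of the --fail-under gate, with illustrative numbers (the exit status of 2 is an assumption about current coverage.py behaviour):

total = 66.4       # illustrative combined percentage
threshold = 65.0   # from --fail-under=65 above
# coverage report exits nonzero when the total misses the threshold,
# which is what fails the (non-voting) job.
exit_status = 0 if total >= threshold else 2
print('gate', 'passed' if exit_status == 0 else 'failed')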

View File

@@ -1,194 +0,0 @@
# Copyright 2013 VMware Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api.definitions import multiprovidernet as mpnet_apidef
from neutron_lib.api.definitions import provider_net as pnet
from neutron_lib.api import validators
from neutron_lib import constants
from oslo_log import log
from vmware_nsx.api_client import client
from vmware_nsx.common import utils as vmw_utils
from vmware_nsx.db import db as nsx_db
from vmware_nsx import nsx_cluster
from vmware_nsx.nsxlib.mh import switch as switchlib
LOG = log.getLogger(__name__)
def get_nsx_switch_ids(session, cluster, neutron_network_id):
"""Return the NSX switch id for a given neutron network.
First lookup for mappings in Neutron database. If no mapping is
found, query the NSX backend and add the mappings.
"""
nsx_switch_ids = nsx_db.get_nsx_switch_ids(
session, neutron_network_id)
if not nsx_switch_ids:
# Find logical switches from backend.
# This is a rather expensive query, but it won't be executed
# more than once for each network in Neutron's lifetime
nsx_switches = switchlib.get_lswitches(cluster, neutron_network_id)
if not nsx_switches:
LOG.warning("Unable to find NSX switches for Neutron network "
"%s", neutron_network_id)
return
nsx_switch_ids = []
with session.begin(subtransactions=True):
for nsx_switch in nsx_switches:
nsx_switch_id = nsx_switch['uuid']
nsx_switch_ids.append(nsx_switch_id)
# Create DB mapping
nsx_db.add_neutron_nsx_network_mapping(
session,
neutron_network_id,
nsx_switch_id)
return nsx_switch_ids
def get_nsx_switch_and_port_id(session, cluster, neutron_port_id):
"""Return the NSX switch and port uuids for a given neutron port.
First, look up the Neutron database. If not found, execute
a query on the NSX platform, as the mapping might be missing because
the port was created before upgrading to Grizzly.
This routine also retrieves the identifier of the logical switch in
the backend where the port is plugged. Prior to Icehouse this
information was not available in the Neutron Database. For dealing
with pre-existing records, this routine will query the backend
for retrieving the correct switch identifier.
As of the Icehouse release it is no longer possible to assume
the backend logical switch identifier is equal to the neutron
network identifier.
"""
nsx_switch_id, nsx_port_id = nsx_db.get_nsx_switch_and_port_id(
session, neutron_port_id)
if not nsx_switch_id:
# Find logical switch for port from backend
# This is a rather expensive query, but it won't be executed
# more than once for each port in Neutron's lifetime
nsx_ports = switchlib.query_lswitch_lports(
cluster, '*', relations='LogicalSwitchConfig',
filters={'tag': neutron_port_id,
'tag_scope': 'q_port_id'})
# Only one result expected
# NOTE(salv-orlando): Not handling the case where more than one
# port is found with the same neutron port tag
if not nsx_ports:
LOG.warning("Unable to find NSX port for Neutron port %s",
neutron_port_id)
# This method is supposed to return a tuple
return None, None
nsx_port = nsx_ports[0]
nsx_switch_id = (nsx_port['_relations']
['LogicalSwitchConfig']['uuid'])
if nsx_port_id:
# Mapping already exists. Delete before recreating
nsx_db.delete_neutron_nsx_port_mapping(
session, neutron_port_id)
else:
nsx_port_id = nsx_port['uuid']
# (re)Create DB mapping
nsx_db.add_neutron_nsx_port_mapping(
session, neutron_port_id,
nsx_switch_id, nsx_port_id)
return nsx_switch_id, nsx_port_id
def create_nsx_cluster(cluster_opts, concurrent_connections, gen_timeout):
cluster = nsx_cluster.NSXCluster(**cluster_opts)
def _ctrl_split(x, y):
return (x, int(y), True)
api_providers = [_ctrl_split(*ctrl.split(':'))
for ctrl in cluster.nsx_controllers]
cluster.api_client = client.NsxApiClient(
api_providers, cluster.nsx_user, cluster.nsx_password,
http_timeout=cluster.http_timeout,
retries=cluster.retries,
redirects=cluster.redirects,
concurrent_connections=concurrent_connections,
gen_timeout=gen_timeout)
return cluster
def _convert_bindings_to_nsx_transport_zones(bindings):
nsx_transport_zones_config = []
for binding in bindings:
transport_entry = {}
if binding.binding_type in [vmw_utils.NetworkTypes.FLAT,
vmw_utils.NetworkTypes.VLAN]:
transport_entry['transport_type'] = (
vmw_utils.NetworkTypes.BRIDGE)
transport_entry['binding_config'] = {}
vlan_id = binding.vlan_id
if vlan_id:
transport_entry['binding_config'] = (
{'vlan_translation': [{'transport': vlan_id}]})
else:
transport_entry['transport_type'] = binding.binding_type
transport_entry['zone_uuid'] = binding.phy_uuid
nsx_transport_zones_config.append(transport_entry)
return nsx_transport_zones_config
def _convert_segments_to_nsx_transport_zones(segments, default_tz_uuid):
nsx_transport_zones_config = []
for transport_zone in segments:
for value in [pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
pnet.SEGMENTATION_ID]:
if transport_zone.get(value) == constants.ATTR_NOT_SPECIFIED:
transport_zone[value] = None
transport_entry = {}
transport_type = transport_zone.get(pnet.NETWORK_TYPE)
if transport_type in [vmw_utils.NetworkTypes.FLAT,
vmw_utils.NetworkTypes.VLAN]:
transport_entry['transport_type'] = (
vmw_utils.NetworkTypes.BRIDGE)
transport_entry['binding_config'] = {}
vlan_id = transport_zone.get(pnet.SEGMENTATION_ID)
if vlan_id:
transport_entry['binding_config'] = (
{'vlan_translation': [{'transport': vlan_id}]})
else:
transport_entry['transport_type'] = transport_type
transport_entry['zone_uuid'] = (
transport_zone[pnet.PHYSICAL_NETWORK] or default_tz_uuid)
nsx_transport_zones_config.append(transport_entry)
return nsx_transport_zones_config
def convert_to_nsx_transport_zones(
default_tz_uuid, network=None, bindings=None,
default_transport_type=None):
# Convert fields from provider request to nsx format
if (network and not validators.is_attr_set(
network.get(mpnet_apidef.SEGMENTS))):
return [{"zone_uuid": default_tz_uuid,
"transport_type": default_transport_type}]
# Convert fields from db to nsx format
if bindings:
return _convert_bindings_to_nsx_transport_zones(bindings)
# If we end up here we need to convert multiprovider segments into nsx
# transport zone configurations
return _convert_segments_to_nsx_transport_zones(
network.get(mpnet_apidef.SEGMENTS), default_tz_uuid)

View File

@@ -41,13 +41,6 @@ def lsn_remove(context, lsn_id):
context.session.query(nsx_models.Lsn).filter_by(lsn_id=lsn_id).delete()
def lsn_remove_for_network(context, network_id):
"""Remove information about the Logical Service Node given its network."""
with db_api.CONTEXT_WRITER.using(context):
context.session.query(nsx_models.Lsn).filter_by(
net_id=network_id).delete()
def lsn_get_for_network(context, network_id, raise_on_err=True):
"""Retrieve LSN information given its network id."""
query = context.session.query(nsx_models.Lsn)

View File

@@ -24,7 +24,7 @@ from oslo_utils import excutils
from vmware_nsx._i18n import _
from vmware_nsx.api_client import exception as api_exc
from vmware_nsx.common import exceptions as p_exc
from vmware_nsx.common import nsx_utils
from vmware_nsx.db import db as nsx_db
from vmware_nsx.db import lsn_db
from vmware_nsx.dhcp_meta import constants as const
from vmware_nsx.nsxlib.mh import lsn as lsn_api
@@ -48,6 +48,36 @@ def register_lsn_opts(config):
config.CONF.register_opts(lsn_opts, "NSX_LSN")
def get_nsx_switch_ids(session, cluster, neutron_network_id):
"""Return the NSX switch id for a given neutron network.
First lookup for mappings in Neutron database. If no mapping is
found, query the NSX backend and add the mappings.
"""
nsx_switch_ids = nsx_db.get_nsx_switch_ids(
session, neutron_network_id)
if not nsx_switch_ids:
# Find logical switches from backend.
# This is a rather expensive query, but it won't be executed
# more than once for each network in Neutron's lifetime
nsx_switches = switch_api.get_lswitches(cluster, neutron_network_id)
if not nsx_switches:
LOG.warning("Unable to find NSX switches for Neutron network "
"%s", neutron_network_id)
return
nsx_switch_ids = []
with session.begin(subtransactions=True):
for nsx_switch in nsx_switches:
nsx_switch_id = nsx_switch['uuid']
nsx_switch_ids.append(nsx_switch_id)
# Create DB mapping
nsx_db.add_neutron_nsx_network_mapping(
session,
neutron_network_id,
nsx_switch_id)
return nsx_switch_ids
class LsnManager(object):
"""Manage LSN entities associated with networks."""
@@ -199,7 +229,7 @@ class LsnManager(object):
"""Connect network to LSN via specified port and port_data."""
try:
lsn_id = None
switch_id = nsx_utils.get_nsx_switch_ids(
switch_id = get_nsx_switch_ids(
context.session, self.cluster, network_id)[0]
lswitch_port_id = switch_api.get_port_by_neutron_tag(
self.cluster, switch_id, port_id)['uuid']
@@ -233,7 +263,7 @@ class LsnManager(object):
tenant_id = subnet['tenant_id']
lswitch_port_id = None
try:
switch_id = nsx_utils.get_nsx_switch_ids(
switch_id = get_nsx_switch_ids(
context.session, self.cluster, network_id)[0]
lswitch_port_id = switch_api.create_lport(
self.cluster, switch_id, tenant_id,

View File

@@ -346,9 +346,11 @@ def get_security_groups_mappings(context):
def get_orphaned_firewall_sections(context, nsxlib):
fw_sections = nsxlib.firewall_section.list()
sg_mappings = get_security_groups_mappings(context)
orphaned_sections = []
fw_sections = nsxlib.firewall_section.list()
if not fw_sections:
return orphaned_sections
sg_mappings = get_security_groups_mappings(context)
for fw_section in fw_sections:
for sg_db in sg_mappings:
if fw_section['id'] == sg_db['section-id']:
@@ -356,6 +358,7 @@ def get_orphaned_firewall_sections(context, nsxlib):
else:
# Skip non-neutron sections, by tags
neutron_obj = False
LOG.error("DEBUG ADIT fw_section %s", fw_section)
for tag in fw_section.get('tags', []):
if tag['scope'] == 'os-api-version':
neutron_obj = True
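
The tag scan above treats a section as Neutron-owned when any tag carries the os-api-version scope; a standalone sketch of that predicate (DUMMY_FS in the new unit test below is an example of a section that matches):

def is_neutron_section(fw_section):
    # Neutron-created sections carry an 'os-api-version' tag scope;
    # sections without it are skipped as non-neutron.
    return any(tag.get('scope') == 'os-api-version'
               for tag in fw_section.get('tags', []))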

View File

@@ -149,13 +149,13 @@ def update_nat_firewall_match(resource, event, trigger, **kwargs):
for rule in rules:
if not nsxpolicy.feature_supported(
nsx_constants.FEATURE_PARTIAL_UPDATES):
if rule['firewall_match'] == old_firewall_match:
if rule.get('firewall_match', '') == old_firewall_match:
nsxpolicy.tier1_nat_rule.update(
router['id'], rule['id'],
firewall_match=new_firewall_match)
else:
with policy_trans.NsxPolicyTransaction():
if rule['firewall_match'] == old_firewall_match:
if rule.get('firewall_match', '') == old_firewall_match:
nsxpolicy.tier1_nat_rule.update(
router['id'], rule['id'],
firewall_match=new_firewall_match)
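
Switching from rule['firewall_match'] to rule.get('firewall_match', '') guards against NAT rules that carry no firewall_match field at all. A minimal illustration with hypothetical rule dicts:

rules = [{'id': 'r1', 'firewall_match': 'MATCH_INTERNAL_ADDRESS'},
         {'id': 'r2'}]  # e.g. a rule created before the field existed
old_match = 'MATCH_INTERNAL_ADDRESS'
# rules[1]['firewall_match'] would raise KeyError; .get() returns ''.
to_update = [rule['id'] for rule in rules
             if rule.get('firewall_match', '') == old_match]
assert to_update == ['r1']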

View File

@@ -0,0 +1,86 @@
# Copyright 2020 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.tests import base
from neutron_lib.plugins import constants
from oslo_utils import uuidutils
from vmware_nsx.plugins.common.housekeeper import base_job
from vmware_nsx.plugins.nsx_v3.housekeeper import orphaned_firewall_section
DUMMY_FS = {
"resource_type": "FirewallSection",
"id": uuidutils.generate_uuid(),
"display_name": "test",
"tags": [{
"scope": "os-neutron-secgr-id",
"tag": uuidutils.generate_uuid()
}, {
"scope": "os-project-id",
"tag": uuidutils.generate_uuid()
}, {
"scope": "os-project-name",
"tag": "admin"
}, {
"scope": "os-api-version",
"tag": "13.0.0.0b3.dev90"
}]}
class OrphanedFirewallSectionTestCaseReadOnly(base.BaseTestCase):
def setUp(self):
def get_plugin_mock(alias=constants.CORE):
if alias in (constants.CORE, constants.L3):
return self.plugin
super(OrphanedFirewallSectionTestCaseReadOnly, self).setUp()
self.plugin = mock.Mock()
self.plugin.nsxlib = mock.Mock()
self.context = mock.Mock()
self.context.session = mock.Mock()
mock.patch('neutron_lib.plugins.directory.get_plugin',
side_effect=get_plugin_mock).start()
self.log = mock.Mock()
base_job.LOG = self.log
self.job = orphaned_firewall_section.OrphanedFirewallSectionJob(
True, [])
def run_job(self):
self.job.run(self.context, readonly=True)
def test_clean_run(self):
with mock.patch.object(self.plugin.nsxlib.firewall_section, 'list',
return_value=[]),\
mock.patch("vmware_nsx.plugins.nsx_v3.utils."
"get_security_groups_mappings", return_value=[]):
self.run_job()
self.log.warning.assert_not_called()
def test_with_orphaned_section(self):
with mock.patch.object(self.plugin.nsxlib.firewall_section, 'list',
return_value=[DUMMY_FS]),\
mock.patch("vmware_nsx.plugins.nsx_v3.utils."
"get_security_groups_mappings", return_value=[]):
self.run_job()
self.log.warning.assert_called()
class OrphanedFirewallSectionTestCaseReadWrite(
OrphanedFirewallSectionTestCaseReadOnly):
def run_job(self):
self.job.run(self.context, readonly=False)

View File

@@ -22,9 +22,9 @@ from vmware_nsx.api_client import client
from vmware_nsx.api_client import exception
from vmware_nsx.api_client import version
from vmware_nsx.common import config # noqa
from vmware_nsx import nsx_cluster as cluster
from vmware_nsx.tests import unit as vmware
from vmware_nsx.tests.unit.nsxlib import fake
from vmware_nsx.tests.unit.nsxlib.mh import nsx_cluster as cluster
_uuid = test_base._uuid

View File

@@ -645,7 +645,7 @@ class TestVpnaasDriver(test_plugin.NsxPPluginTestCaseMixin):
return_value=tier0_uuid),\
self.router(external_gateway_info={'network_id':
ext_net['network']['id']}) as router,\
self.subnet(cidr='1.1.0.0/24') as sub:
self.subnet(cidr='1.1.0.0/24', enable_dhcp=False) as sub:
# add an interface to the router
self.l3plugin.add_router_interface(
self.context,
@@ -701,7 +701,7 @@ class TestVpnaasDriver(test_plugin.NsxPPluginTestCaseMixin):
return_value=tier0_rtr_id),\
self.router(external_gateway_info={'network_id':
ext_net['network']['id']}) as router,\
self.subnet(cidr='1.1.0.0/24') as sub:
self.subnet(cidr='1.1.0.0/24', enable_dhcp=False) as sub:
# add an interface to the router
self.l3plugin.add_router_interface(
self.context,

View File

@@ -45,6 +45,7 @@ from vmware_nsx.tests.unit.nsx_v import test_plugin as test_v_plugin
from vmware_nsx.tests.unit.nsx_v3 import test_plugin as test_v3_plugin
from vmware_nsxlib.v3 import core_resources
from vmware_nsxlib.v3 import resources as nsx_v3_resources
from vmware_nsxlib.v3 import security as nsx_v3_security
LOG = logging.getLogger(__name__)
NSX_INI_PATH = vmware.get_fake_conf('nsx.ini.test')
@@ -169,12 +170,16 @@ class TestNsxvAdminUtils(AbstractTestAdminUtils,
# Create a router to make sure we have deployed an edge
self.router = self._create_router()
self.dist_router = self._create_router(dist=True)
self.network = self._create_net()
def tearDown(self):
if self.router and self.router.get('id'):
self._plugin.delete_router(
self.edgeapi.context, self.router['id'])
if self.dist_router and self.dist_router.get('id'):
self._plugin.delete_router(
self.edgeapi.context, self.dist_router['id'])
if self.network and self.network.get('id'):
self._plugin.delete_network(
self.edgeapi.context, self.network['id'])
@@ -188,12 +193,15 @@ class TestNsxvAdminUtils(AbstractTestAdminUtils,
args['property'].extend(params)
self._test_resource('edges', 'nsx-update', **args)
def _create_router(self):
def _create_router(self, dist=False):
# Create an exclusive router (with an edge)
tenant_id = uuidutils.generate_uuid()
data = {'router': {'tenant_id': tenant_id}}
data['router']['name'] = 'dummy'
data['router']['admin_state_up'] = True
if dist:
data['router']['distributed'] = True
else:
data['router']['router_type'] = 'exclusive'
return self._plugin.create_router(self.edgeapi.context, data)
@@ -259,11 +267,16 @@ class TestNsxvAdminUtils(AbstractTestAdminUtils,
"policy-id=1",
"network_id=net-1",
"net-id=net-1",
"network=net-1",
"port=port-1",
"security-group-id=sg-1",
"dvs-id=dvs-1",
"moref=virtualwire-1",
"teamingpolicy=LACP_ACTIVE",
"log-allowed-traffic=true"
"log-allowed-traffic=true",
"az-name=default",
"transit-network=abc",
"moref=abc",
]
self._test_resources_with_args(
resources.nsxv_resources, args)
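
Each entry above is a flat key=value property string handed to the admin utilities; a hedged sketch of folding such strings into a dict (the admin shell does its own parsing, so this is only illustrative):

args = ['teamingpolicy=LACP_ACTIVE', 'az-name=default', 'moref=abc']
# Split once on '=' so values that themselves contain '=' stay intact.
properties = dict(arg.split('=', 1) for arg in args)
assert properties['az-name'] == 'default'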
@@ -311,6 +324,11 @@ class TestNsxv3AdminUtils(AbstractTestAdminUtils,
self._patch_object(core_resources.NsxLibSwitchingProfile,
'find_by_display_name',
return_value=[{'id': uuidutils.generate_uuid()}])
self._patch_object(nsx_v3_security.NsxLibFirewallSection,
'get_excludelist',
return_value={'members': [{
'target_type': 'LogicalPort',
'target_id': 'port_id'}]})
super(TestNsxv3AdminUtils, self)._init_mock_plugin()
self._plugin = nsxv3_utils.NsxV3PluginWrapper()
@@ -336,9 +354,15 @@ class TestNsxv3AdminUtils(AbstractTestAdminUtils,
args = ["dhcp_profile_uuid=e5b9b249-0034-4729-8ab6-fe4dacaa3a12",
"metadata_proxy_uuid=e5b9b249-0034-4729-8ab6-fe4dacaa3a12",
"nsx-id=e5b9b249-0034-4729-8ab6-fe4dacaa3a12",
"net-id=e5b9b249-0034-4729-8ab6-fe4dacaa3a12",
"availability-zone=default",
"server-ip=1.1.1.1",
"log-allowed-traffic=true"
"log-allowed-traffic=true",
"value=10",
"old-tier0=olduuid",
"new-tier0=newuuid",
"project-id=aaa",
"host-moref=dummy-moref"
]
# Create some neutron objects for the utilities to run on
self._create_router()
@@ -378,3 +402,22 @@ class TestNsxpAdminUtils(AbstractTestAdminUtils,
with self.network():
# Run all utilities with backend objects
self._test_resources(resources.nsxp_resources)
def test_resources_with_common_args(self):
"""Run all nsxp admin utilities with some common arguments
Using arguments that some APIs need improves the test coverage.
"""
args = ["realization_interval=1",
"dhcp-config=dumyuuid",
"old-tier0=olduuid",
"new-tier0=newuuid",
"firewall-match=internal"]
# Create some neutron objects for the utilities to run on
self._create_router()
with self._create_l3_ext_network() as network:
with self.subnet(network=network, enable_dhcp=False) as subnet:
with self.port(subnet=subnet):
# Run all utilities with backend objects
self._test_resources_with_args(
resources.nsxp_resources, args)