Merge "Added policy support for router_nonat , cert cases - All scenario nonot cases - one cert cases (test_prevention_modification_openstack_network)"

This commit is contained in:
Zuul 2019-04-04 04:02:15 +00:00 committed by Gerrit Code Review
commit 011e157820
4 changed files with 232 additions and 55 deletions

View File

@ -125,7 +125,7 @@ class NSXPClient(object):
""" """
NSX-T API Put request for certificate Management NSX-T API Put request for certificate Management
""" """
endpoint = ("/%s/%s" % (component, comp_id)) endpoint = ("%s/%s" % (component, comp_id))
response = self.put(endpoint=endpoint, body=body) response = self.put(endpoint=endpoint, body=body)
return response return response
@ -142,7 +142,7 @@ class NSXPClient(object):
""" """
NSX-T API delete request for certificate Management NSX-T API delete request for certificate Management
""" """
endpoint = ("/%s/%s" % (component, comp_id)) endpoint = ("%s/%s" % (component, comp_id))
response = self.delete(endpoint=endpoint) response = self.delete(endpoint=endpoint)
return response return response
@ -313,3 +313,46 @@ class NSXPClient(object):
nsx_name = os_name + "_" + os_uuid[:5] + "..." + os_uuid[-5:] nsx_name = os_name + "_" + os_uuid[:5] + "..." + os_uuid[-5:]
nsgroups = self.get_ns_groups(tenant_id=os_tenant_id) nsgroups = self.get_ns_groups(tenant_id=os_tenant_id)
return self.get_nsx_resource_by_name(nsgroups, nsx_name) return self.get_nsx_resource_by_name(nsgroups, nsx_name)
def get_logical_switches(self):
"""
Retrieve all logical switches on NSX backend
"""
return self.get_logical_resources("segments")
def get_logical_switch(self, os_name, os_uuid):
"""
Get the logical switch based on the name and uuid provided.
The name of the logical switch should follow
<os_network_name>_<first 5 os uuid>...<last 5 os uuid>
Return logical switch if found, otherwise return None
"""
if not os_name or not os_uuid:
LOG.error("Name and uuid of OpenStack L2 network need to be "
"present in order to query backend logical switch!")
return None
nsx_name = os_name + "_" + os_uuid[:5] + "..." + os_uuid[-5:]
lswitches = self.get_logical_switches()
return self.get_nsx_resource_by_name(lswitches, nsx_name)
def get_logical_router_nat_rules(self, lrouter):
"""
Get all user defined NAT rules of the specific logical router
"""
if not lrouter:
LOG.error("Logical router needs to be present in order "
"to get the NAT rules")
return None
endpoint = "tier-1s/%s/nat/USER/nat-rules" % lrouter['id']
return self.get_logical_resources(endpoint)
def get_logical_router_advertisement(self, lrouter):
"""Get logical router advertisement"""
if not lrouter:
LOG.error("Logical router needs to be present in order "
"to get router advertisement!")
return None
endpoint = "/logical-routers/%s/routing/advertisement" % lrouter['id']
response = self.get(endpoint)
return response.json()

View File

@ -10,11 +10,15 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import time
from tempest.api.network import base from tempest.api.network import base
from tempest import config from tempest import config
from tempest.lib.common.utils import data_utils from tempest.lib.common.utils import data_utils
from tempest.lib import decorators from tempest.lib import decorators
from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.services import nsxp_client
from vmware_nsx_tempest_plugin.services import nsxv3_client from vmware_nsx_tempest_plugin.services import nsxv3_client
CONF = config.CONF CONF = config.CONF
@ -35,6 +39,9 @@ class NSXv3NetworksTest(base.BaseNetworkTest):
cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager, cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user, CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password) CONF.nsxv3.nsx_password)
cls.nsxp = nsxp_client.NSXPClient(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
@decorators.attr(type='nsxv3') @decorators.attr(type='nsxv3')
@decorators.idempotent_id('63085723-23ae-4109-ac86-69f895097957') @decorators.idempotent_id('63085723-23ae-4109-ac86-69f895097957')
@ -43,6 +50,12 @@ class NSXv3NetworksTest(base.BaseNetworkTest):
name = data_utils.rand_name('network-') name = data_utils.rand_name('network-')
network = self.create_network(network_name=name) network = self.create_network(network_name=name)
net_id = network['id'] net_id = network['id']
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsxp_network = self.nsxp.get_logical_switch(network['name'],
network['id'])
self.assertEqual('ACTIVE', network['status'])
self.assertIsNotNone(nsxp_network)
nsx_network = self.nsx.get_logical_switch(network['name'], nsx_network = self.nsx.get_logical_switch(network['name'],
network['id']) network['id'])
self.assertEqual('ACTIVE', network['status']) self.assertEqual('ACTIVE', network['status'])
@ -51,12 +64,23 @@ class NSXv3NetworksTest(base.BaseNetworkTest):
new_name = "New_network" new_name = "New_network"
body = self.networks_client.update_network(net_id, name=new_name) body = self.networks_client.update_network(net_id, name=new_name)
updated_net = body['network'] updated_net = body['network']
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsxp_network = self.nsxp.get_logical_switch(updated_net['name'],
updated_net['id'])
self.assertEqual(updated_net['name'], new_name)
self.assertIsNotNone(nsxp_network)
nsx_network = self.nsx.get_logical_switch(updated_net['name'], nsx_network = self.nsx.get_logical_switch(updated_net['name'],
updated_net['id']) updated_net['id'])
self.assertEqual(updated_net['name'], new_name) self.assertEqual(updated_net['name'], new_name)
self.assertIsNotNone(nsx_network) self.assertIsNotNone(nsx_network)
# Verify delete network # Verify delete network
self.networks_client.delete_network(updated_net['id']) self.networks_client.delete_network(updated_net['id'])
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsxp_network = self.nsxp.get_logical_switch(updated_net['name'],
updated_net['id'])
self.assertIsNone(nsxp_network)
nsx_network = self.nsx.get_logical_switch(updated_net['name'], nsx_network = self.nsx.get_logical_switch(updated_net['name'],
updated_net['id']) updated_net['id'])
self.assertIsNone(nsx_network) self.assertIsNone(nsx_network)

View File

@ -13,17 +13,18 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import time
from oslo_log import log as logging from oslo_log import log as logging
from tempest.common import utils from tempest.common import utils
from tempest import config from tempest import config
from tempest.lib import decorators from tempest.lib import decorators
from tempest.lib.common.utils import data_utils from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils from tempest.lib.common.utils import test_utils
from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.services import nsxp_client
from vmware_nsx_tempest_plugin.services import nsxv3_client from vmware_nsx_tempest_plugin.services import nsxv3_client
from vmware_nsx_tempest_plugin.services.qos import base_qos from vmware_nsx_tempest_plugin.services.qos import base_qos
from vmware_nsx_tempest_plugin.tests.scenario import manager from vmware_nsx_tempest_plugin.tests.scenario import manager
@ -41,8 +42,8 @@ class TestCertificateMgmt(manager.NetworkScenarioTest):
@classmethod @classmethod
def skip_checks(cls): def skip_checks(cls):
super(TestCertificateMgmt, cls).skip_checks() super(TestCertificateMgmt, cls).skip_checks()
if not (CONF.network.project_networks_reachable if not (CONF.network.project_networks_reachable or
or CONF.network.public_network_id): CONF.network.public_network_id):
msg = ('Either project_networks_reachable must be true, or\ msg = ('Either project_networks_reachable must be true, or\
public_network_id must be defined.') public_network_id must be defined.')
raise cls.skipException(msg) raise cls.skipException(msg)
@ -55,7 +56,11 @@ class TestCertificateMgmt(manager.NetworkScenarioTest):
cls.set_network_resources() cls.set_network_resources()
super(TestCertificateMgmt, cls).setup_credentials() super(TestCertificateMgmt, cls).setup_credentials()
cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager, cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user, CONF.nsxv3.nsx_password) CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
cls.nsxp = nsxp_client.NSXPClient(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
@classmethod @classmethod
def resource_setup(cls): def resource_setup(cls):
@ -121,7 +126,7 @@ class TestCertificateMgmt(manager.NetworkScenarioTest):
msg = 'Error: NSX admin is able to modify/delete' msg = 'Error: NSX admin is able to modify/delete'
if all(x in response.json()['error_message'] for x in self.error_msg): if all(x in response.json()['error_message'] for x in self.error_msg):
LOG.info('NSX admin is unable to modify/delete ' LOG.info('NSX admin is unable to modify/delete '
'the openstack object') 'the openstack object')
else: else:
raise Exception(msg) raise Exception(msg)
@ -131,13 +136,13 @@ class TestCertificateMgmt(manager.NetworkScenarioTest):
and a logical port attached to the network and a logical port attached to the network
""" """
self.network = self._create_network(namestart="net-ca") self.network = self._create_network(namestart="net-ca")
self.subnet = self._create_subnet(self.network, self.subnet = self._create_subnet(
cidr=CONF.network.project_network_cidr) self.network, cidr=CONF.network.project_network_cidr)
self.port = self._create_port(network_id=self.network['id'], self.port = self._create_port(network_id=self.network['id'],
namestart='ca') namestart='ca')
msg = 'Logical Port %s not found' % self.port['name'] msg = 'Logical Port %s not found' % self.port['name']
self.assertIsNotNone(self.nsx.get_logical_port( self.assertIsNotNone(self.nsx.get_logical_port(self.port['name']),
self.port['name']), msg) msg)
data = self.nsx.get_logical_port(self.port['name']) data = self.nsx.get_logical_port(self.port['name'])
return data return data
@ -163,24 +168,40 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
Verify if NSX admin is unable to modify this network Verify if NSX admin is unable to modify this network
""" """
self.network = self._create_network() self.network = self._create_network()
self.subnet = self._create_subnet(self.network, self.subnet = self._create_subnet(
cidr=CONF.network.project_network_cidr) self.network, cidr=CONF.network.project_network_cidr)
#check backend if the network was created #check backend if the network was created
msg = 'network %s not found' % self.network['name'] msg = 'network %s not found' % self.network['name']
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
self.assertIsNotNone(self.nsxp.get_logical_switch(
self.network['name'], self.network['id']), msg)
self.assertIsNotNone(self.nsx.get_logical_switch( self.assertIsNotNone(self.nsx.get_logical_switch(
self.network['name'], self.network['id']), msg) self.network['name'], self.network['id']), msg)
if CONF.network.backend == 'nsxp':
data_policy = self.nsxp.get_logical_switch(self.network['name'],
self.network['id'])
self.assertEqual(data_policy['_create_user'], self.openstack_tag,
'Incorrect tag for the create user')
data = self.nsx.get_logical_switch(self.network['name'], data = self.nsx.get_logical_switch(self.network['name'],
self.network['id']) self.network['id'])
""" """
Check if backend shows openstack Check if backend shows openstack
as the create user for the object as the create user for the object
""" """
self.assertEqual(data['_create_user'], self.openstack_tag, self.assertEqual(data['_create_user'], self.openstack_tag,
'Incorrect tag for the create user') 'Incorrect tag for the create user')
#try to update network name as NSX admin #try to update network name as NSX admin
data.update({"display_name": "nsx_modified_switch"}) if CONF.network.backend == 'nsxp':
response = self.nsx.ca_put_request(component='logical-switches', data_policy.update({"display_name": "nsx_modified_switch"})
comp_id=data['id'], body=data) response = self.nsxp.ca_put_request(component='segments',
comp_id=data_policy['id'],
body=data_policy)
else:
data.update({"display_name": "nsx_modified_switch"})
response = self.nsx.ca_put_request(component='segments',
comp_id=data['id'],
body=data)
self.parse_response(response) self.parse_response(response)
@decorators.attr(type='nsxv3') @decorators.attr(type='nsxv3')
@ -193,8 +214,8 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
Verify if NSX admin can not delete this router Verify if NSX admin can not delete this router
""" """
self.network = self._create_network() self.network = self._create_network()
self.subnet = self._create_subnet(self.network, self.subnet = self._create_subnet(
cidr=CONF.network.project_network_cidr) self.network, cidr=CONF.network.project_network_cidr)
#create router and add an interface #create router and add an interface
self.router = self._create_router( self.router = self._create_router(
router_name=data_utils.rand_name('router-cert-mgmt'), router_name=data_utils.rand_name('router-cert-mgmt'),
@ -208,19 +229,19 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
self.assertIsNotNone(self.nsx.get_logical_router( self.assertIsNotNone(self.nsx.get_logical_router(
self.router['name'], self.router['id']), msg) self.router['name'], self.router['id']), msg)
data = self.nsx.get_logical_router(self.router['name'], data = self.nsx.get_logical_router(self.router['name'],
self.router['id']) self.router['id'])
""" """
Check if backend shows openstack Check if backend shows openstack
as the create user for the object as the create user for the object
""" """
self.assertEqual(data['_create_user'], self.openstack_tag, self.assertEqual(data['_create_user'], self.openstack_tag,
'Incorrect tag for the create user') 'Incorrect tag for the create user')
#Obtain any router port corresponding to the logical router #Obtain any router port corresponding to the logical router
rtr_ports = self.nsx.get_logical_router_ports(data) rtr_ports = self.nsx.get_logical_router_ports(data)
#try to update router name as NSX admin #try to update router name as NSX admin
data.update({"display_name": "nsx_modified_router"}) data.update({"display_name": "nsx_modified_router"})
response = self.nsx.ca_put_request(component='logical-routers', response = self.nsx.ca_put_request(component='logical-routers',
comp_id=data['id'], body=data) comp_id=data['id'], body=data)
self.parse_response(response) self.parse_response(response)
#try to delete logical router port as NSX admin #try to delete logical router port as NSX admin
if len(rtr_ports) != 0: if len(rtr_ports) != 0:
@ -246,7 +267,7 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
#obtain all switching profiles at the backend #obtain all switching profiles at the backend
qos_policies = self.nsx.get_switching_profiles() qos_policies = self.nsx.get_switching_profiles()
nsx_policy = self.nsx.get_nsx_resource_by_name(qos_policies, nsx_policy = self.nsx.get_nsx_resource_by_name(qos_policies,
policy['name']) policy['name'])
#check backend if the qos policy was created #check backend if the qos policy was created
msg = 'Qos policy %s not found' % policy['name'] msg = 'Qos policy %s not found' % policy['name']
self.assertIsNotNone(self.nsx.get_switching_profile( self.assertIsNotNone(self.nsx.get_switching_profile(
@ -257,15 +278,15 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
as the create user for the object as the create user for the object
""" """
self.assertEqual(data['_create_user'], self.openstack_tag, self.assertEqual(data['_create_user'], self.openstack_tag,
'Incorrect tag for the create user') 'Incorrect tag for the create user')
#try to update qos policy as NSX admin #try to update qos policy as NSX admin
data.update({"display_name": "nsx_modified_qos-policy"}) data.update({"display_name": "nsx_modified_qos-policy"})
response = self.nsx.ca_put_request(component='switching-profiles', response = self.nsx.ca_put_request(component='switching-profiles',
comp_id=data['id'], body=data) comp_id=data['id'], body=data)
self.parse_response(response) self.parse_response(response)
#try to delete qos policy as NSX admin #try to delete qos policy as NSX admin
response = self.nsx.ca_delete_request(component='switching-profiles', response = self.nsx.ca_delete_request(component='switching-profiles',
comp_id=data['id']) comp_id=data['id'])
self.parse_response(response) self.parse_response(response)
@decorators.attr(type='nsxv3') @decorators.attr(type='nsxv3')
@ -283,25 +304,25 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
self.assertIsNotNone(self.nsx.get_firewall_section( self.assertIsNotNone(self.nsx.get_firewall_section(
self.security_group['name'], self.security_group['id']), msg) self.security_group['name'], self.security_group['id']), msg)
data = self.nsx.get_firewall_section(self.security_group['name'], data = self.nsx.get_firewall_section(self.security_group['name'],
self.security_group['id']) self.security_group['id'])
""" """
Check if backend shows openstack Check if backend shows openstack
as the create user for the object as the create user for the object
""" """
self.assertEqual(data['_create_user'], self.openstack_tag, self.assertEqual(data['_create_user'], self.openstack_tag,
'Incorrect tag for the create user') 'Incorrect tag for the create user')
#obtain firewall rules related to the security group #obtain firewall rules related to the security group
fw_rules = self.nsx.get_firewall_section_rules(data) fw_rules = self.nsx.get_firewall_section_rules(data)
#try to update security group as NSX admin #try to update security group as NSX admin
data.update({"display_name": "nsx_modified_security_group"}) data.update({"display_name": "nsx_modified_security_group"})
response = self.nsx.ca_put_request(component='firewall/sections', response = self.nsx.ca_put_request(component='firewall/sections',
comp_id=data['id'], body=data) comp_id=data['id'], body=data)
self.parse_response(response) self.parse_response(response)
#try to delete logical firewall rule as NSX admin #try to delete logical firewall rule as NSX admin
if len(fw_rules) != 0: if len(fw_rules) != 0:
component = 'firewall/sections/' + data['id'] + '/rules' component = 'firewall/sections/' + data['id'] + '/rules'
response = self.nsx.ca_delete_request(component=component, response = self.nsx.ca_delete_request(component=component,
comp_id=fw_rules[0]['id']) comp_id=fw_rules[0]['id'])
self.parse_response(response) self.parse_response(response)
@decorators.attr(type='nsxv3') @decorators.attr(type='nsxv3')
@ -317,15 +338,15 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
""" """
data = self.ca_topo() data = self.ca_topo()
self.assertEqual(data['_create_user'], self.openstack_tag, self.assertEqual(data['_create_user'], self.openstack_tag,
'Incorrect tag for the create user') 'Incorrect tag for the create user')
#try to update logical port as NSX admin #try to update logical port as NSX admin
data.update({"display_name": "nsx_modified_logical_port"}) data.update({"display_name": "nsx_modified_logical_port"})
response = self.nsx.ca_put_request(component='logical-ports', response = self.nsx.ca_put_request(component='logical-ports',
comp_id=data['id'], body=data) comp_id=data['id'], body=data)
self.parse_response(response) self.parse_response(response)
#try to delete logical port as NSX admin #try to delete logical port as NSX admin
response = self.nsx.ca_delete_request(component='logical-ports', response = self.nsx.ca_delete_request(component='logical-ports',
comp_id=data['id']) comp_id=data['id'])
self.parse_response(response) self.parse_response(response)
@decorators.attr(type='nsxv3') @decorators.attr(type='nsxv3')
@ -344,7 +365,7 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
#obtain all switching profiles at the backend #obtain all switching profiles at the backend
qos_policies = self.nsx.get_switching_profiles() qos_policies = self.nsx.get_switching_profiles()
nsx_policy = self.nsx.get_nsx_resource_by_name(qos_policies, nsx_policy = self.nsx.get_nsx_resource_by_name(qos_policies,
policy['name']) policy['name'])
#check backend if the qos policy was created #check backend if the qos policy was created
msg = 'Qos policy %s not found' % policy['name'] msg = 'Qos policy %s not found' % policy['name']
self.assertIsNotNone(self.nsx.get_switching_profile( self.assertIsNotNone(self.nsx.get_switching_profile(
@ -352,10 +373,11 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
data = self.nsx.get_switching_profile(nsx_policy['id']) data = self.nsx.get_switching_profile(nsx_policy['id'])
#try to delete qos policy as NSX admin #try to delete qos policy as NSX admin
endpoint = ("/%s/%s" % ('switching-profiles', endpoint = ("/%s/%s" % ('switching-profiles',
data['id'])) data['id']))
response = self.nsx.delete_super_admin(endpoint) response = self.nsx.delete_super_admin(endpoint)
self.assertEqual(response.status_code, 200, self.assertEqual(response.status_code, 200,
"Superadmin unable to delete the qos switching profile") "Superadmin unable to "
"delete the qos switching profile")
@decorators.attr(type='nsxv3') @decorators.attr(type='nsxv3')
@decorators.idempotent_id('a874d78b-eb7a-4df6-a01b-dc0a22422dc2') @decorators.idempotent_id('a874d78b-eb7a-4df6-a01b-dc0a22422dc2')

View File

@ -14,6 +14,7 @@
# under the License. # under the License.
import collections import collections
import time
from oslo_log import log as logging from oslo_log import log as logging
@ -24,6 +25,8 @@ from tempest.lib.common.utils import test_utils
from tempest.lib import decorators from tempest.lib import decorators
from tempest.lib import exceptions from tempest.lib import exceptions
from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.services import nsxp_client
from vmware_nsx_tempest_plugin.services import nsxv3_client from vmware_nsx_tempest_plugin.services import nsxv3_client
from vmware_nsx_tempest_plugin.tests.scenario import manager from vmware_nsx_tempest_plugin.tests.scenario import manager
@ -70,6 +73,9 @@ class TestRouterNoNATOps(manager.NetworkScenarioTest):
cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager, cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user, CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password) CONF.nsxv3.nsx_password)
cls.nsxp = nsxp_client.NSXPClient(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
def setUp(self): def setUp(self):
super(TestRouterNoNATOps, self).setUp() super(TestRouterNoNATOps, self).setUp()
@ -245,6 +251,12 @@ class TestRouterNoNATOps(manager.NetworkScenarioTest):
""" """
snat = True snat = True
self._setup_network_topo(enable_snat=snat) self._setup_network_topo(enable_snat=snat)
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_router_policy = self.nsxp.get_logical_router(
self.router['name'], self.router['id'])
self.assertNotEqual(nsx_router_policy, None)
self.assertEqual(nsx_router_policy['resource_type'], 'Tier1')
nsx_router = self.nsx.get_logical_router( nsx_router = self.nsx.get_logical_router(
self.router['name'], self.router['id']) self.router['name'], self.router['id'])
self.assertNotEqual(nsx_router, None) self.assertNotEqual(nsx_router, None)
@ -265,22 +277,41 @@ class TestRouterNoNATOps(manager.NetworkScenarioTest):
"""Test update router from NATed to NoNAT scenario""" """Test update router from NATed to NoNAT scenario"""
snat = True snat = True
self._setup_network_topo(enable_snat=snat) self._setup_network_topo(enable_snat=snat)
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_router_policy = self.nsxp.get_logical_router(
self.router['name'], self.router['id'])
self.assertNotEqual(nsx_router_policy, None)
self.assertEqual(nsx_router_policy['resource_type'], 'Tier1')
nsx_router = self.nsx.get_logical_router( nsx_router = self.nsx.get_logical_router(
self.router['name'], self.router['id']) self.router['name'], self.router['id'])
self.assertNotEqual(nsx_router, None) self.assertNotEqual(nsx_router, None)
self.assertEqual(nsx_router['router_type'], 'TIER1') self.assertEqual(nsx_router['router_type'], 'TIER1')
# Check nat rules created correctly # Check nat rules created correctly
nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router) if CONF.network.backend == 'nsxp':
nat_rules = self.nsxp.get_logical_router_nat_rules(
nsx_router_policy)
else:
nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router)
router_adv = self.nsx.get_logical_router_advertisement(nsx_router)
# Check router advertisement is correctly set # Check router advertisement is correctly set
router_adv = self.nsx.get_logical_router_advertisement(nsx_router)
adv_msg = "Tier1 router's advertise_nsx_connected_routes is not True" adv_msg = "Tier1 router's advertise_nsx_connected_routes is not True"
nat_msg = "Tier1 router's advertise_nat_routes is not False" nat_msg = "Tier1 router's advertise_nat_routes is not False"
if any(d['action'] == 'NO_DNAT' for d in nat_rules): if any(d['action'] == 'NO_DNAT' for d in nat_rules):
self.assertTrue(len(nat_rules) == 4) self.assertTrue(len(nat_rules) == 4)
else: else:
self.assertTrue(len(nat_rules) == 3) self.assertTrue(len(nat_rules) == 3)
self.assertTrue(router_adv['advertise_nat_routes'], nat_msg) if CONF.network.backend == 'nsxp':
self.assertFalse(router_adv['advertise_nsx_connected_routes'], adv_msg) self.assertTrue(
'TIER1_NAT' in nsx_router_policy['route_advertisement_types'],
nat_msg)
self.assertFalse(
'TIER1_CONNECTED' in nsx_router_policy[
'route_advertisement_types'], adv_msg)
else:
self.assertTrue(router_adv['advertise_nat_routes'], nat_msg)
self.assertFalse(
router_adv['advertise_nsx_connected_routes'], adv_msg)
self._check_network_internal_connectivity(network=self.network) self._check_network_internal_connectivity(network=self.network)
self._check_network_vm_connectivity(network=self.network) self._check_network_vm_connectivity(network=self.network)
self._check_nonat_network_connectivity(should_connect=False) self._check_nonat_network_connectivity(should_connect=False)
@ -293,42 +324,80 @@ class TestRouterNoNATOps(manager.NetworkScenarioTest):
'enable_snat': (not snat)} 'enable_snat': (not snat)}
self._update_router(self.router['id'], self.cmgr_adm.routers_client, self._update_router(self.router['id'], self.cmgr_adm.routers_client,
external_gateway_info) external_gateway_info)
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_router_policy = self.nsxp.get_logical_router(
self.router['name'], self.router['id'])
self.assertNotEqual(nsx_router_policy, None)
self.assertEqual(nsx_router_policy['resource_type'], 'Tier1')
nsx_router = self.nsx.get_logical_router( nsx_router = self.nsx.get_logical_router(
self.router['name'], self.router['id']) self.router['name'], self.router['id'])
self.assertNotEqual(nsx_router, None) self.assertNotEqual(nsx_router, None)
self.assertEqual(nsx_router['router_type'], 'TIER1') self.assertEqual(nsx_router['router_type'], 'TIER1')
# Check nat rules created correctly # Check nat rules created correctly
nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router) if CONF.network.backend == 'nsxp':
nat_rules = self.nsxp.get_logical_router_nat_rules(
nsx_router_policy)
else:
nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router)
router_adv = self.nsx.get_logical_router_advertisement(nsx_router)
# Check router advertisement is correctly set # Check router advertisement is correctly set
router_adv = self.nsx.get_logical_router_advertisement(nsx_router)
if len(nat_rules) == 1: if len(nat_rules) == 1:
self.assertTrue(any(d['action'] == 'NO_DNAT' for d in nat_rules)) self.assertTrue(any(d['action'] == 'NO_DNAT' for d in nat_rules))
else: else:
self.assertTrue(len(nat_rules) == 0) self.assertTrue(len(nat_rules) == 0)
self.assertFalse(router_adv['advertise_nat_routes'], nat_msg) if CONF.network.backend == 'nsxp':
self.assertTrue(router_adv['advertise_nsx_connected_routes'], adv_msg) self.assertFalse(
'TIER1_NAT' in nsx_router_policy[
'route_advertisement_types'], nat_msg)
self.assertTrue(
'TIER1_CONNECTED' in nsx_router_policy[
'route_advertisement_types'], adv_msg)
else:
self.assertFalse(router_adv['advertise_nat_routes'], nat_msg)
self.assertTrue(
router_adv['advertise_nsx_connected_routes'], adv_msg)
self._check_nonat_network_connectivity() self._check_nonat_network_connectivity()
def _test_router_nat_update_when_no_snat(self): def _test_router_nat_update_when_no_snat(self):
"""Test update router from NATed to NoNAT scenario""" """Test update router from NATed to NoNAT scenario"""
snat = False snat = False
self._setup_network_topo(enable_snat=snat) self._setup_network_topo(enable_snat=snat)
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_router_policy = self.nsxp.get_logical_router(
self.router['name'], self.router['id'])
self.assertNotEqual(nsx_router_policy, None)
self.assertEqual(nsx_router_policy['resource_type'], 'Tier1')
nsx_router = self.nsx.get_logical_router( nsx_router = self.nsx.get_logical_router(
self.router['name'], self.router['id']) self.router['name'], self.router['id'])
self.assertNotEqual(nsx_router, None) self.assertNotEqual(nsx_router, None)
self.assertEqual(nsx_router['router_type'], 'TIER1') self.assertEqual(nsx_router['router_type'], 'TIER1')
# Check nat rules created correctly # Check nat rules created correctly
nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router) if CONF.network.backend == 'nsxp':
nat_rules = self.nsxp.get_logical_router_nat_rules(
nsx_router_policy)
else:
nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router)
router_adv = self.nsx.get_logical_router_advertisement(nsx_router)
# Check router advertisement is correctly set # Check router advertisement is correctly set
router_adv = self.nsx.get_logical_router_advertisement(nsx_router)
adv_msg = "Tier1 router's advertise_nsx_connected_routes is not True" adv_msg = "Tier1 router's advertise_nsx_connected_routes is not True"
nat_msg = "Tier1 router's advertise_nat_routes is not False" nat_msg = "Tier1 router's advertise_nat_routes is not False"
if len(nat_rules) == 1: if len(nat_rules) == 1:
self.assertTrue(any(d['action'] == 'NO_DNAT' for d in nat_rules)) self.assertTrue(any(d['action'] == 'NO_DNAT' for d in nat_rules))
else: else:
self.assertTrue(len(nat_rules) == 0) self.assertTrue(len(nat_rules) == 0)
self.assertFalse(router_adv['advertise_nat_routes'], nat_msg) if CONF.network.backend == 'nsxp':
self.assertTrue(router_adv['advertise_nsx_connected_routes'], adv_msg) self.assertFalse(
'TIER1_NAT' in nsx_router_policy[
'route_advertisement_types'], nat_msg)
self.assertTrue(
'TIER1_CONNECTED' in nsx_router_policy[
'route_advertisement_types'], adv_msg)
else:
self.assertFalse(router_adv['advertise_nat_routes'], nat_msg)
self.assertTrue(
router_adv['advertise_nsx_connected_routes'], adv_msg)
self._check_nonat_network_connectivity() self._check_nonat_network_connectivity()
# Update router to Enable snat and associate floating ip # Update router to Enable snat and associate floating ip
external_gateway_info = { external_gateway_info = {
@ -338,20 +407,39 @@ class TestRouterNoNATOps(manager.NetworkScenarioTest):
external_gateway_info) external_gateway_info)
floating_ip = self.create_floating_ip(self.server) floating_ip = self.create_floating_ip(self.server)
self.floating_ip_tuple = Floating_IP_tuple(floating_ip, self.server) self.floating_ip_tuple = Floating_IP_tuple(floating_ip, self.server)
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_router_policy = self.nsxp.get_logical_router(
self.router['name'], self.router['id'])
self.assertNotEqual(nsx_router_policy, None)
self.assertEqual(nsx_router_policy['resource_type'], 'Tier1')
nsx_router = self.nsx.get_logical_router( nsx_router = self.nsx.get_logical_router(
self.router['name'], self.router['id']) self.router['name'], self.router['id'])
self.assertNotEqual(nsx_router, None) self.assertNotEqual(nsx_router, None)
self.assertEqual(nsx_router['router_type'], 'TIER1') self.assertEqual(nsx_router['router_type'], 'TIER1')
# Check nat rules created correctly # Check nat rules created correctly
nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router) if CONF.network.backend == 'nsxp':
nat_rules = self.nsxp.get_logical_router_nat_rules(
nsx_router_policy)
else:
nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router)
router_adv = self.nsx.get_logical_router_advertisement(nsx_router)
# Check router advertisement is correctly set # Check router advertisement is correctly set
router_adv = self.nsx.get_logical_router_advertisement(nsx_router)
if any(d['action'] == 'NO_DNAT' for d in nat_rules): if any(d['action'] == 'NO_DNAT' for d in nat_rules):
self.assertTrue(len(nat_rules) == 4) self.assertTrue(len(nat_rules) == 4)
else: else:
self.assertTrue(len(nat_rules) == 3) self.assertTrue(len(nat_rules) == 3)
self.assertTrue(router_adv['advertise_nat_routes'], nat_msg) if CONF.network.backend == 'nsxp':
self.assertFalse(router_adv['advertise_nsx_connected_routes'], adv_msg) self.assertTrue(
'TIER1_NAT' in nsx_router_policy[
'route_advertisement_types'], nat_msg)
self.assertFalse(
'TIER1_CONNECTED' in nsx_router_policy[
'route_advertisement_types'], adv_msg)
else:
self.assertTrue(router_adv['advertise_nat_routes'], nat_msg)
self.assertFalse(
router_adv['advertise_nsx_connected_routes'], adv_msg)
self._check_network_internal_connectivity(network=self.network) self._check_network_internal_connectivity(network=self.network)
self._check_network_vm_connectivity(network=self.network) self._check_network_vm_connectivity(network=self.network)