Remove Tap-as-a-service Support

The platform support will be changing so we will remove this in the
mean time.

The patch also remove false positives for test failures.

Change-Id: I118010085d305883f521fa01a5fd341e775eea5a
This commit is contained in:
Gary Kotton 2017-03-19 22:26:34 -07:00
parent c76464dac3
commit 3759da2874
10 changed files with 12 additions and 1251 deletions

View File

@ -141,15 +141,3 @@ Enable trunk service and configure following flags in ``local.conf``::
# Trunk plugin NSXv3 driver config # Trunk plugin NSXv3 driver config
ENABLED_SERVICES+=,q-trunk ENABLED_SERVICES+=,q-trunk
Q_SERVICE_PLUGIN_CLASSES=trunk Q_SERVICE_PLUGIN_CLASSES=trunk
TaaS Driver
~~~~~~~~~~~
Add tap-as-a-service repo as an external repository and configure following flags in ``local.conf``::
[[local]|[localrc]]
# TaaS plugin NSXv3 driver config
enable_plugin tap-as-a-service https://github.com/openstack/tap-as-a-service
enable_service taas
TAAS_SERVICE_DRIVER=TAAS:vmware_nsx_taas:vmware_nsx.services.neutron_taas.nsx_v3.driver.NsxV3Driver:default

View File

@ -7,7 +7,6 @@ ${DIR}/tox_install_project.sh neutron neutron $*
${DIR}/tox_install_project.sh networking-l2gw networking_l2gw $* ${DIR}/tox_install_project.sh networking-l2gw networking_l2gw $*
${DIR}/tox_install_project.sh networking-sfc networking_sfc $* ${DIR}/tox_install_project.sh networking-sfc networking_sfc $*
${DIR}/tox_install_project.sh neutron-lbaas neutron_lbaas $* ${DIR}/tox_install_project.sh neutron-lbaas neutron_lbaas $*
${DIR}/tox_install_project.sh tap-as-a-service neutron_taas $*
${DIR}/tox_install_project.sh vmware-nsxlib vmware_nsxlib $* ${DIR}/tox_install_project.sh vmware-nsxlib vmware_nsxlib $*
CONSTRAINTS_FILE=$1 CONSTRAINTS_FILE=$1

View File

@ -1,375 +0,0 @@
# Copyright 2016 VMware, Inc.
#
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from neutron_lib.plugins import directory
from neutron_taas.db import taas_db
from neutron_taas.services.taas import service_drivers as base_driver
from oslo_db import exception as db_exc
from oslo_log import log as logging
from oslo_utils import excutils
from vmware_nsx._i18n import _, _LE, _LW
from vmware_nsx.common import exceptions as nsx_exc
from vmware_nsx.db import db as nsx_db
from vmware_nsx.plugins.nsx_v3 import utils as v3_utils
from vmware_nsxlib import v3 as nsxlib
from vmware_nsxlib.v3 import exceptions as nsxlib_exc
from vmware_nsxlib.v3 import resources as nsx_resources
LOG = logging.getLogger(__name__)
class NsxV3Driver(base_driver.TaasBaseDriver,
taas_db.Taas_db_Mixin):
"""Class to handle API calls for Port Mirroring and NSXv3 backend."""
def __init__(self, service_plugin):
LOG.debug("Loading TaaS NsxV3Driver.")
super(NsxV3Driver, self).__init__(service_plugin)
@property
def _nsx_plugin(self):
return directory.get_plugin()
def _validate_tap_flow(self, source_port, dest_port):
# Verify whether the source port is not same as the destination port
if source_port['id'] == dest_port['id']:
msg = (_("Destination port %(dest)s is same as source port "
"%(src)s") % {'dest': dest_port['id'],
'src': source_port['id']})
raise nsx_exc.NsxTaaSDriverException(msg=msg)
def create_tap_service_precommit(self, context):
pass
def create_tap_service_postcommit(self, context):
pass
def delete_tap_service_precommit(self, context):
pass
def delete_tap_service_postcommit(self, context):
pass
def create_tap_flow_precommit(self, context):
"""Validate and create database entries for creation of tap flow."""
tf = context.tap_flow
# Retrieve source port details.
source_port = self._get_port_details(
context._plugin_context, tf.get('source_port'))
# Retrieve tap service and destination port details.
ts = self._get_tap_service(
context._plugin_context, tf.get('tap_service_id'))
dest_port = self._get_port_details(
context._plugin_context, ts.get('port_id'))
self._validate_tap_flow(source_port, dest_port)
def _convert_to_backend_direction(self, direction):
nsx_direction = None
if direction == 'BOTH':
nsx_direction = 'BIDIRECTIONAL'
elif direction == 'IN':
nsx_direction = 'INGRESS'
elif direction == 'OUT':
nsx_direction = 'EGRESS'
return nsx_direction
def _convert_to_backend_source_port(self, session, port_id):
nsx_port_id = nsx_db.get_nsx_switch_and_port_id(session, port_id)[1]
return [{"resource_type": "LogicalPortMirrorSource",
"port_ids": [nsx_port_id]}]
def _convert_to_backend_dest_port(self, session, port_id):
nsx_port_id = nsx_db.get_nsx_switch_and_port_id(session, port_id)[1]
return {"resource_type": "LogicalPortMirrorDestination",
"port_ids": [nsx_port_id]}
def _is_local_span(self, context, src_port_id, dest_port_id):
"""Verify whether the mirror session is Local or L3SPAN."""
# TODO(abhiraut): Create only L3SPAN until we find a way to
# detect local SPAN support from backend.
return False
def _update_port_at_backend(self, context, port_id, switching_profile,
delete_profile):
"""Update a logical port on the backend."""
port = self._get_port_details(context._plugin_context,
port_id)
# Retrieve logical port ID based on neutron port ID.
nsx_port_id = nsx_db.get_nsx_switch_and_port_id(
session=context._plugin_context.session,
neutron_id=port_id)[1]
# Retrieve source logical port from the backend.
nsx_port = self._nsx_plugin._port_client.get(nsx_port_id)
if delete_profile:
# Prepare switching profile resources retrieved from backend
# and pop the port mirror switching profile.
switching_profile_ids = self._prepare_switch_profiles(
nsx_port.get('switching_profile_ids', []),
switching_profile)
else:
# Prepare switching profile resources retrieved from backend.
switching_profile_ids = self._prepare_switch_profiles(
nsx_port.get('switching_profile_ids', []))
# Update body with PortMirroring switching profile.
switching_profile_ids.append(
self._get_switching_profile_resource(
switching_profile['id'],
nsx_resources.SwitchingProfileTypes.PORT_MIRRORING))
address_bindings = self._nsx_plugin._build_address_bindings(port)
#NOTE(abhiraut): Consider passing attachment_type
self._nsx_plugin._port_client.update(
lport_id=nsx_port.get('id'),
vif_uuid=port_id,
name=nsx_port.get('display_name'),
admin_state=nsx_port.get('admin_state'),
address_bindings=address_bindings,
switch_profile_ids=switching_profile_ids,)
def _prepare_switch_profiles(self, profiles, deleted_profile=None):
switch_profile_ids = []
if not deleted_profile:
for profile in profiles:
# profile is a dict of type {'key': profile_type,
# 'value': profile_id}
profile_resource = self._get_switching_profile_resource(
profile_id=profile['value'],
profile_type=profile['key'])
switch_profile_ids.append(profile_resource)
else:
for profile in profiles:
if profile['value'] == deleted_profile['id']:
continue
profile_resource = self._get_switching_profile_resource(
profile_id=profile['value'],
profile_type=profile['key'])
switch_profile_ids.append(profile_resource)
return switch_profile_ids
def _get_switching_profile_resource(self, profile_id, profile_type):
return nsx_resources.SwitchingProfileTypeId(
profile_type=profile_type,
profile_id=profile_id)
def create_tap_flow_postcommit(self, context):
"""Create tap flow and port mirror session on NSX backend."""
tf = context.tap_flow
# Retrieve tap service.
ts = self._get_tap_service(context._plugin_context,
tf.get('tap_service_id'))
src_port_id = tf.get('source_port')
dest_port_id = ts.get('port_id')
nsxlib = v3_utils.get_nsxlib_wrapper()
tags = nsxlib.build_v3_tags_payload(
tf, resource_type='os-neutron-mirror-id',
project_name=context._plugin_context.tenant_name)
nsx_direction = self._convert_to_backend_direction(
tf.get('direction'))
# Create a port mirroring session object if local SPAN. Otherwise
# create a port mirroring switching profile for L3SPAN.
if self._is_local_span(context, src_port_id, dest_port_id):
self._create_local_span(context, src_port_id, dest_port_id,
nsx_direction, tags)
else:
self._create_l3span(context, src_port_id, dest_port_id,
nsx_direction, tags)
def _create_l3span(self, context, src_port_id, dest_port_id, direction,
tags):
"""Create a PortMirroring SwitchingProfile for L3SPAN."""
tf = context.tap_flow
# Verify whether destination port is L3 reachable. i.e. destination
# port has a floating IP address.
fips = self._nsx_plugin.get_floatingips(
context._plugin_context, filters={'port_id': [dest_port_id]})
if not fips:
msg = (_("Destination port %s must have a floating IP for "
"L3 SPAN") % dest_port_id)
raise nsx_exc.NsxTaaSDriverException(msg=msg)
destinations = []
# Retrieve destination port's IP addresses and add it to the list
# since the backend expects a list of IP addresses.
for fip in fips:
# NOTE(abhiraut): nsx-v3 doesn't seem to handle ipv6 addresses
# currently so for now we remove them here and do not pass
# them to the backend which would raise an error.
if netaddr.IPNetwork(fip['floating_ip_address']).version == 6:
LOG.warning(_LW("Skipping IPv6 address %(ip)s for L3SPAN "
"tap flow: %(tap_flow)s"),
{'tap_flow': tf['id'],
'ip': fip['floating_ip_address']})
continue
destinations.append(fip['floating_ip_address'])
# Create a switch profile in the backend.
try:
port_mirror_profile = (self._nsx_plugin._switching_profiles.
create_port_mirror_profile(
display_name=tf.get('name'),
description=tf.get('description'),
direction=direction,
destinations=destinations,
tags=tags))
except nsxlib_exc.ManagerError:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Unable to create port mirror switch profile "
"for tap flow %s on NSX backend, rolling back "
"changes on neutron."), tf['id'])
# Create internal mappings between tap flow and port mirror switch
# profile. Ideally DB transactions must take place in precommit, but
# we rely on the NSX backend to retrieve the port mirror profile UUID,
# we perform the create action in postcommit.
try:
nsx_db.add_port_mirror_session_mapping(
session=context._plugin_context.session,
tf_id=tf['id'],
pm_session_id=port_mirror_profile['id'])
except db_exc.DBError:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Unable to create port mirror session db "
"mappings for tap flow %s. Rolling back "
"changes in Neutron."), tf['id'])
self._nsx_plugin._switching_profiles.delete(
port_mirror_profile['id'])
# Update the source port to include the port mirror switch profile.
try:
self._update_port_at_backend(context=context, port_id=src_port_id,
switching_profile=port_mirror_profile,
delete_profile=False)
except nsxlib_exc.ManagerError:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Unable to update source port %(port)s with "
"switching profile %(profile) for tap flow "
"%(tap_flow)s on NSX backend, rolling back "
"changes on neutron."),
{'tap_flow': tf['id'],
'port': src_port_id,
'profile': port_mirror_profile['id']})
self._nsx_plugin._switching_profiles.delete(
port_mirror_profile['id'])
def _create_local_span(self, context, src_port_id, dest_port_id,
direction, tags):
"""Create a PortMirroring session on the backend for local SPAN."""
tf = context.tap_flow
# Backend expects a list of source ports and destination ports.
# Due to TaaS API requirements, we are only able to add one port
# as a source port and one port as a destination port in a single
# request. Hence we send a list of one port for source_ports
# and dest_ports.
nsx_src_ports = self._convert_to_backend_source_port(
context._plugin_context.session, src_port_id)
nsx_dest_ports = self._convert_to_backend_dest_port(
context._plugin_context.session, dest_port_id)
# Create port mirror session on the backend
try:
nsxlib = v3_utils.get_nsxlib_wrapper()
pm_session = nsxlib.port_mirror.create_session(
source_ports=nsx_src_ports,
dest_ports=nsx_dest_ports,
direction=direction,
description=tf.get('description'),
name=tf.get('name'),
tags=tags)
except nsxlib_exc.ManagerError:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Unable to create port mirror session %s "
"on NSX backend, rolling back "
"changes on neutron."), tf['id'])
# Create internal mappings between tap flow and port mirror session.
# Ideally DB transactions must take place in precommit, but since we
# rely on the NSX backend to retrieve the port session UUID, we perform
# the create action in postcommit.
try:
nsx_db.add_port_mirror_session_mapping(
session=context._plugin_context.session,
tf_id=tf['id'],
pm_session_id=pm_session['id'])
except db_exc.DBError:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Unable to create port mirror session db "
"mappings for tap flow %s. Rolling back "
"changes in Neutron."), tf['id'])
nsxlib.port_mirror.delete_session(pm_session['id'])
def delete_tap_flow_precommit(self, context):
pass
def delete_tap_flow_postcommit(self, context):
"""Delete tap flow and port mirror session on NSX backend."""
tf = context.tap_flow
ts = self._get_tap_service(context._plugin_context,
tf.get('tap_service_id'))
# Retrieve port mirroring session mappings.
pm_session_mapping = nsx_db.get_port_mirror_session_mapping(
session=context._plugin_context.session,
tf_id=tf['id'])
src_port_id = tf.get('source_port')
dest_port_id = ts.get('port_id')
if self._is_local_span(context, src_port_id, dest_port_id):
self._delete_local_span(
context, pm_session_mapping['port_mirror_session_id'])
else:
self._delete_l3span(
context, pm_session_mapping['port_mirror_session_id'])
# Delete internal mappings between tap flow and port mirror session.
# Ideally DB transactions must take place in precommit, but since we
# rely on the DB mapping to retrieve NSX backend UUID for the port
# session mapping, we perform the delete action in postcommit.
try:
nsx_db.delete_port_mirror_session_mapping(
session=context._plugin_context.session,
tf_id=tf['id'])
except db_exc.DBError:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Unable to delete port mirror session db "
"mappings %(pm)s for tap flow %(tf)s"), tf['id'])
def _delete_l3span(self, context, pm_profile_id):
tf = context.tap_flow
src_port_id = tf.get('source_port')
port_mirror_profile = self._nsx_plugin._switching_profiles.get(
uuid=pm_profile_id)
try:
# Update source port and remove L3 switching profile.
self._update_port_at_backend(context=context, port_id=src_port_id,
switching_profile=port_mirror_profile,
delete_profile=True)
except nsxlib_exc.ManagerError:
LOG.error(_LE("Unable to update source port %(port)s "
"to delete port mirror profile %(pm)s on NSX "
"backend."),
{'pm': pm_profile_id,
'port': src_port_id})
try:
# Delete port mirroring switching profile
self._nsx_plugin._switching_profiles.delete(uuid=pm_profile_id)
except nsxlib_exc.ManagerError:
LOG.error(_LE("Unable to delete port mirror switching profile "
"%s on NSX backend."), pm_profile_id)
def _delete_local_span(self, context, pm_session_id):
# Delete port mirroring session on the backend
try:
nsxlib.port_mirror.delete_session(pm_session_id)
except nsxlib_exc.ManagerError:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Unable to delete port mirror session %s "
"on NSX backend."), pm_session_id)

View File

@ -384,9 +384,10 @@ class LsnManagerTestCase(base.BaseTestCase):
def test_lsn_port_get_lsn_not_found_silent_raise(self): def test_lsn_port_get_lsn_not_found_silent_raise(self):
with mock.patch.object(self.manager, 'lsn_get', return_value=None): with mock.patch.object(self.manager, 'lsn_get', return_value=None):
expected = self.manager.lsn_port_get( result = self.manager.lsn_port_get(
mock.ANY, self.net_id, self.sub_id, raise_on_err=False) mock.ANY, self.net_id, self.sub_id, raise_on_err=False)
self.assertEqual(expected, (None, None)) expected = (None, None)
self.assertEqual(expected, result)
def test_lsn_port_get_port_not_found_on_raise(self): def test_lsn_port_get_port_not_found_on_raise(self):
self.mock_lsn_api.lsn_port_by_subnet_get.side_effect = n_exc.NotFound self.mock_lsn_api.lsn_port_by_subnet_get.side_effect = n_exc.NotFound
@ -400,9 +401,10 @@ class LsnManagerTestCase(base.BaseTestCase):
self.mock_lsn_api.lsn_port_by_subnet_get.side_effect = n_exc.NotFound self.mock_lsn_api.lsn_port_by_subnet_get.side_effect = n_exc.NotFound
with mock.patch.object( with mock.patch.object(
self.manager, 'lsn_get', return_value=self.lsn_id): self.manager, 'lsn_get', return_value=self.lsn_id):
expected = self.manager.lsn_port_get( result = self.manager.lsn_port_get(
mock.ANY, self.net_id, self.sub_id, raise_on_err=False) mock.ANY, self.net_id, self.sub_id, raise_on_err=False)
self.assertEqual(expected, (self.lsn_id, None)) expected = (self.lsn_id, None)
self.assertEqual(expected, result)
def test_lsn_port_create(self): def test_lsn_port_create(self):
self.mock_lsn_api.lsn_port_create.return_value = self.lsn_port_id self.mock_lsn_api.lsn_port_create.return_value = self.lsn_port_id
@ -794,7 +796,8 @@ class PersistentLsnManagerTestCase(testlib_api.SqlTestCase):
def test_lsn_port_get_silent_not_found(self): def test_lsn_port_get_silent_not_found(self):
result = self.manager.lsn_port_get( result = self.manager.lsn_port_get(
self.context, self.net_id, self.sub_id, raise_on_err=False) self.context, self.net_id, self.sub_id, raise_on_err=False)
self.assertEqual((None, None), result) expected = (None, None)
self.assertEqual(expected, result)
def test_lsn_port_get_sync_on_missing(self): def test_lsn_port_get_sync_on_missing(self):
return return
@ -830,7 +833,8 @@ class PersistentLsnManagerTestCase(testlib_api.SqlTestCase):
def test_lsn_port_get_by_mac_silent_not_found(self): def test_lsn_port_get_by_mac_silent_not_found(self):
result = self.manager.lsn_port_get_by_mac( result = self.manager.lsn_port_get_by_mac(
self.context, self.net_id, self.sub_id, raise_on_err=False) self.context, self.net_id, self.sub_id, raise_on_err=False)
self.assertEqual((None, None), result) expected = (None, None)
self.assertEqual(expected, result)
def test_lsn_port_create(self): def test_lsn_port_create(self):
lsn_db.lsn_add(self.context, self.net_id, self.lsn_id) lsn_db.lsn_add(self.context, self.net_id, self.lsn_id)
@ -860,7 +864,8 @@ class PersistentLsnManagerTestCase(testlib_api.SqlTestCase):
self.sub_id, self.mac, self.lsn_id) self.sub_id, self.mac, self.lsn_id)
self.manager.lsn_port_delete( self.manager.lsn_port_delete(
self.context, self.lsn_id, self.lsn_port_id) self.context, self.lsn_id, self.lsn_port_id)
self.assertEqual((None, None), self.manager.lsn_port_get( expected = (None, None)
self.assertEqual(expected, self.manager.lsn_port_get(
self.context, self.lsn_id, self.sub_id, raise_on_err=False)) self.context, self.lsn_id, self.sub_id, raise_on_err=False))
def test_lsn_port_delete_not_existent(self): def test_lsn_port_delete_not_existent(self):

View File

@ -1,189 +0,0 @@
# Copyright (c) 2016 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin
from neutron_taas.extensions import taas
from neutron_taas.services.taas import taas_plugin
from neutron_taas.tests.unit.db import test_taas_db
from neutron_lib import context
from oslo_utils import importutils
from oslo_utils import uuidutils
from vmware_nsx.common import exceptions as nsx_exc
from vmware_nsx.services.neutron_taas.nsx_v3 import driver as nsx_v3_driver
from vmware_nsx.tests.unit.nsx_v3 import test_plugin as test_nsx_v3_plugin
_uuid = uuidutils.generate_uuid
NSX_V3_PLUGIN_CLASS = ('vmware_nsx.plugins.nsx_v3.plugin.NsxV3Plugin')
class TestNsxV3TaaSDriver(test_taas_db.TaaSDbTestCase,
test_nsx_v3_plugin.NsxV3PluginTestCaseMixin,
test_plugin.NeutronDbPluginV2TestCase):
def setUp(self):
super(TestNsxV3TaaSDriver, self).setUp()
self.driver = nsx_v3_driver.NsxV3Driver(mock.MagicMock())
mock.patch('neutron.services.service_base.load_drivers',
return_value=({'dummyprovider': self.driver},
'dummyprovider')).start()
mock.patch('neutron.db.servicetype_db.ServiceTypeManager.get_instance',
return_value=mock.MagicMock()).start()
self.taas_plugin = taas_plugin.TaasPlugin()
self.core_plugin = importutils.import_object(NSX_V3_PLUGIN_CLASS)
mock.patch(
'vmware_nsx.plugins.nsx_v3.plugin.NsxV3Plugin.get_floatingips',
return_value=([{'floating_ip_address': '172.10.10.10'}])).start()
self.ctx = context.get_admin_context()
def test_validate_tap_flow_same_network_same_port_fail(self):
with self.port() as src_port:
self.assertRaises(nsx_exc.NsxTaaSDriverException,
self.driver._validate_tap_flow,
src_port['port'], src_port['port'])
def test_validate_tap_flow_same_network_different_port(self):
with self.network() as network:
with self.subnet(network=network) as subnet:
with self.port(subnet=subnet) as src_port:
with self.port(subnet=subnet) as dest_port:
result = self.driver._validate_tap_flow(
src_port['port'],
dest_port['port'])
# result = None signifies that the method returned
# with no exceptions.
self.assertIsNone(result)
def test_convert_to_backend_direction(self):
direction = 'BOTH'
nsx_direction = self.driver._convert_to_backend_direction(direction)
self.assertEqual('BIDIRECTIONAL', nsx_direction)
direction = 'IN'
nsx_direction = self.driver._convert_to_backend_direction(direction)
self.assertEqual('INGRESS', nsx_direction)
direction = 'OUT'
nsx_direction = self.driver._convert_to_backend_direction(direction)
self.assertEqual('EGRESS', nsx_direction)
def test_convert_to_backend_source_port(self):
nsx_id = _uuid()
with mock.patch('vmware_nsx.db.db.get_nsx_switch_and_port_id',
return_value=(_uuid(), nsx_id)):
result = self.driver._convert_to_backend_source_port(
self.ctx.session, _uuid())
self.assertEqual(1, len(result))
self.assertEqual('LogicalPortMirrorSource',
result[0].get('resource_type'))
self.assertEqual(1, len(result[0].get('port_ids')))
self.assertEqual(nsx_id, result[0].get('port_ids')[0])
def test_convert_to_backend_dest_port(self):
nsx_id = _uuid()
with mock.patch('vmware_nsx.db.db.get_nsx_switch_and_port_id',
return_value=(_uuid(), nsx_id)):
result = self.driver._convert_to_backend_dest_port(
self.ctx.session, _uuid())
self.assertEqual('LogicalPortMirrorDestination',
result.get('resource_type'))
self.assertEqual(1, len(result.get('port_ids')))
self.assertEqual(nsx_id, result.get('port_ids')[0])
def test_create_tap_service(self):
ts_name = 'test-tap-service'
with self.port(tenant_id=self.tenant_id) as port:
ts_data = self._get_tap_service_data(
name=ts_name, port_id=port['port']['id'])
ts = self.taas_plugin.create_tap_service(self.ctx, ts_data)
self.assertIsNotNone(ts)
self.assertEqual(ts_name, ts['name'])
self.assertEqual(port['port']['id'], ts['port_id'])
def test_delete_tap_service(self):
with self.port(tenant_id=self.tenant_id) as port:
ts_data = self._get_tap_service_data(port_id=port['port']['id'])
ts = self.taas_plugin.create_tap_service(self.ctx, ts_data)
self.assertIsNotNone(ts)
self.taas_plugin.delete_tap_service(self.ctx, ts['id'])
self.assertRaises(taas.TapServiceNotFound,
self._get_tap_service, ts['id'])
def test_create_tap_flow(self):
tf_name = 'test-tap-flow'
with self.network() as network:
with self.subnet(network=network) as subnet:
with self.port(tenant_id=self.tenant_id,
subnet=subnet) as dest_port:
with self.port(tenant_id=self.tenant_id,
subnet=subnet) as src_port:
ts_data = self._get_tap_service_data(
port_id=dest_port['port']['id'])
ts = self.taas_plugin.create_tap_service(
self.ctx, ts_data)
tf_data = self._get_tap_flow_data(
tap_service_id=ts['id'],
source_port=src_port['port']['id'],
name=tf_name)
tf = self.taas_plugin.create_tap_flow(self.ctx,
tf_data)
self.assertIsNotNone(tf)
self.assertEqual(tf_name, tf['name'])
self.assertEqual(src_port['port']['id'],
tf['source_port'])
def test_create_tap_flow_same_network_same_port_fail(self):
tf_name = 'test-tap-flow'
with self.network() as network:
with self.subnet(network=network) as subnet:
with self.port(tenant_id=self.tenant_id,
subnet=subnet) as port:
ts_data = self._get_tap_service_data(
port_id=port['port']['id'])
ts = self.taas_plugin.create_tap_service(
self.ctx, ts_data)
tf_data = self._get_tap_flow_data(
tap_service_id=ts['id'],
source_port=port['port']['id'],
name=tf_name)
self.assertRaises(nsx_exc.NsxTaaSDriverException,
self.taas_plugin.create_tap_flow,
self.ctx,
tf_data)
def test_delete_tap_flow(self):
tf_name = 'test-tap-flow'
with self.network() as network:
with self.subnet(network=network) as subnet:
with self.port(tenant_id=self.tenant_id,
subnet=subnet) as dest_port:
with self.port(tenant_id=self.tenant_id,
subnet=subnet) as src_port:
ts_data = self._get_tap_service_data(
port_id=dest_port['port']['id'])
ts = self.taas_plugin.create_tap_service(
self.ctx, ts_data)
tf_data = self._get_tap_flow_data(
tap_service_id=ts['id'],
source_port=src_port['port']['id'],
name=tf_name)
tf = self.taas_plugin.create_tap_flow(self.ctx,
tf_data)
self.assertIsNotNone(tf)
self.taas_plugin.delete_tap_flow(self.ctx, tf['id'])
self.assertRaises(taas.TapFlowNotFound,
self._get_tap_flow, tf['id'])

View File

@ -1,109 +0,0 @@
# Copyright 2016 VMware Inc
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from tempest.lib.services.network import base
from vmware_nsx_tempest._i18n import _LI
from vmware_nsx_tempest._i18n import _LW
from vmware_nsx_tempest.services import network_client_base as base_client
LOG = log.getLogger(__name__)
class TaaSClient(base.BaseNetworkClient):
"""
Request resources via API for TapService and TapFlow
create request
show request
delete request
list all request
"""
# Tap Service
def create_tap_service(self, **kwargs):
uri = '/taas/tap_services'
post_data = {'tap_service': kwargs}
LOG.info(_LI("URI : %(uri)s, posting data : %(post_data)s") % {
"uri": uri, "post_data": post_data})
return self.create_resource(uri, post_data)
def list_tap_service(self, **filters):
uri = '/taas/tap_services'
LOG.info(_LI("URI : %(uri)s") % {"uri": uri})
return self.list_resources(uri, **filters)
def show_tap_service(self, ts_id, **fields):
uri = '/taas/tap_services' + "/" + ts_id
LOG.info(_LI("URI : %(uri)s") % {"uri": uri})
return self.show_resource(uri, **fields)
def delete_tap_service(self, ts_id):
uri = '/taas/tap_services' + "/" + ts_id
LOG.info(_LI("URI : %(uri)s") % {"uri": uri})
return self.delete_resource(uri)
# Tap Flow
def create_tap_flow(self, **kwargs):
uri = '/taas/tap_flows'
post_data = {'tap_flow': kwargs}
LOG.info(_LI("URI : %(uri)s, posting data : %(post_data)s") % {
"uri": uri, "post_data": post_data})
return self.create_resource(uri, post_data)
def list_tap_flow(self, **filters):
uri = '/taas/tap_flows'
LOG.info(_LI("URI : %(uri)s") % {"uri": uri})
return self.list_resources(uri, **filters)
def show_tap_flow(self, tf_id, **fields):
uri = '/taas/tap_flows' + "/" + tf_id
LOG.info(_LI("URI : %(uri)s") % {"uri": uri})
return self.show_resource(uri, **fields)
def delete_tap_flow(self, tf_id):
uri = '/taas/tap_flows' + "/" + tf_id
LOG.info(_LI("URI : %(uri)s") % {"uri": uri})
return self.delete_resource(uri)
def get_client(client_mgr):
"""
Create a TaaS client from manager or networks_client
"""
try:
manager = getattr(client_mgr, "manager", client_mgr)
net_client = getattr(manager, "networks_client")
_params = base_client.default_params_with_timeout_values.copy()
except AttributeError as attribute_err:
LOG.warning(_LW("Failed to locate the attribute, Error: %(err_msg)s") %
{"err_msg": attribute_err.__str__()})
_params = {}
client = TaaSClient(net_client.auth_provider,
net_client.service,
net_client.region,
net_client.endpoint_type,
**_params)
return client

View File

@ -1,558 +0,0 @@
# Copyright 2016 VMware Inc
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from tempest.api.network import base
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest import test
from vmware_nsx_tempest._i18n import _LI
from vmware_nsx_tempest._i18n import _LW
from vmware_nsx_tempest.services import nsxv3_client
from vmware_nsx_tempest.services import taas_client
CONF = config.CONF
LOG = logging.getLogger(__name__)
class TaaSJsonTest(base.BaseNetworkTest):
"""Tap Service and Tap Flow API's can be accessed using the TaaS Client .
Tap Service is created by associating floating ip to the destination
port and Tap Flow is created by binding the Tap Service created with the
source port .
CRUD Operations for Tap Service and Tap Flow are covered .
"""
@classmethod
def skip_checks(cls):
super(TaaSJsonTest, cls).skip_checks()
if not test.is_extension_enabled('taas', 'network'):
msg = "taas extension not enabled."
raise cls.skipException(msg)
if not CONF.network.public_network_id:
msg = "Public network id not found."
raise cls.skipException(msg)
@classmethod
def resource_setup(cls):
super(TaaSJsonTest, cls).resource_setup()
cls.ext_net_id = CONF.network.public_network_id
# Create the topology to test TaaS Client
cls.network = cls.create_network()
cls.subnet = cls.create_subnet(cls.network)
cls.router = cls.create_router(data_utils.rand_name('router-'),
external_network_id=cls.ext_net_id)
cls.create_router_interface(cls.router['id'], cls.subnet['id'])
for i in range(4):
cls.create_port(cls.network)
@classmethod
def setup_clients(cls):
super(TaaSJsonTest, cls).setup_clients()
try:
cls.tclient = taas_client.get_client(cls.manager)
cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
except AttributeError as attribute_err:
LOG.warning(
_LW("Failed to locate the attribute, Error: %(err_msg)s") %
{"err_msg": attribute_err.__str__()})
def _create_floating_ip(self, port_index):
# Create and associates floating ip to the port based on port index
create_body = self.floating_ips_client.create_floatingip(
floating_network_id=self.ext_net_id,
port_id=self.ports[int(port_index)]['id'])
fip = create_body['floatingip']
return fip
def _create_tap_service_env(self, port_index):
"""
Creates floating ip and device_tap_service dict for
Tap service environment
"""
fip = self._create_floating_ip(port_index)
tap_service_name = data_utils.rand_name('tapservice-ch')
device_tap_service = {
"description": 'TAP1', "name": tap_service_name,
"port_id": self.ports[port_index]['id'],
"tenant_id": self.ports[0]['tenant_id']
}
return fip, device_tap_service
def _create_tap_flow_env(self, tap_service_id, tap_flow_direction,
src_port_index):
# Creates device_tap_flow dict for tap flow environment
tap_flow_name = data_utils.rand_name('tapflow-ch')
device_tap_flow = {
"description": 'tapflow1', "direction": tap_flow_direction,
"name": tap_flow_name, "source_port": self.ports[src_port_index][
'id'], "tap_service_id": tap_service_id,
"tenant_id": self.ports[0]['tenant_id']
}
return device_tap_flow
def _resource_cleanup(self, fip, tapservice_id, tapflow_id):
# Cleans Tap Service and Tap Flow resources after each test
if fip != 'null':
self.addCleanup(self.floating_ips_client.delete_floatingip,
fip['id'])
if tapflow_id != 'null':
self.tclient.delete_tap_flow(tapflow_id)
if tapservice_id != 'null':
self.tclient.delete_tap_service(tapservice_id)
@test.attr(type='nsxv3')
@decorators.idempotent_id('dc5c1ec6-8c18-11e6-ae22-56b6b6499611')
def test_create_tap_service(self):
"""
Tap service create api is tested , Tap Service is created with
destination port associated with floating ip
"""
LOG.info(_LI(
"Testing Tap Service Create api with floating ip associated to "
"destination port"))
# Create tap service env
fip, device_tap_service = self._create_tap_service_env(port_index=0)
# Create Tap Service
rsp = self.tclient.create_tap_service(**device_tap_service)
LOG.info(_LI("response from tap serive create : %(rsp)s") %
{"rsp": rsp})
self.assertEqual('201',
rsp.response["status"],
"Response code is not 201 ")
self.assertIn(self.ports[0]['id'], rsp['tap_service']['port_id'])
self._resource_cleanup(fip, rsp['tap_service']['id'], 'null')
@test.attr(type='nsxv3')
@decorators.idempotent_id('dc5c21f0-8c18-11e6-ae22-56b6b6499611')
def test_list_tap_service(self):
"""
Tap Service List api is tested
"""
LOG.info(_LI(
"Testing Tap Service List api with floating ip associated "
"to destination port "))
# Create tap service env
fip, device_tap_service = self._create_tap_service_env(port_index=0)
# Create Tap Service
rsp_create = self.tclient.create_tap_service(**device_tap_service)
# List Tap Service
rsp_list = self.tclient.list_tap_service()
LOG.info(_LI("response from tap list : %(rsp)s") % {"rsp": rsp_list})
self.assertEqual('200',
rsp_list.response["status"],
"Response code is not 200 ")
self.assertIn(device_tap_service['name'],
rsp_list['tap_services'][0]['name'])
self.assertIn(self.ports[0]['id'], rsp_list[
'tap_services'][0]['port_id'])
self._resource_cleanup(fip, rsp_create['tap_service']['id'], 'null')
@test.attr(type='nsxv3')
@decorators.idempotent_id('dc5c2394-8c18-11e6-ae22-56b6b6499611')
def test_show_tap_service(self):
"""
Tap Service Show api is tested
"""
LOG.info(_LI(
"Testing Tap Service Show api with floating ip associated to "
"destination port "))
# Create tap service env
fip, device_tap_service = self._create_tap_service_env(port_index=0)
# Create Tap Service
rsp_create = self.tclient.create_tap_service(**device_tap_service)
# Show Tap Service
rsp_show = self.tclient.show_tap_service(
rsp_create['tap_service']['id'])
LOG.info(_LI("response from tap list : %(rsp)s") % {"rsp": rsp_show})
self.assertEqual('200',
rsp_show.response["status"],
"Response code is not 200 ")
self.assertIn(device_tap_service['name'],
rsp_show['tap_service']['name'])
self.assertIn(self.ports[0]['id'],
rsp_show['tap_service']['port_id'])
self._resource_cleanup(fip, rsp_create['tap_service']['id'], 'null')
@test.attr(type='nsxv3')
@decorators.idempotent_id('dc5c2538-8c18-11e6-ae22-56b6b6499611')
def test_delete_tap_service(self):
"""
Tap Service Delete api is tested
"""
LOG.info(
_LI(
"Testing Tap delete api with floating ip associated "
"to destination port "))
# Create tap service env
fip, device_tap_service = self._create_tap_service_env(port_index=0)
# Create Tap Service
rsp_create = self.tclient.create_tap_service(**device_tap_service)
LOG.info(_LI("response from tap list : %(rsp)s") % {"rsp": rsp_create})
self.assertEqual('201',
rsp_create.response["status"],
"Response code is not 201 ")
# Delete Tap Service
rsp_delete = self.tclient.delete_tap_service(
rsp_create['tap_service']['id'])
self.assertEqual('204',
rsp_delete.response["status"],
"Response code is not 204 ")
rsp_list = self.tclient.list_tap_service()
LOG.info(_LI("response from tap list : %(rsp)s") % {"rsp": rsp_list})
self._resource_cleanup(fip, 'null', 'null')
@test.attr(type='nsxv3')
@decorators.idempotent_id('dc5c2a7e-8c18-11e6-ae22-56b6b6499611')
def test_create_tap_flow(self):
"""
Tap flow create api is tested , Tap Service is created with
destination port associated with floating ip
"""
LOG.info(_LI(
"Testing Tap flow create api with direction BOTH and "
"floating ip associated to destination port "))
# Create tap service env
fip, device_tap_service = self._create_tap_service_env(port_index=0)
# Create Tap Service
rsp_tap_service = self.tclient.create_tap_service(**device_tap_service)
LOG.info(_LI("response from tap service : %(rsp)s") %
{"rsp": rsp_tap_service})
self.assertEqual('201',
rsp_tap_service.response["status"],
"Response code is not 201 ")
# Create Tap Flow env
device_tap_flow = self._create_tap_flow_env(
tap_service_id=rsp_tap_service['tap_service']['id'],
tap_flow_direction='BOTH', src_port_index=1)
# Create Tap flow
rsp_tap_flow = self.tclient.create_tap_flow(**device_tap_flow)
LOG.info(_LI("response from flow : %(rsp)s") % {"rsp": rsp_tap_flow})
# NSX backend check for Switch Profile
nsx_switch_profiles = self.nsx.get_logical_switch_profiles()
switch_profile = []
for ls in nsx_switch_profiles:
if ls['display_name'] == device_tap_flow['name']:
self.assertIn(ls['direction'], 'BIDIRECTIONAL')
self.assertIn(ls['destinations'][0],
fip['floating_ip_address'])
switch_profile = [ls]
self.assertNotEqual(len(switch_profile), 0, "Port mirror profile is "
"not found in NSX ")
self.assertEqual('201',
rsp_tap_flow.response["status"],
"Response code is not 201 ")
self.assertIn(self.ports[1]['id'], rsp_tap_flow[
'tap_flow']['source_port'])
self.assertEqual(device_tap_flow['name'], rsp_tap_flow['tap_flow'][
'name'])
self.assertEqual(device_tap_flow['direction'], rsp_tap_flow[
'tap_flow']['direction'])
self.assertEqual(rsp_tap_service['tap_service'][
'id'], rsp_tap_flow['tap_flow']['tap_service_id'])
self._resource_cleanup(fip, rsp_tap_service['tap_service'][
'id'], rsp_tap_flow['tap_flow']['id'])
@test.attr(type='nsxv3')
@decorators.idempotent_id('dc5c2c5e-8c18-11e6-ae22-56b6b6499611')
def test_create_tap_flow_multiple(self):
"""
Multiple Tap flow's are created in with 'IN' and 'OUT' , Tap Service
is created with destination port associated with floating ip's for
each Tap Flow
"""
LOG.info(_LI(
"Multiple Tap flow's created with direction IN and OUT and"
"floating ip associated to destination port "))
# Create tap service env
fip1, device_tap_service1 = self._create_tap_service_env(port_index=0)
# Create tap service env
fip2, device_tap_service2 = self._create_tap_service_env(port_index=1)
# Create Tap Service 1 and Tap Service 2
rsp_tap_service1 = self.tclient.create_tap_service(
**device_tap_service1)
rsp_tap_service2 = self.tclient.create_tap_service(
**device_tap_service2)
LOG.info(_LI(
"response from tap service1 and tap service2 : %(rsp1)s "
"%(rsp2)s ") % {
"rsp1": rsp_tap_service1, "rsp2": rsp_tap_service2
})
# Create Tap Flow env
device_tap_flow1 = self._create_tap_flow_env(
tap_service_id=rsp_tap_service1['tap_service']['id'],
tap_flow_direction='IN', src_port_index=2)
# Create Tap Flow env
device_tap_flow2 = self._create_tap_flow_env(
tap_service_id=rsp_tap_service2['tap_service']['id'],
tap_flow_direction='OUT', src_port_index=3)
# Create Tap Flow1 and Tap Flow2
rsp_tap_flow1 = self.tclient.create_tap_flow(**device_tap_flow1)
rsp_tap_flow2 = self.tclient.create_tap_flow(**device_tap_flow2)
LOG.info(_LI(
"response from tap flow1 and tap flow2 : %(rsp1)s %(rsp2)s ") % {
"rsp1": rsp_tap_flow1,
"rsp2": rsp_tap_flow2
})
# NSX backend check for Switch Profile
nsx_switch_profiles = self.nsx.get_logical_switch_profiles()
profile_count = 0
for ls in nsx_switch_profiles:
if ls['display_name'].startswith('tapflow-ch-'):
if ls['direction'] == 'INGRESS' or 'EGRESS':
profile_count += 1
self.assertEqual(profile_count, 2, "Port mirror profile is "
"not found in NSX ")
self.assertEqual(device_tap_flow1['name'], rsp_tap_flow1['tap_flow'][
'name'])
self.assertEqual(device_tap_flow2['name'], rsp_tap_flow2['tap_flow'][
'name'])
self.assertEqual(device_tap_flow1['direction'], rsp_tap_flow1[
'tap_flow']['direction'])
self.assertEqual(device_tap_flow2['direction'], rsp_tap_flow2[
'tap_flow']['direction'])
self._resource_cleanup(fip1, rsp_tap_service1['tap_service'][
'id'], rsp_tap_flow1['tap_flow']['id'])
self._resource_cleanup(fip2, rsp_tap_service2['tap_service'][
'id'], rsp_tap_flow2['tap_flow']['id'])
@test.attr(type='nsxv3')
@decorators.idempotent_id('dc5c2dda-8c18-11e6-ae22-56b6b6499611')
def test_list_tap_flow(self):
"""
Tap flow list api is tested , Tap Service is created with
destination port associated with floating ip
"""
LOG.info(
_LI(
"Testing Tap Flow list api with floating ip associated to "
"destination port "))
# Create tap service env
fip, device_tap_service = self._create_tap_service_env(port_index=0)
# Create Tap Service
rsp_tap_service = self.tclient.create_tap_service(**device_tap_service)
LOG.info(_LI("response from tap service : %(rsp)s") %
{"rsp": rsp_tap_service})
self.assertEqual('201',
rsp_tap_service.response["status"],
"Response code is not 201 ")
# Create Tap Flow env
device_tap_flow = self._create_tap_flow_env(
tap_service_id=rsp_tap_service['tap_service']['id'],
tap_flow_direction='BOTH', src_port_index=1)
# Create Tap flow
rsp_tap_flow = self.tclient.create_tap_flow(**device_tap_flow)
LOG.info(_LI("response from flow : %(rsp)s") % {"rsp": rsp_tap_flow})
self.assertEqual('201',
rsp_tap_flow.response["status"],
"Response code is not 201 ")
self.assertIn(self.ports[1]['id'], rsp_tap_flow[
'tap_flow']['source_port'])
self.assertEqual(device_tap_flow['name'], rsp_tap_flow['tap_flow'][
'name'])
self.assertEqual(device_tap_flow['direction'], rsp_tap_flow[
'tap_flow']['direction'])
self.assertEqual(rsp_tap_service['tap_service'][
'id'], rsp_tap_flow['tap_flow']['tap_service_id'])
# List Tap Flow
rsp_tap_list_flow = self.tclient.list_tap_flow()
LOG.info(_LI("response from tap list : %(rsp)s") %
{"rsp": rsp_tap_list_flow})
self.assertEqual('200',
rsp_tap_list_flow.response["status"],
"Response code is not 200 ")
self.assertIn(device_tap_flow['name'], rsp_tap_list_flow[
'tap_flows'][0][
'name'])
self.assertIn(self.ports[1]['id'], rsp_tap_list_flow[
'tap_flows'][0]['source_port'])
self._resource_cleanup(fip, rsp_tap_service['tap_service'][
'id'], rsp_tap_flow['tap_flow']['id'])
@test.attr(type='nsxv3')
@decorators.idempotent_id('dc5c2f6a-8c18-11e6-ae22-56b6b6499611')
def test_show_tap_flow(self):
"""
Tap flow show api is tested , Tap Service is created with
destination port associated with floating ip
"""
LOG.info(_LI(
"Testing Tap Service Show api with floating ip associated "
"to destination port "))
# Create tap service env
fip, device_tap_service = self._create_tap_service_env(port_index=0)
# Create Tap Service
rsp_tap_service = self.tclient.create_tap_service(**device_tap_service)
LOG.info(_LI("response from tap service : %(rsp)s") %
{"rsp": rsp_tap_service})
self.assertEqual('201',
rsp_tap_service.response["status"],
"Response code is not 201 ")
# Create Tap Flow env
device_tap_flow = self._create_tap_flow_env(
tap_service_id=rsp_tap_service['tap_service']['id'],
tap_flow_direction='BOTH', src_port_index=1)
# Create Tap flow
rsp_tap_flow = self.tclient.create_tap_flow(**device_tap_flow)
# Show Tap Flow
rsp_tap_flow_show = self.tclient.show_tap_flow(
rsp_tap_flow['tap_flow']['id'])
LOG.info(_LI("response from tap list : %(rsp)s") %
{"rsp": rsp_tap_flow_show})
self.assertEqual('200',
rsp_tap_flow_show.response["status"],
"Response code is not 200 ")
self.assertIn(device_tap_flow['name'], rsp_tap_flow_show['tap_flow'][
'name'])
self.assertIn(self.ports[1]['id'], rsp_tap_flow_show[
'tap_flow']['source_port'])
self._resource_cleanup(fip, rsp_tap_service['tap_service'][
'id'], rsp_tap_flow['tap_flow']['id'])
@test.attr(type='nsxv3')
@decorators.idempotent_id('dc5c310e-8c18-11e6-ae22-56b6b6499611')
def test_delete_tap_flow(self):
"""
Tap flow delete api is tested , Tap Service is created with
destination port associated with floating ip
"""
LOG.info(_LI(
"Testing Tap flow delete api with floating ip associated to "
"destination port "))
# Create tap service env
fip, device_tap_service = self._create_tap_service_env(port_index=0)
# Create Tap Service
rsp_tap_service = self.tclient.create_tap_service(**device_tap_service)
LOG.info(_LI("response from tap service : %(rsp)s") %
{"rsp": rsp_tap_service})
self.assertEqual('201',
rsp_tap_service.response["status"],
"Response code is not 201 ")
# Create Tap Flow env
device_tap_flow = self._create_tap_flow_env(
tap_service_id=rsp_tap_service['tap_service']['id'],
tap_flow_direction='BOTH', src_port_index=1)
# Create Tap flow
rsp_tap_flow = self.tclient.create_tap_flow(**device_tap_flow)
LOG.info(_LI("response from flow : %(rsp)s") % {"rsp": rsp_tap_flow})
self.assertEqual('201',
rsp_tap_flow.response["status"],
"Response code is not 201 ")
self.assertIn(self.ports[1]['id'], rsp_tap_flow[
'tap_flow']['source_port'])
self.assertEqual(device_tap_flow['name'], rsp_tap_flow['tap_flow'][
'name'])
self.assertEqual(device_tap_flow['direction'], rsp_tap_flow[
'tap_flow']['direction'])
self.assertEqual(rsp_tap_service['tap_service'][
'id'], rsp_tap_flow['tap_flow']['tap_service_id'])
rsp_tap_flow_delete = self.tclient.delete_tap_flow(
rsp_tap_flow['tap_flow']['id'])
LOG.info(_LI("response from tap list : %(rsp)s") %
{"rsp": rsp_tap_flow_delete})
self.assertEqual('204',
rsp_tap_flow_delete.response["status"],
"Response code is not 204 ")
rsp_tap_list_flow = self.tclient.list_tap_flow()
LOG.info(_LI("response from tap list : %(rsp)s") %
{"rsp": rsp_tap_list_flow})
self._resource_cleanup(fip, rsp_tap_service[
'tap_service']['id'], 'null')
@test.attr(type='nsxv3')
@decorators.idempotent_id('dc5c328a-8c18-11e6-ae22-56b6b6499611')
def test_create_tap_flow_negative_nofloatingip(self):
"""
Tap flow create api is tested , Tap Service is created with
destination port associated to non floating ip
"""
LOG.info(_LI(
"Testing Tap flow create api with non floating ip "
"associated to destination port "))
tap_service_name = data_utils.rand_name('tapservice-ch')
device_tap_service = {
"description": 'tapservice1', "name": tap_service_name,
"port_id": self.ports[0]['id'],
"tenant_id": self.ports[0]['tenant_id']
}
# Create Tap Service
rsp_tap_service = self.tclient.create_tap_service(**device_tap_service)
LOG.info(_LI("response from tap service : %(rsp)s") %
{"rsp": rsp_tap_service})
self.assertEqual('201',
rsp_tap_service.response["status"],
"Response code is not 201 ")
tap_flow_name = data_utils.rand_name('tapflow-ch')
device_tap_flow = {
"description": 'tapflow1', "direction": "BOTH",
"name": tap_flow_name, "source_port": self.ports[1]['id'],
"tap_service_id": rsp_tap_service['tap_service']['id'],
"tenant_id": self.ports[0]['tenant_id']
}
# Create Tap Flow with a non floating ip associated to destination port
try:
self.tclient.create_tap_flow(**device_tap_flow)
except Exception as e:
LOG.info(_LI("response from exception %(rsp)s") % {"rsp": e})
self._resource_cleanup('null', rsp_tap_service[
'tap_service']['id'], 'null')
@test.attr(type='nsxv3')
@decorators.idempotent_id('dc5c3802-8c18-11e6-ae22-56b6b6499611')
def test_create_tap_flow_negative_nosrcport(self):
"""
Tap flow create api is tested with non existent src port
"""
LOG.info(
_LI("Testing Tap flow create api with non existent src port "))
tap_service_name = data_utils.rand_name('tapservice-ch')
device_tap_service = {
"description": 'tapservice1', "name": tap_service_name,
"port_id": self.ports[0]['id'],
"tenant_id": self.ports[0]['tenant_id']
}
# Create Tap Service
rsp_tap_service = self.tclient.create_tap_service(**device_tap_service)
LOG.info(_LI("response from tap service : %(rsp)s") %
{"rsp": rsp_tap_service})
self.assertEqual('201',
rsp_tap_service.response["status"],
"Response code is not 201 ")
tap_flow_name = data_utils.rand_name('tapflow-ch')
device_tap_flow = {
"description": 'tapflow1', "direction": "BOTH",
"name": tap_flow_name,
"source_port": '2ad76061-252e-xxxx-9d0f-dd94188be9cc',
"tap_service_id": rsp_tap_service['tap_service']['id'],
"tenant_id": self.ports[0]['tenant_id']
}
# Create Tap Flow with a dummy non existent source port
try:
self.tclient.create_tap_flow(**device_tap_flow)
except Exception as e:
LOG.info(_LI("response from exception %(rsp)s") % {"rsp": e})
self._resource_cleanup('null', rsp_tap_service[
'tap_service']['id'], 'null')