diff --git a/tox.ini b/tox.ini index e8d0ae61c3..9bfeb95b0e 100644 --- a/tox.ini +++ b/tox.ini @@ -17,6 +17,7 @@ deps = -r{toxinidir}/requirements.txt git+https://git.openstack.org/openstack/neutron.git@master#egg=neutron git+https://git.openstack.org/openstack/networking-l2gw.git@master#egg=networking-l2gw git+https://git.openstack.org/openstack/neutron-lbaas.git@master#egg=neutron-lbaas + git+https://git.openstack.org/openstack/tap-as-a-service.git@master#egg=tap-as-a-service whitelist_externals = sh commands = {toxinidir}/tools/ostestr_compat_shim.sh {posargs} diff --git a/vmware_nsx/common/exceptions.py b/vmware_nsx/common/exceptions.py index bac918f0c7..cab6cb528d 100644 --- a/vmware_nsx/common/exceptions.py +++ b/vmware_nsx/common/exceptions.py @@ -180,3 +180,11 @@ class NsxQosPolicyMappingNotFound(n_exc.NotFound): class NumberOfNsgroupCriteriaTagsReached(NsxPluginException): message = _("Port can be associated with at most %(max_num)s " "security-groups.") + + +class NsxTaaSDriverException(NsxPluginException): + message = _("Tap-as-a-Service NSX driver exception: %(msg)s.") + + +class NsxPortMirrorSessionMappingNotFound(n_exc.NotFound): + message = _("Unable to find mapping for Tap Flow: %(tf)s") diff --git a/vmware_nsx/db/db.py b/vmware_nsx/db/db.py index 4009b3ef75..9f42d8ff28 100644 --- a/vmware_nsx/db/db.py +++ b/vmware_nsx/db/db.py @@ -322,3 +322,26 @@ def get_switch_profile_by_qos_policy(session, qos_policy_id): return entry.switch_profile_id except exc.NoResultFound: raise nsx_exc.NsxQosPolicyMappingNotFound(policy=qos_policy_id) + + +# NSXv3 Port Mirror Sessions DB methods. +def add_port_mirror_session_mapping(session, tf_id, pm_session_id): + with session.begin(subtransactions=True): + mapping = nsx_models.NsxPortMirrorSessionMapping( + tap_flow_id=tf_id, + port_mirror_session_id=pm_session_id) + session.add(mapping) + return mapping + + +def get_port_mirror_session_mapping(session, tf_id): + try: + return (session.query(nsx_models.NsxPortMirrorSessionMapping). + filter_by(tap_flow_id=tf_id).one()) + except exc.NoResultFound: + raise nsx_exc.NsxPortMirrorSessionMappingNotFound(tf=tf_id) + + +def delete_port_mirror_session_mapping(session, tf_id): + return (session.query(nsx_models.NsxPortMirrorSessionMapping). + filter_by(tap_flow_id=tf_id).delete()) diff --git a/vmware_nsx/db/migration/alembic_migrations/versions/EXPAND_HEAD b/vmware_nsx/db/migration/alembic_migrations/versions/EXPAND_HEAD index f4dd4dc895..3a21a5db3a 100644 --- a/vmware_nsx/db/migration/alembic_migrations/versions/EXPAND_HEAD +++ b/vmware_nsx/db/migration/alembic_migrations/versions/EXPAND_HEAD @@ -1 +1 @@ -86a55205337c +633514d94b93 diff --git a/vmware_nsx/db/migration/alembic_migrations/versions/newton/expand/633514d94b93_add_support_for_taas.py b/vmware_nsx/db/migration/alembic_migrations/versions/newton/expand/633514d94b93_add_support_for_taas.py new file mode 100644 index 0000000000..90529d4b31 --- /dev/null +++ b/vmware_nsx/db/migration/alembic_migrations/versions/newton/expand/633514d94b93_add_support_for_taas.py @@ -0,0 +1,37 @@ +# Copyright 2016 VMware, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Add support for TaaS + +Revision ID: 633514d94b93 +Revises: 86a55205337c +Create Date: 2016-05-09 14:11:31.940021 + +""" + +revision = '633514d94b93' +down_revision = '86a55205337c' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.create_table( + 'nsx_port_mirror_session_mappings', + sa.Column('tap_flow_id', sa.String(length=36), nullable=False), + sa.Column('port_mirror_session_id', sa.String(length=36), + nullable=False), + sa.PrimaryKeyConstraint('tap_flow_id'), + ) diff --git a/vmware_nsx/db/nsx_models.py b/vmware_nsx/db/nsx_models.py index f10df207c5..6ea5099781 100644 --- a/vmware_nsx/db/nsx_models.py +++ b/vmware_nsx/db/nsx_models.py @@ -359,3 +359,12 @@ class QosPolicySwitchProfile(model_base.BASEV2, models.TimestampMixin): ondelete='CASCADE'), primary_key=True) switch_profile_id = sa.Column(sa.String(36), nullable=False) + + +class NsxPortMirrorSessionMapping(model_base.BASEV2): + """Define a mapping between Tap Flow and PortMirrorSession object.""" + __tablename__ = 'nsx_port_mirror_session_mappings' + tap_flow_id = sa.Column(sa.String(36), + nullable=False, + primary_key=True) + port_mirror_session_id = sa.Column(sa.String(36), nullable=False) diff --git a/vmware_nsx/nsxlib/v3/__init__.py b/vmware_nsx/nsxlib/v3/__init__.py index 0453dbfc26..dbda36fd02 100644 --- a/vmware_nsx/nsxlib/v3/__init__.py +++ b/vmware_nsx/nsxlib/v3/__init__.py @@ -363,3 +363,38 @@ def get_bridge_cluster_id_by_name_or_id(name_or_id): """ return _get_resource_by_name_or_id(name_or_id, 'bridge-clusters') + + +def create_port_mirror_session(source_ports, dest_ports, direction, + description, name, tags): + """Create a PortMirror Session on the backend. + + :param source_ports: List of UUIDs of the ports whose traffic is to be + mirrored. + :param dest_ports: List of UUIDs of the ports where the mirrored traffic is + to be sent. + :param direction: String representing the direction of traffic to be + mirrored. [INGRESS, EGRESS, BIDIRECTIONAL] + :param description: String representing the description of the session. + :param name: String representing the name of the session. + :param tags: nsx backend specific tags. + """ + + resource = 'mirror-sessions' + body = {'direction': direction, + 'tags': tags, + 'display_name': name, + 'description': description, + 'mirror_sources': source_ports, + 'mirror_destination': dest_ports} + return client.create_resource(resource, body) + + +def delete_port_mirror_session(mirror_session_id): + """Delete a PortMirror session on the backend. + + :param mirror_session_id: string representing the UUID of the port mirror + session to be deleted. + """ + resource = 'mirror-sessions/%s' % mirror_session_id + client.delete_resource(resource) diff --git a/vmware_nsx/services/neutron_taas/README.rst b/vmware_nsx/services/neutron_taas/README.rst new file mode 100644 index 0000000000..998a94f4ce --- /dev/null +++ b/vmware_nsx/services/neutron_taas/README.rst @@ -0,0 +1,15 @@ +================================================= + Enabling NSX Tap-as-a-Service Plugin in DevStack +================================================= + +1. Download DevStack + +2. Add tap-as-a-service repo as an external repository and configure following flags in ``local.conf``:: + + [[local]|[localrc]] + # TaaS plugin NSXv3 driver config + enable_plugin tap-as-a-service https://github.com/openstack/tap-as-a-service + enable_service taas + Q_PLUGIN_EXTRA_CONF_PATH=/etc/neutron + Q_PLUGIN_EXTRA_CONF_FILES=(taas_plugin.ini) + TAAS_SERVICE_DRIVER=TAAS:vmware_nsx_taas:vmware_nsx.services.neutron_taas.nsx_v3.driver.NsxV3Driver:default diff --git a/vmware_nsx/services/neutron_taas/__init__.py b/vmware_nsx/services/neutron_taas/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/vmware_nsx/services/neutron_taas/nsx_v3/__init__.py b/vmware_nsx/services/neutron_taas/nsx_v3/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/vmware_nsx/services/neutron_taas/nsx_v3/driver.py b/vmware_nsx/services/neutron_taas/nsx_v3/driver.py new file mode 100644 index 0000000000..8b85553f95 --- /dev/null +++ b/vmware_nsx/services/neutron_taas/nsx_v3/driver.py @@ -0,0 +1,182 @@ +# Copyright 2016 VMware, Inc. +# +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron_taas.db import taas_db +from neutron_taas.services.taas import service_drivers as base_driver + +from oslo_db import exception as db_exc +from oslo_log import log as logging +from oslo_utils import excutils + +from vmware_nsx._i18n import _, _LE +from vmware_nsx.common import exceptions as nsx_exc +from vmware_nsx.common import utils as nsx_utils +from vmware_nsx.db import db as nsx_db +from vmware_nsx.nsxlib import v3 as nsxlib + +LOG = logging.getLogger(__name__) + + +class NsxV3Driver(base_driver.TaasBaseDriver, + taas_db.Taas_db_Mixin): + + """Class to handle API calls for Port Mirroring and NSXv3 backend.""" + + def __init__(self, service_plugin): + LOG.debug("Loading TaaS NsxV3Driver.") + super(NsxV3Driver, self).__init__(service_plugin) + + def _validate_tap_flow(self, source_port, dest_port): + # Verify whether the source port and monitored port belong to the + # same network. + if source_port['network_id'] != dest_port['network_id']: + msg = (_("Destination port %(dest)s and source port %(src)s " + "should be on the same network") % + {'dest': dest_port['id'], 'src': source_port['id']}) + raise nsx_exc.NsxTaaSDriverException(msg=msg) + # Verify whether the source port is not same as the destination port + if source_port['id'] == dest_port['id']: + msg = (_("Destination port %(dest)s is same as source port " + "%(src)s") % {'dest': dest_port['id'], + 'src': source_port['id']}) + raise nsx_exc.NsxTaaSDriverException(msg=msg) + + def create_tap_service_precommit(self, context): + pass + + def create_tap_service_postcommit(self, context): + pass + + def delete_tap_service_precommit(self, context): + pass + + def delete_tap_service_postcommit(self, context): + pass + + def create_tap_flow_precommit(self, context): + """Validate and create database entries for creation of tap flow.""" + tf = context.tap_flow + # Retrieve source port details. + source_port = self._get_port_details( + context._plugin_context, tf.get('source_port')) + # Retrieve tap service and destination port details. + ts = self._get_tap_service( + context._plugin_context, tf.get('tap_service_id')) + dest_port = self._get_port_details( + context._plugin_context, ts.get('port_id')) + self._validate_tap_flow(source_port, dest_port) + + def _convert_to_backend_direction(self, direction): + nsx_direction = None + if direction == 'BOTH': + nsx_direction = 'BIDIRECTIONAL' + elif direction == 'IN': + nsx_direction = 'INGRESS' + elif direction == 'OUT': + nsx_direction = 'EGRESS' + return nsx_direction + + def _convert_to_backend_source_port(self, session, port_id): + nsx_port_id = nsx_db.get_nsx_switch_and_port_id(session, port_id)[1] + return [{"resource_type": "LogicalPortMirrorSource", + "port_ids": [nsx_port_id]}] + + def _convert_to_backend_dest_port(self, session, port_id): + nsx_port_id = nsx_db.get_nsx_switch_and_port_id(session, port_id)[1] + return {"resource_type": "LogicalPortMirrorDestination", + "port_ids": [nsx_port_id]} + + def create_tap_flow_postcommit(self, context): + """Create tap flow and port mirror session on NSX backend.""" + tf = context.tap_flow + # Retrieve tap service. + ts = self._get_tap_service(context._plugin_context, + tf.get('tap_service_id')) + tags = nsx_utils.build_v3_tags_payload( + tf, resource_type='os-neutron-mirror-id', + project_name=context._plugin_context.tenant_name) + nsx_direction = self._convert_to_backend_direction( + tf.get('direction')) + # Backend expects a list of source ports and destination ports. + # Due to TaaS API requirements, we are only able to add one port + # as a source port and one port as a destination port in a single + # request. Hence we send a list of one port for source_ports + # and dest_ports. + nsx_src_ports = self._convert_to_backend_source_port( + context._plugin_context.session, tf.get('source_port')) + nsx_dest_ports = self._convert_to_backend_dest_port( + context._plugin_context.session, ts.get('port_id')) + # Create port mirror session on the backend + try: + pm_session = nsxlib.create_port_mirror_session( + source_ports=nsx_src_ports, + dest_ports=nsx_dest_ports, + direction=nsx_direction, + description=tf.get('description'), + name=tf.get('name'), + tags=tags) + except nsx_exc.ManagerError: + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Unable to create port mirror session %s " + "on NSX backend, rolling back " + "changes on neutron."), tf['id']) + # Create internal mappings between tap flow and port mirror session. + # Ideally DB transactions must take place in precommit, but since we + # rely on the NSX backend to retrieve the port session UUID, we perform + # the create action in postcommit. + try: + nsx_db.add_port_mirror_session_mapping( + session=context._plugin_context.session, + tf_id=tf['id'], + pm_session_id=pm_session['id']) + except db_exc.DBError: + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Unable to create port mirror session db " + "mappings for tap flow %s. Rolling back " + "changes in Neutron."), tf['id']) + nsxlib.delete_port_mirror_session(pm_session['id']) + + def delete_tap_flow_precommit(self, context): + pass + + def delete_tap_flow_postcommit(self, context): + """Delete tap flow and port mirror session on NSX backend.""" + tf = context.tap_flow + # Retrieve port mirroring session mappings. + pm_session_mapping = nsx_db.get_port_mirror_session_mapping( + session=context._plugin_context.session, + tf_id=tf['id']) + # Delete port mirroring session on the backend + try: + nsxlib.delete_port_mirror_session( + pm_session_mapping['port_mirror_session_id']) + except nsx_exc.ManagerError: + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Unable to delete port mirror session %s " + "on NSX backend."), + pm_session_mapping['port_mirror_session_id']) + # Delete internal mappings between tap flow and port mirror session. + # Ideally DB transactions must take place in precommit, but since we + # rely on the DB mapping to retrieve NSX backend UUID for the port + # session mapping, we perform the delete action in postcommit. + try: + nsx_db.delete_port_mirror_session_mapping( + session=context._plugin_context.session, + tf_id=tf['id']) + except db_exc.DBError: + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Unable to delete port mirror session db " + "mappings for tap flow %s"), tf['id']) diff --git a/vmware_nsx/tests/unit/services/neutron_taas/__init__.py b/vmware_nsx/tests/unit/services/neutron_taas/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/vmware_nsx/tests/unit/services/neutron_taas/test_nsxv3_driver.py b/vmware_nsx/tests/unit/services/neutron_taas/test_nsxv3_driver.py new file mode 100644 index 0000000000..bc10221816 --- /dev/null +++ b/vmware_nsx/tests/unit/services/neutron_taas/test_nsxv3_driver.py @@ -0,0 +1,210 @@ +# Copyright (c) 2016 VMware, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import mock + +from neutron import context +from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin + +from neutron_taas.extensions import taas +from neutron_taas.services.taas import taas_plugin +from neutron_taas.tests.unit.db import test_taas_db + +from oslo_utils import importutils +from oslo_utils import uuidutils + +from vmware_nsx.common import exceptions as nsx_exc +from vmware_nsx.services.neutron_taas.nsx_v3 import driver as nsx_v3_driver +from vmware_nsx.tests.unit.nsx_v3 import test_plugin as test_nsx_v3_plugin + +_uuid = uuidutils.generate_uuid +NSX_V3_PLUGIN_CLASS = ('vmware_nsx.plugins.nsx_v3.plugin.NsxV3Plugin') + + +class TestNsxV3TaaSDriver(test_taas_db.TaaSDbTestCase, + test_nsx_v3_plugin.NsxV3PluginTestCaseMixin, + test_plugin.NeutronDbPluginV2TestCase): + def setUp(self): + super(TestNsxV3TaaSDriver, self).setUp() + self.driver = nsx_v3_driver.NsxV3Driver(mock.MagicMock()) + mock.patch('neutron.services.service_base.load_drivers', + return_value=({'dummyprovider': self.driver}, + 'dummyprovider')).start() + mock.patch('neutron.db.servicetype_db.ServiceTypeManager.get_instance', + return_value=mock.MagicMock()).start() + self.taas_plugin = taas_plugin.TaasPlugin() + self.core_plugin = importutils.import_object(NSX_V3_PLUGIN_CLASS) + self.ctx = context.get_admin_context() + + def test_validate_tap_flow_same_network_same_port_fail(self): + with self.port() as src_port: + self.assertRaises(nsx_exc.NsxTaaSDriverException, + self.driver._validate_tap_flow, + src_port['port'], src_port['port']) + + def test_validate_tap_flow_different_network_different_port_fail(self): + with self.port() as src_port, self.port() as dest_port: + self.assertRaises(nsx_exc.NsxTaaSDriverException, + self.driver._validate_tap_flow, + src_port['port'], + dest_port['port']) + + def test_validate_tap_flow_same_network_different_port(self): + with self.network() as network: + with self.subnet(network=network) as subnet: + with self.port(subnet=subnet) as src_port: + with self.port(subnet=subnet) as dest_port: + result = self.driver._validate_tap_flow( + src_port['port'], + dest_port['port']) + # result = None signifies that the method returned + # with no exceptions. + self.assertIsNone(result) + + def test_convert_to_backend_direction(self): + direction = 'BOTH' + nsx_direction = self.driver._convert_to_backend_direction(direction) + self.assertEqual('BIDIRECTIONAL', nsx_direction) + + direction = 'IN' + nsx_direction = self.driver._convert_to_backend_direction(direction) + self.assertEqual('INGRESS', nsx_direction) + + direction = 'OUT' + nsx_direction = self.driver._convert_to_backend_direction(direction) + self.assertEqual('EGRESS', nsx_direction) + + def test_convert_to_backend_source_port(self): + nsx_id = _uuid() + with mock.patch('vmware_nsx.db.db.get_nsx_switch_and_port_id', + return_value=(_uuid(), nsx_id)): + result = self.driver._convert_to_backend_source_port( + self.ctx.session, _uuid()) + self.assertEqual(1, len(result)) + self.assertEqual('LogicalPortMirrorSource', + result[0].get('resource_type')) + self.assertEqual(1, len(result[0].get('port_ids'))) + self.assertEqual(nsx_id, result[0].get('port_ids')[0]) + + def test_convert_to_backend_dest_port(self): + nsx_id = _uuid() + with mock.patch('vmware_nsx.db.db.get_nsx_switch_and_port_id', + return_value=(_uuid(), nsx_id)): + result = self.driver._convert_to_backend_dest_port( + self.ctx.session, _uuid()) + self.assertEqual('LogicalPortMirrorDestination', + result.get('resource_type')) + self.assertEqual(1, len(result.get('port_ids'))) + self.assertEqual(nsx_id, result.get('port_ids')[0]) + + def test_create_tap_service(self): + ts_name = 'test-tap-service' + with self.port(tenant_id=self.tenant_id) as port: + ts_data = self._get_tap_service_data( + name=ts_name, port_id=port['port']['id']) + ts = self.taas_plugin.create_tap_service(self.ctx, ts_data) + self.assertIsNotNone(ts) + self.assertEqual(ts_name, ts['name']) + self.assertEqual(port['port']['id'], ts['port_id']) + + def test_delete_tap_service(self): + with self.port(tenant_id=self.tenant_id) as port: + ts_data = self._get_tap_service_data(port_id=port['port']['id']) + ts = self.taas_plugin.create_tap_service(self.ctx, ts_data) + self.assertIsNotNone(ts) + self.taas_plugin.delete_tap_service(self.ctx, ts['id']) + self.assertRaises(taas.TapServiceNotFound, + self._get_tap_service, ts['id']) + + def test_create_tap_flow(self): + tf_name = 'test-tap-flow' + with self.network() as network: + with self.subnet(network=network) as subnet: + with self.port(tenant_id=self.tenant_id, + subnet=subnet) as dest_port: + with self.port(tenant_id=self.tenant_id, + subnet=subnet) as src_port: + ts_data = self._get_tap_service_data( + port_id=dest_port['port']['id']) + ts = self.taas_plugin.create_tap_service( + self.ctx, ts_data) + tf_data = self._get_tap_flow_data( + tap_service_id=ts['id'], + source_port=src_port['port']['id'], + name=tf_name) + tf = self.taas_plugin.create_tap_flow(self.ctx, + tf_data) + self.assertIsNotNone(tf) + self.assertEqual(tf_name, tf['name']) + self.assertEqual(src_port['port']['id'], + tf['source_port']) + + def test_create_tap_flow_same_network_same_port_fail(self): + tf_name = 'test-tap-flow' + with self.network() as network: + with self.subnet(network=network) as subnet: + with self.port(tenant_id=self.tenant_id, + subnet=subnet) as port: + ts_data = self._get_tap_service_data( + port_id=port['port']['id']) + ts = self.taas_plugin.create_tap_service( + self.ctx, ts_data) + tf_data = self._get_tap_flow_data( + tap_service_id=ts['id'], + source_port=port['port']['id'], + name=tf_name) + self.assertRaises(nsx_exc.NsxTaaSDriverException, + self.taas_plugin.create_tap_flow, + self.ctx, + tf_data) + + def test_create_tap_flow_different_network_different_port_fail(self): + tf_name = 'test-tap-flow' + with self.port(tenant_id=self.tenant_id) as src_port: + with self.port(tenant_id=self.tenant_id) as dest_port: + ts_data = self._get_tap_service_data( + port_id=dest_port['port']['id']) + ts = self.taas_plugin.create_tap_service( + self.ctx, ts_data) + tf_data = self._get_tap_flow_data( + tap_service_id=ts['id'], + source_port=src_port['port']['id'], + name=tf_name) + self.assertRaises(nsx_exc.NsxTaaSDriverException, + self.taas_plugin.create_tap_flow, + self.ctx, + tf_data) + + def test_delete_tap_flow(self): + tf_name = 'test-tap-flow' + with self.network() as network: + with self.subnet(network=network) as subnet: + with self.port(tenant_id=self.tenant_id, + subnet=subnet) as dest_port: + with self.port(tenant_id=self.tenant_id, + subnet=subnet) as src_port: + ts_data = self._get_tap_service_data( + port_id=dest_port['port']['id']) + ts = self.taas_plugin.create_tap_service( + self.ctx, ts_data) + tf_data = self._get_tap_flow_data( + tap_service_id=ts['id'], + source_port=src_port['port']['id'], + name=tf_name) + tf = self.taas_plugin.create_tap_flow(self.ctx, + tf_data) + self.assertIsNotNone(tf) + self.taas_plugin.delete_tap_flow(self.ctx, tf['id']) + self.assertRaises(taas.TapFlowNotFound, + self._get_tap_flow, tf['id'])