NSX|V3+P: Levarage the tagging plugin for nsx logical ports

To enable this feature:
1. set oslo_messaging_notifications driver in the neutron.conf
2. set support_nsx_port_tagging to true in the nsx.ini

Use the neutron tagging plugin to set a tag on a neutron port.
The tag should have the format of '<tag_scope>:<tag value>'

Change-Id: I89e24bc7f3eeb363dd05748d77bd683a236934a0
This commit is contained in:
Adit Sarfaty 2019-05-29 16:49:08 +03:00
parent 1464d39925
commit 9b57e3c527
4 changed files with 160 additions and 0 deletions

View File

@ -401,6 +401,12 @@ nsx_v3_and_p = [
cfg.IntOpt('dhcp_lease_time', cfg.IntOpt('dhcp_lease_time',
default=86400, default=86400,
help=_("DHCP default lease time.")), help=_("DHCP default lease time.")),
cfg.BoolOpt('support_nsx_port_tagging',
default=False,
help=_("If true, adding neutron tags to ports will also add "
"tags on the NSX logical ports. This feature requires "
"oslo_messaging_notifications driver to be "
"configured.")),
] ]
nsx_v3_opts = nsx_v3_and_p + [ nsx_v3_opts = nsx_v3_and_p + [

View File

@ -18,6 +18,7 @@ import netaddr
from oslo_config import cfg from oslo_config import cfg
from oslo_db import exception as db_exc from oslo_db import exception as db_exc
from oslo_log import log as logging from oslo_log import log as logging
import oslo_messaging
from oslo_utils import excutils from oslo_utils import excutils
from sqlalchemy import exc as sql_exc from sqlalchemy import exc as sql_exc
import webob.exc import webob.exc
@ -43,6 +44,7 @@ from neutron.db import portsecurity_db
from neutron.db import securitygroups_db from neutron.db import securitygroups_db
from neutron.db import vlantransparent_db from neutron.db import vlantransparent_db
from neutron.extensions import securitygroup as ext_sg from neutron.extensions import securitygroup as ext_sg
from neutron.extensions import tagging
from neutron_lib.agent import topics from neutron_lib.agent import topics
from neutron_lib.api.definitions import allowedaddresspairs as addr_apidef from neutron_lib.api.definitions import allowedaddresspairs as addr_apidef
from neutron_lib.api.definitions import availability_zone as az_def from neutron_lib.api.definitions import availability_zone as az_def
@ -61,6 +63,7 @@ from neutron_lib import exceptions as n_exc
from neutron_lib.exceptions import allowedaddresspairs as addr_exc from neutron_lib.exceptions import allowedaddresspairs as addr_exc
from neutron_lib.exceptions import l3 as l3_exc from neutron_lib.exceptions import l3 as l3_exc
from neutron_lib.exceptions import port_security as psec_exc from neutron_lib.exceptions import port_security as psec_exc
from neutron_lib.plugins import directory
from neutron_lib.plugins import utils as plugin_utils from neutron_lib.plugins import utils as plugin_utils
from neutron_lib import rpc as n_rpc from neutron_lib import rpc as n_rpc
from neutron_lib.services.qos import constants as qos_consts from neutron_lib.services.qos import constants as qos_consts
@ -182,6 +185,18 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
"""Should be implemented by each plugin""" """Should be implemented by each plugin"""
pass pass
@property
def support_external_port_tagging(self):
# oslo_messaging_notifications must be defined for this to work
if (cfg.CONF.oslo_messaging_notifications.driver and
self._get_conf_attr('support_nsx_port_tagging')):
return True
return False
def update_port_nsx_tags(self, context, port_id, tags, is_delete=False):
"""Can be implemented by each plugin to update the backend port tags"""
pass
def start_rpc_listeners(self): def start_rpc_listeners(self):
if self.start_rpc_listeners_called: if self.start_rpc_listeners_called:
# If called more than once - we should not create it again # If called more than once - we should not create it again
@ -196,8 +211,46 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
fanout=False) fanout=False)
self.start_rpc_listeners_called = True self.start_rpc_listeners_called = True
if self.support_external_port_tagging:
self.start_tagging_rpc_listener()
return self.conn.consume_in_threads() return self.conn.consume_in_threads()
def start_tagging_rpc_listener(self):
# Add listener for tags plugin notifications
transport = oslo_messaging.get_notification_transport(cfg.CONF)
targets = [oslo_messaging.Target(
topic=cfg.CONF.oslo_messaging_notifications.topics[0])]
endpoints = [TagsCallbacks()]
pool = "tags-listeners"
server = oslo_messaging.get_notification_listener(transport, targets,
endpoints, pool=pool)
server.start()
server.wait()
def _translate_external_tag(self, external_tag, port_id):
tag_parts = external_tag.split(':')
if len(tag_parts) != 2:
LOG.warning("Skipping tag %s for port %s: wrong format",
external_tag, port_id)
else:
return {'scope': tag_parts[0][:nsxlib_utils.MAX_RESOURCE_TYPE_LEN],
'tag': tag_parts[1][:nsxlib_utils.MAX_TAG_LEN]}
def _translate_external_tags(self, external_tags, port_id):
new_tags = []
for tag in external_tags:
new_tag = self._translate_external_tag(tag, port_id)
if new_tag:
new_tags.append(new_tag)
return new_tags
def get_external_tags_for_port(self, context, port_id):
tags_plugin = directory.get_plugin(tagging.TAG_PLUGIN_TYPE)
if tags_plugin:
extra_tags = tags_plugin.get_tags(context, 'ports', port_id)
return self._translate_external_tags(extra_tags['tags'], port_id)
def _get_interface_subnet(self, context, interface_info): def _get_interface_subnet(self, context, interface_info):
is_port, is_sub = self._validate_interface_info(interface_info) is_port, is_sub = self._validate_interface_info(interface_info)
@ -2659,3 +2712,28 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
if self.fwaas_callbacks and self.fwaas_callbacks.fwaas_enabled: if self.fwaas_callbacks and self.fwaas_callbacks.fwaas_enabled:
ports = self._get_router_interfaces(context, router.id) ports = self._get_router_interfaces(context, router.id)
return self.fwaas_callbacks.router_with_fwg(context, ports) return self.fwaas_callbacks.router_with_fwg(context, ports)
class TagsCallbacks(object):
target = oslo_messaging.Target(
namespace=None,
version='1.0')
def __init__(self, **kwargs):
super(TagsCallbacks, self).__init__()
def info(self, ctxt, publisher_id, event_type, payload, metadata):
# Make sure we catch only tags operations, and each one only once
# tagging events look like 'tag.create/delete.start/end'
if (event_type.startswith('tag.') and
event_type.endswith('.end')):
action = event_type.split('.')[1]
is_delete = (action == 'delete')
# Currently support only ports tags
if payload.get('parent_resource') == 'ports':
core_plugin = directory.get_plugin()
port_id = payload.get('parent_resource_id')
core_plugin.update_port_nsx_tags(ctxt, port_id,
payload.get('tags'),
is_delete=is_delete)

View File

@ -51,6 +51,7 @@ from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources from neutron_lib.callbacks import resources
from neutron_lib import constants as const from neutron_lib import constants as const
from neutron_lib import context as n_context
from neutron_lib.db import api as db_api from neutron_lib.db import api as db_api
from neutron_lib.db import resource_extend from neutron_lib.db import resource_extend
from neutron_lib.db import utils as db_utils from neutron_lib.db import utils as db_utils
@ -983,6 +984,17 @@ class NsxPolicyPlugin(nsx_plugin_common.NsxPluginV3Base):
tags.append({'scope': security.PORT_SG_SCOPE, tags.append({'scope': security.PORT_SG_SCOPE,
'tag': NSX_P_EXCLUDE_LIST_TAG}) 'tag': NSX_P_EXCLUDE_LIST_TAG})
if self.support_external_port_tagging:
external_tags = self.get_external_tags_for_port(
context, port_data['id'])
if external_tags:
total_len = len(external_tags) + len(tags)
if total_len > nsxlib_utils.MAX_TAGS:
LOG.warning("Cannot add external tags to port %s: "
"too many tags", port_data['id'])
else:
tags.extend(external_tags)
segment_id = self._get_network_nsx_segment_id( segment_id = self._get_network_nsx_segment_id(
context, port_data['network_id']) context, port_data['network_id'])
self.nsxpolicy.segment_port.create_or_overwrite( self.nsxpolicy.segment_port.create_or_overwrite(
@ -2768,3 +2780,36 @@ class NsxPolicyPlugin(nsx_plugin_common.NsxPluginV3Base):
# let the fwaas callbacks update the router FW # let the fwaas callbacks update the router FW
return self.fwaas_callbacks.update_router_firewall( return self.fwaas_callbacks.update_router_firewall(
context, router_id, router_db, ports, called_from_fw=from_fw) context, router_id, router_db, ports, called_from_fw=from_fw)
def update_port_nsx_tags(self, context, port_id, tags, is_delete=False):
"""Update backend NSX segment port with tags from the tagging plugin"""
# Make sure it is a backend port
ctx = n_context.get_admin_context()
port_data = self.get_port(ctx, port_id)
if not self._is_backend_port(ctx, port_data):
LOG.info("Ignoring tags on port %s: this port has no backend "
"NSX logical port", port_id)
return
# Get the current tags on this port
segment_id = self._get_network_nsx_segment_id(
ctx, port_data['network_id'])
lport = self.nsxpolicy.segment_port.get(segment_id, port_id)
port_tags = lport.get('tags')
orig_len = len(port_tags)
# Update and validate the list of tags
extra_tags = self._translate_external_tags(tags, port_id)
if is_delete:
port_tags = [tag for tag in port_tags if tag not in extra_tags]
else:
port_tags.extend(
[tag for tag in extra_tags if tag not in port_tags])
if len(port_tags) > nsxlib_utils.MAX_TAGS:
LOG.warning("Cannot add external tags to port %s: "
"too many tags", port_id)
# Update the NSX port
if len(port_tags) != orig_len:
self.nsxpolicy.segment_port.update(
segment_id, port_id, tags=port_tags)

View File

@ -3460,3 +3460,34 @@ class NsxV3Plugin(nsx_plugin_common.NsxPluginV3Base,
def _support_vlan_router_interfaces(self): def _support_vlan_router_interfaces(self):
return self.nsxlib.feature_supported( return self.nsxlib.feature_supported(
nsxlib_consts.FEATURE_VLAN_ROUTER_INTERFACE) nsxlib_consts.FEATURE_VLAN_ROUTER_INTERFACE)
def update_port_nsx_tags(self, context, port_id, tags, is_delete=False):
"""Update backend NSX port with tags from the tagging plugin"""
ctx = q_context.get_admin_context()
_, nsx_lport_id = nsx_db.get_nsx_switch_and_port_id(
ctx.session, port_id)
if not nsx_lport_id:
LOG.info("Ignoring tags on port %s: this port has no backend "
"NSX logical port", port_id)
return
# Get the current tags on this port
lport = self.nsxlib.logical_port.get(nsx_lport_id)
port_tags = lport.get('tags')
orig_len = len(port_tags)
# Update and validate the list of tags
extra_tags = self._translate_external_tags(tags, port_id)
if is_delete:
port_tags = [tag for tag in port_tags if tag not in extra_tags]
else:
port_tags.extend(
[tag for tag in extra_tags if tag not in port_tags])
if len(port_tags) > nsxlib_utils.MAX_TAGS:
LOG.warning("Cannot add external tags to port %s: "
"too many tags", port_id)
# Update the NSX port
if len(port_tags) != orig_len:
self.nsxlib.logical_port.update(
nsx_lport_id, False, tags=port_tags)