From eb2869fe0356f620cb1907b9cdb349042e3aa5b8 Mon Sep 17 00:00:00 2001 From: Adit Sarfaty Date: Thu, 12 Jul 2018 14:13:49 +0300 Subject: [PATCH] TVD: Octavia support For the TVD plugin, the Octavia listener will use a wrapper that selects the NSX-T or NSX-V implementation according to the load balancer's project. Change-Id: I0540f7998e72b73058fd531f2fb16364b4f35491 --- vmware_nsx/plugins/nsx/plugin.py | 71 +++++++++++++++++++ vmware_nsx/plugins/nsx_v/plugin.py | 38 ++++++---- vmware_nsx/plugins/nsx_v3/plugin.py | 36 +++++++--- .../services/lbaas/octavia/tvd_wrapper.py | 65 +++++++++++++++++ 4 files changed, 185 insertions(+), 25 deletions(-) create mode 100644 vmware_nsx/services/lbaas/octavia/tvd_wrapper.py diff --git a/vmware_nsx/plugins/nsx/plugin.py b/vmware_nsx/plugins/nsx/plugin.py index bbd5ac8728..8c1d112495 100644 --- a/vmware_nsx/plugins/nsx/plugin.py +++ b/vmware_nsx/plugins/nsx/plugin.py @@ -52,6 +52,7 @@ from neutron_lib import exceptions as n_exc from vmware_nsx.common import availability_zones as nsx_com_az from vmware_nsx.common import config from vmware_nsx.common import exceptions as nsx_exc +from vmware_nsx.common import locking from vmware_nsx.common import managers as nsx_managers from vmware_nsx.db import ( routertype as rt_rtr) @@ -64,6 +65,8 @@ from vmware_nsx.plugins.dvs import plugin as dvs from vmware_nsx.plugins.nsx_v import plugin as v from vmware_nsx.plugins.nsx_v3 import plugin as t from vmware_nsx.services.lbaas.nsx import lb_driver_v2 +from vmware_nsx.services.lbaas.octavia import octavia_listener +from vmware_nsx.services.lbaas.octavia import tvd_wrapper as octavia_tvd LOG = logging.getLogger(__name__) TVD_PLUGIN_TYPE = "Nsx-TVD" @@ -104,6 +107,7 @@ class NsxTVDPlugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, def __init__(self): self._extension_manager = nsx_managers.ExtensionManager() LOG.info("Start NSX TVD Plugin") + self.init_is_complete = False # Validate configuration config.validate_nsx_config_options() super(NsxTVDPlugin, self).__init__() @@ 
-117,6 +121,14 @@ class NsxTVDPlugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, self._unsubscribe_callback_events() + registry.subscribe(self.spawn_complete, + resources.PROCESS, + events.AFTER_SPAWN) + + registry.subscribe(self.init_complete, + resources.PROCESS, + events.AFTER_INIT) + @staticmethod def plugin_type(): return TVD_PLUGIN_TYPE @@ -215,6 +227,65 @@ class NsxTVDPlugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, msg = _("Cannot use the same availability zones in NSX-V and T") raise nsx_exc.NsxPluginException(err_msg=msg) + def _get_octavia_objects(self, plugin_type): + plugin = self.get_plugin_by_type(plugin_type) + if plugin: + return plugin._get_octavia_objects() + else: + return {'loadbalancer': None, 'listener': None, 'pool': None, + 'member': None, 'healthmonitor': None, 'l7policy': None, + 'l7rule': None} + + def init_complete(self, resource, event, trigger, payload=None): + with locking.LockManager.get_lock('plugin-init-complete-tvd'): + if self.init_is_complete: + # Should be called only once per worker + return + self.init_octavia() + self.init_is_complete = True + + def init_octavia(self): + # Init Octavia listener and endpoints + v_objects = self._get_octavia_objects( + projectpluginmap.NsxPlugins.NSX_V) + t_objects = self._get_octavia_objects( + projectpluginmap.NsxPlugins.NSX_T) + + self.octavia_listener = octavia_listener.NSXOctaviaListener( + loadbalancer=octavia_tvd.OctaviaTVDWrapper( + v_objects['loadbalancer'], t_objects['loadbalancer']), + listener=octavia_tvd.OctaviaTVDWrapper( + v_objects['listener'], t_objects['listener']), + pool=octavia_tvd.OctaviaTVDWrapper( + v_objects['pool'], t_objects['pool']), + member=octavia_tvd.OctaviaTVDWrapper( + v_objects['member'], t_objects['member']), + healthmonitor=octavia_tvd.OctaviaTVDWrapper( + v_objects['healthmonitor'], t_objects['healthmonitor']), + l7policy=octavia_tvd.OctaviaTVDWrapper( + v_objects['l7policy'], t_objects['l7policy']), + l7rule=octavia_tvd.OctaviaTVDWrapper( + 
v_objects['l7rule'], t_objects['l7rule'])) + + def spawn_complete(self, resource, event, trigger, payload=None): + # This method should run only once, but after init_complete + if not self.init_is_complete: + self.init_complete(None, None, None) + self.init_octavia_stats_collector() + + def init_octavia_stats_collector(self): + self.octavia_stats_collector = ( + octavia_listener.NSXOctaviaStatisticsCollector( + self, + octavia_tvd.stats_getter)) + + def start_rpc_listeners(self): + # Run the start_rpc_listeners of one of the sub-plugins + for plugin_type in self.plugins: + plugin = self.plugins[plugin_type] + if plugin.rpc_workers_supported(): + return plugin.start_rpc_listeners() + def _unsubscribe_callback_events(self): # unsubscribe the callback that should be called on all plugins # other that NSX-T. diff --git a/vmware_nsx/plugins/nsx_v/plugin.py b/vmware_nsx/plugins/nsx_v/plugin.py index 3446ba15ba..d6d0ef31b6 100644 --- a/vmware_nsx/plugins/nsx_v/plugin.py +++ b/vmware_nsx/plugins/nsx_v/plugin.py @@ -355,10 +355,11 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin, if not self.init_is_complete: self.init_complete(None, None, None) - self.octavia_stats_collector = ( - octavia_listener.NSXOctaviaStatisticsCollector( - self, - listener_mgr.stats_getter)) + if not self._is_sub_plugin: + self.octavia_stats_collector = ( + octavia_listener.NSXOctaviaStatisticsCollector( + self, + self._get_octavia_stats_getter())) def init_complete(self, resource, event, trigger, payload=None): with locking.LockManager.get_lock('plugin-init-complete'): @@ -388,19 +389,28 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin, hk_readonly_jobs=cfg.CONF.nsxv.housekeeping_readonly_jobs) # Init octavia listener and endpoints - self.octavia_listener = octavia_listener.NSXOctaviaListener( - loadbalancer=loadbalancer_mgr.EdgeLoadBalancerManagerFromDict( - self.nsx_v), - listener=listener_mgr.EdgeListenerManagerFromDict(self.nsx_v), - 
pool=pool_mgr.EdgePoolManagerFromDict(self.nsx_v), - member=member_mgr.EdgeMemberManagerFromDict(self.nsx_v), - healthmonitor=healthmon_mgr.EdgeHealthMonitorManagerFromDict( - self.nsx_v), - l7policy=l7policy_mgr.EdgeL7PolicyManagerFromDict(self.nsx_v), - l7rule=l7rule_mgr.EdgeL7RuleManagerFromDict(self.nsx_v)) + if not self._is_sub_plugin: + octavia_objects = self._get_octavia_objects() + self.octavia_listener = octavia_listener.NSXOctaviaListener( + **octavia_objects) self.init_is_complete = True + def _get_octavia_objects(self): + return { + 'loadbalancer': loadbalancer_mgr.EdgeLoadBalancerManagerFromDict( + self.nsx_v), + 'listener': listener_mgr.EdgeListenerManagerFromDict(self.nsx_v), + 'pool': pool_mgr.EdgePoolManagerFromDict(self.nsx_v), + 'member': member_mgr.EdgeMemberManagerFromDict(self.nsx_v), + 'healthmonitor': healthmon_mgr.EdgeHealthMonitorManagerFromDict( + self.nsx_v), + 'l7policy': l7policy_mgr.EdgeL7PolicyManagerFromDict(self.nsx_v), + 'l7rule': l7rule_mgr.EdgeL7RuleManagerFromDict(self.nsx_v)} + + def _get_octavia_stats_getter(self): + return listener_mgr.stats_getter + def _validate_nsx_version(self): ver = self.nsx_v.vcns.get_version() if version.LooseVersion(ver) < version.LooseVersion('6.2.3'): diff --git a/vmware_nsx/plugins/nsx_v3/plugin.py b/vmware_nsx/plugins/nsx_v3/plugin.py index de0fbcf8cf..5735e0ac93 100644 --- a/vmware_nsx/plugins/nsx_v3/plugin.py +++ b/vmware_nsx/plugins/nsx_v3/plugin.py @@ -450,10 +450,11 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, if not self.init_is_complete: self.init_complete(None, None, None) - self.octavia_stats_collector = ( - octavia_listener.NSXOctaviaStatisticsCollector( - self, - listener_mgr.stats_getter)) + if not self._is_sub_plugin: + self.octavia_stats_collector = ( + octavia_listener.NSXOctaviaStatisticsCollector( + self, + self._get_octavia_stats_getter())) def init_complete(self, resource, event, trigger, payload=None): with 
locking.LockManager.get_lock('plugin-init-complete'): @@ -482,18 +483,31 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, self.init_is_complete = True def _init_octavia(self): + if self._is_sub_plugin: + # The TVD plugin will take care of this + return + if not self.nsxlib.feature_supported( nsxlib_consts.FEATURE_LOAD_BALANCER): return + octavia_objects = self._get_octavia_objects() self.octavia_listener = octavia_listener.NSXOctaviaListener( - loadbalancer=loadbalancer_mgr.EdgeLoadBalancerManagerFromDict(), - listener=listener_mgr.EdgeListenerManagerFromDict(), - pool=pool_mgr.EdgePoolManagerFromDict(), - member=member_mgr.EdgeMemberManagerFromDict(), - healthmonitor=healthmonitor_mgr.EdgeHealthMonitorManagerFromDict(), - l7policy=l7policy_mgr.EdgeL7PolicyManagerFromDict(), - l7rule=l7rule_mgr.EdgeL7RuleManagerFromDict()) + **octavia_objects) + + def _get_octavia_objects(self): + return { + 'loadbalancer': loadbalancer_mgr.EdgeLoadBalancerManagerFromDict(), + 'listener': listener_mgr.EdgeListenerManagerFromDict(), + 'pool': pool_mgr.EdgePoolManagerFromDict(), + 'member': member_mgr.EdgeMemberManagerFromDict(), + 'healthmonitor': + healthmonitor_mgr.EdgeHealthMonitorManagerFromDict(), + 'l7policy': l7policy_mgr.EdgeL7PolicyManagerFromDict(), + 'l7rule': l7rule_mgr.EdgeL7RuleManagerFromDict()} + + def _get_octavia_stats_getter(self): + return listener_mgr.stats_getter def _extend_fault_map(self): """Extends the Neutron Fault Map. diff --git a/vmware_nsx/services/lbaas/octavia/tvd_wrapper.py b/vmware_nsx/services/lbaas/octavia/tvd_wrapper.py new file mode 100644 index 0000000000..b8980b7a0f --- /dev/null +++ b/vmware_nsx/services/lbaas/octavia/tvd_wrapper.py @@ -0,0 +1,65 @@ +# Copyright 2018 VMware, Inc. +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log as logging + +from neutron_lib import exceptions as n_exc + +from vmware_nsx.extensions import projectpluginmap +from vmware_nsx.plugins.nsx import utils as tvd_utils + +LOG = logging.getLogger(__name__) + + +class OctaviaTVDWrapper(object): + + def __init__(self, v_manager, t_manager): + self.managers = {} + if v_manager: + self.managers[projectpluginmap.NsxPlugins.NSX_V] = v_manager + if t_manager: + self.managers[projectpluginmap.NsxPlugins.NSX_T] = t_manager + + def _get_manager_by_project(self, context, project_id): + plugin_type = tvd_utils.get_tvd_plugin_type_for_project( + project_id, context=context) + if not self.managers.get(plugin_type): + LOG.error("Project %(project)s with plugin %(plugin)s has no " + "support for Octavia", {'project': project_id, + 'plugin': plugin_type}) + raise n_exc.ServiceUnavailable() + return self.managers[plugin_type] + + def create(self, context, obj, completor, **args): + manager = self._get_manager_by_project(context, obj['project_id']) + return manager.create(context, obj, completor, **args) + + def update(self, context, old_obj, new_obj, completor, **args): + manager = self._get_manager_by_project(context, old_obj['project_id']) + return manager.update(context, old_obj, new_obj, completor, **args) + + def delete(self, context, obj, completor, **args): + manager = self._get_manager_by_project(context, obj['project_id']) + return manager.delete(context, obj, completor, **args) + + +def stats_getter(context, core_plugin): + """Call stats of both plugins""" + for plugin_type in 
[projectpluginmap.NsxPlugins.NSX_V, + projectpluginmap.NsxPlugins.NSX_T]: + plugin = core_plugin.get_plugin_by_type(plugin_type) + if plugin: + stats_getter_func = plugin._get_octavia_stats_getter() + return stats_getter_func(context, plugin)