Merge "TVD: Octavia support"
commit 9c7991dacb
@@ -52,6 +52,7 @@ from neutron_lib import exceptions as n_exc
from vmware_nsx.common import availability_zones as nsx_com_az
from vmware_nsx.common import config
from vmware_nsx.common import exceptions as nsx_exc
from vmware_nsx.common import locking
from vmware_nsx.common import managers as nsx_managers
from vmware_nsx.db import (
    routertype as rt_rtr)
@@ -64,6 +65,8 @@ from vmware_nsx.plugins.dvs import plugin as dvs
from vmware_nsx.plugins.nsx_v import plugin as v
from vmware_nsx.plugins.nsx_v3 import plugin as t
from vmware_nsx.services.lbaas.nsx import lb_driver_v2
from vmware_nsx.services.lbaas.octavia import octavia_listener
from vmware_nsx.services.lbaas.octavia import tvd_wrapper as octavia_tvd

LOG = logging.getLogger(__name__)
TVD_PLUGIN_TYPE = "Nsx-TVD"
@@ -104,6 +107,7 @@ class NsxTVDPlugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
    def __init__(self):
        self._extension_manager = nsx_managers.ExtensionManager()
        LOG.info("Start NSX TVD Plugin")
        self.init_is_complete = False
        # Validate configuration
        config.validate_nsx_config_options()
        super(NsxTVDPlugin, self).__init__()
@@ -117,6 +121,14 @@ class NsxTVDPlugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,

        self._unsubscribe_callback_events()

        registry.subscribe(self.spawn_complete,
                           resources.PROCESS,
                           events.AFTER_SPAWN)

        registry.subscribe(self.init_complete,
                           resources.PROCESS,
                           events.AFTER_INIT)

    @staticmethod
    def plugin_type():
        return TVD_PLUGIN_TYPE
@@ -215,6 +227,65 @@ class NsxTVDPlugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
            msg = _("Cannot use the same availability zones in NSX-V and T")
            raise nsx_exc.NsxPluginException(err_msg=msg)

    def _get_octavia_objects(self, plugin_type):
        plugin = self.get_plugin_by_type(plugin_type)
        if plugin:
            return plugin._get_octavia_objects()
        else:
            return {'loadbalancer': None, 'listener': None, 'pool': None,
                    'member': None, 'healthmonitor': None, 'l7policy': None,
                    'l7rule': None}

    def init_complete(self, resource, event, trigger, payload=None):
        with locking.LockManager.get_lock('plugin-init-complete-tvd'):
            if self.init_is_complete:
                # Should be called only once per worker
                return
            self.init_octavia()
            self.init_is_complete = True

    def init_octavia(self):
        # Init Octavia listener and endpoints
        v_objects = self._get_octavia_objects(
            projectpluginmap.NsxPlugins.NSX_V)
        t_objects = self._get_octavia_objects(
            projectpluginmap.NsxPlugins.NSX_T)

        self.octavia_listener = octavia_listener.NSXOctaviaListener(
            loadbalancer=octavia_tvd.OctaviaTVDWrapper(
                v_objects['loadbalancer'], t_objects['loadbalancer']),
            listener=octavia_tvd.OctaviaTVDWrapper(
                v_objects['listener'], t_objects['listener']),
            pool=octavia_tvd.OctaviaTVDWrapper(
                v_objects['pool'], t_objects['pool']),
            member=octavia_tvd.OctaviaTVDWrapper(
                v_objects['member'], t_objects['member']),
            healthmonitor=octavia_tvd.OctaviaTVDWrapper(
                v_objects['healthmonitor'], t_objects['healthmonitor']),
            l7policy=octavia_tvd.OctaviaTVDWrapper(
                v_objects['l7policy'], t_objects['l7policy']),
            l7rule=octavia_tvd.OctaviaTVDWrapper(
                v_objects['l7rule'], t_objects['l7rule']))

    def spawn_complete(self, resource, event, trigger, payload=None):
        # This method should run only once, but after init_complete
        if not self.init_is_complete:
            self.init_complete(None, None, None)
        self.init_octavia_stats_collector()

    def init_octavia_stats_collector(self):
        self.octavia_stats_collector = (
            octavia_listener.NSXOctaviaStatisticsCollector(
                self,
                octavia_tvd.stats_getter))

    def start_rpc_listeners(self):
        # Run the start_rpc_listeners of one of the sub-plugins
        for plugin_type in self.plugins:
            plugin = self.plugins[plugin_type]
            if plugin.rpc_workers_supported():
                return plugin.start_rpc_listeners()

    def _unsubscribe_callback_events(self):
        # unsubscribe the callback that should be called on all plugins
        # other than NSX-T.
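For reference, init_octavia() above pairs the NSX-V and NSX-T handlers of each Octavia resource type into one OctaviaTVDWrapper; when a sub-plugin is not configured, _get_octavia_objects() returns None for every key and the wrapper only registers the side that exists. A minimal sketch of the same pairing, using a hypothetical stand-in manager rather than the real Edge*ManagerFromDict classes:

    # Sketch only: FakeManager stands in for a per-plugin Octavia resource manager.
    from vmware_nsx.services.lbaas.octavia import tvd_wrapper as octavia_tvd

    class FakeManager(object):
        def create(self, context, obj, completor, **args):
            completor(success=True)  # assumed completor convention

    # e.g. only NSX-T is configured, so every NSX-V handler is None
    v_objects = {'loadbalancer': None, 'listener': None}
    t_objects = {'loadbalancer': FakeManager(), 'listener': FakeManager()}

    # the same wiring init_octavia() spells out per keyword argument
    wrappers = {res: octavia_tvd.OctaviaTVDWrapper(v_objects[res], t_objects[res])
                for res in v_objects}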
@@ -355,10 +355,11 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
        if not self.init_is_complete:
            self.init_complete(None, None, None)

        if not self._is_sub_plugin:
            self.octavia_stats_collector = (
                octavia_listener.NSXOctaviaStatisticsCollector(
                    self,
                    listener_mgr.stats_getter))
                    self._get_octavia_stats_getter()))

    def init_complete(self, resource, event, trigger, payload=None):
        with locking.LockManager.get_lock('plugin-init-complete'):
@@ -388,19 +389,28 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
                hk_readonly_jobs=cfg.CONF.nsxv.housekeeping_readonly_jobs)

            # Init octavia listener and endpoints
            if not self._is_sub_plugin:
                octavia_objects = self._get_octavia_objects()
                self.octavia_listener = octavia_listener.NSXOctaviaListener(
                    loadbalancer=loadbalancer_mgr.EdgeLoadBalancerManagerFromDict(
                        self.nsx_v),
                    listener=listener_mgr.EdgeListenerManagerFromDict(self.nsx_v),
                    pool=pool_mgr.EdgePoolManagerFromDict(self.nsx_v),
                    member=member_mgr.EdgeMemberManagerFromDict(self.nsx_v),
                    healthmonitor=healthmon_mgr.EdgeHealthMonitorManagerFromDict(
                        self.nsx_v),
                    l7policy=l7policy_mgr.EdgeL7PolicyManagerFromDict(self.nsx_v),
                    l7rule=l7rule_mgr.EdgeL7RuleManagerFromDict(self.nsx_v))
                    **octavia_objects)

            self.init_is_complete = True

    def _get_octavia_objects(self):
        return {
            'loadbalancer': loadbalancer_mgr.EdgeLoadBalancerManagerFromDict(
                self.nsx_v),
            'listener': listener_mgr.EdgeListenerManagerFromDict(self.nsx_v),
            'pool': pool_mgr.EdgePoolManagerFromDict(self.nsx_v),
            'member': member_mgr.EdgeMemberManagerFromDict(self.nsx_v),
            'healthmonitor': healthmon_mgr.EdgeHealthMonitorManagerFromDict(
                self.nsx_v),
            'l7policy': l7policy_mgr.EdgeL7PolicyManagerFromDict(self.nsx_v),
            'l7rule': l7rule_mgr.EdgeL7RuleManagerFromDict(self.nsx_v)}

    def _get_octavia_stats_getter(self):
        return listener_mgr.stats_getter

    def _validate_nsx_version(self):
        ver = self.nsx_v.vcns.get_version()
        if version.LooseVersion(ver) < version.LooseVersion('6.2.3'):
@@ -450,10 +450,11 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
        if not self.init_is_complete:
            self.init_complete(None, None, None)

        if not self._is_sub_plugin:
            self.octavia_stats_collector = (
                octavia_listener.NSXOctaviaStatisticsCollector(
                    self,
                    listener_mgr.stats_getter))
                    self._get_octavia_stats_getter()))

    def init_complete(self, resource, event, trigger, payload=None):
        with locking.LockManager.get_lock('plugin-init-complete'):
@@ -482,18 +483,31 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
            self.init_is_complete = True

    def _init_octavia(self):
        if self._is_sub_plugin:
            # The TVD plugin will take care of this
            return

        if not self.nsxlib.feature_supported(
                nsxlib_consts.FEATURE_LOAD_BALANCER):
            return

        octavia_objects = self._get_octavia_objects()
        self.octavia_listener = octavia_listener.NSXOctaviaListener(
            loadbalancer=loadbalancer_mgr.EdgeLoadBalancerManagerFromDict(),
            listener=listener_mgr.EdgeListenerManagerFromDict(),
            pool=pool_mgr.EdgePoolManagerFromDict(),
            member=member_mgr.EdgeMemberManagerFromDict(),
            healthmonitor=healthmonitor_mgr.EdgeHealthMonitorManagerFromDict(),
            l7policy=l7policy_mgr.EdgeL7PolicyManagerFromDict(),
            l7rule=l7rule_mgr.EdgeL7RuleManagerFromDict())
            **octavia_objects)

    def _get_octavia_objects(self):
        return {
            'loadbalancer': loadbalancer_mgr.EdgeLoadBalancerManagerFromDict(),
            'listener': listener_mgr.EdgeListenerManagerFromDict(),
            'pool': pool_mgr.EdgePoolManagerFromDict(),
            'member': member_mgr.EdgeMemberManagerFromDict(),
            'healthmonitor':
                healthmonitor_mgr.EdgeHealthMonitorManagerFromDict(),
            'l7policy': l7policy_mgr.EdgeL7PolicyManagerFromDict(),
            'l7rule': l7rule_mgr.EdgeL7RuleManagerFromDict()}

    def _get_octavia_stats_getter(self):
        return listener_mgr.stats_getter

    def _extend_fault_map(self):
        """Extends the Neutron Fault Map.
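Both NsxVPluginV2 and NsxV3Plugin now expose the same two hooks, _get_octavia_objects() and _get_octavia_stats_getter(), and those hooks are all the TVD plugin and the stats_getter() helper in the new file below rely on. A rough sketch of that implicit contract, with illustrative names that are not part of the commit:

    # Duck-typed contract a sub-plugin satisfies for TVD Octavia support
    # (illustrative only; the real implementations are NsxVPluginV2 and NsxV3Plugin).
    OCTAVIA_RESOURCES = ('loadbalancer', 'listener', 'pool', 'member',
                         'healthmonitor', 'l7policy', 'l7rule')

    class OctaviaCapableSubPlugin(object):
        def _get_octavia_objects(self):
            # one manager object (or None) per Octavia resource type
            return {res: None for res in OCTAVIA_RESOURCES}

        def _get_octavia_stats_getter(self):
            # a callable(context, core_plugin) returning collected LB statistics
            return lambda context, core_plugin: []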
vmware_nsx/services/lbaas/octavia/tvd_wrapper.py (new file, 65 lines)
@@ -0,0 +1,65 @@
# Copyright 2018 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from oslo_log import log as logging

from neutron_lib import exceptions as n_exc

from vmware_nsx.extensions import projectpluginmap
from vmware_nsx.plugins.nsx import utils as tvd_utils

LOG = logging.getLogger(__name__)


class OctaviaTVDWrapper(object):

    def __init__(self, v_manager, t_manager):
        self.managers = {}
        if v_manager:
            self.managers[projectpluginmap.NsxPlugins.NSX_V] = v_manager
        if t_manager:
            self.managers[projectpluginmap.NsxPlugins.NSX_T] = t_manager

    def _get_manager_by_project(self, context, project_id):
        plugin_type = tvd_utils.get_tvd_plugin_type_for_project(
            project_id, context=context)
        if not self.managers.get(plugin_type):
            LOG.error("Project %(project)s with plugin %(plugin)s has no "
                      "support for Octavia", {'project': project_id,
                                              'plugin': plugin_type})
            raise n_exc.ServiceUnavailable()
        return self.managers[plugin_type]

    def create(self, context, obj, completor, **args):
        manager = self._get_manager_by_project(context, obj['project_id'])
        return manager.create(context, obj, completor, **args)

    def update(self, context, old_obj, new_obj, completor, **args):
        manager = self._get_manager_by_project(context, old_obj['project_id'])
        return manager.update(context, old_obj, new_obj, completor, **args)

    def delete(self, context, obj, completor, **args):
        manager = self._get_manager_by_project(context, obj['project_id'])
        return manager.delete(context, obj, completor, **args)


def stats_getter(context, core_plugin):
    """Call stats of both plugins"""
    for plugin_type in [projectpluginmap.NsxPlugins.NSX_V,
                        projectpluginmap.NsxPlugins.NSX_T]:
        plugin = core_plugin.get_plugin_by_type(plugin_type)
        if plugin:
            stats_getter_func = plugin._get_octavia_stats_getter()
            return stats_getter_func(context, plugin)
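A short usage sketch of the routing the wrapper performs; the fake manager and the assertion are illustrative and assume a deployment where only the NSX-T sub-plugin is present:

    # Sketch only: FakeTManager stands in for a real NSX-T resource manager.
    from vmware_nsx.extensions import projectpluginmap
    from vmware_nsx.services.lbaas.octavia import tvd_wrapper

    class FakeTManager(object):
        def create(self, context, obj, completor, **args):
            completor(success=True)  # assumed completor convention

    wrapper = tvd_wrapper.OctaviaTVDWrapper(
        v_manager=None, t_manager=FakeTManager())
    # only the NSX-T manager gets registered
    assert list(wrapper.managers) == [projectpluginmap.NsxPlugins.NSX_T]
    # wrapper.create(context, obj, completor) then resolves obj['project_id'] via
    # tvd_utils.get_tvd_plugin_type_for_project() and raises
    # n_exc.ServiceUnavailable() for a project whose plugin has no Octavia support.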