NSX|V+V3: Octavia driver

Implementing the Octavia support for NSX-V & NSX-T.
Follow up patches will handle the TVD plugin, Status updates,
and migration.

Since Octavia is not (yet?) in the requirements, using a hack to allow unittests
to be skipped.

Co-Authored-by: Adit Sarfaty <asarfaty@vmware.com>
Change-Id: Iadb24e7eadcab658faf3e646cc528c2a8a6976e5
This commit is contained in:
Kobi Samoray 2018-05-24 10:15:26 +03:00 committed by Adit Sarfaty
parent 759f015542
commit 83d9b3abdd
36 changed files with 2090 additions and 104 deletions

View File

@ -131,3 +131,4 @@
- openstack/neutron-dynamic-routing - openstack/neutron-dynamic-routing
- openstack/neutron-vpnaas - openstack/neutron-vpnaas
- openstack/tap-as-a-service - openstack/tap-as-a-service
- openstack/octavia

View File

@ -30,7 +30,7 @@ function _nsxv_ini_set {
} }
function install_neutron_projects { function install_neutron_projects {
pkg_list="networking-l2gw networking-sfc neutron-lbaas neutron-fwaas neutron-dynamic-routing neutron-vpnaas vmware-nsxlib" pkg_list="networking-l2gw networking-sfc neutron-lbaas neutron-fwaas neutron-dynamic-routing neutron-vpnaas octavia vmware-nsxlib"
for pkg in `echo $pkg_list` for pkg in `echo $pkg_list`
do do
if is_plugin_enabled $pkg; then if is_plugin_enabled $pkg; then

View File

@ -238,6 +238,30 @@ Add neutron-vpnaas repo as an external repository and configure following flags
[DEFAULT] [DEFAULT]
api_extensions_path = $DEST/neutron-vpnaas/neutron_vpnaas/extensions api_extensions_path = $DEST/neutron-vpnaas/neutron_vpnaas/extensions
Octavia
~~~~~~~
Add octavia repo as an external repository and configure following flags in ``local.conf``::
[[local|localrc]]
OCTAVIA_NODE=api
DISABLE_AMP_IMAGE_BUILD=True
enable_plugin octavia $GIT_BASE/openstack/octavia.git
enable_plugin octavia-dashboard $GIT_BASE/openstack/octavia-dashboard
enable_service octavia
enable_service o-api
[[post-config|$OCTAVIA_CONF]]
[DEFAULT]
verbose = True
debug = True
[api_settings]
default_provider_driver=vmwareedge
enabled_provider_drivers=vmwareedge:NSX
[oslo_messaging]
topic=vmwarensxv_edge_lb
NSX-TVD NSX-TVD
------- -------

View File

@ -55,6 +55,7 @@ munch==2.1.0
netaddr==0.7.18 netaddr==0.7.18
netifaces==0.10.4 netifaces==0.10.4
neutron-lib==1.18.0 neutron-lib==1.18.0
octavia==3.0.0
openstackdocstheme==1.18.1 openstackdocstheme==1.18.1
openstacksdk==0.11.2 openstacksdk==0.11.2
os-client-config==1.28.0 os-client-config==1.28.0

View File

@ -0,0 +1,6 @@
---
prelude: >
Support Octavia loadbalancer support in NSXv and NSXv3 plugins.
features:
- |
NSXv and NSXv3 plugins now support Octavia loadbalancer.

View File

@ -38,6 +38,7 @@ neutron-fwaas>=12.0.0 # Apache-2.0
neutron-vpnaas>=12.0.0 # Apache-2.0 neutron-vpnaas>=12.0.0 # Apache-2.0
neutron-dynamic-routing>=12.0.0 # Apache-2.0 neutron-dynamic-routing>=12.0.0 # Apache-2.0
vmware-nsxlib>=12.0.0 # Apache-2.0 vmware-nsxlib>=12.0.0 # Apache-2.0
#octavia>=3.0.0 # Apache-2.0
# The comment below indicates this project repo is current with neutron-lib # The comment below indicates this project repo is current with neutron-lib
# and should receive neutron-lib consumption patches as they are released # and should receive neutron-lib consumption patches as they are released

View File

@ -94,7 +94,8 @@ vmware_nsx.neutron.nsxv3.housekeeper.jobs =
orphaned_logical_router = vmware_nsx.plugins.nsx_v3.housekeeper.orphaned_logical_router:OrphanedLogicalRouterJob orphaned_logical_router = vmware_nsx.plugins.nsx_v3.housekeeper.orphaned_logical_router:OrphanedLogicalRouterJob
orphaned_firewall_section = vmware_nsx.plugins.nsx_v3.housekeeper.orphaned_firewall_section:OrphanedFirewallSectionJob orphaned_firewall_section = vmware_nsx.plugins.nsx_v3.housekeeper.orphaned_firewall_section:OrphanedFirewallSectionJob
mismatch_logical_port = vmware_nsx.plugins.nsx_v3.housekeeper.mismatch_logical_port:MismatchLogicalportJob mismatch_logical_port = vmware_nsx.plugins.nsx_v3.housekeeper.mismatch_logical_port:MismatchLogicalportJob
octavia.api.drivers =
vmwareedge = vmware_nsx.services.lbaas.octavia.octavia_driver:NSXOctaviaDriver
[build_sphinx] [build_sphinx]
source-dir = doc/source source-dir = doc/source
build-dir = doc/build build-dir = doc/build

View File

@ -34,6 +34,7 @@ commands =
pip install -q -e "git+https://git.openstack.org/openstack/neutron-fwaas#egg=neutron_fwaas" pip install -q -e "git+https://git.openstack.org/openstack/neutron-fwaas#egg=neutron_fwaas"
pip install -q -e "git+https://git.openstack.org/openstack/neutron-dynamic-routing#egg=neutron_dynamic_routing" pip install -q -e "git+https://git.openstack.org/openstack/neutron-dynamic-routing#egg=neutron_dynamic_routing"
pip install -q -e "git+https://git.openstack.org/openstack/neutron-vpnaas#egg=neutron_vpnaas" pip install -q -e "git+https://git.openstack.org/openstack/neutron-vpnaas#egg=neutron_vpnaas"
pip install -q -e "git+https://git.openstack.org/openstack/octavia#egg=octavia"
pip install -q -e "git+https://git.openstack.org/openstack/vmware-nsxlib#egg=vmware_nsxlib" pip install -q -e "git+https://git.openstack.org/openstack/vmware-nsxlib#egg=vmware_nsxlib"
pip install -q -e "git+https://git.openstack.org/openstack/neutron#egg=neutron" pip install -q -e "git+https://git.openstack.org/openstack/neutron#egg=neutron"

View File

@ -262,6 +262,10 @@ nsx_common_opts = [
default=[], default=[],
help=_("(Optional) List of email addresses for " help=_("(Optional) List of email addresses for "
"notifications.")), "notifications.")),
cfg.IntOpt('octavia_stats_interval',
default=10,
help=_("Interval in seconds for Octavia statistics reporting. "
"0 means no reporting")),
] ]
nsx_v3_and_p = [ nsx_v3_and_p = [

View File

@ -559,6 +559,10 @@ def get_nsx_lbaas_loadbalancer_binding(session, loadbalancer_id):
return return
def get_nsx_lbaas_loadbalancer_bindings(session):
return session.query(nsx_models.NsxLbaasLoadbalancer).all()
def get_nsx_lbaas_loadbalancer_binding_by_service(session, lb_service_id): def get_nsx_lbaas_loadbalancer_binding_by_service(session, lb_service_id):
return session.query( return session.query(
nsx_models.NsxLbaasLoadbalancer).filter_by( nsx_models.NsxLbaasLoadbalancer).filter_by(
@ -591,7 +595,8 @@ def get_nsx_lbaas_listener_binding(session, loadbalancer_id, listener_id):
return return
def get_nsx_lbaas_listener_binding_by_vs(session, loadbalancer_id, lb_vs_id): def get_nsx_lbaas_listener_binding_by_lb_and_vs(session, loadbalancer_id,
lb_vs_id):
try: try:
return session.query( return session.query(
nsx_models.NsxLbaasListener).filter_by( nsx_models.NsxLbaasListener).filter_by(
@ -601,6 +606,15 @@ def get_nsx_lbaas_listener_binding_by_vs(session, loadbalancer_id, lb_vs_id):
return return
def get_nsx_lbaas_listener_binding_by_vs_id(session, lb_vs_id):
try:
return session.query(
nsx_models.NsxLbaasListener).filter_by(
lb_vs_id=lb_vs_id).one()
except exc.NoResultFound:
return
def delete_nsx_lbaas_listener_binding(session, loadbalancer_id, listener_id): def delete_nsx_lbaas_listener_binding(session, loadbalancer_id, listener_id):
return (session.query(nsx_models.NsxLbaasListener). return (session.query(nsx_models.NsxLbaasListener).
filter_by(loadbalancer_id=loadbalancer_id, filter_by(loadbalancer_id=loadbalancer_id,

View File

@ -1 +1 @@
0dbeda408e41 fc6308289aca

View File

@ -0,0 +1,51 @@
# Copyright 2018 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""lbaas_no_foreign_key
Revision ID: fc6308289aca
Revises: 0dbeda408e41
Create Date: 2018-06-04 13:47:09.450116
"""
from alembic import op
from sqlalchemy.engine import reflection
from neutron.db import migration
# revision identifiers, used by Alembic.
revision = 'fc6308289aca'
down_revision = '0dbeda408e41'
depends_on = ('717f7f63a219')
def upgrade():
for table_name in ['nsxv3_lbaas_loadbalancers',
'nsxv3_lbaas_listeners',
'nsxv3_lbaas_pools',
'nsxv3_lbaas_monitors',
'nsxv3_lbaas_l7rules',
'nsxv3_lbaas_l7policies',
'nsxv_lbaas_loadbalancer_bindings',
'nsxv_lbaas_listener_bindings',
'nsxv_lbaas_pool_bindings',
'nsxv_lbaas_monitor_bindings',
'nsxv_lbaas_l7policy_bindings']:
if migration.schema_has_table(table_name):
inspector = reflection.Inspector.from_engine(op.get_bind())
fk_constraint = inspector.get_foreign_keys(table_name)[0]
op.drop_constraint(fk_constraint.get('name'), table_name,
type_='foreignkey')

View File

@ -399,12 +399,7 @@ class NsxLbaasLoadbalancer(model_base.BASEV2, models.TimestampMixin):
and NSX logical router id. and NSX logical router id.
""" """
__tablename__ = 'nsxv3_lbaas_loadbalancers' __tablename__ = 'nsxv3_lbaas_loadbalancers'
fk_name = 'fk_nsxv3_lbaas_loadbalancers_id' loadbalancer_id = sa.Column(sa.String(36), primary_key=True)
loadbalancer_id = sa.Column(sa.String(36),
sa.ForeignKey('lbaas_loadbalancers.id',
name=fk_name,
ondelete="CASCADE"),
primary_key=True)
lb_router_id = sa.Column(sa.String(36), nullable=False) lb_router_id = sa.Column(sa.String(36), nullable=False)
lb_service_id = sa.Column(sa.String(36), nullable=False) lb_service_id = sa.Column(sa.String(36), nullable=False)
vip_address = sa.Column(sa.String(36), nullable=False) vip_address = sa.Column(sa.String(36), nullable=False)
@ -414,11 +409,7 @@ class NsxLbaasListener(model_base.BASEV2, models.TimestampMixin):
"""Stores the mapping between LBaaS listener and NSX LB virtual server""" """Stores the mapping between LBaaS listener and NSX LB virtual server"""
__tablename__ = 'nsxv3_lbaas_listeners' __tablename__ = 'nsxv3_lbaas_listeners'
loadbalancer_id = sa.Column(sa.String(36), primary_key=True) loadbalancer_id = sa.Column(sa.String(36), primary_key=True)
listener_id = sa.Column(sa.String(36), listener_id = sa.Column(sa.String(36), primary_key=True)
sa.ForeignKey('lbaas_listeners.id',
name='fk_nsxv3_lbaas_listeners_id',
ondelete="CASCADE"),
primary_key=True)
app_profile_id = sa.Column(sa.String(36), nullable=False) app_profile_id = sa.Column(sa.String(36), nullable=False)
lb_vs_id = sa.Column(sa.String(36), nullable=False) lb_vs_id = sa.Column(sa.String(36), nullable=False)
@ -427,11 +418,7 @@ class NsxLbaasPool(model_base.BASEV2, models.TimestampMixin):
"""Stores the mapping between LBaaS pool and NSX LB Pool""" """Stores the mapping between LBaaS pool and NSX LB Pool"""
__tablename__ = 'nsxv3_lbaas_pools' __tablename__ = 'nsxv3_lbaas_pools'
loadbalancer_id = sa.Column(sa.String(36), primary_key=True) loadbalancer_id = sa.Column(sa.String(36), primary_key=True)
pool_id = sa.Column(sa.String(36), pool_id = sa.Column(sa.String(36), primary_key=True)
sa.ForeignKey('lbaas_pools.id',
name='fk_nsxv3_lbaas_pools_id',
ondelete="CASCADE"),
primary_key=True)
lb_pool_id = sa.Column(sa.String(36), nullable=False) lb_pool_id = sa.Column(sa.String(36), nullable=False)
lb_vs_id = sa.Column(sa.String(36)) lb_vs_id = sa.Column(sa.String(36))
@ -441,11 +428,7 @@ class NsxLbaasMonitor(model_base.BASEV2, models.TimestampMixin):
__tablename__ = 'nsxv3_lbaas_monitors' __tablename__ = 'nsxv3_lbaas_monitors'
loadbalancer_id = sa.Column(sa.String(36), primary_key=True) loadbalancer_id = sa.Column(sa.String(36), primary_key=True)
pool_id = sa.Column(sa.String(36), primary_key=True) pool_id = sa.Column(sa.String(36), primary_key=True)
hm_id = sa.Column(sa.String(36), hm_id = sa.Column(sa.String(36), primary_key=True)
sa.ForeignKey('lbaas_healthmonitors.id',
name='fk_nsxv3_lbaas_healthmonitors_id',
ondelete="CASCADE"),
primary_key=True)
lb_monitor_id = sa.Column(sa.String(36), nullable=False) lb_monitor_id = sa.Column(sa.String(36), nullable=False)
lb_pool_id = sa.Column(sa.String(36), nullable=False) lb_pool_id = sa.Column(sa.String(36), nullable=False)
@ -462,11 +445,7 @@ class NsxLbaasL7Rule(model_base.BASEV2, models.TimestampMixin):
__tablename__ = 'nsxv3_lbaas_l7rules' __tablename__ = 'nsxv3_lbaas_l7rules'
loadbalancer_id = sa.Column(sa.String(36), primary_key=True) loadbalancer_id = sa.Column(sa.String(36), primary_key=True)
l7policy_id = sa.Column(sa.String(36), primary_key=True) l7policy_id = sa.Column(sa.String(36), primary_key=True)
l7rule_id = sa.Column(sa.String(36), l7rule_id = sa.Column(sa.String(36), primary_key=True)
sa.ForeignKey('lbaas_l7rules.id',
name='fk_nsxv3_lbaas_l7rules_id',
ondelete="CASCADE"),
primary_key=True)
lb_rule_id = sa.Column(sa.String(36), nullable=False) lb_rule_id = sa.Column(sa.String(36), nullable=False)
lb_vs_id = sa.Column(sa.String(36), nullable=False) lb_vs_id = sa.Column(sa.String(36), nullable=False)
@ -474,11 +453,7 @@ class NsxLbaasL7Rule(model_base.BASEV2, models.TimestampMixin):
class NsxLbaasL7Policy(model_base.BASEV2, models.TimestampMixin): class NsxLbaasL7Policy(model_base.BASEV2, models.TimestampMixin):
"""Stores the mapping between LBaaS l7policy and NSX LB rule""" """Stores the mapping between LBaaS l7policy and NSX LB rule"""
__tablename__ = 'nsxv3_lbaas_l7policies' __tablename__ = 'nsxv3_lbaas_l7policies'
l7policy_id = sa.Column(sa.String(36), l7policy_id = sa.Column(sa.String(36), primary_key=True)
sa.ForeignKey('lbaas_l7policies.id',
name='fk_nsxv3_lbaas_l7policies_id',
ondelete="CASCADE"),
primary_key=True)
lb_rule_id = sa.Column(sa.String(36), nullable=False) lb_rule_id = sa.Column(sa.String(36), nullable=False)
lb_vs_id = sa.Column(sa.String(36), nullable=False) lb_vs_id = sa.Column(sa.String(36), nullable=False)

View File

@ -684,6 +684,15 @@ def add_nsxv_lbaas_loadbalancer_binding(
return binding return binding
def get_nsxv_lbaas_loadbalancer_bindings(session, filters=None,
like_filters=None):
session = db_api.get_reader_session()
query = session.query(nsxv_models.NsxvLbaasLoadbalancerBinding)
return nsx_db._apply_filters_to_query(
query, nsxv_models.NsxvLbaasLoadbalancerBinding, filters,
like_filters).all()
def get_nsxv_lbaas_loadbalancer_binding(session, loadbalancer_id): def get_nsxv_lbaas_loadbalancer_binding(session, loadbalancer_id):
try: try:
return session.query( return session.query(
@ -731,6 +740,15 @@ def del_nsxv_lbaas_listener_binding(session, loadbalancer_id, listener_id):
listener_id=listener_id).delete()) listener_id=listener_id).delete())
def get_nsxv_lbaas_listener_binding_by_vse(session, loadbalancer_id, vse_id):
try:
return session.query(
nsxv_models.NsxvLbaasListenerBinding).filter_by(
loadbalancer_id=loadbalancer_id, vse_id=vse_id).one()
except exc.NoResultFound:
return
def add_nsxv_lbaas_pool_binding(session, loadbalancer_id, pool_id, def add_nsxv_lbaas_pool_binding(session, loadbalancer_id, pool_id,
edge_pool_id): edge_pool_id):
with session.begin(subtransactions=True): with session.begin(subtransactions=True):

View File

@ -252,11 +252,7 @@ class NsxvLbaasLoadbalancerBinding(model_base.BASEV2, models.TimestampMixin):
__tablename__ = 'nsxv_lbaas_loadbalancer_bindings' __tablename__ = 'nsxv_lbaas_loadbalancer_bindings'
loadbalancer_id = sa.Column(sa.String(36), loadbalancer_id = sa.Column(sa.String(36), primary_key=True)
sa.ForeignKey('lbaas_loadbalancers.id',
name='fk_lbaas_loadbalancers_id',
ondelete="CASCADE"),
primary_key=True)
edge_id = sa.Column(sa.String(36), nullable=False) edge_id = sa.Column(sa.String(36), nullable=False)
edge_fw_rule_id = sa.Column(sa.String(36), nullable=False) edge_fw_rule_id = sa.Column(sa.String(36), nullable=False)
vip_address = sa.Column(sa.String(36), nullable=False) vip_address = sa.Column(sa.String(36), nullable=False)
@ -268,11 +264,7 @@ class NsxvLbaasListenerBinding(model_base.BASEV2, models.TimestampMixin):
__tablename__ = 'nsxv_lbaas_listener_bindings' __tablename__ = 'nsxv_lbaas_listener_bindings'
loadbalancer_id = sa.Column(sa.String(36), primary_key=True) loadbalancer_id = sa.Column(sa.String(36), primary_key=True)
listener_id = sa.Column(sa.String(36), listener_id = sa.Column(sa.String(36), primary_key=True)
sa.ForeignKey('lbaas_listeners.id',
name='fk_lbaas_listeners_id',
ondelete="CASCADE"),
primary_key=True)
app_profile_id = sa.Column(sa.String(36), nullable=False) app_profile_id = sa.Column(sa.String(36), nullable=False)
vse_id = sa.Column(sa.String(36), nullable=False) vse_id = sa.Column(sa.String(36), nullable=False)
@ -283,11 +275,7 @@ class NsxvLbaasPoolBinding(model_base.BASEV2, models.TimestampMixin):
__tablename__ = 'nsxv_lbaas_pool_bindings' __tablename__ = 'nsxv_lbaas_pool_bindings'
loadbalancer_id = sa.Column(sa.String(36), primary_key=True) loadbalancer_id = sa.Column(sa.String(36), primary_key=True)
pool_id = sa.Column(sa.String(36), pool_id = sa.Column(sa.String(36), primary_key=True)
sa.ForeignKey('lbaas_pools.id',
name='fk_lbaas_pools_id',
ondelete="CASCADE"),
primary_key=True)
edge_pool_id = sa.Column(sa.String(36), nullable=False) edge_pool_id = sa.Column(sa.String(36), nullable=False)
@ -298,11 +286,7 @@ class NsxvLbaasMonitorBinding(model_base.BASEV2, models.TimestampMixin):
loadbalancer_id = sa.Column(sa.String(36), primary_key=True) loadbalancer_id = sa.Column(sa.String(36), primary_key=True)
pool_id = sa.Column(sa.String(36), primary_key=True) pool_id = sa.Column(sa.String(36), primary_key=True)
hm_id = sa.Column(sa.String(36), hm_id = sa.Column(sa.String(36), primary_key=True)
sa.ForeignKey('lbaas_healthmonitors.id',
name='fk_lbaas_healthmonitors_id',
ondelete="CASCADE"),
primary_key=True)
edge_id = sa.Column(sa.String(36), primary_key=True) edge_id = sa.Column(sa.String(36), primary_key=True)
edge_mon_id = sa.Column(sa.String(36), nullable=False) edge_mon_id = sa.Column(sa.String(36), nullable=False)
@ -322,11 +306,7 @@ class NsxvLbaasL7PolicyBinding(model_base.BASEV2, models.TimestampMixin):
__tablename__ = 'nsxv_lbaas_l7policy_bindings' __tablename__ = 'nsxv_lbaas_l7policy_bindings'
policy_id = sa.Column(sa.String(36), policy_id = sa.Column(sa.String(36), primary_key=True)
sa.ForeignKey('lbaas_l7policies.id',
name='fk_lbaas_l7policies_id',
ondelete="CASCADE"),
primary_key=True)
edge_id = sa.Column(sa.String(36), nullable=False) edge_id = sa.Column(sa.String(36), nullable=False)
edge_app_rule_id = sa.Column(sa.String(36), nullable=False) edge_app_rule_id = sa.Column(sa.String(36), nullable=False)

View File

@ -16,7 +16,6 @@ from oslo_log import log as logging
from neutron_lib import constants as n_consts from neutron_lib import constants as n_consts
from neutron_lib.db import api as db_api from neutron_lib.db import api as db_api
from neutron_lib.plugins import constants as plugin_const
from vmware_nsx._i18n import _ from vmware_nsx._i18n import _
from vmware_nsx.common import exceptions as nsxv_exc from vmware_nsx.common import exceptions as nsxv_exc
@ -26,6 +25,7 @@ from vmware_nsx.plugins.nsx_v.drivers import (
abstract_router_driver as router_driver) abstract_router_driver as router_driver)
from vmware_nsx.plugins.nsx_v import plugin as nsx_v from vmware_nsx.plugins.nsx_v import plugin as nsx_v
from vmware_nsx.plugins.nsx_v.vshield import edge_utils from vmware_nsx.plugins.nsx_v.vshield import edge_utils
from vmware_nsx.services.lbaas.octavia import constants as oct_const
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -279,9 +279,10 @@ class RouterExclusiveDriver(router_driver.RouterBaseDriver):
def _check_lb_on_subnet(self, context, subnet_id, router_id): def _check_lb_on_subnet(self, context, subnet_id, router_id):
# Check lbaas # Check lbaas
dev_owner_v1 = 'neutron:' + plugin_const.LOADBALANCER dev_owner_v1 = n_consts.DEVICE_OWNER_LOADBALANCER
dev_owner_v2 = 'neutron:' + plugin_const.LOADBALANCERV2 dev_owner_v2 = n_consts.DEVICE_OWNER_LOADBALANCERV2
filters = {'device_owner': [dev_owner_v1, dev_owner_v2], dev_owner_oct = oct_const.DEVICE_OWNER_OCTAVIA
filters = {'device_owner': [dev_owner_v1, dev_owner_v2, dev_owner_oct],
'fixed_ips': {'subnet_id': [subnet_id]}} 'fixed_ips': {'subnet_id': [subnet_id]}}
ports = super(nsx_v.NsxVPluginV2, self.plugin).get_ports( ports = super(nsx_v.NsxVPluginV2, self.plugin).get_ports(
context, filters=filters) context, filters=filters)

View File

@ -143,6 +143,15 @@ from vmware_nsx.plugins.nsx_v.vshield import securitygroup_utils
from vmware_nsx.plugins.nsx_v.vshield import vcns_driver from vmware_nsx.plugins.nsx_v.vshield import vcns_driver
from vmware_nsx.services.flowclassifier.nsx_v import utils as fc_utils from vmware_nsx.services.flowclassifier.nsx_v import utils as fc_utils
from vmware_nsx.services.fwaas.nsx_v import fwaas_callbacks from vmware_nsx.services.fwaas.nsx_v import fwaas_callbacks
from vmware_nsx.services.lbaas.nsx_v.implementation import healthmon_mgr
from vmware_nsx.services.lbaas.nsx_v.implementation import l7policy_mgr
from vmware_nsx.services.lbaas.nsx_v.implementation import l7rule_mgr
from vmware_nsx.services.lbaas.nsx_v.implementation import listener_mgr
from vmware_nsx.services.lbaas.nsx_v.implementation import loadbalancer_mgr
from vmware_nsx.services.lbaas.nsx_v.implementation import member_mgr
from vmware_nsx.services.lbaas.nsx_v.implementation import pool_mgr
from vmware_nsx.services.lbaas.octavia import constants as oct_const
from vmware_nsx.services.lbaas.octavia import octavia_listener
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
PORTGROUP_PREFIX = 'dvportgroup' PORTGROUP_PREFIX = 'dvportgroup'
@ -226,6 +235,8 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
def __init__(self): def __init__(self):
self._is_sub_plugin = tvd_utils.is_tvd_core_plugin() self._is_sub_plugin = tvd_utils.is_tvd_core_plugin()
self.init_is_complete = False self.init_is_complete = False
self.octavia_listener = None
self.octavia_stats_collector = None
self.housekeeper = None self.housekeeper = None
super(NsxVPluginV2, self).__init__() super(NsxVPluginV2, self).__init__()
if self._is_sub_plugin: if self._is_sub_plugin:
@ -321,6 +332,10 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
# Bind QoS notifications # Bind QoS notifications
qos_driver.register(self) qos_driver.register(self)
registry.subscribe(self.spawn_complete,
resources.PROCESS,
events.AFTER_SPAWN)
# subscribe the init complete method last, so it will be called only # subscribe the init complete method last, so it will be called only
# if init was successful # if init was successful
registry.subscribe(self.init_complete, registry.subscribe(self.init_complete,
@ -335,6 +350,16 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
def is_tvd_plugin(): def is_tvd_plugin():
return False return False
def spawn_complete(self, resource, event, trigger, payload=None):
# This method should run only once, but after init_complete
if not self.init_is_complete:
self.init_complete(None, None, None)
self.octavia_stats_collector = (
octavia_listener.NSXOctaviaStatisticsCollector(
self,
listener_mgr.stats_getter))
def init_complete(self, resource, event, trigger, payload=None): def init_complete(self, resource, event, trigger, payload=None):
with locking.LockManager.get_lock('plugin-init-complete'): with locking.LockManager.get_lock('plugin-init-complete'):
if self.init_is_complete: if self.init_is_complete:
@ -362,6 +387,18 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
hk_readonly=cfg.CONF.nsxv.housekeeping_readonly, hk_readonly=cfg.CONF.nsxv.housekeeping_readonly,
hk_readonly_jobs=cfg.CONF.nsxv.housekeeping_readonly_jobs) hk_readonly_jobs=cfg.CONF.nsxv.housekeeping_readonly_jobs)
# Init octavia listener and endpoints
self.octavia_listener = octavia_listener.NSXOctaviaListener(
loadbalancer=loadbalancer_mgr.EdgeLoadBalancerManagerFromDict(
self.nsx_v),
listener=listener_mgr.EdgeListenerManagerFromDict(self.nsx_v),
pool=pool_mgr.EdgePoolManagerFromDict(self.nsx_v),
member=member_mgr.EdgeMemberManagerFromDict(self.nsx_v),
healthmonitor=healthmon_mgr.EdgeHealthMonitorManagerFromDict(
self.nsx_v),
l7policy=l7policy_mgr.EdgeL7PolicyManagerFromDict(self.nsx_v),
l7rule=l7rule_mgr.EdgeL7RuleManagerFromDict(self.nsx_v))
self.init_is_complete = True self.init_is_complete = True
def _validate_nsx_version(self): def _validate_nsx_version(self):
@ -1851,7 +1888,8 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
def _assert_on_lb_port_admin_state(self, port_data, original_port, def _assert_on_lb_port_admin_state(self, port_data, original_port,
device_owner): device_owner):
if device_owner == constants.DEVICE_OWNER_LOADBALANCERV2: if device_owner in [constants.DEVICE_OWNER_LOADBALANCERV2,
oct_const.DEVICE_OWNER_OCTAVIA]:
orig_state = original_port.get("admin_state_up") orig_state = original_port.get("admin_state_up")
new_state = port_data.get("admin_state_up") new_state = port_data.get("admin_state_up")
if new_state is not None and (orig_state != new_state) and ( if new_state is not None and (orig_state != new_state) and (

View File

@ -116,7 +116,16 @@ from vmware_nsx.plugins.nsx_v3 import utils as v3_utils
from vmware_nsx.services.fwaas.common import utils as fwaas_utils from vmware_nsx.services.fwaas.common import utils as fwaas_utils
from vmware_nsx.services.fwaas.nsx_v3 import fwaas_callbacks_v1 from vmware_nsx.services.fwaas.nsx_v3 import fwaas_callbacks_v1
from vmware_nsx.services.fwaas.nsx_v3 import fwaas_callbacks_v2 from vmware_nsx.services.fwaas.nsx_v3 import fwaas_callbacks_v2
from vmware_nsx.services.lbaas.nsx_v3.implementation import healthmonitor_mgr
from vmware_nsx.services.lbaas.nsx_v3.implementation import l7policy_mgr
from vmware_nsx.services.lbaas.nsx_v3.implementation import l7rule_mgr
from vmware_nsx.services.lbaas.nsx_v3.implementation import listener_mgr
from vmware_nsx.services.lbaas.nsx_v3.implementation import loadbalancer_mgr
from vmware_nsx.services.lbaas.nsx_v3.implementation import member_mgr
from vmware_nsx.services.lbaas.nsx_v3.implementation import pool_mgr
from vmware_nsx.services.lbaas.nsx_v3.v2 import lb_driver_v2 from vmware_nsx.services.lbaas.nsx_v3.v2 import lb_driver_v2
from vmware_nsx.services.lbaas.octavia import constants as oct_const
from vmware_nsx.services.lbaas.octavia import octavia_listener
from vmware_nsx.services.qos.common import utils as qos_com_utils from vmware_nsx.services.qos.common import utils as qos_com_utils
from vmware_nsx.services.qos.nsx_v3 import driver as qos_driver from vmware_nsx.services.qos.nsx_v3 import driver as qos_driver
from vmware_nsx.services.trunk.nsx_v3 import driver as trunk_driver from vmware_nsx.services.trunk.nsx_v3 import driver as trunk_driver
@ -211,6 +220,8 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
self.fwaas_callbacks = None self.fwaas_callbacks = None
self._is_sub_plugin = tvd_utils.is_tvd_core_plugin() self._is_sub_plugin = tvd_utils.is_tvd_core_plugin()
self.init_is_complete = False self.init_is_complete = False
self.octavia_listener = None
self.octavia_stats_collector = None
nsxlib_utils.set_is_attr_callback(validators.is_attr_set) nsxlib_utils.set_is_attr_callback(validators.is_attr_set)
self._extend_fault_map() self._extend_fault_map()
if self._is_sub_plugin: if self._is_sub_plugin:
@ -293,6 +304,10 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
# Register NSXv3 trunk driver to support trunk extensions # Register NSXv3 trunk driver to support trunk extensions
self.trunk_driver = trunk_driver.NsxV3TrunkDriver.create(self) self.trunk_driver = trunk_driver.NsxV3TrunkDriver.create(self)
registry.subscribe(self.spawn_complete,
resources.PROCESS,
events.AFTER_SPAWN)
# subscribe the init complete method last, so it will be called only # subscribe the init complete method last, so it will be called only
# if init was successful # if init was successful
registry.subscribe(self.init_complete, registry.subscribe(self.init_complete,
@ -430,6 +445,16 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
def is_tvd_plugin(): def is_tvd_plugin():
return False return False
def spawn_complete(self, resource, event, trigger, payload=None):
# This method should run only once, but after init_complete
if not self.init_is_complete:
self.init_complete(None, None, None)
self.octavia_stats_collector = (
octavia_listener.NSXOctaviaStatisticsCollector(
self,
listener_mgr.stats_getter))
def init_complete(self, resource, event, trigger, payload=None): def init_complete(self, resource, event, trigger, payload=None):
with locking.LockManager.get_lock('plugin-init-complete'): with locking.LockManager.get_lock('plugin-init-complete'):
if self.init_is_complete: if self.init_is_complete:
@ -451,8 +476,25 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
hk_readonly=cfg.CONF.nsx_v3.housekeeping_readonly, hk_readonly=cfg.CONF.nsx_v3.housekeeping_readonly,
hk_readonly_jobs=cfg.CONF.nsx_v3.housekeeping_readonly_jobs) hk_readonly_jobs=cfg.CONF.nsx_v3.housekeeping_readonly_jobs)
# Init octavia listener and endpoints
self._init_octavia()
self.init_is_complete = True self.init_is_complete = True
def _init_octavia(self):
if not self.nsxlib.feature_supported(
nsxlib_consts.FEATURE_LOAD_BALANCER):
return
self.octavia_listener = octavia_listener.NSXOctaviaListener(
loadbalancer=loadbalancer_mgr.EdgeLoadBalancerManagerFromDict(),
listener=listener_mgr.EdgeListenerManagerFromDict(),
pool=pool_mgr.EdgePoolManagerFromDict(),
member=member_mgr.EdgeMemberManagerFromDict(),
healthmonitor=healthmonitor_mgr.EdgeHealthMonitorManagerFromDict(),
l7policy=l7policy_mgr.EdgeL7PolicyManagerFromDict(),
l7rule=l7rule_mgr.EdgeL7RuleManagerFromDict())
def _extend_fault_map(self): def _extend_fault_map(self):
"""Extends the Neutron Fault Map. """Extends the Neutron Fault Map.
@ -4479,7 +4521,8 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
port_data = self.get_port(context, port_id) port_data = self.get_port(context, port_id)
device_owner = port_data.get('device_owner') device_owner = port_data.get('device_owner')
fip_address = new_fip['floating_ip_address'] fip_address = new_fip['floating_ip_address']
if device_owner == const.DEVICE_OWNER_LOADBALANCERV2: if (device_owner == const.DEVICE_OWNER_LOADBALANCERV2 or
device_owner == oct_const.DEVICE_OWNER_OCTAVIA):
try: try:
self._update_lb_vip(port_data, fip_address) self._update_lb_vip(port_data, fip_address)
except nsx_lib_exc.ManagerError: except nsx_lib_exc.ManagerError:
@ -4508,7 +4551,8 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
port_data = self.get_port(context, port_id) port_data = self.get_port(context, port_id)
device_owner = port_data.get('device_owner') device_owner = port_data.get('device_owner')
fixed_ip_address = fip['fixed_ip_address'] fixed_ip_address = fip['fixed_ip_address']
if device_owner == const.DEVICE_OWNER_LOADBALANCERV2: if (device_owner == const.DEVICE_OWNER_LOADBALANCERV2 or
device_owner == oct_const.DEVICE_OWNER_OCTAVIA):
# If the port is LB VIP port, after deleting the FIP, # If the port is LB VIP port, after deleting the FIP,
# update the virtual server VIP back to fixed IP. # update the virtual server VIP back to fixed IP.
is_lb_port = True is_lb_port = True
@ -4551,7 +4595,8 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
old_port_data = self.get_port(context, old_port_id) old_port_data = self.get_port(context, old_port_id)
old_device_owner = old_port_data['device_owner'] old_device_owner = old_port_data['device_owner']
old_fixed_ip = old_fip['fixed_ip_address'] old_fixed_ip = old_fip['fixed_ip_address']
if old_device_owner == const.DEVICE_OWNER_LOADBALANCERV2: if (old_device_owner == const.DEVICE_OWNER_LOADBALANCERV2 or
old_device_owner == oct_const.DEVICE_OWNER_OCTAVIA):
is_lb_port = True is_lb_port = True
self._update_lb_vip(old_port_data, old_fixed_ip) self._update_lb_vip(old_port_data, old_fixed_ip)
@ -4578,7 +4623,8 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
new_port_data = self.get_port(context, new_port_id) new_port_data = self.get_port(context, new_port_id)
new_dev_own = new_port_data['device_owner'] new_dev_own = new_port_data['device_owner']
new_fip_address = new_fip['floating_ip_address'] new_fip_address = new_fip['floating_ip_address']
if new_dev_own == const.DEVICE_OWNER_LOADBALANCERV2: if (new_dev_own == const.DEVICE_OWNER_LOADBALANCERV2 or
new_dev_own == oct_const.DEVICE_OWNER_OCTAVIA):
is_lb_port = True is_lb_port = True
self._update_lb_vip(new_port_data, new_fip_address) self._update_lb_vip(new_port_data, new_fip_address)

View File

@ -96,6 +96,10 @@ LB_STATS_MAP = {'active_connections': 'current_sessions',
'bytes_in': 'bytes_in', 'bytes_in': 'bytes_in',
'bytes_out': 'bytes_out', 'bytes_out': 'bytes_out',
'total_connections': 'total_sessions'} 'total_connections': 'total_sessions'}
LB_EMPTY_STATS = {'active_connections': 0,
'bytes_in': 0,
'bytes_out': 0,
'total_connections': 0}
LR_ROUTER_TYPE = 'os-neutron-router-id' LR_ROUTER_TYPE = 'os-neutron-router-id'
LR_PORT_TYPE = 'os-neutron-rport-id' LR_PORT_TYPE = 'os-neutron-rport-id'
LB_CERT_RESOURCE_TYPE = ['certificate_signed', 'certificate_self_signed'] LB_CERT_RESOURCE_TYPE = ['certificate_signed', 'certificate_self_signed']

View File

@ -32,12 +32,14 @@ def lb_listener_obj_to_dict(listener):
# Translate the LBaaS listener to a dictionary skipping the some objects # Translate the LBaaS listener to a dictionary skipping the some objects
# to avoid recursions # to avoid recursions
listener_dict = listener.to_dict(loadbalancer=False, default_pool=False) listener_dict = listener.to_dict(loadbalancer=False, default_pool=False)
# Translate the default pool separately without it's internal objects # Translate the default pool separately without it's internal objects
if listener.default_pool: if listener.default_pool:
listener_dict['default_pool'] = lb_pool_obj_to_dict( listener_dict['default_pool'] = lb_pool_obj_to_dict(
listener.default_pool, with_listeners=False) listener.default_pool, with_listeners=False)
else: else:
listener_dict['default_pool'] = None listener_dict['default_pool'] = None
if listener.loadbalancer: if listener.loadbalancer:
listener_dict['loadbalancer'] = lb_loadbalancer_obj_to_dict( listener_dict['loadbalancer'] = lb_loadbalancer_obj_to_dict(
listener.loadbalancer) listener.loadbalancer)

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import copy
from oslo_log import helpers as log_helpers from oslo_log import helpers as log_helpers
from oslo_log import log as logging from oslo_log import log as logging
from oslo_utils import excutils from oslo_utils import excutils
@ -297,3 +299,45 @@ class EdgeListenerManagerFromDict(base_mgr.EdgeLoadbalancerBaseManager):
listener['id']) listener['id'])
completor(success=True) completor(success=True)
def stats_getter(context, core_plugin, ignore_list=None):
"""Update Octavia statistics for each listener (virtual server)"""
stat_list = []
vcns = core_plugin.nsx_v.vcns
# go over all LB edges
bindings = nsxv_db.get_nsxv_lbaas_loadbalancer_bindings(context.session)
for binding in bindings:
lb_id = binding['loadbalancer_id']
if ignore_list and lb_id in ignore_list:
continue
edge_id = binding['edge_id']
try:
lb_stats = vcns.get_loadbalancer_statistics(edge_id)
virtual_servers_stats = lb_stats[1].get('virtualServer', [])
for vs_stats in virtual_servers_stats:
# Get the stats of the virtual server
stats = copy.copy(lb_const.LB_EMPTY_STATS)
stats['bytes_in'] += vs_stats.get('bytesIn', 0)
stats['bytes_out'] += vs_stats.get('bytesOut', 0)
stats['active_connections'] += vs_stats.get('curSessions', 0)
stats['total_connections'] += vs_stats.get('totalSessions', 0)
stats['request_errors'] = 0 # currently unsupported
# Find the listener Id
vs_id = vs_stats.get('virtualServerId')
list_bind = nsxv_db.get_nsxv_lbaas_listener_binding_by_vse(
context.session, lb_id, vs_id)
if not list_bind:
continue
stats['id'] = list_bind['listener_id']
stat_list.append(stats)
except vcns_exc.VcnsApiException as e:
LOG.warning('Failed to read load balancer statistics for %s: %s',
edge_id, e)
return stat_list

View File

@ -31,6 +31,7 @@ from vmware_nsx.plugins.nsx_v.vshield.common import (
from vmware_nsx.plugins.nsx_v.vshield.common import exceptions as nsxv_exc from vmware_nsx.plugins.nsx_v.vshield.common import exceptions as nsxv_exc
from vmware_nsx.services.lbaas import base_mgr from vmware_nsx.services.lbaas import base_mgr
from vmware_nsx.services.lbaas.nsx_v import lbaas_common as lb_common from vmware_nsx.services.lbaas.nsx_v import lbaas_common as lb_common
from vmware_nsx.services.lbaas.octavia import constants as oct_const
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -159,33 +160,15 @@ class EdgeLoadBalancerManagerFromDict(base_mgr.EdgeLoadbalancerBaseManager):
completor(success=True) completor(success=True)
def refresh(self, context, lb): def refresh(self, context, lb):
# TODO(kobis): implememnt # TODO(kobis): implement
pass pass
def stats(self, context, lb): def stats(self, context, lb):
stats = {'bytes_in': 0,
'bytes_out': 0,
'active_connections': 0,
'total_connections': 0}
binding = nsxv_db.get_nsxv_lbaas_loadbalancer_binding(context.session, binding = nsxv_db.get_nsxv_lbaas_loadbalancer_binding(context.session,
lb['id']) lb['id'])
try: stats = _get_edge_loadbalancer_statistics(self.vcns,
lb_stats = self.vcns.get_loadbalancer_statistics( binding['edge_id'])
binding['edge_id'])
except nsxv_exc.VcnsApiException:
msg = (_('Failed to read load balancer statistics, edge: %s') %
binding['edge_id'])
raise n_exc.BadRequest(resource='edge-lbaas', msg=msg)
pools_stats = lb_stats[1].get('pool', [])
for pool_stats in pools_stats:
stats['bytes_in'] += pool_stats.get('bytesIn', 0)
stats['bytes_out'] += pool_stats.get('bytesOut', 0)
stats['active_connections'] += pool_stats.get('curSessions', 0)
stats['total_connections'] += pool_stats.get('totalSessions', 0)
return stats return stats
@ -208,12 +191,15 @@ class EdgeLoadBalancerManagerFromDict(base_mgr.EdgeLoadbalancerBaseManager):
subnet = self.core_plugin.get_subnet(context.elevated(), subnet_id) subnet = self.core_plugin.get_subnet(context.elevated(), subnet_id)
filters = {'fixed_ips': {'subnet_id': [subnet_id]}, filters = {'fixed_ips': {'subnet_id': [subnet_id]},
'device_owner': [constants.DEVICE_OWNER_LOADBALANCERV2]} 'device_owner': [constants.DEVICE_OWNER_LOADBALANCERV2,
oct_const.DEVICE_OWNER_OCTAVIA]}
lb_ports = self.core_plugin.get_ports(context.elevated(), lb_ports = self.core_plugin.get_ports(context.elevated(),
filters=filters) filters=filters)
if lb_ports: if lb_ports:
for lb_port in lb_ports: for lb_port in lb_ports:
# TODO(asarfaty): for Octavia this code might need to change
# as the device_id is different
if lb_port['device_id']: if lb_port['device_id']:
edge_bind = nsxv_db.get_nsxv_lbaas_loadbalancer_binding( edge_bind = nsxv_db.get_nsxv_lbaas_loadbalancer_binding(
context.session, lb_port['device_id']) context.session, lb_port['device_id'])
@ -238,3 +224,27 @@ class EdgeLoadBalancerManagerFromDict(base_mgr.EdgeLoadbalancerBaseManager):
if not found: if not found:
return False return False
return True return True
def _get_edge_loadbalancer_statistics(vcns, edge_id):
stats = {'bytes_in': 0,
'bytes_out': 0,
'active_connections': 0,
'total_connections': 0}
try:
lb_stats = vcns.get_loadbalancer_statistics(edge_id)
except nsxv_exc.VcnsApiException:
msg = (_('Failed to read load balancer statistics, edge: %s') %
edge_id)
raise n_exc.BadRequest(resource='edge-lbaas', msg=msg)
pools_stats = lb_stats[1].get('pool', [])
for pool_stats in pools_stats:
stats['bytes_in'] += pool_stats.get('bytesIn', 0)
stats['bytes_out'] += pool_stats.get('bytesOut', 0)
stats['active_connections'] += pool_stats.get('curSessions', 0)
stats['total_connections'] += pool_stats.get('totalSessions', 0)
return stats

View File

@ -46,6 +46,8 @@ class EdgeL7RuleManagerFromDict(base_mgr.Nsxv3LoadbalancerBaseManager):
lb_rule_id = binding['lb_rule_id'] lb_rule_id = binding['lb_rule_id']
if delete: if delete:
lb_utils.remove_rule_from_policy(rule) lb_utils.remove_rule_from_policy(rule)
else:
lb_utils.update_rule_in_policy(rule)
rule_body = lb_utils.convert_l7policy_to_lb_rule( rule_body = lb_utils.convert_l7policy_to_lb_rule(
context, rule['policy']) context, rule['policy'])
try: try:

View File

@ -203,3 +203,8 @@ def convert_l7policy_to_lb_rule(context, policy):
def remove_rule_from_policy(rule): def remove_rule_from_policy(rule):
l7rules = rule['policy']['rules'] l7rules = rule['policy']['rules']
rule['policy']['rules'] = [r for r in l7rules if r['id'] != rule['id']] rule['policy']['rules'] = [r for r in l7rules if r['id'] != rule['id']]
def update_rule_in_policy(rule):
remove_rule_from_policy(rule)
rule['policy']['rules'].append(rule)

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import copy
from neutron_lib import exceptions as n_exc from neutron_lib import exceptions as n_exc
from oslo_log import helpers as log_helpers from oslo_log import helpers as log_helpers
from oslo_log import log as logging from oslo_log import log as logging
@ -281,3 +283,42 @@ class EdgeListenerManagerFromDict(base_mgr.Nsxv3LoadbalancerBaseManager):
context.session, lb_id, listener['id']) context.session, lb_id, listener['id'])
completor(success=True) completor(success=True)
def stats_getter(context, core_plugin, ignore_list=None):
"""Update Octavia statistics for each listener (virtual server)"""
stat_list = []
lb_service_client = core_plugin.nsxlib.load_balancer.service
# Go over all the loadbalancers & services
lb_bindings = nsx_db.get_nsx_lbaas_loadbalancer_bindings(
context.session)
for lb_binding in lb_bindings:
if ignore_list and lb_binding['loadbalancer_id'] in ignore_list:
continue
lb_service_id = lb_binding.get('lb_service_id')
LOG.debug("Getting listeners statistics for NSX lb service %s",
lb_service_id)
try:
# get the NSX statistics for this LB service
rsp = lb_service_client.get_stats(lb_service_id)
if rsp and 'virtual_servers' in rsp:
# Go over each virtual server in the response
for vs in rsp['virtual_servers']:
# look up the virtual server in the DB
vs_bind = nsx_db.get_nsx_lbaas_listener_binding_by_vs_id(
context.session, vs['virtual_server_id'])
if vs_bind:
vs_stats = vs['statistics']
stats = copy.copy(lb_const.LB_EMPTY_STATS)
stats['id'] = vs_bind.listener_id
stats['request_errors'] = 0 # currently unsupported
for stat in lb_const.LB_STATS_MAP:
lb_stat = lb_const.LB_STATS_MAP[stat]
stats[stat] += vs_stats[lb_stat]
stat_list.append(stats)
except nsxlib_exc.ManagerError:
pass
return stat_list

View File

@ -192,10 +192,10 @@ class EdgeLoadBalancerManagerFromDict(base_mgr.Nsxv3LoadbalancerBaseManager):
for vs in vs_statuses.get('results', []): for vs in vs_statuses.get('results', []):
vs_status = self._nsx_status_to_lb_status(vs.get('status')) vs_status = self._nsx_status_to_lb_status(vs.get('status'))
vs_id = vs.get('virtual_server_id') vs_id = vs.get('virtual_server_id')
listener_binding = nsx_db.get_nsx_lbaas_listener_binding_by_vs( list_binding = nsx_db.get_nsx_lbaas_listener_binding_by_lb_and_vs(
context.session, id, vs_id) context.session, id, vs_id)
if listener_binding: if list_binding:
listener_id = listener_binding['listener_id'] listener_id = list_binding['listener_id']
statuses[lb_const.LISTENERS].append( statuses[lb_const.LISTENERS].append(
{'id': listener_id, 'status': vs_status}) {'id': listener_id, 'status': vs_status})

View File

@ -65,10 +65,15 @@ class EdgeMemberManagerFromDict(base_mgr.Nsxv3LoadbalancerBaseManager):
tenant_id, context.project_name) tenant_id, context.project_name)
attachment = {'target_id': nsx_router_id, attachment = {'target_id': nsx_router_id,
'target_type': 'LogicalRouter'} 'target_type': 'LogicalRouter'}
lb_service = service_client.create(display_name=lb_name, try:
tags=tags, lb_service = service_client.create(display_name=lb_name,
attachment=attachment, tags=tags,
size=lb_size) attachment=attachment,
size=lb_size)
except nsxlib_exc.ManagerError as e:
LOG.error("Failed to create LB service: %s", e)
return
# Update router to enable advertise_lb_vip flag # Update router to enable advertise_lb_vip flag
self.core_plugin.nsxlib.logical_router.update_advertisement( self.core_plugin.nsxlib.logical_router.update_advertisement(
nsx_router_id, advertise_lb_vip=True) nsx_router_id, advertise_lb_vip=True)

View File

@ -16,8 +16,8 @@
from neutron_lib.callbacks import events from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources from neutron_lib.callbacks import resources
from neutron_lib import constants as n_consts
from neutron_lib import exceptions as n_exc from neutron_lib import exceptions as n_exc
from neutron_lib.plugins import constants as plugin_const
from oslo_log import helpers as log_helpers from oslo_log import helpers as log_helpers
from oslo_log import log as logging from oslo_log import log as logging
@ -33,6 +33,7 @@ from vmware_nsx.services.lbaas.nsx_v3.implementation import listener_mgr
from vmware_nsx.services.lbaas.nsx_v3.implementation import loadbalancer_mgr from vmware_nsx.services.lbaas.nsx_v3.implementation import loadbalancer_mgr
from vmware_nsx.services.lbaas.nsx_v3.implementation import member_mgr from vmware_nsx.services.lbaas.nsx_v3.implementation import member_mgr
from vmware_nsx.services.lbaas.nsx_v3.implementation import pool_mgr from vmware_nsx.services.lbaas.nsx_v3.implementation import pool_mgr
from vmware_nsx.services.lbaas.octavia import constants as oct_const
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -108,6 +109,9 @@ class EdgeLoadbalancerDriverV2(base_mgr.LoadbalancerBaseManager):
# Check if there is any LB attachment for the NSX router. # Check if there is any LB attachment for the NSX router.
# This callback is subscribed here to prevent router/GW/interface # This callback is subscribed here to prevent router/GW/interface
# deletion if it still has LB service attached to it. # deletion if it still has LB service attached to it.
#Note(asarfaty): Those callbacks are used by Octavia as well even
# though they are bound only here
registry.subscribe(self._check_lb_service_on_router, registry.subscribe(self._check_lb_service_on_router,
resources.ROUTER, events.BEFORE_DELETE) resources.ROUTER, events.BEFORE_DELETE)
registry.subscribe(self._check_lb_service_on_router, registry.subscribe(self._check_lb_service_on_router,
@ -124,8 +128,9 @@ class EdgeLoadbalancerDriverV2(base_mgr.LoadbalancerBaseManager):
resources.ROUTER_INTERFACE, events.BEFORE_DELETE) resources.ROUTER_INTERFACE, events.BEFORE_DELETE)
def _get_lb_ports(self, context, subnet_ids): def _get_lb_ports(self, context, subnet_ids):
dev_owner = 'neutron:' + plugin_const.LOADBALANCERV2 dev_owner_v2 = n_consts.DEVICE_OWNER_LOADBALANCERV2
filters = {'device_owner': [dev_owner], dev_owner_oct = oct_const.DEVICE_OWNER_OCTAVIA
filters = {'device_owner': [dev_owner_v2, dev_owner_oct],
'fixed_ips': {'subnet_id': subnet_ids}} 'fixed_ips': {'subnet_id': subnet_ids}}
return self.loadbalancer.core_plugin.get_ports( return self.loadbalancer.core_plugin.get_ports(
context, filters=filters) context, filters=filters)

View File

@ -0,0 +1,45 @@
# Copyright 2018 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
OCTAVIA_TO_DRIVER_TOPIC = 'vmware_nsx__lb_listener'
DRIVER_TO_OCTAVIA_TOPIC = 'vmware_nsx__driver_listener'
LOADBALANCER = 'loadbalancer'
LISTENER = 'listener'
POOL = 'pool'
HEALTHMONITOR = 'healthmonitor'
MEMBER = 'member'
L7POLICY = 'l7policy'
L7RULE = 'l7rule'
LOADBALANCERS = 'loadbalancers'
LISTENERS = 'listeners'
POOLS = 'pools'
HEALTHMONITORS = 'healthmonitors'
MEMBERS = 'members'
L7POLICIES = 'l7policies'
L7RULES = 'l7rules'
ONLINE = 'ONLINE'
OFFLINE = 'OFFLINE'
ERROR = 'ERROR'
ACTIVE = 'ACTIVE'
DELETED = 'DELETED'
ERROR = 'ERROR'
OPERATING_STATUS = 'operating_status'
PROVISIONING_STATUS = 'provisioning_status'
DEVICE_OWNER_OCTAVIA = 'Octavia'

View File

@ -0,0 +1,507 @@
# Copyright 2018 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import socket
from oslo_config import cfg
from oslo_log import helpers as log_helpers
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_messaging.rpc import dispatcher
import pecan
from stevedore import driver as stevedore_driver
from octavia.api.drivers import driver_lib
from octavia.api.drivers import exceptions
from octavia.api.drivers import provider_base as driver_base
from octavia.api.drivers import utils as oct_utils
from octavia.db import api as db_apis
from octavia.db import repositories
from vmware_nsx.services.lbaas.octavia import constants as d_const
LOG = logging.getLogger(__name__)
cfg.CONF.import_group('oslo_messaging', 'octavia.common.config')
# List of keys per object type that will not be sent to the listener
unsupported_keys = {'Loadbalancer': ['vip_qos_policy_id'],
'Listener': ['sni_container_refs',
'insert_headers',
'timeout_client_data',
'timeout_member_connect',
'timeout_member_data',
'timeout_tcp_inspect'],
'HealthMonitor': ['expected_codes', 'max_retries_down'],
'Member': ['monitor_address', 'monitor_port', 'backup']}
class NSXOctaviaDriver(driver_base.ProviderDriver):
@log_helpers.log_method_call
def __init__(self):
super(NSXOctaviaDriver, self).__init__()
self._init_rpc_messaging()
self._init_rpc_listener()
self._init_cert_manager()
self.repositories = repositories.Repositories()
@log_helpers.log_method_call
def _init_rpc_messaging(self):
topic = d_const.OCTAVIA_TO_DRIVER_TOPIC
transport = messaging.get_rpc_transport(cfg.CONF)
target = messaging.Target(topic=topic, exchange="common",
namespace='control', fanout=False,
version='1.0')
self.client = messaging.RPCClient(transport, target)
@log_helpers.log_method_call
def _init_rpc_listener(self):
# Initialize RPC listener
topic = d_const.DRIVER_TO_OCTAVIA_TOPIC
server = socket.gethostname()
transport = messaging.get_rpc_transport(cfg.CONF)
target = messaging.Target(topic=topic, server=server,
exchange="common", fanout=False)
endpoints = [NSXOctaviaDriverEndpoint()]
access_policy = dispatcher.DefaultRPCAccessPolicy
self.octavia_server = messaging.get_rpc_server(
transport, target, endpoints, executor='threading',
access_policy=access_policy)
self.octavia_server.start()
@log_helpers.log_method_call
def _init_cert_manager(self):
self.cert_manager = stevedore_driver.DriverManager(
namespace='octavia.cert_manager',
name=cfg.CONF.certificates.cert_manager,
invoke_on_load=True).driver
def get_obj_project_id(self, obj_type, obj_dict):
if obj_dict.get('project_id'):
return obj_dict['project_id']
if obj_dict.get('tenant_id'):
return obj_dict['tenant_id']
# look for the project id of the attached objects
project_id = None
if obj_dict.get('loadbalancer_id'):
db_lb = self.repositories.load_balancer.get(
db_apis.get_session(), id=obj_dict['loadbalancer_id'])
if db_lb:
project_id = db_lb.project_id
if not project_id and obj_dict.get('pool_id'):
db_pool = self.repositories.pool.get(
db_apis.get_session(), id=obj_dict['pool_id'])
if db_pool:
project_id = db_pool.load_balancer.project_id
if not project_id and obj_dict.get('listener_id'):
db_list = self.repositories.listener.get(
db_apis.get_session(), id=obj_dict['listener_id'])
if db_list:
project_id = db_list.load_balancer.project_id
if not project_id and obj_dict.get('l7policy_id'):
db_policy = self.repositories.l7policy.get(
db_apis.get_session(), id=obj_dict['l7policy_id'])
if db_policy:
if db_policy.listener:
db_lb = db_policy.listener.load_balancer
elif db_policy.redirect_pool:
db_lb = db_policy.redirect_pool.load_balancer
if db_lb:
project_id = db_lb.project_id
if not project_id:
LOG.warning("Could bot find the tenant id for %(type)s "
"%(obj)s", {'type': obj_type, 'obj': obj_dict})
return project_id
def _get_load_balancer_dict(self, loadbalancer_id):
if not loadbalancer_id:
return
db_lb = self.repositories.load_balancer.get(
db_apis.get_session(), id=loadbalancer_id)
if not db_lb:
return
lb_dict = {'name': db_lb.name, 'id': loadbalancer_id}
if db_lb.vip:
lb_dict['vip_port_id'] = db_lb.vip.port_id
lb_dict['vip_address'] = db_lb.vip.ip_address
lb_dict['vip_port_id'] = db_lb.vip.port_id
lb_dict['vip_network_id'] = db_lb.vip.network_id
lb_dict['vip_subnet_id'] = db_lb.vip.subnet_id
return lb_dict
def _get_listener_in_pool_dict(self, pool_dict):
if 'listener' not in pool_dict:
if pool_dict.get('listener_id'):
db_listener = self.repositories.listener.get(
db_apis.get_session(), id=pool_dict['listener_id'])
listener_obj = oct_utils.db_listener_to_provider_listener(
db_listener)
listener_dict = listener_obj.to_dict(
recurse=False, render_unsets=True)
listener_dict['id'] = listener_dict['listener_id']
listener_dict['l7_policies'] = listener_dict['l7policies']
pool_dict['listener'] = listener_dict
if 'listeners' not in pool_dict:
# multiple listeners is not really supported yet
pool_dict['listeners'] = [listener_dict]
else:
pool_dict['listener'] = None
if 'listeners' not in pool_dict:
pool_dict['listeners'] = []
def _get_pool_dict(self, pool_id):
if not pool_id:
return
db_pool = self.repositories.pool.get(db_apis.get_session(), id=pool_id)
if not db_pool:
return
pool_obj = oct_utils.db_pool_to_provider_pool(db_pool)
pool_dict = pool_obj.to_dict(recurse=True, render_unsets=True)
pool_dict['id'] = pool_id
# Get the load balancer object
if pool_dict.get('loadbalancer_id'):
# Generate a loadbalancer object
pool_dict['loadbalancer'] = self._get_load_balancer_dict(
pool_dict['loadbalancer_id'])
if 'listener' not in pool_dict:
self._get_listener_in_pool_dict(pool_dict)
return pool_dict
def update_policy_dict(self, policy_dict, policy_obj, is_update=False):
if policy_dict.get('listener_id'):
db_list = self.repositories.listener.get(
db_apis.get_session(), id=policy_dict['listener_id'])
list_obj = oct_utils.db_listener_to_provider_listener(db_list)
list_dict = list_obj.to_dict(recurse=True, render_unsets=True)
list_dict['id'] = policy_dict['listener_id']
policy_dict['listener'] = list_dict
if policy_obj.rules:
policy_dict['rules'] = []
for rule in policy_obj.rules:
rule_dict = rule.to_dict(recurse=False, render_unsets=True)
rule_dict['id'] = rule_dict['l7rule_id']
policy_dict['rules'].append(rule_dict)
elif not is_update:
policy_dict['rules'] = []
def _remove_unsupported_keys(self, obj_type, obj_dict):
for key in unsupported_keys.get(obj_type, []):
if key in obj_dict:
if obj_dict.get(key):
LOG.warning("Ignoring %(key)s:%(val)s in %(type)s as the "
"NSX plugin does not currently support it",
{'key': key, 'val': obj_dict[key],
'type': obj_type})
del obj_dict[key]
def obj_to_dict(self, obj, is_update=False, project_id=None):
obj_type = obj.__class__.__name__
# create a dictionary out of the object
render_unsets = False if is_update else True
obj_dict = obj.to_dict(recurse=True, render_unsets=render_unsets)
# Update the dictionary to match what the nsx driver expects
if not project_id:
project_id = self.get_obj_project_id(obj_type, obj_dict)
obj_dict['tenant_id'] = obj_dict['project_id'] = project_id
if 'id' not in obj_dict:
obj_dict['id'] = obj_dict.get('%s_id' % obj_type.lower())
if not obj_dict.get('name') and not is_update:
obj_dict['name'] = ""
self._remove_unsupported_keys(obj_type, obj_dict)
if obj_type == 'LoadBalancer':
# clean listeners and pools for update case:
if 'listeners' in obj_dict:
if is_update and not obj_dict['listeners']:
del obj_dict['listeners']
else:
if obj_dict['listeners'] is None:
obj_dict['listeners'] = []
for listener in obj_dict['listeners']:
listener['id'] = listener['listener_id']
if 'pools' in obj_dict:
if is_update and not obj_dict['pools']:
del obj_dict['pools']
else:
if obj_dict['pools'] is None:
obj_dict['pools'] = []
for pool in obj_dict['pools']:
pool['id'] = pool['pool_id']
elif obj_type == 'Listener':
if 'l7policies' in obj_dict:
obj_dict['l7_policies'] = obj_dict['l7policies']
if obj_dict.get('loadbalancer_id'):
# Generate a loadbalancer object
obj_dict['loadbalancer'] = self._get_load_balancer_dict(
obj_dict['loadbalancer_id'])
# TODO(asarfaty): add default_tls_container_id
elif obj_type == 'Pool':
if 'listener' not in obj_dict:
self._get_listener_in_pool_dict(obj_dict)
elif obj_type == 'Member':
# Get the pool object
if obj_dict.get('pool_id'):
obj_dict['pool'] = self._get_pool_dict(obj_dict['pool_id'])
obj_dict['loadbalancer'] = None
if 'loadbalancer' in obj_dict['pool']:
obj_dict['loadbalancer'] = obj_dict['pool']['loadbalancer']
if not obj_dict.get('subnet_id'):
# Use the parent vip_subnet_id instead
obj_dict['subnet_id'] = obj_dict['loadbalancer'][
'vip_subnet_id']
else:
obj_dict['pool'] = None
obj_dict['loadbalancer'] = None
elif obj_type == 'HealthMonitor':
# Get the pool object
if obj_dict.get('pool_id'):
obj_dict['pool'] = self._get_pool_dict(obj_dict['pool_id'])
elif obj_type == 'L7Policy':
self.update_policy_dict(obj_dict, obj, is_update=is_update)
elif obj_type == 'L7Rule':
# Get the L7 policy object
if obj_dict.get('l7policy_id'):
db_policy = self.repositories.l7policy.get(
db_apis.get_session(), id=obj_dict['l7policy_id'])
policy_obj = oct_utils.db_l7policy_to_provider_l7policy(
db_policy)
policy_dict = policy_obj.to_dict(
recurse=True, render_unsets=True)
policy_dict['id'] = obj_dict['l7policy_id']
self.update_policy_dict(
policy_dict, policy_obj, is_update=is_update)
obj_dict['policy'] = policy_dict
LOG.debug("Translated %(type)s to dictionary: %(obj)s",
{'type': obj_type, 'obj': obj_dict})
return obj_dict
# Load Balancer
@log_helpers.log_method_call
def create_vip_port(self, loadbalancer_id, project_id, vip_dictionary):
raise exceptions.NotImplementedError()
@log_helpers.log_method_call
def loadbalancer_create(self, loadbalancer):
kw = {'loadbalancer': self.obj_to_dict(loadbalancer)}
self.client.cast({}, 'loadbalancer_create', **kw)
@log_helpers.log_method_call
def loadbalancer_delete(self, loadbalancer, cascade=False):
if cascade:
# TODO(asarfaty) add support for cascade
LOG.warning("The NSX Octavia driver does not support loadbalancer "
"delete cascade")
raise exceptions.NotImplementedError()
kw = {'loadbalancer': self.obj_to_dict(loadbalancer),
'cascade': cascade}
self.client.cast({}, 'loadbalancer_delete', **kw)
@log_helpers.log_method_call
def loadbalancer_failover(self, loadbalancer_id):
LOG.error('Loadbalancer failover is handled by platform')
raise exceptions.NotImplementedError()
@log_helpers.log_method_call
def loadbalancer_update(self, old_loadbalancer, new_loadbalancer):
old_dict = self.obj_to_dict(old_loadbalancer)
new_dict = copy.deepcopy(old_dict)
new_dict.update(self.obj_to_dict(
new_loadbalancer, is_update=True,
project_id=old_dict.get('project_id')))
kw = {'old_loadbalancer': old_dict,
'new_loadbalancer': new_dict}
self.client.cast({}, 'loadbalancer_update', **kw)
# Listener
@log_helpers.log_method_call
def listener_create(self, listener):
cert = None
dict_list = self.obj_to_dict(listener)
if dict_list.get('tls_certificate_id'):
context = pecan.request.context.get('octavia_context')
cert = self.cert_manager.get_cert(context,
dict_list['tls_certificate_id'])
kw = {'listener': dict_list, 'cert': cert}
self.client.cast({}, 'listener_create', **kw)
@log_helpers.log_method_call
def listener_delete(self, listener):
kw = {'listener': self.obj_to_dict(listener)}
self.client.cast({}, 'listener_delete', **kw)
@log_helpers.log_method_call
def listener_update(self, old_listener, new_listener):
old_dict = self.obj_to_dict(old_listener)
new_dict = copy.deepcopy(old_dict)
new_dict.update(self.obj_to_dict(
new_listener, is_update=True,
project_id=old_dict.get('project_id')))
cert = None
if new_dict.get('tls_certificate_id'):
context = pecan.request.context.get('octavia_context')
cert = self.cert_manager.get_cert(context,
new_dict['tls_certificate_id'])
kw = {'old_listener': old_dict,
'new_listener': new_dict,
'cert': cert}
self.client.cast({}, 'listener_update', **kw)
# Pool
@log_helpers.log_method_call
def pool_create(self, pool):
kw = {'pool': self.obj_to_dict(pool)}
self.client.cast({}, 'pool_create', **kw)
@log_helpers.log_method_call
def pool_delete(self, pool):
kw = {'pool': self.obj_to_dict(pool)}
self.client.cast({}, 'pool_delete', **kw)
@log_helpers.log_method_call
def pool_update(self, old_pool, new_pool):
old_dict = self.obj_to_dict(old_pool)
new_dict = copy.deepcopy(old_dict)
new_dict.update(self.obj_to_dict(
new_pool, is_update=True, project_id=old_dict.get('project_id')))
kw = {'old_pool': old_dict,
'new_pool': new_dict}
self.client.cast({}, 'pool_update', **kw)
# Member
@log_helpers.log_method_call
def member_create(self, member):
kw = {'member': self.obj_to_dict(member)}
self.client.cast({}, 'member_create', **kw)
@log_helpers.log_method_call
def member_delete(self, member):
kw = {'member': self.obj_to_dict(member)}
self.client.cast({}, 'member_delete', **kw)
@log_helpers.log_method_call
def member_update(self, old_member, new_member):
old_dict = self.obj_to_dict(old_member)
new_dict = copy.deepcopy(old_dict)
new_dict.update(self.obj_to_dict(
new_member, is_update=True, project_id=old_dict.get('project_id')))
kw = {'old_member': old_dict,
'new_member': new_dict}
self.client.cast({}, 'member_update', **kw)
@log_helpers.log_method_call
def member_batch_update(self, members):
raise NotImplementedError()
# Health Monitor
@log_helpers.log_method_call
def health_monitor_create(self, healthmonitor):
kw = {'healthmonitor': self.obj_to_dict(healthmonitor)}
self.client.cast({}, 'healthmonitor_create', **kw)
@log_helpers.log_method_call
def health_monitor_delete(self, healthmonitor):
kw = {'healthmonitor': self.obj_to_dict(healthmonitor)}
self.client.cast({}, 'healthmonitor_delete', **kw)
@log_helpers.log_method_call
def health_monitor_update(self, old_healthmonitor, new_healthmonitor):
old_dict = self.obj_to_dict(old_healthmonitor)
new_dict = copy.deepcopy(old_dict)
new_dict.update(self.obj_to_dict(
new_healthmonitor, is_update=True,
project_id=old_dict.get('project_id')))
kw = {'old_healthmonitor': old_dict,
'new_healthmonitor': new_dict}
self.client.cast({}, 'healthmonitor_update', **kw)
# L7 Policy
@log_helpers.log_method_call
def l7policy_create(self, l7policy):
kw = {'l7policy': self.obj_to_dict(l7policy)}
self.client.cast({}, 'l7policy_create', **kw)
@log_helpers.log_method_call
def l7policy_delete(self, l7policy):
kw = {'l7policy': self.obj_to_dict(l7policy)}
self.client.cast({}, 'l7policy_delete', **kw)
@log_helpers.log_method_call
def l7policy_update(self, old_l7policy, new_l7policy):
old_dict = self.obj_to_dict(old_l7policy)
new_dict = copy.deepcopy(old_dict)
new_dict.update(self.obj_to_dict(
new_l7policy, is_update=True,
project_id=old_dict.get('project_id')))
kw = {'old_l7policy': old_dict,
'new_l7policy': new_dict}
self.client.cast({}, 'l7policy_update', **kw)
# L7 Rule
@log_helpers.log_method_call
def l7rule_create(self, l7rule):
kw = {'l7rule': self.obj_to_dict(l7rule)}
self.client.cast({}, 'l7rule_create', **kw)
@log_helpers.log_method_call
def l7rule_delete(self, l7rule):
kw = {'l7rule': self.obj_to_dict(l7rule)}
self.client.cast({}, 'l7rule_delete', **kw)
@log_helpers.log_method_call
def l7rule_update(self, old_l7rule, new_l7rule):
old_dict = self.obj_to_dict(old_l7rule)
new_dict = copy.deepcopy(old_dict)
new_dict.update(self.obj_to_dict(
new_l7rule, is_update=True, project_id=old_dict.get('project_id')))
kw = {'old_l7rule': old_dict,
'new_l7rule': new_dict}
self.client.cast({}, 'l7rule_update', **kw)
# Flavor
@log_helpers.log_method_call
def get_supported_flavor_metadata(self):
raise exceptions.NotImplementedError()
@log_helpers.log_method_call
def validate_flavor(self, flavor_metadata):
raise exceptions.NotImplementedError()
class NSXOctaviaDriverEndpoint(driver_lib.DriverLibrary):
target = messaging.Target(namespace="control", version='1.0')
@log_helpers.log_method_call
def update_loadbalancer_status(self, ctxt, status):
return super(NSXOctaviaDriverEndpoint,
self).update_loadbalancer_status(status)
@log_helpers.log_method_call
def update_listener_statistics(self, ctxt, statistics):
return super(NSXOctaviaDriverEndpoint,
self).update_listener_statistics(statistics)

View File

@ -0,0 +1,368 @@
# Copyright 2018 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
import time
import eventlet
from neutron_lib import context as neutron_context
from oslo_config import cfg
from oslo_log import helpers as log_helpers
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_messaging.rpc import dispatcher
from neutron_lbaas.db.loadbalancer import models
from vmware_nsx.services.lbaas.octavia import constants
LOG = logging.getLogger(__name__)
class NSXOctaviaListener(object):
@log_helpers.log_method_call
def __init__(self, loadbalancer=None, listener=None, pool=None,
member=None, healthmonitor=None, l7policy=None, l7rule=None):
self._init_rpc_messaging()
self._init_rpc_listener(healthmonitor, l7policy, l7rule, listener,
loadbalancer, member, pool)
def _init_rpc_messaging(self):
topic = constants.DRIVER_TO_OCTAVIA_TOPIC
transport = messaging.get_rpc_transport(cfg.CONF)
target = messaging.Target(topic=topic, exchange="common",
namespace='control', fanout=False,
version='1.0')
self.client = messaging.RPCClient(transport, target)
def _init_rpc_listener(self, healthmonitor, l7policy, l7rule, listener,
loadbalancer, member, pool):
# Initialize RPC listener
topic = constants.OCTAVIA_TO_DRIVER_TOPIC
server = socket.gethostname()
transport = messaging.get_rpc_transport(cfg.CONF)
target = messaging.Target(topic=topic, server=server,
exchange="common", fanout=False)
self.endpoints = [NSXOctaviaListenerEndpoint(
client=self.client, loadbalancer=loadbalancer, listener=listener,
pool=pool, member=member, healthmonitor=healthmonitor,
l7policy=l7policy, l7rule=l7rule)]
access_policy = dispatcher.DefaultRPCAccessPolicy
self.octavia_server = messaging.get_rpc_server(
transport, target, self.endpoints, executor='eventlet',
access_policy=access_policy)
self.octavia_server.start()
class NSXOctaviaListenerEndpoint(object):
target = messaging.Target(namespace="control", version='1.0')
def __init__(self, client=None, loadbalancer=None, listener=None,
pool=None, member=None, healthmonitor=None, l7policy=None,
l7rule=None):
self.client = client
self.loadbalancer = loadbalancer
self.listener = listener
self.pool = pool
self.member = member
self.healthmonitor = healthmonitor
self.l7policy = l7policy
self.l7rule = l7rule
def get_completor_func(self, obj_type, obj, delete=False):
# return a method that will be called on success/failure completion
def completor_func(success=True):
LOG.debug("Octavia transaction completed. status %s",
'success' if success else 'failure')
# calculate the provisioning and operating statuses
main_prov_status = constants.ACTIVE
parent_prov_status = constants.ACTIVE
if not success:
main_prov_status = constants.ERROR
parent_prov_status = constants.ERROR
elif delete:
main_prov_status = constants.DELETED
op_status = constants.ONLINE if success else constants.ERROR
# add the status of the created/deleted/updated object
status_dict = {
obj_type: [{
'id': obj['id'],
constants.PROVISIONING_STATUS: main_prov_status,
constants.OPERATING_STATUS: op_status}]}
# Get all its parents, and update their statuses as well
loadbalancer_id = None
listener_id = None
pool_id = None
policy_id = None
if obj_type != constants.LOADBALANCERS:
loadbalancer_id = None
if obj.get('loadbalancer_id'):
loadbalancer_id = obj.get('loadbalancer_id')
elif obj.get('pool'):
pool_id = obj['pool']['id']
loadbalancer_id = obj['pool']['loadbalancer_id']
elif obj.get('listener'):
listener_id = obj['listener']['id']
loadbalancer_id = obj['listener']['loadbalancer_id']
elif obj.get('policy') and obj['policy'].get('listener'):
policy_id = obj['policy']['id']
listener_id = obj['policy']['listener']['id']
loadbalancer_id = obj['policy']['listener'][
'loadbalancer_id']
if loadbalancer_id:
status_dict[constants.LOADBALANCERS] = [{
'id': loadbalancer_id,
constants.PROVISIONING_STATUS: parent_prov_status,
constants.OPERATING_STATUS: op_status}]
if listener_id:
status_dict[constants.LISTENERS] = [{
'id': listener_id,
constants.PROVISIONING_STATUS: parent_prov_status,
constants.OPERATING_STATUS: op_status}]
if pool_id:
status_dict[constants.POOLS] = [{
'id': pool_id,
constants.PROVISIONING_STATUS: parent_prov_status,
constants.OPERATING_STATUS: op_status}]
if policy_id:
status_dict[constants.L7POLICIES] = [{
'id': policy_id,
constants.PROVISIONING_STATUS: parent_prov_status,
constants.OPERATING_STATUS: op_status}]
kw = {'status': status_dict}
self.client.cast({}, 'update_loadbalancer_status', **kw)
return completor_func
def update_listener_statistics(self, statistics):
kw = {'statistics': statistics}
self.client.cast({}, 'update_listener_statistics', **kw)
@log_helpers.log_method_call
def loadbalancer_create(self, ctxt, loadbalancer):
ctx = neutron_context.Context(None, loadbalancer['project_id'])
self.loadbalancer.create(
ctx, loadbalancer,
self.get_completor_func(constants.LOADBALANCERS,
loadbalancer))
@log_helpers.log_method_call
def loadbalancer_delete(self, ctxt, loadbalancer, cascade=False):
ctx = neutron_context.Context(None, loadbalancer['project_id'])
# TODO(asarfaty): No support for cascade. It is blocked by the driver
self.loadbalancer.delete(
ctx, loadbalancer,
self.get_completor_func(constants.LOADBALANCERS,
loadbalancer,
delete=True))
@log_helpers.log_method_call
def loadbalancer_update(self, ctxt, old_loadbalancer, new_loadbalancer):
ctx = neutron_context.Context(None, old_loadbalancer['project_id'])
self.loadbalancer.update(
ctx, old_loadbalancer, new_loadbalancer,
self.get_completor_func(constants.LOADBALANCERS,
new_loadbalancer))
# Listener
@log_helpers.log_method_call
def listener_create(self, ctxt, listener, cert):
ctx = neutron_context.Context(None, listener['project_id'])
self.listener.create(ctx, listener,
self.get_completor_func(constants.LISTENERS,
listener),
certificate=cert)
@log_helpers.log_method_call
def listener_delete(self, ctxt, listener):
ctx = neutron_context.Context(None, listener['project_id'])
self.listener.delete(ctx, listener,
self.get_completor_func(constants.LISTENERS,
listener,
delete=True))
@log_helpers.log_method_call
def listener_update(self, ctxt, old_listener, new_listener, cert):
ctx = neutron_context.Context(None, old_listener['project_id'])
self.listener.update(ctx, old_listener, new_listener,
self.get_completor_func(constants.LISTENERS,
new_listener),
certificate=cert)
# Pool
@log_helpers.log_method_call
def pool_create(self, ctxt, pool):
ctx = neutron_context.Context(None, pool['project_id'])
self.pool.create(ctx, pool, self.get_completor_func(constants.POOLS,
pool))
@log_helpers.log_method_call
def pool_delete(self, ctxt, pool):
ctx = neutron_context.Context(None, pool['project_id'])
self.pool.delete(ctx, pool, self.get_completor_func(constants.POOLS,
pool,
delete=True))
@log_helpers.log_method_call
def pool_update(self, ctxt, old_pool, new_pool):
ctx = neutron_context.Context(None, old_pool['project_id'])
self.pool.update(ctx, old_pool, new_pool,
self.get_completor_func(constants.POOLS, new_pool))
# Member
@log_helpers.log_method_call
def member_create(self, ctxt, member):
ctx = neutron_context.Context(None, member['project_id'])
self.member.create(ctx, member,
self.get_completor_func(constants.MEMBERS,
member))
@log_helpers.log_method_call
def member_delete(self, ctxt, member):
ctx = neutron_context.Context(None, member['project_id'])
self.member.delete(ctx, member,
self.get_completor_func(constants.MEMBERS,
member,
delete=True))
@log_helpers.log_method_call
def member_update(self, ctxt, old_member, new_member):
ctx = neutron_context.Context(None, old_member['project_id'])
self.member.update(ctx, old_member, new_member,
self.get_completor_func(constants.MEMBERS,
new_member))
# Health Monitor
@log_helpers.log_method_call
def healthmonitor_create(self, ctxt, healthmonitor):
ctx = neutron_context.Context(None, healthmonitor['project_id'])
self.healthmonitor.create(ctx, healthmonitor,
self.get_completor_func(
constants.HEALTHMONITORS, healthmonitor))
@log_helpers.log_method_call
def healthmonitor_delete(self, ctxt, healthmonitor):
ctx = neutron_context.Context(None, healthmonitor['project_id'])
self.healthmonitor.delete(ctx, healthmonitor,
self.get_completor_func(
constants.HEALTHMONITORS, healthmonitor,
delete=True))
@log_helpers.log_method_call
def healthmonitor_update(self, ctxt, old_healthmonitor, new_healthmonitor):
ctx = neutron_context.Context(None, old_healthmonitor['project_id'])
self.healthmonitor.update(ctx, old_healthmonitor, new_healthmonitor,
self.get_completor_func(
constants.HEALTHMONITORS,
new_healthmonitor))
# L7 Policy
@log_helpers.log_method_call
def l7policy_create(self, ctxt, l7policy):
ctx = neutron_context.Context(None, l7policy['project_id'])
self.l7policy.create(ctx, l7policy,
self.get_completor_func(constants.L7POLICIES,
l7policy))
@log_helpers.log_method_call
def l7policy_delete(self, ctxt, l7policy):
ctx = neutron_context.Context(None, l7policy['project_id'])
self.l7policy.delete(ctx, l7policy,
self.get_completor_func(constants.L7POLICIES,
l7policy,
delete=True))
@log_helpers.log_method_call
def l7policy_update(self, ctxt, old_l7policy, new_l7policy):
ctx = neutron_context.Context(None, old_l7policy['project_id'])
self.l7policy.update(ctx, old_l7policy, new_l7policy,
self.get_completor_func(constants.L7POLICIES,
new_l7policy))
# L7 Rule
@log_helpers.log_method_call
def l7rule_create(self, ctxt, l7rule):
ctx = neutron_context.Context(None, l7rule['project_id'])
self.l7rule.create(ctx, l7rule,
self.get_completor_func(constants.L7RULES, l7rule))
@log_helpers.log_method_call
def l7rule_delete(self, ctxt, l7rule):
ctx = neutron_context.Context(None, l7rule['project_id'])
self.l7rule.delete(ctx, l7rule,
self.get_completor_func(constants.L7RULES,
l7rule,
delete=True))
@log_helpers.log_method_call
def l7rule_update(self, ctxt, old_l7rule, new_l7rule):
ctx = neutron_context.Context(None, old_l7rule['project_id'])
self.l7rule.update(ctx, old_l7rule, new_l7rule,
self.get_completor_func(constants.L7RULES,
new_l7rule))
class NSXOctaviaStatisticsCollector(object):
def __init__(self, core_plugin, listener_stats_getter):
self.core_plugin = core_plugin
self.listener_stats_getter = listener_stats_getter
if cfg.CONF.octavia_stats_interval:
eventlet.spawn_n(self.thread_runner,
cfg.CONF.octavia_stats_interval)
@log_helpers.log_method_call
def thread_runner(self, interval):
while True:
time.sleep(interval)
self.collect()
def _get_nl_loadbalancers(self, context):
"""Getting the list of neutron-lbaas loadbalancers
This is done directly from the neutron-lbaas DB to also support the
case that the plugin is currently unavailable, but entries already
exist on the DB.
"""
nl_loadbalancers = context.session.query(models.LoadBalancer).all()
return [lb.id for lb in nl_loadbalancers]
@log_helpers.log_method_call
def collect(self):
if not self.core_plugin.octavia_listener:
return
endpoint = self.core_plugin.octavia_listener.endpoints[0]
context = neutron_context.get_admin_context()
# get the statistics of all the Octavia loadbalancers/listeners while
# ignoring the neutron-lbaas loadbalancers.
# Note(asarfaty): The Octavia plugin/DB is unavailable from the
# neutron context, so there is no option to query the Octavia DB for
# the relevant loadbalancers.
nl_loadbalancers = self._get_nl_loadbalancers(context)
listeners_stats = self.listener_stats_getter(
context, self.core_plugin, ignore_list=nl_loadbalancers)
if not listeners_stats:
# Avoid sending empty stats
return
stats = {'listeners': listeners_stats}
endpoint.update_listener_statistics(stats)

View File

@ -293,7 +293,8 @@ class TestEdgeLbaasV2Loadbalancer(BaseTestEdgeLbaasV2):
) as mock_get_pool_binding, \ ) as mock_get_pool_binding, \
mock.patch.object(self.pool_client, 'get' mock.patch.object(self.pool_client, 'get'
) as mock_get_pool, \ ) as mock_get_pool, \
mock.patch.object(nsx_db, 'get_nsx_lbaas_listener_binding_by_vs' mock.patch.object(nsx_db,
'get_nsx_lbaas_listener_binding_by_lb_and_vs'
) as mock_get_listener_binding: ) as mock_get_listener_binding:
mock_get_lb_binding.return_value = LB_BINDING mock_get_lb_binding.return_value = LB_BINDING
mock_get_pool_binding.return_value = POOL_BINDING mock_get_pool_binding.return_value = POOL_BINDING

View File

@ -0,0 +1,477 @@
# Copyright 2018 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import decorator
import mock
import testtools
from oslo_utils import uuidutils
code_ok = True
# Skip duplications between Octavia & Neutron configurations
with mock.patch('oslo_config.cfg.ConfigOpts.import_group'):
try:
from octavia.api.drivers import data_models
from vmware_nsx.services.lbaas.octavia import octavia_driver as driver
except ImportError:
# Octavia code not found
# this can happen as Octavia is not in the requirements yet
code_ok = False
DRIVER = 'vmware_nsx.services.lbaas.octavia.octavia_driver.NSXOctaviaDriver'
class TestNsxProviderDriver(testtools.TestCase):
"""Test the NSX Octavia driver
Make sure all the relevant data is translated and sent to the listener
"""
def setUp(self):
super(TestNsxProviderDriver, self).setUp()
global code_ok
if not code_ok:
return
# init the NSX driver without the RPC & certificate
with mock.patch(DRIVER + '._init_rpc_messaging'), \
mock.patch(DRIVER + '._init_rpc_listener'), \
mock.patch(DRIVER + '._init_cert_manager'):
self.driver = driver.NSXOctaviaDriver()
self.driver.client = mock.Mock()
self.loadbalancer_id = uuidutils.generate_uuid()
self.vip_address = '192.0.2.10'
self.vip_network_id = uuidutils.generate_uuid()
self.vip_port_id = uuidutils.generate_uuid()
self.vip_subnet_id = uuidutils.generate_uuid()
self.listener_id = uuidutils.generate_uuid()
self.pool_id = uuidutils.generate_uuid()
self.member_id = uuidutils.generate_uuid()
self.member_subnet_id = uuidutils.generate_uuid()
self.healthmonitor_id = uuidutils.generate_uuid()
self.l7policy_id = uuidutils.generate_uuid()
self.l7rule_id = uuidutils.generate_uuid()
self.project_id = uuidutils.generate_uuid()
self.default_tls_container_ref = uuidutils.generate_uuid()
self.sni_container_ref_1 = uuidutils.generate_uuid()
self.sni_container_ref_2 = uuidutils.generate_uuid()
self.ref_member = data_models.Member(
address='198.51.100.4',
admin_state_up=True,
member_id=self.member_id,
monitor_address='203.0.113.2',
monitor_port=66,
name='jacket',
pool_id=self.pool_id,
protocol_port=99,
subnet_id=self.member_subnet_id,
weight=55)
self.ref_healthmonitor = data_models.HealthMonitor(
admin_state_up=False,
delay=2,
expected_codes="500",
healthmonitor_id=self.healthmonitor_id,
http_method='TRACE',
max_retries=1,
max_retries_down=0,
name='doc',
pool_id=self.pool_id,
timeout=3,
type='PHD',
url_path='/index.html')
self.ref_pool = data_models.Pool(
admin_state_up=True,
description='Olympic swimming pool',
healthmonitor=self.ref_healthmonitor,
lb_algorithm='A_Fast_One',
loadbalancer_id=self.loadbalancer_id,
members=[self.ref_member],
name='Osborn',
pool_id=self.pool_id,
protocol='avian',
session_persistence={'type': 'glue'})
self.ref_l7rule = data_models.L7Rule(
admin_state_up=True,
compare_type='store_brand',
invert=True,
key='board',
l7policy_id=self.l7policy_id,
l7rule_id=self.l7rule_id,
type='strict',
value='gold')
self.ref_l7policy = data_models.L7Policy(
action='packed',
admin_state_up=False,
description='Corporate policy',
l7policy_id=self.l7policy_id,
listener_id=self.listener_id,
name='more_policy',
position=1,
redirect_pool_id=self.pool_id,
redirect_url='/hr',
rules=[self.ref_l7rule])
self.ref_listener = data_models.Listener(
admin_state_up=False,
connection_limit=5,
default_pool=self.ref_pool,
default_pool_id=self.pool_id,
default_tls_container_data='default_cert_data',
default_tls_container_ref=self.default_tls_container_ref,
description='The listener',
insert_headers={'X-Forwarded-For': 'true'},
l7policies=[self.ref_l7policy],
listener_id=self.listener_id,
loadbalancer_id=self.loadbalancer_id,
name='super_listener',
protocol='avian',
protocol_port=42,
sni_container_data=['sni_cert_data_1', 'sni_cert_data_2'],
sni_container_refs=[self.sni_container_ref_1,
self.sni_container_ref_2])
self.ref_lb = data_models.LoadBalancer(
admin_state_up=False,
description='One great load balancer',
flavor={'cake': 'chocolate'},
listeners=[self.ref_listener],
loadbalancer_id=self.loadbalancer_id,
name='favorite_lb',
project_id=self.project_id,
vip_address=self.vip_address,
vip_network_id=self.vip_network_id,
vip_port_id=self.vip_port_id,
vip_subnet_id=self.vip_subnet_id)
# start DB mocks
mock.patch('octavia.db.api.get_session').start()
mock.patch("octavia.api.drivers.utils.db_pool_to_provider_pool",
return_value=self.ref_pool).start()
@decorator.decorator
def skip_no_octavia(f, *args, **kwargs):
global code_ok
if not code_ok:
obj = args[0]
return obj.skipTest('Octavia code not found')
return f(*args, **kwargs)
@skip_no_octavia
def test_loadbalancer_create(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.loadbalancer_create(self.ref_lb)
cast_method.assert_called_with({}, 'loadbalancer_create',
loadbalancer=mock.ANY)
driver_obj = cast_method.call_args[1]['loadbalancer']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)
self.assertIn('admin_state_up', driver_obj)
self.assertIn('name', driver_obj)
self.assertIn('listeners', driver_obj)
self.assertEqual(1, len(driver_obj['listeners']))
self.assertEqual(self.ref_lb.vip_address,
driver_obj['vip_address'])
self.assertEqual(self.ref_lb.vip_network_id,
driver_obj['vip_network_id'])
self.assertEqual(self.ref_lb.vip_port_id,
driver_obj['vip_port_id'])
self.assertEqual(self.ref_lb.vip_subnet_id,
driver_obj['vip_subnet_id'])
@skip_no_octavia
def test_loadbalancer_delete(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.loadbalancer_delete(self.ref_lb)
cast_method.assert_called_with({}, 'loadbalancer_delete',
cascade=False,
loadbalancer=mock.ANY)
driver_obj = cast_method.call_args[1]['loadbalancer']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)
@skip_no_octavia
def test_loadbalancer_update(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.loadbalancer_update(self.ref_lb, self.ref_lb)
cast_method.assert_called_with({}, 'loadbalancer_update',
old_loadbalancer=mock.ANY,
new_loadbalancer=mock.ANY)
driver_obj = cast_method.call_args[1]['new_loadbalancer']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)
@skip_no_octavia
def test_listener_create(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.listener_create(self.ref_listener)
cast_method.assert_called_with({}, 'listener_create', cert=None,
listener=mock.ANY)
driver_obj = cast_method.call_args[1]['listener']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)
self.assertIn('admin_state_up', driver_obj)
self.assertIn('name', driver_obj)
self.assertIn('loadbalancer_id', driver_obj)
self.assertIn('loadbalancer', driver_obj)
self.assertEqual(self.ref_listener.protocol,
driver_obj['protocol'])
self.assertEqual(self.ref_listener.protocol_port,
driver_obj['protocol_port'])
self.assertEqual(self.ref_listener.connection_limit,
driver_obj['connection_limit'])
self.assertIn('l7policies', driver_obj)
#TODO(asarfaty) add after the driver is fixed
#self.assertIn('default_tls_container_id', driver_obj)
@skip_no_octavia
def test_listener_delete(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.listener_delete(self.ref_listener)
cast_method.assert_called_with({}, 'listener_delete',
listener=mock.ANY)
driver_obj = cast_method.call_args[1]['listener']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)
@skip_no_octavia
def test_listener_update(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.listener_update(self.ref_listener, self.ref_listener)
cast_method.assert_called_with({}, 'listener_update', cert=None,
old_listener=mock.ANY,
new_listener=mock.ANY)
driver_obj = cast_method.call_args[1]['new_listener']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)
@skip_no_octavia
def test_pool_create(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.pool_create(self.ref_pool)
cast_method.assert_called_with({}, 'pool_create', pool=mock.ANY)
driver_obj = cast_method.call_args[1]['pool']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)
self.assertIn('admin_state_up', driver_obj)
self.assertIn('name', driver_obj)
self.assertIn('loadbalancer_id', driver_obj)
self.assertIn('listener', driver_obj)
self.assertIn('listeners', driver_obj)
self.assertEqual(self.ref_pool.lb_algorithm,
driver_obj['lb_algorithm'])
self.assertEqual(self.ref_pool.session_persistence,
driver_obj['session_persistence'])
self.assertIn('members', driver_obj)
@skip_no_octavia
def test_pool_delete(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.pool_delete(self.ref_pool)
cast_method.assert_called_with({}, 'pool_delete', pool=mock.ANY)
driver_obj = cast_method.call_args[1]['pool']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)
@skip_no_octavia
def test_pool_update(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.pool_update(self.ref_pool, self.ref_pool)
cast_method.assert_called_with({}, 'pool_update',
old_pool=mock.ANY,
new_pool=mock.ANY)
driver_obj = cast_method.call_args[1]['new_pool']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)
@skip_no_octavia
def test_member_create(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.member_create(self.ref_member)
cast_method.assert_called_with({}, 'member_create',
member=mock.ANY)
driver_obj = cast_method.call_args[1]['member']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)
self.assertIn('admin_state_up', driver_obj)
self.assertIn('name', driver_obj)
self.assertEqual(self.pool_id, driver_obj['pool_id'])
self.assertIn('pool', driver_obj)
self.assertIn('loadbalancer', driver_obj['pool'])
#TODO(asarfaty) add when the driver is fixed
#self.assertIn('listener', driver_obj['pool'])
self.assertEqual(self.ref_member.subnet_id,
driver_obj['subnet_id'])
self.assertEqual(self.ref_member.address,
driver_obj['address'])
self.assertEqual(self.ref_member.protocol_port,
driver_obj['protocol_port'])
self.assertEqual(self.ref_member.weight,
driver_obj['weight'])
@skip_no_octavia
def test_member_delete(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.member_delete(self.ref_member)
cast_method.assert_called_with({}, 'member_delete',
member=mock.ANY)
driver_obj = cast_method.call_args[1]['member']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)
@skip_no_octavia
def test_member_update(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.member_update(self.ref_member, self.ref_member)
cast_method.assert_called_with({}, 'member_update',
old_member=mock.ANY,
new_member=mock.ANY)
driver_obj = cast_method.call_args[1]['old_member']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)
@skip_no_octavia
def test_health_monitor_create(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.health_monitor_create(self.ref_healthmonitor)
cast_method.assert_called_with({}, 'healthmonitor_create',
healthmonitor=mock.ANY)
driver_obj = cast_method.call_args[1]['healthmonitor']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)
self.assertIn('admin_state_up', driver_obj)
self.assertIn('name', driver_obj)
self.assertEqual(self.ref_healthmonitor.type,
driver_obj['type'])
self.assertEqual(self.ref_healthmonitor.url_path,
driver_obj['url_path'])
self.assertEqual(self.ref_healthmonitor.delay,
driver_obj['delay'])
self.assertEqual(self.ref_healthmonitor.timeout,
driver_obj['timeout'])
self.assertEqual(self.ref_healthmonitor.max_retries,
driver_obj['max_retries'])
self.assertEqual(self.ref_healthmonitor.http_method,
driver_obj['http_method'])
self.assertIn('pool', driver_obj)
self.assertEqual(self.pool_id,
driver_obj['pool']['id'])
self.assertEqual(self.loadbalancer_id,
driver_obj['pool']['loadbalancer_id'])
@skip_no_octavia
def test_health_monitor_delete(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.health_monitor_delete(self.ref_healthmonitor)
cast_method.assert_called_with({}, 'healthmonitor_delete',
healthmonitor=mock.ANY)
driver_obj = cast_method.call_args[1]['healthmonitor']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)
@skip_no_octavia
def test_health_monitor_update(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.health_monitor_update(self.ref_healthmonitor,
self.ref_healthmonitor)
cast_method.assert_called_with({}, 'healthmonitor_update',
old_healthmonitor=mock.ANY,
new_healthmonitor=mock.ANY)
driver_obj = cast_method.call_args[1]['new_healthmonitor']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)
@skip_no_octavia
def test_l7policy_create(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.l7policy_create(self.ref_l7policy)
cast_method.assert_called_with({}, 'l7policy_create',
l7policy=mock.ANY)
driver_obj = cast_method.call_args[1]['l7policy']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)
self.assertIn('admin_state_up', driver_obj)
self.assertIn('name', driver_obj)
self.assertIn('listener', driver_obj)
self.assertEqual(self.listener_id, driver_obj['listener_id'])
self.assertIn('rules', driver_obj)
self.assertIn('position', driver_obj)
self.assertEqual(self.ref_l7policy.action, driver_obj['action'])
self.assertEqual(self.ref_l7policy.redirect_url,
driver_obj['redirect_url'])
self.assertEqual(self.ref_l7policy.redirect_pool_id,
driver_obj['redirect_pool_id'])
@skip_no_octavia
def test_l7policy_delete(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.l7policy_delete(self.ref_l7policy)
cast_method.assert_called_with({}, 'l7policy_delete',
l7policy=mock.ANY)
driver_obj = cast_method.call_args[1]['l7policy']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)
@skip_no_octavia
def test_l7policy_update(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.l7policy_update(self.ref_l7policy, self.ref_l7policy)
cast_method.assert_called_with({}, 'l7policy_update',
old_l7policy=mock.ANY,
new_l7policy=mock.ANY)
driver_obj = cast_method.call_args[1]['new_l7policy']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)
@skip_no_octavia
def test_l7rule_create(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.l7rule_create(self.ref_l7rule)
cast_method.assert_called_with({}, 'l7rule_create',
l7rule=mock.ANY)
driver_obj = cast_method.call_args[1]['l7rule']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)
self.assertIn('admin_state_up', driver_obj)
self.assertIn('name', driver_obj)
self.assertIn('policy', driver_obj)
self.assertIn('rules', driver_obj['policy'])
self.assertEqual(self.ref_l7rule.type, driver_obj['type'])
self.assertEqual(self.ref_l7rule.value, driver_obj['value'])
self.assertEqual(self.ref_l7rule.invert, driver_obj['invert'])
@skip_no_octavia
def test_l7rule_delete(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.l7rule_delete(self.ref_l7rule)
cast_method.assert_called_with({}, 'l7rule_delete',
l7rule=mock.ANY)
driver_obj = cast_method.call_args[1]['l7rule']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)
@skip_no_octavia
def test_l7rule_update(self):
with mock.patch.object(self.driver.client, 'cast') as cast_method:
self.driver.l7rule_update(self.ref_l7rule, self.ref_l7rule)
cast_method.assert_called_with({}, 'l7rule_update',
old_l7rule=mock.ANY,
new_l7rule=mock.ANY)
driver_obj = cast_method.call_args[1]['new_l7rule']
self.assertIn('id', driver_obj)
self.assertIn('project_id', driver_obj)

View File

@ -0,0 +1,301 @@
# Copyright 2018 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import testtools
from oslo_utils import uuidutils
from vmware_nsx.services.lbaas.octavia import octavia_listener
class DummyOctaviaResource(object):
create_called = False
update_called = False
delete_called = False
def create(self, ctx, lb_obj, completor_func, **args):
self.create_called = True
completor_func(success=True)
def update(self, ctx, old_lb_obj, new_lb_obj, completor_func, **args):
self.update_called = True
completor_func(success=True)
def delete(self, ctx, lb_obj, completor_func, **args):
self.delete_called = True
completor_func(success=True)
class TestNsxOctaviaListener(testtools.TestCase):
"""Test the NSX Octavia listener"""
def setUp(self):
super(TestNsxOctaviaListener, self).setUp()
self.dummyResource = DummyOctaviaResource()
self.clientMock = mock.Mock()
self.clientMock.cast = mock.Mock()
self.endpoint = octavia_listener.NSXOctaviaListenerEndpoint(
client=self.clientMock,
loadbalancer=self.dummyResource,
listener=self.dummyResource,
pool=self.dummyResource,
member=self.dummyResource,
healthmonitor=self.dummyResource,
l7policy=self.dummyResource,
l7rule=self.dummyResource)
self.dummyObj = {'project_id': uuidutils.generate_uuid(),
'id': uuidutils.generate_uuid()}
self.ctx = None
self.mock_ctx = mock.patch("neutron_lib.context.Context")
self.mock_ctx.start()
def tearDown(self):
self.mock_ctx.stop()
super(TestNsxOctaviaListener, self).tearDown()
def test_loadbalancer_create(self):
self.dummyResource.create_called = False
self.endpoint.loadbalancer_create(self.ctx, self.dummyObj)
self.assertTrue(self.dummyResource.create_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'loadbalancers': [
{'operating_status': 'ONLINE',
'provisioning_status': 'ACTIVE',
'id': mock.ANY}]})
def test_loadbalancer_delete(self):
self.dummyResource.delete_called = False
self.endpoint.loadbalancer_delete(self.ctx, self.dummyObj)
self.assertTrue(self.dummyResource.delete_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'loadbalancers': [
{'operating_status': 'ONLINE',
'provisioning_status': 'DELETED',
'id': mock.ANY}]})
def test_loadbalancer_update(self):
self.dummyResource.update_called = False
self.endpoint.loadbalancer_update(self.ctx, self.dummyObj,
self.dummyObj)
self.assertTrue(self.dummyResource.update_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'loadbalancers': [
{'operating_status': 'ONLINE',
'provisioning_status': 'ACTIVE',
'id': mock.ANY}]})
def test_listener_create(self):
self.dummyResource.create_called = False
self.endpoint.listener_create(self.ctx, self.dummyObj, None)
self.assertTrue(self.dummyResource.create_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'listeners': [
{'operating_status': 'ONLINE',
'provisioning_status': 'ACTIVE',
'id': mock.ANY}]})
def test_listener_delete(self):
self.dummyResource.delete_called = False
self.endpoint.listener_delete(self.ctx, self.dummyObj)
self.assertTrue(self.dummyResource.delete_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'listeners': [
{'operating_status': 'ONLINE',
'provisioning_status': 'DELETED',
'id': mock.ANY}]})
def test_listener_update(self):
self.dummyResource.update_called = False
self.endpoint.listener_update(self.ctx, self.dummyObj, self.dummyObj,
None)
self.assertTrue(self.dummyResource.update_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'listeners': [
{'operating_status': 'ONLINE',
'provisioning_status': 'ACTIVE',
'id': mock.ANY}]})
def test_pool_create(self):
self.dummyResource.create_called = False
self.endpoint.pool_create(self.ctx, self.dummyObj)
self.assertTrue(self.dummyResource.create_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'pools': [
{'operating_status': 'ONLINE',
'provisioning_status': 'ACTIVE',
'id': mock.ANY}]})
def test_pool_delete(self):
self.dummyResource.delete_called = False
self.endpoint.pool_delete(self.ctx, self.dummyObj)
self.assertTrue(self.dummyResource.delete_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'pools': [
{'operating_status': 'ONLINE',
'provisioning_status': 'DELETED',
'id': mock.ANY}]})
def test_pool_update(self):
self.dummyResource.update_called = False
self.endpoint.pool_update(self.ctx, self.dummyObj, self.dummyObj)
self.assertTrue(self.dummyResource.update_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'pools': [
{'operating_status': 'ONLINE',
'provisioning_status': 'ACTIVE',
'id': mock.ANY}]})
def test_member_create(self):
self.dummyResource.create_called = False
self.endpoint.member_create(self.ctx, self.dummyObj)
self.assertTrue(self.dummyResource.create_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'members': [
{'operating_status': 'ONLINE',
'provisioning_status': 'ACTIVE',
'id': mock.ANY}]})
def test_member_delete(self):
self.dummyResource.delete_called = False
self.endpoint.member_delete(self.ctx, self.dummyObj)
self.assertTrue(self.dummyResource.delete_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'members': [
{'operating_status': 'ONLINE',
'provisioning_status': 'DELETED',
'id': mock.ANY}]})
def test_member_update(self):
self.dummyResource.update_called = False
self.endpoint.member_update(self.ctx, self.dummyObj, self.dummyObj)
self.assertTrue(self.dummyResource.update_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'members': [
{'operating_status': 'ONLINE',
'provisioning_status': 'ACTIVE',
'id': mock.ANY}]})
def test_healthmonitor_create(self):
self.dummyResource.create_called = False
self.endpoint.healthmonitor_create(self.ctx, self.dummyObj)
self.assertTrue(self.dummyResource.create_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'healthmonitors': [
{'operating_status': 'ONLINE',
'provisioning_status': 'ACTIVE',
'id': mock.ANY}]})
def test_healthmonitor_delete(self):
self.dummyResource.delete_called = False
self.endpoint.healthmonitor_delete(self.ctx, self.dummyObj)
self.assertTrue(self.dummyResource.delete_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'healthmonitors': [
{'operating_status': 'ONLINE',
'provisioning_status': 'DELETED',
'id': mock.ANY}]})
def test_healthmonitor_update(self):
self.dummyResource.update_called = False
self.endpoint.healthmonitor_update(self.ctx, self.dummyObj,
self.dummyObj)
self.assertTrue(self.dummyResource.update_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'healthmonitors': [
{'operating_status': 'ONLINE',
'provisioning_status': 'ACTIVE',
'id': mock.ANY}]})
def test_l7policy_create(self):
self.dummyResource.create_called = False
self.endpoint.l7policy_create(self.ctx, self.dummyObj)
self.assertTrue(self.dummyResource.create_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'l7policies': [
{'operating_status': 'ONLINE',
'provisioning_status': 'ACTIVE',
'id': mock.ANY}]})
def test_l7policy_delete(self):
self.dummyResource.delete_called = False
self.endpoint.l7policy_delete(self.ctx, self.dummyObj)
self.assertTrue(self.dummyResource.delete_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'l7policies': [
{'operating_status': 'ONLINE',
'provisioning_status': 'DELETED',
'id': mock.ANY}]})
def test_l7policy_update(self):
self.dummyResource.update_called = False
self.endpoint.l7policy_update(self.ctx, self.dummyObj, self.dummyObj)
self.assertTrue(self.dummyResource.update_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'l7policies': [
{'operating_status': 'ONLINE',
'provisioning_status': 'ACTIVE',
'id': mock.ANY}]})
def test_l7rule_create(self):
self.dummyResource.create_called = False
self.endpoint.l7rule_create(self.ctx, self.dummyObj)
self.assertTrue(self.dummyResource.create_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'l7rules': [
{'operating_status': 'ONLINE',
'provisioning_status': 'ACTIVE',
'id': mock.ANY}]})
def test_l7rule_delete(self):
self.dummyResource.delete_called = False
self.endpoint.l7rule_delete(self.ctx, self.dummyObj)
self.assertTrue(self.dummyResource.delete_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'l7rules': [
{'operating_status': 'ONLINE',
'provisioning_status': 'DELETED',
'id': mock.ANY}]})
def test_l7rule_update(self):
self.dummyResource.update_called = False
self.endpoint.l7rule_update(self.ctx, self.dummyObj, self.dummyObj)
self.assertTrue(self.dummyResource.update_called)
self.clientMock.cast.assert_called_once_with(
{}, 'update_loadbalancer_status',
status={'l7rules': [
{'operating_status': 'ONLINE',
'provisioning_status': 'ACTIVE',
'id': mock.ANY}]})

View File

@ -44,8 +44,15 @@ class TestQosNsxVNotification(test_plugin.NsxVPluginV2TestCase,
self._init_dvs_config() self._init_dvs_config()
# Reset the drive to re-create it # Reset the drive to re-create it
qos_driver.DRIVER = None qos_driver.DRIVER = None
super(TestQosNsxVNotification, self).setUp(plugin=CORE_PLUGIN, # Skip Octavia init because of RPC conflicts
ext_mgr=None) with mock.patch("vmware_nsx.services.lbaas.octavia.octavia_listener."
"NSXOctaviaListener.__init__", return_value=None),\
mock.patch("vmware_nsx.services.lbaas.octavia.octavia_listener."
"NSXOctaviaStatisticsCollector.__init__",
return_value=None):
super(TestQosNsxVNotification, self).setUp(plugin=CORE_PLUGIN,
ext_mgr=None,
with_md_proxy=False)
self.setup_coreplugin(CORE_PLUGIN) self.setup_coreplugin(CORE_PLUGIN)
plugin_instance = directory.get_plugin() plugin_instance = directory.get_plugin()