Embrane LBaaS Driver

Implements blueprint embrane-lbaas-driver

This commit implements Embrane's driver for LBaaS,
which uses Embrane's heleos(tm) appliances to provide Load Balancing.

Change-Id: Ia76fbc8881d178cfe6df11a2cfe8e77d3f36094f
This commit is contained in:
Ivar Lazzaro 2013-10-29 19:05:20 -07:00
parent 3d6d2f5e99
commit fc5e46802e
20 changed files with 1133 additions and 7 deletions

View File

@ -339,7 +339,7 @@ notification_driver = neutron.openstack.common.notifier.rpc_notifier
# quota_health_monitors = -1 # quota_health_monitors = -1
# Number of routers allowed per tenant. A negative value means unlimited. # Number of routers allowed per tenant. A negative value means unlimited.
# quota_router = 10 # quota_router = 10
# Number of floating IPs allowed per tenant. A negative value means unlimited. # Number of floating IPs allowed per tenant. A negative value means unlimited.
# quota_floatingip = 50 # quota_floatingip = 50
@ -427,3 +427,5 @@ service_provider=VPN:openswan:neutron.services.vpn.service_drivers.ipsec.IPsecVP
# service_provider=LOADBALANCER:NetScaler:neutron.services.loadbalancer.drivers.netscaler.netscaler_driver.NetScalerPluginDriver # service_provider=LOADBALANCER:NetScaler:neutron.services.loadbalancer.drivers.netscaler.netscaler_driver.NetScalerPluginDriver
# Uncomment the following line (and comment out the OpenSwan VPN line) to enable Cisco's VPN driver. # Uncomment the following line (and comment out the OpenSwan VPN line) to enable Cisco's VPN driver.
# service_provider=VPN:cisco:neutron.services.vpn.service_drivers.cisco_ipsec.CiscoCsrIPsecVPNDriver:default # service_provider=VPN:cisco:neutron.services.vpn.service_drivers.cisco_ipsec.CiscoCsrIPsecVPNDriver:default
# Uncomment the line below to use Embrane heleos as Load Balancer service provider.
# service_provider=LOADBALANCER:Embrane:neutron.services.loadbalancer.drivers.embrane.driver.EmbraneLbaas:default

View File

@ -23,3 +23,17 @@
#netscaler_ncc_uri = https://ncc_server.acme.org/ncc/v1/api #netscaler_ncc_uri = https://ncc_server.acme.org/ncc/v1/api
#netscaler_ncc_username = admin #netscaler_ncc_username = admin
#netscaler_ncc_password = secret #netscaler_ncc_password = secret
[heleoslb]
#esm_mgmt =
#admin_username =
#admin_password =
#lb_image =
#inband_id =
#oob_id =
#mgmt_id =
#dummy_utif_id =
#resource_pool_id =
#async_requests =
#lb_flavor = small
#sync_interval = 60

View File

@ -0,0 +1,61 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""embrane_lbaas_driver
Revision ID: 33dd0a9fa487
Revises: 19180cf98af6
Create Date: 2014-02-25 00:15:35.567111
"""
# revision identifiers, used by Alembic.
revision = '33dd0a9fa487'
down_revision = '19180cf98af6'
# Change to ['*'] if this migration applies to all plugins
migration_for_plugins = [
'neutron.services.loadbalancer.plugin.LoadBalancerPlugin'
]
from alembic import op
import sqlalchemy as sa
from neutron.db import migration
def upgrade(active_plugins=None, options=None):
    """Create the embrane_pool_port table mapping LBaaS pools to ports.

    :param active_plugins: list of plugin class paths active in this
        deployment; the migration only runs when the LoadBalancerPlugin
        is among them (see migration_for_plugins above).
    :param options: unused alembic migration options.
    """
    if not migration.should_run(active_plugins, migration_for_plugins):
        return
    # One row per pool: pool_id is both the primary key and a FK to pools,
    # port_id references the Neutron port created for the pool's backend.
    op.create_table(
        u'embrane_pool_port',
        sa.Column(u'pool_id', sa.String(length=36), nullable=False),
        sa.Column(u'port_id', sa.String(length=36), nullable=False),
        sa.ForeignKeyConstraint(['pool_id'], [u'pools.id'],
                                name=u'embrane_pool_port_ibfk_1'),
        sa.ForeignKeyConstraint(['port_id'], [u'ports.id'],
                                name=u'embrane_pool_port_ibfk_2'),
        sa.PrimaryKeyConstraint(u'pool_id'))
def downgrade(active_plugins=None, options=None):
    """Drop the embrane_pool_port table created by upgrade()."""
    if not migration.should_run(active_plugins, migration_for_plugins):
        return
    op.drop_table(u'embrane_pool_port')

View File

@ -40,7 +40,7 @@ class Dispatcher(object):
def dispatch_l3(self, d_context, args=(), kwargs={}): def dispatch_l3(self, d_context, args=(), kwargs={}):
item = d_context.item item = d_context.item
event = d_context.event event = d_context.event
q_context = d_context.q_context n_context = d_context.n_context
chain = d_context.chain chain = d_context.chain
item_id = item["id"] item_id = item["id"]
@ -52,7 +52,7 @@ class Dispatcher(object):
self.sync_items[item_id] = (queue.Queue(),) self.sync_items[item_id] = (queue.Queue(),)
first_run = True first_run = True
self.sync_items[item_id][0].put( self.sync_items[item_id][0].put(
ctx.OperationContext(event, q_context, item, chain, f, ctx.OperationContext(event, n_context, item, chain, f,
args, kwargs)) args, kwargs))
t = None t = None
if first_run: if first_run:
@ -93,7 +93,7 @@ class Dispatcher(object):
try: try:
dva_state = operation_context.function( dva_state = operation_context.function(
plugin._esm_api, plugin._esm_api,
operation_context.q_context.tenant_id, operation_context.n_context.tenant_id,
operation_context.item, operation_context.item,
*operation_context.args, *operation_context.args,
**operation_context.kwargs) **operation_context.kwargs)
@ -122,12 +122,12 @@ class Dispatcher(object):
if transient_state: if transient_state:
if transient_state == p_con.Status.DELETED: if transient_state == p_con.Status.DELETED:
current_state = plugin._delete_router( current_state = plugin._delete_router(
operation_context.q_context, operation_context.n_context,
operation_context.item["id"]) operation_context.item["id"])
# Error state cannot be reverted # Error state cannot be reverted
elif transient_state != p_con.Status.ERROR: elif transient_state != p_con.Status.ERROR:
current_state = plugin._update_neutron_state( current_state = plugin._update_neutron_state(
operation_context.q_context, operation_context.n_context,
operation_context.item, operation_context.item,
transient_state) transient_state)
except Exception: except Exception:

View File

@ -23,7 +23,7 @@ class DispatcherContext(object):
def __init__(self, event, item, neutron_context, chain=None): def __init__(self, event, item, neutron_context, chain=None):
self.event = event self.event = event
self.item = item self.item = item
self.q_context = neutron_context self.n_context = neutron_context
self.chain = chain self.chain = chain

View File

@ -22,3 +22,7 @@ from neutron.common import exceptions as neutron_exec
class EmbranePluginException(neutron_exec.NeutronException): class EmbranePluginException(neutron_exec.NeutronException):
message = _("An unexpected error occurred:%(err_msg)s") message = _("An unexpected error occurred:%(err_msg)s")
class UnsupportedException(EmbranePluginException):
message = _("%(err_msg)s")

View File

@ -0,0 +1,9 @@
Embrane LBaaS Driver
This driver interfaces OpenStack Neutron with Embrane's heleos(tm) platform,
which provides Load Balancing appliances for cloud environments.
L2 connectivity is provided by one of the supported existing core plugins.
For more details on use, configuration and implementation please refer to:
https://wiki.openstack.org/wiki/Neutron/LBaaS/EmbraneDriver

View File

@ -0,0 +1,108 @@
# Copyright 2014 Embrane, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com
from eventlet import greenthread
from eventlet import queue
from heleosapi import exceptions as h_exc
from neutron.openstack.common import log as logging
from neutron.plugins.embrane.common import contexts as ctx
from neutron.services.loadbalancer.drivers.embrane.agent import lb_operations
from neutron.services.loadbalancer.drivers.embrane import constants as econ
LOG = logging.getLogger(__name__)
class Dispatcher(object):
    """Serializes load balancer operations per appliance.

    Every load balancer item gets its own queue consumed by a single
    green thread, so operations targeting the same DVA never interleave;
    operations on different DVAs run concurrently.
    """

    def __init__(self, driver, async_mode=True):
        # NOTE: parameter renamed from `async`, a reserved keyword since
        # Python 3.7 (the plugin driver passes this flag positionally).
        self._async = async_mode
        self._driver = driver
        # item id -> [queue, consumer greenthread]
        self.sync_items = dict()
        self.handlers = lb_operations.handlers

    def dispatch_lb(self, d_context, *args, **kwargs):
        """Enqueue every handler registered for the context's event.

        :param d_context: DispatcherContext carrying event, item,
            neutron context and optional transaction chain.
        Extra positional/keyword arguments are forwarded to the handler.
        """
        item = d_context.item
        event = d_context.event
        n_context = d_context.n_context
        chain = d_context.chain
        item_id = item["id"]
        if event in self.handlers:
            for f in self.handlers[event]:
                first_run = False
                if item_id not in self.sync_items:
                    self.sync_items[item_id] = [queue.Queue()]
                    first_run = True
                self.sync_items[item_id][0].put(
                    ctx.OperationContext(event, n_context, item, chain, f,
                                         args, kwargs))
                if first_run:
                    # first operation for this item: spawn its consumer
                    t = greenthread.spawn(self._consume_lb,
                                          item_id,
                                          self.sync_items[item_id][0],
                                          self._driver,
                                          self._async)
                    self.sync_items[item_id].append(t)
                if not self._async:
                    # synchronous mode: block until the consumer drains
                    t = self.sync_items[item_id][1]
                    t.wait()

    def _consume_lb(self, sync_item, sync_queue, driver, a_sync):
        """Consumer loop: apply queued operations for one load balancer.

        Exits (and drops its queue) once the item is deleted or the queue
        stays empty past QUEUE_TIMEOUT (async) / immediately (sync).
        """
        current_state = None
        while True:
            try:
                if current_state == econ.DELETED:
                    del self.sync_items[sync_item]
                    return
                try:
                    operation_context = sync_queue.get(
                        block=a_sync,
                        timeout=econ.QUEUE_TIMEOUT)
                except queue.Empty:
                    del self.sync_items[sync_item]
                    return

                # run any pending Neutron-side transaction chain first
                (operation_context.chain and
                 operation_context.chain.execute_all())

                transient_state = None
                try:
                    transient_state = operation_context.function(
                        driver, operation_context.n_context,
                        operation_context.item, *operation_context.args,
                        **operation_context.kwargs)
                except (h_exc.PendingDva, h_exc.DvaNotFound,
                        h_exc.BrokenInterface, h_exc.DvaCreationFailed,
                        h_exc.BrokenDva, h_exc.ConfigurationFailed) as ex:
                    LOG.warning(econ.error_map[type(ex)], ex.message)
                except h_exc.DvaDeleteFailed as ex:
                    # deletion failures still retire the item locally
                    LOG.warning(econ.error_map[type(ex)], ex.message)
                    transient_state = econ.DELETED
                finally:
                    # if the returned transient state is None, no operations
                    # are required on the DVA status
                    if transient_state == econ.DELETED:
                        current_state = driver._delete_vip(
                            operation_context.n_context,
                            operation_context.item)
                    # Error state cannot be reverted
                    else:
                        # NOTE(review): this branch also runs when
                        # transient_state is None, despite the comment
                        # above — presumably the state refresh is a no-op
                        # then; confirm against _update_vip_graph_state.
                        driver._update_vip_graph_state(
                            operation_context.n_context,
                            operation_context.item)
            except Exception:
                LOG.exception(_('Unhandled exception occurred'))

View File

@ -0,0 +1,179 @@
# Copyright 2014 Embrane, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com
from functools import wraps
from heleosapi import exceptions as h_exc
from neutron.openstack.common import log as logging
from neutron.services.loadbalancer import constants as lcon
from neutron.services.loadbalancer.drivers.embrane import constants as econ
LOG = logging.getLogger(__name__)
handlers = {}
def handler(event, handler):
    """Return a decorator that registers the wrapped function for *event*.

    :param event: event name key (see constants.Events).
    :param handler: mutable registry dict mapping event -> list of
        callables; the module-level `handlers` dict is passed here.

    The undecorated function itself is appended to the registry, so the
    dispatcher always invokes the original; the wrapper returned to the
    definition site simply delegates to it.
    """
    def wrap(f):
        # setdefault replaces the explicit `in handler.keys()` membership
        # test and the two-branch insert of the original.
        handler.setdefault(event, []).append(f)

        @wraps(f)
        def wrapped_f(*args, **kwargs):
            return f(*args, **kwargs)
        return wrapped_f
    return wrap
@handler(econ.Events.CREATE_VIP, handlers)
def _provision_load_balancer(driver, context, vip, flavor,
                             vip_utif_info, vip_ip_allocation_info,
                             pool_utif_info=None,
                             pool_ip_allocation_info=None,
                             pool=None, members=None,
                             monitors=None):
    """Create and configure a new DVA backing *vip*.

    The appliance is created powered off, its vip-side (and, when a pool
    exists, pool-side) interfaces are grown, addresses allocated, and the
    load balancing service configured last.  Returns the DVA state mapped
    through extract_dva_state.
    """
    api = driver._heleos_api
    tenant_id = context.tenant_id
    admin_state = vip["admin_state_up"]
    # Architectural configuration
    api.create_load_balancer(tenant_id=tenant_id,
                             router_id=vip["id"],
                             name=vip["name"],
                             flavor=flavor,
                             up=False)
    api.grow_interface(vip_utif_info, False, tenant_id, vip["id"])
    if pool:
        api.grow_interface(pool_utif_info, False, tenant_id,
                           vip["id"])
    # Logical configuration
    api.allocate_address(vip["id"], True, vip_ip_allocation_info)
    if pool:
        api.allocate_address(vip["id"], True, pool_ip_allocation_info)
    dva = api.configure_load_balancer(vip["id"], admin_state,
                                      vip, pool,
                                      monitors, members)
    return api.extract_dva_state(dva)
@handler(econ.Events.UPDATE_VIP, handlers)
def _update_load_balancer(driver, context, vip,
                          old_pool_id=None, old_port_id=None,
                          removed_ip=None, pool_utif_info=None,
                          pool_ip_allocation_info=None,
                          new_pool=None, members=None,
                          monitors=None):
    """Apply vip changes to its DVA, re-plumbing when the pool changed.

    When old_pool_id is set the old pool-side address and interface are
    released, the new pool's interface grown and address allocated, and
    the pool definition replaced; the vservice and DVA metadata are
    refreshed in every case.
    """
    api = driver._heleos_api
    tenant_id = context.tenant_id
    admin_state = vip["admin_state_up"]
    if old_pool_id:
        # Architectural Changes
        api.de_allocate_address(vip['id'], False, old_port_id, removed_ip)
        api.shrink_interface(tenant_id, vip["id"], False, old_port_id)
        api.grow_interface(pool_utif_info, False, tenant_id, vip["id"])
        # Configuration Changes
        api.allocate_address(vip["id"], True, pool_ip_allocation_info)
        api.replace_pool(vip["id"], True, vip, old_pool_id,
                         new_pool, monitors, members)
    api.update_vservice(vip["id"], True, vip)
    # Dva update
    dva = api.update_dva(tenant_id, vip["id"], vip["name"],
                         admin_state, description=vip["description"])
    return api.extract_dva_state(dva)
@handler(econ.Events.DELETE_VIP, handlers)
def _delete_load_balancer(driver, context, vip):
    """Destroy the DVA backing *vip*, tolerating its absence.

    Always returns econ.DELETED so the dispatcher retires the item even
    when the appliance was already gone on the backend.
    """
    try:
        driver._heleos_api.delete_dva(context.tenant_id, vip['id'])
    except h_exc.DvaNotFound:
        LOG.warning(_('The load balancer %s had no physical representation, '
                      'likely already deleted'), vip['id'])
    return econ.DELETED
@handler(econ.Events.UPDATE_POOL, handlers)
def _update_server_pool(driver, context, vip, pool,
                        monitors=None):
    """Push the updated pool definition to the backend appliance."""
    api = driver._heleos_api
    persistence = vip.get('session_persistence') or {}
    # HTTP-cookie session persistence toggles cookie insertion on the LB
    use_cookie = (persistence.get('type') ==
                  lcon.SESSION_PERSISTENCE_HTTP_COOKIE)
    dva = api.update_pool(vip['id'], vip['admin_state_up'], pool,
                          use_cookie, monitors)
    return api.extract_dva_state(dva)
@handler(econ.Events.ADD_OR_UPDATE_MEMBER, handlers)
def _add_or_update_pool_member(driver, context, vip, member, protocol):
    """Create or refresh a backend server on the load balancer."""
    api = driver._heleos_api
    dva = api.update_backend_server(vip['id'], vip['admin_state_up'],
                                    member, protocol)
    return api.extract_dva_state(dva)
@handler(econ.Events.REMOVE_MEMBER, handlers)
def _remove_member_from_pool(driver, context, vip, member):
    """Detach *member* from its pool on the appliance."""
    api = driver._heleos_api
    dva = api.remove_pool_member(vip['id'], vip['admin_state_up'], member)
    return api.extract_dva_state(dva)
@handler(econ.Events.DELETE_MEMBER, handlers)
def _delete_member(driver, context, vip, member):
    """Remove *member* from the appliance and from the Neutron DB.

    Both operations happen inside one DB subtransaction so the member
    row survives if the backend call raises.
    """
    with context.session.begin(subtransactions=True):
        api = driver._heleos_api
        dva = api.delete_backend_server(vip['id'], vip['admin_state_up'],
                                        member)
        driver._delete_member(context, member)
        return api.extract_dva_state(dva)
@handler(econ.Events.ADD_POOL_HM, handlers)
def _create_pool_hm(driver, context, vip, hm, pool_id):
    """Attach health monitor *hm* to the pool on the appliance."""
    api = driver._heleos_api
    dva = api.add_pool_monitor(vip['id'], vip['admin_state_up'],
                               hm, pool_id)
    return api.extract_dva_state(dva)
@handler(econ.Events.UPDATE_POOL_HM, handlers)
def _update_pool_hm(driver, context, vip, hm, pool_id):
    """Refresh health monitor *hm* for the pool on the appliance."""
    api = driver._heleos_api
    dva = api.update_pool_monitor(vip['id'], vip['admin_state_up'],
                                  hm, pool_id)
    return api.extract_dva_state(dva)
@handler(econ.Events.DELETE_POOL_HM, handlers)
def _delete_pool_hm(driver, context, vip, hm, pool_id):
    """Remove the health monitor association and its DB record.

    NOTE(review): this calls api.add_pool_monitor — identical to the
    create handler — which looks like a copy/paste slip for a remove or
    delete call on the heleos API; confirm against heleosapi before
    changing.
    """
    with context.session.begin(subtransactions=True):
        api = driver._heleos_api
        dva = api.add_pool_monitor(vip['id'], vip['admin_state_up'],
                                   hm, pool_id)
        driver._delete_pool_hm(context, hm, pool_id)
        return api.extract_dva_state(dva)
@handler(econ.Events.POLL_GRAPH, handlers)
def _poll_graph(driver, context, vip):
    """Fetch the current DVA state for periodic status reconciliation."""
    api = driver._heleos_api
    dva = api.get_dva(vip['id'])
    return api.extract_dva_state(dva)

View File

@ -0,0 +1,53 @@
# Copyright 2014 Embrane, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com
from oslo.config import cfg
# User may want to use LB service together with the L3 plugin, but using
# different resources. The service will inherit the configuration from the
# L3 heleos plugin if present and not overridden.
heleos_opts = [
    cfg.StrOpt('esm_mgmt',
               help=_('ESM management root address')),
    cfg.StrOpt('admin_username', default=None,
               help=_('ESM admin username.')),
    cfg.StrOpt('admin_password', default=None,
               secret=True,
               help=_('ESM admin password.')),
    cfg.StrOpt('lb_image', default=None,
               help=_('Load Balancer image id (Embrane LB)')),
    cfg.StrOpt('inband_id', default=None,
               help=_('In band Security Zone id for LBs')),
    cfg.StrOpt('oob_id', default=None,
               help=_('Out of band Security Zone id for LBs')),
    cfg.StrOpt('mgmt_id', default=None,
               help=_('Management Security Zone id for LBs')),
    cfg.StrOpt('dummy_utif_id', default=None,
               help=_('Dummy user traffic Security Zone id for LBs')),
    cfg.StrOpt('resource_pool_id', default=None,
               help=_('Shared resource pool id')),
    cfg.StrOpt('lb_flavor', default="small",
               help=_('choose LB image flavor to use, accepted values: small, '
                      'medium')),
    cfg.IntOpt('sync_interval', default=60,
               help=_('resource synchronization interval in seconds')),
    # default=None (not False) so the driver can tell "unset" apart and
    # fall back to the [heleos] L3 section via get_conf().
    cfg.BoolOpt('async_requests', default=None,
                help=_('Define if the requests have to be run '
                       'asynchronously or not')),
]
cfg.CONF.register_opts(heleos_opts, 'heleoslb')

View File

@ -0,0 +1,74 @@
# Copyright 2014 Embrane, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com
from heleosapi import constants as h_con
from heleosapi import exceptions as h_exc
from neutron.plugins.common import constants as ccon
# Internal pseudo-status marking a load balancer whose DVA is gone;
# never exposed through the Neutron API.
DELETED = 'DELETED'  # not visible status
# Seconds a per-item dispatcher consumer waits on an empty queue
# before retiring itself.
QUEUE_TIMEOUT = 300
# NOTE(review): not referenced in the visible code — presumably a limit
# on backend subscriptions; confirm against callers before removing.
BACK_SUB_LIMIT = 6
class BackendActions:
    """Symbolic names for operations applied to backend servers."""
    UPDATE = 'update'
    GROW = 'grow'
    REMOVE = 'remove'
    SHRINK = 'shrink'
class Events:
    """Dispatcher event names; keys of the lb_operations handler map."""
    CREATE_VIP = 'create_vip'
    UPDATE_VIP = 'update_vip'
    DELETE_VIP = 'delete_vip'
    UPDATE_POOL = 'update_pool'
    UPDATE_MEMBER = 'update_member'
    ADD_OR_UPDATE_MEMBER = 'add_or_update_member'
    REMOVE_MEMBER = 'remove_member'
    DELETE_MEMBER = 'delete_member'
    POLL_GRAPH = 'poll_graph'
    ADD_POOL_HM = "create_pool_hm"
    UPDATE_POOL_HM = "update_pool_hm"
    DELETE_POOL_HM = "delete_pool_hm"
# Translatable log templates for heleosapi failures; each takes the
# exception message as its single %s argument.
_DVA_PENDING_ERROR_MSG = _('Dva is pending for the following reason: %s')
# (identifier typo fixed: was _DVA_NOT_FOUNT_ERROR_MSG)
_DVA_NOT_FOUND_ERROR_MSG = _('%s, '
                             'probably was cancelled through the heleos UI')
_DVA_BROKEN_ERROR_MSG = _('Dva seems to be broken for reason %s')
_DVA_CREATION_FAILED_ERROR_MSG = _('Dva creation failed reason %s')
_DVA_CREATION_PENDING_ERROR_MSG = _('Dva creation is in pending state '
                                    'for reason %s')
_CFG_FAILED_ERROR_MSG = _('Dva configuration failed for reason %s')
_DVA_DEL_FAILED_ERROR_MSG = _('Failed to delete the backend '
                              'load balancer for reason %s. Please remove '
                              'it manually through the heleos UI')
NO_MEMBER_SUBNET_WARN = _('No subnet is associated to member %s (required '
                          'to identify the proper load balancer port)')

# heleosapi exception type -> log template used by the dispatcher.
error_map = {h_exc.PendingDva: _DVA_PENDING_ERROR_MSG,
             h_exc.DvaNotFound: _DVA_NOT_FOUND_ERROR_MSG,
             h_exc.BrokenDva: _DVA_BROKEN_ERROR_MSG,
             h_exc.DvaCreationFailed: _DVA_CREATION_FAILED_ERROR_MSG,
             h_exc.DvaCreationPending: _DVA_CREATION_PENDING_ERROR_MSG,
             h_exc.ConfigurationFailed: _CFG_FAILED_ERROR_MSG,
             h_exc.DvaDeleteFailed: _DVA_DEL_FAILED_ERROR_MSG}

# DVA power state -> Neutron resource status.
state_map = {h_con.DvaState.POWER_ON: ccon.ACTIVE,
             None: ccon.ERROR,
             DELETED: DELETED}

View File

@ -0,0 +1,56 @@
# Copyright 2014 Embrane, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com
import neutron.db.api as db
from neutron.db import models_v2 as nmodel
from neutron.services.loadbalancer.drivers.embrane import models
def initialize():
    """Configure the database engine/session factory for this driver."""
    db.configure_db()
def add_pool_port(context, pool_id, port_id):
    """Record the association between a pool and its backend port."""
    with context.session.begin(subtransactions=True):
        mapping = models.PoolPort()
        mapping.pool_id = pool_id
        mapping.port_id = port_id
        context.session.add(mapping)
def get_pool_port(context, pool_id):
    """Return the first PoolPort mapped to *pool_id*, or None."""
    query = context.session.query(models.PoolPort)
    return query.filter_by(pool_id=pool_id).first()
def delete_pool_backend(context, pool_id):
    """Delete every backend port mapping recorded for *pool_id*."""
    mappings = context.session.query(models.PoolPort).filter_by(
        pool_id=pool_id)
    for mapping in mappings:
        delete_pool_port(context, mapping)
def delete_pool_port(context, backend_port):
    """Delete the PoolPort row and its Neutron port, when still present.

    NOTE(review): when the Neutron port is already gone the PoolPort row
    is left behind — confirm whether that is intentional; otherwise the
    mapping should be deleted unconditionally.
    """
    session = context.session
    with session.begin(subtransactions=True):
        port = (session.query(nmodel.Port).filter_by(
            id=backend_port['port_id'])).first()
        if port:
            session.delete(backend_port)
            session.delete(port)

View File

@ -0,0 +1,342 @@
# Copyright 2014 Embrane, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com
from heleosapi import backend_operations as h_op
from heleosapi import constants as h_con
from heleosapi import info as h_info
from oslo.config import cfg
from neutron.api.v2 import attributes
from neutron.common import exceptions as n_exc
from neutron.db.loadbalancer import loadbalancer_db as ldb
from neutron.extensions import loadbalancer as lb_ext
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants as pcon
from neutron.plugins.embrane.common import contexts as embrane_ctx
from neutron.plugins.embrane.common import exceptions as h_exc
from neutron.plugins.embrane.common import utils
from neutron.services.loadbalancer import constants as lbcon
from neutron.services.loadbalancer.drivers import abstract_driver
from neutron.services.loadbalancer.drivers.embrane.agent import dispatcher
from neutron.services.loadbalancer.drivers.embrane import config # noqa
from neutron.services.loadbalancer.drivers.embrane import constants as econ
from neutron.services.loadbalancer.drivers.embrane import db as edb
from neutron.services.loadbalancer.drivers.embrane import poller
LOG = logging.getLogger(__name__)
conf = cfg.CONF.heleoslb
confh = {}
try:
confh = cfg.CONF.heleos
except cfg.NoSuchOptError:
pass
def get_conf(x):
    """Look up option *x* in [heleoslb], falling back to [heleos].

    Returns None when the option is unset in both sections or not
    registered at all.
    """
    try:
        value = conf.get(x)
        if not value:
            value = confh.get(x)
        return value
    except cfg.NoSuchOptError:
        return None
class EmbraneLbaas(abstract_driver.LoadBalancerAbstractDriver):
def __init__(self, plugin):
edb.initialize()
config_esm_mgmt = get_conf('esm_mgmt')
config_admin_username = get_conf('admin_username')
config_admin_password = get_conf('admin_password')
config_lb_image_id = get_conf('lb_image')
config_security_zones = {h_con.SzType.IB: get_conf('inband_id'),
h_con.SzType.OOB: get_conf('oob_id'),
h_con.SzType.MGMT: get_conf('mgmt_id'),
h_con.SzType.DUMMY: get_conf('dummy_utif_id')}
config_resource_pool = get_conf('resource_pool_id')
self._heleos_api = h_op.BackendOperations(
esm_mgmt=config_esm_mgmt,
admin_username=config_admin_username,
admin_password=config_admin_password,
lb_image_id=config_lb_image_id,
security_zones=config_security_zones,
resource_pool=config_resource_pool)
self._dispatcher = dispatcher.Dispatcher(
self, get_conf("async_requests"))
self.plugin = plugin
poll_interval = conf.get('sync_interval')
if poll_interval > 0:
self._loop_call = poller.Poller(self)
self._loop_call.start_polling(conf.get('sync_interval'))
self._flavor = get_conf('lb_flavor')
def _validate_vip(self, vip):
if vip.get('connection_limit') and vip['connection_limit'] != -1:
raise h_exc.UnsupportedException(
err_msg=_('Connection limit is not supported by Embrane LB'))
persistance = vip.get('session_persistence')
if (persistance and persistance.get('type') ==
lbcon.SESSION_PERSISTENCE_APP_COOKIE):
p_type = vip['session_persistence']['type']
raise h_exc.UnsupportedException(
err_msg=_('Session persistence %s '
'not supported by Embrane LBaaS') % p_type)
def _delete_vip(self, context, vip):
with context.session.begin(subtransactions=True):
self.plugin._delete_db_vip(context, vip['id'])
return econ.DELETED
def _delete_member(self, context, member):
self.plugin._delete_db_member(context, member['id'])
def _delete_pool_hm(self, context, health_monitor, pool_id):
self.plugin._delete_db_pool_health_monitor(context,
health_monitor['id'],
pool_id)
def _update_vip_graph_state(self, context, vip):
self._heleos_api.update_vip_status(vip)
self.plugin.update_status(context, ldb.Vip, vip['id'],
vip['status'])
if vip['status'] != pcon.ERROR:
pool = self.plugin.get_pool(context, vip['pool_id'])
pool_members = pool['members']
# Manages possible manual changes and monitor actions
self._heleos_api.update_pool_status(vip['id'], pool)
self._heleos_api.update_members_status(vip['id'], pool['id'],
pool_members)
self.plugin.update_status(context, ldb.Pool, pool['id'],
pool['status'])
for member in pool_members:
self.plugin.update_status(context, ldb.Member,
member['id'], member['status'])
def _create_backend_port(self, context, db_pool):
try:
subnet = self.plugin._core_plugin.get_subnet(context,
db_pool["subnet_id"])
except n_exc.SubnetNotFound:
LOG.warning(_("Subnet assigned to pool %s doesn't exist, "
"backend port can't be created"), db_pool['id'])
return
fixed_ip = {'subnet_id': subnet['id'],
'fixed_ips': attributes.ATTR_NOT_SPECIFIED}
port_data = {
'tenant_id': db_pool['tenant_id'],
'name': 'pool-' + db_pool['id'],
'network_id': subnet['network_id'],
'mac_address': attributes.ATTR_NOT_SPECIFIED,
'admin_state_up': False,
'device_id': '',
'device_owner': '',
'fixed_ips': [fixed_ip]
}
port = self.plugin._core_plugin.create_port(context,
{'port': port_data})
return edb.add_pool_port(context, db_pool['id'], port['id'])
def _retrieve_utif_info(self, context, neutron_port):
network = self.plugin._core_plugin.get_network(
context, neutron_port['network_id'])
result = h_info.UtifInfo(network.get('provider:segmentation_id'),
network['name'],
network['id'],
False,
network['tenant_id'],
neutron_port['id'],
neutron_port['mac_address'],
network.get('provider:network_type'))
return result
def create_vip(self, context, vip):
self._validate_vip(vip)
db_vip = self.plugin.populate_vip_graph(context, vip)
vip_port = self.plugin._core_plugin._get_port(context,
db_vip['port_id'])
vip_utif_info = self._retrieve_utif_info(context, vip_port)
vip_ip_allocation_info = utils.retrieve_ip_allocation_info(
context, vip_port)
vip_ip_allocation_info.is_gw = True
db_pool = pool_utif_info = pool_ip_allocation_info = None
members = monitors = []
if db_vip['pool_id']:
db_pool = self.plugin.get_pool(
context, db_vip['pool_id'])
pool_port = edb.get_pool_port(context, db_pool["id"])
if pool_port:
db_port = self.plugin._core_plugin._get_port(
context, pool_port["port_id"])
pool_utif_info = self._retrieve_utif_info(context, db_port)
pool_ip_allocation_info = utils.retrieve_ip_allocation_info(
context, db_port)
members = self.plugin.get_members(
context, filters={'id': db_pool['members']})
monitors = self.plugin.get_members(
context, filters={'id': db_pool['health_monitors']})
self._dispatcher.dispatch_lb(
embrane_ctx.DispatcherContext(econ.Events.CREATE_VIP,
db_vip, context, None),
self._flavor, vip_utif_info, vip_ip_allocation_info,
pool_utif_info, pool_ip_allocation_info, db_pool, members,
monitors)
def update_vip(self, context, old_vip, vip):
new_pool = old_port_id = removed_ip = None
new_pool_utif = new_pool_ip_allocation = None
old_pool = {}
members = monitors = []
if old_vip['pool_id'] != vip['pool_id']:
new_pool = self.plugin.get_pool(
context, vip['pool_id'])
members = self.plugin.get_members(
context, filters={'id': new_pool['members']})
monitors = self.plugin.get_members(
context, filters={'id': new_pool['health_monitors']})
new_pool_port = edb.get_pool_port(context, new_pool["id"])
if new_pool_port:
db_port = self.plugin._core_plugin._get_port(
context, new_pool_port["port_id"])
new_pool_utif = self._retrieve_utif_info(context, db_port)
new_pool_ip_allocation = utils.retrieve_ip_allocation_info(
context, db_port)
old_pool = self.plugin.get_pool(
context, old_vip['pool_id'])
old_pool_port = edb.get_pool_port(context, old_pool["id"])
if old_pool_port:
old_port = self.plugin._core_plugin._get_port(
context, old_pool_port['port_id'])
# remove that subnet ip
removed_ip = old_port['fixed_ips'][0]['ip_address']
old_port_id = old_port['id']
self._dispatcher.dispatch_lb(
embrane_ctx.DispatcherContext(econ.Events.UPDATE_VIP, vip,
context, None),
old_pool.get('id'), old_port_id, removed_ip, new_pool_utif,
new_pool_ip_allocation, new_pool, members, monitors)
def delete_vip(self, context, vip):
db_vip = self.plugin.populate_vip_graph(context, vip)
self._dispatcher.dispatch_lb(
embrane_ctx.DispatcherContext(
econ.Events.DELETE_VIP, db_vip, context, None))
def create_pool(self, context, pool):
if pool['subnet_id']:
self._create_backend_port(context, pool)
def update_pool(self, context, old_pool, pool):
with context.session.begin(subtransactions=True):
if old_pool['vip_id']:
try:
db_vip = self.plugin._get_resource(
context, ldb.Vip, old_pool['vip_id'])
except lb_ext.VipNotFound:
return
monitors = self.plugin.get_members(
context, filters={'id': old_pool['health_monitors']})
self._dispatcher.dispatch_lb(
embrane_ctx.DispatcherContext(econ.Events.UPDATE_POOL,
db_vip, context, None),
pool, monitors)
def delete_pool(self, context, pool):
edb.delete_pool_backend(context, pool['id'])
self.plugin._delete_db_pool(context, pool['id'])
def create_member(self, context, member):
db_pool = self.plugin.get_pool(context, member['pool_id'])
if db_pool['vip_id']:
db_vip = self.plugin._get_resource(context, ldb.Vip,
db_pool['vip_id'])
self._dispatcher.dispatch_lb(
embrane_ctx.DispatcherContext(
econ.Events.ADD_OR_UPDATE_MEMBER, db_vip, context, None),
member, db_pool['protocol'])
def update_member(self, context, old_member, member):
db_pool = self.plugin.get_pool(context, member['pool_id'])
if member['pool_id'] != old_member['pool_id']:
old_pool = self.plugin.get_pool(context, old_member['pool_id'])
if old_pool['vip_id']:
db_vip = self.plugin._get_resource(context, ldb.Vip,
old_pool['vip_id'])
self._dispatcher.dispatch_lb(
embrane_ctx.DispatcherContext(
econ.Events.REMOVE_MEMBER, db_vip, context, None),
old_member)
if db_pool['vip_id']:
db_vip = self.plugin._get_resource(
context, ldb.Vip, db_pool['vip_id'])
self._dispatcher.dispatch_lb(
embrane_ctx.DispatcherContext(
econ.Events.ADD_OR_UPDATE_MEMBER, db_vip, context, None),
member, db_pool['protocol'])
def delete_member(self, context, member):
    """Delete a member, via the appliance when the pool has a VIP."""
    pool = self.plugin.get_pool(context, member['pool_id'])
    vip_id = pool['vip_id']
    if vip_id:
        db_vip = self.plugin._get_resource(context, ldb.Vip, vip_id)
        self._dispatcher.dispatch_lb(
            embrane_ctx.DispatcherContext(
                econ.Events.DELETE_MEMBER, db_vip, context, None),
            member)
    else:
        # No appliance to notify: remove the member directly.
        self._delete_member(context, member)
def stats(self, context, pool_id):
return {'bytes_in': 0,
'bytes_out': 0,
'active_connections': 0,
'total_connections': 0}
def create_pool_health_monitor(self, context, health_monitor, pool_id):
    """Attach a health monitor; touch the backend only if a VIP exists."""
    pool = self.plugin.get_pool(context, pool_id)
    vip_id = pool['vip_id']
    # API call only if vip exists
    if not vip_id:
        return
    db_vip = self.plugin._get_resource(context, ldb.Vip, vip_id)
    self._dispatcher.dispatch_lb(
        embrane_ctx.DispatcherContext(
            econ.Events.ADD_POOL_HM, db_vip, context, None),
        health_monitor, pool_id)
def update_pool_health_monitor(self, context, old_health_monitor,
                               health_monitor, pool_id):
    """Update a pool's health monitor on the appliance, if one exists."""
    pool = self.plugin.get_pool(context, pool_id)
    vip_id = pool['vip_id']
    if not vip_id:
        # No VIP, hence no appliance: DB-only update.
        return
    db_vip = self.plugin._get_resource(context, ldb.Vip, vip_id)
    self._dispatcher.dispatch_lb(
        embrane_ctx.DispatcherContext(
            econ.Events.UPDATE_POOL_HM, db_vip, context, None),
        health_monitor, pool_id)
def delete_pool_health_monitor(self, context, health_monitor, pool_id):
    """Detach a health monitor, via the appliance when a VIP exists."""
    pool = self.plugin.get_pool(context, pool_id)
    vip_id = pool['vip_id']
    if vip_id:
        db_vip = self.plugin._get_resource(context, ldb.Vip, vip_id)
        self._dispatcher.dispatch_lb(
            embrane_ctx.DispatcherContext(
                econ.Events.DELETE_POOL_HM, db_vip, context, None),
            health_monitor, pool_id)
    else:
        # No appliance: drop the association locally.
        self._delete_pool_hm(context, health_monitor, pool_id)

View File

@ -0,0 +1,30 @@
# Copyright 2014 Embrane, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com
import sqlalchemy as sql
from neutron.db.models_v2 import model_base
class PoolPort(model_base.BASEV2):
    """Represents the connection between pools and ports."""
    __tablename__ = 'embrane_pool_port'
    # LBaaS pool the port is associated with; primary key, so at most
    # one port row can exist per pool.
    pool_id = sql.Column(sql.String(36), sql.ForeignKey('pools.id'),
                         primary_key=True)
    # Neutron port tied to the pool — presumably the backend port the
    # driver creates for the appliance; confirm against the driver code.
    port_id = sql.Column(sql.String(36), sql.ForeignKey('ports.id'),
                         nullable=False)

View File

@ -0,0 +1,71 @@
# Copyright 2014 Embrane, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com
from heleosapi import exceptions as h_exc
from neutron import context
from neutron.db.loadbalancer import loadbalancer_db as ldb
from neutron.db import servicetype_db as sdb
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.plugins.common import constants as ccon
from neutron.plugins.embrane.common import contexts as embrane_ctx
from neutron.services.loadbalancer.drivers.embrane import constants as econ
LOG = logging.getLogger(__name__)
skip_states = [ccon.PENDING_CREATE,
ccon.PENDING_DELETE,
ccon.PENDING_UPDATE,
ccon.ERROR]
class Poller(object):
    """Periodic reconciliation of Embrane-managed VIPs.

    On every tick, each VIP belonging to this driver's service provider
    that is not in a transitional/error state is dispatched with a
    POLL_GRAPH event so its backend state can be re-synchronized.
    """

    def __init__(self, driver):
        self.dispatcher = driver._dispatcher
        # Resolve the provider name this driver is registered under, so
        # polling only considers VIPs whose pools use this provider.
        service_type_manager = sdb.ServiceTypeManager.get_instance()
        self.provider = (service_type_manager.get_service_providers(
            None, filters={
                'service_type': [ccon.LOADBALANCER],
                'driver': ['neutron.services.loadbalancer.drivers.'
                           'embrane.driver.EmbraneLbaas']}))[0]['name']

    def start_polling(self, interval):
        """Start the fixed-interval polling loop and return the loop object."""
        loop_call = loopingcall.FixedIntervalLoopingCall(self._run)
        loop_call.start(interval=interval)
        return loop_call

    def _run(self):
        """Single polling tick: synchronize VIPs under an admin context."""
        ctx = context.get_admin_context()
        try:
            self.synchronize_vips(ctx)
        except h_exc.PollingException as e:
            # BUG FIX: the original call passed `e` as a lazy argument
            # without a %s placeholder in the format string, which makes
            # the logging call itself fail to format. Include the
            # placeholder so the exception is rendered in the message.
            LOG.exception(_('Unhandled exception occurred: %s'), e)

    def synchronize_vips(self, ctx):
        """Dispatch POLL_GRAPH for every stable VIP of this provider."""
        session = ctx.session
        vips = session.query(ldb.Vip).join(
            sdb.ProviderResourceAssociation,
            sdb.ProviderResourceAssociation.resource_id ==
            ldb.Vip.pool_id).filter(
            sdb.ProviderResourceAssociation.provider_name == self.provider)
        # No need to check pending states
        for vip in vips:
            if vip['status'] not in skip_states:
                self.dispatcher.dispatch_lb(
                    d_context=embrane_ctx.DispatcherContext(
                        econ.Events.POLL_GRAPH, vip, ctx, None),
                    args=())

View File

@ -0,0 +1,35 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Embrane, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Ivar Lazzaro, Embrane, Inc.
import sys
import mock
from oslo.config import cfg
from neutron.services.loadbalancer.drivers.embrane import config # noqa
from neutron.tests import base
sys.modules["heleosapi"] = mock.Mock()
class ConfigurationTest(base.BaseTestCase):
    """Checks the registered defaults of the heleoslb option group."""

    def test_defaults(self):
        heleoslb_opts = cfg.CONF.heleoslb
        self.assertEqual('small', heleoslb_opts.lb_flavor)
        self.assertEqual(60, heleoslb_opts.sync_interval)

View File

@ -0,0 +1,88 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Embrane, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com
import sys
import mock
sys.modules["heleosapi"] = mock.Mock()
from oslo.config import cfg
from neutron import context
from neutron.openstack.common.db import exception as n_exc
from neutron.services.loadbalancer.drivers.embrane import config # noqa
from neutron.services.loadbalancer.drivers.embrane import constants as h_con
from neutron.services.loadbalancer.drivers.embrane import db as h_db
from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
EMBRANE_PROVIDER = ('LOADBALANCER:lbaas:neutron.services.'
'loadbalancer.drivers.embrane.driver.'
'EmbraneLbaas:default')
class TestLoadBalancerPluginBase(
    test_db_loadbalancer.LoadBalancerPluginDbTestCase):
    """Base test case wiring the Embrane driver as the LBaaS provider."""

    def setUp(self):
        # Overrides must happen before super().setUp() loads the driver:
        # a sync_interval of 0 keeps the poller from running during tests.
        cfg.CONF.set_override('admin_password', "admin123", 'heleoslb')
        cfg.CONF.set_override('sync_interval', 0, 'heleoslb')
        super(TestLoadBalancerPluginBase, self).setUp(
            lbaas_provider=EMBRANE_PROVIDER)
        # Expose the Embrane driver instance registered under 'lbaas'.
        self.driver = self.plugin.drivers['lbaas']
class TestLoadBalancerPlugin(test_db_loadbalancer.TestLoadBalancer,
                             TestLoadBalancerPluginBase):
    """Runs the generic LBaaS test suite against the Embrane driver."""

    def test_create_vip_with_session_persistence_with_app_cookie(self):
        # The Embrane driver does not implement APP_COOKIE session
        # persistence, so the inherited test is skipped.
        self.skip("App cookie persistence not supported.")

    def test_pool_port(self):
        # The pool/port association should exist while the pool does and
        # be gone once the pool context exits — presumably removed along
        # with the pool; the final assertion relies on that.
        with self.port(no_delete=True) as port:
            with self.pool() as pool:
                h_db.add_pool_port(context.get_admin_context(),
                                   pool['pool']['id'], port['port']['id'])
                pool_port = h_db.get_pool_port(context.get_admin_context(),
                                               pool['pool']['id'])
                self.assertIsNotNone(pool_port)
            pool_port = h_db.get_pool_port(context.get_admin_context(),
                                           pool['pool']['id'])
            self.assertIsNone(pool_port)

    def test_create_pool_port_no_port(self):
        # Associating a pool with a None port must fail at the DB layer
        # (port_id is non-nullable in the PoolPort model).
        with self.pool() as pool:
            self.assertRaises(n_exc.DBError,
                              h_db.add_pool_port,
                              context.get_admin_context(),
                              pool['pool']['id'], None)

    def test_lb_operations_handlers(self):
        # Every LBaaS event the driver can emit must have a registered
        # handler in the dispatcher.
        h = self.driver._dispatcher.handlers
        self.assertIsNotNone(h[h_con.Events.ADD_OR_UPDATE_MEMBER])
        self.assertIsNotNone(h[h_con.Events.CREATE_VIP])
        self.assertIsNotNone(h[h_con.Events.DELETE_MEMBER])
        self.assertIsNotNone(h[h_con.Events.DELETE_VIP])
        self.assertIsNotNone(h[h_con.Events.POLL_GRAPH])
        self.assertIsNotNone(h[h_con.Events.REMOVE_MEMBER])
        self.assertIsNotNone(h[h_con.Events.UPDATE_POOL])
        self.assertIsNotNone(h[h_con.Events.UPDATE_VIP])
        self.assertIsNotNone(h[h_con.Events.UPDATE_POOL_HM])
        self.assertIsNotNone(h[h_con.Events.DELETE_POOL_HM])
        self.assertIsNotNone(h[h_con.Events.ADD_POOL_HM])