Merge "LBaaS: move agent based driver files into a separate dir"
This commit is contained in:
commit
6a346f37d4
@ -29,10 +29,12 @@ L3PLUGIN = 'q-l3-plugin'
|
|||||||
DHCP = 'q-dhcp-notifer'
|
DHCP = 'q-dhcp-notifer'
|
||||||
FIREWALL_PLUGIN = 'q-firewall-plugin'
|
FIREWALL_PLUGIN = 'q-firewall-plugin'
|
||||||
METERING_PLUGIN = 'q-metering-plugin'
|
METERING_PLUGIN = 'q-metering-plugin'
|
||||||
|
LOADBALANCER_PLUGIN = 'n-lbaas-plugin'
|
||||||
|
|
||||||
L3_AGENT = 'l3_agent'
|
L3_AGENT = 'l3_agent'
|
||||||
DHCP_AGENT = 'dhcp_agent'
|
DHCP_AGENT = 'dhcp_agent'
|
||||||
METERING_AGENT = 'metering_agent'
|
METERING_AGENT = 'metering_agent'
|
||||||
|
LOADBALANCER_AGENT = 'n-lbaas_agent'
|
||||||
|
|
||||||
|
|
||||||
def get_topic_name(prefix, table, operation, host=None):
|
def get_topic_name(prefix, table, operation, host=None):
|
||||||
|
0
neutron/services/loadbalancer/agent/__init__.py
Normal file
0
neutron/services/loadbalancer/agent/__init__.py
Normal file
@ -22,12 +22,10 @@ from oslo.config import cfg
|
|||||||
from neutron.agent.common import config
|
from neutron.agent.common import config
|
||||||
from neutron.agent.linux import interface
|
from neutron.agent.linux import interface
|
||||||
from neutron.common import legacy
|
from neutron.common import legacy
|
||||||
|
from neutron.common import topics
|
||||||
from neutron.openstack.common.rpc import service as rpc_service
|
from neutron.openstack.common.rpc import service as rpc_service
|
||||||
from neutron.openstack.common import service
|
from neutron.openstack.common import service
|
||||||
from neutron.services.loadbalancer.drivers.haproxy import (
|
from neutron.services.loadbalancer.agent import agent_manager as manager
|
||||||
agent_manager as manager,
|
|
||||||
plugin_driver
|
|
||||||
)
|
|
||||||
|
|
||||||
OPTS = [
|
OPTS = [
|
||||||
cfg.IntOpt(
|
cfg.IntOpt(
|
||||||
@ -65,7 +63,7 @@ def main():
|
|||||||
mgr = manager.LbaasAgentManager(cfg.CONF)
|
mgr = manager.LbaasAgentManager(cfg.CONF)
|
||||||
svc = LbaasAgentService(
|
svc = LbaasAgentService(
|
||||||
host=cfg.CONF.host,
|
host=cfg.CONF.host,
|
||||||
topic=plugin_driver.TOPIC_LOADBALANCER_AGENT,
|
topic=topics.LOADBALANCER_AGENT,
|
||||||
manager=mgr
|
manager=mgr
|
||||||
)
|
)
|
||||||
service.launch(svc).wait()
|
service.launch(svc).wait()
|
@ -21,16 +21,14 @@ from oslo.config import cfg
|
|||||||
from neutron.agent import rpc as agent_rpc
|
from neutron.agent import rpc as agent_rpc
|
||||||
from neutron.common import constants as n_const
|
from neutron.common import constants as n_const
|
||||||
from neutron.common import exceptions as n_exc
|
from neutron.common import exceptions as n_exc
|
||||||
|
from neutron.common import topics
|
||||||
from neutron import context
|
from neutron import context
|
||||||
from neutron.openstack.common import importutils
|
from neutron.openstack.common import importutils
|
||||||
from neutron.openstack.common import log as logging
|
from neutron.openstack.common import log as logging
|
||||||
from neutron.openstack.common import loopingcall
|
from neutron.openstack.common import loopingcall
|
||||||
from neutron.openstack.common import periodic_task
|
from neutron.openstack.common import periodic_task
|
||||||
from neutron.plugins.common import constants
|
from neutron.plugins.common import constants
|
||||||
from neutron.services.loadbalancer.drivers.haproxy import (
|
from neutron.services.loadbalancer.agent import agent_api
|
||||||
agent_api,
|
|
||||||
plugin_driver
|
|
||||||
)
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -67,7 +65,7 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
|
|||||||
self.conf = conf
|
self.conf = conf
|
||||||
self.context = context.get_admin_context_without_session()
|
self.context = context.get_admin_context_without_session()
|
||||||
self.plugin_rpc = agent_api.LbaasAgentApi(
|
self.plugin_rpc = agent_api.LbaasAgentApi(
|
||||||
plugin_driver.TOPIC_LOADBALANCER_PLUGIN,
|
topics.LOADBALANCER_PLUGIN,
|
||||||
self.context,
|
self.context,
|
||||||
self.conf.host
|
self.conf.host
|
||||||
)
|
)
|
||||||
@ -76,7 +74,7 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
|
|||||||
self.agent_state = {
|
self.agent_state = {
|
||||||
'binary': 'neutron-lbaas-agent',
|
'binary': 'neutron-lbaas-agent',
|
||||||
'host': conf.host,
|
'host': conf.host,
|
||||||
'topic': plugin_driver.TOPIC_LOADBALANCER_AGENT,
|
'topic': topics.LOADBALANCER_AGENT,
|
||||||
'configurations': {'device_drivers': self.device_drivers.keys()},
|
'configurations': {'device_drivers': self.device_drivers.keys()},
|
||||||
'agent_type': n_const.AGENT_TYPE_LOADBALANCER,
|
'agent_type': n_const.AGENT_TYPE_LOADBALANCER,
|
||||||
'start_flag': True}
|
'start_flag': True}
|
||||||
@ -109,7 +107,7 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
|
|||||||
|
|
||||||
def _setup_state_rpc(self):
|
def _setup_state_rpc(self):
|
||||||
self.state_rpc = agent_rpc.PluginReportStateAPI(
|
self.state_rpc = agent_rpc.PluginReportStateAPI(
|
||||||
plugin_driver.TOPIC_LOADBALANCER_PLUGIN)
|
topics.LOADBALANCER_PLUGIN)
|
||||||
report_interval = self.conf.AGENT.report_interval
|
report_interval = self.conf.AGENT.report_interval
|
||||||
if report_interval:
|
if report_interval:
|
||||||
heartbeat = loopingcall.FixedIntervalLoopingCall(
|
heartbeat = loopingcall.FixedIntervalLoopingCall(
|
@ -0,0 +1,449 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
#
|
||||||
|
# Copyright 2013 New Dream Network, LLC (DreamHost)
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
#
|
||||||
|
# @author: Mark McClain, DreamHost
|
||||||
|
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from oslo.config import cfg
|
||||||
|
|
||||||
|
from neutron.common import constants as q_const
|
||||||
|
from neutron.common import exceptions as q_exc
|
||||||
|
from neutron.common import rpc as q_rpc
|
||||||
|
from neutron.common import topics
|
||||||
|
from neutron.db import agents_db
|
||||||
|
from neutron.db.loadbalancer import loadbalancer_db
|
||||||
|
from neutron.extensions import lbaas_agentscheduler
|
||||||
|
from neutron.extensions import portbindings
|
||||||
|
from neutron.openstack.common import importutils
|
||||||
|
from neutron.openstack.common import log as logging
|
||||||
|
from neutron.openstack.common import rpc
|
||||||
|
from neutron.openstack.common.rpc import proxy
|
||||||
|
from neutron.plugins.common import constants
|
||||||
|
from neutron.services.loadbalancer.drivers import abstract_driver
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
AGENT_SCHEDULER_OPTS = [
|
||||||
|
cfg.StrOpt('loadbalancer_pool_scheduler_driver',
|
||||||
|
default='neutron.services.loadbalancer.agent_scheduler'
|
||||||
|
'.ChanceScheduler',
|
||||||
|
help=_('Driver to use for scheduling '
|
||||||
|
'pool to a default loadbalancer agent')),
|
||||||
|
]
|
||||||
|
|
||||||
|
cfg.CONF.register_opts(AGENT_SCHEDULER_OPTS)
|
||||||
|
|
||||||
|
|
||||||
|
class DriverNotSpecified(q_exc.NeutronException):
|
||||||
|
message = _("Device driver for agent should be specified "
|
||||||
|
"in plugin driver.")
|
||||||
|
|
||||||
|
|
||||||
|
class LoadBalancerCallbacks(object):
|
||||||
|
|
||||||
|
RPC_API_VERSION = '2.0'
|
||||||
|
# history
|
||||||
|
# 1.0 Initial version
|
||||||
|
# 2.0 Generic API for agent based drivers
|
||||||
|
# - get_logical_device() handling changed;
|
||||||
|
# - pool_deployed() and update_status() methods added;
|
||||||
|
|
||||||
|
def __init__(self, plugin):
|
||||||
|
self.plugin = plugin
|
||||||
|
|
||||||
|
def create_rpc_dispatcher(self):
|
||||||
|
return q_rpc.PluginRpcDispatcher(
|
||||||
|
[self, agents_db.AgentExtRpcCallback(self.plugin)])
|
||||||
|
|
||||||
|
def get_ready_devices(self, context, host=None):
|
||||||
|
with context.session.begin(subtransactions=True):
|
||||||
|
agents = self.plugin.get_lbaas_agents(context,
|
||||||
|
filters={'host': [host]})
|
||||||
|
if not agents:
|
||||||
|
return []
|
||||||
|
elif len(agents) > 1:
|
||||||
|
LOG.warning(_('Multiple lbaas agents found on host %s'), host)
|
||||||
|
pools = self.plugin.list_pools_on_lbaas_agent(context,
|
||||||
|
agents[0].id)
|
||||||
|
pool_ids = [pool['id'] for pool in pools['pools']]
|
||||||
|
|
||||||
|
qry = context.session.query(loadbalancer_db.Pool.id)
|
||||||
|
qry = qry.filter(loadbalancer_db.Pool.id.in_(pool_ids))
|
||||||
|
qry = qry.filter(
|
||||||
|
loadbalancer_db.Pool.status.in_(constants.ACTIVE_PENDING))
|
||||||
|
up = True # makes pep8 and sqlalchemy happy
|
||||||
|
qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
|
||||||
|
return [id for id, in qry]
|
||||||
|
|
||||||
|
def get_logical_device(self, context, pool_id=None):
|
||||||
|
with context.session.begin(subtransactions=True):
|
||||||
|
qry = context.session.query(loadbalancer_db.Pool)
|
||||||
|
qry = qry.filter_by(id=pool_id)
|
||||||
|
pool = qry.one()
|
||||||
|
|
||||||
|
if pool.status != constants.ACTIVE:
|
||||||
|
raise q_exc.Invalid(_('Expected active pool'))
|
||||||
|
|
||||||
|
retval = {}
|
||||||
|
retval['pool'] = self.plugin._make_pool_dict(pool)
|
||||||
|
|
||||||
|
if pool.vip:
|
||||||
|
retval['vip'] = self.plugin._make_vip_dict(pool.vip)
|
||||||
|
retval['vip']['port'] = (
|
||||||
|
self.plugin._core_plugin._make_port_dict(pool.vip.port)
|
||||||
|
)
|
||||||
|
for fixed_ip in retval['vip']['port']['fixed_ips']:
|
||||||
|
fixed_ip['subnet'] = (
|
||||||
|
self.plugin._core_plugin.get_subnet(
|
||||||
|
context,
|
||||||
|
fixed_ip['subnet_id']
|
||||||
|
)
|
||||||
|
)
|
||||||
|
retval['members'] = [
|
||||||
|
self.plugin._make_member_dict(m)
|
||||||
|
for m in pool.members if (
|
||||||
|
m.status in constants.ACTIVE_PENDING or
|
||||||
|
m.status == constants.INACTIVE)
|
||||||
|
]
|
||||||
|
retval['healthmonitors'] = [
|
||||||
|
self.plugin._make_health_monitor_dict(hm.healthmonitor)
|
||||||
|
for hm in pool.monitors
|
||||||
|
if hm.status in constants.ACTIVE_PENDING
|
||||||
|
]
|
||||||
|
retval['driver'] = (
|
||||||
|
self.plugin.drivers[pool.provider.provider_name].device_driver)
|
||||||
|
|
||||||
|
return retval
|
||||||
|
|
||||||
|
def pool_deployed(self, context, pool_id):
|
||||||
|
with context.session.begin(subtransactions=True):
|
||||||
|
qry = context.session.query(loadbalancer_db.Pool)
|
||||||
|
qry = qry.filter_by(id=pool_id)
|
||||||
|
pool = qry.one()
|
||||||
|
|
||||||
|
# set all resources to active
|
||||||
|
if pool.status in constants.ACTIVE_PENDING:
|
||||||
|
pool.status = constants.ACTIVE
|
||||||
|
|
||||||
|
if pool.vip and pool.vip.status in constants.ACTIVE_PENDING:
|
||||||
|
pool.vip.status = constants.ACTIVE
|
||||||
|
|
||||||
|
for m in pool.members:
|
||||||
|
if m.status in constants.ACTIVE_PENDING:
|
||||||
|
m.status = constants.ACTIVE
|
||||||
|
|
||||||
|
for hm in pool.monitors:
|
||||||
|
if hm.status in constants.ACTIVE_PENDING:
|
||||||
|
hm.status = constants.ACTIVE
|
||||||
|
|
||||||
|
def update_status(self, context, obj_type, obj_id, status):
|
||||||
|
model_mapping = {
|
||||||
|
'pool': loadbalancer_db.Pool,
|
||||||
|
'vip': loadbalancer_db.Vip,
|
||||||
|
'member': loadbalancer_db.Member,
|
||||||
|
'health_monitor': loadbalancer_db.PoolMonitorAssociation
|
||||||
|
}
|
||||||
|
if obj_type not in model_mapping:
|
||||||
|
raise q_exc.Invalid(_('Unknown object type: %s') % obj_type)
|
||||||
|
try:
|
||||||
|
if obj_type == 'health_monitor':
|
||||||
|
self.plugin.update_pool_health_monitor(
|
||||||
|
context, obj_id['monitor_id'], obj_id['pool_id'], status)
|
||||||
|
else:
|
||||||
|
self.plugin.update_status(
|
||||||
|
context, model_mapping[obj_type], obj_id, status)
|
||||||
|
except q_exc.NotFound:
|
||||||
|
# update_status may come from agent on an object which was
|
||||||
|
# already deleted from db with other request
|
||||||
|
LOG.warning(_('Cannot update status: %(obj_type)s %(obj_id)s '
|
||||||
|
'not found in the DB, it was probably deleted '
|
||||||
|
'concurrently'),
|
||||||
|
{'obj_type': obj_type, 'obj_id': obj_id})
|
||||||
|
|
||||||
|
def pool_destroyed(self, context, pool_id=None):
|
||||||
|
"""Agent confirmation hook that a pool has been destroyed.
|
||||||
|
|
||||||
|
This method exists for subclasses to change the deletion
|
||||||
|
behavior.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
def plug_vip_port(self, context, port_id=None, host=None):
|
||||||
|
if not port_id:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
port = self.plugin._core_plugin.get_port(
|
||||||
|
context,
|
||||||
|
port_id
|
||||||
|
)
|
||||||
|
except q_exc.PortNotFound:
|
||||||
|
msg = _('Unable to find port %s to plug.')
|
||||||
|
LOG.debug(msg, port_id)
|
||||||
|
return
|
||||||
|
|
||||||
|
port['admin_state_up'] = True
|
||||||
|
port['device_owner'] = 'neutron:' + constants.LOADBALANCER
|
||||||
|
port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host)))
|
||||||
|
port[portbindings.HOST_ID] = host
|
||||||
|
self.plugin._core_plugin.update_port(
|
||||||
|
context,
|
||||||
|
port_id,
|
||||||
|
{'port': port}
|
||||||
|
)
|
||||||
|
|
||||||
|
def unplug_vip_port(self, context, port_id=None, host=None):
|
||||||
|
if not port_id:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
port = self.plugin._core_plugin.get_port(
|
||||||
|
context,
|
||||||
|
port_id
|
||||||
|
)
|
||||||
|
except q_exc.PortNotFound:
|
||||||
|
msg = _('Unable to find port %s to unplug. This can occur when '
|
||||||
|
'the Vip has been deleted first.')
|
||||||
|
LOG.debug(msg, port_id)
|
||||||
|
return
|
||||||
|
|
||||||
|
port['admin_state_up'] = False
|
||||||
|
port['device_owner'] = ''
|
||||||
|
port['device_id'] = ''
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.plugin._core_plugin.update_port(
|
||||||
|
context,
|
||||||
|
port_id,
|
||||||
|
{'port': port}
|
||||||
|
)
|
||||||
|
|
||||||
|
except q_exc.PortNotFound:
|
||||||
|
msg = _('Unable to find port %s to unplug. This can occur when '
|
||||||
|
'the Vip has been deleted first.')
|
||||||
|
LOG.debug(msg, port_id)
|
||||||
|
|
||||||
|
def update_pool_stats(self, context, pool_id=None, stats=None, host=None):
|
||||||
|
self.plugin.update_pool_stats(context, pool_id, data=stats)
|
||||||
|
|
||||||
|
|
||||||
|
class LoadBalancerAgentApi(proxy.RpcProxy):
|
||||||
|
"""Plugin side of plugin to agent RPC API."""
|
||||||
|
|
||||||
|
BASE_RPC_API_VERSION = '2.0'
|
||||||
|
# history
|
||||||
|
# 1.0 Initial version
|
||||||
|
# 1.1 Support agent_updated call
|
||||||
|
# 2.0 Generic API for agent based drivers
|
||||||
|
# - modify/reload/destroy_pool methods were removed;
|
||||||
|
# - added methods to handle create/update/delete for every lbaas
|
||||||
|
# object individually;
|
||||||
|
|
||||||
|
def __init__(self, topic):
|
||||||
|
super(LoadBalancerAgentApi, self).__init__(
|
||||||
|
topic, default_version=self.BASE_RPC_API_VERSION)
|
||||||
|
|
||||||
|
def _cast(self, context, method_name, method_args, host, version=None):
|
||||||
|
return self.cast(
|
||||||
|
context,
|
||||||
|
self.make_msg(method_name, **method_args),
|
||||||
|
topic='%s.%s' % (self.topic, host),
|
||||||
|
version=version
|
||||||
|
)
|
||||||
|
|
||||||
|
def create_vip(self, context, vip, host):
|
||||||
|
return self._cast(context, 'create_vip', {'vip': vip}, host)
|
||||||
|
|
||||||
|
def update_vip(self, context, old_vip, vip, host):
|
||||||
|
return self._cast(context, 'update_vip',
|
||||||
|
{'old_vip': old_vip, 'vip': vip}, host)
|
||||||
|
|
||||||
|
def delete_vip(self, context, vip, host):
|
||||||
|
return self._cast(context, 'delete_vip', {'vip': vip}, host)
|
||||||
|
|
||||||
|
def create_pool(self, context, pool, host, driver_name):
|
||||||
|
return self._cast(context, 'create_pool',
|
||||||
|
{'pool': pool, 'driver_name': driver_name}, host)
|
||||||
|
|
||||||
|
def update_pool(self, context, old_pool, pool, host):
|
||||||
|
return self._cast(context, 'update_pool',
|
||||||
|
{'old_pool': old_pool, 'pool': pool}, host)
|
||||||
|
|
||||||
|
def delete_pool(self, context, pool, host):
|
||||||
|
return self._cast(context, 'delete_pool', {'pool': pool}, host)
|
||||||
|
|
||||||
|
def create_member(self, context, member, host):
|
||||||
|
return self._cast(context, 'create_member', {'member': member}, host)
|
||||||
|
|
||||||
|
def update_member(self, context, old_member, member, host):
|
||||||
|
return self._cast(context, 'update_member',
|
||||||
|
{'old_member': old_member, 'member': member}, host)
|
||||||
|
|
||||||
|
def delete_member(self, context, member, host):
|
||||||
|
return self._cast(context, 'delete_member', {'member': member}, host)
|
||||||
|
|
||||||
|
def create_pool_health_monitor(self, context, health_monitor, pool_id,
|
||||||
|
host):
|
||||||
|
return self._cast(context, 'create_pool_health_monitor',
|
||||||
|
{'health_monitor': health_monitor,
|
||||||
|
'pool_id': pool_id}, host)
|
||||||
|
|
||||||
|
def update_pool_health_monitor(self, context, old_health_monitor,
|
||||||
|
health_monitor, pool_id, host):
|
||||||
|
return self._cast(context, 'update_pool_health_monitor',
|
||||||
|
{'old_health_monitor': old_health_monitor,
|
||||||
|
'health_monitor': health_monitor,
|
||||||
|
'pool_id': pool_id}, host)
|
||||||
|
|
||||||
|
def delete_pool_health_monitor(self, context, health_monitor, pool_id,
|
||||||
|
host):
|
||||||
|
return self._cast(context, 'delete_pool_health_monitor',
|
||||||
|
{'health_monitor': health_monitor,
|
||||||
|
'pool_id': pool_id}, host)
|
||||||
|
|
||||||
|
def agent_updated(self, context, admin_state_up, host):
|
||||||
|
return self._cast(context, 'agent_updated',
|
||||||
|
{'payload': {'admin_state_up': admin_state_up}},
|
||||||
|
host)
|
||||||
|
|
||||||
|
|
||||||
|
class AgentDriverBase(abstract_driver.LoadBalancerAbstractDriver):
|
||||||
|
|
||||||
|
# name of device driver that should be used by the agent;
|
||||||
|
# vendor specific plugin drivers must override it;
|
||||||
|
device_driver = None
|
||||||
|
|
||||||
|
def __init__(self, plugin):
|
||||||
|
if not self.device_driver:
|
||||||
|
raise DriverNotSpecified()
|
||||||
|
|
||||||
|
self.agent_rpc = LoadBalancerAgentApi(topics.LOADBALANCER_AGENT)
|
||||||
|
|
||||||
|
self.plugin = plugin
|
||||||
|
self._set_callbacks_on_plugin()
|
||||||
|
self.plugin.agent_notifiers.update(
|
||||||
|
{q_const.AGENT_TYPE_LOADBALANCER: self.agent_rpc})
|
||||||
|
|
||||||
|
self.pool_scheduler = importutils.import_object(
|
||||||
|
cfg.CONF.loadbalancer_pool_scheduler_driver)
|
||||||
|
|
||||||
|
def _set_callbacks_on_plugin(self):
|
||||||
|
# other agent based plugin driver might already set callbacks on plugin
|
||||||
|
if hasattr(self.plugin, 'agent_callbacks'):
|
||||||
|
return
|
||||||
|
|
||||||
|
self.plugin.agent_callbacks = LoadBalancerCallbacks(self.plugin)
|
||||||
|
self.plugin.conn = rpc.create_connection(new=True)
|
||||||
|
self.plugin.conn.create_consumer(
|
||||||
|
topics.LOADBALANCER_PLUGIN,
|
||||||
|
self.plugin.agent_callbacks.create_rpc_dispatcher(),
|
||||||
|
fanout=False)
|
||||||
|
self.plugin.conn.consume_in_thread()
|
||||||
|
|
||||||
|
def get_pool_agent(self, context, pool_id):
|
||||||
|
agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id)
|
||||||
|
if not agent:
|
||||||
|
raise lbaas_agentscheduler.NoActiveLbaasAgent(pool_id=pool_id)
|
||||||
|
return agent['agent']
|
||||||
|
|
||||||
|
def create_vip(self, context, vip):
|
||||||
|
agent = self.get_pool_agent(context, vip['pool_id'])
|
||||||
|
self.agent_rpc.create_vip(context, vip, agent['host'])
|
||||||
|
|
||||||
|
def update_vip(self, context, old_vip, vip):
|
||||||
|
agent = self.get_pool_agent(context, vip['pool_id'])
|
||||||
|
if vip['status'] in constants.ACTIVE_PENDING:
|
||||||
|
self.agent_rpc.update_vip(context, old_vip, vip, agent['host'])
|
||||||
|
else:
|
||||||
|
self.agent_rpc.delete_vip(context, vip, agent['host'])
|
||||||
|
|
||||||
|
def delete_vip(self, context, vip):
|
||||||
|
self.plugin._delete_db_vip(context, vip['id'])
|
||||||
|
agent = self.get_pool_agent(context, vip['pool_id'])
|
||||||
|
self.agent_rpc.delete_vip(context, vip, agent['host'])
|
||||||
|
|
||||||
|
def create_pool(self, context, pool):
|
||||||
|
agent = self.pool_scheduler.schedule(self.plugin, context, pool,
|
||||||
|
self.device_driver)
|
||||||
|
if not agent:
|
||||||
|
raise lbaas_agentscheduler.NoEligibleLbaasAgent(pool_id=pool['id'])
|
||||||
|
self.agent_rpc.create_pool(context, pool, agent['host'],
|
||||||
|
self.device_driver)
|
||||||
|
|
||||||
|
def update_pool(self, context, old_pool, pool):
|
||||||
|
agent = self.get_pool_agent(context, pool['id'])
|
||||||
|
if pool['status'] in constants.ACTIVE_PENDING:
|
||||||
|
self.agent_rpc.update_pool(context, old_pool, pool,
|
||||||
|
agent['host'])
|
||||||
|
else:
|
||||||
|
self.agent_rpc.delete_pool(context, pool, agent['host'])
|
||||||
|
|
||||||
|
def delete_pool(self, context, pool):
|
||||||
|
# get agent first to know host as binding will be deleted
|
||||||
|
# after pool is deleted from db
|
||||||
|
agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool['id'])
|
||||||
|
self.plugin._delete_db_pool(context, pool['id'])
|
||||||
|
if agent:
|
||||||
|
self.agent_rpc.delete_pool(context, pool, agent['agent']['host'])
|
||||||
|
|
||||||
|
def create_member(self, context, member):
|
||||||
|
agent = self.get_pool_agent(context, member['pool_id'])
|
||||||
|
self.agent_rpc.create_member(context, member, agent['host'])
|
||||||
|
|
||||||
|
def update_member(self, context, old_member, member):
|
||||||
|
agent = self.get_pool_agent(context, member['pool_id'])
|
||||||
|
# member may change pool id
|
||||||
|
if member['pool_id'] != old_member['pool_id']:
|
||||||
|
old_pool_agent = self.plugin.get_lbaas_agent_hosting_pool(
|
||||||
|
context, old_member['pool_id'])
|
||||||
|
if old_pool_agent:
|
||||||
|
self.agent_rpc.delete_member(context, old_member,
|
||||||
|
old_pool_agent['agent']['host'])
|
||||||
|
self.agent_rpc.create_member(context, member, agent['host'])
|
||||||
|
else:
|
||||||
|
self.agent_rpc.update_member(context, old_member, member,
|
||||||
|
agent['host'])
|
||||||
|
|
||||||
|
def delete_member(self, context, member):
|
||||||
|
self.plugin._delete_db_member(context, member['id'])
|
||||||
|
agent = self.get_pool_agent(context, member['pool_id'])
|
||||||
|
self.agent_rpc.delete_member(context, member, agent['host'])
|
||||||
|
|
||||||
|
def create_pool_health_monitor(self, context, healthmon, pool_id):
|
||||||
|
# healthmon is not used here
|
||||||
|
agent = self.get_pool_agent(context, pool_id)
|
||||||
|
self.agent_rpc.create_pool_health_monitor(context, healthmon,
|
||||||
|
pool_id, agent['host'])
|
||||||
|
|
||||||
|
def update_pool_health_monitor(self, context, old_health_monitor,
|
||||||
|
health_monitor, pool_id):
|
||||||
|
agent = self.get_pool_agent(context, pool_id)
|
||||||
|
self.agent_rpc.update_pool_health_monitor(context, old_health_monitor,
|
||||||
|
health_monitor, pool_id,
|
||||||
|
agent['host'])
|
||||||
|
|
||||||
|
def delete_pool_health_monitor(self, context, health_monitor, pool_id):
|
||||||
|
self.plugin._delete_db_pool_health_monitor(
|
||||||
|
context, health_monitor['id'], pool_id
|
||||||
|
)
|
||||||
|
|
||||||
|
agent = self.get_pool_agent(context, pool_id)
|
||||||
|
self.agent_rpc.delete_pool_health_monitor(context, health_monitor,
|
||||||
|
pool_id, agent['host'])
|
||||||
|
|
||||||
|
def stats(self, context, pool_id):
|
||||||
|
pass
|
@ -31,8 +31,8 @@ from neutron.openstack.common import excutils
|
|||||||
from neutron.openstack.common import importutils
|
from neutron.openstack.common import importutils
|
||||||
from neutron.openstack.common import log as logging
|
from neutron.openstack.common import log as logging
|
||||||
from neutron.plugins.common import constants
|
from neutron.plugins.common import constants
|
||||||
|
from neutron.services.loadbalancer.agent import agent_device_driver
|
||||||
from neutron.services.loadbalancer import constants as lb_const
|
from neutron.services.loadbalancer import constants as lb_const
|
||||||
from neutron.services.loadbalancer.drivers import agent_device_driver
|
|
||||||
from neutron.services.loadbalancer.drivers.haproxy import cfg as hacfg
|
from neutron.services.loadbalancer.drivers.haproxy import cfg as hacfg
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
#
|
|
||||||
# Copyright 2013 New Dream Network, LLC (DreamHost)
|
# Copyright (c) 2013 OpenStack Foundation.
|
||||||
|
# All Rights Reserved.
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
# not use this file except in compliance with the License. You may obtain
|
# not use this file except in compliance with the License. You may obtain
|
||||||
@ -13,447 +14,10 @@
|
|||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
#
|
|
||||||
# @author: Mark McClain, DreamHost
|
|
||||||
|
|
||||||
import uuid
|
from neutron.services.loadbalancer.drivers.common import agent_driver_base
|
||||||
|
from neutron.services.loadbalancer.drivers.haproxy import namespace_driver
|
||||||
|
|
||||||
from oslo.config import cfg
|
|
||||||
|
|
||||||
from neutron.common import constants as q_const
|
class HaproxyOnHostPluginDriver(agent_driver_base.AgentDriverBase):
|
||||||
from neutron.common import exceptions as q_exc
|
device_driver = namespace_driver.DRIVER_NAME
|
||||||
from neutron.common import rpc as q_rpc
|
|
||||||
from neutron.db import agents_db
|
|
||||||
from neutron.db.loadbalancer import loadbalancer_db
|
|
||||||
from neutron.extensions import lbaas_agentscheduler
|
|
||||||
from neutron.extensions import portbindings
|
|
||||||
from neutron.openstack.common import importutils
|
|
||||||
from neutron.openstack.common import log as logging
|
|
||||||
from neutron.openstack.common import rpc
|
|
||||||
from neutron.openstack.common.rpc import proxy
|
|
||||||
from neutron.plugins.common import constants
|
|
||||||
from neutron.services.loadbalancer.drivers import abstract_driver
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
AGENT_SCHEDULER_OPTS = [
|
|
||||||
cfg.StrOpt('loadbalancer_pool_scheduler_driver',
|
|
||||||
default='neutron.services.loadbalancer.agent_scheduler'
|
|
||||||
'.ChanceScheduler',
|
|
||||||
help=_('Driver to use for scheduling '
|
|
||||||
'pool to a default loadbalancer agent')),
|
|
||||||
]
|
|
||||||
|
|
||||||
cfg.CONF.register_opts(AGENT_SCHEDULER_OPTS)
|
|
||||||
|
|
||||||
# topic name for this particular agent implementation
|
|
||||||
TOPIC_LOADBALANCER_PLUGIN = 'n-lbaas-plugin'
|
|
||||||
TOPIC_LOADBALANCER_AGENT = 'n-lbaas_agent'
|
|
||||||
|
|
||||||
|
|
||||||
class DriverNotSpecified(q_exc.NeutronException):
|
|
||||||
message = _("Device driver for agent should be specified "
|
|
||||||
"in plugin driver.")
|
|
||||||
|
|
||||||
|
|
||||||
class LoadBalancerCallbacks(object):
|
|
||||||
|
|
||||||
RPC_API_VERSION = '2.0'
|
|
||||||
# history
|
|
||||||
# 1.0 Initial version
|
|
||||||
# 2.0 Generic API for agent based drivers
|
|
||||||
# - get_logical_device() handling changed;
|
|
||||||
# - pool_deployed() and update_status() methods added;
|
|
||||||
|
|
||||||
def __init__(self, plugin):
|
|
||||||
self.plugin = plugin
|
|
||||||
|
|
||||||
def create_rpc_dispatcher(self):
|
|
||||||
return q_rpc.PluginRpcDispatcher(
|
|
||||||
[self, agents_db.AgentExtRpcCallback(self.plugin)])
|
|
||||||
|
|
||||||
def get_ready_devices(self, context, host=None):
|
|
||||||
with context.session.begin(subtransactions=True):
|
|
||||||
agents = self.plugin.get_lbaas_agents(context,
|
|
||||||
filters={'host': [host]})
|
|
||||||
if not agents:
|
|
||||||
return []
|
|
||||||
elif len(agents) > 1:
|
|
||||||
LOG.warning(_('Multiple lbaas agents found on host %s'), host)
|
|
||||||
pools = self.plugin.list_pools_on_lbaas_agent(context,
|
|
||||||
agents[0].id)
|
|
||||||
pool_ids = [pool['id'] for pool in pools['pools']]
|
|
||||||
|
|
||||||
qry = context.session.query(loadbalancer_db.Pool.id)
|
|
||||||
qry = qry.filter(loadbalancer_db.Pool.id.in_(pool_ids))
|
|
||||||
qry = qry.filter(
|
|
||||||
loadbalancer_db.Pool.status.in_(constants.ACTIVE_PENDING))
|
|
||||||
up = True # makes pep8 and sqlalchemy happy
|
|
||||||
qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
|
|
||||||
return [id for id, in qry]
|
|
||||||
|
|
||||||
def get_logical_device(self, context, pool_id=None):
|
|
||||||
with context.session.begin(subtransactions=True):
|
|
||||||
qry = context.session.query(loadbalancer_db.Pool)
|
|
||||||
qry = qry.filter_by(id=pool_id)
|
|
||||||
pool = qry.one()
|
|
||||||
|
|
||||||
if pool.status != constants.ACTIVE:
|
|
||||||
raise q_exc.Invalid(_('Expected active pool'))
|
|
||||||
|
|
||||||
retval = {}
|
|
||||||
retval['pool'] = self.plugin._make_pool_dict(pool)
|
|
||||||
|
|
||||||
if pool.vip:
|
|
||||||
retval['vip'] = self.plugin._make_vip_dict(pool.vip)
|
|
||||||
retval['vip']['port'] = (
|
|
||||||
self.plugin._core_plugin._make_port_dict(pool.vip.port)
|
|
||||||
)
|
|
||||||
for fixed_ip in retval['vip']['port']['fixed_ips']:
|
|
||||||
fixed_ip['subnet'] = (
|
|
||||||
self.plugin._core_plugin.get_subnet(
|
|
||||||
context,
|
|
||||||
fixed_ip['subnet_id']
|
|
||||||
)
|
|
||||||
)
|
|
||||||
retval['members'] = [
|
|
||||||
self.plugin._make_member_dict(m)
|
|
||||||
for m in pool.members if (
|
|
||||||
m.status in constants.ACTIVE_PENDING or
|
|
||||||
m.status == constants.INACTIVE)
|
|
||||||
]
|
|
||||||
retval['healthmonitors'] = [
|
|
||||||
self.plugin._make_health_monitor_dict(hm.healthmonitor)
|
|
||||||
for hm in pool.monitors
|
|
||||||
if hm.status in constants.ACTIVE_PENDING
|
|
||||||
]
|
|
||||||
retval['driver'] = (
|
|
||||||
self.plugin.drivers[pool.provider.provider_name].device_driver)
|
|
||||||
|
|
||||||
return retval
|
|
||||||
|
|
||||||
def pool_deployed(self, context, pool_id):
|
|
||||||
with context.session.begin(subtransactions=True):
|
|
||||||
qry = context.session.query(loadbalancer_db.Pool)
|
|
||||||
qry = qry.filter_by(id=pool_id)
|
|
||||||
pool = qry.one()
|
|
||||||
|
|
||||||
# set all resources to active
|
|
||||||
if pool.status in constants.ACTIVE_PENDING:
|
|
||||||
pool.status = constants.ACTIVE
|
|
||||||
|
|
||||||
if pool.vip and pool.vip.status in constants.ACTIVE_PENDING:
|
|
||||||
pool.vip.status = constants.ACTIVE
|
|
||||||
|
|
||||||
for m in pool.members:
|
|
||||||
if m.status in constants.ACTIVE_PENDING:
|
|
||||||
m.status = constants.ACTIVE
|
|
||||||
|
|
||||||
for hm in pool.monitors:
|
|
||||||
if hm.status in constants.ACTIVE_PENDING:
|
|
||||||
hm.status = constants.ACTIVE
|
|
||||||
|
|
||||||
def update_status(self, context, obj_type, obj_id, status):
    """Update the status of a single lbaas object on behalf of an agent.

    :param obj_type: one of 'pool', 'vip', 'member', 'health_monitor'
    :param obj_id: the object id; for health monitors, a dict carrying
        'monitor_id' and 'pool_id'
    :raises q_exc.Invalid: if obj_type is not a known lbaas object type
    """
    models = {
        'pool': loadbalancer_db.Pool,
        'vip': loadbalancer_db.Vip,
        'member': loadbalancer_db.Member,
        'health_monitor': loadbalancer_db.PoolMonitorAssociation,
    }
    if obj_type not in models:
        raise q_exc.Invalid(_('Unknown object type: %s') % obj_type)
    try:
        if obj_type == 'health_monitor':
            # health monitor status lives on the pool association row
            self.plugin.update_pool_health_monitor(
                context, obj_id['monitor_id'], obj_id['pool_id'], status)
        else:
            self.plugin.update_status(
                context, models[obj_type], obj_id, status)
    except q_exc.NotFound:
        # update_status may come from agent on an object which was
        # already deleted from db with other request
        LOG.warning(_('Cannot update status: %(obj_type)s %(obj_id)s '
                      'not found in the DB, it was probably deleted '
                      'concurrently'),
                    {'obj_type': obj_type, 'obj_id': obj_id})
||||||
def pool_destroyed(self, context, pool_id=None):
    """Agent confirmation hook that a pool has been destroyed.

    Intentionally a no-op; subclasses override this to customize the
    deletion behavior.
    """
|
||||||
def plug_vip_port(self, context, port_id=None, host=None):
    """Bind the vip port to the agent's host and bring it up.

    No-op when port_id is missing or the port cannot be found.
    """
    if not port_id:
        return

    try:
        port = self.plugin._core_plugin.get_port(context, port_id)
    except q_exc.PortNotFound:
        LOG.debug(_('Unable to find port %s to plug.'), port_id)
        return

    # Device id is derived deterministically from the host name.
    port.update({
        'admin_state_up': True,
        'device_owner': 'neutron:' + constants.LOADBALANCER,
        'device_id': str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host))),
        portbindings.HOST_ID: host,
    })
    self.plugin._core_plugin.update_port(context, port_id, {'port': port})
|
||||||
def unplug_vip_port(self, context, port_id=None, host=None):
    """Detach the vip port from its device and shut it down.

    Both the lookup and the update tolerate the port being gone: the
    vip (and its port) may already have been deleted.
    """
    if not port_id:
        return

    # Same message for both not-found paths below.
    msg = _('Unable to find port %s to unplug. This can occur when '
            'the Vip has been deleted first.')

    try:
        port = self.plugin._core_plugin.get_port(context, port_id)
    except q_exc.PortNotFound:
        LOG.debug(msg, port_id)
        return

    port['admin_state_up'] = False
    port['device_owner'] = ''
    port['device_id'] = ''

    try:
        self.plugin._core_plugin.update_port(context, port_id,
                                             {'port': port})
    except q_exc.PortNotFound:
        LOG.debug(msg, port_id)
||||||
def update_pool_stats(self, context, pool_id=None, stats=None, host=None):
    """Persist statistics reported by the agent for the given pool."""
    self.plugin.update_pool_stats(context, pool_id, data=stats)
|
|
||||||
|
|
||||||
class LoadBalancerAgentApi(proxy.RpcProxy):
    """Plugin side of plugin to agent RPC API."""

    BASE_RPC_API_VERSION = '2.0'
    # history
    #   1.0 Initial version
    #   1.1 Support agent_updated call
    #   2.0 Generic API for agent based drivers
    #       - modify/reload/destroy_pool methods were removed;
    #       - added methods to handle create/update/delete for every lbaas
    #         object individually;

    def __init__(self, topic):
        super(LoadBalancerAgentApi, self).__init__(
            topic, default_version=self.BASE_RPC_API_VERSION)

    def _cast(self, context, method_name, method_args, host, version=None):
        # Every message is addressed to the agent on one specific host.
        host_topic = '%s.%s' % (self.topic, host)
        msg = self.make_msg(method_name, **method_args)
        return self.cast(context, msg, topic=host_topic, version=version)

    def create_vip(self, context, vip, host):
        return self._cast(context, 'create_vip', {'vip': vip}, host)

    def update_vip(self, context, old_vip, vip, host):
        return self._cast(context, 'update_vip',
                          {'old_vip': old_vip, 'vip': vip}, host)

    def delete_vip(self, context, vip, host):
        return self._cast(context, 'delete_vip', {'vip': vip}, host)

    def create_pool(self, context, pool, host, driver_name):
        return self._cast(context, 'create_pool',
                          {'pool': pool, 'driver_name': driver_name}, host)

    def update_pool(self, context, old_pool, pool, host):
        return self._cast(context, 'update_pool',
                          {'old_pool': old_pool, 'pool': pool}, host)

    def delete_pool(self, context, pool, host):
        return self._cast(context, 'delete_pool', {'pool': pool}, host)

    def create_member(self, context, member, host):
        return self._cast(context, 'create_member', {'member': member}, host)

    def update_member(self, context, old_member, member, host):
        return self._cast(context, 'update_member',
                          {'old_member': old_member, 'member': member}, host)

    def delete_member(self, context, member, host):
        return self._cast(context, 'delete_member', {'member': member}, host)

    def create_pool_health_monitor(self, context, health_monitor, pool_id,
                                   host):
        return self._cast(context, 'create_pool_health_monitor',
                          {'health_monitor': health_monitor,
                           'pool_id': pool_id}, host)

    def update_pool_health_monitor(self, context, old_health_monitor,
                                   health_monitor, pool_id, host):
        return self._cast(context, 'update_pool_health_monitor',
                          {'old_health_monitor': old_health_monitor,
                           'health_monitor': health_monitor,
                           'pool_id': pool_id}, host)

    def delete_pool_health_monitor(self, context, health_monitor, pool_id,
                                   host):
        return self._cast(context, 'delete_pool_health_monitor',
                          {'health_monitor': health_monitor,
                           'pool_id': pool_id}, host)

    def agent_updated(self, context, admin_state_up, host):
        return self._cast(context, 'agent_updated',
                          {'payload': {'admin_state_up': admin_state_up}},
                          host)
|
|
||||||
|
|
||||||
class AgentBasedPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
    """Generic plugin driver that delegates lbaas work to agents over RPC.

    Vendor specific plugin drivers subclass this and set ``device_driver``
    to the name of the device driver their agents should load.
    """

    # Name of device driver that should be used by the agent;
    # vendor specific plugin drivers must override it.
    device_driver = None

    def __init__(self, plugin):
        if not self.device_driver:
            raise DriverNotSpecified()

        self.agent_rpc = LoadBalancerAgentApi(TOPIC_LOADBALANCER_AGENT)

        self.plugin = plugin
        self._set_callbacks_on_plugin()
        self.plugin.agent_notifiers.update(
            {q_const.AGENT_TYPE_LOADBALANCER: self.agent_rpc})

        self.pool_scheduler = importutils.import_object(
            cfg.CONF.loadbalancer_pool_scheduler_driver)

    def _set_callbacks_on_plugin(self):
        # Another agent based plugin driver may have set callbacks already.
        if hasattr(self.plugin, 'agent_callbacks'):
            return

        self.plugin.agent_callbacks = LoadBalancerCallbacks(self.plugin)
        self.plugin.conn = rpc.create_connection(new=True)
        self.plugin.conn.create_consumer(
            TOPIC_LOADBALANCER_PLUGIN,
            self.plugin.agent_callbacks.create_rpc_dispatcher(),
            fanout=False)
        self.plugin.conn.consume_in_thread()

    def get_pool_agent(self, context, pool_id):
        """Return the agent hosting pool_id or raise NoActiveLbaasAgent."""
        binding = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id)
        if not binding:
            raise lbaas_agentscheduler.NoActiveLbaasAgent(pool_id=pool_id)
        return binding['agent']

    def create_vip(self, context, vip):
        agent = self.get_pool_agent(context, vip['pool_id'])
        self.agent_rpc.create_vip(context, vip, agent['host'])

    def update_vip(self, context, old_vip, vip):
        agent = self.get_pool_agent(context, vip['pool_id'])
        # A vip that is no longer (pending) active is torn down instead.
        if vip['status'] in constants.ACTIVE_PENDING:
            self.agent_rpc.update_vip(context, old_vip, vip, agent['host'])
        else:
            self.agent_rpc.delete_vip(context, vip, agent['host'])

    def delete_vip(self, context, vip):
        self.plugin._delete_db_vip(context, vip['id'])
        agent = self.get_pool_agent(context, vip['pool_id'])
        self.agent_rpc.delete_vip(context, vip, agent['host'])

    def create_pool(self, context, pool):
        agent = self.pool_scheduler.schedule(self.plugin, context, pool,
                                             self.device_driver)
        if not agent:
            raise lbaas_agentscheduler.NoEligibleLbaasAgent(pool_id=pool['id'])
        self.agent_rpc.create_pool(context, pool, agent['host'],
                                   self.device_driver)

    def update_pool(self, context, old_pool, pool):
        agent = self.get_pool_agent(context, pool['id'])
        if pool['status'] in constants.ACTIVE_PENDING:
            self.agent_rpc.update_pool(context, old_pool, pool,
                                       agent['host'])
        else:
            self.agent_rpc.delete_pool(context, pool, agent['host'])

    def delete_pool(self, context, pool):
        # Look up the hosting agent before the db delete: the binding row
        # disappears together with the pool.
        binding = self.plugin.get_lbaas_agent_hosting_pool(context,
                                                           pool['id'])
        self.plugin._delete_db_pool(context, pool['id'])
        if binding:
            self.agent_rpc.delete_pool(context, pool,
                                       binding['agent']['host'])

    def create_member(self, context, member):
        agent = self.get_pool_agent(context, member['pool_id'])
        self.agent_rpc.create_member(context, member, agent['host'])

    def update_member(self, context, old_member, member):
        agent = self.get_pool_agent(context, member['pool_id'])
        if member['pool_id'] == old_member['pool_id']:
            self.agent_rpc.update_member(context, old_member, member,
                                         agent['host'])
            return
        # The member changed pools: remove it from the old pool's agent
        # (if it still has one) and create it on the new pool's agent.
        old_binding = self.plugin.get_lbaas_agent_hosting_pool(
            context, old_member['pool_id'])
        if old_binding:
            self.agent_rpc.delete_member(context, old_member,
                                         old_binding['agent']['host'])
        self.agent_rpc.create_member(context, member, agent['host'])

    def delete_member(self, context, member):
        self.plugin._delete_db_member(context, member['id'])
        agent = self.get_pool_agent(context, member['pool_id'])
        self.agent_rpc.delete_member(context, member, agent['host'])

    def create_pool_health_monitor(self, context, healthmon, pool_id):
        # healthmon itself is not inspected here
        agent = self.get_pool_agent(context, pool_id)
        self.agent_rpc.create_pool_health_monitor(context, healthmon,
                                                  pool_id, agent['host'])

    def update_pool_health_monitor(self, context, old_health_monitor,
                                   health_monitor, pool_id):
        agent = self.get_pool_agent(context, pool_id)
        self.agent_rpc.update_pool_health_monitor(context, old_health_monitor,
                                                  health_monitor, pool_id,
                                                  agent['host'])

    def delete_pool_health_monitor(self, context, health_monitor, pool_id):
        self.plugin._delete_db_pool_health_monitor(
            context, health_monitor['id'], pool_id)

        agent = self.get_pool_agent(context, pool_id)
        self.agent_rpc.delete_pool_health_monitor(context, health_monitor,
                                                  pool_id, agent['host'])

    def stats(self, context, pool_id):
        pass
|
||||||
|
|
||||||
class HaproxyOnHostPluginDriver(AgentBasedPluginDriver):
    """Agent based plugin driver using the haproxy namespace device driver."""

    #TODO(obondarev): change hardcoded driver name
    # to namespace_driver.DRIVER_NAME after moving HaproxyOnHostPluginDriver
    # to a separate file (follow-up patch)
    device_driver = 'haproxy_ns'
|
@ -20,7 +20,7 @@ import contextlib
|
|||||||
import mock
|
import mock
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
|
|
||||||
from neutron.services.loadbalancer.drivers.haproxy import agent
|
from neutron.services.loadbalancer.agent import agent
|
||||||
from neutron.tests import base
|
from neutron.tests import base
|
||||||
|
|
||||||
|
|
@ -21,9 +21,7 @@ import contextlib
|
|||||||
import mock
|
import mock
|
||||||
|
|
||||||
from neutron.plugins.common import constants
|
from neutron.plugins.common import constants
|
||||||
from neutron.services.loadbalancer.drivers.haproxy import (
|
from neutron.services.loadbalancer.agent import agent_manager as manager
|
||||||
agent_manager as manager
|
|
||||||
)
|
|
||||||
from neutron.tests import base
|
from neutron.tests import base
|
||||||
|
|
||||||
|
|
||||||
@ -38,8 +36,7 @@ class TestManager(base.BaseTestCase):
|
|||||||
self.mock_importer = mock.patch.object(manager, 'importutils').start()
|
self.mock_importer = mock.patch.object(manager, 'importutils').start()
|
||||||
|
|
||||||
rpc_mock_cls = mock.patch(
|
rpc_mock_cls = mock.patch(
|
||||||
'neutron.services.loadbalancer.drivers'
|
'neutron.services.loadbalancer.agent.agent_api.LbaasAgentApi'
|
||||||
'.haproxy.agent_api.LbaasAgentApi'
|
|
||||||
).start()
|
).start()
|
||||||
|
|
||||||
self.mgr = manager.LbaasAgentManager(mock_conf)
|
self.mgr = manager.LbaasAgentManager(mock_conf)
|
@ -18,9 +18,7 @@
|
|||||||
|
|
||||||
import mock
|
import mock
|
||||||
|
|
||||||
from neutron.services.loadbalancer.drivers.haproxy import (
|
from neutron.services.loadbalancer.agent import agent_api as api
|
||||||
agent_api as api
|
|
||||||
)
|
|
||||||
from neutron.tests import base
|
from neutron.tests import base
|
||||||
|
|
||||||
|
|
@ -30,9 +30,7 @@ from neutron.extensions import portbindings
|
|||||||
from neutron import manager
|
from neutron import manager
|
||||||
from neutron.openstack.common import uuidutils
|
from neutron.openstack.common import uuidutils
|
||||||
from neutron.plugins.common import constants
|
from neutron.plugins.common import constants
|
||||||
from neutron.services.loadbalancer.drivers.haproxy import (
|
from neutron.services.loadbalancer.drivers.common import agent_driver_base
|
||||||
plugin_driver
|
|
||||||
)
|
|
||||||
from neutron.tests import base
|
from neutron.tests import base
|
||||||
from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
|
from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
|
||||||
from neutron.tests.unit import testlib_api
|
from neutron.tests.unit import testlib_api
|
||||||
@ -43,20 +41,20 @@ class TestLoadBalancerPluginBase(
|
|||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
def reset_device_driver():
|
def reset_device_driver():
|
||||||
plugin_driver.AgentBasedPluginDriver.device_driver = None
|
agent_driver_base.AgentDriverBase.device_driver = None
|
||||||
self.addCleanup(reset_device_driver)
|
self.addCleanup(reset_device_driver)
|
||||||
|
|
||||||
self.mock_importer = mock.patch.object(
|
self.mock_importer = mock.patch.object(
|
||||||
plugin_driver, 'importutils').start()
|
agent_driver_base, 'importutils').start()
|
||||||
self.addCleanup(mock.patch.stopall)
|
self.addCleanup(mock.patch.stopall)
|
||||||
|
|
||||||
# needed to reload provider configuration
|
# needed to reload provider configuration
|
||||||
st_db.ServiceTypeManager._instance = None
|
st_db.ServiceTypeManager._instance = None
|
||||||
plugin_driver.AgentBasedPluginDriver.device_driver = 'dummy'
|
agent_driver_base.AgentDriverBase.device_driver = 'dummy'
|
||||||
super(TestLoadBalancerPluginBase, self).setUp(
|
super(TestLoadBalancerPluginBase, self).setUp(
|
||||||
lbaas_provider=('LOADBALANCER:lbaas:neutron.services.'
|
lbaas_provider=('LOADBALANCER:lbaas:neutron.services.'
|
||||||
'loadbalancer.drivers.haproxy.plugin_driver.'
|
'loadbalancer.drivers.common.agent_driver_base.'
|
||||||
'AgentBasedPluginDriver:default'))
|
'AgentDriverBase:default'))
|
||||||
|
|
||||||
# we need access to loaded plugins to modify models
|
# we need access to loaded plugins to modify models
|
||||||
loaded_plugins = manager.NeutronManager().get_service_plugins()
|
loaded_plugins = manager.NeutronManager().get_service_plugins()
|
||||||
@ -68,7 +66,7 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
|
|||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestLoadBalancerCallbacks, self).setUp()
|
super(TestLoadBalancerCallbacks, self).setUp()
|
||||||
|
|
||||||
self.callbacks = plugin_driver.LoadBalancerCallbacks(
|
self.callbacks = agent_driver_base.LoadBalancerCallbacks(
|
||||||
self.plugin_instance
|
self.plugin_instance
|
||||||
)
|
)
|
||||||
get_lbaas_agents_patcher = mock.patch(
|
get_lbaas_agents_patcher = mock.patch(
|
||||||
@ -400,7 +398,7 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
|
|||||||
self.assertEqual('ACTIVE', p['status'])
|
self.assertEqual('ACTIVE', p['status'])
|
||||||
|
|
||||||
def test_update_status_pool_deleted_already(self):
|
def test_update_status_pool_deleted_already(self):
|
||||||
with mock.patch.object(plugin_driver, 'LOG') as mock_log:
|
with mock.patch.object(agent_driver_base, 'LOG') as mock_log:
|
||||||
pool_id = 'deleted_pool'
|
pool_id = 'deleted_pool'
|
||||||
ctx = context.get_admin_context()
|
ctx = context.get_admin_context()
|
||||||
self.assertRaises(loadbalancer.PoolNotFound,
|
self.assertRaises(loadbalancer.PoolNotFound,
|
||||||
@ -433,7 +431,7 @@ class TestLoadBalancerAgentApi(base.BaseTestCase):
|
|||||||
super(TestLoadBalancerAgentApi, self).setUp()
|
super(TestLoadBalancerAgentApi, self).setUp()
|
||||||
self.addCleanup(mock.patch.stopall)
|
self.addCleanup(mock.patch.stopall)
|
||||||
|
|
||||||
self.api = plugin_driver.LoadBalancerAgentApi('topic')
|
self.api = agent_driver_base.LoadBalancerAgentApi('topic')
|
||||||
self.mock_cast = mock.patch.object(self.api, 'cast').start()
|
self.mock_cast = mock.patch.object(self.api, 'cast').start()
|
||||||
self.mock_msg = mock.patch.object(self.api, 'make_msg').start()
|
self.mock_msg = mock.patch.object(self.api, 'make_msg').start()
|
||||||
|
|
||||||
@ -510,16 +508,16 @@ class TestLoadBalancerAgentApi(base.BaseTestCase):
|
|||||||
|
|
||||||
class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
|
class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.log = mock.patch.object(plugin_driver, 'LOG')
|
self.log = mock.patch.object(agent_driver_base, 'LOG')
|
||||||
api_cls = mock.patch.object(plugin_driver,
|
api_cls = mock.patch.object(agent_driver_base,
|
||||||
'LoadBalancerAgentApi').start()
|
'LoadBalancerAgentApi').start()
|
||||||
super(TestLoadBalancerPluginNotificationWrapper, self).setUp()
|
super(TestLoadBalancerPluginNotificationWrapper, self).setUp()
|
||||||
self.mock_api = api_cls.return_value
|
self.mock_api = api_cls.return_value
|
||||||
|
|
||||||
self.mock_get_driver = mock.patch.object(self.plugin_instance,
|
self.mock_get_driver = mock.patch.object(self.plugin_instance,
|
||||||
'_get_driver')
|
'_get_driver')
|
||||||
self.mock_get_driver.return_value = (plugin_driver.
|
self.mock_get_driver.return_value = (agent_driver_base.
|
||||||
AgentBasedPluginDriver(
|
AgentDriverBase(
|
||||||
self.plugin_instance
|
self.plugin_instance
|
||||||
))
|
))
|
||||||
|
|
@ -86,7 +86,7 @@ console_scripts =
|
|||||||
neutron-dhcp-agent = neutron.agent.dhcp_agent:main
|
neutron-dhcp-agent = neutron.agent.dhcp_agent:main
|
||||||
neutron-hyperv-agent = neutron.plugins.hyperv.agent.hyperv_neutron_agent:main
|
neutron-hyperv-agent = neutron.plugins.hyperv.agent.hyperv_neutron_agent:main
|
||||||
neutron-l3-agent = neutron.agent.l3_agent:main
|
neutron-l3-agent = neutron.agent.l3_agent:main
|
||||||
neutron-lbaas-agent = neutron.services.loadbalancer.drivers.haproxy.agent:main
|
neutron-lbaas-agent = neutron.services.loadbalancer.agent.agent:main
|
||||||
neutron-linuxbridge-agent = neutron.plugins.linuxbridge.agent.linuxbridge_neutron_agent:main
|
neutron-linuxbridge-agent = neutron.plugins.linuxbridge.agent.linuxbridge_neutron_agent:main
|
||||||
neutron-metadata-agent = neutron.agent.metadata.agent:main
|
neutron-metadata-agent = neutron.agent.metadata.agent:main
|
||||||
neutron-mlnx-agent = neutron.plugins.mlnx.agent.eswitch_neutron_agent:main
|
neutron-mlnx-agent = neutron.plugins.mlnx.agent.eswitch_neutron_agent:main
|
||||||
@ -107,7 +107,7 @@ console_scripts =
|
|||||||
quantum-dhcp-agent = neutron.agent.dhcp_agent:main
|
quantum-dhcp-agent = neutron.agent.dhcp_agent:main
|
||||||
quantum-hyperv-agent = neutron.plugins.hyperv.agent.hyperv_neutron_agent:main
|
quantum-hyperv-agent = neutron.plugins.hyperv.agent.hyperv_neutron_agent:main
|
||||||
quantum-l3-agent = neutron.agent.l3_agent:main
|
quantum-l3-agent = neutron.agent.l3_agent:main
|
||||||
quantum-lbaas-agent = neutron.services.loadbalancer.drivers.haproxy.agent:main
|
quantum-lbaas-agent = neutron.services.loadbalancer.agent.agent:main
|
||||||
quantum-linuxbridge-agent = neutron.plugins.linuxbridge.agent.linuxbridge_neutron_agent:main
|
quantum-linuxbridge-agent = neutron.plugins.linuxbridge.agent.linuxbridge_neutron_agent:main
|
||||||
quantum-metadata-agent = neutron.agent.metadata.agent:main
|
quantum-metadata-agent = neutron.agent.metadata.agent:main
|
||||||
quantum-mlnx-agent = neutron.plugins.mlnx.agent.eswitch_neutron_agent:main
|
quantum-mlnx-agent = neutron.plugins.mlnx.agent.eswitch_neutron_agent:main
|
||||||
|
Loading…
Reference in New Issue
Block a user