vmware-nsx/neutron/services/loadbalancer/drivers/haproxy/agent_manager.py
Oleg Bondarev 916d97b5d6 LBaaS: unify haproxy-on-host plugin driver and agent
Unifies haproxy reference implementation to make common agent based plugin driver
which is suitable for all vendors who wants to use async mechanism.

 - Agent API as well as device driver API changed to handle
   loadbalancer objects individually;
 - Agent loads device drivers according to config;
 - LogicalDeviceCache class was removed from agent as it was used only
   as a list - to put and remove entries ant check whether entry is in or not.
   It was replaced with instance_mapping dict in agent to store known instances and
   corresponding device_drivers;
 - Agent reports which device drivers are supported (needs for scheduling on plugin side);
 - Agent-to-plugin API was extended to provide an ability for agent to update
   statuses of pools/vips/members/health_monitors;
 - Vendor should only implement device driver; plugin driver just needs
   to inherit AgentBasedPluginDriver and override device_driver member;
 - This patch doesn't move files to make review easier;
   all rename/replace will be done in a subsequent patch;

DocImpact

NOTE: Since the change in the agent RPC API is backward-incompatible
(major RPC version change), LBaaS server-agent communications will be
completely broken until both sides are upgraded so users will be unable to
create new or update existing HAProxy loadbalancer instances during upgrade

Implements blueprint lbaas-common-agent-driver

Change-Id: I9fd90a1321611d202ef838681273081fa6c1686a
2013-12-10 12:08:03 +04:00

342 lines
13 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost
from oslo.config import cfg
from neutron.agent import rpc as agent_rpc
from neutron.common import constants as n_const
from neutron.common import exceptions as n_exc
from neutron import context
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common import periodic_task
from neutron.plugins.common import constants
from neutron.services.loadbalancer.drivers.haproxy import (
agent_api,
plugin_driver
)
LOG = logging.getLogger(__name__)
OPTS = [
cfg.MultiStrOpt(
'device_driver',
default=['neutron.services.loadbalancer.drivers'
'.haproxy.namespace_driver.HaproxyNSDriver'],
help=_('Drivers used to manage loadbalancing devices'),
),
cfg.StrOpt(
'interface_driver',
help=_('The driver used to manage the virtual interface')
),
]
class DeviceNotFoundOnAgent(n_exc.NotFound):
msg = _('Unknown device with pool_id %(pool_id)s')
class LbaasAgentManager(periodic_task.PeriodicTasks):
RPC_API_VERSION = '2.0'
# history
# 1.0 Initial version
# 1.1 Support agent_updated call
# 2.0 Generic API for agent based drivers
# - modify/reload/destroy_pool methods were removed;
# - added methods to handle create/update/delete for every lbaas
# object individually;
def __init__(self, conf):
self.conf = conf
self.context = context.get_admin_context_without_session()
self.plugin_rpc = agent_api.LbaasAgentApi(
plugin_driver.TOPIC_LOADBALANCER_PLUGIN,
self.context,
self.conf.host
)
self._load_drivers()
self.agent_state = {
'binary': 'neutron-lbaas-agent',
'host': conf.host,
'topic': plugin_driver.TOPIC_LOADBALANCER_AGENT,
'configurations': {'device_drivers': self.device_drivers.keys()},
'agent_type': n_const.AGENT_TYPE_LOADBALANCER,
'start_flag': True}
self.admin_state_up = True
self._setup_state_rpc()
self.needs_resync = False
# pool_id->device_driver_name mapping used to store known instances
self.instance_mapping = {}
def _load_drivers(self):
self.device_drivers = {}
for driver in self.conf.device_driver:
try:
driver_inst = importutils.import_object(
driver,
self.conf,
self.plugin_rpc
)
except ImportError:
msg = _('Error importing loadbalancer device driver: %s')
raise SystemExit(msg % driver)
driver_name = driver_inst.get_name()
if driver_name not in self.device_drivers:
self.device_drivers[driver_name] = driver_inst
else:
msg = _('Multiple device drivers with the same name found: %s')
raise SystemExit(msg % driver_name)
def _setup_state_rpc(self):
self.state_rpc = agent_rpc.PluginReportStateAPI(
plugin_driver.TOPIC_LOADBALANCER_PLUGIN)
report_interval = self.conf.AGENT.report_interval
if report_interval:
heartbeat = loopingcall.FixedIntervalLoopingCall(
self._report_state)
heartbeat.start(interval=report_interval)
def _report_state(self):
try:
instance_count = len(self.instance_mapping)
self.agent_state['configurations']['instances'] = instance_count
self.state_rpc.report_state(self.context,
self.agent_state)
self.agent_state.pop('start_flag', None)
except Exception:
LOG.exception(_("Failed reporting state!"))
def initialize_service_hook(self, started_by):
self.sync_state()
@periodic_task.periodic_task
def periodic_resync(self, context):
if self.needs_resync:
self.needs_resync = False
self.sync_state()
@periodic_task.periodic_task(spacing=6)
def collect_stats(self, context):
for pool_id, driver_name in self.instance_mapping.items():
driver = self.device_drivers[driver_name]
try:
stats = driver.get_stats(pool_id)
if stats:
self.plugin_rpc.update_pool_stats(pool_id, stats)
except Exception:
LOG.exception(_('Error upating stats'))
self.needs_resync = True
def sync_state(self):
known_instances = set(self.instance_mapping.keys())
try:
ready_instances = set(self.plugin_rpc.get_ready_devices())
for deleted_id in known_instances - ready_instances:
self._destroy_pool(deleted_id)
for pool_id in ready_instances:
self._reload_pool(pool_id)
except Exception:
LOG.exception(_('Unable to retrieve ready devices'))
self.needs_resync = True
self.remove_orphans()
def _get_driver(self, pool_id):
if pool_id not in self.instance_mapping:
raise DeviceNotFoundOnAgent(pool_id=pool_id)
driver_name = self.instance_mapping[pool_id]
return self.device_drivers[driver_name]
def _reload_pool(self, pool_id):
try:
logical_config = self.plugin_rpc.get_logical_device(pool_id)
driver_name = logical_config['driver']
if driver_name not in self.device_drivers:
LOG.error(_('No device driver '
'on agent: %s.'), driver_name)
self.plugin_rpc.update_status(
'pool', pool_id, constants.ERROR)
return
self.device_drivers[driver_name].deploy_instance(logical_config)
self.instance_mapping[pool_id] = driver_name
self.plugin_rpc.pool_deployed(pool_id)
except Exception:
LOG.exception(_('Unable to deploy instance for pool: %s'), pool_id)
self.needs_resync = True
def _destroy_pool(self, pool_id):
driver = self._get_driver(pool_id)
try:
driver.undeploy_instance(pool_id)
del self.instance_mapping[pool_id]
self.plugin_rpc.pool_destroyed(pool_id)
except Exception:
LOG.exception(_('Unable to destroy device for pool: %s'), pool_id)
self.needs_resync = True
def remove_orphans(self):
for driver_name in self.device_drivers:
pool_ids = [pool_id for pool_id in self.instance_mapping
if self.instance_mapping[pool_id] == driver_name]
try:
self.device_drivers[driver_name].remove_orphans(pool_ids)
except NotImplementedError:
pass # Not all drivers will support this
def _handle_failed_driver_call(self, operation, obj_type, obj_id, driver):
LOG.exception(_('%(operation)s %(obj)s %(id)s failed on device driver '
'%(driver)s'),
{'operation': operation.capitalize(), 'obj': obj_type,
'id': obj_id, 'driver': driver})
self.plugin_rpc.update_status(obj_type, obj_id, constants.ERROR)
def create_vip(self, context, vip):
driver = self._get_driver(vip['pool_id'])
try:
driver.create_vip(vip)
except Exception:
self._handle_failed_driver_call('create', 'vip', vip['id'],
driver.get_name())
else:
self.plugin_rpc.update_status('vip', vip['id'], constants.ACTIVE)
def update_vip(self, context, old_vip, vip):
driver = self._get_driver(vip['pool_id'])
try:
driver.update_vip(old_vip, vip)
except Exception:
self._handle_failed_driver_call('update', 'vip', vip['id'],
driver.get_name())
else:
self.plugin_rpc.update_status('vip', vip['id'], constants.ACTIVE)
def delete_vip(self, context, vip):
driver = self._get_driver(vip['pool_id'])
driver.delete_vip(vip)
def create_pool(self, context, pool, driver_name):
if driver_name not in self.device_drivers:
LOG.error(_('No device driver on agent: %s.'), driver_name)
self.plugin_rpc.update_status('pool', pool['id'], constants.ERROR)
return
driver = self.device_drivers[driver_name]
try:
driver.create_pool(pool)
except Exception:
self._handle_failed_driver_call('create', 'pool', pool['id'],
driver.get_name())
else:
self.instance_mapping[pool['id']] = driver_name
self.plugin_rpc.update_status('pool', pool['id'], constants.ACTIVE)
def update_pool(self, context, old_pool, pool):
driver = self._get_driver(pool['id'])
try:
driver.update_pool(old_pool, pool)
except Exception:
self._handle_failed_driver_call('update', 'pool', pool['id'],
driver.get_name())
else:
self.plugin_rpc.update_status('pool', pool['id'], constants.ACTIVE)
def delete_pool(self, context, pool):
driver = self._get_driver(pool['id'])
driver.delete_pool(pool)
del self.instance_mapping[pool['id']]
def create_member(self, context, member):
driver = self._get_driver(member['pool_id'])
try:
driver.create_member(member)
except Exception:
self._handle_failed_driver_call('create', 'member', member['id'],
driver.get_name())
else:
self.plugin_rpc.update_status('member', member['id'],
constants.ACTIVE)
def update_member(self, context, old_member, member):
driver = self._get_driver(member['pool_id'])
try:
driver.update_member(old_member, member)
except Exception:
self._handle_failed_driver_call('update', 'member', member['id'],
driver.get_name())
else:
self.plugin_rpc.update_status('member', member['id'],
constants.ACTIVE)
def delete_member(self, context, member):
driver = self._get_driver(member['pool_id'])
driver.delete_member(member)
def create_pool_health_monitor(self, context, health_monitor, pool_id):
driver = self._get_driver(pool_id)
assoc_id = {'pool_id': pool_id, 'monitor_id': health_monitor['id']}
try:
driver.create_pool_health_monitor(health_monitor, pool_id)
except Exception:
self._handle_failed_driver_call(
'create', 'health_monitor', assoc_id, driver.get_name())
else:
self.plugin_rpc.update_status(
'health_monitor', assoc_id, constants.ACTIVE)
def update_pool_health_monitor(self, context, old_health_monitor,
health_monitor, pool_id):
driver = self._get_driver(pool_id)
assoc_id = {'pool_id': pool_id, 'monitor_id': health_monitor['id']}
try:
driver.update_pool_health_monitor(old_health_monitor,
health_monitor,
pool_id)
except Exception:
self._handle_failed_driver_call(
'update', 'health_monitor', assoc_id, driver.get_name())
else:
self.plugin_rpc.update_status(
'health_monitor', assoc_id, constants.ACTIVE)
def delete_pool_health_monitor(self, context, health_monitor, pool_id):
driver = self._get_driver(pool_id)
driver.delete_pool_health_monitor(health_monitor, pool_id)
def agent_updated(self, context, payload):
"""Handle the agent_updated notification event."""
if payload['admin_state_up'] != self.admin_state_up:
self.admin_state_up = payload['admin_state_up']
if self.admin_state_up:
self.needs_resync = True
else:
for pool_id in self.instance_mapping.keys():
LOG.info(_("Destroying pool %s due to agent disabling"),
pool_id)
self._destroy_pool(pool_id)
LOG.info(_("Agent_updated by server side %s!"), payload)