From bf8949defc24af3636da417442469e7d7194c7b3 Mon Sep 17 00:00:00 2001 From: Oleg Bondarev Date: Tue, 6 Aug 2013 12:52:34 +0400 Subject: [PATCH] LBaaS: unify haproxy-on-host plugin driver and agent Unifies haproxy reference implementation to make common agent based plugin driver which is suitable for all vendors who wants to use async mechanism. - Agent API as well as device driver API changed to handle loadbalancer objects individually; - Agent loads device drivers according to config; - LogicalDeviceCache class was removed from agent as it was used only as a list - to put and remove entries ant check whether entry is in or not. It was replaced with instance_mapping dict in agent to store known instances and corresponding device_drivers; - Agent reports which device drivers are supported (needs for scheduling on plugin side); - Agent-to-plugin API was extended to provide an ability for agent to update statuses of pools/vips/members/health_monitors; - Vendor should only implement device driver; plugin driver just needs to inherit AgentBasedPluginDriver and override device_driver member; - This patch doesn't move files to make review easier; all rename/replace will be done in a subsequent patch; DocImpact NOTE: Since the change in the agent RPC API is backward-incompatible (major RPC version change), LBaaS server-agent communications will be completely broken until both sides are upgraded so users will be unable to create new or update existing HAProxy loadbalancer instances during upgrade Implements blueprint lbaas-common-agent-driver Change-Id: I9fd90a1321611d202ef838681273081fa6c1686a --- etc/lbaas_agent.ini | 11 +- .../services/loadbalancer/agent_scheduler.py | 21 +- .../loadbalancer/drivers/abstract_driver.py | 8 +- .../drivers/agent_device_driver.py | 98 ++++ .../loadbalancer/drivers/haproxy/agent_api.py | 29 +- .../drivers/haproxy/agent_manager.py | 356 +++++++------ .../loadbalancer/drivers/haproxy/cfg.py | 12 +- .../drivers/haproxy/namespace_driver.py | 121 ++++- .../drivers/haproxy/plugin_driver.py | 307 +++++++---- .../loadbalancer/drivers/radware/driver.py | 6 +- neutron/services/loadbalancer/plugin.py | 4 +- .../db/loadbalancer/test_db_loadbalancer.py | 6 +- .../drivers/haproxy/test_agent_manager.py | 483 +++++++++--------- .../loadbalancer/drivers/haproxy/test_api.py | 40 +- .../loadbalancer/drivers/haproxy/test_cfg.py | 10 +- .../drivers/haproxy/test_namespace_driver.py | 170 +++++- .../drivers/haproxy/test_plugin_driver.py | 413 +++++++++------ .../loadbalancer/test_agent_scheduler.py | 12 +- neutron/tests/unit/test_agent_ext_plugin.py | 4 +- 19 files changed, 1379 insertions(+), 732 deletions(-) create mode 100644 neutron/services/loadbalancer/drivers/agent_device_driver.py diff --git a/etc/lbaas_agent.ini b/etc/lbaas_agent.ini index 17df74c348..021a8ba227 100644 --- a/etc/lbaas_agent.ini +++ b/etc/lbaas_agent.ini @@ -23,9 +23,16 @@ # Example of interface_driver option for LinuxBridge # interface_driver = neutron.agent.linux.interface.BridgeInterfaceDriver -# The agent requires a driver to manage the loadbalancer. HAProxy is the -# opensource version. +# The agent requires drivers to manage the loadbalancer. HAProxy is the opensource version. +# Multiple device drivers reflecting different service providers could be specified: +# device_driver = path.to.provider1.driver.Driver +# device_driver = path.to.provider2.driver.Driver +# Default is: # device_driver = neutron.services.loadbalancer.drivers.haproxy.namespace_driver.HaproxyNSDriver +[haproxy] +# Location to store config and state files +# loadbalancer_state_path = $state_path/lbaas + # The user group # user_group = nogroup diff --git a/neutron/services/loadbalancer/agent_scheduler.py b/neutron/services/loadbalancer/agent_scheduler.py index 95afe1c188..2ace1db6e2 100644 --- a/neutron/services/loadbalancer/agent_scheduler.py +++ b/neutron/services/loadbalancer/agent_scheduler.py @@ -79,11 +79,19 @@ class LbaasAgentSchedulerDbMixin(agentschedulers_db.AgentSchedulerDbMixin, else: return {'pools': []} + def get_lbaas_agent_candidates(self, device_driver, active_agents): + candidates = [] + for agent in active_agents: + agent_conf = self.get_configuration_dict(agent) + if device_driver in agent_conf['device_drivers']: + candidates.append(agent) + return candidates + class ChanceScheduler(object): """Allocate a loadbalancer agent for a vip in a random way.""" - def schedule(self, plugin, context, pool): + def schedule(self, plugin, context, pool, device_driver): """Schedule the pool to an active loadbalancer agent if there is no enabled agent hosting it. """ @@ -97,11 +105,18 @@ class ChanceScheduler(object): 'agent_id': lbaas_agent['id']}) return - candidates = plugin.get_lbaas_agents(context, active=True) - if not candidates: + active_agents = plugin.get_lbaas_agents(context, active=True) + if not active_agents: LOG.warn(_('No active lbaas agents for pool %s'), pool['id']) return + candidates = plugin.get_lbaas_agent_candidates(device_driver, + active_agents) + if not candidates: + LOG.warn(_('No lbaas agent supporting device driver %s'), + device_driver) + return + chosen_agent = random.choice(candidates) binding = PoolLoadbalancerAgentBinding() binding.agent = chosen_agent diff --git a/neutron/services/loadbalancer/drivers/abstract_driver.py b/neutron/services/loadbalancer/drivers/abstract_driver.py index 5659c3e4b2..55d2c6d628 100644 --- a/neutron/services/loadbalancer/drivers/abstract_driver.py +++ b/neutron/services/loadbalancer/drivers/abstract_driver.py @@ -107,10 +107,10 @@ class LoadBalancerAbstractDriver(object): pass @abc.abstractmethod - def update_health_monitor(self, context, - old_health_monitor, - health_monitor, - pool_id): + def update_pool_health_monitor(self, context, + old_health_monitor, + health_monitor, + pool_id): pass @abc.abstractmethod diff --git a/neutron/services/loadbalancer/drivers/agent_device_driver.py b/neutron/services/loadbalancer/drivers/agent_device_driver.py new file mode 100644 index 0000000000..3e0ede260c --- /dev/null +++ b/neutron/services/loadbalancer/drivers/agent_device_driver.py @@ -0,0 +1,98 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 OpenStack Foundation. All rights reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + +import six + + +@six.add_metaclass(abc.ABCMeta) +class AgentDeviceDriver(object): + """Abstract device driver that defines the API required by LBaaS agent.""" + + @abc.abstractmethod + def get_name(cls): + """Returns unique name across all LBaaS device drivers.""" + pass + + @abc.abstractmethod + def deploy_instance(self, logical_config): + """Fully deploys a loadbalancer instance from a given config.""" + pass + + @abc.abstractmethod + def undeploy_instance(self, pool_id): + """Fully undeploys the loadbalancer instance.""" + pass + + @abc.abstractmethod + def get_stats(self, pool_id): + pass + + def remove_orphans(self, known_pool_ids): + # Not all drivers will support this + raise NotImplementedError() + + @abc.abstractmethod + def create_vip(self, vip): + pass + + @abc.abstractmethod + def update_vip(self, old_vip, vip): + pass + + @abc.abstractmethod + def delete_vip(self, vip): + pass + + @abc.abstractmethod + def create_pool(self, pool): + pass + + @abc.abstractmethod + def update_pool(self, old_pool, pool): + pass + + @abc.abstractmethod + def delete_pool(self, pool): + pass + + @abc.abstractmethod + def create_member(self, member): + pass + + @abc.abstractmethod + def update_member(self, old_member, member): + pass + + @abc.abstractmethod + def delete_member(self, context, member): + pass + + @abc.abstractmethod + def create_pool_health_monitor(self, health_monitor, pool_id): + pass + + @abc.abstractmethod + def update_pool_health_monitor(self, + old_health_monitor, + health_monitor, + pool_id): + pass + + @abc.abstractmethod + def delete_pool_health_monitor(self, context, health_monitor, pool_id): + pass diff --git a/neutron/services/loadbalancer/drivers/haproxy/agent_api.py b/neutron/services/loadbalancer/drivers/haproxy/agent_api.py index 7990fd3ea9..65e2aa02d4 100644 --- a/neutron/services/loadbalancer/drivers/haproxy/agent_api.py +++ b/neutron/services/loadbalancer/drivers/haproxy/agent_api.py @@ -22,7 +22,12 @@ from neutron.openstack.common.rpc import proxy class LbaasAgentApi(proxy.RpcProxy): """Agent side of the Agent to Plugin RPC API.""" - API_VERSION = '1.0' + API_VERSION = '2.0' + # history + # 1.0 Initial version + # 2.0 Generic API for agent based drivers + # - get_logical_device() handling changed on plugin side; + # - pool_deployed() and update_status() methods added; def __init__(self, topic, context, host): super(LbaasAgentApi, self).__init__(topic, self.API_VERSION) @@ -36,21 +41,35 @@ class LbaasAgentApi(proxy.RpcProxy): topic=self.topic ) + def pool_destroyed(self, pool_id): + return self.call( + self.context, + self.make_msg('pool_destroyed', pool_id=pool_id), + topic=self.topic + ) + + def pool_deployed(self, pool_id): + return self.call( + self.context, + self.make_msg('pool_deployed', pool_id=pool_id), + topic=self.topic + ) + def get_logical_device(self, pool_id): return self.call( self.context, self.make_msg( 'get_logical_device', - pool_id=pool_id, - host=self.host + pool_id=pool_id ), topic=self.topic ) - def pool_destroyed(self, pool_id): + def update_status(self, obj_type, obj_id, status): return self.call( self.context, - self.make_msg('pool_destroyed', pool_id=pool_id, host=self.host), + self.make_msg('update_status', obj_type=obj_type, obj_id=obj_id, + status=status), topic=self.topic ) diff --git a/neutron/services/loadbalancer/drivers/haproxy/agent_manager.py b/neutron/services/loadbalancer/drivers/haproxy/agent_manager.py index ed05288dc5..7c6d20ab68 100644 --- a/neutron/services/loadbalancer/drivers/haproxy/agent_manager.py +++ b/neutron/services/loadbalancer/drivers/haproxy/agent_manager.py @@ -16,152 +16,100 @@ # # @author: Mark McClain, DreamHost -import weakref - from oslo.config import cfg -from neutron.agent.common import config from neutron.agent import rpc as agent_rpc -from neutron.common import constants +from neutron.common import constants as n_const +from neutron.common import exceptions as n_exc from neutron import context from neutron.openstack.common import importutils from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall from neutron.openstack.common import periodic_task +from neutron.plugins.common import constants from neutron.services.loadbalancer.drivers.haproxy import ( agent_api, plugin_driver ) LOG = logging.getLogger(__name__) -NS_PREFIX = 'qlbaas-' OPTS = [ - cfg.StrOpt( + cfg.MultiStrOpt( 'device_driver', - default=('neutron.services.loadbalancer.drivers' - '.haproxy.namespace_driver.HaproxyNSDriver'), - help=_('The driver used to manage the loadbalancing device'), - ), - cfg.StrOpt( - 'loadbalancer_state_path', - default='$state_path/lbaas', - help=_('Location to store config and state files'), + default=['neutron.services.loadbalancer.drivers' + '.haproxy.namespace_driver.HaproxyNSDriver'], + help=_('Drivers used to manage loadbalancing devices'), ), cfg.StrOpt( 'interface_driver', help=_('The driver used to manage the virtual interface') ), - cfg.StrOpt( - 'user_group', - default='nogroup', - help=_('The user group'), - ), ] -class LogicalDeviceCache(object): - """Manage a cache of known devices.""" - - class Device(object): - """Inner classes used to hold values for weakref lookups.""" - def __init__(self, port_id, pool_id): - self.port_id = port_id - self.pool_id = pool_id - - def __eq__(self, other): - return self.__dict__ == other.__dict__ - - def __hash__(self): - return hash((self.port_id, self.pool_id)) - - def __init__(self): - self.devices = set() - self.port_lookup = weakref.WeakValueDictionary() - self.pool_lookup = weakref.WeakValueDictionary() - - def put(self, device): - port_id = device['vip']['port_id'] - pool_id = device['pool']['id'] - d = self.Device(device['vip']['port_id'], device['pool']['id']) - if d not in self.devices: - self.devices.add(d) - self.port_lookup[port_id] = d - self.pool_lookup[pool_id] = d - - def remove(self, device): - if not isinstance(device, self.Device): - device = self.Device( - device['vip']['port_id'], device['pool']['id'] - ) - if device in self.devices: - self.devices.remove(device) - - def remove_by_pool_id(self, pool_id): - d = self.pool_lookup.get(pool_id) - if d: - self.devices.remove(d) - - def get_by_pool_id(self, pool_id): - return self.pool_lookup.get(pool_id) - - def get_by_port_id(self, port_id): - return self.port_lookup.get(port_id) - - def get_pool_ids(self): - return self.pool_lookup.keys() +class DeviceNotFoundOnAgent(n_exc.NotFound): + msg = _('Unknown device with pool_id %(pool_id)s') class LbaasAgentManager(periodic_task.PeriodicTasks): + RPC_API_VERSION = '2.0' # history # 1.0 Initial version # 1.1 Support agent_updated call - RPC_API_VERSION = '1.1' + # 2.0 Generic API for agent based drivers + # - modify/reload/destroy_pool methods were removed; + # - added methods to handle create/update/delete for every lbaas + # object individually; def __init__(self, conf): self.conf = conf - try: - vif_driver = importutils.import_object(conf.interface_driver, conf) - except ImportError: - msg = _('Error importing interface driver: %s') - raise SystemExit(msg % conf.interface_driver) - - try: - self.driver = importutils.import_object( - conf.device_driver, - config.get_root_helper(self.conf), - conf.loadbalancer_state_path, - vif_driver, - self._vip_plug_callback - ) - except ImportError: - msg = _('Error importing loadbalancer device driver: %s') - raise SystemExit(msg % conf.device_driver) + self.context = context.get_admin_context_without_session() + self.plugin_rpc = agent_api.LbaasAgentApi( + plugin_driver.TOPIC_LOADBALANCER_PLUGIN, + self.context, + self.conf.host + ) + self._load_drivers() self.agent_state = { 'binary': 'neutron-lbaas-agent', 'host': conf.host, 'topic': plugin_driver.TOPIC_LOADBALANCER_AGENT, - 'configurations': {'device_driver': conf.device_driver, - 'interface_driver': conf.interface_driver}, - 'agent_type': constants.AGENT_TYPE_LOADBALANCER, + 'configurations': {'device_drivers': self.device_drivers.keys()}, + 'agent_type': n_const.AGENT_TYPE_LOADBALANCER, 'start_flag': True} self.admin_state_up = True - self.context = context.get_admin_context_without_session() - self._setup_rpc() + self._setup_state_rpc() self.needs_resync = False - self.cache = LogicalDeviceCache() + # pool_id->device_driver_name mapping used to store known instances + self.instance_mapping = {} - def _setup_rpc(self): - self.plugin_rpc = agent_api.LbaasAgentApi( - plugin_driver.TOPIC_PROCESS_ON_HOST, - self.context, - self.conf.host - ) + def _load_drivers(self): + self.device_drivers = {} + for driver in self.conf.device_driver: + try: + driver_inst = importutils.import_object( + driver, + self.conf, + self.plugin_rpc + ) + except ImportError: + msg = _('Error importing loadbalancer device driver: %s') + raise SystemExit(msg % driver) + + driver_name = driver_inst.get_name() + if driver_name not in self.device_drivers: + self.device_drivers[driver_name] = driver_inst + else: + msg = _('Multiple device drivers with the same name found: %s') + raise SystemExit(msg % driver_name) + + def _setup_state_rpc(self): self.state_rpc = agent_rpc.PluginReportStateAPI( - plugin_driver.TOPIC_PROCESS_ON_HOST) + plugin_driver.TOPIC_LOADBALANCER_PLUGIN) report_interval = self.conf.AGENT.report_interval if report_interval: heartbeat = loopingcall.FixedIntervalLoopingCall( @@ -170,8 +118,8 @@ class LbaasAgentManager(periodic_task.PeriodicTasks): def _report_state(self): try: - device_count = len(self.cache.devices) - self.agent_state['configurations']['devices'] = device_count + instance_count = len(self.instance_mapping) + self.agent_state['configurations']['instances'] = instance_count self.state_rpc.report_state(self.context, self.agent_state) self.agent_state.pop('start_flag', None) @@ -189,31 +137,26 @@ class LbaasAgentManager(periodic_task.PeriodicTasks): @periodic_task.periodic_task(spacing=6) def collect_stats(self, context): - for pool_id in self.cache.get_pool_ids(): + for pool_id, driver_name in self.instance_mapping.items(): + driver = self.device_drivers[driver_name] try: - stats = self.driver.get_stats(pool_id) + stats = driver.get_stats(pool_id) if stats: self.plugin_rpc.update_pool_stats(pool_id, stats) except Exception: LOG.exception(_('Error upating stats')) self.needs_resync = True - def _vip_plug_callback(self, action, port): - if action == 'plug': - self.plugin_rpc.plug_vip_port(port['id']) - elif action == 'unplug': - self.plugin_rpc.unplug_vip_port(port['id']) - def sync_state(self): - known_devices = set(self.cache.get_pool_ids()) + known_instances = set(self.instance_mapping.keys()) try: - ready_logical_devices = set(self.plugin_rpc.get_ready_devices()) + ready_instances = set(self.plugin_rpc.get_ready_devices()) - for deleted_id in known_devices - ready_logical_devices: - self.destroy_device(deleted_id) + for deleted_id in known_instances - ready_instances: + self._destroy_pool(deleted_id) - for pool_id in ready_logical_devices: - self.refresh_device(pool_id) + for pool_id in ready_instances: + self._reload_pool(pool_id) except Exception: LOG.exception(_('Unable to retrieve ready devices')) @@ -221,51 +164,168 @@ class LbaasAgentManager(periodic_task.PeriodicTasks): self.remove_orphans() - def refresh_device(self, pool_id): + def _get_driver(self, pool_id): + if pool_id not in self.instance_mapping: + raise DeviceNotFoundOnAgent(pool_id=pool_id) + + driver_name = self.instance_mapping[pool_id] + return self.device_drivers[driver_name] + + def _reload_pool(self, pool_id): try: logical_config = self.plugin_rpc.get_logical_device(pool_id) + driver_name = logical_config['driver'] + if driver_name not in self.device_drivers: + LOG.error(_('No device driver ' + 'on agent: %s.'), driver_name) + self.plugin_rpc.update_status( + 'pool', pool_id, constants.ERROR) + return - if self.driver.exists(pool_id): - self.driver.update(logical_config) - else: - self.driver.create(logical_config) - self.cache.put(logical_config) + self.device_drivers[driver_name].deploy_instance(logical_config) + self.instance_mapping[pool_id] = driver_name + self.plugin_rpc.pool_deployed(pool_id) except Exception: - LOG.exception(_('Unable to refresh device for pool: %s'), pool_id) + LOG.exception(_('Unable to deploy instance for pool: %s'), pool_id) self.needs_resync = True - def destroy_device(self, pool_id): - device = self.cache.get_by_pool_id(pool_id) - if not device: - return + def _destroy_pool(self, pool_id): + driver = self._get_driver(pool_id) try: - self.driver.destroy(pool_id) + driver.undeploy_instance(pool_id) + del self.instance_mapping[pool_id] self.plugin_rpc.pool_destroyed(pool_id) except Exception: LOG.exception(_('Unable to destroy device for pool: %s'), pool_id) self.needs_resync = True - self.cache.remove(device) def remove_orphans(self): + for driver_name in self.device_drivers: + pool_ids = [pool_id for pool_id in self.instance_mapping + if self.instance_mapping[pool_id] == driver_name] + try: + self.device_drivers[driver_name].remove_orphans(pool_ids) + except NotImplementedError: + pass # Not all drivers will support this + + def _handle_failed_driver_call(self, operation, obj_type, obj_id, driver): + LOG.exception(_('%(operation)s %(obj)s %(id)s failed on device driver ' + '%(driver)s'), + {'operation': operation.capitalize(), 'obj': obj_type, + 'id': obj_id, 'driver': driver}) + self.plugin_rpc.update_status(obj_type, obj_id, constants.ERROR) + + def create_vip(self, context, vip): + driver = self._get_driver(vip['pool_id']) try: - self.driver.remove_orphans(self.cache.get_pool_ids()) - except NotImplementedError: - pass # Not all drivers will support this + driver.create_vip(vip) + except Exception: + self._handle_failed_driver_call('create', 'vip', vip['id'], + driver.get_name()) + else: + self.plugin_rpc.update_status('vip', vip['id'], constants.ACTIVE) - def reload_pool(self, context, pool_id=None, host=None): - """Handle RPC cast from plugin to reload a pool.""" - if pool_id: - self.refresh_device(pool_id) + def update_vip(self, context, old_vip, vip): + driver = self._get_driver(vip['pool_id']) + try: + driver.update_vip(old_vip, vip) + except Exception: + self._handle_failed_driver_call('update', 'vip', vip['id'], + driver.get_name()) + else: + self.plugin_rpc.update_status('vip', vip['id'], constants.ACTIVE) - def modify_pool(self, context, pool_id=None, host=None): - """Handle RPC cast from plugin to modify a pool if known to agent.""" - if self.cache.get_by_pool_id(pool_id): - self.refresh_device(pool_id) + def delete_vip(self, context, vip): + driver = self._get_driver(vip['pool_id']) + driver.delete_vip(vip) - def destroy_pool(self, context, pool_id=None, host=None): - """Handle RPC cast from plugin to destroy a pool if known to agent.""" - if self.cache.get_by_pool_id(pool_id): - self.destroy_device(pool_id) + def create_pool(self, context, pool, driver_name): + if driver_name not in self.device_drivers: + LOG.error(_('No device driver on agent: %s.'), driver_name) + self.plugin_rpc.update_status('pool', pool['id'], constants.ERROR) + return + + driver = self.device_drivers[driver_name] + try: + driver.create_pool(pool) + except Exception: + self._handle_failed_driver_call('create', 'pool', pool['id'], + driver.get_name()) + else: + self.instance_mapping[pool['id']] = driver_name + self.plugin_rpc.update_status('pool', pool['id'], constants.ACTIVE) + + def update_pool(self, context, old_pool, pool): + driver = self._get_driver(pool['id']) + try: + driver.update_pool(old_pool, pool) + except Exception: + self._handle_failed_driver_call('update', 'pool', pool['id'], + driver.get_name()) + else: + self.plugin_rpc.update_status('pool', pool['id'], constants.ACTIVE) + + def delete_pool(self, context, pool): + driver = self._get_driver(pool['id']) + driver.delete_pool(pool) + del self.instance_mapping[pool['id']] + + def create_member(self, context, member): + driver = self._get_driver(member['pool_id']) + try: + driver.create_member(member) + except Exception: + self._handle_failed_driver_call('create', 'member', member['id'], + driver.get_name()) + else: + self.plugin_rpc.update_status('member', member['id'], + constants.ACTIVE) + + def update_member(self, context, old_member, member): + driver = self._get_driver(member['pool_id']) + try: + driver.update_member(old_member, member) + except Exception: + self._handle_failed_driver_call('update', 'member', member['id'], + driver.get_name()) + else: + self.plugin_rpc.update_status('member', member['id'], + constants.ACTIVE) + + def delete_member(self, context, member): + driver = self._get_driver(member['pool_id']) + driver.delete_member(member) + + def create_pool_health_monitor(self, context, health_monitor, pool_id): + driver = self._get_driver(pool_id) + assoc_id = {'pool_id': pool_id, 'monitor_id': health_monitor['id']} + try: + driver.create_pool_health_monitor(health_monitor, pool_id) + except Exception: + self._handle_failed_driver_call( + 'create', 'health_monitor', assoc_id, driver.get_name()) + else: + self.plugin_rpc.update_status( + 'health_monitor', assoc_id, constants.ACTIVE) + + def update_pool_health_monitor(self, context, old_health_monitor, + health_monitor, pool_id): + driver = self._get_driver(pool_id) + assoc_id = {'pool_id': pool_id, 'monitor_id': health_monitor['id']} + try: + driver.update_pool_health_monitor(old_health_monitor, + health_monitor, + pool_id) + except Exception: + self._handle_failed_driver_call( + 'update', 'health_monitor', assoc_id, driver.get_name()) + else: + self.plugin_rpc.update_status( + 'health_monitor', assoc_id, constants.ACTIVE) + + def delete_pool_health_monitor(self, context, health_monitor, pool_id): + driver = self._get_driver(pool_id) + driver.delete_pool_health_monitor(health_monitor, pool_id) def agent_updated(self, context, payload): """Handle the agent_updated notification event.""" @@ -274,6 +334,8 @@ class LbaasAgentManager(periodic_task.PeriodicTasks): if self.admin_state_up: self.needs_resync = True else: - for pool_id in self.cache.get_pool_ids(): - self.destroy_device(pool_id) - LOG.info(_("agent_updated by server side %s!"), payload) + for pool_id in self.instance_mapping.keys(): + LOG.info(_("Destroying pool %s due to agent disabling"), + pool_id) + self._destroy_pool(pool_id) + LOG.info(_("Agent_updated by server side %s!"), payload) diff --git a/neutron/services/loadbalancer/drivers/haproxy/cfg.py b/neutron/services/loadbalancer/drivers/haproxy/cfg.py index 052945cad5..776b4b1500 100644 --- a/neutron/services/loadbalancer/drivers/haproxy/cfg.py +++ b/neutron/services/loadbalancer/drivers/haproxy/cfg.py @@ -18,8 +18,6 @@ import itertools -from oslo.config import cfg - from neutron.agent.linux import utils from neutron.plugins.common import constants as qconstants from neutron.services.loadbalancer import constants @@ -53,21 +51,23 @@ ACTIVE = qconstants.ACTIVE INACTIVE = qconstants.INACTIVE -def save_config(conf_path, logical_config, socket_path=None): +def save_config(conf_path, logical_config, socket_path=None, + user_group='nogroup'): """Convert a logical configuration to the HAProxy version.""" data = [] - data.extend(_build_global(logical_config, socket_path=socket_path)) + data.extend(_build_global(logical_config, socket_path=socket_path, + user_group=user_group)) data.extend(_build_defaults(logical_config)) data.extend(_build_frontend(logical_config)) data.extend(_build_backend(logical_config)) utils.replace_file(conf_path, '\n'.join(data)) -def _build_global(config, socket_path=None): +def _build_global(config, socket_path=None, user_group='nogroup'): opts = [ 'daemon', 'user nobody', - 'group %s' % cfg.CONF.user_group, + 'group %s' % user_group, 'log /dev/log local0', 'log /dev/log local1 notice' ] diff --git a/neutron/services/loadbalancer/drivers/haproxy/namespace_driver.py b/neutron/services/loadbalancer/drivers/haproxy/namespace_driver.py index 20f85c72e8..03ff795a78 100644 --- a/neutron/services/loadbalancer/drivers/haproxy/namespace_driver.py +++ b/neutron/services/loadbalancer/drivers/haproxy/namespace_driver.py @@ -20,27 +20,69 @@ import shutil import socket import netaddr +from oslo.config import cfg +from neutron.agent.common import config from neutron.agent.linux import ip_lib from neutron.agent.linux import utils from neutron.common import exceptions +from neutron.openstack.common import importutils from neutron.openstack.common import log as logging from neutron.plugins.common import constants from neutron.services.loadbalancer import constants as lb_const +from neutron.services.loadbalancer.drivers import agent_device_driver from neutron.services.loadbalancer.drivers.haproxy import cfg as hacfg LOG = logging.getLogger(__name__) NS_PREFIX = 'qlbaas-' +DRIVER_NAME = 'haproxy_ns' + +ACTIVE_PENDING = ( + constants.ACTIVE, + constants.PENDING_CREATE, + constants.PENDING_UPDATE +) + +STATE_PATH_DEFAULT = '$state_path/lbaas' +USER_GROUP_DEFAULT = 'nogroup' +OPTS = [ + cfg.StrOpt( + 'loadbalancer_state_path', + default=STATE_PATH_DEFAULT, + help=_('Location to store config and state files'), + deprecated_opts=[cfg.DeprecatedOpt('loadbalancer_state_path')], + ), + cfg.StrOpt( + 'user_group', + default=USER_GROUP_DEFAULT, + help=_('The user group'), + deprecated_opts=[cfg.DeprecatedOpt('user_group')], + ) +] +cfg.CONF.register_opts(OPTS, 'haproxy') -class HaproxyNSDriver(object): - def __init__(self, root_helper, state_path, vif_driver, vip_plug_callback): - self.root_helper = root_helper - self.state_path = state_path +class HaproxyNSDriver(agent_device_driver.AgentDeviceDriver): + def __init__(self, conf, plugin_rpc): + self.conf = conf + self.root_helper = config.get_root_helper(conf) + self.state_path = conf.haproxy.loadbalancer_state_path + try: + vif_driver = importutils.import_object(conf.interface_driver, conf) + except ImportError: + msg = (_('Error importing interface driver: %s') + % conf.haproxy.interface_driver) + LOG.error(msg) + raise + self.vif_driver = vif_driver - self.vip_plug_callback = vip_plug_callback + self.plugin_rpc = plugin_rpc self.pool_to_port_id = {} + @classmethod + def get_name(cls): + return DRIVER_NAME + def create(self, logical_config): pool_id = logical_config['pool']['id'] namespace = get_ns_name(pool_id) @@ -62,8 +104,9 @@ class HaproxyNSDriver(object): conf_path = self._get_state_file_path(pool_id, 'conf') pid_path = self._get_state_file_path(pool_id, 'pid') sock_path = self._get_state_file_path(pool_id, 'sock') + user_group = self.conf.haproxy.user_group - hacfg.save_config(conf_path, logical_config, sock_path) + hacfg.save_config(conf_path, logical_config, sock_path, user_group) cmd = ['haproxy', '-f', conf_path, '-p', pid_path] cmd.extend(extra_cmd_args) @@ -73,7 +116,7 @@ class HaproxyNSDriver(object): # remember the pool<>port mapping self.pool_to_port_id[pool_id] = logical_config['vip']['port']['id'] - def destroy(self, pool_id): + def undeploy_instance(self, pool_id): namespace = get_ns_name(pool_id) ns = ip_lib.IPWrapper(self.root_helper, namespace) pid_path = self._get_state_file_path(pool_id, 'pid') @@ -176,9 +219,6 @@ class HaproxyNSDriver(object): return res_stats - def remove_orphans(self, known_pool_ids): - raise NotImplementedError() - def _get_state_file_path(self, pool_id, kind, ensure_state_dir=True): """Returns the file name for a given kind of config file.""" confs_dir = os.path.abspath(os.path.normpath(self.state_path)) @@ -189,7 +229,7 @@ class HaproxyNSDriver(object): return os.path.join(conf_dir, kind) def _plug(self, namespace, port, reuse_existing=True): - self.vip_plug_callback('plug', port) + self.plugin_rpc.plug_vip_port(port['id']) interface_name = self.vif_driver.get_device_name(Wrap(port)) if ip_lib.device_exists(interface_name, self.root_helper, namespace): @@ -222,10 +262,67 @@ class HaproxyNSDriver(object): def _unplug(self, namespace, port_id): port_stub = {'id': port_id} - self.vip_plug_callback('unplug', port_stub) + self.plugin_rpc.unplug_vip_port(port_id) interface_name = self.vif_driver.get_device_name(Wrap(port_stub)) self.vif_driver.unplug(interface_name, namespace=namespace) + def deploy_instance(self, logical_config): + # do actual deploy only if vip is configured and active + if ('vip' not in logical_config or + logical_config['vip']['status'] not in ACTIVE_PENDING or + not logical_config['vip']['admin_state_up']): + return + + if self.exists(logical_config['pool']['id']): + self.update(logical_config) + else: + self.create(logical_config) + + def _refresh_device(self, pool_id): + logical_config = self.plugin_rpc.get_logical_device(pool_id) + self.deploy_instance(logical_config) + + def create_vip(self, vip): + self._refresh_device(vip['pool_id']) + + def update_vip(self, old_vip, vip): + self._refresh_device(vip['pool_id']) + + def delete_vip(self, vip): + self.undeploy_instance(vip['pool_id']) + + def create_pool(self, pool): + # nothing to do here because a pool needs a vip to be useful + pass + + def update_pool(self, old_pool, pool): + self._refresh_device(pool['id']) + + def delete_pool(self, pool): + # delete_pool may be called before vip deletion in case + # pool's admin state set to down + if self.exists(pool['id']): + self.undeploy_instance(pool['id']) + + def create_member(self, member): + self._refresh_device(member['pool_id']) + + def update_member(self, old_member, member): + self._refresh_device(member['pool_id']) + + def delete_member(self, member): + self._refresh_device(member['pool_id']) + + def create_pool_health_monitor(self, health_monitor, pool_id): + self._refresh_device(pool_id) + + def update_pool_health_monitor(self, old_health_monitor, health_monitor, + pool_id): + self._refresh_device(pool_id) + + def delete_pool_health_monitor(self, health_monitor, pool_id): + self._refresh_device(pool_id) + # NOTE (markmcclain) For compliance with interface.py which expects objects class Wrap(object): diff --git a/neutron/services/loadbalancer/drivers/haproxy/plugin_driver.py b/neutron/services/loadbalancer/drivers/haproxy/plugin_driver.py index 1f402b35be..ad42b0c987 100644 --- a/neutron/services/loadbalancer/drivers/haproxy/plugin_driver.py +++ b/neutron/services/loadbalancer/drivers/haproxy/plugin_driver.py @@ -53,13 +53,23 @@ AGENT_SCHEDULER_OPTS = [ cfg.CONF.register_opts(AGENT_SCHEDULER_OPTS) # topic name for this particular agent implementation -TOPIC_PROCESS_ON_HOST = 'q-lbaas-process-on-host' -TOPIC_LOADBALANCER_AGENT = 'lbaas_process_on_host_agent' +TOPIC_LOADBALANCER_PLUGIN = 'n-lbaas-plugin' +TOPIC_LOADBALANCER_AGENT = 'n-lbaas_agent' + + +class DriverNotSpecified(q_exc.NeutronException): + message = _("Device driver for agent should be specified " + "in plugin driver.") class LoadBalancerCallbacks(object): - RPC_API_VERSION = '1.0' + RPC_API_VERSION = '2.0' + # history + # 1.0 Initial version + # 2.0 Generic API for agent based drivers + # - get_logical_device() handling changed; + # - pool_deployed() and update_status() methods added; def __init__(self, plugin): self.plugin = plugin @@ -70,67 +80,47 @@ class LoadBalancerCallbacks(object): def get_ready_devices(self, context, host=None): with context.session.begin(subtransactions=True): - qry = (context.session.query(loadbalancer_db.Pool.id). - join(loadbalancer_db.Vip)) - - qry = qry.filter(loadbalancer_db.Vip.status.in_(ACTIVE_PENDING)) - qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING)) - up = True # makes pep8 and sqlalchemy happy - qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up) - qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up) agents = self.plugin.get_lbaas_agents(context, filters={'host': [host]}) if not agents: return [] elif len(agents) > 1: LOG.warning(_('Multiple lbaas agents found on host %s'), host) - pools = self.plugin.list_pools_on_lbaas_agent(context, agents[0].id) pool_ids = [pool['id'] for pool in pools['pools']] + + qry = context.session.query(loadbalancer_db.Pool.id) qry = qry.filter(loadbalancer_db.Pool.id.in_(pool_ids)) + qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING)) + up = True # makes pep8 and sqlalchemy happy + qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up) return [id for id, in qry] - def get_logical_device(self, context, pool_id=None, activate=True, - **kwargs): + def get_logical_device(self, context, pool_id=None): with context.session.begin(subtransactions=True): qry = context.session.query(loadbalancer_db.Pool) qry = qry.filter_by(id=pool_id) pool = qry.one() - if activate: - # set all resources to active - if pool.status in ACTIVE_PENDING: - pool.status = constants.ACTIVE - - if pool.vip.status in ACTIVE_PENDING: - pool.vip.status = constants.ACTIVE - - for m in pool.members: - if m.status in ACTIVE_PENDING: - m.status = constants.ACTIVE - - for hm in pool.monitors: - if hm.status in ACTIVE_PENDING: - hm.status = constants.ACTIVE - - if (pool.status != constants.ACTIVE - or pool.vip.status != constants.ACTIVE): - raise q_exc.Invalid(_('Expected active pool and vip')) + if pool.status != constants.ACTIVE: + raise q_exc.Invalid(_('Expected active pool')) retval = {} retval['pool'] = self.plugin._make_pool_dict(pool) - retval['vip'] = self.plugin._make_vip_dict(pool.vip) - retval['vip']['port'] = ( - self.plugin._core_plugin._make_port_dict(pool.vip.port) - ) - for fixed_ip in retval['vip']['port']['fixed_ips']: - fixed_ip['subnet'] = ( - self.plugin._core_plugin.get_subnet( - context, - fixed_ip['subnet_id'] - ) + + if pool.vip: + retval['vip'] = self.plugin._make_vip_dict(pool.vip) + retval['vip']['port'] = ( + self.plugin._core_plugin._make_port_dict(pool.vip.port) ) + for fixed_ip in retval['vip']['port']['fixed_ips']: + fixed_ip['subnet'] = ( + self.plugin._core_plugin.get_subnet( + context, + fixed_ip['subnet_id'] + ) + ) retval['members'] = [ self.plugin._make_member_dict(m) for m in pool.members if m.status in (constants.ACTIVE, @@ -141,10 +131,49 @@ class LoadBalancerCallbacks(object): for hm in pool.monitors if hm.status == constants.ACTIVE ] + retval['driver'] = ( + self.plugin.drivers[pool.provider.provider_name].device_driver) return retval - def pool_destroyed(self, context, pool_id=None, host=None): + def pool_deployed(self, context, pool_id): + with context.session.begin(subtransactions=True): + qry = context.session.query(loadbalancer_db.Pool) + qry = qry.filter_by(id=pool_id) + pool = qry.one() + + # set all resources to active + if pool.status in ACTIVE_PENDING: + pool.status = constants.ACTIVE + + if pool.vip and pool.vip.status in ACTIVE_PENDING: + pool.vip.status = constants.ACTIVE + + for m in pool.members: + if m.status in ACTIVE_PENDING: + m.status = constants.ACTIVE + + for hm in pool.monitors: + if hm.status in ACTIVE_PENDING: + hm.status = constants.ACTIVE + + def update_status(self, context, obj_type, obj_id, status): + model_mapping = { + 'pool': loadbalancer_db.Pool, + 'vip': loadbalancer_db.Vip, + 'member': loadbalancer_db.Member, + 'health_monitor': loadbalancer_db.PoolMonitorAssociation + } + if obj_type not in model_mapping: + raise q_exc.Invalid(_('Unknown object type: %s') % obj_type) + elif obj_type == 'health_monitor': + self.plugin.update_pool_health_monitor( + context, obj_id['monitor_id'], obj_id['pool_id'], status) + else: + self.plugin.update_status( + context, model_mapping[obj_type], obj_id, status) + + def pool_destroyed(self, context, pool_id=None): """Agent confirmation hook that a pool has been destroyed. This method exists for subclasses to change the deletion @@ -214,65 +243,116 @@ class LoadBalancerCallbacks(object): class LoadBalancerAgentApi(proxy.RpcProxy): """Plugin side of plugin to agent RPC API.""" - BASE_RPC_API_VERSION = '1.0' + BASE_RPC_API_VERSION = '2.0' # history # 1.0 Initial version # 1.1 Support agent_updated call + # 2.0 Generic API for agent based drivers + # - modify/reload/destroy_pool methods were removed; + # - added methods to handle create/update/delete for every lbaas + # object individually; def __init__(self, topic): super(LoadBalancerAgentApi, self).__init__( topic, default_version=self.BASE_RPC_API_VERSION) - def reload_pool(self, context, pool_id, host): + def _cast(self, context, method_name, method_args, host, version=None): return self.cast( context, - self.make_msg('reload_pool', pool_id=pool_id, host=host), - topic='%s.%s' % (self.topic, host) + self.make_msg(method_name, **method_args), + topic='%s.%s' % (self.topic, host), + version=version ) - def destroy_pool(self, context, pool_id, host): - return self.cast( - context, - self.make_msg('destroy_pool', pool_id=pool_id, host=host), - topic='%s.%s' % (self.topic, host) - ) + def create_vip(self, context, vip, host): + return self._cast(context, 'create_vip', {'vip': vip}, host) - def modify_pool(self, context, pool_id, host): - return self.cast( - context, - self.make_msg('modify_pool', pool_id=pool_id, host=host), - topic='%s.%s' % (self.topic, host) - ) + def update_vip(self, context, old_vip, vip, host): + return self._cast(context, 'update_vip', + {'old_vip': old_vip, 'vip': vip}, host) + + def delete_vip(self, context, vip, host): + return self._cast(context, 'delete_vip', {'vip': vip}, host) + + def create_pool(self, context, pool, host, driver_name): + return self._cast(context, 'create_pool', + {'pool': pool, 'driver_name': driver_name}, host) + + def update_pool(self, context, old_pool, pool, host): + return self._cast(context, 'update_pool', + {'old_pool': old_pool, 'pool': pool}, host) + + def delete_pool(self, context, pool, host): + return self._cast(context, 'delete_pool', {'pool': pool}, host) + + def create_member(self, context, member, host): + return self._cast(context, 'create_member', {'member': member}, host) + + def update_member(self, context, old_member, member, host): + return self._cast(context, 'update_member', + {'old_member': old_member, 'member': member}, host) + + def delete_member(self, context, member, host): + return self._cast(context, 'delete_member', {'member': member}, host) + + def create_pool_health_monitor(self, context, health_monitor, pool_id, + host): + return self._cast(context, 'create_pool_health_monitor', + {'health_monitor': health_monitor, + 'pool_id': pool_id}, host) + + def update_pool_health_monitor(self, context, old_health_monitor, + health_monitor, pool_id, host): + return self._cast(context, 'update_pool_health_monitor', + {'old_health_monitor': old_health_monitor, + 'health_monitor': health_monitor, + 'pool_id': pool_id}, host) + + def delete_pool_health_monitor(self, context, health_monitor, pool_id, + host): + return self._cast(context, 'delete_pool_health_monitor', + {'health_monitor': health_monitor, + 'pool_id': pool_id}, host) def agent_updated(self, context, admin_state_up, host): - return self.cast( - context, - self.make_msg('agent_updated', - payload={'admin_state_up': admin_state_up}), - topic='%s.%s' % (self.topic, host), - version='1.1' - ) + return self._cast(context, 'agent_updated', + {'payload': {'admin_state_up': admin_state_up}}, + host) -class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver): +class AgentBasedPluginDriver(abstract_driver.LoadBalancerAbstractDriver): + + # name of device driver that should be used by the agent; + # vendor specific plugin drivers must override it; + device_driver = None def __init__(self, plugin): - self.agent_rpc = LoadBalancerAgentApi(TOPIC_LOADBALANCER_AGENT) - self.callbacks = LoadBalancerCallbacks(plugin) + if not self.device_driver: + raise DriverNotSpecified() + + self.agent_rpc = LoadBalancerAgentApi(TOPIC_LOADBALANCER_AGENT) - self.conn = rpc.create_connection(new=True) - self.conn.create_consumer( - TOPIC_PROCESS_ON_HOST, - self.callbacks.create_rpc_dispatcher(), - fanout=False) - self.conn.consume_in_thread() self.plugin = plugin + self._set_callbacks_on_plugin() self.plugin.agent_notifiers.update( {q_const.AGENT_TYPE_LOADBALANCER: self.agent_rpc}) self.pool_scheduler = importutils.import_object( cfg.CONF.loadbalancer_pool_scheduler_driver) + def _set_callbacks_on_plugin(self): + # other agent based plugin driver might already set callbacks on plugin + if hasattr(self.plugin, 'agent_callbacks'): + return + + self.plugin.agent_callbacks = LoadBalancerCallbacks(self.plugin) + self.plugin.conn = rpc.create_connection(new=True) + self.plugin.conn.create_consumer( + TOPIC_LOADBALANCER_PLUGIN, + self.plugin.agent_callbacks.create_rpc_dispatcher(), + fanout=False) + self.plugin.conn.consume_in_thread() + def get_pool_agent(self, context, pool_id): agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id) if not agent: @@ -281,80 +361,95 @@ class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver): def create_vip(self, context, vip): agent = self.get_pool_agent(context, vip['pool_id']) - self.agent_rpc.reload_pool(context, vip['pool_id'], agent['host']) + self.agent_rpc.create_vip(context, vip, agent['host']) def update_vip(self, context, old_vip, vip): agent = self.get_pool_agent(context, vip['pool_id']) if vip['status'] in ACTIVE_PENDING: - self.agent_rpc.reload_pool(context, vip['pool_id'], agent['host']) + self.agent_rpc.update_vip(context, old_vip, vip, agent['host']) else: - self.agent_rpc.destroy_pool(context, vip['pool_id'], agent['host']) + self.agent_rpc.delete_vip(context, vip, agent['host']) def delete_vip(self, context, vip): self.plugin._delete_db_vip(context, vip['id']) agent = self.get_pool_agent(context, vip['pool_id']) - self.agent_rpc.destroy_pool(context, vip['pool_id'], agent['host']) + self.agent_rpc.delete_vip(context, vip, agent['host']) def create_pool(self, context, pool): - if not self.pool_scheduler.schedule(self.plugin, context, pool): + agent = self.pool_scheduler.schedule(self.plugin, context, pool, + self.device_driver) + if not agent: raise lbaas_agentscheduler.NoEligibleLbaasAgent(pool_id=pool['id']) - # don't notify here because a pool needs a vip to be useful + self.agent_rpc.create_pool(context, pool, agent['host'], + self.device_driver) def update_pool(self, context, old_pool, pool): agent = self.get_pool_agent(context, pool['id']) if pool['status'] in ACTIVE_PENDING: - if pool['vip_id'] is not None: - self.agent_rpc.reload_pool(context, pool['id'], agent['host']) + self.agent_rpc.update_pool(context, old_pool, pool, + agent['host']) else: - self.agent_rpc.destroy_pool(context, pool['id'], agent['host']) + self.agent_rpc.delete_pool(context, pool, agent['host']) def delete_pool(self, context, pool): + # get agent first to know host as binding will be deleted + # after pool is deleted from db agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool['id']) - if agent: - self.agent_rpc.destroy_pool(context, pool['id'], - agent['agent']['host']) self.plugin._delete_db_pool(context, pool['id']) + if agent: + self.agent_rpc.delete_pool(context, pool, agent['agent']['host']) def create_member(self, context, member): agent = self.get_pool_agent(context, member['pool_id']) - self.agent_rpc.modify_pool(context, member['pool_id'], agent['host']) + self.agent_rpc.create_member(context, member, agent['host']) def update_member(self, context, old_member, member): + agent = self.get_pool_agent(context, member['pool_id']) # member may change pool id if member['pool_id'] != old_member['pool_id']: - agent = self.plugin.get_lbaas_agent_hosting_pool( + old_pool_agent = self.plugin.get_lbaas_agent_hosting_pool( context, old_member['pool_id']) - if agent: - self.agent_rpc.modify_pool(context, - old_member['pool_id'], - agent['agent']['host']) - agent = self.get_pool_agent(context, member['pool_id']) - self.agent_rpc.modify_pool(context, member['pool_id'], agent['host']) + if old_pool_agent: + self.agent_rpc.delete_member(context, old_member, + old_pool_agent['agent']['host']) + self.agent_rpc.create_member(context, member, agent['host']) + else: + self.agent_rpc.update_member(context, old_member, member, + agent['host']) def delete_member(self, context, member): self.plugin._delete_db_member(context, member['id']) agent = self.get_pool_agent(context, member['pool_id']) - self.agent_rpc.modify_pool(context, member['pool_id'], agent['host']) - - def update_health_monitor(self, context, old_health_monitor, - health_monitor, pool_id): - # monitors are unused here because agent will fetch what is necessary - agent = self.get_pool_agent(context, pool_id) - self.agent_rpc.modify_pool(context, pool_id, agent['host']) + self.agent_rpc.delete_member(context, member, agent['host']) def create_pool_health_monitor(self, context, healthmon, pool_id): # healthmon is not used here agent = self.get_pool_agent(context, pool_id) - self.agent_rpc.modify_pool(context, pool_id, agent['host']) + self.agent_rpc.create_pool_health_monitor(context, healthmon, + pool_id, agent['host']) + + def update_pool_health_monitor(self, context, old_health_monitor, + health_monitor, pool_id): + agent = self.get_pool_agent(context, pool_id) + self.agent_rpc.update_pool_health_monitor(context, old_health_monitor, + health_monitor, pool_id, + agent['host']) def delete_pool_health_monitor(self, context, health_monitor, pool_id): self.plugin._delete_db_pool_health_monitor( context, health_monitor['id'], pool_id ) - # healthmon_id is not used here agent = self.get_pool_agent(context, pool_id) - self.agent_rpc.modify_pool(context, pool_id, agent['host']) + self.agent_rpc.delete_pool_health_monitor(context, health_monitor, + pool_id, agent['host']) def stats(self, context, pool_id): pass + + +class HaproxyOnHostPluginDriver(AgentBasedPluginDriver): + #TODO(obondarev): change hardcoded driver name + # to namespace_driver.DRIVER_NAME after moving HaproxyOnHostPluginDriver + # to a separate file (follow-up patch) + device_driver = 'haproxy_ns' diff --git a/neutron/services/loadbalancer/drivers/radware/driver.py b/neutron/services/loadbalancer/drivers/radware/driver.py index b76a74c14c..e639d0adfd 100644 --- a/neutron/services/loadbalancer/drivers/radware/driver.py +++ b/neutron/services/loadbalancer/drivers/radware/driver.py @@ -285,9 +285,9 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver): # Anything to do here? the hm is not connected to the graph yet pass - def update_health_monitor(self, context, old_health_monitor, - health_monitor, - pool_id): + def update_pool_health_monitor(self, context, old_health_monitor, + health_monitor, + pool_id): self._handle_pool_health_monitor(context, health_monitor, pool_id) def create_pool_health_monitor(self, context, diff --git a/neutron/services/loadbalancer/plugin.py b/neutron/services/loadbalancer/plugin.py index e773af1a98..29813579ae 100644 --- a/neutron/services/loadbalancer/plugin.py +++ b/neutron/services/loadbalancer/plugin.py @@ -242,8 +242,8 @@ class LoadBalancerPlugin(ldb.LoadBalancerPluginDb, ).filter_by(monitor_id=hm['id']).join(ldb.Pool) for assoc in qry: driver = self._get_driver_for_pool(context, assoc['pool_id']) - driver.update_health_monitor(context, old_hm, - hm, assoc['pool_id']) + driver.update_pool_health_monitor(context, old_hm, + hm, assoc['pool_id']) return hm def _delete_db_pool_health_monitor(self, context, hm_id, pool_id): diff --git a/neutron/tests/unit/db/loadbalancer/test_db_loadbalancer.py b/neutron/tests/unit/db/loadbalancer/test_db_loadbalancer.py index f1f2a64207..e99cc479af 100644 --- a/neutron/tests/unit/db/loadbalancer/test_db_loadbalancer.py +++ b/neutron/tests/unit/db/loadbalancer/test_db_loadbalancer.py @@ -102,9 +102,9 @@ class NoopLbaaSDriver(abstract_driver.LoadBalancerAbstractDriver): def delete_member(self, context, member): self.plugin._delete_db_member(context, member["id"]) - def update_health_monitor(self, context, old_health_monitor, - health_monitor, - pool_association): + def update_pool_health_monitor(self, context, old_health_monitor, + health_monitor, + pool_association): pass def create_pool_health_monitor(self, context, diff --git a/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_agent_manager.py b/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_agent_manager.py index d7bffc2c5d..328dc22c9e 100644 --- a/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_agent_manager.py +++ b/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_agent_manager.py @@ -20,129 +20,20 @@ import contextlib import mock +from neutron.plugins.common import constants from neutron.services.loadbalancer.drivers.haproxy import ( agent_manager as manager ) from neutron.tests import base -class TestLogicalDeviceCache(base.BaseTestCase): - def setUp(self): - super(TestLogicalDeviceCache, self).setUp() - self.cache = manager.LogicalDeviceCache() - - def test_put(self): - fake_device = { - 'vip': {'port_id': 'port_id'}, - 'pool': {'id': 'pool_id'} - } - self.cache.put(fake_device) - - self.assertEqual(len(self.cache.devices), 1) - self.assertEqual(len(self.cache.port_lookup), 1) - self.assertEqual(len(self.cache.pool_lookup), 1) - - def test_double_put(self): - fake_device = { - 'vip': {'port_id': 'port_id'}, - 'pool': {'id': 'pool_id'} - } - self.cache.put(fake_device) - self.cache.put(fake_device) - - self.assertEqual(len(self.cache.devices), 1) - self.assertEqual(len(self.cache.port_lookup), 1) - self.assertEqual(len(self.cache.pool_lookup), 1) - - def test_remove_in_cache(self): - fake_device = { - 'vip': {'port_id': 'port_id'}, - 'pool': {'id': 'pool_id'} - } - self.cache.put(fake_device) - - self.assertEqual(len(self.cache.devices), 1) - - self.cache.remove(fake_device) - - self.assertFalse(len(self.cache.devices)) - self.assertFalse(self.cache.port_lookup) - self.assertFalse(self.cache.pool_lookup) - - def test_remove_in_cache_same_object(self): - fake_device = { - 'vip': {'port_id': 'port_id'}, - 'pool': {'id': 'pool_id'} - } - self.cache.put(fake_device) - - self.assertEqual(len(self.cache.devices), 1) - - self.cache.remove(set(self.cache.devices).pop()) - - self.assertFalse(len(self.cache.devices)) - self.assertFalse(self.cache.port_lookup) - self.assertFalse(self.cache.pool_lookup) - - def test_remove_by_pool_id(self): - fake_device = { - 'vip': {'port_id': 'port_id'}, - 'pool': {'id': 'pool_id'} - } - self.cache.put(fake_device) - - self.assertEqual(len(self.cache.devices), 1) - - self.cache.remove_by_pool_id('pool_id') - - self.assertFalse(len(self.cache.devices)) - self.assertFalse(self.cache.port_lookup) - self.assertFalse(self.cache.pool_lookup) - - def test_get_by_pool_id(self): - fake_device = { - 'vip': {'port_id': 'port_id'}, - 'pool': {'id': 'pool_id'} - } - self.cache.put(fake_device) - - dev = self.cache.get_by_pool_id('pool_id') - - self.assertEqual(dev.pool_id, 'pool_id') - self.assertEqual(dev.port_id, 'port_id') - - def test_get_by_port_id(self): - fake_device = { - 'vip': {'port_id': 'port_id'}, - 'pool': {'id': 'pool_id'} - } - self.cache.put(fake_device) - - dev = self.cache.get_by_port_id('port_id') - - self.assertEqual(dev.pool_id, 'pool_id') - self.assertEqual(dev.port_id, 'port_id') - - def test_get_pool_ids(self): - fake_device = { - 'vip': {'port_id': 'port_id'}, - 'pool': {'id': 'pool_id'} - } - self.cache.put(fake_device) - - self.assertEqual(self.cache.get_pool_ids(), ['pool_id']) - - class TestManager(base.BaseTestCase): def setUp(self): super(TestManager, self).setUp() self.addCleanup(mock.patch.stopall) mock_conf = mock.Mock() - mock_conf.interface_driver = 'intdriver' - mock_conf.device_driver = 'devdriver' - mock_conf.AGENT.root_helper = 'sudo' - mock_conf.loadbalancer_state_path = '/the/path' + mock_conf.device_driver = ['devdriver'] self.mock_importer = mock.patch.object(manager, 'importutils').start() @@ -154,6 +45,9 @@ class TestManager(base.BaseTestCase): self.mgr = manager.LbaasAgentManager(mock_conf) self.rpc_mock = rpc_mock_cls.return_value self.log = mock.patch.object(manager, 'LOG').start() + self.driver_mock = mock.Mock() + self.mgr.device_drivers = {'devdriver': self.driver_mock} + self.mgr.instance_mapping = {'1': 'devdriver', '2': 'devdriver'} self.mgr.needs_resync = False def test_initialize_service_hook(self): @@ -174,64 +68,51 @@ class TestManager(base.BaseTestCase): self.assertFalse(sync.called) def test_collect_stats(self): - with mock.patch.object(self.mgr, 'cache') as cache: - cache.get_pool_ids.return_value = ['1', '2'] - self.mgr.collect_stats(mock.Mock()) - self.rpc_mock.update_pool_stats.assert_has_calls([ - mock.call('1', mock.ANY), - mock.call('2', mock.ANY) - ]) + self.mgr.collect_stats(mock.Mock()) + self.rpc_mock.update_pool_stats.assert_has_calls([ + mock.call('1', mock.ANY), + mock.call('2', mock.ANY) + ]) def test_collect_stats_exception(self): - with mock.patch.object(self.mgr, 'cache') as cache: - cache.get_pool_ids.return_value = ['1', '2'] - with mock.patch.object(self.mgr, 'driver') as driver: - driver.get_stats.side_effect = Exception + self.driver_mock.get_stats.side_effect = Exception - self.mgr.collect_stats(mock.Mock()) + self.mgr.collect_stats(mock.Mock()) - self.assertFalse(self.rpc_mock.called) - self.assertTrue(self.mgr.needs_resync) - self.assertTrue(self.log.exception.called) + self.assertFalse(self.rpc_mock.called) + self.assertTrue(self.mgr.needs_resync) + self.assertTrue(self.log.exception.called) - def test_vip_plug_callback(self): - self.mgr._vip_plug_callback('plug', {'id': 'id'}) - self.rpc_mock.plug_vip_port.assert_called_once_with('id') - - def test_vip_unplug_callback(self): - self.mgr._vip_plug_callback('unplug', {'id': 'id'}) - self.rpc_mock.unplug_vip_port.assert_called_once_with('id') - - def _sync_state_helper(self, cache, ready, refreshed, destroyed): + def _sync_state_helper(self, ready, reloaded, destroyed): with contextlib.nested( - mock.patch.object(self.mgr, 'cache'), - mock.patch.object(self.mgr, 'refresh_device'), - mock.patch.object(self.mgr, 'destroy_device') - ) as (mock_cache, refresh, destroy): + mock.patch.object(self.mgr, '_reload_pool'), + mock.patch.object(self.mgr, '_destroy_pool') + ) as (reload, destroy): - mock_cache.get_pool_ids.return_value = cache self.rpc_mock.get_ready_devices.return_value = ready self.mgr.sync_state() - self.assertEqual(len(refreshed), len(refresh.mock_calls)) + self.assertEqual(len(reloaded), len(reload.mock_calls)) self.assertEqual(len(destroyed), len(destroy.mock_calls)) - refresh.assert_has_calls([mock.call(i) for i in refreshed]) + reload.assert_has_calls([mock.call(i) for i in reloaded]) destroy.assert_has_calls([mock.call(i) for i in destroyed]) self.assertFalse(self.mgr.needs_resync) def test_sync_state_all_known(self): - self._sync_state_helper(['1', '2'], ['1', '2'], ['1', '2'], []) + self._sync_state_helper(['1', '2'], ['1', '2'], []) def test_sync_state_all_unknown(self): - self._sync_state_helper([], ['1', '2'], ['1', '2'], []) + self.mgr.instance_mapping = {} + self._sync_state_helper(['1', '2'], ['1', '2'], []) def test_sync_state_destroy_all(self): - self._sync_state_helper(['1', '2'], [], [], ['1', '2']) + self._sync_state_helper([], [], ['1', '2']) def test_sync_state_both(self): - self._sync_state_helper(['1'], ['2'], ['2'], ['1']) + self.mgr.instance_mapping = {'1': 'devdriver'} + self._sync_state_helper(['2'], ['2'], ['1']) def test_sync_state_exception(self): self.rpc_mock.get_ready_devices.side_effect = Exception @@ -241,127 +122,251 @@ class TestManager(base.BaseTestCase): self.assertTrue(self.log.exception.called) self.assertTrue(self.mgr.needs_resync) - def test_refresh_device_exists(self): - config = self.rpc_mock.get_logical_device.return_value + def test_reload_pool(self): + config = {'driver': 'devdriver'} + self.rpc_mock.get_logical_device.return_value = config + pool_id = 'new_id' + self.assertNotIn(pool_id, self.mgr.instance_mapping) - with mock.patch.object(self.mgr, 'driver') as driver: - with mock.patch.object(self.mgr, 'cache') as cache: - driver.exists.return_value = True + self.mgr._reload_pool(pool_id) - self.mgr.refresh_device(config) + self.driver_mock.deploy_instance.assert_called_once_with(config) + self.assertIn(pool_id, self.mgr.instance_mapping) + self.rpc_mock.pool_deployed.assert_called_once_with(pool_id) - driver.exists.assert_called_once_with(config) - driver.update.assert_called_once_with(config) - cache.put.assert_called_once_with(config) - self.assertFalse(self.mgr.needs_resync) + def test_reload_pool_driver_not_found(self): + config = {'driver': 'unknown_driver'} + self.rpc_mock.get_logical_device.return_value = config + pool_id = 'new_id' + self.assertNotIn(pool_id, self.mgr.instance_mapping) - def test_refresh_device_new(self): - config = self.rpc_mock.get_logical_device.return_value + self.mgr._reload_pool(pool_id) - with mock.patch.object(self.mgr, 'driver') as driver: - with mock.patch.object(self.mgr, 'cache') as cache: - driver.exists.return_value = False + self.assertTrue(self.log.error.called) + self.assertFalse(self.driver_mock.deploy_instance.called) + self.assertNotIn(pool_id, self.mgr.instance_mapping) + self.assertFalse(self.rpc_mock.pool_deployed.called) - self.mgr.refresh_device(config) + def test_reload_pool_exception_on_driver(self): + config = {'driver': 'devdriver'} + self.rpc_mock.get_logical_device.return_value = config + self.driver_mock.deploy_instance.side_effect = Exception + pool_id = 'new_id' + self.assertNotIn(pool_id, self.mgr.instance_mapping) - driver.exists.assert_called_once_with(config) - driver.create.assert_called_once_with(config) - cache.put.assert_called_once_with(config) - self.assertFalse(self.mgr.needs_resync) + self.mgr._reload_pool(pool_id) - def test_refresh_device_exception(self): - config = self.rpc_mock.get_logical_device.return_value + self.driver_mock.deploy_instance.assert_called_once_with(config) + self.assertNotIn(pool_id, self.mgr.instance_mapping) + self.assertFalse(self.rpc_mock.pool_deployed.called) + self.assertTrue(self.log.exception.called) + self.assertTrue(self.mgr.needs_resync) - with mock.patch.object(self.mgr, 'driver') as driver: - with mock.patch.object(self.mgr, 'cache') as cache: - driver.exists.side_effect = Exception - self.mgr.refresh_device(config) + def test_destroy_pool(self): + pool_id = '1' + self.assertIn(pool_id, self.mgr.instance_mapping) - driver.exists.assert_called_once_with(config) - self.assertTrue(self.mgr.needs_resync) - self.assertTrue(self.log.exception.called) - self.assertFalse(cache.put.called) + self.mgr._destroy_pool(pool_id) - def test_destroy_device_known(self): - with mock.patch.object(self.mgr, 'driver') as driver: - with mock.patch.object(self.mgr, 'cache') as cache: - cache.get_by_pool_id.return_value = True + self.driver_mock.undeploy_instance.assert_called_once_with(pool_id) + self.assertNotIn(pool_id, self.mgr.instance_mapping) + self.rpc_mock.pool_destroyed.assert_called_once_with(pool_id) + self.assertFalse(self.mgr.needs_resync) - self.mgr.destroy_device('pool_id') - cache.get_by_pool_id.assert_called_once_with('pool_id') - driver.destroy.assert_called_once_with('pool_id') - self.rpc_mock.pool_destroyed.assert_called_once_with( - 'pool_id' - ) - cache.remove.assert_called_once_with(True) - self.assertFalse(self.mgr.needs_resync) + def test_destroy_pool_exception_on_driver(self): + pool_id = '1' + self.assertIn(pool_id, self.mgr.instance_mapping) + self.driver_mock.undeploy_instance.side_effect = Exception - def test_destroy_device_unknown(self): - with mock.patch.object(self.mgr, 'driver') as driver: - with mock.patch.object(self.mgr, 'cache') as cache: - cache.get_by_pool_id.return_value = None + self.mgr._destroy_pool(pool_id) - self.mgr.destroy_device('pool_id') - cache.get_by_pool_id.assert_called_once_with('pool_id') - self.assertFalse(driver.destroy.called) + self.driver_mock.undeploy_instance.assert_called_once_with(pool_id) + self.assertIn(pool_id, self.mgr.instance_mapping) + self.assertFalse(self.rpc_mock.pool_destroyed.called) + self.assertTrue(self.log.exception.called) + self.assertTrue(self.mgr.needs_resync) - def test_destroy_device_exception(self): - with mock.patch.object(self.mgr, 'driver') as driver: - with mock.patch.object(self.mgr, 'cache') as cache: - cache.get_by_pool_id.return_value = True - driver.destroy.side_effect = Exception - - self.mgr.destroy_device('pool_id') - cache.get_by_pool_id.assert_called_once_with('pool_id') - - self.assertTrue(self.log.exception.called) - self.assertTrue(self.mgr.needs_resync) + def test_get_driver_unknown_device(self): + self.assertRaises(manager.DeviceNotFoundOnAgent, + self.mgr._get_driver, 'unknown') def test_remove_orphans(self): - with mock.patch.object(self.mgr, 'driver') as driver: - with mock.patch.object(self.mgr, 'cache') as cache: - cache.get_pool_ids.return_value = ['1', '2'] - self.mgr.remove_orphans() + self.mgr.remove_orphans() + self.driver_mock.remove_orphans.assert_called_once_with(['1', '2']) - driver.remove_orphans.assert_called_once_with(['1', '2']) + def test_create_vip(self): + vip = {'id': 'id1', 'pool_id': '1'} + self.mgr.create_vip(mock.Mock(), vip) + self.driver_mock.create_vip.assert_called_once_with(vip) + self.rpc_mock.update_status.assert_called_once_with('vip', vip['id'], + constants.ACTIVE) - def test_reload_pool(self): - with mock.patch.object(self.mgr, 'refresh_device') as refresh: - self.mgr.reload_pool(mock.Mock(), pool_id='pool_id') - refresh.assert_called_once_with('pool_id') + def test_create_vip_failed(self): + vip = {'id': 'id1', 'pool_id': '1'} + self.driver_mock.create_vip.side_effect = Exception + self.mgr.create_vip(mock.Mock(), vip) + self.driver_mock.create_vip.assert_called_once_with(vip) + self.rpc_mock.update_status.assert_called_once_with('vip', vip['id'], + constants.ERROR) - def test_modify_pool_known(self): - with mock.patch.object(self.mgr, 'refresh_device') as refresh: - with mock.patch.object(self.mgr, 'cache') as cache: - cache.get_by_pool_id.return_value = True + def test_update_vip(self): + old_vip = {'id': 'id1'} + vip = {'id': 'id1', 'pool_id': '1'} + self.mgr.update_vip(mock.Mock(), old_vip, vip) + self.driver_mock.update_vip.assert_called_once_with(old_vip, vip) + self.rpc_mock.update_status.assert_called_once_with('vip', vip['id'], + constants.ACTIVE) - self.mgr.reload_pool(mock.Mock(), pool_id='pool_id') + def test_update_vip_failed(self): + old_vip = {'id': 'id1'} + vip = {'id': 'id1', 'pool_id': '1'} + self.driver_mock.update_vip.side_effect = Exception + self.mgr.update_vip(mock.Mock(), old_vip, vip) + self.driver_mock.update_vip.assert_called_once_with(old_vip, vip) + self.rpc_mock.update_status.assert_called_once_with('vip', vip['id'], + constants.ERROR) - refresh.assert_called_once_with('pool_id') + def test_delete_vip(self): + vip = {'id': 'id1', 'pool_id': '1'} + self.mgr.delete_vip(mock.Mock(), vip) + self.driver_mock.delete_vip.assert_called_once_with(vip) - def test_modify_pool_unknown(self): - with mock.patch.object(self.mgr, 'refresh_device') as refresh: - with mock.patch.object(self.mgr, 'cache') as cache: - cache.get_by_pool_id.return_value = False + def test_create_pool(self): + pool = {'id': 'id1'} + self.assertNotIn(pool['id'], self.mgr.instance_mapping) + self.mgr.create_pool(mock.Mock(), pool, 'devdriver') + self.driver_mock.create_pool.assert_called_once_with(pool) + self.rpc_mock.update_status.assert_called_once_with('pool', pool['id'], + constants.ACTIVE) + self.assertIn(pool['id'], self.mgr.instance_mapping) - self.mgr.modify_pool(mock.Mock(), pool_id='pool_id') + def test_create_pool_failed(self): + pool = {'id': 'id1'} + self.assertNotIn(pool['id'], self.mgr.instance_mapping) + self.driver_mock.create_pool.side_effect = Exception + self.mgr.create_pool(mock.Mock(), pool, 'devdriver') + self.driver_mock.create_pool.assert_called_once_with(pool) + self.rpc_mock.update_status.assert_called_once_with('pool', pool['id'], + constants.ERROR) + self.assertNotIn(pool['id'], self.mgr.instance_mapping) - self.assertFalse(refresh.called) + def test_update_pool(self): + old_pool = {'id': '1'} + pool = {'id': '1'} + self.mgr.update_pool(mock.Mock(), old_pool, pool) + self.driver_mock.update_pool.assert_called_once_with(old_pool, pool) + self.rpc_mock.update_status.assert_called_once_with('pool', pool['id'], + constants.ACTIVE) - def test_destroy_pool_known(self): - with mock.patch.object(self.mgr, 'destroy_device') as destroy: - with mock.patch.object(self.mgr, 'cache') as cache: - cache.get_by_pool_id.return_value = True + def test_update_pool_failed(self): + old_pool = {'id': '1'} + pool = {'id': '1'} + self.driver_mock.update_pool.side_effect = Exception + self.mgr.update_pool(mock.Mock(), old_pool, pool) + self.driver_mock.update_pool.assert_called_once_with(old_pool, pool) + self.rpc_mock.update_status.assert_called_once_with('pool', pool['id'], + constants.ERROR) - self.mgr.destroy_pool(mock.Mock(), pool_id='pool_id') + def test_delete_pool(self): + pool = {'id': '1'} + self.assertIn(pool['id'], self.mgr.instance_mapping) + self.mgr.delete_pool(mock.Mock(), pool) + self.driver_mock.delete_pool.assert_called_once_with(pool) + self.assertNotIn(pool['id'], self.mgr.instance_mapping) - destroy.assert_called_once_with('pool_id') + def test_create_member(self): + member = {'id': 'id1', 'pool_id': '1'} + self.mgr.create_member(mock.Mock(), member) + self.driver_mock.create_member.assert_called_once_with(member) + self.rpc_mock.update_status.assert_called_once_with('member', + member['id'], + constants.ACTIVE) - def test_destroy_pool_unknown(self): - with mock.patch.object(self.mgr, 'destroy_device') as destroy: - with mock.patch.object(self.mgr, 'cache') as cache: - cache.get_by_pool_id.return_value = False + def test_create_member_failed(self): + member = {'id': 'id1', 'pool_id': '1'} + self.driver_mock.create_member.side_effect = Exception + self.mgr.create_member(mock.Mock(), member) + self.driver_mock.create_member.assert_called_once_with(member) + self.rpc_mock.update_status.assert_called_once_with('member', + member['id'], + constants.ERROR) - self.mgr.destroy_pool(mock.Mock(), pool_id='pool_id') + def test_update_member(self): + old_member = {'id': 'id1'} + member = {'id': 'id1', 'pool_id': '1'} + self.mgr.update_member(mock.Mock(), old_member, member) + self.driver_mock.update_member.assert_called_once_with(old_member, + member) + self.rpc_mock.update_status.assert_called_once_with('member', + member['id'], + constants.ACTIVE) - self.assertFalse(destroy.called) + def test_update_member_failed(self): + old_member = {'id': 'id1'} + member = {'id': 'id1', 'pool_id': '1'} + self.driver_mock.update_member.side_effect = Exception + self.mgr.update_member(mock.Mock(), old_member, member) + self.driver_mock.update_member.assert_called_once_with(old_member, + member) + self.rpc_mock.update_status.assert_called_once_with('member', + member['id'], + constants.ERROR) + + def test_delete_member(self): + member = {'id': 'id1', 'pool_id': '1'} + self.mgr.delete_member(mock.Mock(), member) + self.driver_mock.delete_member.assert_called_once_with(member) + + def test_create_monitor(self): + monitor = {'id': 'id1'} + assoc_id = {'monitor_id': monitor['id'], 'pool_id': '1'} + self.mgr.create_pool_health_monitor(mock.Mock(), monitor, '1') + self.driver_mock.create_pool_health_monitor.assert_called_once_with( + monitor, '1') + self.rpc_mock.update_status.assert_called_once_with('health_monitor', + assoc_id, + constants.ACTIVE) + + def test_create_monitor_failed(self): + monitor = {'id': 'id1'} + assoc_id = {'monitor_id': monitor['id'], 'pool_id': '1'} + self.driver_mock.create_pool_health_monitor.side_effect = Exception + self.mgr.create_pool_health_monitor(mock.Mock(), monitor, '1') + self.driver_mock.create_pool_health_monitor.assert_called_once_with( + monitor, '1') + self.rpc_mock.update_status.assert_called_once_with('health_monitor', + assoc_id, + constants.ERROR) + + def test_update_monitor(self): + monitor = {'id': 'id1'} + assoc_id = {'monitor_id': monitor['id'], 'pool_id': '1'} + self.mgr.update_pool_health_monitor(mock.Mock(), monitor, monitor, '1') + self.driver_mock.update_pool_health_monitor.assert_called_once_with( + monitor, monitor, '1') + self.rpc_mock.update_status.assert_called_once_with('health_monitor', + assoc_id, + constants.ACTIVE) + + def test_update_monitor_failed(self): + monitor = {'id': 'id1'} + assoc_id = {'monitor_id': monitor['id'], 'pool_id': '1'} + self.driver_mock.update_pool_health_monitor.side_effect = Exception + self.mgr.update_pool_health_monitor(mock.Mock(), monitor, monitor, '1') + self.driver_mock.update_pool_health_monitor.assert_called_once_with( + monitor, monitor, '1') + self.rpc_mock.update_status.assert_called_once_with('health_monitor', + assoc_id, + constants.ERROR) + + def test_delete_monitor(self): + monitor = {'id': 'id1'} + self.mgr.delete_pool_health_monitor(mock.Mock(), monitor, '1') + self.driver_mock.delete_pool_health_monitor.assert_called_once_with( + monitor, '1') + + def test_agent_disabled(self): + payload = {'admin_state_up': False} + self.mgr.agent_updated(mock.Mock(), payload) + self.driver_mock.undeploy_instance.assert_has_calls( + [mock.call('1'), mock.call('2')]) diff --git a/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_api.py b/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_api.py index 032a3e2a62..0d9ce3a39f 100644 --- a/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_api.py +++ b/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_api.py @@ -58,8 +58,7 @@ class TestApiCache(base.BaseTestCase): self.make_msg.assert_called_once_with( 'get_logical_device', - pool_id='pool_id', - host='host') + pool_id='pool_id') self.mock_call.assert_called_once_with( mock.sentinel.context, @@ -75,8 +74,41 @@ class TestApiCache(base.BaseTestCase): self.make_msg.assert_called_once_with( 'pool_destroyed', - pool_id='pool_id', - host='host') + pool_id='pool_id') + + self.mock_call.assert_called_once_with( + mock.sentinel.context, + self.make_msg.return_value, + topic='topic' + ) + + def test_pool_deployed(self): + self.assertEqual( + self.api.pool_deployed('pool_id'), + self.mock_call.return_value + ) + + self.make_msg.assert_called_once_with( + 'pool_deployed', + pool_id='pool_id') + + self.mock_call.assert_called_once_with( + mock.sentinel.context, + self.make_msg.return_value, + topic='topic' + ) + + def test_update_status(self): + self.assertEqual( + self.api.update_status('pool', 'pool_id', 'ACTIVE'), + self.mock_call.return_value + ) + + self.make_msg.assert_called_once_with( + 'update_status', + obj_type='pool', + obj_id='pool_id', + status='ACTIVE') self.mock_call.assert_called_once_with( mock.sentinel.context, diff --git a/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_cfg.py b/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_cfg.py index 9552707fc9..6b40393fd4 100644 --- a/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_cfg.py +++ b/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_cfg.py @@ -18,9 +18,8 @@ # @author: Oleg Bondarev (obondarev@mirantis.com) import contextlib -import mock -from oslo.config import cfg as config +import mock from neutron.services.loadbalancer.drivers.haproxy import cfg from neutron.tests import base @@ -50,9 +49,6 @@ class TestHaproxyCfg(base.BaseTestCase): '\n'.join(test_config)) def test_build_global(self): - if not hasattr(config.CONF, 'user_group'): - config.CONF.register_opt(config.StrOpt('user_group')) - config.CONF.set_override('user_group', 'test_group') expected_opts = ['global', '\tdaemon', '\tuser nobody', @@ -60,9 +56,8 @@ class TestHaproxyCfg(base.BaseTestCase): '\tlog /dev/log local0', '\tlog /dev/log local1 notice', '\tstats socket test_path mode 0666 level user'] - opts = cfg._build_global(mock.Mock(), 'test_path') + opts = cfg._build_global(mock.Mock(), 'test_path', 'test_group') self.assertEqual(expected_opts, list(opts)) - config.CONF.reset() def test_build_defaults(self): expected_opts = ['defaults', @@ -74,7 +69,6 @@ class TestHaproxyCfg(base.BaseTestCase): '\ttimeout server 50000'] opts = cfg._build_defaults(mock.Mock()) self.assertEqual(expected_opts, list(opts)) - config.CONF.reset() def test_build_frontend(self): test_config = {'vip': {'id': 'vip_id', diff --git a/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_namespace_driver.py b/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_namespace_driver.py index 78a45f5fcc..49fba19244 100644 --- a/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_namespace_driver.py +++ b/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_namespace_driver.py @@ -17,6 +17,7 @@ # @author: Mark McClain, DreamHost import contextlib + import mock from neutron.common import exceptions @@ -29,22 +30,33 @@ from neutron.tests import base class TestHaproxyNSDriver(base.BaseTestCase): def setUp(self): super(TestHaproxyNSDriver, self).setUp() + self.addCleanup(mock.patch.stopall) - self.vif_driver = mock.Mock() - self.vip_plug_callback = mock.Mock() + conf = mock.Mock() + conf.haproxy.loadbalancer_state_path = '/the/path' + conf.interface_driver = 'intdriver' + conf.haproxy.user_group = 'test_group' + conf.AGENT.root_helper = 'sudo_test' + self.mock_importer = mock.patch.object(namespace_driver, + 'importutils').start() + self.rpc_mock = mock.Mock() self.driver = namespace_driver.HaproxyNSDriver( - 'sudo', - '/the/path', - self.vif_driver, - self.vip_plug_callback + conf, + self.rpc_mock ) + self.vif_driver = mock.Mock() + self.driver.vif_driver = self.vif_driver self.fake_config = { 'pool': {'id': 'pool_id'}, - 'vip': {'id': 'vip_id', 'port': {'id': 'port_id'}} + 'vip': {'id': 'vip_id', 'port': {'id': 'port_id'}, + 'status': 'ACTIVE', 'admin_state_up': True} } + def test_get_name(self): + self.assertEqual(self.driver.get_name(), namespace_driver.DRIVER_NAME) + def test_create(self): with mock.patch.object(self.driver, '_plug') as plug: with mock.patch.object(self.driver, '_spawn') as spawn: @@ -78,14 +90,15 @@ class TestHaproxyNSDriver(base.BaseTestCase): self.driver._spawn(self.fake_config) - mock_save.assert_called_once_with('conf', self.fake_config, 'sock') + mock_save.assert_called_once_with('conf', self.fake_config, + 'sock', 'test_group') cmd = ['haproxy', '-f', 'conf', '-p', 'pid'] ip_wrap.assert_has_calls([ - mock.call('sudo', 'qlbaas-pool_id'), + mock.call('sudo_test', 'qlbaas-pool_id'), mock.call().netns.execute(cmd) ]) - def test_destroy(self): + def test_undeploy_instance(self): with contextlib.nested( mock.patch.object(self.driver, '_get_state_file_path'), mock.patch.object(namespace_driver, 'kill_pids_in_file'), @@ -99,14 +112,14 @@ class TestHaproxyNSDriver(base.BaseTestCase): self.driver.pool_to_port_id['pool_id'] = 'port_id' isdir.return_value = True - self.driver.destroy('pool_id') + self.driver.undeploy_instance('pool_id') - kill.assert_called_once_with('sudo', '/pool/pid') + kill.assert_called_once_with('sudo_test', '/pool/pid') unplug.assert_called_once_with('qlbaas-pool_id', 'port_id') - isdir.called_once_with('/pool') + isdir.assert_called_once_with('/pool') rmtree.assert_called_once_with('/pool') ip_wrap.assert_has_calls([ - mock.call('sudo', 'qlbaas-pool_id'), + mock.call('sudo_test', 'qlbaas-pool_id'), mock.call().garbage_collect_namespace() ]) @@ -125,7 +138,7 @@ class TestHaproxyNSDriver(base.BaseTestCase): self.driver.exists('pool_id') ip_wrap.assert_has_calls([ - mock.call('sudo'), + mock.call('sudo_test'), mock.call().netns.exists('qlbaas-pool_id') ]) @@ -220,7 +233,8 @@ class TestHaproxyNSDriver(base.BaseTestCase): ip_net.prefixlen = 24 self.driver._plug('test_ns', test_port) - self.vip_plug_callback.assert_called_once_with('plug', test_port) + self.rpc_mock.plug_vip_port.assert_called_once_with( + test_port['id']) self.assertTrue(dev_exists.called) self.vif_driver.plug.assert_called_once_with('net_id', 'port_id', 'test_interface', @@ -232,7 +246,7 @@ class TestHaproxyNSDriver(base.BaseTestCase): 'test_ns') cmd = ['route', 'add', 'default', 'gw', '10.0.0.1'] ip_wrap.assert_has_calls([ - mock.call('sudo', namespace='test_ns'), + mock.call('sudo_test', namespace='test_ns'), mock.call().netns.execute(cmd, check_exit_code=False), ]) @@ -257,7 +271,8 @@ class TestHaproxyNSDriver(base.BaseTestCase): ip_net.prefixlen = 24 self.driver._plug('test_ns', test_port) - self.vip_plug_callback.assert_called_once_with('plug', test_port) + self.rpc_mock.plug_vip_port.assert_called_once_with( + test_port['id']) self.assertTrue(dev_exists.called) self.vif_driver.plug.assert_called_once_with('net_id', 'port_id', 'test_interface', @@ -276,8 +291,7 @@ class TestHaproxyNSDriver(base.BaseTestCase): self.vif_driver.get_device_name.return_value = 'test_interface' self.driver._unplug('test_ns', 'port_id') - self.vip_plug_callback.assert_called_once_with('unplug', - {'id': 'port_id'}) + self.rpc_mock.unplug_vip_port.assert_called_once_with('port_id') self.vif_driver.unplug('test_interface', namespace='test_ns') def test_kill_pids_in_file(self): @@ -293,20 +307,130 @@ class TestHaproxyNSDriver(base.BaseTestCase): file_mock.__iter__.return_value = iter(['123']) path_exists.return_value = False - namespace_driver.kill_pids_in_file('sudo', 'test_path') + namespace_driver.kill_pids_in_file('sudo_test', 'test_path') path_exists.assert_called_once_with('test_path') self.assertFalse(mock_open.called) self.assertFalse(mock_execute.called) path_exists.return_value = True mock_execute.side_effect = RuntimeError - namespace_driver.kill_pids_in_file('sudo', 'test_path') + namespace_driver.kill_pids_in_file('sudo_test', 'test_path') self.assertTrue(mock_log.called) mock_execute.assert_called_once_with( - ['kill', '-9', '123'], 'sudo') + ['kill', '-9', '123'], 'sudo_test') def test_get_state_file_path(self): with mock.patch('os.makedirs') as mkdir: path = self.driver._get_state_file_path('pool_id', 'conf') self.assertEqual('/the/path/pool_id/conf', path) mkdir.assert_called_once_with('/the/path/pool_id', 0o755) + + def test_deploy_instance(self): + with mock.patch.object(self.driver, 'exists') as exists: + with mock.patch.object(self.driver, 'update') as update: + self.driver.deploy_instance(self.fake_config) + exists.assert_called_once_with(self.fake_config['pool']['id']) + update.assert_called_once_with(self.fake_config) + + def test_deploy_instance_non_existing(self): + with mock.patch.object(self.driver, 'exists') as exists: + with mock.patch.object(self.driver, 'create') as create: + exists.return_value = False + self.driver.deploy_instance(self.fake_config) + exists.assert_called_once_with(self.fake_config['pool']['id']) + create.assert_called_once_with(self.fake_config) + + def test_deploy_instance_vip_status_non_active(self): + with mock.patch.object(self.driver, 'exists') as exists: + self.fake_config['vip']['status'] = 'NON_ACTIVE' + self.driver.deploy_instance(self.fake_config) + self.assertFalse(exists.called) + + def test_deploy_instance_vip_admin_state_down(self): + with mock.patch.object(self.driver, 'exists') as exists: + self.fake_config['vip']['admin_state_up'] = False + self.driver.deploy_instance(self.fake_config) + self.assertFalse(exists.called) + + def test_deploy_instance_no_vip(self): + with mock.patch.object(self.driver, 'exists') as exists: + del self.fake_config['vip'] + self.driver.deploy_instance(self.fake_config) + self.assertFalse(exists.called) + + def test_refresh_device(self): + with mock.patch.object(self.driver, 'deploy_instance') as deploy: + pool_id = 'pool_id1' + self.driver._refresh_device(pool_id) + self.rpc_mock.get_logical_device.assert_called_once_with(pool_id) + deploy.assert_called_once_with( + self.rpc_mock.get_logical_device.return_value) + + def test_create_vip(self): + with mock.patch.object(self.driver, '_refresh_device') as refresh: + self.driver.create_vip({'pool_id': '1'}) + refresh.assert_called_once_with('1') + + def test_update_vip(self): + with mock.patch.object(self.driver, '_refresh_device') as refresh: + self.driver.update_vip({}, {'pool_id': '1'}) + refresh.assert_called_once_with('1') + + def test_delete_vip(self): + with mock.patch.object(self.driver, 'undeploy_instance') as undeploy: + self.driver.delete_vip({'pool_id': '1'}) + undeploy.assert_called_once_with('1') + + def test_create_pool(self): + with mock.patch.object(self.driver, '_refresh_device') as refresh: + self.driver.create_pool({'id': '1'}) + self.assertFalse(refresh.called) + + def test_update_pool(self): + with mock.patch.object(self.driver, '_refresh_device') as refresh: + self.driver.update_pool({}, {'id': '1'}) + refresh.assert_called_once_with('1') + + def test_delete_pool_existing(self): + with mock.patch.object(self.driver, 'undeploy_instance') as undeploy: + with mock.patch.object(self.driver, 'exists') as exists: + exists.return_value = True + self.driver.delete_pool({'id': '1'}) + undeploy.assert_called_once_with('1') + + def test_delete_pool_non_existing(self): + with mock.patch.object(self.driver, 'undeploy_instance') as undeploy: + with mock.patch.object(self.driver, 'exists') as exists: + exists.return_value = False + self.driver.delete_pool({'id': '1'}) + self.assertFalse(undeploy.called) + + def test_create_member(self): + with mock.patch.object(self.driver, '_refresh_device') as refresh: + self.driver.create_member({'pool_id': '1'}) + refresh.assert_called_once_with('1') + + def test_update_member(self): + with mock.patch.object(self.driver, '_refresh_device') as refresh: + self.driver.update_member({}, {'pool_id': '1'}) + refresh.assert_called_once_with('1') + + def test_delete_member(self): + with mock.patch.object(self.driver, '_refresh_device') as refresh: + self.driver.delete_member({'pool_id': '1'}) + refresh.assert_called_once_with('1') + + def test_create_pool_health_monitor(self): + with mock.patch.object(self.driver, '_refresh_device') as refresh: + self.driver.create_pool_health_monitor('', '1') + refresh.assert_called_once_with('1') + + def test_update_pool_health_monitor(self): + with mock.patch.object(self.driver, '_refresh_device') as refresh: + self.driver.update_pool_health_monitor('', '', '1') + refresh.assert_called_once_with('1') + + def test_delete_pool_health_monitor(self): + with mock.patch.object(self.driver, '_refresh_device') as refresh: + self.driver.delete_pool_health_monitor('', '1') + refresh.assert_called_once_with('1') diff --git a/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_plugin_driver.py b/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_plugin_driver.py index d58e5bd5fb..5c521fdaea 100644 --- a/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_plugin_driver.py +++ b/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_plugin_driver.py @@ -16,6 +16,8 @@ # # @author: Mark McClain, DreamHost +import contextlib + import mock from webob import exc @@ -39,14 +41,21 @@ class TestLoadBalancerPluginBase( test_db_loadbalancer.LoadBalancerPluginDbTestCase): def setUp(self): + def reset_device_driver(): + plugin_driver.AgentBasedPluginDriver.device_driver = None + self.addCleanup(reset_device_driver) + + self.mock_importer = mock.patch.object( + plugin_driver, 'importutils').start() + self.addCleanup(mock.patch.stopall) + # needed to reload provider configuration st_db.ServiceTypeManager._instance = None + plugin_driver.AgentBasedPluginDriver.device_driver = 'dummy' super(TestLoadBalancerPluginBase, self).setUp( lbaas_provider=('LOADBALANCER:lbaas:neutron.services.' 'loadbalancer.drivers.haproxy.plugin_driver.' - 'HaproxyOnHostPluginDriver:default')) - # create another API instance to make testing easier - # pass a mock to our API instance + 'AgentBasedPluginDriver:default')) # we need access to loaded plugins to modify models loaded_plugins = manager.NeutronManager().get_service_plugins() @@ -66,12 +75,6 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase): '.LbaasAgentSchedulerDbMixin.get_lbaas_agents') get_lbaas_agents_patcher.start() - # mocking plugin_driver create_pool() as it does nothing more than - # pool scheduling which is beyond the scope of this test case - mock.patch('neutron.services.loadbalancer.drivers.haproxy' - '.plugin_driver.HaproxyOnHostPluginDriver' - '.create_pool').start() - self.addCleanup(mock.patch.stopall) def test_get_ready_devices(self): @@ -132,10 +135,10 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase): {'id': pools[1].id}, {'id': pools[2].id}]} ready = self.callbacks.get_ready_devices(ctx) - self.assertEqual(len(ready), 2) + self.assertEqual(len(ready), 3) self.assertIn(pools[0].id, ready) self.assertIn(pools[1].id, ready) - self.assertNotIn(pools[2].id, ready) + self.assertIn(pools[2].id, ready) # cleanup ctx.session.query(ldb.Pool).delete() ctx.session.query(ldb.Vip).delete() @@ -158,7 +161,7 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase): ready = self.callbacks.get_ready_devices( context.get_admin_context(), ) - self.assertFalse(ready) + self.assertEqual([vip['vip']['pool_id']], ready) def test_get_ready_devices_inactive_pool(self): with self.vip() as vip: @@ -188,15 +191,20 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase): exceptions.Invalid, self.callbacks.get_logical_device, context.get_admin_context(), - pool['pool']['id'], - activate=False - ) + pool['pool']['id']) - def test_get_logical_device_activate(self): + def test_get_logical_device_active(self): with self.pool() as pool: with self.vip(pool=pool) as vip: with self.member(pool_id=vip['vip']['pool_id']) as member: ctx = context.get_admin_context() + # activate objects + self.plugin_instance.update_status( + ctx, ldb.Pool, pool['pool']['id'], 'ACTIVE') + self.plugin_instance.update_status( + ctx, ldb.Member, member['member']['id'], 'ACTIVE') + self.plugin_instance.update_status( + ctx, ldb.Vip, vip['vip']['id'], 'ACTIVE') # build the expected port = self.plugin_instance._core_plugin.get_port( @@ -221,11 +229,12 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase): 'pool': pool, 'vip': vip['vip'], 'members': [member['member']], - 'healthmonitors': [] + 'healthmonitors': [], + 'driver': 'dummy' } logical_config = self.callbacks.get_logical_device( - ctx, pool['id'], activate=True + ctx, pool['id'] ) self.assertEqual(logical_config, expected) @@ -246,7 +255,7 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase): 'INACTIVE') logical_config = self.callbacks.get_logical_device( - ctx, pool['pool']['id'], activate=False) + ctx, pool['pool']['id']) member['member']['status'] = constants.INACTIVE self.assertEqual([member['member']], @@ -308,6 +317,58 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase): host='host' ) + def test_pool_deployed(self): + with self.pool() as pool: + with self.vip(pool=pool) as vip: + with self.member(pool_id=vip['vip']['pool_id']) as member: + ctx = context.get_admin_context() + p = self.plugin_instance.get_pool(ctx, pool['pool']['id']) + self.assertEqual('PENDING_CREATE', p['status']) + v = self.plugin_instance.get_vip(ctx, vip['vip']['id']) + self.assertEqual('PENDING_CREATE', v['status']) + m = self.plugin_instance.get_member( + ctx, member['member']['id']) + self.assertEqual('PENDING_CREATE', m['status']) + + self.callbacks.pool_deployed(ctx, pool['pool']['id']) + + p = self.plugin_instance.get_pool(ctx, pool['pool']['id']) + self.assertEqual('ACTIVE', p['status']) + v = self.plugin_instance.get_vip(ctx, vip['vip']['id']) + self.assertEqual('ACTIVE', v['status']) + m = self.plugin_instance.get_member( + ctx, member['member']['id']) + self.assertEqual('ACTIVE', m['status']) + + def test_update_status_pool(self): + with self.pool() as pool: + pool_id = pool['pool']['id'] + ctx = context.get_admin_context() + p = self.plugin_instance.get_pool(ctx, pool_id) + self.assertEqual('PENDING_CREATE', p['status']) + self.callbacks.update_status(ctx, 'pool', pool_id, 'ACTIVE') + p = self.plugin_instance.get_pool(ctx, pool_id) + self.assertEqual('ACTIVE', p['status']) + + def test_update_status_health_monitor(self): + with contextlib.nested( + self.pool(), + self.health_monitor() + ) as (pool, hm): + pool_id = pool['pool']['id'] + ctx = context.get_admin_context() + self.plugin_instance.create_pool_health_monitor(ctx, hm, pool_id) + hm_id = hm['health_monitor']['id'] + h = self.plugin_instance.get_pool_health_monitor(ctx, hm_id, + pool_id) + self.assertEqual('PENDING_CREATE', h['status']) + self.callbacks.update_status( + ctx, 'health_monitor', + {'monitor_id': hm_id, 'pool_id': pool_id}, 'ACTIVE') + h = self.plugin_instance.get_pool_health_monitor(ctx, hm_id, + pool_id) + self.assertEqual('ACTIVE', h['status']) + class TestLoadBalancerAgentApi(base.BaseTestCase): def setUp(self): @@ -321,46 +382,73 @@ class TestLoadBalancerAgentApi(base.BaseTestCase): def test_init(self): self.assertEqual(self.api.topic, 'topic') - def _call_test_helper(self, method_name): - rv = getattr(self.api, method_name)(mock.sentinel.context, 'test', - 'host') - self.assertEqual(rv, self.mock_cast.return_value) - self.mock_cast.assert_called_once_with( - mock.sentinel.context, - self.mock_msg.return_value, - topic='topic.host' - ) - - self.mock_msg.assert_called_once_with( - method_name, - pool_id='test', - host='host' - ) - - def test_reload_pool(self): - self._call_test_helper('reload_pool') - - def test_destroy_pool(self): - self._call_test_helper('destroy_pool') - - def test_modify_pool(self): - self._call_test_helper('modify_pool') - - def test_agent_updated(self): - rv = self.api.agent_updated(mock.sentinel.context, True, 'host') + def _call_test_helper(self, method_name, method_args): + rv = getattr(self.api, method_name)(mock.sentinel.context, + host='host', + **method_args) self.assertEqual(rv, self.mock_cast.return_value) self.mock_cast.assert_called_once_with( mock.sentinel.context, self.mock_msg.return_value, topic='topic.host', - version='1.1' + version=None ) + if method_name == 'agent_updated': + method_args = {'payload': method_args} self.mock_msg.assert_called_once_with( - 'agent_updated', - payload={'admin_state_up': True} + method_name, + **method_args ) + def test_agent_updated(self): + self._call_test_helper('agent_updated', {'admin_state_up': 'test'}) + + def test_create_pool(self): + self._call_test_helper('create_pool', {'pool': 'test', + 'driver_name': 'dummy'}) + + def test_update_pool(self): + self._call_test_helper('update_pool', {'old_pool': 'test', + 'pool': 'test'}) + + def test_delete_pool(self): + self._call_test_helper('delete_pool', {'pool': 'test'}) + + def test_create_vip(self): + self._call_test_helper('create_vip', {'vip': 'test'}) + + def test_update_vip(self): + self._call_test_helper('update_vip', {'old_vip': 'test', + 'vip': 'test'}) + + def test_delete_vip(self): + self._call_test_helper('delete_vip', {'vip': 'test'}) + + def test_create_member(self): + self._call_test_helper('create_member', {'member': 'test'}) + + def test_update_member(self): + self._call_test_helper('update_member', {'old_member': 'test', + 'member': 'test'}) + + def test_delete_member(self): + self._call_test_helper('delete_member', {'member': 'test'}) + + def test_create_monitor(self): + self._call_test_helper('create_pool_health_monitor', + {'health_monitor': 'test', 'pool_id': 'test'}) + + def test_update_monitor(self): + self._call_test_helper('update_pool_health_monitor', + {'old_health_monitor': 'test', + 'health_monitor': 'test', + 'pool_id': 'test'}) + + def test_delete_monitor(self): + self._call_test_helper('delete_pool_health_monitor', + {'health_monitor': 'test', 'pool_id': 'test'}) + class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): def setUp(self): @@ -370,16 +458,10 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): super(TestLoadBalancerPluginNotificationWrapper, self).setUp() self.mock_api = api_cls.return_value - # mocking plugin_driver create_pool() as it does nothing more than - # pool scheduling which is beyond the scope of this test case - mock.patch('neutron.services.loadbalancer.drivers.haproxy' - '.plugin_driver.HaproxyOnHostPluginDriver' - '.create_pool').start() - self.mock_get_driver = mock.patch.object(self.plugin_instance, '_get_driver') self.mock_get_driver.return_value = (plugin_driver. - HaproxyOnHostPluginDriver( + AgentBasedPluginDriver( self.plugin_instance )) @@ -389,9 +471,9 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): with self.subnet() as subnet: with self.pool(subnet=subnet) as pool: with self.vip(pool=pool, subnet=subnet) as vip: - self.mock_api.reload_pool.assert_called_once_with( + self.mock_api.create_vip.assert_called_once_with( mock.ANY, - vip['vip']['pool_id'], + vip['vip'], 'host' ) @@ -399,8 +481,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): with self.subnet() as subnet: with self.pool(subnet=subnet) as pool: with self.vip(pool=pool, subnet=subnet) as vip: - self.mock_api.reset_mock() ctx = context.get_admin_context() + old_vip = vip['vip'].copy() vip['vip'].pop('status') new_vip = self.plugin_instance.update_vip( ctx, @@ -408,9 +490,10 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): vip ) - self.mock_api.reload_pool.assert_called_once_with( + self.mock_api.update_vip.assert_called_once_with( mock.ANY, - vip['vip']['pool_id'], + old_vip, + new_vip, 'host' ) @@ -423,51 +506,55 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): with self.subnet() as subnet: with self.pool(subnet=subnet) as pool: with self.vip(pool=pool, subnet=subnet, no_delete=True) as vip: - self.mock_api.reset_mock() ctx = context.get_admin_context() self.plugin_instance.delete_vip(ctx, vip['vip']['id']) - self.mock_api.destroy_pool.assert_called_once_with( + vip['vip']['status'] = 'PENDING_DELETE' + self.mock_api.delete_vip.assert_called_once_with( mock.ANY, - vip['vip']['pool_id'], + vip['vip'], 'host' ) def test_create_pool(self): - with self.pool(): - self.assertFalse(self.mock_api.reload_pool.called) - self.assertFalse(self.mock_api.modify_pool.called) - self.assertFalse(self.mock_api.destroy_pool.called) + with self.pool() as pool: + self.mock_api.create_pool.assert_called_once_with( + mock.ANY, + pool['pool'], + mock.ANY, + 'dummy' + ) def test_update_pool_non_active(self): with self.pool() as pool: pool['pool']['status'] = 'INACTIVE' ctx = context.get_admin_context() + orig_pool = pool['pool'].copy() del pool['pool']['provider'] self.plugin_instance.update_pool(ctx, pool['pool']['id'], pool) - self.mock_api.destroy_pool.assert_called_once_with( - mock.ANY, pool['pool']['id'], 'host') - self.assertFalse(self.mock_api.reload_pool.called) - self.assertFalse(self.mock_api.modify_pool.called) + self.mock_api.delete_pool.assert_called_once_with( + mock.ANY, orig_pool, 'host') def test_update_pool_no_vip_id(self): with self.pool() as pool: ctx = context.get_admin_context() + orig_pool = pool['pool'].copy() del pool['pool']['provider'] - self.plugin_instance.update_pool(ctx, pool['pool']['id'], pool) - self.assertFalse(self.mock_api.destroy_pool.called) - self.assertFalse(self.mock_api.reload_pool.called) - self.assertFalse(self.mock_api.modify_pool.called) + updated = self.plugin_instance.update_pool( + ctx, pool['pool']['id'], pool) + self.mock_api.update_pool.assert_called_once_with( + mock.ANY, orig_pool, updated, 'host') def test_update_pool_with_vip_id(self): with self.pool() as pool: - with self.vip(pool=pool): + with self.vip(pool=pool) as vip: ctx = context.get_admin_context() + old_pool = pool['pool'].copy() + old_pool['vip_id'] = vip['vip']['id'] del pool['pool']['provider'] - self.plugin_instance.update_pool(ctx, pool['pool']['id'], pool) - self.mock_api.reload_pool.assert_called_once_with( - mock.ANY, pool['pool']['id'], 'host') - self.assertFalse(self.mock_api.destroy_pool.called) - self.assertFalse(self.mock_api.modify_pool.called) + updated = self.plugin_instance.update_pool( + ctx, pool['pool']['id'], pool) + self.mock_api.update_pool.assert_called_once_with( + mock.ANY, old_pool, updated, 'host') def test_delete_pool(self): with self.pool(no_delete=True) as pool: @@ -475,26 +562,26 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): pool['pool']['id']) res = req.get_response(self.ext_api) self.assertEqual(res.status_int, exc.HTTPNoContent.code) - self.mock_api.destroy_pool.assert_called_once_with( - mock.ANY, pool['pool']['id'], 'host') + pool['pool']['status'] = 'PENDING_DELETE' + self.mock_api.delete_pool.assert_called_once_with( + mock.ANY, pool['pool'], 'host') def test_create_member(self): with self.pool() as pool: pool_id = pool['pool']['id'] - with self.member(pool_id=pool_id): - self.mock_api.modify_pool.assert_called_once_with( - mock.ANY, pool_id, 'host') + with self.member(pool_id=pool_id) as member: + self.mock_api.create_member.assert_called_once_with( + mock.ANY, member['member'], 'host') def test_update_member(self): with self.pool() as pool: pool_id = pool['pool']['id'] with self.member(pool_id=pool_id) as member: ctx = context.get_admin_context() - self.mock_api.modify_pool.reset_mock() - self.plugin_instance.update_member( + updated = self.plugin_instance.update_member( ctx, member['member']['id'], member) - self.mock_api.modify_pool.assert_called_once_with( - mock.ANY, pool_id, 'host') + self.mock_api.update_member.assert_called_once_with( + mock.ANY, member['member'], updated, 'host') def test_update_member_new_pool(self): with self.pool() as pool1: @@ -502,89 +589,105 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): with self.pool() as pool2: pool2_id = pool2['pool']['id'] with self.member(pool_id=pool1_id) as member: + self.mock_api.create_member.reset_mock() ctx = context.get_admin_context() - self.mock_api.modify_pool.reset_mock() + old_member = member['member'].copy() member['member']['pool_id'] = pool2_id - self.plugin_instance.update_member(ctx, - member['member']['id'], - member) - self.assertEqual(2, self.mock_api.modify_pool.call_count) - self.mock_api.modify_pool.assert_has_calls( - [mock.call(mock.ANY, pool1_id, 'host'), - mock.call(mock.ANY, pool2_id, 'host')]) + updated = self.plugin_instance.update_member( + ctx, member['member']['id'], member) + self.mock_api.delete_member.assert_called_once_with( + mock.ANY, old_member, 'host') + self.mock_api.create_member.assert_called_once_with( + mock.ANY, updated, 'host') def test_delete_member(self): with self.pool() as pool: pool_id = pool['pool']['id'] with self.member(pool_id=pool_id, no_delete=True) as member: - self.mock_api.modify_pool.reset_mock() req = self.new_delete_request('members', member['member']['id']) res = req.get_response(self.ext_api) self.assertEqual(res.status_int, exc.HTTPNoContent.code) - self.mock_api.modify_pool.assert_called_once_with( - mock.ANY, pool_id, 'host') + member['member']['status'] = 'PENDING_DELETE' + self.mock_api.delete_member.assert_called_once_with( + mock.ANY, member['member'], 'host') def test_create_pool_health_monitor(self): - with self.pool() as pool: + with contextlib.nested( + self.pool(), + self.health_monitor() + ) as (pool, hm): pool_id = pool['pool']['id'] - with self.health_monitor() as hm: - ctx = context.get_admin_context() - self.plugin_instance.create_pool_health_monitor(ctx, - hm, - pool_id) - self.mock_api.modify_pool.assert_called_once_with( - mock.ANY, pool_id, 'host') + ctx = context.get_admin_context() + self.plugin_instance.create_pool_health_monitor(ctx, hm, pool_id) + # hm now has a ref to the pool with which it is associated + hm = self.plugin.get_health_monitor( + ctx, hm['health_monitor']['id']) + self.mock_api.create_pool_health_monitor.assert_called_once_with( + mock.ANY, hm, pool_id, 'host') def test_delete_pool_health_monitor(self): - with self.pool() as pool: + with contextlib.nested( + self.pool(), + self.health_monitor() + ) as (pool, hm): pool_id = pool['pool']['id'] - with self.health_monitor() as hm: - ctx = context.get_admin_context() - self.plugin_instance.create_pool_health_monitor(ctx, - hm, - pool_id) - self.mock_api.modify_pool.reset_mock() - self.plugin_instance.delete_pool_health_monitor( - ctx, hm['health_monitor']['id'], pool_id) - self.mock_api.modify_pool.assert_called_once_with( - mock.ANY, pool_id, 'host') + ctx = context.get_admin_context() + self.plugin_instance.create_pool_health_monitor(ctx, hm, pool_id) + # hm now has a ref to the pool with which it is associated + hm = self.plugin.get_health_monitor( + ctx, hm['health_monitor']['id']) + hm['pools'][0]['status'] = 'PENDING_DELETE' + self.plugin_instance.delete_pool_health_monitor( + ctx, hm['id'], pool_id) + self.mock_api.delete_pool_health_monitor.assert_called_once_with( + mock.ANY, hm, pool_id, 'host') def test_update_health_monitor_associated_with_pool(self): - with self.health_monitor(type='HTTP') as monitor: - with self.pool() as pool: - data = { - 'health_monitor': { - 'id': monitor['health_monitor']['id'], - 'tenant_id': self._tenant_id - } + with contextlib.nested( + self.health_monitor(type='HTTP'), + self.pool() + ) as (monitor, pool): + data = { + 'health_monitor': { + 'id': monitor['health_monitor']['id'], + 'tenant_id': self._tenant_id } - req = self.new_create_request( - 'pools', - data, - fmt=self.fmt, - id=pool['pool']['id'], - subresource='health_monitors') - res = req.get_response(self.ext_api) - self.assertEqual(res.status_int, exc.HTTPCreated.code) - self.mock_api.modify_pool.assert_called_once_with( - mock.ANY, - pool['pool']['id'], - 'host' - ) + } + req = self.new_create_request( + 'pools', + data, + fmt=self.fmt, + id=pool['pool']['id'], + subresource='health_monitors') + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, exc.HTTPCreated.code) + # hm now has a ref to the pool with which it is associated + ctx = context.get_admin_context() + hm = self.plugin.get_health_monitor( + ctx, monitor['health_monitor']['id']) + self.mock_api.create_pool_health_monitor.assert_called_once_with( + mock.ANY, + hm, + pool['pool']['id'], + 'host' + ) - self.mock_api.reset_mock() - data = {'health_monitor': {'delay': 20, - 'timeout': 20, - 'max_retries': 2, - 'admin_state_up': False}} - req = self.new_update_request("health_monitors", - data, - monitor['health_monitor']['id']) - req.get_response(self.ext_api) - self.mock_api.modify_pool.assert_called_once_with( - mock.ANY, - pool['pool']['id'], - 'host' - ) + self.mock_api.reset_mock() + data = {'health_monitor': {'delay': 20, + 'timeout': 20, + 'max_retries': 2, + 'admin_state_up': False}} + updated = hm.copy() + updated.update(data['health_monitor']) + req = self.new_update_request("health_monitors", + data, + monitor['health_monitor']['id']) + req.get_response(self.ext_api) + self.mock_api.update_pool_health_monitor.assert_called_once_with( + mock.ANY, + hm, + updated, + pool['pool']['id'], + 'host') diff --git a/neutron/tests/unit/services/loadbalancer/test_agent_scheduler.py b/neutron/tests/unit/services/loadbalancer/test_agent_scheduler.py index b5abea4bc8..e9e1f38821 100644 --- a/neutron/tests/unit/services/loadbalancer/test_agent_scheduler.py +++ b/neutron/tests/unit/services/loadbalancer/test_agent_scheduler.py @@ -21,7 +21,7 @@ from neutron.api import extensions from neutron.api.v2 import attributes from neutron.common import constants from neutron import context -from neutron.db import servicetype_db as sdb +from neutron.db import servicetype_db as st_db from neutron.extensions import agent from neutron.extensions import lbaas_agentscheduler from neutron import manager @@ -79,8 +79,8 @@ class LBaaSAgentSchedulerTestCase(test_agent_ext_plugin.AgentDBTestMixIn, 'HaproxyOnHostPluginDriver:default')], 'service_providers') - #force service type manager to reload configuration: - sdb.ServiceTypeManager._instance = None + # need to reload provider configuration + st_db.ServiceTypeManager._instance = None super(LBaaSAgentSchedulerTestCase, self).setUp( self.plugin_str, service_plugins=service_plugins) @@ -122,8 +122,7 @@ class LBaaSAgentSchedulerTestCase(test_agent_ext_plugin.AgentDBTestMixIn, 'binary': 'neutron-loadbalancer-agent', 'host': LBAAS_HOSTA, 'topic': 'LOADBALANCER_AGENT', - 'configurations': {'device_driver': 'device_driver', - 'interface_driver': 'interface_driver'}, + 'configurations': {'device_drivers': ['haproxy_ns']}, 'agent_type': constants.AGENT_TYPE_LOADBALANCER} self._register_one_agent_state(lbaas_hosta) with self.pool() as pool: @@ -150,8 +149,7 @@ class LBaaSAgentSchedulerTestCase(test_agent_ext_plugin.AgentDBTestMixIn, 'binary': 'neutron-loadbalancer-agent', 'host': LBAAS_HOSTA, 'topic': 'LOADBALANCER_AGENT', - 'configurations': {'device_driver': 'device_driver', - 'interface_driver': 'interface_driver'}, + 'configurations': {'device_drivers': ['haproxy_ns']}, 'agent_type': constants.AGENT_TYPE_LOADBALANCER} self._register_one_agent_state(lbaas_hosta) is_agent_down_str = 'neutron.db.agents_db.AgentDbMixin.is_agent_down' diff --git a/neutron/tests/unit/test_agent_ext_plugin.py b/neutron/tests/unit/test_agent_ext_plugin.py index eb65d79bbc..d46aaf739b 100644 --- a/neutron/tests/unit/test_agent_ext_plugin.py +++ b/neutron/tests/unit/test_agent_ext_plugin.py @@ -116,9 +116,7 @@ class AgentDBTestMixIn(object): 'binary': 'neutron-loadbalancer-agent', 'host': LBAAS_HOSTA, 'topic': 'LOADBALANCER_AGENT', - 'configurations': {'device_driver': 'device_driver', - 'interface_driver': 'interface_driver', - }, + 'configurations': {'device_drivers': ['haproxy_ns']}, 'agent_type': constants.AGENT_TYPE_LOADBALANCER} lbaas_hostb = copy.deepcopy(lbaas_hosta) lbaas_hostb['host'] = LBAAS_HOSTB