diff --git a/bin/quantum-lbaas-agent b/bin/quantum-lbaas-agent new file mode 100755 index 0000000000..a53e4574b2 --- /dev/null +++ b/bin/quantum-lbaas-agent @@ -0,0 +1,26 @@ +#!/usr/bin/env python +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 Openstack, LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import sys +sys.path.insert(0, os.getcwd()) + +from quantum.plugins.services.agent_loadbalancer.agent import main + + +main() diff --git a/etc/lbaas_agent.ini b/etc/lbaas_agent.ini new file mode 100644 index 0000000000..e3ea75c3d3 --- /dev/null +++ b/etc/lbaas_agent.ini @@ -0,0 +1,24 @@ +[DEFAULT] +# Show debugging output in log (sets DEBUG log level output) +# debug = true + +# The LBaaS agent will resync its state with Quantum to recover from any +# transient notification or rpc errors. The interval is number of +# seconds between attempts. +# periodic_interval = 10 + +# OVS based plugins(OVS, Ryu, NEC, NVP, BigSwitch/Floodlight) +interface_driver = quantum.agent.linux.interface.OVSInterfaceDriver +# OVS based plugins(Ryu, NEC, NVP, BigSwitch/Floodlight) that use OVS +# as OpenFlow switch and check port status +# ovs_use_veth = True +# LinuxBridge +# interface_driver = quantum.agent.linux.interface.BridgeInterfaceDriver + +# The agent requires a driver to manage the loadbalancer. HAProxy is the +# opensource version. +device_driver = quantum.plugins.services.agent_loadbalancer.drivers.haproxy.namespace_driver.HaproxyNSDriver + +# Allow overlapping IP (Must have kernel build with CONFIG_NET_NS=y and +# iproute2 package that supports namespaces). +# use_namespaces = True diff --git a/etc/quantum/rootwrap.d/lbaas-haproxy.filters b/etc/quantum/rootwrap.d/lbaas-haproxy.filters new file mode 100644 index 0000000000..e00a7197ae --- /dev/null +++ b/etc/quantum/rootwrap.d/lbaas-haproxy.filters @@ -0,0 +1,29 @@ +# quantum-rootwrap command filters for nodes on which quantum is +# expected to control network +# +# This file should be owned by (and only-writeable by) the root user + +# format seems to be +# cmd-name: filter-name, raw-command, user, args + +[Filters] + +# haproxy +haproxy: CommandFilter, /usr/sbin/haproxy, root + +# lbaas-agent uses kill as well, that's handled by the generic KillFilter +kill_haproxy_usr: KillFilter, root, /usr/sbin/haproxy, -9, -HUP + +# lbaas-agent uses cat +cat: RegExpFilter, /bin/cat, root, cat, /proc/\d+/cmdline + +ovs-vsctl: CommandFilter, /bin/ovs-vsctl, root +ovs-vsctl_usr: CommandFilter, /usr/bin/ovs-vsctl, root +ovs-vsctl_sbin: CommandFilter, /sbin/ovs-vsctl, root +ovs-vsctl_sbin_usr: CommandFilter, /usr/sbin/ovs-vsctl, root + +# ip_lib +ip: IpFilter, /sbin/ip, root +ip_usr: IpFilter, /usr/sbin/ip, root +ip_exec: IpNetnsExecFilter, /sbin/ip, root +ip_exec_usr: IpNetnsExecFilter, /usr/sbin/ip, root diff --git a/quantum/agent/linux/dhcp.py b/quantum/agent/linux/dhcp.py index 0c05fcaabb..38e7fd273f 100644 --- a/quantum/agent/linux/dhcp.py +++ b/quantum/agent/linux/dhcp.py @@ -21,7 +21,6 @@ import re import socket import StringIO import sys -import tempfile import netaddr from oslo.config import cfg @@ -187,7 +186,7 @@ class DhcpLocalProcess(DhcpBase): def interface_name(self, value): interface_file_path = self.get_conf_file_name('interface', ensure_conf_dir=True) - replace_file(interface_file_path, value) + utils.replace_file(interface_file_path, value) @abc.abstractmethod def spawn_process(self): @@ -298,7 +297,7 @@ class Dnsmasq(DhcpLocalProcess): (port.mac_address, name, alloc.ip_address)) name = self.get_conf_file_name('host') - replace_file(name, buf.getvalue()) + utils.replace_file(name, buf.getvalue()) return name def _output_opts_file(self): @@ -344,7 +343,7 @@ class Dnsmasq(DhcpLocalProcess): options.append(self._format_option(i, 'router')) name = self.get_conf_file_name('opts') - replace_file(name, '\n'.join(options)) + utils.replace_file(name, '\n'.join(options)) return name def _make_subnet_interface_ip_map(self): @@ -402,20 +401,3 @@ class Dnsmasq(DhcpLocalProcess): sock.connect(dhcp_relay_socket) sock.send(jsonutils.dumps(data)) sock.close() - - -def replace_file(file_name, data): - """Replaces the contents of file_name with data in a safe manner. - - First write to a temp file and then rename. Since POSIX renames are - atomic, the file is unlikely to be corrupted by competing writes. - - We create the tempfile on the same device to ensure that it can be renamed. - """ - - base_dir = os.path.dirname(os.path.abspath(file_name)) - tmp_file = tempfile.NamedTemporaryFile('w+', dir=base_dir, delete=False) - tmp_file.write(data) - tmp_file.close() - os.chmod(tmp_file.name, 0644) - os.rename(tmp_file.name, file_name) diff --git a/quantum/agent/linux/utils.py b/quantum/agent/linux/utils.py index a16d4815e1..cbfefffebe 100644 --- a/quantum/agent/linux/utils.py +++ b/quantum/agent/linux/utils.py @@ -22,6 +22,7 @@ import os import shlex import socket import struct +import tempfile from eventlet.green import subprocess @@ -71,3 +72,20 @@ def get_interface_mac(interface): struct.pack('256s', interface[:DEVICE_NAME_LEN])) return ''.join(['%02x:' % ord(char) for char in info[MAC_START:MAC_END]])[:-1] + + +def replace_file(file_name, data): + """Replaces the contents of file_name with data in a safe manner. + + First write to a temp file and then rename. Since POSIX renames are + atomic, the file is unlikely to be corrupted by competing writes. + + We create the tempfile on the same device to ensure that it can be renamed. + """ + + base_dir = os.path.dirname(os.path.abspath(file_name)) + tmp_file = tempfile.NamedTemporaryFile('w+', dir=base_dir, delete=False) + tmp_file.write(data) + tmp_file.close() + os.chmod(tmp_file.name, 0644) + os.rename(tmp_file.name, file_name) diff --git a/quantum/common/topics.py b/quantum/common/topics.py index 91970f2f98..4a25549586 100644 --- a/quantum/common/topics.py +++ b/quantum/common/topics.py @@ -25,9 +25,11 @@ UPDATE = 'update' AGENT = 'q-agent-notifier' PLUGIN = 'q-plugin' DHCP = 'q-dhcp-notifer' +LOADBALANCER_PLUGIN = 'q-loadbalancer-plugin' L3_AGENT = 'l3_agent' DHCP_AGENT = 'dhcp_agent' +LOADBALANCER_AGENT = 'loadbalancer_agent' def get_topic_name(prefix, table, operation): diff --git a/quantum/db/loadbalancer/loadbalancer_db.py b/quantum/db/loadbalancer/loadbalancer_db.py index 69c9bc6003..7158cfe104 100644 --- a/quantum/db/loadbalancer/loadbalancer_db.py +++ b/quantum/db/loadbalancer/loadbalancer_db.py @@ -69,7 +69,7 @@ class Vip(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant): protocol_port = sa.Column(sa.Integer, nullable=False) protocol = sa.Column(sa.Enum("HTTP", "HTTPS", "TCP", name="lb_protocols"), nullable=False) - pool_id = sa.Column(sa.String(36), nullable=False) + pool_id = sa.Column(sa.String(36), nullable=False, unique=True) session_persistence = orm.relationship(SessionPersistence, uselist=False, backref="vips", @@ -114,6 +114,7 @@ class Pool(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant): cascade="all, delete-orphan") monitors = orm.relationship("PoolMonitorAssociation", backref="pools", cascade="all, delete-orphan") + vip = orm.relationship(Vip, backref='pool') class HealthMonitor(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant): @@ -239,6 +240,12 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): raise return r + def assert_modification_allowed(self, obj): + status = getattr(obj, 'status', None) + + if status == constants.PENDING_DELETE: + raise loadbalancer.StateInvalid(id=id, state=status) + ######################################################## # VIP DB access def _make_vip_dict(self, vip, fields=None): @@ -270,11 +277,6 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): return self._fields(res, fields) - def _update_pool_vip_info(self, context, pool_id, vip_id): - pool_db = self._get_resource(context, Pool, pool_id) - with context.session.begin(subtransactions=True): - pool_db.update({'vip_id': vip_id}) - def _check_session_persistence_info(self, info): """ Performs sanity check on session persistence info. :param info: Session persistence info @@ -355,6 +357,14 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): tenant_id = self._get_tenant_id_for_create(context, v) with context.session.begin(subtransactions=True): + # validate that the pool has same tenant + if v['pool_id']: + pool = self._get_resource(context, Pool, v['pool_id']) + if pool['tenant_id'] != tenant_id: + raise q_exc.NotAuthorized() + else: + pool = None + vip_db = Vip(id=uuidutils.generate_uuid(), tenant_id=tenant_id, name=v['name'], @@ -367,16 +377,18 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): admin_state_up=v['admin_state_up'], status=constants.PENDING_CREATE) - vip_id = vip_db['id'] session_info = v['session_persistence'] if session_info: - s_p = self._create_session_persistence_db(session_info, vip_id) + s_p = self._create_session_persistence_db( + session_info, + vip_db['id']) vip_db.session_persistence = s_p context.session.add(vip_db) context.session.flush() + # create a port to reserve address for IPAM self._create_port_for_vip( context, vip_db, @@ -384,7 +396,9 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): v.get('address') ) - self._update_pool_vip_info(context, v['pool_id'], vip_id) + if pool: + pool['vip_id'] = vip_db['id'] + return self._make_vip_dict(vip_db) def update_vip(self, context, id, vip): @@ -392,20 +406,36 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): sess_persist = v.pop('session_persistence', None) with context.session.begin(subtransactions=True): + vip_db = self._get_resource(context, Vip, id) + + self.assert_modification_allowed(vip_db) + if sess_persist: self._update_vip_session_persistence(context, id, sess_persist) else: self._delete_session_persistence(context, id) - vip_db = self._get_resource(context, Vip, id) - old_pool_id = vip_db['pool_id'] if v: vip_db.update(v) # If the pool_id is changed, we need to update # the associated pools if 'pool_id' in v: - self._update_pool_vip_info(context, old_pool_id, None) - self._update_pool_vip_info(context, v['pool_id'], id) + new_pool = self._get_resource(context, Pool, v['pool_id']) + self.assert_modification_allowed(new_pool) + + # check that the pool matches the tenant_id + if new_pool['tenant_id'] != vip_db['tenant_id']: + raise q_exc.NotAuthorized() + + if vip_db['pool_id']: + old_pool = self._get_resource( + context, + Pool, + vip_db['pool_id'] + ) + old_pool['vip_id'] = None + + new_pool['vip_id'] = vip_db['id'] return self._make_vip_dict(vip_db) @@ -432,7 +462,7 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): ######################################################## # Pool DB access - def _make_pool_dict(self, context, pool, fields=None): + def _make_pool_dict(self, pool, fields=None): res = {'id': pool['id'], 'tenant_id': pool['tenant_id'], 'name': pool['name'], @@ -453,16 +483,6 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): return self._fields(res, fields) - def _update_pool_member_info(self, context, pool_id, membersInfo): - with context.session.begin(subtransactions=True): - member_qry = context.session.query(Member) - for member_id in membersInfo: - try: - member = member_qry.filter_by(id=member_id).one() - member.update({'pool_id': pool_id}) - except exc.NoResultFound: - raise loadbalancer.MemberNotFound(member_id=member_id) - def _create_pool_stats(self, context, pool_id): # This is internal method to add pool statistics. It won't # be exposed to API @@ -504,17 +524,17 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): context.session.add(pool_db) pool_db = self._get_resource(context, Pool, pool_db['id']) - return self._make_pool_dict(context, pool_db) + return self._make_pool_dict(pool_db) def update_pool(self, context, id, pool): - v = pool['pool'] + p = pool['pool'] with context.session.begin(subtransactions=True): pool_db = self._get_resource(context, Pool, id) - if v: - pool_db.update(v) + if p: + pool_db.update(p) - return self._make_pool_dict(context, pool_db) + return self._make_pool_dict(pool_db) def delete_pool(self, context, id): # Check if the pool is in use @@ -529,15 +549,15 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): def get_pool(self, context, id, fields=None): pool = self._get_resource(context, Pool, id) - return self._make_pool_dict(context, pool, fields) + return self._make_pool_dict(pool, fields) def get_pools(self, context, filters=None, fields=None): collection = self._model_query(context, Pool) collection = self._apply_filters_to_query(collection, Pool, filters) - return [self._make_pool_dict(context, c, fields) + return [self._make_pool_dict(c, fields) for c in collection.all()] - def get_stats(self, context, pool_id): + def stats(self, context, pool_id): with context.session.begin(subtransactions=True): pool_qry = context.session.query(Pool) try: @@ -600,6 +620,7 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): raise loadbalancer.HealthMonitorNotFound(monitor_id=id) def get_pool_health_monitor(self, context, id, pool_id, fields=None): + # TODO(markmcclain) look into why pool_id is ignored healthmonitor = self._get_resource(context, HealthMonitor, id) return self._make_health_monitor_dict(healthmonitor, fields) @@ -644,7 +665,6 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): v = member['member'] with context.session.begin(subtransactions=True): member_db = self._get_resource(context, Member, id) - old_pool_id = member_db['pool_id'] if v: member_db.update(v) diff --git a/quantum/db/migration/alembic_migrations/versions/54c2c487e913_lbaas.py b/quantum/db/migration/alembic_migrations/versions/54c2c487e913_lbaas.py index 799300313a..09ed5208dc 100644 --- a/quantum/db/migration/alembic_migrations/versions/54c2c487e913_lbaas.py +++ b/quantum/db/migration/alembic_migrations/versions/54c2c487e913_lbaas.py @@ -58,6 +58,7 @@ def upgrade(active_plugin=None, options=None): sa.Column(u'admin_state_up', sa.Boolean(), nullable=False), sa.Column(u'connection_limit', sa.Integer(), nullable=True), sa.ForeignKeyConstraint(['port_id'], ['ports.id'], ), + sa.UniqueConstraint('pool_id'), sa.PrimaryKeyConstraint(u'id') ) op.create_table( diff --git a/quantum/extensions/loadbalancer.py b/quantum/extensions/loadbalancer.py index e1e6944d6b..75e68a6a6b 100644 --- a/quantum/extensions/loadbalancer.py +++ b/quantum/extensions/loadbalancer.py @@ -68,6 +68,7 @@ RESOURCE_ATTRIBUTE_MAP = { 'is_visible': True}, 'name': {'allow_post': True, 'allow_put': True, 'validate': {'type:string': None}, + 'default': '', 'is_visible': True}, 'description': {'allow_post': True, 'allow_put': True, 'validate': {'type:string': None}, @@ -128,6 +129,7 @@ RESOURCE_ATTRIBUTE_MAP = { 'is_visible': True}, 'name': {'allow_post': True, 'allow_put': True, 'validate': {'type:string': None}, + 'default': '', 'is_visible': True}, 'description': {'allow_post': True, 'allow_put': True, 'validate': {'type:string': None}, diff --git a/quantum/plugins/services/loadbalancer/__init__.py b/quantum/plugins/services/agent_loadbalancer/__init__.py similarity index 100% rename from quantum/plugins/services/loadbalancer/__init__.py rename to quantum/plugins/services/agent_loadbalancer/__init__.py diff --git a/quantum/plugins/services/agent_loadbalancer/agent/__init__.py b/quantum/plugins/services/agent_loadbalancer/agent/__init__.py new file mode 100644 index 0000000000..3632729c0b --- /dev/null +++ b/quantum/plugins/services/agent_loadbalancer/agent/__init__.py @@ -0,0 +1,67 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost + +import eventlet +from oslo.config import cfg + +from quantum.agent.common import config +from quantum.agent.linux import interface +from quantum.common import topics +from quantum.openstack.common.rpc import service as rpc_service +from quantum.openstack.common import service +from quantum.plugins.services.agent_loadbalancer.agent import manager + + +OPTS = [ + cfg.IntOpt( + 'periodic_interval', + default=10, + help=_('Seconds between periodic task runs') + ) +] + + +class LbaasAgentService(rpc_service.Service): + def start(self): + super(LbaasAgentService, self).start() + self.tg.add_timer( + cfg.CONF.periodic_interval, + self.manager.run_periodic_tasks, + None, + None + ) + + +def main(): + eventlet.monkey_patch() + cfg.CONF.register_opts(OPTS) + cfg.CONF.register_opts(manager.OPTS) + # import interface options just in case the driver uses namespaces + cfg.CONF.register_opts(interface.OPTS) + config.register_root_helper(cfg.CONF) + + cfg.CONF(project='quantum') + config.setup_logging(cfg.CONF) + + mgr = manager.LbaasAgentManager(cfg.CONF) + svc = LbaasAgentService( + host=cfg.CONF.host, + topic=topics.LOADBALANCER_AGENT, + manager=mgr + ) + service.launch(svc).wait() diff --git a/quantum/plugins/services/agent_loadbalancer/agent/api.py b/quantum/plugins/services/agent_loadbalancer/agent/api.py new file mode 100644 index 0000000000..cd4314d3dc --- /dev/null +++ b/quantum/plugins/services/agent_loadbalancer/agent/api.py @@ -0,0 +1,81 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost + +from quantum.openstack.common.rpc import proxy + + +class LbaasAgentApi(proxy.RpcProxy): + """Agent side of the Agent to Plugin RPC API.""" + + API_VERSION = '1.0' + + def __init__(self, topic, context, host): + super(LbaasAgentApi, self).__init__(topic, self.API_VERSION) + self.context = context + self.host = host + + def get_ready_devices(self): + return self.call( + self.context, + self.make_msg('get_ready_devices', host=self.host), + topic=self.topic + ) + + def get_logical_device(self, pool_id): + return self.call( + self.context, + self.make_msg( + 'get_logical_device', + pool_id=pool_id, + host=self.host + ), + topic=self.topic + ) + + def pool_destroyed(self, pool_id): + return self.call( + self.context, + self.make_msg('pool_destroyed', pool_id=pool_id, host=self.host), + topic=self.topic + ) + + def plug_vip_port(self, port_id): + return self.call( + self.context, + self.make_msg('plug_vip_port', port_id=port_id, host=self.host), + topic=self.topic + ) + + def unplug_vip_port(self, port_id): + return self.call( + self.context, + self.make_msg('unplug_vip_port', port_id=port_id, host=self.host), + topic=self.topic + ) + + def update_pool_stats(self, pool_id, stats): + return self.call( + self.context, + self.make_msg( + 'update_pool_stats', + pool_id=pool_id, + stats=stats, + host=self.host + ), + topic=self.topic + ) diff --git a/quantum/plugins/services/agent_loadbalancer/agent/manager.py b/quantum/plugins/services/agent_loadbalancer/agent/manager.py new file mode 100644 index 0000000000..9dd2a70d36 --- /dev/null +++ b/quantum/plugins/services/agent_loadbalancer/agent/manager.py @@ -0,0 +1,221 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost + +import weakref + +from oslo.config import cfg + +from quantum.agent.common import config +from quantum.common import topics +from quantum import context +from quantum.openstack.common import importutils +from quantum.openstack.common import log as logging +from quantum.openstack.common import periodic_task +from quantum.plugins.services.agent_loadbalancer.agent import api + +LOG = logging.getLogger(__name__) +NS_PREFIX = 'qlbaas-' + +OPTS = [ + cfg.StrOpt( + 'device_driver', + help=_('The driver used to manage the loadbalancing device'), + ), + cfg.StrOpt( + 'loadbalancer_state_path', + default='$state_path/lbaas', + help=_('Location to store config and state files'), + ), + cfg.StrOpt( + 'interface_driver', + help=_('The driver used to manage the virtual interface') + ) +] + + +class LogicalDeviceCache(object): + """Manage a cache of known devices.""" + + class Device(object): + """Inner classes used to hold values for weakref lookups""" + def __init__(self, port_id, pool_id): + self.port_id = port_id + self.pool_id = pool_id + + def __eq__(self, other): + return self.__dict__ == other.__dict__ + + def __hash__(self): + return hash((self.port_id, self.pool_id)) + + def __init__(self): + self.devices = set() + self.port_lookup = weakref.WeakValueDictionary() + self.pool_lookup = weakref.WeakValueDictionary() + + def put(self, device): + port_id = device['vip']['port_id'] + pool_id = device['pool']['id'] + d = self.Device(device['vip']['port_id'], device['pool']['id']) + if d not in self.devices: + self.devices.add(d) + self.port_lookup[port_id] = d + self.pool_lookup[pool_id] = d + + def remove(self, device): + if not isinstance(device, self.Device): + device = self.Device( + device['vip']['port_id'], device['pool']['id'] + ) + if device in self.devices: + self.devices.remove(device) + + def remove_by_pool_id(self, pool_id): + d = self.pool_lookup.get(pool_id) + if d: + self.devices.remove(d) + + def get_by_pool_id(self, pool_id): + return self.pool_lookup.get(pool_id) + + def get_by_port_id(self, port_id): + return self.port_lookup.get(port_id) + + def get_pool_ids(self): + return self.pool_lookup.keys() + + +class LbaasAgentManager(periodic_task.PeriodicTasks): + def __init__(self, conf): + self.conf = conf + try: + vif_driver = importutils.import_object(conf.interface_driver, conf) + except ImportError: + # the driver is optional + msg = _('Error importing interface driver: %s') + raise SystemExit(msg % conf.interface_driver) + vif_driver = None + + try: + self.driver = importutils.import_object( + conf.device_driver, + config.get_root_helper(self.conf), + conf.loadbalancer_state_path, + vif_driver, + self._vip_plug_callback + ) + except ImportError: + msg = _('Error importing loadbalancer device driver: %s') + raise SystemExit(msg % conf.device_driver) + ctx = context.get_admin_context_without_session() + self.plugin_rpc = api.LbaasAgentApi( + topics.LOADBALANCER_PLUGIN, + ctx, + conf.host + ) + self.needs_resync = False + self.cache = LogicalDeviceCache() + + def initialize_service_hook(self, started_by): + self.sync_state() + + @periodic_task.periodic_task + def periodic_resync(self, context): + if self.needs_resync: + self.needs_resync = False + self.sync_state() + + @periodic_task.periodic_task(ticks_between_runs=6) + def collect_stats(self, context): + for pool_id in self.cache.get_pool_ids(): + try: + stats = self.driver.get_stats(pool_id) + if stats: + self.plugin_rpc.update_pool_stats(pool_id, stats) + except Exception: + LOG.exception(_('Error upating stats')) + self.needs_resync = True + + def _vip_plug_callback(self, action, port): + if action == 'plug': + self.plugin_rpc.plug_vip_port(port['id']) + elif action == 'unplug': + self.plugin_rpc.unplug_vip_port(port['id']) + + def sync_state(self): + known_devices = set(self.cache.get_pool_ids()) + try: + ready_logical_devices = set(self.plugin_rpc.get_ready_devices()) + + for deleted_id in known_devices - ready_logical_devices: + self.destroy_device(deleted_id) + + for pool_id in ready_logical_devices: + self.refresh_device(pool_id) + + except Exception: + LOG.exception(_('Unable to retrieve ready devices')) + self.needs_resync = True + + self.remove_orphans() + + def refresh_device(self, pool_id): + try: + logical_config = self.plugin_rpc.get_logical_device(pool_id) + + if self.driver.exists(pool_id): + self.driver.update(logical_config) + else: + self.driver.create(logical_config) + self.cache.put(logical_config) + except Exception: + LOG.exception(_('Unable to refresh device for pool: %s'), pool_id) + self.needs_resync = True + + def destroy_device(self, pool_id): + device = self.cache.get_by_pool_id(pool_id) + if not device: + return + try: + self.driver.destroy(pool_id) + self.plugin_rpc.pool_destroyed(pool_id) + except Exception: + LOG.exception(_('Unable to destroy device for pool: %s'), pool_id) + self.needs_resync = True + self.cache.remove(device) + + def remove_orphans(self): + try: + self.driver.remove_orphans(self.cache.get_pool_ids()) + except NotImplementedError: + pass # Not all drivers will support this + + def reload_pool(self, context, pool_id=None, host=None): + """Handle RPC cast from plugin to reload a pool.""" + if pool_id: + self.refresh_device(pool_id) + + def modify_pool(self, context, pool_id=None, host=None): + """Handle RPC cast from plugin to modify a pool if known to agent.""" + if self.cache.get_by_pool_id(pool_id): + self.refresh_device(pool_id) + + def destroy_pool(self, context, pool_id=None, host=None): + """Handle RPC cast from plugin to destroy a pool if known to agent.""" + if self.cache.get_by_pool_id(pool_id): + self.destroy_device(pool_id) diff --git a/quantum/plugins/services/agent_loadbalancer/constants.py b/quantum/plugins/services/agent_loadbalancer/constants.py new file mode 100644 index 0000000000..82b049f173 --- /dev/null +++ b/quantum/plugins/services/agent_loadbalancer/constants.py @@ -0,0 +1,33 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Mirantis, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +LB_METHOD_ROUND_ROBIN = 'ROUND_ROBIN' +LB_METHOD_LEAST_CONNECTIONS = 'LEAST_CONNECTIONS' +LB_METHOD_SOURCE_IP = 'SOURCE_IP' + +PROTOCOL_TCP = 'TCP' +PROTOCOL_HTTP = 'HTTP' +PROTOCOL_HTTPS = 'HTTPS' + +HEALTH_MONITOR_PING = 'PING' +HEALTH_MONITOR_TCP = 'TCP' +HEALTH_MONITOR_HTTP = 'HTTP' +HEALTH_MONITOR_HTTPS = 'HTTPS' + +SESSION_PERSISTENCE_SOURCE_IP = 'SOURCE_IP' +SESSION_PERSISTENCE_HTTP_COOKIE = 'HTTP_COOKIE' +SESSION_PERSISTENCE_APP_COOKIE = 'APP_COOKIE' diff --git a/quantum/plugins/services/agent_loadbalancer/drivers/__init__.py b/quantum/plugins/services/agent_loadbalancer/drivers/__init__.py new file mode 100644 index 0000000000..ce18bf6d68 --- /dev/null +++ b/quantum/plugins/services/agent_loadbalancer/drivers/__init__.py @@ -0,0 +1,17 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost diff --git a/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/__init__.py b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/__init__.py new file mode 100644 index 0000000000..ce18bf6d68 --- /dev/null +++ b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/__init__.py @@ -0,0 +1,17 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost diff --git a/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/cfg.py b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/cfg.py new file mode 100644 index 0000000000..936bcff303 --- /dev/null +++ b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/cfg.py @@ -0,0 +1,184 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost + +import itertools + +from quantum.agent.linux import utils +from quantum.plugins.common import constants as qconstants +from quantum.plugins.services.agent_loadbalancer import constants + + +PROTOCOL_MAP = { + constants.PROTOCOL_TCP: 'tcp', + constants.PROTOCOL_HTTP: 'http', + constants.PROTOCOL_HTTPS: 'tcp', +} + +BALANCE_MAP = { + constants.LB_METHOD_ROUND_ROBIN: 'roundrobin', + constants.LB_METHOD_LEAST_CONNECTIONS: 'leastconn', + constants.LB_METHOD_SOURCE_IP: 'source' +} + +ACTIVE = qconstants.ACTIVE + + +def save_config(conf_path, logical_config, socket_path=None): + """Convert a logical configuration to the HAProxy version""" + data = [] + data.extend(_build_global(logical_config, socket_path=socket_path)) + data.extend(_build_defaults(logical_config)) + data.extend(_build_frontend(logical_config)) + data.extend(_build_backend(logical_config)) + utils.replace_file(conf_path, '\n'.join(data)) + + +def _build_global(config, socket_path=None): + opts = [ + 'daemon', + 'user nobody', + 'group nogroup', + 'log /dev/log local0', + 'log /dev/log local1 notice' + ] + + if socket_path: + opts.append('stats socket %s mode 0666 level user' % socket_path) + + return itertools.chain(['global'], ('\t' + o for o in opts)) + + +def _build_defaults(config): + opts = [ + 'log global', + 'retries 3', + 'option redispatch', + 'timeout connect 5000', + 'timeout client 50000', + 'timeout server 50000', + ] + + return itertools.chain(['defaults'], ('\t' + o for o in opts)) + + +def _build_frontend(config): + protocol = config['vip']['protocol'] + + opts = [ + 'option tcplog', + 'bind %s:%d' % ( + _get_first_ip_from_port(config['vip']['port']), + config['vip']['protocol_port'] + ), + 'mode %s' % PROTOCOL_MAP[protocol], + 'default_backend %s' % config['pool']['id'], + ] + + if config['vip']['connection_limit'] >= 0: + opts.append('maxconn %s' % config['vip']['connection_limit']) + + if protocol == constants.PROTOCOL_HTTP: + opts.append('option forwardfor') + + return itertools.chain( + ['frontend %s' % config['vip']['id']], + ('\t' + o for o in opts) + ) + + +def _build_backend(config): + protocol = config['pool']['protocol'] + lb_method = config['pool']['lb_method'] + + opts = [ + 'mode %s' % PROTOCOL_MAP[protocol], + 'balance %s' % BALANCE_MAP.get(lb_method, 'roundrobin') + ] + + if protocol == constants.PROTOCOL_HTTP: + opts.append('option forwardfor') + + # add the first health_monitor (if available) + server_addon, health_opts = _get_server_health_option(config) + opts.extend(health_opts) + + # add the members + opts.extend( + (('server %(id)s %(address)s:%(protocol_port)s ' + 'weight %(weight)s') % member) + server_addon + for member in config['members'] + if (member['status'] == ACTIVE and member['admin_state_up']) + ) + + return itertools.chain( + ['backend %s' % config['pool']['id']], + ('\t' + o for o in opts) + ) + + +def _get_first_ip_from_port(port): + for fixed_ip in port['fixed_ips']: + return fixed_ip['ip_address'] + + +def _get_server_health_option(config): + """return the first active health option""" + for monitor in config['healthmonitors']: + if monitor['status'] == ACTIVE and monitor['admin_state_up']: + break + else: + return '', [] + + server_addon = ' check inter %(delay)ds fall %(max_retries)d' % monitor + opts = [ + 'timeout check %ds' % monitor['timeout'] + ] + + if monitor['type'] in (constants.HEALTH_MONITOR_HTTP, + constants.HEALTH_MONITOR_HTTPS): + opts.append('option httpchk %(http_method)s %(url_path)s' % monitor) + opts.append( + 'http-check expect rstatus %s' % + '|'.join(_expand_expected_codes(monitor['expected_codes'])) + ) + + if monitor['type'] == constants.HEALTH_MONITOR_HTTPS: + opts.append('option ssl-hello-chk') + + return server_addon, opts + + +def _expand_expected_codes(codes): + """Expand the expected code string in set of codes. + + 200-204 -> 200, 201, 202, 204 + 200, 203 -> 200, 203 + """ + + retval = set() + for code in codes.replace(',', ' ').split(' '): + code = code.strip() + + if not code: + continue + elif '-' in code: + low, hi = code.split('-')[:2] + retval.update(str(i) for i in xrange(int(low), int(hi))) + else: + retval.add(code) + return retval diff --git a/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/namespace_driver.py b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/namespace_driver.py new file mode 100644 index 0000000000..f4df283d31 --- /dev/null +++ b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/namespace_driver.py @@ -0,0 +1,182 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost +import os +import shutil +import socket + +import netaddr + +from quantum.agent.linux import ip_lib +from quantum.common import exceptions +from quantum.openstack.common import log as logging +from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import ( + cfg as hacfg +) + +LOG = logging.getLogger(__name__) +NS_PREFIX = 'qlbaas-' + + +class HaproxyNSDriver(object): + def __init__(self, root_helper, state_path, vif_driver, vip_plug_callback): + self.root_helper = root_helper + self.state_path = state_path + self.vif_driver = vif_driver + self.vip_plug_callback = vip_plug_callback + self.pool_to_port_id = {} + + def create(self, logical_config): + pool_id = logical_config['pool']['id'] + namespace = get_ns_name(pool_id) + + self._plug(namespace, logical_config['vip']['port']) + self._spawn(logical_config) + + def update(self, logical_config): + pool_id = logical_config['pool']['id'] + pid_path = self._get_state_file_path(pool_id, 'pid') + + extra_args = ['-sf'] + extra_args.extend(p.strip() for p in open(pid_path, 'r')) + self._spawn(logical_config, extra_args) + + def _spawn(self, logical_config, extra_cmd_args=()): + pool_id = logical_config['pool']['id'] + namespace = get_ns_name(pool_id) + conf_path = self._get_state_file_path(pool_id, 'conf') + pid_path = self._get_state_file_path(pool_id, 'pid') + sock_path = self._get_state_file_path(pool_id, 'sock') + + hacfg.save_config(conf_path, logical_config, sock_path) + cmd = ['haproxy', '-f', conf_path, '-p', pid_path] + cmd.extend(extra_cmd_args) + + ns = ip_lib.IPWrapper(self.root_helper, namespace) + ns.netns.execute(cmd) + + # remember the pool<>port mapping + self.pool_to_port_id[pool_id] = logical_config['vip']['port']['id'] + + def destroy(self, pool_id): + namespace = get_ns_name(pool_id) + ns = ip_lib.IPWrapper(self.root_helper, namespace) + pid_path = self._get_state_file_path(pool_id, 'pid') + sock_path = self._get_state_file_path(pool_id, 'sock') + + # kill the process + kill_pids_in_file(ns, pid_path) + + # unplug the ports + if pool_id in self.pool_to_port_id: + self._unplug(namespace, self.pool_to_port_id[pool_id]) + + # remove the configuration directory + conf_dir = os.path.dirname(self._get_state_file_path(pool_id, '')) + if os.path.isdir(conf_dir): + shutil.rmtree(conf_dir) + ns.garbage_collect_namespace() + + def exists(self, pool_id): + namespace = get_ns_name(pool_id) + root_ns = ip_lib.IPWrapper(self.root_helper) + + socket_path = self._get_state_file_path(pool_id, 'sock') + if root_ns.netns.exists(namespace) and os.path.exists(socket_path): + try: + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.connect(socket_path) + return True + except socket.error: + pass + return False + + def get_stats(self, pool_id): + pass + + def remove_orphans(self, known_pool_ids): + raise NotImplementedError() + + def _get_state_file_path(self, pool_id, kind, ensure_state_dir=True): + """Returns the file name for a given kind of config file.""" + confs_dir = os.path.abspath(os.path.normpath(self.state_path)) + conf_dir = os.path.join(confs_dir, pool_id) + if ensure_state_dir: + if not os.path.isdir(conf_dir): + os.makedirs(conf_dir, 0755) + return os.path.join(conf_dir, kind) + + def _plug(self, namespace, port, reuse_existing=True): + self.vip_plug_callback('plug', port) + interface_name = self.vif_driver.get_device_name(Wrap(port)) + + if ip_lib.device_exists(interface_name, self.root_helper, namespace): + if not reuse_existing: + raise exceptions.PreexistingDeviceFailure( + dev_name=interface_name + ) + else: + self.vif_driver.plug( + port['network_id'], + port['id'], + interface_name, + port['mac_address'], + namespace=namespace + ) + + cidrs = [ + '%s/%s' % (ip['ip_address'], + netaddr.IPNetwork(ip['subnet']['cidr']).prefixlen) + for ip in port['fixed_ips'] + ] + self.vif_driver.init_l3(interface_name, cidrs, namespace=namespace) + + def _unplug(self, namespace, port_id): + port_stub = {'id': port_id} + self.vip_plug_callback('unplug', port_stub) + interface_name = self.vif_driver.get_device_name(Wrap(port_stub)) + self.vif_driver.unplug(interface_name, namespace=namespace) + + +# NOTE (markmcclain) For compliance with interface.py which expects objects +class Wrap(object): + """A light attribute wrapper for compatibility with the interface lib.""" + def __init__(self, d): + self.__dict__.update(d) + + def __getitem__(self, key): + return self.__dict__[key] + + +def get_ns_name(namespace_id): + return NS_PREFIX + namespace_id + + +def kill_pids_in_file(namespace_wrapper, pid_path): + if os.path.exists(pid_path): + with open(pid_path, 'r') as pids: + for pid in pids: + pid = pid.strip() + try: + namespace_wrapper.netns.execute( + ['kill', '-9', pid.strip()] + ) + except RuntimeError: + LOG.exception( + _('Unable to kill haproxy process: %s'), + pid + ) diff --git a/quantum/plugins/services/agent_loadbalancer/plugin.py b/quantum/plugins/services/agent_loadbalancer/plugin.py new file mode 100644 index 0000000000..3b7b48ec77 --- /dev/null +++ b/quantum/plugins/services/agent_loadbalancer/plugin.py @@ -0,0 +1,338 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from oslo.config import cfg + +from quantum.common import exceptions as q_exc +from quantum.common import rpc as q_rpc +from quantum.common import topics +from quantum.db import api as qdbapi +from quantum.db.loadbalancer import loadbalancer_db +from quantum.openstack.common import log as logging +from quantum.openstack.common import rpc +from quantum.openstack.common.rpc import proxy +from quantum.plugins.common import constants + +LOG = logging.getLogger(__name__) + +ACTIVE_PENDING = ( + constants.ACTIVE, + constants.PENDING_CREATE, + constants.PENDING_UPDATE +) + + +class LoadBalancerCallbacks(object): + RPC_API_VERSION = '1.0' + + def __init__(self, plugin): + self.plugin = plugin + + def create_rpc_dispatcher(self): + return q_rpc.PluginRpcDispatcher([self]) + + def get_ready_devices(self, context, host=None): + with context.session.begin(subtransactions=True): + qry = context.session.query( + loadbalancer_db.Vip, loadbalancer_db.Pool + ) + qry = qry.filter(loadbalancer_db.Vip.status.in_(ACTIVE_PENDING)) + qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING)) + up = True # makes pep8 and sqlalchemy happy + qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up) + qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up) + return [p.id for v, p in qry.all()] + + def get_logical_device(self, context, pool_id=None, activate=True, + **kwargs): + with context.session.begin(subtransactions=True): + qry = context.session.query(loadbalancer_db.Pool) + qry = qry.filter_by(id=pool_id) + pool = qry.one() + + if activate: + # set all resources to active + if pool.status in ACTIVE_PENDING: + pool.status = constants.ACTIVE + + if pool.vip.status in ACTIVE_PENDING: + pool.vip.status = constants.ACTIVE + + for m in pool.members: + if m.status in ACTIVE_PENDING: + m.status = constants.ACTIVE + + for hm in pool.monitors: + if hm.monitor.status in ACTIVE_PENDING: + hm.monitor.status = constants.ACTIVE + + if (pool.status != constants.ACTIVE + or pool.vip.status != constants.ACTIVE): + raise Exception(_('Expected active pool and vip')) + + retval = {} + retval['pool'] = self.plugin._make_pool_dict(pool) + retval['vip'] = self.plugin._make_vip_dict(pool.vip) + retval['vip']['port'] = ( + self.plugin._core_plugin._make_port_dict(pool.vip.port) + ) + for fixed_ip in retval['vip']['port']['fixed_ips']: + fixed_ip['subnet'] = ( + self.plugin._core_plugin.get_subnet( + context, + fixed_ip['subnet_id'] + ) + ) + retval['members'] = [ + self.plugin._make_member_dict(m) + for m in pool.members if m.status == constants.ACTIVE + ] + retval['healthmonitors'] = [ + self.plugin._make_health_monitor_dict(hm.monitor) + for hm in pool.monitors + if hm.monitor.status == constants.ACTIVE + ] + + return retval + + def pool_destroyed(self, context, pool_id=None, host=None): + """Agent confirmation hook that a pool has been destroyed. + + This method exists for subclasses to change the deletion + behavior. + """ + pass + + def plug_vip_port(self, context, port_id=None, host=None): + if not port_id: + return + + port = self.plugin._core_plugin.get_port( + context, + port_id + ) + + port['admin_state_up'] = True + port['device_owner'] = 'quantum:' + constants.LOADBALANCER + port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host))) + + self.plugin._core_plugin.update_port( + context, + port_id, + {'port': port} + ) + + def unplug_vip_port(self, context, port_id=None, host=None): + if not port_id: + return + + port = self.plugin._core_plugin.get_port( + context, + port_id + ) + + port['admin_state_up'] = False + port['device_owner'] = '' + port['device_id'] = '' + + try: + self.plugin._core_plugin.update_port( + context, + port_id, + {'port': port} + ) + + except q_exc.PortNotFound: + msg = _('Unable to find port %s to unplug. This can occur when ' + 'the Vip has been deleted first.') + LOG.debug(msg, port_id) + + def update_pool_stats(self, context, pool_id=None, stats=None, host=None): + # TODO (markmcclain): add stats collection + pass + + +class LoadBalancerAgentApi(proxy.RpcProxy): + """Plugin side of plugin to agent RPC API.""" + + API_VERSION = '1.0' + + def __init__(self, topic, host): + super(LoadBalancerAgentApi, self).__init__(topic, self.API_VERSION) + self.host = host + + def reload_pool(self, context, pool_id): + return self.cast( + context, + self.make_msg('reload_pool', pool_id=pool_id, host=self.host), + topic=self.topic + ) + + def destroy_pool(self, context, pool_id): + return self.cast( + context, + self.make_msg('destroy_pool', pool_id=pool_id, host=self.host), + topic=self.topic + ) + + def modify_pool(self, context, pool_id): + return self.cast( + context, + self.make_msg('modify_pool', pool_id=pool_id, host=self.host), + topic=self.topic + ) + + +class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb): + + """ + Implementation of the Quantum Loadbalancer Service Plugin. + + This class manages the workflow of LBaaS request/response. + Most DB related works are implemented in class + loadbalancer_db.LoadBalancerPluginDb. + """ + supported_extension_aliases = ["lbaas"] + + def __init__(self): + """ + Do the initialization for the loadbalancer service plugin here. + """ + qdbapi.register_models() + + self.callbacks = LoadBalancerCallbacks(self) + + self.conn = rpc.create_connection(new=True) + self.conn.create_consumer( + topics.LOADBALANCER_PLUGIN, + self.callbacks.create_rpc_dispatcher(), + fanout=False) + self.conn.consume_in_thread() + + self.agent_rpc = LoadBalancerAgentApi( + topics.LOADBALANCER_AGENT, + cfg.CONF.host + ) + + def get_plugin_type(self): + return constants.LOADBALANCER + + def get_plugin_description(self): + return "Quantum LoadBalancer Service Plugin" + + def create_vip(self, context, vip): + vip['vip']['status'] = constants.PENDING_CREATE + v = super(LoadBalancerPlugin, self).create_vip(context, vip) + self.agent_rpc.reload_pool(context, v['pool_id']) + return v + + def update_vip(self, context, id, vip): + if 'status' not in vip['vip']: + vip['vip']['status'] = constants.PENDING_UPDATE + v = super(LoadBalancerPlugin, self).update_vip(context, id, vip) + if v['status'] in ACTIVE_PENDING: + self.agent_rpc.reload_pool(context, v['pool_id']) + else: + self.agent_rpc.destroy_pool(context, v['pool_id']) + return v + + def delete_vip(self, context, id): + vip = self.get_vip(context, id) + super(LoadBalancerPlugin, self).delete_vip(context, id) + self.agent_rpc.destroy_pool(context, vip['pool_id']) + pass + + def create_pool(self, context, pool): + p = super(LoadBalancerPlugin, self).create_pool(context, pool) + # don't notify here because a pool needs a vip to be useful + return p + + def update_pool(self, context, id, pool): + if 'status' not in pool['pool']: + pool['pool']['status'] = constants.PENDING_UPDATE + p = super(LoadBalancerPlugin, self).update_pool(context, id, pool) + if p['status'] in ACTIVE_PENDING: + self.agent_rpc.reload_pool(context, p['id']) + else: + self.agent_rpc.destroy_pool(context, p['id']) + return p + + def delete_pool(self, context, id): + super(LoadBalancerPlugin, self).delete_pool(context, id) + self.agent_rpc.destroy_pool(context, id) + + def create_member(self, context, member): + m = super(LoadBalancerPlugin, self).create_member(context, member) + self.agent_rpc.modify_pool(context, m['pool_id']) + return m + + def update_member(self, context, id, member): + if 'status' not in member['member']: + member['member']['status'] = constants.PENDING_UPDATE + m = super(LoadBalancerPlugin, self).update_member(context, id, member) + self.agent_rpc.modify_pool(context, m['pool_id']) + return m + + def delete_member(self, context, id): + m = self.get_member(context, id) + super(LoadBalancerPlugin, self).delete_member(context, id) + self.agent_rpc.modify_pool(context, m['pool_id']) + + def update_health_monitor(self, context, id, health_monitor): + if 'status' not in health_monitor['health_monitor']: + health_monitor['health_monitor']['status'] = ( + constants.PENDING_UPDATE + ) + hm = super(LoadBalancerPlugin, self).update_health_monitor( + context, + id, + health_monitor + ) + + with context.session.begin(subtransactions=True): + qry = context.session.query( + loadbalancer_db.PoolMonitorAssociation + ) + qry = qry.filter_by(monitor_id=hm['id']) + + for assoc in qry.all(): + self.agent_rpc.modify_pool(context, assoc['pool_id']) + return hm + + def delete_health_monitor(self, context, id): + with context.session.begin(subtransactions=True): + qry = context.session.query( + loadbalancer_db.PoolMonitorAssociation + ) + qry = qry.filter_by(monitor_id=id) + + pool_ids = [a['pool_id'] for a in qry.all()] + super(LoadBalancerPlugin, self).delete_health_monitor(context, id) + for pid in pool_ids: + self.agent_rpc.modify_pool(context, pid) + + def create_pool_health_monitor(self, context, health_monitor, pool_id): + retval = super(LoadBalancerPlugin, self).create_pool_health_monitor( + context, + health_monitor, + pool_id + ) + self.agent_rpc.modify_pool(context, pool_id) + + return retval diff --git a/quantum/plugins/services/loadbalancer/loadbalancerPlugin.py b/quantum/plugins/services/loadbalancer/loadbalancerPlugin.py deleted file mode 100644 index 27a4377ff5..0000000000 --- a/quantum/plugins/services/loadbalancer/loadbalancerPlugin.py +++ /dev/null @@ -1,252 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2013 OpenStack LLC. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -from quantum.db import api as qdbapi -from quantum.db import model_base -from quantum.db.loadbalancer import loadbalancer_db -from quantum.extensions import loadbalancer -from quantum.openstack.common import log as logging -from quantum.plugins.common import constants - -LOG = logging.getLogger(__name__) - - -class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb): - - """ - Implementation of the Quantum Loadbalancer Service Plugin. - - This class manages the workflow of LBaaS request/response. - Most DB related works are implemented in class - loadbalancer_db.LoadBalancerPluginDb. - """ - supported_extension_aliases = ["lbaas"] - - def __init__(self): - """ - Do the initialization for the loadbalancer service plugin here. - """ - qdbapi.register_models(base=model_base.BASEV2) - - # TODO: we probably need to setup RPC channel (to talk to LbAgent) here - - def get_plugin_type(self): - return constants.LOADBALANCER - - def get_plugin_description(self): - return "Quantum LoadBalancer Service Plugin" - - def create_vip(self, context, vip): - v = super(LoadBalancerPlugin, self).create_vip(context, vip) - self.update_status(context, loadbalancer_db.Vip, v['id'], - constants.PENDING_CREATE) - LOG.debug(_("Create vip: %s"), v['id']) - - # If we adopt asynchronous mode, this method should return immediately - # and let client to query the object status. The plugin will listen on - # the event from device and update the object status by calling - # self.update_state(context, Vip, id, ACTIVE/ERROR) - # - # In synchronous mode, send the request to device here and wait for - # response. Eventually update the object status prior to the return. - v_query = self.get_vip(context, v['id']) - return v_query - - def update_vip(self, context, id, vip): - v_query = self.get_vip( - context, id, fields=["status"]) - if v_query['status'] in [ - constants.PENDING_DELETE, constants.ERROR]: - raise loadbalancer.StateInvalid(id=id, - state=v_query['status']) - - v = super(LoadBalancerPlugin, self).update_vip(context, id, vip) - self.update_status(context, loadbalancer_db.Vip, id, - constants.PENDING_UPDATE) - LOG.debug(_("Update vip: %s"), id) - - # TODO notify lbagent - v_rt = self.get_vip(context, id) - return v_rt - - def delete_vip(self, context, id): - self.update_status(context, loadbalancer_db.Vip, id, - constants.PENDING_DELETE) - LOG.debug(_("Delete vip: %s"), id) - - # TODO notify lbagent - super(LoadBalancerPlugin, self).delete_vip(context, id) - - def get_vip(self, context, id, fields=None): - res = super(LoadBalancerPlugin, self).get_vip(context, id, fields) - LOG.debug(_("Get vip: %s"), id) - return res - - def get_vips(self, context, filters=None, fields=None): - res = super(LoadBalancerPlugin, self).get_vips( - context, filters, fields) - LOG.debug(_("Get vips")) - return res - - def create_pool(self, context, pool): - p = super(LoadBalancerPlugin, self).create_pool(context, pool) - self.update_status(context, loadbalancer_db.Pool, p['id'], - constants.PENDING_CREATE) - LOG.debug(_("Create pool: %s"), p['id']) - - # TODO notify lbagent - p_rt = self.get_pool(context, p['id']) - return p_rt - - def update_pool(self, context, id, pool): - p_query = self.get_pool(context, id, fields=["status"]) - if p_query['status'] in [ - constants.PENDING_DELETE, constants.ERROR]: - raise loadbalancer.StateInvalid(id=id, - state=p_query['status']) - p = super(LoadBalancerPlugin, self).update_pool(context, id, pool) - LOG.debug(_("Update pool: %s"), p['id']) - # TODO notify lbagent - p_rt = self.get_pool(context, id) - return p_rt - - def delete_pool(self, context, id): - self.update_status(context, loadbalancer_db.Pool, id, - constants.PENDING_DELETE) - # TODO notify lbagent - super(LoadBalancerPlugin, self).delete_pool(context, id) - LOG.debug(_("Delete pool: %s"), id) - - def get_pool(self, context, id, fields=None): - res = super(LoadBalancerPlugin, self).get_pool(context, id, fields) - LOG.debug(_("Get pool: %s"), id) - return res - - def get_pools(self, context, filters=None, fields=None): - res = super(LoadBalancerPlugin, self).get_pools( - context, filters, fields) - LOG.debug(_("Get Pools")) - return res - - def stats(self, context, pool_id): - res = super(LoadBalancerPlugin, self).get_stats(context, pool_id) - LOG.debug(_("Get stats of Pool: %s"), pool_id) - return res - - def create_pool_health_monitor(self, context, health_monitor, pool_id): - m = super(LoadBalancerPlugin, self).create_pool_health_monitor( - context, health_monitor, pool_id) - LOG.debug(_("Create health_monitor of pool: %s"), pool_id) - return m - - def get_pool_health_monitor(self, context, id, pool_id, fields=None): - m = super(LoadBalancerPlugin, self).get_pool_health_monitor( - context, id, pool_id, fields) - LOG.debug(_("Get health_monitor of pool: %s"), pool_id) - return m - - def delete_pool_health_monitor(self, context, id, pool_id): - super(LoadBalancerPlugin, self).delete_pool_health_monitor( - context, id, pool_id) - LOG.debug(_("Delete health_monitor %(id)s of pool: %(pool_id)s"), - {"id": id, "pool_id": pool_id}) - - def get_member(self, context, id, fields=None): - res = super(LoadBalancerPlugin, self).get_member( - context, id, fields) - LOG.debug(_("Get member: %s"), id) - return res - - def get_members(self, context, filters=None, fields=None): - res = super(LoadBalancerPlugin, self).get_members( - context, filters, fields) - LOG.debug(_("Get members")) - return res - - def create_member(self, context, member): - m = super(LoadBalancerPlugin, self).create_member(context, member) - self.update_status(context, loadbalancer_db.Member, m['id'], - constants.PENDING_CREATE) - LOG.debug(_("Create member: %s"), m['id']) - # TODO notify lbagent - m_rt = self.get_member(context, m['id']) - return m_rt - - def update_member(self, context, id, member): - m_query = self.get_member(context, id, fields=["status"]) - if m_query['status'] in [ - constants.PENDING_DELETE, constants.ERROR]: - raise loadbalancer.StateInvalid(id=id, - state=m_query['status']) - m = super(LoadBalancerPlugin, self).update_member(context, id, member) - self.update_status(context, loadbalancer_db.Member, id, - constants.PENDING_UPDATE) - LOG.debug(_("Update member: %s"), m['id']) - # TODO notify lbagent - m_rt = self.get_member(context, id) - return m_rt - - def delete_member(self, context, id): - self.update_status(context, loadbalancer_db.Member, id, - constants.PENDING_DELETE) - LOG.debug(_("Delete member: %s"), id) - # TODO notify lbagent - super(LoadBalancerPlugin, self).delete_member(context, id) - - def get_health_monitor(self, context, id, fields=None): - res = super(LoadBalancerPlugin, self).get_health_monitor( - context, id, fields) - LOG.debug(_("Get health_monitor: %s"), id) - return res - - def get_health_monitors(self, context, filters=None, fields=None): - res = super(LoadBalancerPlugin, self).get_health_monitors( - context, filters, fields) - LOG.debug(_("Get health_monitors")) - return res - - def create_health_monitor(self, context, health_monitor): - h = super(LoadBalancerPlugin, self).create_health_monitor( - context, health_monitor) - self.update_status(context, loadbalancer_db.HealthMonitor, h['id'], - constants.PENDING_CREATE) - LOG.debug(_("Create health_monitor: %s"), h['id']) - # TODO notify lbagent - h_rt = self.get_health_monitor(context, h['id']) - return h_rt - - def update_health_monitor(self, context, id, health_monitor): - h_query = self.get_health_monitor(context, id, fields=["status"]) - if h_query['status'] in [ - constants.PENDING_DELETE, constants.ERROR]: - raise loadbalancer.StateInvalid(id=id, - state=h_query['status']) - h = super(LoadBalancerPlugin, self).update_health_monitor( - context, id, health_monitor) - self.update_status(context, loadbalancer_db.HealthMonitor, id, - constants.PENDING_UPDATE) - LOG.debug(_("Update health_monitor: %s"), h['id']) - # TODO notify lbagent - h_rt = self.get_health_monitor(context, id) - return h_rt - - def delete_health_monitor(self, context, id): - self.update_status(context, loadbalancer_db.HealthMonitor, id, - constants.PENDING_DELETE) - LOG.debug(_("Delete health_monitor: %s"), id) - super(LoadBalancerPlugin, self).delete_health_monitor(context, id) diff --git a/quantum/tests/unit/db/loadbalancer/test_db_loadbalancer.py b/quantum/tests/unit/db/loadbalancer/test_db_loadbalancer.py index f866377f50..a59992841a 100644 --- a/quantum/tests/unit/db/loadbalancer/test_db_loadbalancer.py +++ b/quantum/tests/unit/db/loadbalancer/test_db_loadbalancer.py @@ -34,7 +34,9 @@ import quantum.extensions from quantum.extensions import loadbalancer from quantum.manager import QuantumManager from quantum.plugins.common import constants -from quantum.plugins.services.loadbalancer import loadbalancerPlugin +from quantum.plugins.services.agent_loadbalancer import ( + plugin as loadbalancer_plugin +) from quantum.tests.unit import test_db_plugin from quantum.tests.unit import test_extensions from quantum.tests.unit import testlib_api @@ -46,8 +48,7 @@ LOG = logging.getLogger(__name__) DB_CORE_PLUGIN_KLASS = 'quantum.db.db_base_plugin_v2.QuantumDbPluginV2' DB_LB_PLUGIN_KLASS = ( - "quantum.plugins.services.loadbalancer." - "loadbalancerPlugin.LoadBalancerPlugin" + "quantum.plugins.services.agent_loadbalancer.plugin.LoadBalancerPlugin" ) ROOTDIR = os.path.dirname(__file__) + '../../../..' ETCDIR = os.path.join(ROOTDIR, 'etc') @@ -74,7 +75,7 @@ class LoadBalancerPluginDbTestCase(test_db_plugin.QuantumDbPluginV2TestCase): self._subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14" - plugin = loadbalancerPlugin.LoadBalancerPlugin() + plugin = loadbalancer_plugin.LoadBalancerPlugin() ext_mgr = PluginAwareExtensionManager( extensions_path, {constants.LOADBALANCER: plugin} diff --git a/quantum/tests/unit/services/__init__.py b/quantum/tests/unit/services/__init__.py new file mode 100644 index 0000000000..ce18bf6d68 --- /dev/null +++ b/quantum/tests/unit/services/__init__.py @@ -0,0 +1,17 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost diff --git a/quantum/tests/unit/services/agent_loadbalancer/__init__.py b/quantum/tests/unit/services/agent_loadbalancer/__init__.py new file mode 100644 index 0000000000..ce18bf6d68 --- /dev/null +++ b/quantum/tests/unit/services/agent_loadbalancer/__init__.py @@ -0,0 +1,17 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost diff --git a/quantum/tests/unit/services/agent_loadbalancer/agent/__init__.py b/quantum/tests/unit/services/agent_loadbalancer/agent/__init__.py new file mode 100644 index 0000000000..ce18bf6d68 --- /dev/null +++ b/quantum/tests/unit/services/agent_loadbalancer/agent/__init__.py @@ -0,0 +1,17 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost diff --git a/quantum/tests/unit/services/agent_loadbalancer/agent/test_api.py b/quantum/tests/unit/services/agent_loadbalancer/agent/test_api.py new file mode 100644 index 0000000000..2d5a2ee11f --- /dev/null +++ b/quantum/tests/unit/services/agent_loadbalancer/agent/test_api.py @@ -0,0 +1,135 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost + +import mock +import testtools + +from quantum.plugins.services.agent_loadbalancer.agent import api + + +class TestApiCache(testtools.TestCase): + def setUp(self): + super(TestApiCache, self).setUp() + self.addCleanup(mock.patch.stopall) + + self.api = api.LbaasAgentApi('topic', mock.sentinel.context, 'host') + self.make_msg = mock.patch.object(self.api, 'make_msg').start() + self.mock_call = mock.patch.object(self.api, 'call').start() + + def test_init(self): + self.assertEqual(self.api.host, 'host') + self.assertEqual(self.api.context, mock.sentinel.context) + + def test_get_ready_devices(self): + self.assertEqual( + self.api.get_ready_devices(), + self.mock_call.return_value + ) + + self.make_msg.assert_called_once_with('get_ready_devices', host='host') + self.mock_call.assert_called_once_with( + mock.sentinel.context, + self.make_msg.return_value, + topic='topic' + ) + + def test_get_logical_device(self): + self.assertEqual( + self.api.get_logical_device('pool_id'), + self.mock_call.return_value + ) + + self.make_msg.assert_called_once_with( + 'get_logical_device', + pool_id='pool_id', + host='host') + + self.mock_call.assert_called_once_with( + mock.sentinel.context, + self.make_msg.return_value, + topic='topic' + ) + + def test_pool_destroyed(self): + self.assertEqual( + self.api.pool_destroyed('pool_id'), + self.mock_call.return_value + ) + + self.make_msg.assert_called_once_with( + 'pool_destroyed', + pool_id='pool_id', + host='host') + + self.mock_call.assert_called_once_with( + mock.sentinel.context, + self.make_msg.return_value, + topic='topic' + ) + + def test_plug_vip_port(self): + self.assertEqual( + self.api.plug_vip_port('port_id'), + self.mock_call.return_value + ) + + self.make_msg.assert_called_once_with( + 'plug_vip_port', + port_id='port_id', + host='host') + + self.mock_call.assert_called_once_with( + mock.sentinel.context, + self.make_msg.return_value, + topic='topic' + ) + + def test_unplug_vip_port(self): + self.assertEqual( + self.api.unplug_vip_port('port_id'), + self.mock_call.return_value + ) + + self.make_msg.assert_called_once_with( + 'unplug_vip_port', + port_id='port_id', + host='host') + + self.mock_call.assert_called_once_with( + mock.sentinel.context, + self.make_msg.return_value, + topic='topic' + ) + + def test_update_pool_stats(self): + self.assertEqual( + self.api.update_pool_stats('pool_id', {'stat': 'stat'}), + self.mock_call.return_value + ) + + self.make_msg.assert_called_once_with( + 'update_pool_stats', + pool_id='pool_id', + stats={'stat': 'stat'}, + host='host') + + self.mock_call.assert_called_once_with( + mock.sentinel.context, + self.make_msg.return_value, + topic='topic' + ) diff --git a/quantum/tests/unit/services/agent_loadbalancer/agent/test_init.py b/quantum/tests/unit/services/agent_loadbalancer/agent/test_init.py new file mode 100644 index 0000000000..f2e9f22808 --- /dev/null +++ b/quantum/tests/unit/services/agent_loadbalancer/agent/test_init.py @@ -0,0 +1,55 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost + +import contextlib +import mock +from oslo.config import cfg +import testtools + +from quantum.plugins.services.agent_loadbalancer import agent + + +class TestLbaasService(testtools.TestCase): + def setUp(self): + super(TestLbaasService, self).setUp() + self.addCleanup(cfg.CONF.reset) + + cfg.CONF.register_opts(agent.OPTS) + + def test_start(self): + with mock.patch.object( + agent.rpc_service.Service, 'start' + ) as mock_start: + + mgr = mock.Mock() + agent_service = agent.LbaasAgentService('host', 'topic', mgr) + agent_service.start() + + self.assertTrue(mock_start.called) + + def test_main(self): + with contextlib.nested( + mock.patch.object(agent.service, 'launch'), + mock.patch.object(agent, 'eventlet'), + mock.patch('sys.argv'), + mock.patch.object(agent.manager, 'LbaasAgentManager') + ) as (mock_launch, mock_eventlet, sys_argv, mgr_cls): + agent.main() + + self.assertTrue(mock_eventlet.monkey_patch.called) + mock_launch.assert_called_once_with(mock.ANY) diff --git a/quantum/tests/unit/services/agent_loadbalancer/agent/test_manager.py b/quantum/tests/unit/services/agent_loadbalancer/agent/test_manager.py new file mode 100644 index 0000000000..b025809c05 --- /dev/null +++ b/quantum/tests/unit/services/agent_loadbalancer/agent/test_manager.py @@ -0,0 +1,365 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost + +import contextlib + +import mock +import testtools + +from quantum.plugins.services.agent_loadbalancer.agent import manager + + +class TestLogicalDeviceCache(testtools.TestCase): + def setUp(self): + super(TestLogicalDeviceCache, self).setUp() + self.cache = manager.LogicalDeviceCache() + + def test_put(self): + fake_device = { + 'vip': {'port_id': 'port_id'}, + 'pool': {'id': 'pool_id'} + } + self.cache.put(fake_device) + + self.assertEqual(len(self.cache.devices), 1) + self.assertEqual(len(self.cache.port_lookup), 1) + self.assertEqual(len(self.cache.pool_lookup), 1) + + def test_double_put(self): + fake_device = { + 'vip': {'port_id': 'port_id'}, + 'pool': {'id': 'pool_id'} + } + self.cache.put(fake_device) + self.cache.put(fake_device) + + self.assertEqual(len(self.cache.devices), 1) + self.assertEqual(len(self.cache.port_lookup), 1) + self.assertEqual(len(self.cache.pool_lookup), 1) + + def test_remove_in_cache(self): + fake_device = { + 'vip': {'port_id': 'port_id'}, + 'pool': {'id': 'pool_id'} + } + self.cache.put(fake_device) + + self.assertEqual(len(self.cache.devices), 1) + + self.cache.remove(fake_device) + + self.assertFalse(len(self.cache.devices)) + self.assertFalse(self.cache.port_lookup) + self.assertFalse(self.cache.pool_lookup) + + def test_remove_in_cache_same_object(self): + fake_device = { + 'vip': {'port_id': 'port_id'}, + 'pool': {'id': 'pool_id'} + } + self.cache.put(fake_device) + + self.assertEqual(len(self.cache.devices), 1) + + self.cache.remove(set(self.cache.devices).pop()) + + self.assertFalse(len(self.cache.devices)) + self.assertFalse(self.cache.port_lookup) + self.assertFalse(self.cache.pool_lookup) + + def test_remove_by_pool_id(self): + fake_device = { + 'vip': {'port_id': 'port_id'}, + 'pool': {'id': 'pool_id'} + } + self.cache.put(fake_device) + + self.assertEqual(len(self.cache.devices), 1) + + self.cache.remove_by_pool_id('pool_id') + + self.assertFalse(len(self.cache.devices)) + self.assertFalse(self.cache.port_lookup) + self.assertFalse(self.cache.pool_lookup) + + def test_get_by_pool_id(self): + fake_device = { + 'vip': {'port_id': 'port_id'}, + 'pool': {'id': 'pool_id'} + } + self.cache.put(fake_device) + + dev = self.cache.get_by_pool_id('pool_id') + + self.assertEqual(dev.pool_id, 'pool_id') + self.assertEqual(dev.port_id, 'port_id') + + def test_get_by_port_id(self): + fake_device = { + 'vip': {'port_id': 'port_id'}, + 'pool': {'id': 'pool_id'} + } + self.cache.put(fake_device) + + dev = self.cache.get_by_port_id('port_id') + + self.assertEqual(dev.pool_id, 'pool_id') + self.assertEqual(dev.port_id, 'port_id') + + def test_get_pool_ids(self): + fake_device = { + 'vip': {'port_id': 'port_id'}, + 'pool': {'id': 'pool_id'} + } + self.cache.put(fake_device) + + self.assertEqual(self.cache.get_pool_ids(), ['pool_id']) + + +class TestManager(testtools.TestCase): + def setUp(self): + super(TestManager, self).setUp() + self.addCleanup(mock.patch.stopall) + + mock_conf = mock.Mock() + mock_conf.interface_driver = 'intdriver' + mock_conf.device_driver = 'devdriver' + mock_conf.AGENT.root_helper = 'sudo' + mock_conf.loadbalancer_state_path = '/the/path' + + self.mock_importer = mock.patch.object(manager, 'importutils').start() + + rpc_mock_cls = mock.patch( + 'quantum.plugins.services.agent_loadbalancer.agent.api' + '.LbaasAgentApi' + ).start() + + self.mgr = manager.LbaasAgentManager(mock_conf) + self.rpc_mock = rpc_mock_cls.return_value + self.log = mock.patch.object(manager, 'LOG').start() + self.mgr.needs_resync = False + + def test_initialize_service_hook(self): + with mock.patch.object(self.mgr, 'sync_state') as sync: + self.mgr.initialize_service_hook(mock.Mock()) + sync.assert_called_once_with() + + def test_periodic_resync_needs_sync(self): + with mock.patch.object(self.mgr, 'sync_state') as sync: + self.mgr.needs_resync = True + self.mgr.periodic_resync(mock.Mock()) + sync.assert_called_once_with() + + def test_periodic_resync_no_sync(self): + with mock.patch.object(self.mgr, 'sync_state') as sync: + self.mgr.needs_resync = False + self.mgr.periodic_resync(mock.Mock()) + self.assertFalse(sync.called) + + def test_collect_stats(self): + with mock.patch.object(self.mgr, 'cache') as cache: + cache.get_pool_ids.return_value = ['1', '2'] + self.mgr.collect_stats(mock.Mock()) + self.rpc_mock.update_pool_stats.assert_has_calls([ + mock.call('1', mock.ANY), + mock.call('2', mock.ANY) + ]) + + def test_collect_stats_exception(self): + with mock.patch.object(self.mgr, 'cache') as cache: + cache.get_pool_ids.return_value = ['1', '2'] + with mock.patch.object(self.mgr, 'driver') as driver: + driver.get_stats.side_effect = Exception + + self.mgr.collect_stats(mock.Mock()) + + self.assertFalse(self.rpc_mock.called) + self.assertTrue(self.mgr.needs_resync) + self.assertTrue(self.log.exception.called) + + def test_vip_plug_callback(self): + self.mgr._vip_plug_callback('plug', {'id': 'id'}) + self.rpc_mock.plug_vip_port.assert_called_once_with('id') + + def test_vip_unplug_callback(self): + self.mgr._vip_plug_callback('unplug', {'id': 'id'}) + self.rpc_mock.unplug_vip_port.assert_called_once_with('id') + + def _sync_state_helper(self, cache, ready, refreshed, destroyed): + with contextlib.nested( + mock.patch.object(self.mgr, 'cache'), + mock.patch.object(self.mgr, 'refresh_device'), + mock.patch.object(self.mgr, 'destroy_device') + ) as (mock_cache, refresh, destroy): + + mock_cache.get_pool_ids.return_value = cache + self.rpc_mock.get_ready_devices.return_value = ready + + self.mgr.sync_state() + + self.assertEqual(len(refreshed), len(refresh.mock_calls)) + self.assertEqual(len(destroyed), len(destroy.mock_calls)) + + refresh.assert_has_calls([mock.call(i) for i in refreshed]) + destroy.assert_has_calls([mock.call(i) for i in destroyed]) + self.assertFalse(self.mgr.needs_resync) + + def test_sync_state_all_known(self): + self._sync_state_helper(['1', '2'], ['1', '2'], ['1', '2'], []) + + def test_sync_state_all_unknown(self): + self._sync_state_helper([], ['1', '2'], ['1', '2'], []) + + def test_sync_state_destroy_all(self): + self._sync_state_helper(['1', '2'], [], [], ['1', '2']) + + def test_sync_state_both(self): + self._sync_state_helper(['1'], ['2'], ['2'], ['1']) + + def test_sync_state_exception(self): + self.rpc_mock.get_ready_devices.side_effect = Exception + + self.mgr.sync_state() + + self.assertTrue(self.log.exception.called) + self.assertTrue(self.mgr.needs_resync) + + def test_refresh_device_exists(self): + config = self.rpc_mock.get_logical_device.return_value + + with mock.patch.object(self.mgr, 'driver') as driver: + with mock.patch.object(self.mgr, 'cache') as cache: + driver.exists.return_value = True + + self.mgr.refresh_device(config) + + driver.exists.assert_called_once_with(config) + driver.update.assert_called_once_with(config) + cache.put.assert_called_once_with(config) + self.assertFalse(self.mgr.needs_resync) + + def test_refresh_device_new(self): + config = self.rpc_mock.get_logical_device.return_value + + with mock.patch.object(self.mgr, 'driver') as driver: + with mock.patch.object(self.mgr, 'cache') as cache: + driver.exists.return_value = False + + self.mgr.refresh_device(config) + + driver.exists.assert_called_once_with(config) + driver.create.assert_called_once_with(config) + cache.put.assert_called_once_with(config) + self.assertFalse(self.mgr.needs_resync) + + def test_refresh_device_exception(self): + config = self.rpc_mock.get_logical_device.return_value + + with mock.patch.object(self.mgr, 'driver') as driver: + with mock.patch.object(self.mgr, 'cache') as cache: + driver.exists.side_effect = Exception + self.mgr.refresh_device(config) + + driver.exists.assert_called_once_with(config) + self.assertTrue(self.mgr.needs_resync) + self.assertTrue(self.log.exception.called) + self.assertFalse(cache.put.called) + + def test_destroy_device_known(self): + with mock.patch.object(self.mgr, 'driver') as driver: + with mock.patch.object(self.mgr, 'cache') as cache: + cache.get_by_pool_id.return_value = True + + self.mgr.destroy_device('pool_id') + cache.get_by_pool_id.assert_called_once_with('pool_id') + driver.destroy.assert_called_once_with('pool_id') + self.rpc_mock.pool_destroyed.assert_called_once_with( + 'pool_id' + ) + cache.remove.assert_called_once_with(True) + self.assertFalse(self.mgr.needs_resync) + + def test_destroy_device_unknown(self): + with mock.patch.object(self.mgr, 'driver') as driver: + with mock.patch.object(self.mgr, 'cache') as cache: + cache.get_by_pool_id.return_value = None + + self.mgr.destroy_device('pool_id') + cache.get_by_pool_id.assert_called_once_with('pool_id') + self.assertFalse(driver.destroy.called) + + def test_destroy_device_exception(self): + with mock.patch.object(self.mgr, 'driver') as driver: + with mock.patch.object(self.mgr, 'cache') as cache: + cache.get_by_pool_id.return_value = True + driver.destroy.side_effect = Exception + + self.mgr.destroy_device('pool_id') + cache.get_by_pool_id.assert_called_once_with('pool_id') + + self.assertTrue(self.log.exception.called) + self.assertTrue(self.mgr.needs_resync) + + def test_remove_orphans(self): + with mock.patch.object(self.mgr, 'driver') as driver: + with mock.patch.object(self.mgr, 'cache') as cache: + cache.get_pool_ids.return_value = ['1', '2'] + self.mgr.remove_orphans() + + driver.remove_orphans.assert_called_once_with(['1', '2']) + + def test_reload_pool(self): + with mock.patch.object(self.mgr, 'refresh_device') as refresh: + self.mgr.reload_pool(mock.Mock(), pool_id='pool_id') + refresh.assert_called_once_with('pool_id') + + def test_modify_pool_known(self): + with mock.patch.object(self.mgr, 'refresh_device') as refresh: + with mock.patch.object(self.mgr, 'cache') as cache: + cache.get_by_pool_id.return_value = True + + self.mgr.reload_pool(mock.Mock(), pool_id='pool_id') + + refresh.assert_called_once_with('pool_id') + + def test_modify_pool_unknown(self): + with mock.patch.object(self.mgr, 'refresh_device') as refresh: + with mock.patch.object(self.mgr, 'cache') as cache: + cache.get_by_pool_id.return_value = False + + self.mgr.modify_pool(mock.Mock(), pool_id='pool_id') + + self.assertFalse(refresh.called) + + def test_destroy_pool_known(self): + with mock.patch.object(self.mgr, 'destroy_device') as destroy: + with mock.patch.object(self.mgr, 'cache') as cache: + cache.get_by_pool_id.return_value = True + + self.mgr.destroy_pool(mock.Mock(), pool_id='pool_id') + + destroy.assert_called_once_with('pool_id') + + def test_destroy_pool_unknown(self): + with mock.patch.object(self.mgr, 'destroy_device') as destroy: + with mock.patch.object(self.mgr, 'cache') as cache: + cache.get_by_pool_id.return_value = False + + self.mgr.destroy_pool(mock.Mock(), pool_id='pool_id') + + self.assertFalse(destroy.called) diff --git a/quantum/tests/unit/services/agent_loadbalancer/driver/__init__.py b/quantum/tests/unit/services/agent_loadbalancer/driver/__init__.py new file mode 100644 index 0000000000..ce18bf6d68 --- /dev/null +++ b/quantum/tests/unit/services/agent_loadbalancer/driver/__init__.py @@ -0,0 +1,17 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost diff --git a/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/__init__.py b/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/__init__.py new file mode 100644 index 0000000000..ce18bf6d68 --- /dev/null +++ b/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/__init__.py @@ -0,0 +1,17 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost diff --git a/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_namespace_driver.py b/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_namespace_driver.py new file mode 100644 index 0000000000..16c7e3593d --- /dev/null +++ b/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_namespace_driver.py @@ -0,0 +1,131 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost + +import contextlib +import mock +import testtools + +from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import ( + namespace_driver +) + + +class TestHaproxyNSDriver(testtools.TestCase): + def setUp(self): + super(TestHaproxyNSDriver, self).setUp() + + self.vif_driver = mock.Mock() + self.vip_plug_callback = mock.Mock() + + self.driver = namespace_driver.HaproxyNSDriver( + 'sudo', + '/the/path', + self.vif_driver, + self.vip_plug_callback + ) + + self.fake_config = { + 'pool': {'id': 'pool_id'}, + 'vip': {'id': 'vip_id', 'port': {'id': 'port_id'}} + } + + def test_create(self): + with mock.patch.object(self.driver, '_plug') as plug: + with mock.patch.object(self.driver, '_spawn') as spawn: + self.driver.create(self.fake_config) + + plug.assert_called_once_with( + 'qlbaas-pool_id', {'id': 'port_id'} + ) + spawn.assert_called_once_with(self.fake_config) + + def test_update(self): + with contextlib.nested( + mock.patch.object(self.driver, '_get_state_file_path'), + mock.patch.object(self.driver, '_spawn'), + mock.patch('__builtin__.open') + ) as (gsp, spawn, mock_open): + mock_open.return_value = ['5'] + + self.driver.update(self.fake_config) + + mock_open.assert_called_once_with(gsp.return_value, 'r') + spawn.assert_called_once_with(self.fake_config, ['-sf', '5']) + + def test_spawn(self): + with contextlib.nested( + mock.patch.object(namespace_driver.hacfg, 'save_config'), + mock.patch.object(self.driver, '_get_state_file_path'), + mock.patch('quantum.agent.linux.ip_lib.IPWrapper') + ) as (mock_save, gsp, ip_wrap): + gsp.side_effect = lambda x, y: y + + self.driver._spawn(self.fake_config) + + mock_save.assert_called_once_with('conf', self.fake_config, 'sock') + cmd = ['haproxy', '-f', 'conf', '-p', 'pid'] + ip_wrap.assert_has_calls([ + mock.call('sudo', 'qlbaas-pool_id'), + mock.call().netns.execute(cmd) + ]) + + def test_destroy(self): + with contextlib.nested( + mock.patch.object(self.driver, '_get_state_file_path'), + mock.patch.object(namespace_driver, 'kill_pids_in_file'), + mock.patch.object(self.driver, '_unplug'), + mock.patch('quantum.agent.linux.ip_lib.IPWrapper'), + mock.patch('os.path.isdir'), + mock.patch('shutil.rmtree') + ) as (gsp, kill, unplug, ip_wrap, isdir, rmtree): + gsp.side_effect = lambda x, y: '/pool/' + y + + self.driver.pool_to_port_id['pool_id'] = 'port_id' + isdir.return_value = True + + self.driver.destroy('pool_id') + + kill.assert_called_once_with(ip_wrap(), '/pool/pid') + unplug.assert_called_once_with('qlbaas-pool_id', 'port_id') + isdir.called_once_with('/pool') + rmtree.assert_called_once_with('/pool') + ip_wrap.assert_has_calls([ + mock.call('sudo', 'qlbaas-pool_id'), + mock.call().garbage_collect_namespace() + ]) + + def test_exists(self): + with contextlib.nested( + mock.patch.object(self.driver, '_get_state_file_path'), + mock.patch('quantum.agent.linux.ip_lib.IPWrapper'), + mock.patch('socket.socket'), + mock.patch('os.path.exists'), + ) as (gsp, ip_wrap, socket, path_exists): + gsp.side_effect = lambda x, y: '/pool/' + y + + ip_wrap.return_value.netns.exists.return_value = True + path_exists.return_value = True + + self.driver.exists('pool_id') + + ip_wrap.assert_has_calls([ + mock.call('sudo'), + mock.call().netns.exists('qlbaas-pool_id') + ]) + + self.assertTrue(self.driver.exists('pool_id')) diff --git a/quantum/tests/unit/services/agent_loadbalancer/test_plugin.py b/quantum/tests/unit/services/agent_loadbalancer/test_plugin.py new file mode 100644 index 0000000000..a4289ef0cf --- /dev/null +++ b/quantum/tests/unit/services/agent_loadbalancer/test_plugin.py @@ -0,0 +1,263 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost + +import mock +import testtools + +from quantum import context +from quantum import manager +from quantum.plugins.common import constants +from quantum.plugins.services.agent_loadbalancer import plugin +from quantum.tests.unit.db.loadbalancer import test_db_loadbalancer + + +class TestLoadBalancerPluginBase( + test_db_loadbalancer.LoadBalancerPluginDbTestCase): + + def setUp(self): + super(TestLoadBalancerPluginBase, self).setUp() + + # create another API instance to make testing easier + # pass a mock to our API instance + + # we need access to loaded plugins to modify models + loaded_plugins = manager.QuantumManager().get_service_plugins() + self.plugin_instance = loaded_plugins[constants.LOADBALANCER] + self.callbacks = self.plugin_instance.callbacks + + +class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase): + def test_get_ready_devices(self): + with self.vip() as vip: + ready = self.callbacks.get_ready_devices( + context.get_admin_context(), + ) + self.assertEqual(ready, [vip['vip']['pool_id']]) + + def test_get_ready_devices_inactive_vip(self): + with self.vip() as vip: + + # set the vip inactive need to use plugin directly since + # status is not tenant mutable + self.plugin_instance.update_vip( + context.get_admin_context(), + vip['vip']['id'], + {'vip': {'status': constants.INACTIVE}} + ) + + ready = self.callbacks.get_ready_devices( + context.get_admin_context(), + ) + self.assertFalse(ready) + + def test_get_ready_devices_inactive_pool(self): + with self.vip() as vip: + + # set the pool inactive need to use plugin directly since + # status is not tenant mutable + self.plugin_instance.update_pool( + context.get_admin_context(), + vip['vip']['pool_id'], + {'pool': {'status': constants.INACTIVE}} + ) + + ready = self.callbacks.get_ready_devices( + context.get_admin_context(), + ) + self.assertFalse(ready) + + def test_get_logical_device_inactive(self): + with self.pool() as pool: + with self.vip(pool=pool) as vip: + with self.member(pool_id=vip['vip']['pool_id']) as member: + self.assertRaises( + Exception, + self.callbacks.get_logical_device, + context.get_admin_context(), + pool['pool']['id'], + activate=False + ) + + def test_get_logical_device_activate(self): + with self.pool() as pool: + with self.vip(pool=pool) as vip: + with self.member(pool_id=vip['vip']['pool_id']) as member: + ctx = context.get_admin_context() + + # build the expected + port = self.plugin_instance._core_plugin.get_port( + ctx, vip['vip']['port_id'] + ) + subnet = self.plugin_instance._core_plugin.get_subnet( + ctx, vip['vip']['subnet_id'] + ) + port['fixed_ips'][0]['subnet'] = subnet + + # reload pool to add members and vip + pool = self.plugin_instance.get_pool( + ctx, pool['pool']['id'] + ) + + pool['status'] = constants.ACTIVE + vip['vip']['status'] = constants.ACTIVE + vip['vip']['port'] = port + member['member']['status'] = constants.ACTIVE + + expected = { + 'pool': pool, + 'vip': vip['vip'], + 'members': [member['member']], + 'healthmonitors': [] + } + + logical_config = self.callbacks.get_logical_device( + ctx, pool['id'], activate=True + ) + + self.assertEqual(logical_config, expected) + + def _update_port_test_helper(self, expected, func, **kwargs): + core = self.plugin_instance._core_plugin + + with self.pool() as pool: + with self.vip(pool=pool) as vip: + with self.member(pool_id=vip['vip']['pool_id']) as member: + ctx = context.get_admin_context() + func(ctx, port_id=vip['vip']['port_id'], **kwargs) + + db_port = core.get_port(ctx, vip['vip']['port_id']) + + for k, v in expected.iteritems(): + self.assertEqual(db_port[k], v) + + def test_plug_vip_port(self): + exp = { + 'device_owner': 'quantum:' + constants.LOADBALANCER, + 'device_id': 'c596ce11-db30-5c72-8243-15acaae8690f', + 'admin_state_up': True + } + self._update_port_test_helper( + exp, + self.callbacks.plug_vip_port, + host='host' + ) + + def test_unplug_vip_port(self): + exp = { + 'device_owner': '', + 'device_id': '', + 'admin_state_up': False + } + self._update_port_test_helper( + exp, + self.callbacks.unplug_vip_port, + host='host' + ) + + +class TestLoadBalancerAgentApi(testtools.TestCase): + def setUp(self): + super(TestLoadBalancerAgentApi, self).setUp() + self.addCleanup(mock.patch.stopall) + + self.api = plugin.LoadBalancerAgentApi('topic', 'host') + self.mock_cast = mock.patch.object(self.api, 'cast').start() + self.mock_msg = mock.patch.object(self.api, 'make_msg').start() + + def test_init(self): + self.assertEqual(self.api.topic, 'topic') + self.assertEqual(self.api.host, 'host') + + def _call_test_helper(self, method_name): + rv = getattr(self.api, method_name)(mock.sentinel.context, 'the_id') + self.assertEqual(rv, self.mock_cast.return_value) + self.mock_cast.assert_called_once_with( + mock.sentinel.context, + self.mock_msg.return_value, + topic='topic' + ) + + self.mock_msg.assert_called_once_with( + method_name, + pool_id='the_id', + host='host' + ) + + def test_reload_pool(self): + self._call_test_helper('reload_pool') + + def test_destroy_pool(self): + self._call_test_helper('destroy_pool') + + def test_modify_pool(self): + self._call_test_helper('modify_pool') + + +class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): + def setUp(self): + self.log = mock.patch.object(plugin, 'LOG') + api_cls = mock.patch.object(plugin, 'LoadBalancerAgentApi').start() + super(TestLoadBalancerPluginNotificationWrapper, self).setUp() + self.mock_api = api_cls.return_value + + self.addCleanup(mock.patch.stopall) + + def test_create_vip(self): + with self.subnet() as subnet: + with self.pool(subnet=subnet) as pool: + with self.vip(pool=pool, subnet=subnet) as vip: + self.mock_api.reload_pool.assert_called_once_with( + mock.ANY, + vip['vip']['pool_id'] + ) + + def test_update_vip(self): + with self.subnet() as subnet: + with self.pool(subnet=subnet) as pool: + with self.vip(pool=pool, subnet=subnet) as vip: + self.mock_api.reset_mock() + ctx = context.get_admin_context() + vip['vip'].pop('status') + new_vip = self.plugin_instance.update_vip( + ctx, + vip['vip']['id'], + vip + ) + + self.mock_api.reload_pool.assert_called_once_with( + mock.ANY, + vip['vip']['pool_id'] + ) + + self.assertEqual( + new_vip['status'], + constants.PENDING_UPDATE + ) + + def t2est_delete_vip(self): + with self.subnet() as subnet: + with self.pool(subnet=subnet) as pool: + with self.vip(pool=pool, subnet=subnet, no_delete=True) as vip: + self.mock_api.reset_mock() + ctx = context.get_admin_context() + self.plugin_instance.delete_vip(context, vip['vip']['id']) + self.mock_api.destroy_pool.assert_called_once_with( + mock.ANY, + vip['vip']['pool_id'] + ) diff --git a/quantum/tests/unit/test_agent_linux_utils.py b/quantum/tests/unit/test_agent_linux_utils.py index 840d9dcb7c..b1f5c0e7fa 100644 --- a/quantum/tests/unit/test_agent_linux_utils.py +++ b/quantum/tests/unit/test_agent_linux_utils.py @@ -70,3 +70,21 @@ class AgentUtilsGetInterfaceMAC(testtools.TestCase): '\x00' * 232]) actual_val = utils.get_interface_mac('eth0') self.assertEqual(actual_val, expect_val) + + +class AgentUtilsReplaceFile(testtools.TestCase): + def test_replace_file(self): + # make file to replace + with mock.patch('tempfile.NamedTemporaryFile') as ntf: + ntf.return_value.name = '/baz' + with mock.patch('os.chmod') as chmod: + with mock.patch('os.rename') as rename: + utils.replace_file('/foo', 'bar') + + expected = [mock.call('w+', dir='/', delete=False), + mock.call().write('bar'), + mock.call().close()] + + ntf.assert_has_calls(expected) + chmod.assert_called_once_with('/baz', 0644) + rename.assert_called_once_with('/baz', '/foo') diff --git a/quantum/tests/unit/test_linux_dhcp.py b/quantum/tests/unit/test_linux_dhcp.py index c37139a93a..ee9557cff0 100644 --- a/quantum/tests/unit/test_linux_dhcp.py +++ b/quantum/tests/unit/test_linux_dhcp.py @@ -138,22 +138,6 @@ class TestDhcpBase(testtools.TestCase): def test_base_abc_error(self): self.assertRaises(TypeError, dhcp.DhcpBase, None) - def test_replace_file(self): - # make file to replace - with mock.patch('tempfile.NamedTemporaryFile') as ntf: - ntf.return_value.name = '/baz' - with mock.patch('os.chmod') as chmod: - with mock.patch('os.rename') as rename: - dhcp.replace_file('/foo', 'bar') - - expected = [mock.call('w+', dir='/', delete=False), - mock.call().write('bar'), - mock.call().close()] - - ntf.assert_has_calls(expected) - chmod.assert_called_once_with('/baz', 0644) - rename.assert_called_once_with('/baz', '/foo') - def test_restart(self): class SubClass(dhcp.DhcpBase): def __init__(self): @@ -212,7 +196,7 @@ class TestBase(testtools.TestCase): self.conf.set_override('state_path', '') self.conf.use_namespaces = True - self.replace_p = mock.patch('quantum.agent.linux.dhcp.replace_file') + self.replace_p = mock.patch('quantum.agent.linux.utils.replace_file') self.execute_p = mock.patch('quantum.agent.linux.utils.execute') self.addCleanup(self.execute_p.stop) self.safe = self.replace_p.start() @@ -392,7 +376,7 @@ class TestDhcpLocalProcess(TestBase): self.assertEqual(lp.interface_name, 'tap0') def test_set_interface_name(self): - with mock.patch('quantum.agent.linux.dhcp.replace_file') as replace: + with mock.patch('quantum.agent.linux.utils.replace_file') as replace: lp = LocalChild(self.conf, FakeDualNetwork()) with mock.patch.object(lp, 'get_conf_file_name') as conf_file: conf_file.return_value = '/interface' diff --git a/setup.py b/setup.py index e265ab2eca..ac701900a7 100644 --- a/setup.py +++ b/setup.py @@ -137,6 +137,8 @@ else: 'quantum-debug = quantum.debug.shell:main', 'quantum-ovs-cleanup = quantum.agent.ovs_cleanup_util:main', 'quantum-db-manage = quantum.db.migration.cli:main', + ('quantum-lbaas-agent = ' + 'quantum.plugins.services.agent_loadbalancer.agent:main'), ('quantum-check-nvp-config = ' 'quantum.plugins.nicira.nicira_nvp_plugin.check_nvp_config:main'), ]