From b836e71eb1d2136795817ead27e1d9fa0ce65f6e Mon Sep 17 00:00:00 2001 From: gongysh Date: Mon, 12 Nov 2012 20:28:16 +0800 Subject: [PATCH] l3 agent rpc On one hand, we sync router data (including routers, their gw ports, interfaces and floatingips) from l3_agent to quantum server periodically if needed. On the other hand, we notify l3 agent from quantum server when we delete or update a router's stuff, such as floating IP, interface and gwport and router itself. blueprint rpc-for-l3-agent bug #1080286 Change-Id: I60f3081975fc7164b22f9e9fa941e702a3f4c663 --- etc/l3_agent.ini | 17 +- openstack-common.conf | 2 +- quantum/agent/l3_agent.py | 299 ++++++++++-------- quantum/api/v2/base.py | 10 - quantum/common/config.py | 4 +- quantum/common/constants.py | 6 + quantum/common/exceptions.py | 4 + quantum/common/topics.py | 2 + quantum/common/utils.py | 5 + quantum/db/l3_db.py | 187 ++++++++++- quantum/db/l3_rpc_agent_api.py | 50 +++ quantum/db/l3_rpc_base.py | 56 ++++ quantum/manager.py | 45 ++- quantum/openstack/common/periodic_task.py | 111 +++++++ .../plugins/linuxbridge/lb_quantum_plugin.py | 4 +- .../plugins/openvswitch/ovs_quantum_plugin.py | 4 +- quantum/service.py | 142 +++++++++ quantum/tests/unit/test_db_plugin.py | 2 +- quantum/tests/unit/test_l3_agent.py | 142 ++++----- quantum/tests/unit/test_l3_plugin.py | 149 ++++++++- 20 files changed, 975 insertions(+), 266 deletions(-) create mode 100644 quantum/db/l3_rpc_agent_api.py create mode 100644 quantum/db/l3_rpc_base.py create mode 100644 quantum/openstack/common/periodic_task.py diff --git a/etc/l3_agent.ini b/etc/l3_agent.ini index 7be4853b42..84149cd6a6 100644 --- a/etc/l3_agent.ini +++ b/etc/l3_agent.ini @@ -10,13 +10,6 @@ interface_driver = quantum.agent.linux.interface.OVSInterfaceDriver # LinuxBridge #interface_driver = quantum.agent.linux.interface.BridgeInterfaceDriver -# The Quantum user information for accessing the Quantum API. -auth_url = http://localhost:35357/v2.0 -auth_region = RegionOne -admin_tenant_name = %SERVICE_TENANT_NAME% -admin_user = %SERVICE_USER% -admin_password = %SERVICE_PASSWORD% - # Use "sudo quantum-rootwrap /etc/quantum/rootwrap.conf" to use the real # root filter facility. # Change to "sudo" to skip the filtering and just run the comand directly @@ -54,9 +47,13 @@ root_helper = sudo # TCP Port used by Quantum metadata server # metadata_port = 9697 -# The time in seconds between state poll requests -# polling_interval = 3 - # Send this many gratuitous ARPs for HA setup. Set it below or equal to 0 # to disable this feature. # send_arp_for_ha = 3 + +# seconds between re-sync routers' data if needed +# periodic_interval = 40 + +# seconds to start to sync routers' data after +# starting agent +# periodic_fuzzy_delay = 5 diff --git a/openstack-common.conf b/openstack-common.conf index 7bb3a36562..00364b7bde 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -1,5 +1,5 @@ [DEFAULT] # The list of modules to copy from openstack-common -modules=cfg,context,eventlet_backdoor,exception,excutils,fileutils,gettextutils,importutils,iniparser,jsonutils,local,lockutils,log,loopingcall,network_utils,notifier,policy,rpc,service,setup,threadgroup,timeutils,uuidutils,version +modules=cfg,context,eventlet_backdoor,exception,excutils,fileutils,gettextutils,importutils,iniparser,jsonutils,local,lockutils,log,loopingcall,network_utils,notifier,periodic_task,policy,rpc,service,setup,threadgroup,timeutils,uuidutils,version # The base module to hold the copy of openstack.common base=quantum diff --git a/quantum/agent/l3_agent.py b/quantum/agent/l3_agent.py index 1eb32a8038..4a54d569c1 100644 --- a/quantum/agent/l3_agent.py +++ b/quantum/agent/l3_agent.py @@ -20,8 +20,9 @@ """ import sys -import time +import eventlet +from eventlet import semaphore import netaddr from quantum.agent.common import config @@ -30,11 +31,19 @@ from quantum.agent.linux import interface from quantum.agent.linux import ip_lib from quantum.agent.linux import iptables_manager from quantum.agent.linux import utils -from quantum.db import l3_db +from quantum.common import constants as l3_constants +from quantum.common import topics +from quantum import context +from quantum import manager from quantum.openstack.common import cfg from quantum.openstack.common import importutils from quantum.openstack.common import log as logging -from quantumclient.v2_0 import client +from quantum.openstack.common import periodic_task +from quantum.openstack.common.rpc import common as rpc_common +from quantum.openstack.common.rpc import proxy +from quantum.openstack.common import service +from quantum import service as quantum_service + LOG = logging.getLogger(__name__) NS_PREFIX = 'qrouter-' @@ -42,16 +51,53 @@ INTERNAL_DEV_PREFIX = 'qr-' EXTERNAL_DEV_PREFIX = 'qg-' +class L3PluginApi(proxy.RpcProxy): + """Agent side of the l3 agent RPC API. + + API version history: + 1.0 - Initial version. + + """ + + BASE_RPC_API_VERSION = '1.0' + + def __init__(self, topic, host): + super(L3PluginApi, self).__init__( + topic=topic, default_version=self.BASE_RPC_API_VERSION) + self.host = host + + def get_routers(self, context, fullsync=True, router_id=None): + """Make a remote process call to retrieve the sync data for routers.""" + router_ids = [router_id] if router_id else None + return self.call(context, + self.make_msg('sync_routers', host=self.host, + fullsync=fullsync, + router_ids=router_ids), + topic=self.topic) + + def get_external_network_id(self, context): + """Make a remote process call to retrieve the external network id. + + @raise common.RemoteError: with TooManyExternalNetworks + as exc_type if there are + more than one external network + """ + return self.call(context, + self.make_msg('get_external_network_id', + host=self.host), + topic=self.topic) + + class RouterInfo(object): - def __init__(self, router_id, root_helper, use_namespaces): + def __init__(self, router_id, root_helper, use_namespaces, router=None): self.router_id = router_id self.ex_gw_port = None self.internal_ports = [] self.floating_ips = [] self.root_helper = root_helper self.use_namespaces = use_namespaces - + self.router = router self.iptables_manager = iptables_manager.IptablesManager( root_helper=root_helper, #FIXME(danwent): use_ipv6=True, @@ -62,23 +108,14 @@ class RouterInfo(object): return NS_PREFIX + self.router_id -class L3NATAgent(object): +class L3NATAgent(manager.Manager): OPTS = [ - cfg.StrOpt('admin_user'), - cfg.StrOpt('admin_password'), - cfg.StrOpt('admin_tenant_name'), - cfg.StrOpt('auth_url'), - cfg.StrOpt('auth_strategy', default='keystone'), - cfg.StrOpt('auth_region'), cfg.StrOpt('root_helper', default='sudo'), cfg.StrOpt('external_network_bridge', default='br-ex', help="Name of bridge used for external network traffic."), cfg.StrOpt('interface_driver', help="The driver used to manage the virtual interface."), - cfg.IntOpt('polling_interval', - default=3, - help="The time in seconds between state poll requests."), cfg.IntOpt('metadata_port', default=9697, help="TCP Port used by Quantum metadata namespace proxy."), @@ -97,36 +134,33 @@ class L3NATAgent(object): cfg.StrOpt('gateway_external_network_id', default='', help="UUID of external network for routers implemented " "by the agents."), + cfg.StrOpt('l3_agent_manager', + default='quantum.agent.l3_agent.L3NATAgent'), ] - def __init__(self, conf): - self.conf = conf + def __init__(self, host, conf=None): + if conf: + self.conf = conf + else: + self.conf = cfg.CONF self.router_info = {} - if not conf.interface_driver: - LOG.error(_('You must specify an interface driver')) + if not self.conf.interface_driver: + LOG.error(_('An interface driver must be specified')) sys.exit(1) try: - self.driver = importutils.import_object(conf.interface_driver, - conf) + self.driver = importutils.import_object(self.conf.interface_driver, + self.conf) except: - LOG.exception(_("Error importing interface driver '%s'"), - conf.interface_driver) + LOG.exception(_("Error importing interface driver '%s'" + % self.conf.interface_driver)) sys.exit(1) - - self.polling_interval = conf.polling_interval - - self.qclient = client.Client( - username=self.conf.admin_user, - password=self.conf.admin_password, - tenant_name=self.conf.admin_tenant_name, - auth_url=self.conf.auth_url, - auth_strategy=self.conf.auth_strategy, - region_name=self.conf.auth_region - ) - + self.plugin_rpc = L3PluginApi(topics.PLUGIN, host) + self.fullsync = True + self.sync_sem = semaphore.Semaphore(1) if self.conf.use_namespaces: self._destroy_all_router_namespaces() + super(L3NATAgent, self).__init__(host=self.conf.host) def _destroy_all_router_namespaces(self): """Destroy all router namespaces on the host to eliminate @@ -138,7 +172,7 @@ class L3NATAgent(object): try: self._destroy_router_namespace(ns) except: - LOG.exception(_("Couldn't delete namespace '%s'"), ns) + LOG.exception(_("Failed deleting namespace '%s'") % ns) def _destroy_router_namespace(self, namespace): ns_ip = ip_lib.IPWrapper(self.conf.root_helper, @@ -160,81 +194,25 @@ class L3NATAgent(object): ip_wrapper = ip_wrapper_root.ensure_namespace(ri.ns_name()) ip_wrapper.netns.execute(['sysctl', '-w', 'net.ipv4.ip_forward=1']) - def daemon_loop(self): - #TODO(danwent): this simple diff logic does not handle if - # details of a router port (e.g., IP, mac) are changed behind - # our back. Will fix this properly with update notifications. - - while True: - try: - self.do_single_loop() - except: - LOG.exception(_("Error running l3_nat daemon_loop")) - - time.sleep(self.polling_interval) - def _fetch_external_net_id(self): """Find UUID of single external network for this agent""" if self.conf.gateway_external_network_id: return self.conf.gateway_external_network_id - - params = {'router:external': True} - ex_nets = self.qclient.list_networks(**params)['networks'] - if len(ex_nets) > 1: - raise Exception(_("Must configure 'gateway_external_network_id' " - "if Quantum has more than one external " - "network.")) - if len(ex_nets) == 0: - return None - return ex_nets[0]['id'] - - def do_single_loop(self): - - if (self.conf.external_network_bridge and - not ip_lib.device_exists(self.conf.external_network_bridge)): - LOG.error(_("External network bridge '%s' does not exist"), - self.conf.external_network_bridge) - return - - prev_router_ids = set(self.router_info) - cur_router_ids = set() - - target_ex_net_id = self._fetch_external_net_id() - - # identify and update new or modified routers - for r in self.qclient.list_routers()['routers']: - if not r['admin_state_up']: - continue - - ex_net_id = (r['external_gateway_info'] and - r['external_gateway_info'].get('network_id')) - if not ex_net_id and not self.conf.handle_internal_only_routers: - continue - - if ex_net_id and ex_net_id != target_ex_net_id: - continue - - # If namespaces are disabled, only process the router associated - # with the configured agent id. - if (self.conf.use_namespaces or - r['id'] == self.conf.router_id): - cur_router_ids.add(r['id']) + try: + return self.plugin_rpc.get_external_network_id( + context.get_admin_context()) + except rpc_common.RemoteError as e: + if e.exc_type == 'TooManyExternalNetworks': + msg = _( + "The 'gateway_external_network_id' must be configured" + " if Quantum has more than one external network.") + raise Exception(msg) else: - continue - if r['id'] not in self.router_info: - self._router_added(r['id']) + raise - ri = self.router_info[r['id']] - self.process_router(ri) - - # identify and remove routers that no longer exist - for router_id in prev_router_ids - cur_router_ids: - self._router_removed(router_id) - prev_router_ids = cur_router_ids - - def _router_added(self, router_id): + def _router_added(self, router_id, router=None): ri = RouterInfo(router_id, self.conf.root_helper, - self.conf.use_namespaces) + self.conf.use_namespaces, router) self.router_info[router_id] = ri if self.conf.use_namespaces: self._create_router_namespace(ri) @@ -283,20 +261,15 @@ class L3NATAgent(object): if not ips: raise Exception(_("Router port %s has no IP address") % port['id']) if len(ips) > 1: - LOG.error(_("Ignoring multiple IPs on router port %s"), port['id']) - port['subnet'] = self.qclient.show_subnet( - ips[0]['subnet_id'])['subnet'] + LOG.error(_("Ignoring multiple IPs on router port %s") % + port['id']) prefixlen = netaddr.IPNetwork(port['subnet']['cidr']).prefixlen port['ip_cidr'] = "%s/%s" % (ips[0]['ip_address'], prefixlen) def process_router(self, ri): ex_gw_port = self._get_ex_gw_port(ri) - - internal_ports = self.qclient.list_ports( - device_id=ri.router_id, - device_owner=l3_db.DEVICE_OWNER_ROUTER_INTF)['ports'] - + internal_ports = ri.router.get(l3_constants.INTERFACE_KEY, []) existing_port_ids = set([p['id'] for p in ri.internal_ports]) current_port_ids = set([p['id'] for p in internal_ports if p['admin_state_up']]) @@ -333,8 +306,7 @@ class L3NATAgent(object): ri.ex_gw_port = ex_gw_port def process_router_floating_ips(self, ri, ex_gw_port): - floating_ips = self.qclient.list_floatingips( - router_id=ri.router_id)['floatingips'] + floating_ips = ri.router.get(l3_constants.FLOATINGIP_KEY, []) existing_floating_ip_ids = set([fip['id'] for fip in ri.floating_ips]) cur_floating_ip_ids = set([fip['id'] for fip in floating_ips]) @@ -375,16 +347,7 @@ class L3NATAgent(object): ri.floating_ips.append(new_fip) def _get_ex_gw_port(self, ri): - ports = self.qclient.list_ports( - device_id=ri.router_id, - device_owner=l3_db.DEVICE_OWNER_ROUTER_GW)['ports'] - if not ports: - return None - elif len(ports) == 1: - return ports[0] - else: - LOG.error(_("Ignoring multiple gateway ports for router %s"), - ri.router_id) + return ri.router.get('gw_port') def _send_gratuitous_arp_packet(self, ri, interface_name, ip_address): if self.conf.send_arp_for_ha > 0: @@ -562,14 +525,94 @@ class L3NATAgent(object): ('float-snat', '-s %s -j SNAT --to %s' % (fixed_ip, floating_ip))] + def router_deleted(self, context, router_id): + """Deal with router deletion RPC message.""" + with self.sync_sem: + if router_id in self.router_info: + try: + self._router_removed(router_id) + except Exception: + msg = _("Failed dealing with router " + "'%s' deletion RPC message") + LOG.debug(msg, router_id) + self.fullsync = True + + def routers_updated(self, context, routers): + """Deal with routers modification and creation RPC message.""" + if not routers: + return + with self.sync_sem: + try: + self._process_routers(routers) + except Exception: + msg = _("Failed dealing with routers update RPC message") + LOG.debug(msg) + self.fullsync = True + + def _process_routers(self, routers): + if (self.conf.external_network_bridge and + not ip_lib.device_exists(self.conf.external_network_bridge)): + LOG.error(_("The external network bridge '%s' does not exist") + % self.conf.external_network_bridge) + return + + target_ex_net_id = self._fetch_external_net_id() + + for r in routers: + if not r['admin_state_up']: + continue + + # If namespaces are disabled, only process the router associated + # with the configured agent id. + if (not self.conf.use_namespaces and + r['id'] != self.conf.router_id): + continue + + ex_net_id = (r['external_gateway_info'] or {}).get('network_id') + if not ex_net_id and not self.conf.handle_internal_only_routers: + continue + + if ex_net_id and ex_net_id != target_ex_net_id: + continue + + if r['id'] not in self.router_info: + self._router_added(r['id']) + + ri = self.router_info[r['id']] + ri.router = r + self.process_router(ri) + + @periodic_task.periodic_task + def _sync_routers_task(self, context): + # we need to sync with router deletion RPC message + with self.sync_sem: + if self.fullsync: + try: + if not self.conf.use_namespaces: + router_id = self.conf.router_id + else: + router_id = None + routers = self.plugin_rpc.get_routers( + context, router_id) + self.router_info = {} + self._process_routers(routers) + self.fullsync = False + except Exception: + LOG.exception(_("Failed synchronizing routers")) + self.fullsync = True + + def after_start(self): + LOG.info(_("L3 agent started")) + def main(): - conf = config.setup_conf() + eventlet.monkey_patch() + conf = cfg.CONF conf.register_opts(L3NATAgent.OPTS) conf.register_opts(interface.OPTS) conf.register_opts(external_process.OPTS) conf(sys.argv) config.setup_logging(conf) - - mgr = L3NATAgent(conf) - mgr.daemon_loop() + server = quantum_service.Service.create(binary='quantum-l3-agent', + topic=topics.L3_AGENT) + service.launch(server).wait() diff --git a/quantum/api/v2/base.py b/quantum/api/v2/base.py index 1076e48afd..02b2e935b1 100644 --- a/quantum/api/v2/base.py +++ b/quantum/api/v2/base.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import socket - import netaddr import webob.exc @@ -54,14 +52,6 @@ FAULT_MAP = {exceptions.NotFound: webob.exc.HTTPNotFound, QUOTAS = quota.QUOTAS -def _get_hostname(): - return socket.gethostname() - - -# Register the configuration options -cfg.CONF.register_opt(cfg.StrOpt('host', default=_get_hostname())) - - def _fields(request): """ Extracts the list of fields to return diff --git a/quantum/common/config.py b/quantum/common/config.py index ee5c514e36..709471e665 100644 --- a/quantum/common/config.py +++ b/quantum/common/config.py @@ -25,6 +25,7 @@ import sys from paste import deploy from quantum.api.v2 import attributes +from quantum.common import utils from quantum.openstack.common import cfg from quantum.openstack.common import log as logging from quantum.version import version_info as quantum_version @@ -53,7 +54,8 @@ core_opts = [ cfg.BoolOpt('allow_overlapping_ips', default=False), cfg.StrOpt('control_exchange', default='quantum', - help='AMQP exchange to connect to if using RabbitMQ or Qpid') + help='AMQP exchange to connect to if using RabbitMQ or Qpid'), + cfg.StrOpt('host', default=utils.get_hostname()), ] diff --git a/quantum/common/constants.py b/quantum/common/constants.py index a803cdf589..2f4efb64db 100644 --- a/quantum/common/constants.py +++ b/quantum/common/constants.py @@ -22,3 +22,9 @@ PORT_STATUS_ACTIVE = 'ACTIVE' PORT_STATUS_BUILD = 'BUILD' PORT_STATUS_DOWN = 'DOWN' PORT_STATUS_ERROR = 'ERROR' + +DEVICE_OWNER_ROUTER_INTF = "network:router_interface" +DEVICE_OWNER_ROUTER_GW = "network:router_gateway" +DEVICE_OWNER_FLOATINGIP = "network:floatingip" +FLOATINGIP_KEY = '_floatingips' +INTERFACE_KEY = '_interfaces' diff --git a/quantum/common/exceptions.py b/quantum/common/exceptions.py index 365687bad3..4d1013c574 100644 --- a/quantum/common/exceptions.py +++ b/quantum/common/exceptions.py @@ -235,3 +235,7 @@ class InvalidSharedSetting(QuantumException): class InvalidExtenstionEnv(QuantumException): message = _("Invalid extension environment: %(reason)s") + + +class TooManyExternalNetworks(QuantumException): + message = _("More than one external network exists") diff --git a/quantum/common/topics.py b/quantum/common/topics.py index d46769b6de..15621ef59d 100644 --- a/quantum/common/topics.py +++ b/quantum/common/topics.py @@ -25,6 +25,8 @@ AGENT = 'q-agent-notifier' PLUGIN = 'q-plugin' DHCP = 'q-dhcp-notifer' +L3_AGENT = 'l3_agent' + def get_topic_name(prefix, table, operation): """Create a topic name. diff --git a/quantum/common/utils.py b/quantum/common/utils.py index ed8e6d772e..aeca0484e9 100644 --- a/quantum/common/utils.py +++ b/quantum/common/utils.py @@ -23,6 +23,7 @@ import os import signal +import socket from eventlet.green import subprocess @@ -145,3 +146,7 @@ def parse_mappings(mapping_list, unique_values=True): (value, mapping)) mappings[key] = value return mappings + + +def get_hostname(): + return socket.getfqdn() diff --git a/quantum/db/l3_db.py b/quantum/db/l3_db.py index 19ae37d107..78ac1cf5ee 100644 --- a/quantum/db/l3_db.py +++ b/quantum/db/l3_db.py @@ -25,12 +25,13 @@ from sqlalchemy.sql import expression as expr import webob.exc as w_exc from quantum.api.v2 import attributes +from quantum.common import constants as l3_constants from quantum.common import exceptions as q_exc from quantum.db import db_base_plugin_v2 +from quantum.db import l3_rpc_agent_api from quantum.db import model_base from quantum.db import models_v2 from quantum.extensions import l3 -from quantum.openstack.common import cfg from quantum.openstack.common import log as logging from quantum.openstack.common import uuidutils from quantum import policy @@ -38,17 +39,10 @@ from quantum import policy LOG = logging.getLogger(__name__) -l3_opts = [ - cfg.StrOpt('metadata_ip_address', default='127.0.0.1'), - cfg.IntOpt('metadata_port', default=8775) -] -# Register the configuration options -cfg.CONF.register_opts(l3_opts) - -DEVICE_OWNER_ROUTER_INTF = "network:router_interface" -DEVICE_OWNER_ROUTER_GW = "network:router_gateway" -DEVICE_OWNER_FLOATINGIP = "network:floatingip" +DEVICE_OWNER_ROUTER_INTF = l3_constants.DEVICE_OWNER_ROUTER_INTF +DEVICE_OWNER_ROUTER_GW = l3_constants.DEVICE_OWNER_ROUTER_GW +DEVICE_OWNER_FLOATINGIP = l3_constants.DEVICE_OWNER_FLOATINGIP class Router(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant): @@ -164,6 +158,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): # Ensure we actually have something to update if r.keys(): router_db.update(r) + routers = self.get_sync_data(context.elevated(), + [router_db['id']]) + l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers) return self._make_router_dict(router_db) def _update_router_gw_info(self, context, router_id, info): @@ -250,6 +247,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): self._delete_port(context.elevated(), ports[0]['id']) context.session.delete(router) + l3_rpc_agent_api.L3AgentNofity.router_deleted(context, id) def get_router(self, context, id, fields=None): router = self._get_router(context, id) @@ -324,9 +322,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): self._check_for_dup_router_subnet(context, router_id, port['network_id'], fixed_ips[0]['subnet_id']) - with context.session.begin(subtransactions=True): - port.update({'device_id': router_id, - 'device_owner': DEVICE_OWNER_ROUTER_INTF}) + port.update({'device_id': router_id, + 'device_owner': DEVICE_OWNER_ROUTER_INTF}) elif 'subnet_id' in interface_info: subnet_id = interface_info['subnet_id'] subnet = self._get_subnet(context, subnet_id) @@ -348,6 +345,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): 'device_id': router_id, 'device_owner': DEVICE_OWNER_ROUTER_INTF, 'name': ''}}) + + routers = self.get_sync_data(context.elevated(), [router_id]) + l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers) return {'port_id': port['id'], 'subnet_id': port['fixed_ips'][0]['subnet_id']} @@ -423,6 +423,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): raise w_exc.HTTPNotFound("Router %(router_id)s has no " "interface on subnet %(subnet_id)s" % locals()) + routers = self.get_sync_data(context.elevated(), [router_id]) + l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers) def _get_floatingip(self, context, id): try: @@ -621,7 +623,10 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): except Exception: LOG.exception(_("Floating IP association failed")) raise - + router_id = floatingip_db['router_id'] + if router_id: + routers = self.get_sync_data(context.elevated(), [router_id]) + l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers) return self._make_floatingip_dict(floatingip_db) def update_floatingip(self, context, id, floatingip): @@ -631,18 +636,32 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): fip['tenant_id'] = floatingip_db['tenant_id'] fip['id'] = id fip_port_id = floatingip_db['floating_port_id'] + before_router_id = floatingip_db['router_id'] self._update_fip_assoc(context, fip, floatingip_db, self.get_port(context.elevated(), fip_port_id)) + router_ids = [] + if before_router_id: + router_ids.append(before_router_id) + router_id = floatingip_db['router_id'] + if router_id and router_id != before_router_id: + router_ids.append(router_id) + if router_ids: + routers = self.get_sync_data(context.elevated(), router_ids) + l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers) return self._make_floatingip_dict(floatingip_db) def delete_floatingip(self, context, id): floatingip = self._get_floatingip(context, id) + router_id = floatingip['router_id'] with context.session.begin(subtransactions=True): context.session.delete(floatingip) self.delete_port(context.elevated(), floatingip['floating_port_id'], l3_port_check=False) + if router_id: + routers = self.get_sync_data(context.elevated(), [router_id]) + l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers) def get_floatingip(self, context, id, fields=None): floatingip = self._get_floatingip(context, id) @@ -677,6 +696,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): try: fip_qry = context.session.query(FloatingIP) floating_ip = fip_qry.filter_by(fixed_port_id=port_id).one() + router_id = floating_ip['router_id'] floating_ip.update({'fixed_port_id': None, 'fixed_ip_address': None, 'router_id': None}) @@ -686,6 +706,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): # should never happen raise Exception('Multiple floating IPs found for port %s' % port_id) + if router_id: + routers = self.get_sync_data(context.elevated(), [router_id]) + l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers) def _check_l3_view_auth(self, context, network): return policy.check(context, @@ -761,3 +784,137 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): return [n for n in nets if n['id'] in ext_nets] else: return [n for n in nets if n['id'] not in ext_nets] + + def _get_sync_routers(self, context, router_ids=None): + """Query routers and their gw ports for l3 agent. + + Query routers with the router_ids. The gateway ports, if any, + will be queried too. + l3 agent has an option to deal with only one router id. In addition, + when we need to notify the agent the data about only one router + (when modification of router, its interfaces, gw_port and floatingips), + we will have router_ids. + @param router_ids: the list of router ids which we want to query. + if it is None, all of routers will be queried. + @return: a list of dicted routers with dicted gw_port populated if any + """ + router_query = context.session.query(Router) + if router_ids: + router_query = router_query.filter(Router.id.in_(router_ids)) + routers = router_query.all() + gw_port_ids = [] + if not routers: + return [] + for router in routers: + gw_port_id = router.gw_port_id + if gw_port_id: + gw_port_ids.append(gw_port_id) + gw_ports = [] + if gw_port_ids: + gw_ports = self._get_sync_gw_ports(context, gw_port_ids) + gw_port_id_gw_port_dict = {} + for gw_port in gw_ports: + gw_port_id_gw_port_dict[gw_port['id']] = gw_port + router_id_gw_port_id_dict = {} + for router in routers: + router_id_gw_port_id_dict[router.id] = router.gw_port_id + routers_list = [self._make_router_dict(c, None) for c in routers] + for router in routers_list: + gw_port_id = router_id_gw_port_id_dict[router['id']] + if gw_port_id: + router['gw_port'] = gw_port_id_gw_port_dict[gw_port_id] + return routers_list + + def _get_sync_floating_ips(self, context, router_ids): + """Query floating_ips that relate to list of router_ids.""" + if not router_ids: + return [] + return self.get_floatingips(context, {'router_id': router_ids}) + + def _get_sync_gw_ports(self, context, gw_port_ids): + if not gw_port_ids: + return [] + filters = {'id': gw_port_ids} + gw_ports = self.get_ports(context, filters) + if gw_ports: + self._populate_subnet_for_ports(context, gw_ports) + return gw_ports + + def _get_sync_interfaces(self, context, router_ids): + """Query router interfaces that relate to list of router_ids.""" + if not router_ids: + return [] + filters = {'device_id': router_ids, + 'device_owner': [DEVICE_OWNER_ROUTER_INTF]} + interfaces = self.get_ports(context, filters) + if interfaces: + self._populate_subnet_for_ports(context, interfaces) + return interfaces + + def _populate_subnet_for_ports(self, context, ports): + """Populate ports with subnet. + + These ports already have fixed_ips populated. + """ + if not ports: + return + subnet_id_ports_dict = {} + for port in ports: + fixed_ips = port.get('fixed_ips', []) + if len(fixed_ips) > 1: + LOG.error(_("Ignoring multiple IPs on router port %s") % + port['id']) + ports.remove(port) + continue + # Empty fixed_ips should not happen + fixed_ip = fixed_ips[0] + my_ports = subnet_id_ports_dict.get(fixed_ip['subnet_id'], []) + my_ports.append(port) + subnet_id_ports_dict[fixed_ip['subnet_id']] = my_ports + filters = {'id': subnet_id_ports_dict.keys()} + fields = ['id', 'cidr', 'gateway_ip'] + subnet_dicts = self.get_subnets(context, filters, fields) + for subnet_dict in subnet_dicts: + ports = subnet_id_ports_dict.get(subnet_dict['id'], []) + for port in ports: + # TODO(gongysh) stash the subnet into fixed_ips + # to make the payload smaller. + port['subnet'] = {'id': subnet_dict['id'], + 'cidr': subnet_dict['cidr'], + 'gateway_ip': subnet_dict['gateway_ip']} + + def _process_sync_data(self, routers, interfaces, floating_ips): + routers_dict = {} + for router in routers: + routers_dict[router['id']] = router + for floating_ip in floating_ips: + router = routers_dict.get(floating_ip['router_id']) + if router: + router_floatingips = router.get(l3_constants.FLOATINGIP_KEY, + []) + router_floatingips.append(floating_ip) + router[l3_constants.FLOATINGIP_KEY] = router_floatingips + for interface in interfaces: + router = routers_dict.get(interface['device_id']) + if router: + router_interfaces = router.get(l3_constants.INTERFACE_KEY, []) + router_interfaces.append(interface) + router[l3_constants.INTERFACE_KEY] = router_interfaces + return routers_dict.values() + + def get_sync_data(self, context, router_ids=None): + """Query routers and their related floating_ips, interfaces.""" + with context.session.begin(subtransactions=True): + routers = self._get_sync_routers(context, + router_ids) + router_ids = [router['id'] for router in routers] + floating_ips = self._get_sync_floating_ips(context, router_ids) + interfaces = self._get_sync_interfaces(context, router_ids) + return self._process_sync_data(routers, interfaces, floating_ips) + + def get_external_network_id(self, context): + nets = self.get_networks(context, {'router:external': [True]}) + if len(nets) > 1: + raise q_exc.TooManyExternalNetworks() + else: + return nets[0]['id'] if nets else None diff --git a/quantum/db/l3_rpc_agent_api.py b/quantum/db/l3_rpc_agent_api.py new file mode 100644 index 0000000000..de032d38d6 --- /dev/null +++ b/quantum/db/l3_rpc_agent_api.py @@ -0,0 +1,50 @@ +# Copyright (c) 2012 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from quantum.common import topics +from quantum.openstack.common import jsonutils +from quantum.openstack.common import log as logging +from quantum.openstack.common.rpc import proxy + + +LOG = logging.getLogger(__name__) + + +class L3AgentNotifyAPI(proxy.RpcProxy): + """API for plugin to notify L3 agent.""" + BASE_RPC_API_VERSION = '1.0' + + def __init__(self, topic=topics.L3_AGENT): + super(L3AgentNotifyAPI, self).__init__( + topic=topic, default_version=self.BASE_RPC_API_VERSION) + + def router_deleted(self, context, router_id): + LOG.debug(_('Nofity agent the router %s is deleted'), router_id) + self.cast(context, + self.make_msg('router_deleted', + router_id=router_id), + topic=self.topic) + + def routers_updated(self, context, routers): + if routers: + LOG.debug(_('Nofity agent routers were updated:\n %s'), + jsonutils.dumps(routers, indent=5)) + self.cast(context, + self.make_msg('routers_updated', + routers=routers), + topic=self.topic) + + +L3AgentNofity = L3AgentNotifyAPI() diff --git a/quantum/db/l3_rpc_base.py b/quantum/db/l3_rpc_base.py new file mode 100644 index 0000000000..2a11b701e7 --- /dev/null +++ b/quantum/db/l3_rpc_base.py @@ -0,0 +1,56 @@ +# Copyright (c) 2012 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from quantum import context as quantum_context +from quantum import manager +from quantum.openstack.common import jsonutils +from quantum.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +class L3RpcCallbackMixin(object): + """A mix-in that enable L3 agent rpc support in plugin implementations.""" + + def sync_routers(self, context, **kwargs): + """Sync routers according to filters to a specific agent. + + @param context: contain user information + @param kwargs: host, or router_id + @return: a list of routers + with their interfaces and floating_ips + """ + router_id = kwargs.get('router_id') + # TODO(gongysh) we will use host in kwargs for multi host BP + context = quantum_context.get_admin_context() + plugin = manager.QuantumManager.get_plugin() + routers = plugin.get_sync_data(context, router_id) + LOG.debug(_("Routers returned to l3 agent:\n %s"), + jsonutils.dumps(routers, indent=5)) + return routers + + def get_external_network_id(self, context, **kwargs): + """Get one external network id for l3 agent. + + l3 agent expects only on external network when it performs + this query. + """ + context = quantum_context.get_admin_context() + plugin = manager.QuantumManager.get_plugin() + net_id = plugin.get_external_network_id(context) + LOG.debug(_("External network ID returned to l3 agent: %s"), + net_id) + return net_id diff --git a/quantum/manager.py b/quantum/manager.py index e1cd169287..00aca48068 100644 --- a/quantum/manager.py +++ b/quantum/manager.py @@ -16,25 +16,54 @@ # under the License. # @author: Somik Behera, Nicira Networks, Inc. -""" -Quantum's Manager class is responsible for parsing a config file and -instantiating the correct plugin that concretely implement quantum_plugin_base -class. -The caller should make sure that QuantumManager is a singleton. -""" - from quantum.common.exceptions import ClassNotFound from quantum.openstack.common import cfg from quantum.openstack.common import importutils from quantum.openstack.common import log as logging +from quantum.openstack.common import periodic_task from quantum.plugins.common import constants LOG = logging.getLogger(__name__) -class QuantumManager(object): +class Manager(periodic_task.PeriodicTasks): + # Set RPC API version to 1.0 by default. + RPC_API_VERSION = '1.0' + + def __init__(self, host=None): + if not host: + host = cfg.CONF.host + self.host = host + super(Manager, self).__init__() + + def periodic_tasks(self, context, raise_on_error=False): + self.run_periodic_tasks(context, raise_on_error=raise_on_error) + + def init_host(self): + """Handle initialization if this is a standalone service. + + Child classes should override this method. + + """ + pass + + def after_start(self): + """Handler post initialization stuff. + + Child classes can override this method. + """ + pass + + +class QuantumManager(object): + """ + Quantum's Manager class is responsible for parsing a config file and + instantiating the correct plugin that concretely implement + quantum_plugin_base class. + The caller should make sure that QuantumManager is a singleton. + """ _instance = None def __init__(self, options=None, config_file=None): diff --git a/quantum/openstack/common/periodic_task.py b/quantum/openstack/common/periodic_task.py new file mode 100644 index 0000000000..ba2f5119a9 --- /dev/null +++ b/quantum/openstack/common/periodic_task.py @@ -0,0 +1,111 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from quantum.openstack.common.gettextutils import _ +from quantum.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + + +def periodic_task(*args, **kwargs): + """Decorator to indicate that a method is a periodic task. + + This decorator can be used in two ways: + + 1. Without arguments '@periodic_task', this will be run on every tick + of the periodic scheduler. + + 2. With arguments, @periodic_task(ticks_between_runs=N), this will be + run on every N ticks of the periodic scheduler. + """ + def decorator(f): + f._periodic_task = True + f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0) + return f + + # NOTE(sirp): The `if` is necessary to allow the decorator to be used with + # and without parens. + # + # In the 'with-parens' case (with kwargs present), this function needs to + # return a decorator function since the interpreter will invoke it like: + # + # periodic_task(*args, **kwargs)(f) + # + # In the 'without-parens' case, the original function will be passed + # in as the first argument, like: + # + # periodic_task(f) + if kwargs: + return decorator + else: + return decorator(args[0]) + + +class _PeriodicTasksMeta(type): + def __init__(cls, names, bases, dict_): + """Metaclass that allows us to collect decorated periodic tasks.""" + super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_) + + # NOTE(sirp): if the attribute is not present then we must be the base + # class, so, go ahead and initialize it. If the attribute is present, + # then we're a subclass so make a copy of it so we don't step on our + # parent's toes. + try: + cls._periodic_tasks = cls._periodic_tasks[:] + except AttributeError: + cls._periodic_tasks = [] + + try: + cls._ticks_to_skip = cls._ticks_to_skip.copy() + except AttributeError: + cls._ticks_to_skip = {} + + # This uses __dict__ instead of + # inspect.getmembers(cls, inspect.ismethod) so only the methods of the + # current class are added when this class is scanned, and base classes + # are not added redundantly. + for value in cls.__dict__.values(): + if getattr(value, '_periodic_task', False): + task = value + name = task.__name__ + cls._periodic_tasks.append((name, task)) + cls._ticks_to_skip[name] = task._ticks_between_runs + + +class PeriodicTasks(object): + __metaclass__ = _PeriodicTasksMeta + + def run_periodic_tasks(self, context, raise_on_error=False): + """Tasks to be run at a periodic interval.""" + for task_name, task in self._periodic_tasks: + full_task_name = '.'.join([self.__class__.__name__, task_name]) + + ticks_to_skip = self._ticks_to_skip[task_name] + if ticks_to_skip > 0: + LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s" + " ticks left until next run"), locals()) + self._ticks_to_skip[task_name] -= 1 + continue + + self._ticks_to_skip[task_name] = task._ticks_between_runs + LOG.debug(_("Running periodic task %(full_task_name)s"), locals()) + + try: + task(self, context) + except Exception as e: + if raise_on_error: + raise + LOG.exception(_("Error during %(full_task_name)s: %(e)s"), + locals()) diff --git a/quantum/plugins/linuxbridge/lb_quantum_plugin.py b/quantum/plugins/linuxbridge/lb_quantum_plugin.py index 5086662b7d..711a059c2e 100644 --- a/quantum/plugins/linuxbridge/lb_quantum_plugin.py +++ b/quantum/plugins/linuxbridge/lb_quantum_plugin.py @@ -24,6 +24,7 @@ from quantum.db import api as db_api from quantum.db import db_base_plugin_v2 from quantum.db import dhcp_rpc_base from quantum.db import l3_db +from quantum.db import l3_rpc_base from quantum.extensions import providernet as provider from quantum.openstack.common import cfg from quantum.openstack.common import log as logging @@ -37,7 +38,8 @@ from quantum import policy LOG = logging.getLogger(__name__) -class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin): +class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, + l3_rpc_base.L3RpcCallbackMixin): # Set RPC API version to 1.0 by default. RPC_API_VERSION = '1.0' diff --git a/quantum/plugins/openvswitch/ovs_quantum_plugin.py b/quantum/plugins/openvswitch/ovs_quantum_plugin.py index f7416b139b..eea17377cb 100644 --- a/quantum/plugins/openvswitch/ovs_quantum_plugin.py +++ b/quantum/plugins/openvswitch/ovs_quantum_plugin.py @@ -30,6 +30,7 @@ from quantum.common import topics from quantum.db import db_base_plugin_v2 from quantum.db import dhcp_rpc_base from quantum.db import l3_db +from quantum.db import l3_rpc_base from quantum.extensions import providernet as provider from quantum.openstack.common import cfg from quantum.openstack.common import log as logging @@ -44,7 +45,8 @@ from quantum import policy LOG = logging.getLogger(__name__) -class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin): +class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, + l3_rpc_base.L3RpcCallbackMixin): # Set RPC API version to 1.0 by default. RPC_API_VERSION = '1.0' diff --git a/quantum/service.py b/quantum/service.py index 39c0850f89..3876bff64f 100644 --- a/quantum/service.py +++ b/quantum/service.py @@ -15,14 +15,39 @@ # License for the specific language governing permissions and limitations # under the License. +import inspect import logging as std_logging +import os +import random from quantum.common import config +from quantum import context from quantum.openstack.common import cfg +from quantum.openstack.common import importutils from quantum.openstack.common import log as logging +from quantum.openstack.common import loopingcall +from quantum.openstack.common.rpc import service from quantum import wsgi +LOG = logging.getLogger(__name__) + +service_opts = [ + cfg.IntOpt('report_interval', + default=10, + help='seconds between nodes reporting state to datastore'), + cfg.IntOpt('periodic_interval', + default=40, + help='seconds between running periodic tasks'), + cfg.IntOpt('periodic_fuzzy_delay', + default=5, + help='range of seconds to randomly delay when starting the' + ' periodic task scheduler to reduce stampeding.' + ' (Disable by setting to 0)'), +] +CONF = cfg.CONF +CONF.register_opts(service_opts) + LOG = logging.getLogger(__name__) @@ -91,3 +116,120 @@ def _run_wsgi(app_name): {'host': cfg.CONF.bind_host, 'port': cfg.CONF.bind_port}) return server + + +class Service(service.Service): + """Service object for binaries running on hosts. + + A service takes a manager and enables rpc by listening to queues based + on topic. It also periodically runs tasks on the manager.""" + + def __init__(self, host, binary, topic, manager, report_interval=None, + periodic_interval=None, periodic_fuzzy_delay=None, + *args, **kwargs): + + self.binary = binary + self.manager_class_name = manager + manager_class = importutils.import_class(self.manager_class_name) + self.manager = manager_class(host=host, *args, **kwargs) + self.report_interval = report_interval + self.periodic_interval = periodic_interval + self.periodic_fuzzy_delay = periodic_fuzzy_delay + self.saved_args, self.saved_kwargs = args, kwargs + self.timers = [] + super(Service, self).__init__(host, topic, manager=self.manager) + + def start(self): + self.manager.init_host() + super(Service, self).start() + if self.report_interval: + pulse = loopingcall.LoopingCall(self.report_state) + pulse.start(interval=self.report_interval, + initial_delay=self.report_interval) + self.timers.append(pulse) + + if self.periodic_interval: + if self.periodic_fuzzy_delay: + initial_delay = random.randint(0, self.periodic_fuzzy_delay) + else: + initial_delay = None + + periodic = loopingcall.LoopingCall(self.periodic_tasks) + periodic.start(interval=self.periodic_interval, + initial_delay=initial_delay) + self.timers.append(periodic) + self.manager.after_start() + + def __getattr__(self, key): + manager = self.__dict__.get('manager', None) + return getattr(manager, key) + + @classmethod + def create(cls, host=None, binary=None, topic=None, manager=None, + report_interval=None, periodic_interval=None, + periodic_fuzzy_delay=None): + """Instantiates class and passes back application object. + + :param host: defaults to CONF.host + :param binary: defaults to basename of executable + :param topic: defaults to bin_name - 'nova-' part + :param manager: defaults to CONF._manager + :param report_interval: defaults to CONF.report_interval + :param periodic_interval: defaults to CONF.periodic_interval + :param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay + + """ + if not host: + host = CONF.host + if not binary: + binary = os.path.basename(inspect.stack()[-1][1]) + if not topic: + topic = binary.rpartition('quantum-')[2] + topic = topic.replace("-", "_") + if not manager: + manager = CONF.get('%s_manager' % topic, None) + if report_interval is None: + report_interval = CONF.report_interval + if periodic_interval is None: + periodic_interval = CONF.periodic_interval + if periodic_fuzzy_delay is None: + periodic_fuzzy_delay = CONF.periodic_fuzzy_delay + service_obj = cls(host, binary, topic, manager, + report_interval=report_interval, + periodic_interval=periodic_interval, + periodic_fuzzy_delay=periodic_fuzzy_delay) + + return service_obj + + def kill(self): + """Destroy the service object.""" + self.stop() + + def stop(self): + super(Service, self).stop() + for x in self.timers: + try: + x.stop() + except Exception: + LOG.exception("exception occurs when timer stops") + pass + self.timers = [] + + def wait(self): + super(Service, self).wait() + for x in self.timers: + try: + x.wait() + except Exception: + LOG.exception("exception occurs when waiting for timer") + pass + + def periodic_tasks(self, raise_on_error=False): + """Tasks to be run at a periodic interval.""" + ctxt = context.get_admin_context() + self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error) + + def report_state(self): + """Update the state of this service.""" + # Todo(gongysh) report state to quantum server + pass diff --git a/quantum/tests/unit/test_db_plugin.py b/quantum/tests/unit/test_db_plugin.py index 339e7c07ba..049cb07207 100644 --- a/quantum/tests/unit/test_db_plugin.py +++ b/quantum/tests/unit/test_db_plugin.py @@ -445,7 +445,7 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): self._delete('subnets', subnet['subnet']['id']) @contextlib.contextmanager - def port(self, subnet=None, fixed_ips=None, fmt='json', no_delete=False, + def port(self, subnet=None, fmt='json', no_delete=False, **kwargs): if not subnet: with self.subnet() as subnet: diff --git a/quantum/tests/unit/test_l3_agent.py b/quantum/tests/unit/test_l3_agent.py index 44f16ba9d7..dc474c5bae 100644 --- a/quantum/tests/unit/test_l3_agent.py +++ b/quantum/tests/unit/test_l3_agent.py @@ -16,25 +16,27 @@ # under the License. import copy -import time -import unittest +import unittest2 import mock -from quantum.agent.common import config from quantum.agent import l3_agent from quantum.agent.linux import interface -from quantum.db import l3_db +from quantum.common import config as base_config +from quantum.common import constants as l3_constants +from quantum.openstack.common import cfg from quantum.openstack.common import uuidutils _uuid = uuidutils.generate_uuid +HOSTNAME = 'myhost' -class TestBasicRouterOperations(unittest.TestCase): +class TestBasicRouterOperations(unittest2.TestCase): def setUp(self): - self.conf = config.setup_conf() + self.conf = cfg.CommonConfigOpts() + self.conf.register_opts(base_config.core_opts) self.conf.register_opts(l3_agent.L3NATAgent.OPTS) self.conf.register_opts(interface.OPTS) self.conf.set_override('interface_driver', @@ -65,14 +67,15 @@ class TestBasicRouterOperations(unittest.TestCase): self.mock_ip = mock.MagicMock() ip_cls.return_value = self.mock_ip - self.client_cls_p = mock.patch('quantumclient.v2_0.client.Client') - client_cls = self.client_cls_p.start() - self.client_inst = mock.Mock() - client_cls.return_value = self.client_inst + self.l3pluginApi_cls_p = mock.patch( + 'quantum.agent.l3_agent.L3PluginApi') + l3pluginApi_cls = self.l3pluginApi_cls_p.start() + self.plugin_api = mock.Mock() + l3pluginApi_cls.return_value = self.plugin_api def tearDown(self): self.device_exists_p.stop() - self.client_cls_p.stop() + self.l3pluginApi_cls_p.stop() self.ip_cls_p.stop() self.dvr_cls_p.stop() self.utils_exec_p.stop() @@ -86,7 +89,7 @@ class TestBasicRouterOperations(unittest.TestCase): self.assertTrue(ri.ns_name().endswith(id)) def testAgentCreate(self): - agent = l3_agent.L3NATAgent(self.conf) + agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) def _test_internal_network_action(self, action): port_id = _uuid() @@ -94,7 +97,7 @@ class TestBasicRouterOperations(unittest.TestCase): network_id = _uuid() ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, self.conf.use_namespaces) - agent = l3_agent.L3NATAgent(self.conf) + agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) interface_name = agent.get_internal_device_name(port_id) cidr = '99.0.1.9/24' mac = 'ca:fe:de:ad:be:ef' @@ -123,7 +126,7 @@ class TestBasicRouterOperations(unittest.TestCase): router_id = _uuid() ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, self.conf.use_namespaces) - agent = l3_agent.L3NATAgent(self.conf) + agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) internal_cidrs = ['100.0.1.0/24', '200.74.0.0/16'] ex_gw_port = {'fixed_ips': [{'ip_address': '20.0.0.30', 'subnet_id': _uuid()}], @@ -167,7 +170,7 @@ class TestBasicRouterOperations(unittest.TestCase): router_id = _uuid() ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, self.conf.use_namespaces) - agent = l3_agent.L3NATAgent(self.conf) + agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) floating_ip = '20.0.0.100' fixed_ip = '10.0.0.23' ex_gw_port = {'fixed_ips': [{'ip_address': '20.0.0.30', @@ -206,101 +209,79 @@ class TestBasicRouterOperations(unittest.TestCase): def testProcessRouter(self): - agent = l3_agent.L3NATAgent(self.conf) + agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) router_id = _uuid() - ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, - self.conf.use_namespaces) - - # return data so that state is built up ex_gw_port = {'id': _uuid(), 'network_id': _uuid(), 'fixed_ips': [{'ip_address': '19.4.4.4', - 'subnet_id': _uuid()}]} + 'subnet_id': _uuid()}], + 'subnet': {'cidr': '19.4.4.0/24', + 'gateway_ip': '19.4.4.1'}} internal_port = {'id': _uuid(), 'network_id': _uuid(), 'admin_state_up': True, 'fixed_ips': [{'ip_address': '35.4.4.4', 'subnet_id': _uuid()}], - 'mac_address': 'ca:fe:de:ad:be:ef'} - - def fake_list_ports1(**kwargs): - if kwargs['device_owner'] == l3_db.DEVICE_OWNER_ROUTER_GW: - return {'ports': [ex_gw_port]} - elif kwargs['device_owner'] == l3_db.DEVICE_OWNER_ROUTER_INTF: - return {'ports': [internal_port]} - - fake_subnet = {'subnet': {'cidr': '19.4.4.0/24', - 'gateway_ip': '19.4.4.1'}} + 'mac_address': 'ca:fe:de:ad:be:ef', + 'subnet': {'cidr': '35.4.4.0/24', + 'gateway_ip': '35.4.4.1'}} fake_floatingips1 = {'floatingips': [ {'id': _uuid(), 'floating_ip_address': '8.8.8.8', 'fixed_ip_address': '7.7.7.7', 'port_id': _uuid()}]} - - self.client_inst.list_ports.side_effect = fake_list_ports1 - self.client_inst.show_subnet.return_value = fake_subnet - self.client_inst.list_floatingips.return_value = fake_floatingips1 + router = { + 'id': router_id, + l3_constants.FLOATINGIP_KEY: fake_floatingips1['floatingips'], + l3_constants.INTERFACE_KEY: [internal_port], + 'gw_port': ex_gw_port} + ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, + self.conf.use_namespaces, router=router) agent.process_router(ri) # remap floating IP to a new fixed ip fake_floatingips2 = copy.deepcopy(fake_floatingips1) fake_floatingips2['floatingips'][0]['fixed_ip_address'] = '7.7.7.8' - self.client_inst.list_floatingips.return_value = fake_floatingips2 + router[l3_constants.FLOATINGIP_KEY] = fake_floatingips2['floatingips'] agent.process_router(ri) # remove just the floating ips - self.client_inst.list_floatingips.return_value = {'floatingips': []} + del router[l3_constants.FLOATINGIP_KEY] agent.process_router(ri) - # now return no ports so state is torn down - self.client_inst.list_ports.return_value = {'ports': []} + # now no ports so state is torn down + del router[l3_constants.INTERFACE_KEY] + del router['gw_port'] agent.process_router(ri) + def testRoutersWithAdminStateDown(self): + agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) + self.plugin_api.get_external_network_id.return_value = None + + routers = [ + {'id': _uuid(), + 'admin_state_up': False, + 'external_gateway_info': {}}] + agent._process_routers(routers) + self.assertNotIn(routers[0]['id'], agent.router_info) + def testSingleLoopRouterRemoval(self): - agent = l3_agent.L3NATAgent(self.conf) - router_id = _uuid() - - self.client_inst.list_ports.return_value = {'ports': []} - - self.client_inst.list_networks.return_value = {'networks': []} - - self.client_inst.list_routers.return_value = {'routers': [ - {'id': router_id, + agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) + self.plugin_api.get_external_network_id.return_value = None + routers = [ + {'id': _uuid(), 'admin_state_up': True, - 'external_gateway_info': {}}]} - agent.do_single_loop() - - self.client_inst.list_routers.return_value = {'routers': []} - agent.do_single_loop() - self.external_process.assert_has_calls( - [mock.call(agent.conf, router_id, 'sudo', 'qrouter-' + router_id), - mock.call().enable(mock.ANY), - mock.call(agent.conf, router_id, 'sudo', 'qrouter-' + router_id), - mock.call().disable()]) + 'external_gateway_info': {}}] + agent._process_routers(routers) + agent.router_deleted(None, routers[0]['id']) # verify that remove is called self.assertEquals(self.mock_ip.get_devices.call_count, 1) self.device_exists.assert_has_calls( [mock.call(self.conf.external_network_bridge)]) - def testDaemonLoop(self): - - # just take a pass through the loop, then raise on time.sleep() - time_sleep_p = mock.patch('time.sleep') - time_sleep = time_sleep_p.start() - - class ExpectedException(Exception): - pass - - time_sleep.side_effect = ExpectedException() - - agent = l3_agent.L3NATAgent(self.conf) - self.assertRaises(ExpectedException, agent.daemon_loop) - - time_sleep_p.stop() - def testDestroyNamespace(self): class FakeDev(object): @@ -311,16 +292,5 @@ class TestBasicRouterOperations(unittest.TestCase): self.mock_ip.get_devices.return_value = [FakeDev('qr-aaaa'), FakeDev('qgw-aaaa')] - agent = l3_agent.L3NATAgent(self.conf) + agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) agent._destroy_all_router_namespaces() - - def testMain(self): - agent_mock_p = mock.patch('quantum.agent.l3_agent.L3NATAgent') - agent_mock = agent_mock_p.start() - agent_mock.daemon_loop.return_value = None - with mock.patch('quantum.agent.common.config.setup_logging'): - with mock.patch('quantum.agent.l3_agent.sys') as mock_sys: - mock_sys.argv = [] - l3_agent.main() - - agent_mock_p.stop() diff --git a/quantum/tests/unit/test_l3_plugin.py b/quantum/tests/unit/test_l3_plugin.py index 5f252c3cf6..556716cc2d 100644 --- a/quantum/tests/unit/test_l3_plugin.py +++ b/quantum/tests/unit/test_l3_plugin.py @@ -30,11 +30,13 @@ import webtest from quantum.api import extensions from quantum.api.v2 import attributes from quantum.common import config +from quantum.common import constants as l3_constants from quantum.common import exceptions as q_exc from quantum.common.test_lib import test_config from quantum import context from quantum.db import db_base_plugin_v2 from quantum.db import l3_db +from quantum.db import l3_rpc_agent_api from quantum.db import models_v2 from quantum.extensions import l3 from quantum import manager @@ -68,7 +70,6 @@ class L3NatExtensionTestCase(unittest.TestCase): def setUp(self): plugin = 'quantum.extensions.l3.RouterPluginBase' - # Ensure 'stale' patched copies of the plugin are never returned manager.QuantumManager._instance = None @@ -89,7 +90,6 @@ class L3NatExtensionTestCase(unittest.TestCase): self._plugin_patcher = mock.patch(plugin, autospec=True) self.plugin = self._plugin_patcher.start() - # Instantiate mock plugin and enable the 'router' extension manager.QuantumManager.get_plugin().supported_extension_aliases = ( ["router"]) @@ -118,9 +118,7 @@ class L3NatExtensionTestCase(unittest.TestCase): instance = self.plugin.return_value instance.create_router.return_value = return_value instance.get_routers_count.return_value = 0 - res = self.api.post_json(_get_path('routers'), data) - instance.create_router.assert_called_with(mock.ANY, router=data) self.assertEqual(res.status_int, exc.HTTPCreated.code) @@ -1194,3 +1192,146 @@ class L3NatDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase): with self.network(router__external=True) as ext_net: self.assertEqual(ext_net['network'][l3.EXTERNAL], True) + + def _test_notify_op_agent(self, target_func, *args): + l3_rpc_agent_api_str = ( + 'quantum.db.l3_rpc_agent_api.L3AgentNotifyAPI') + oldNotify = l3_rpc_agent_api.L3AgentNofity + try: + with mock.patch(l3_rpc_agent_api_str) as notifyApi: + l3_rpc_agent_api.L3AgentNofity = notifyApi + kargs = [item for item in args] + kargs.append(notifyApi) + target_func(*kargs) + except: + l3_rpc_agent_api.L3AgentNofity = oldNotify + raise + else: + l3_rpc_agent_api.L3AgentNofity = oldNotify + + def _test_router_gateway_op_agent(self, notifyApi): + with self.router() as r: + with self.subnet() as s: + self._set_net_external(s['subnet']['network_id']) + self._add_external_gateway_to_router( + r['router']['id'], + s['subnet']['network_id']) + self._remove_external_gateway_from_router( + r['router']['id'], + s['subnet']['network_id']) + self.assertEquals( + 2, notifyApi.routers_updated.call_count) + + def test_router_gateway_op_agent(self): + self._test_notify_op_agent(self._test_router_gateway_op_agent) + + def _test_interfaces_op_agent(self, r, notifyApi): + with self.port(no_delete=True) as p: + self._router_interface_action('add', + r['router']['id'], + None, + p['port']['id']) + # clean-up + self._router_interface_action('remove', + r['router']['id'], + None, + p['port']['id']) + self.assertEquals(2, notifyApi.routers_updated.call_count) + + def test_interfaces_op_agent(self): + with self.router() as r: + self._test_notify_op_agent( + self._test_interfaces_op_agent, r) + + def _test_floatingips_op_agent(self, notifyApi): + with self.floatingip_with_assoc() as fip: + pass + # add gateway, add interface, associate, deletion of floatingip, + # delete gateway, delete interface + self.assertEquals(6, notifyApi.routers_updated.call_count) + + def test_floatingips_op_agent(self): + self._test_notify_op_agent(self._test_floatingips_op_agent) + + def test_l3_agent_routers_query_interfaces(self): + with self.router() as r: + with self.port(no_delete=True) as p: + self._router_interface_action('add', + r['router']['id'], + None, + p['port']['id']) + + plugin = TestL3NatPlugin() + routers = plugin.get_sync_data(context.get_admin_context(), + None) + self.assertEqual(1, len(routers)) + interfaces = routers[0][l3_constants.INTERFACE_KEY] + self.assertEqual(1, len(interfaces)) + subnet_id = interfaces[0]['subnet']['id'] + wanted_subnetid = p['port']['fixed_ips'][0]['subnet_id'] + self.assertEqual(wanted_subnetid, subnet_id) + # clean-up + self._router_interface_action('remove', + r['router']['id'], + None, + p['port']['id']) + + def test_l3_agent_routers_query_ignore_interfaces_with_moreThanOneIp(self): + with self.router() as r: + with self.subnet(cidr='9.0.1.0/24') as subnet: + with self.port(subnet=subnet, + no_delete=True, + fixed_ips=[{'ip_address': '9.0.1.3'}]) as p: + self._router_interface_action('add', + r['router']['id'], + None, + p['port']['id']) + port = {'port': {'fixed_ips': + [{'ip_address': '9.0.1.4', + 'subnet_id': subnet['subnet']['id']}, + {'ip_address': '9.0.1.5', + 'subnet_id': subnet['subnet']['id']}]}} + plugin = TestL3NatPlugin() + ctx = context.get_admin_context() + plugin.update_port(ctx, p['port']['id'], port) + routers = plugin.get_sync_data(ctx, None) + self.assertEqual(1, len(routers)) + interfaces = routers[0].get(l3_constants.INTERFACE_KEY, []) + self.assertEqual(0, len(interfaces)) + # clean-up + self._router_interface_action('remove', + r['router']['id'], + None, + p['port']['id']) + + def test_l3_agent_routers_query_gateway(self): + with self.router() as r: + with self.subnet() as s: + self._set_net_external(s['subnet']['network_id']) + self._add_external_gateway_to_router( + r['router']['id'], + s['subnet']['network_id']) + plugin = TestL3NatPlugin() + routers = plugin.get_sync_data(context.get_admin_context(), + [r['router']['id']]) + self.assertEqual(1, len(routers)) + gw_port = routers[0]['gw_port'] + self.assertEquals(s['subnet']['id'], gw_port['subnet']['id']) + self._remove_external_gateway_from_router( + r['router']['id'], + s['subnet']['network_id']) + + def test_l3_agent_routers_query_floatingips(self): + with self.floatingip_with_assoc() as fip: + plugin = TestL3NatPlugin() + routers = plugin.get_sync_data(context.get_admin_context(), + [fip['floatingip']['router_id']]) + self.assertEqual(1, len(routers)) + floatingips = routers[0][l3_constants.FLOATINGIP_KEY] + self.assertEqual(1, len(floatingips)) + self.assertEquals(floatingips[0]['id'], + fip['floatingip']['id']) + self.assertEquals(floatingips[0]['port_id'], + fip['floatingip']['port_id']) + self.assertTrue(floatingips[0]['fixed_ip_address'] is not None) + self.assertTrue(floatingips[0]['router_id'] is not None)