l3 agent rpc

On one hand, we sync router data (including routers,
their gw ports, interfaces and floatingips) from l3_agent
to quantum server periodically if needed.

On the other hand, we notify l3 agent from quantum server when
we delete or update a router's stuff, such as floating IP,
interface and gwport and router itself.

blueprint rpc-for-l3-agent
bug #1080286

Change-Id: I60f3081975fc7164b22f9e9fa941e702a3f4c663
This commit is contained in:
gongysh 2012-11-12 20:28:16 +08:00
parent 4ec139ef12
commit b836e71eb1
20 changed files with 975 additions and 266 deletions

View File

@ -10,13 +10,6 @@ interface_driver = quantum.agent.linux.interface.OVSInterfaceDriver
# LinuxBridge # LinuxBridge
#interface_driver = quantum.agent.linux.interface.BridgeInterfaceDriver #interface_driver = quantum.agent.linux.interface.BridgeInterfaceDriver
# The Quantum user information for accessing the Quantum API.
auth_url = http://localhost:35357/v2.0
auth_region = RegionOne
admin_tenant_name = %SERVICE_TENANT_NAME%
admin_user = %SERVICE_USER%
admin_password = %SERVICE_PASSWORD%
# Use "sudo quantum-rootwrap /etc/quantum/rootwrap.conf" to use the real # Use "sudo quantum-rootwrap /etc/quantum/rootwrap.conf" to use the real
# root filter facility. # root filter facility.
# Change to "sudo" to skip the filtering and just run the comand directly # Change to "sudo" to skip the filtering and just run the comand directly
@ -54,9 +47,13 @@ root_helper = sudo
# TCP Port used by Quantum metadata server # TCP Port used by Quantum metadata server
# metadata_port = 9697 # metadata_port = 9697
# The time in seconds between state poll requests
# polling_interval = 3
# Send this many gratuitous ARPs for HA setup. Set it below or equal to 0 # Send this many gratuitous ARPs for HA setup. Set it below or equal to 0
# to disable this feature. # to disable this feature.
# send_arp_for_ha = 3 # send_arp_for_ha = 3
# seconds between re-sync routers' data if needed
# periodic_interval = 40
# seconds to start to sync routers' data after
# starting agent
# periodic_fuzzy_delay = 5

View File

@ -1,5 +1,5 @@
[DEFAULT] [DEFAULT]
# The list of modules to copy from openstack-common # The list of modules to copy from openstack-common
modules=cfg,context,eventlet_backdoor,exception,excutils,fileutils,gettextutils,importutils,iniparser,jsonutils,local,lockutils,log,loopingcall,network_utils,notifier,policy,rpc,service,setup,threadgroup,timeutils,uuidutils,version modules=cfg,context,eventlet_backdoor,exception,excutils,fileutils,gettextutils,importutils,iniparser,jsonutils,local,lockutils,log,loopingcall,network_utils,notifier,periodic_task,policy,rpc,service,setup,threadgroup,timeutils,uuidutils,version
# The base module to hold the copy of openstack.common # The base module to hold the copy of openstack.common
base=quantum base=quantum

View File

@ -20,8 +20,9 @@
""" """
import sys import sys
import time
import eventlet
from eventlet import semaphore
import netaddr import netaddr
from quantum.agent.common import config from quantum.agent.common import config
@ -30,11 +31,19 @@ from quantum.agent.linux import interface
from quantum.agent.linux import ip_lib from quantum.agent.linux import ip_lib
from quantum.agent.linux import iptables_manager from quantum.agent.linux import iptables_manager
from quantum.agent.linux import utils from quantum.agent.linux import utils
from quantum.db import l3_db from quantum.common import constants as l3_constants
from quantum.common import topics
from quantum import context
from quantum import manager
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
from quantum.openstack.common import importutils from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
from quantumclient.v2_0 import client from quantum.openstack.common import periodic_task
from quantum.openstack.common.rpc import common as rpc_common
from quantum.openstack.common.rpc import proxy
from quantum.openstack.common import service
from quantum import service as quantum_service
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
NS_PREFIX = 'qrouter-' NS_PREFIX = 'qrouter-'
@ -42,16 +51,53 @@ INTERNAL_DEV_PREFIX = 'qr-'
EXTERNAL_DEV_PREFIX = 'qg-' EXTERNAL_DEV_PREFIX = 'qg-'
class L3PluginApi(proxy.RpcProxy):
"""Agent side of the l3 agent RPC API.
API version history:
1.0 - Initial version.
"""
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic, host):
super(L3PluginApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.host = host
def get_routers(self, context, fullsync=True, router_id=None):
"""Make a remote process call to retrieve the sync data for routers."""
router_ids = [router_id] if router_id else None
return self.call(context,
self.make_msg('sync_routers', host=self.host,
fullsync=fullsync,
router_ids=router_ids),
topic=self.topic)
def get_external_network_id(self, context):
"""Make a remote process call to retrieve the external network id.
@raise common.RemoteError: with TooManyExternalNetworks
as exc_type if there are
more than one external network
"""
return self.call(context,
self.make_msg('get_external_network_id',
host=self.host),
topic=self.topic)
class RouterInfo(object): class RouterInfo(object):
def __init__(self, router_id, root_helper, use_namespaces): def __init__(self, router_id, root_helper, use_namespaces, router=None):
self.router_id = router_id self.router_id = router_id
self.ex_gw_port = None self.ex_gw_port = None
self.internal_ports = [] self.internal_ports = []
self.floating_ips = [] self.floating_ips = []
self.root_helper = root_helper self.root_helper = root_helper
self.use_namespaces = use_namespaces self.use_namespaces = use_namespaces
self.router = router
self.iptables_manager = iptables_manager.IptablesManager( self.iptables_manager = iptables_manager.IptablesManager(
root_helper=root_helper, root_helper=root_helper,
#FIXME(danwent): use_ipv6=True, #FIXME(danwent): use_ipv6=True,
@ -62,23 +108,14 @@ class RouterInfo(object):
return NS_PREFIX + self.router_id return NS_PREFIX + self.router_id
class L3NATAgent(object): class L3NATAgent(manager.Manager):
OPTS = [ OPTS = [
cfg.StrOpt('admin_user'),
cfg.StrOpt('admin_password'),
cfg.StrOpt('admin_tenant_name'),
cfg.StrOpt('auth_url'),
cfg.StrOpt('auth_strategy', default='keystone'),
cfg.StrOpt('auth_region'),
cfg.StrOpt('root_helper', default='sudo'), cfg.StrOpt('root_helper', default='sudo'),
cfg.StrOpt('external_network_bridge', default='br-ex', cfg.StrOpt('external_network_bridge', default='br-ex',
help="Name of bridge used for external network traffic."), help="Name of bridge used for external network traffic."),
cfg.StrOpt('interface_driver', cfg.StrOpt('interface_driver',
help="The driver used to manage the virtual interface."), help="The driver used to manage the virtual interface."),
cfg.IntOpt('polling_interval',
default=3,
help="The time in seconds between state poll requests."),
cfg.IntOpt('metadata_port', cfg.IntOpt('metadata_port',
default=9697, default=9697,
help="TCP Port used by Quantum metadata namespace proxy."), help="TCP Port used by Quantum metadata namespace proxy."),
@ -97,36 +134,33 @@ class L3NATAgent(object):
cfg.StrOpt('gateway_external_network_id', default='', cfg.StrOpt('gateway_external_network_id', default='',
help="UUID of external network for routers implemented " help="UUID of external network for routers implemented "
"by the agents."), "by the agents."),
cfg.StrOpt('l3_agent_manager',
default='quantum.agent.l3_agent.L3NATAgent'),
] ]
def __init__(self, conf): def __init__(self, host, conf=None):
self.conf = conf if conf:
self.conf = conf
else:
self.conf = cfg.CONF
self.router_info = {} self.router_info = {}
if not conf.interface_driver: if not self.conf.interface_driver:
LOG.error(_('You must specify an interface driver')) LOG.error(_('An interface driver must be specified'))
sys.exit(1) sys.exit(1)
try: try:
self.driver = importutils.import_object(conf.interface_driver, self.driver = importutils.import_object(self.conf.interface_driver,
conf) self.conf)
except: except:
LOG.exception(_("Error importing interface driver '%s'"), LOG.exception(_("Error importing interface driver '%s'"
conf.interface_driver) % self.conf.interface_driver))
sys.exit(1) sys.exit(1)
self.plugin_rpc = L3PluginApi(topics.PLUGIN, host)
self.polling_interval = conf.polling_interval self.fullsync = True
self.sync_sem = semaphore.Semaphore(1)
self.qclient = client.Client(
username=self.conf.admin_user,
password=self.conf.admin_password,
tenant_name=self.conf.admin_tenant_name,
auth_url=self.conf.auth_url,
auth_strategy=self.conf.auth_strategy,
region_name=self.conf.auth_region
)
if self.conf.use_namespaces: if self.conf.use_namespaces:
self._destroy_all_router_namespaces() self._destroy_all_router_namespaces()
super(L3NATAgent, self).__init__(host=self.conf.host)
def _destroy_all_router_namespaces(self): def _destroy_all_router_namespaces(self):
"""Destroy all router namespaces on the host to eliminate """Destroy all router namespaces on the host to eliminate
@ -138,7 +172,7 @@ class L3NATAgent(object):
try: try:
self._destroy_router_namespace(ns) self._destroy_router_namespace(ns)
except: except:
LOG.exception(_("Couldn't delete namespace '%s'"), ns) LOG.exception(_("Failed deleting namespace '%s'") % ns)
def _destroy_router_namespace(self, namespace): def _destroy_router_namespace(self, namespace):
ns_ip = ip_lib.IPWrapper(self.conf.root_helper, ns_ip = ip_lib.IPWrapper(self.conf.root_helper,
@ -160,81 +194,25 @@ class L3NATAgent(object):
ip_wrapper = ip_wrapper_root.ensure_namespace(ri.ns_name()) ip_wrapper = ip_wrapper_root.ensure_namespace(ri.ns_name())
ip_wrapper.netns.execute(['sysctl', '-w', 'net.ipv4.ip_forward=1']) ip_wrapper.netns.execute(['sysctl', '-w', 'net.ipv4.ip_forward=1'])
def daemon_loop(self):
#TODO(danwent): this simple diff logic does not handle if
# details of a router port (e.g., IP, mac) are changed behind
# our back. Will fix this properly with update notifications.
while True:
try:
self.do_single_loop()
except:
LOG.exception(_("Error running l3_nat daemon_loop"))
time.sleep(self.polling_interval)
def _fetch_external_net_id(self): def _fetch_external_net_id(self):
"""Find UUID of single external network for this agent""" """Find UUID of single external network for this agent"""
if self.conf.gateway_external_network_id: if self.conf.gateway_external_network_id:
return self.conf.gateway_external_network_id return self.conf.gateway_external_network_id
try:
params = {'router:external': True} return self.plugin_rpc.get_external_network_id(
ex_nets = self.qclient.list_networks(**params)['networks'] context.get_admin_context())
if len(ex_nets) > 1: except rpc_common.RemoteError as e:
raise Exception(_("Must configure 'gateway_external_network_id' " if e.exc_type == 'TooManyExternalNetworks':
"if Quantum has more than one external " msg = _(
"network.")) "The 'gateway_external_network_id' must be configured"
if len(ex_nets) == 0: " if Quantum has more than one external network.")
return None raise Exception(msg)
return ex_nets[0]['id']
def do_single_loop(self):
if (self.conf.external_network_bridge and
not ip_lib.device_exists(self.conf.external_network_bridge)):
LOG.error(_("External network bridge '%s' does not exist"),
self.conf.external_network_bridge)
return
prev_router_ids = set(self.router_info)
cur_router_ids = set()
target_ex_net_id = self._fetch_external_net_id()
# identify and update new or modified routers
for r in self.qclient.list_routers()['routers']:
if not r['admin_state_up']:
continue
ex_net_id = (r['external_gateway_info'] and
r['external_gateway_info'].get('network_id'))
if not ex_net_id and not self.conf.handle_internal_only_routers:
continue
if ex_net_id and ex_net_id != target_ex_net_id:
continue
# If namespaces are disabled, only process the router associated
# with the configured agent id.
if (self.conf.use_namespaces or
r['id'] == self.conf.router_id):
cur_router_ids.add(r['id'])
else: else:
continue raise
if r['id'] not in self.router_info:
self._router_added(r['id'])
ri = self.router_info[r['id']] def _router_added(self, router_id, router=None):
self.process_router(ri)
# identify and remove routers that no longer exist
for router_id in prev_router_ids - cur_router_ids:
self._router_removed(router_id)
prev_router_ids = cur_router_ids
def _router_added(self, router_id):
ri = RouterInfo(router_id, self.conf.root_helper, ri = RouterInfo(router_id, self.conf.root_helper,
self.conf.use_namespaces) self.conf.use_namespaces, router)
self.router_info[router_id] = ri self.router_info[router_id] = ri
if self.conf.use_namespaces: if self.conf.use_namespaces:
self._create_router_namespace(ri) self._create_router_namespace(ri)
@ -283,20 +261,15 @@ class L3NATAgent(object):
if not ips: if not ips:
raise Exception(_("Router port %s has no IP address") % port['id']) raise Exception(_("Router port %s has no IP address") % port['id'])
if len(ips) > 1: if len(ips) > 1:
LOG.error(_("Ignoring multiple IPs on router port %s"), port['id']) LOG.error(_("Ignoring multiple IPs on router port %s") %
port['subnet'] = self.qclient.show_subnet( port['id'])
ips[0]['subnet_id'])['subnet']
prefixlen = netaddr.IPNetwork(port['subnet']['cidr']).prefixlen prefixlen = netaddr.IPNetwork(port['subnet']['cidr']).prefixlen
port['ip_cidr'] = "%s/%s" % (ips[0]['ip_address'], prefixlen) port['ip_cidr'] = "%s/%s" % (ips[0]['ip_address'], prefixlen)
def process_router(self, ri): def process_router(self, ri):
ex_gw_port = self._get_ex_gw_port(ri) ex_gw_port = self._get_ex_gw_port(ri)
internal_ports = ri.router.get(l3_constants.INTERFACE_KEY, [])
internal_ports = self.qclient.list_ports(
device_id=ri.router_id,
device_owner=l3_db.DEVICE_OWNER_ROUTER_INTF)['ports']
existing_port_ids = set([p['id'] for p in ri.internal_ports]) existing_port_ids = set([p['id'] for p in ri.internal_ports])
current_port_ids = set([p['id'] for p in internal_ports current_port_ids = set([p['id'] for p in internal_ports
if p['admin_state_up']]) if p['admin_state_up']])
@ -333,8 +306,7 @@ class L3NATAgent(object):
ri.ex_gw_port = ex_gw_port ri.ex_gw_port = ex_gw_port
def process_router_floating_ips(self, ri, ex_gw_port): def process_router_floating_ips(self, ri, ex_gw_port):
floating_ips = self.qclient.list_floatingips( floating_ips = ri.router.get(l3_constants.FLOATINGIP_KEY, [])
router_id=ri.router_id)['floatingips']
existing_floating_ip_ids = set([fip['id'] for fip in ri.floating_ips]) existing_floating_ip_ids = set([fip['id'] for fip in ri.floating_ips])
cur_floating_ip_ids = set([fip['id'] for fip in floating_ips]) cur_floating_ip_ids = set([fip['id'] for fip in floating_ips])
@ -375,16 +347,7 @@ class L3NATAgent(object):
ri.floating_ips.append(new_fip) ri.floating_ips.append(new_fip)
def _get_ex_gw_port(self, ri): def _get_ex_gw_port(self, ri):
ports = self.qclient.list_ports( return ri.router.get('gw_port')
device_id=ri.router_id,
device_owner=l3_db.DEVICE_OWNER_ROUTER_GW)['ports']
if not ports:
return None
elif len(ports) == 1:
return ports[0]
else:
LOG.error(_("Ignoring multiple gateway ports for router %s"),
ri.router_id)
def _send_gratuitous_arp_packet(self, ri, interface_name, ip_address): def _send_gratuitous_arp_packet(self, ri, interface_name, ip_address):
if self.conf.send_arp_for_ha > 0: if self.conf.send_arp_for_ha > 0:
@ -562,14 +525,94 @@ class L3NATAgent(object):
('float-snat', '-s %s -j SNAT --to %s' % ('float-snat', '-s %s -j SNAT --to %s' %
(fixed_ip, floating_ip))] (fixed_ip, floating_ip))]
def router_deleted(self, context, router_id):
"""Deal with router deletion RPC message."""
with self.sync_sem:
if router_id in self.router_info:
try:
self._router_removed(router_id)
except Exception:
msg = _("Failed dealing with router "
"'%s' deletion RPC message")
LOG.debug(msg, router_id)
self.fullsync = True
def routers_updated(self, context, routers):
"""Deal with routers modification and creation RPC message."""
if not routers:
return
with self.sync_sem:
try:
self._process_routers(routers)
except Exception:
msg = _("Failed dealing with routers update RPC message")
LOG.debug(msg)
self.fullsync = True
def _process_routers(self, routers):
if (self.conf.external_network_bridge and
not ip_lib.device_exists(self.conf.external_network_bridge)):
LOG.error(_("The external network bridge '%s' does not exist")
% self.conf.external_network_bridge)
return
target_ex_net_id = self._fetch_external_net_id()
for r in routers:
if not r['admin_state_up']:
continue
# If namespaces are disabled, only process the router associated
# with the configured agent id.
if (not self.conf.use_namespaces and
r['id'] != self.conf.router_id):
continue
ex_net_id = (r['external_gateway_info'] or {}).get('network_id')
if not ex_net_id and not self.conf.handle_internal_only_routers:
continue
if ex_net_id and ex_net_id != target_ex_net_id:
continue
if r['id'] not in self.router_info:
self._router_added(r['id'])
ri = self.router_info[r['id']]
ri.router = r
self.process_router(ri)
@periodic_task.periodic_task
def _sync_routers_task(self, context):
# we need to sync with router deletion RPC message
with self.sync_sem:
if self.fullsync:
try:
if not self.conf.use_namespaces:
router_id = self.conf.router_id
else:
router_id = None
routers = self.plugin_rpc.get_routers(
context, router_id)
self.router_info = {}
self._process_routers(routers)
self.fullsync = False
except Exception:
LOG.exception(_("Failed synchronizing routers"))
self.fullsync = True
def after_start(self):
LOG.info(_("L3 agent started"))
def main(): def main():
conf = config.setup_conf() eventlet.monkey_patch()
conf = cfg.CONF
conf.register_opts(L3NATAgent.OPTS) conf.register_opts(L3NATAgent.OPTS)
conf.register_opts(interface.OPTS) conf.register_opts(interface.OPTS)
conf.register_opts(external_process.OPTS) conf.register_opts(external_process.OPTS)
conf(sys.argv) conf(sys.argv)
config.setup_logging(conf) config.setup_logging(conf)
server = quantum_service.Service.create(binary='quantum-l3-agent',
mgr = L3NATAgent(conf) topic=topics.L3_AGENT)
mgr.daemon_loop() service.launch(server).wait()

View File

@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import socket
import netaddr import netaddr
import webob.exc import webob.exc
@ -54,14 +52,6 @@ FAULT_MAP = {exceptions.NotFound: webob.exc.HTTPNotFound,
QUOTAS = quota.QUOTAS QUOTAS = quota.QUOTAS
def _get_hostname():
return socket.gethostname()
# Register the configuration options
cfg.CONF.register_opt(cfg.StrOpt('host', default=_get_hostname()))
def _fields(request): def _fields(request):
""" """
Extracts the list of fields to return Extracts the list of fields to return

View File

@ -25,6 +25,7 @@ import sys
from paste import deploy from paste import deploy
from quantum.api.v2 import attributes from quantum.api.v2 import attributes
from quantum.common import utils
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
from quantum.version import version_info as quantum_version from quantum.version import version_info as quantum_version
@ -53,7 +54,8 @@ core_opts = [
cfg.BoolOpt('allow_overlapping_ips', default=False), cfg.BoolOpt('allow_overlapping_ips', default=False),
cfg.StrOpt('control_exchange', cfg.StrOpt('control_exchange',
default='quantum', default='quantum',
help='AMQP exchange to connect to if using RabbitMQ or Qpid') help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
cfg.StrOpt('host', default=utils.get_hostname()),
] ]

View File

@ -22,3 +22,9 @@ PORT_STATUS_ACTIVE = 'ACTIVE'
PORT_STATUS_BUILD = 'BUILD' PORT_STATUS_BUILD = 'BUILD'
PORT_STATUS_DOWN = 'DOWN' PORT_STATUS_DOWN = 'DOWN'
PORT_STATUS_ERROR = 'ERROR' PORT_STATUS_ERROR = 'ERROR'
DEVICE_OWNER_ROUTER_INTF = "network:router_interface"
DEVICE_OWNER_ROUTER_GW = "network:router_gateway"
DEVICE_OWNER_FLOATINGIP = "network:floatingip"
FLOATINGIP_KEY = '_floatingips'
INTERFACE_KEY = '_interfaces'

View File

@ -235,3 +235,7 @@ class InvalidSharedSetting(QuantumException):
class InvalidExtenstionEnv(QuantumException): class InvalidExtenstionEnv(QuantumException):
message = _("Invalid extension environment: %(reason)s") message = _("Invalid extension environment: %(reason)s")
class TooManyExternalNetworks(QuantumException):
message = _("More than one external network exists")

View File

@ -25,6 +25,8 @@ AGENT = 'q-agent-notifier'
PLUGIN = 'q-plugin' PLUGIN = 'q-plugin'
DHCP = 'q-dhcp-notifer' DHCP = 'q-dhcp-notifer'
L3_AGENT = 'l3_agent'
def get_topic_name(prefix, table, operation): def get_topic_name(prefix, table, operation):
"""Create a topic name. """Create a topic name.

View File

@ -23,6 +23,7 @@
import os import os
import signal import signal
import socket
from eventlet.green import subprocess from eventlet.green import subprocess
@ -145,3 +146,7 @@ def parse_mappings(mapping_list, unique_values=True):
(value, mapping)) (value, mapping))
mappings[key] = value mappings[key] = value
return mappings return mappings
def get_hostname():
return socket.getfqdn()

View File

@ -25,12 +25,13 @@ from sqlalchemy.sql import expression as expr
import webob.exc as w_exc import webob.exc as w_exc
from quantum.api.v2 import attributes from quantum.api.v2 import attributes
from quantum.common import constants as l3_constants
from quantum.common import exceptions as q_exc from quantum.common import exceptions as q_exc
from quantum.db import db_base_plugin_v2 from quantum.db import db_base_plugin_v2
from quantum.db import l3_rpc_agent_api
from quantum.db import model_base from quantum.db import model_base
from quantum.db import models_v2 from quantum.db import models_v2
from quantum.extensions import l3 from quantum.extensions import l3
from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
from quantum.openstack.common import uuidutils from quantum.openstack.common import uuidutils
from quantum import policy from quantum import policy
@ -38,17 +39,10 @@ from quantum import policy
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
l3_opts = [
cfg.StrOpt('metadata_ip_address', default='127.0.0.1'),
cfg.IntOpt('metadata_port', default=8775)
]
# Register the configuration options DEVICE_OWNER_ROUTER_INTF = l3_constants.DEVICE_OWNER_ROUTER_INTF
cfg.CONF.register_opts(l3_opts) DEVICE_OWNER_ROUTER_GW = l3_constants.DEVICE_OWNER_ROUTER_GW
DEVICE_OWNER_FLOATINGIP = l3_constants.DEVICE_OWNER_FLOATINGIP
DEVICE_OWNER_ROUTER_INTF = "network:router_interface"
DEVICE_OWNER_ROUTER_GW = "network:router_gateway"
DEVICE_OWNER_FLOATINGIP = "network:floatingip"
class Router(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant): class Router(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
@ -164,6 +158,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
# Ensure we actually have something to update # Ensure we actually have something to update
if r.keys(): if r.keys():
router_db.update(r) router_db.update(r)
routers = self.get_sync_data(context.elevated(),
[router_db['id']])
l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
return self._make_router_dict(router_db) return self._make_router_dict(router_db)
def _update_router_gw_info(self, context, router_id, info): def _update_router_gw_info(self, context, router_id, info):
@ -250,6 +247,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
self._delete_port(context.elevated(), ports[0]['id']) self._delete_port(context.elevated(), ports[0]['id'])
context.session.delete(router) context.session.delete(router)
l3_rpc_agent_api.L3AgentNofity.router_deleted(context, id)
def get_router(self, context, id, fields=None): def get_router(self, context, id, fields=None):
router = self._get_router(context, id) router = self._get_router(context, id)
@ -324,9 +322,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
self._check_for_dup_router_subnet(context, router_id, self._check_for_dup_router_subnet(context, router_id,
port['network_id'], port['network_id'],
fixed_ips[0]['subnet_id']) fixed_ips[0]['subnet_id'])
with context.session.begin(subtransactions=True): port.update({'device_id': router_id,
port.update({'device_id': router_id, 'device_owner': DEVICE_OWNER_ROUTER_INTF})
'device_owner': DEVICE_OWNER_ROUTER_INTF})
elif 'subnet_id' in interface_info: elif 'subnet_id' in interface_info:
subnet_id = interface_info['subnet_id'] subnet_id = interface_info['subnet_id']
subnet = self._get_subnet(context, subnet_id) subnet = self._get_subnet(context, subnet_id)
@ -348,6 +345,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
'device_id': router_id, 'device_id': router_id,
'device_owner': DEVICE_OWNER_ROUTER_INTF, 'device_owner': DEVICE_OWNER_ROUTER_INTF,
'name': ''}}) 'name': ''}})
routers = self.get_sync_data(context.elevated(), [router_id])
l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
return {'port_id': port['id'], return {'port_id': port['id'],
'subnet_id': port['fixed_ips'][0]['subnet_id']} 'subnet_id': port['fixed_ips'][0]['subnet_id']}
@ -423,6 +423,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
raise w_exc.HTTPNotFound("Router %(router_id)s has no " raise w_exc.HTTPNotFound("Router %(router_id)s has no "
"interface on subnet %(subnet_id)s" "interface on subnet %(subnet_id)s"
% locals()) % locals())
routers = self.get_sync_data(context.elevated(), [router_id])
l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
def _get_floatingip(self, context, id): def _get_floatingip(self, context, id):
try: try:
@ -621,7 +623,10 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
except Exception: except Exception:
LOG.exception(_("Floating IP association failed")) LOG.exception(_("Floating IP association failed"))
raise raise
router_id = floatingip_db['router_id']
if router_id:
routers = self.get_sync_data(context.elevated(), [router_id])
l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
return self._make_floatingip_dict(floatingip_db) return self._make_floatingip_dict(floatingip_db)
def update_floatingip(self, context, id, floatingip): def update_floatingip(self, context, id, floatingip):
@ -631,18 +636,32 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
fip['tenant_id'] = floatingip_db['tenant_id'] fip['tenant_id'] = floatingip_db['tenant_id']
fip['id'] = id fip['id'] = id
fip_port_id = floatingip_db['floating_port_id'] fip_port_id = floatingip_db['floating_port_id']
before_router_id = floatingip_db['router_id']
self._update_fip_assoc(context, fip, floatingip_db, self._update_fip_assoc(context, fip, floatingip_db,
self.get_port(context.elevated(), self.get_port(context.elevated(),
fip_port_id)) fip_port_id))
router_ids = []
if before_router_id:
router_ids.append(before_router_id)
router_id = floatingip_db['router_id']
if router_id and router_id != before_router_id:
router_ids.append(router_id)
if router_ids:
routers = self.get_sync_data(context.elevated(), router_ids)
l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
return self._make_floatingip_dict(floatingip_db) return self._make_floatingip_dict(floatingip_db)
def delete_floatingip(self, context, id): def delete_floatingip(self, context, id):
floatingip = self._get_floatingip(context, id) floatingip = self._get_floatingip(context, id)
router_id = floatingip['router_id']
with context.session.begin(subtransactions=True): with context.session.begin(subtransactions=True):
context.session.delete(floatingip) context.session.delete(floatingip)
self.delete_port(context.elevated(), self.delete_port(context.elevated(),
floatingip['floating_port_id'], floatingip['floating_port_id'],
l3_port_check=False) l3_port_check=False)
if router_id:
routers = self.get_sync_data(context.elevated(), [router_id])
l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
def get_floatingip(self, context, id, fields=None): def get_floatingip(self, context, id, fields=None):
floatingip = self._get_floatingip(context, id) floatingip = self._get_floatingip(context, id)
@ -677,6 +696,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
try: try:
fip_qry = context.session.query(FloatingIP) fip_qry = context.session.query(FloatingIP)
floating_ip = fip_qry.filter_by(fixed_port_id=port_id).one() floating_ip = fip_qry.filter_by(fixed_port_id=port_id).one()
router_id = floating_ip['router_id']
floating_ip.update({'fixed_port_id': None, floating_ip.update({'fixed_port_id': None,
'fixed_ip_address': None, 'fixed_ip_address': None,
'router_id': None}) 'router_id': None})
@ -686,6 +706,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
# should never happen # should never happen
raise Exception('Multiple floating IPs found for port %s' raise Exception('Multiple floating IPs found for port %s'
% port_id) % port_id)
if router_id:
routers = self.get_sync_data(context.elevated(), [router_id])
l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
def _check_l3_view_auth(self, context, network): def _check_l3_view_auth(self, context, network):
return policy.check(context, return policy.check(context,
@ -761,3 +784,137 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
return [n for n in nets if n['id'] in ext_nets] return [n for n in nets if n['id'] in ext_nets]
else: else:
return [n for n in nets if n['id'] not in ext_nets] return [n for n in nets if n['id'] not in ext_nets]
def _get_sync_routers(self, context, router_ids=None):
"""Query routers and their gw ports for l3 agent.
Query routers with the router_ids. The gateway ports, if any,
will be queried too.
l3 agent has an option to deal with only one router id. In addition,
when we need to notify the agent the data about only one router
(when modification of router, its interfaces, gw_port and floatingips),
we will have router_ids.
@param router_ids: the list of router ids which we want to query.
if it is None, all of routers will be queried.
@return: a list of dicted routers with dicted gw_port populated if any
"""
router_query = context.session.query(Router)
if router_ids:
router_query = router_query.filter(Router.id.in_(router_ids))
routers = router_query.all()
gw_port_ids = []
if not routers:
return []
for router in routers:
gw_port_id = router.gw_port_id
if gw_port_id:
gw_port_ids.append(gw_port_id)
gw_ports = []
if gw_port_ids:
gw_ports = self._get_sync_gw_ports(context, gw_port_ids)
gw_port_id_gw_port_dict = {}
for gw_port in gw_ports:
gw_port_id_gw_port_dict[gw_port['id']] = gw_port
router_id_gw_port_id_dict = {}
for router in routers:
router_id_gw_port_id_dict[router.id] = router.gw_port_id
routers_list = [self._make_router_dict(c, None) for c in routers]
for router in routers_list:
gw_port_id = router_id_gw_port_id_dict[router['id']]
if gw_port_id:
router['gw_port'] = gw_port_id_gw_port_dict[gw_port_id]
return routers_list
def _get_sync_floating_ips(self, context, router_ids):
"""Query floating_ips that relate to list of router_ids."""
if not router_ids:
return []
return self.get_floatingips(context, {'router_id': router_ids})
def _get_sync_gw_ports(self, context, gw_port_ids):
if not gw_port_ids:
return []
filters = {'id': gw_port_ids}
gw_ports = self.get_ports(context, filters)
if gw_ports:
self._populate_subnet_for_ports(context, gw_ports)
return gw_ports
def _get_sync_interfaces(self, context, router_ids):
"""Query router interfaces that relate to list of router_ids."""
if not router_ids:
return []
filters = {'device_id': router_ids,
'device_owner': [DEVICE_OWNER_ROUTER_INTF]}
interfaces = self.get_ports(context, filters)
if interfaces:
self._populate_subnet_for_ports(context, interfaces)
return interfaces
def _populate_subnet_for_ports(self, context, ports):
"""Populate ports with subnet.
These ports already have fixed_ips populated.
"""
if not ports:
return
subnet_id_ports_dict = {}
for port in ports:
fixed_ips = port.get('fixed_ips', [])
if len(fixed_ips) > 1:
LOG.error(_("Ignoring multiple IPs on router port %s") %
port['id'])
ports.remove(port)
continue
# Empty fixed_ips should not happen
fixed_ip = fixed_ips[0]
my_ports = subnet_id_ports_dict.get(fixed_ip['subnet_id'], [])
my_ports.append(port)
subnet_id_ports_dict[fixed_ip['subnet_id']] = my_ports
filters = {'id': subnet_id_ports_dict.keys()}
fields = ['id', 'cidr', 'gateway_ip']
subnet_dicts = self.get_subnets(context, filters, fields)
for subnet_dict in subnet_dicts:
ports = subnet_id_ports_dict.get(subnet_dict['id'], [])
for port in ports:
# TODO(gongysh) stash the subnet into fixed_ips
# to make the payload smaller.
port['subnet'] = {'id': subnet_dict['id'],
'cidr': subnet_dict['cidr'],
'gateway_ip': subnet_dict['gateway_ip']}
def _process_sync_data(self, routers, interfaces, floating_ips):
routers_dict = {}
for router in routers:
routers_dict[router['id']] = router
for floating_ip in floating_ips:
router = routers_dict.get(floating_ip['router_id'])
if router:
router_floatingips = router.get(l3_constants.FLOATINGIP_KEY,
[])
router_floatingips.append(floating_ip)
router[l3_constants.FLOATINGIP_KEY] = router_floatingips
for interface in interfaces:
router = routers_dict.get(interface['device_id'])
if router:
router_interfaces = router.get(l3_constants.INTERFACE_KEY, [])
router_interfaces.append(interface)
router[l3_constants.INTERFACE_KEY] = router_interfaces
return routers_dict.values()
def get_sync_data(self, context, router_ids=None):
"""Query routers and their related floating_ips, interfaces."""
with context.session.begin(subtransactions=True):
routers = self._get_sync_routers(context,
router_ids)
router_ids = [router['id'] for router in routers]
floating_ips = self._get_sync_floating_ips(context, router_ids)
interfaces = self._get_sync_interfaces(context, router_ids)
return self._process_sync_data(routers, interfaces, floating_ips)
def get_external_network_id(self, context):
nets = self.get_networks(context, {'router:external': [True]})
if len(nets) > 1:
raise q_exc.TooManyExternalNetworks()
else:
return nets[0]['id'] if nets else None

View File

@ -0,0 +1,50 @@
# Copyright (c) 2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from quantum.common import topics
from quantum.openstack.common import jsonutils
from quantum.openstack.common import log as logging
from quantum.openstack.common.rpc import proxy
LOG = logging.getLogger(__name__)
class L3AgentNotifyAPI(proxy.RpcProxy):
"""API for plugin to notify L3 agent."""
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic=topics.L3_AGENT):
super(L3AgentNotifyAPI, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
def router_deleted(self, context, router_id):
LOG.debug(_('Nofity agent the router %s is deleted'), router_id)
self.cast(context,
self.make_msg('router_deleted',
router_id=router_id),
topic=self.topic)
def routers_updated(self, context, routers):
if routers:
LOG.debug(_('Nofity agent routers were updated:\n %s'),
jsonutils.dumps(routers, indent=5))
self.cast(context,
self.make_msg('routers_updated',
routers=routers),
topic=self.topic)
L3AgentNofity = L3AgentNotifyAPI()

56
quantum/db/l3_rpc_base.py Normal file
View File

@ -0,0 +1,56 @@
# Copyright (c) 2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from quantum import context as quantum_context
from quantum import manager
from quantum.openstack.common import jsonutils
from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class L3RpcCallbackMixin(object):
"""A mix-in that enable L3 agent rpc support in plugin implementations."""
def sync_routers(self, context, **kwargs):
"""Sync routers according to filters to a specific agent.
@param context: contain user information
@param kwargs: host, or router_id
@return: a list of routers
with their interfaces and floating_ips
"""
router_id = kwargs.get('router_id')
# TODO(gongysh) we will use host in kwargs for multi host BP
context = quantum_context.get_admin_context()
plugin = manager.QuantumManager.get_plugin()
routers = plugin.get_sync_data(context, router_id)
LOG.debug(_("Routers returned to l3 agent:\n %s"),
jsonutils.dumps(routers, indent=5))
return routers
def get_external_network_id(self, context, **kwargs):
"""Get one external network id for l3 agent.
l3 agent expects only on external network when it performs
this query.
"""
context = quantum_context.get_admin_context()
plugin = manager.QuantumManager.get_plugin()
net_id = plugin.get_external_network_id(context)
LOG.debug(_("External network ID returned to l3 agent: %s"),
net_id)
return net_id

View File

@ -16,25 +16,54 @@
# under the License. # under the License.
# @author: Somik Behera, Nicira Networks, Inc. # @author: Somik Behera, Nicira Networks, Inc.
"""
Quantum's Manager class is responsible for parsing a config file and
instantiating the correct plugin that concretely implement quantum_plugin_base
class.
The caller should make sure that QuantumManager is a singleton.
"""
from quantum.common.exceptions import ClassNotFound from quantum.common.exceptions import ClassNotFound
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
from quantum.openstack.common import importutils from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
from quantum.openstack.common import periodic_task
from quantum.plugins.common import constants from quantum.plugins.common import constants
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class QuantumManager(object): class Manager(periodic_task.PeriodicTasks):
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'
def __init__(self, host=None):
if not host:
host = cfg.CONF.host
self.host = host
super(Manager, self).__init__()
def periodic_tasks(self, context, raise_on_error=False):
self.run_periodic_tasks(context, raise_on_error=raise_on_error)
def init_host(self):
"""Handle initialization if this is a standalone service.
Child classes should override this method.
"""
pass
def after_start(self):
"""Handler post initialization stuff.
Child classes can override this method.
"""
pass
class QuantumManager(object):
"""
Quantum's Manager class is responsible for parsing a config file and
instantiating the correct plugin that concretely implement
quantum_plugin_base class.
The caller should make sure that QuantumManager is a singleton.
"""
_instance = None _instance = None
def __init__(self, options=None, config_file=None): def __init__(self, options=None, config_file=None):

View File

@ -0,0 +1,111 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def periodic_task(*args, **kwargs):
"""Decorator to indicate that a method is a periodic task.
This decorator can be used in two ways:
1. Without arguments '@periodic_task', this will be run on every tick
of the periodic scheduler.
2. With arguments, @periodic_task(ticks_between_runs=N), this will be
run on every N ticks of the periodic scheduler.
"""
def decorator(f):
f._periodic_task = True
f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0)
return f
# NOTE(sirp): The `if` is necessary to allow the decorator to be used with
# and without parens.
#
# In the 'with-parens' case (with kwargs present), this function needs to
# return a decorator function since the interpreter will invoke it like:
#
# periodic_task(*args, **kwargs)(f)
#
# In the 'without-parens' case, the original function will be passed
# in as the first argument, like:
#
# periodic_task(f)
if kwargs:
return decorator
else:
return decorator(args[0])
class _PeriodicTasksMeta(type):
def __init__(cls, names, bases, dict_):
"""Metaclass that allows us to collect decorated periodic tasks."""
super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)
# NOTE(sirp): if the attribute is not present then we must be the base
# class, so, go ahead and initialize it. If the attribute is present,
# then we're a subclass so make a copy of it so we don't step on our
# parent's toes.
try:
cls._periodic_tasks = cls._periodic_tasks[:]
except AttributeError:
cls._periodic_tasks = []
try:
cls._ticks_to_skip = cls._ticks_to_skip.copy()
except AttributeError:
cls._ticks_to_skip = {}
# This uses __dict__ instead of
# inspect.getmembers(cls, inspect.ismethod) so only the methods of the
# current class are added when this class is scanned, and base classes
# are not added redundantly.
for value in cls.__dict__.values():
if getattr(value, '_periodic_task', False):
task = value
name = task.__name__
cls._periodic_tasks.append((name, task))
cls._ticks_to_skip[name] = task._ticks_between_runs
class PeriodicTasks(object):
__metaclass__ = _PeriodicTasksMeta
def run_periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
for task_name, task in self._periodic_tasks:
full_task_name = '.'.join([self.__class__.__name__, task_name])
ticks_to_skip = self._ticks_to_skip[task_name]
if ticks_to_skip > 0:
LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s"
" ticks left until next run"), locals())
self._ticks_to_skip[task_name] -= 1
continue
self._ticks_to_skip[task_name] = task._ticks_between_runs
LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
try:
task(self, context)
except Exception as e:
if raise_on_error:
raise
LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
locals())

View File

@ -24,6 +24,7 @@ from quantum.db import api as db_api
from quantum.db import db_base_plugin_v2 from quantum.db import db_base_plugin_v2
from quantum.db import dhcp_rpc_base from quantum.db import dhcp_rpc_base
from quantum.db import l3_db from quantum.db import l3_db
from quantum.db import l3_rpc_base
from quantum.extensions import providernet as provider from quantum.extensions import providernet as provider
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
@ -37,7 +38,8 @@ from quantum import policy
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin): class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin):
# Set RPC API version to 1.0 by default. # Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0' RPC_API_VERSION = '1.0'

View File

@ -30,6 +30,7 @@ from quantum.common import topics
from quantum.db import db_base_plugin_v2 from quantum.db import db_base_plugin_v2
from quantum.db import dhcp_rpc_base from quantum.db import dhcp_rpc_base
from quantum.db import l3_db from quantum.db import l3_db
from quantum.db import l3_rpc_base
from quantum.extensions import providernet as provider from quantum.extensions import providernet as provider
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
@ -44,7 +45,8 @@ from quantum import policy
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin): class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin):
# Set RPC API version to 1.0 by default. # Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0' RPC_API_VERSION = '1.0'

View File

@ -15,14 +15,39 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import inspect
import logging as std_logging import logging as std_logging
import os
import random
from quantum.common import config from quantum.common import config
from quantum import context
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
from quantum.openstack.common import loopingcall
from quantum.openstack.common.rpc import service
from quantum import wsgi from quantum import wsgi
LOG = logging.getLogger(__name__)
service_opts = [
cfg.IntOpt('report_interval',
default=10,
help='seconds between nodes reporting state to datastore'),
cfg.IntOpt('periodic_interval',
default=40,
help='seconds between running periodic tasks'),
cfg.IntOpt('periodic_fuzzy_delay',
default=5,
help='range of seconds to randomly delay when starting the'
' periodic task scheduler to reduce stampeding.'
' (Disable by setting to 0)'),
]
CONF = cfg.CONF
CONF.register_opts(service_opts)
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -91,3 +116,120 @@ def _run_wsgi(app_name):
{'host': cfg.CONF.bind_host, {'host': cfg.CONF.bind_host,
'port': cfg.CONF.bind_port}) 'port': cfg.CONF.bind_port})
return server return server
class Service(service.Service):
"""Service object for binaries running on hosts.
A service takes a manager and enables rpc by listening to queues based
on topic. It also periodically runs tasks on the manager."""
def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_interval=None, periodic_fuzzy_delay=None,
*args, **kwargs):
self.binary = binary
self.manager_class_name = manager
manager_class = importutils.import_class(self.manager_class_name)
self.manager = manager_class(host=host, *args, **kwargs)
self.report_interval = report_interval
self.periodic_interval = periodic_interval
self.periodic_fuzzy_delay = periodic_fuzzy_delay
self.saved_args, self.saved_kwargs = args, kwargs
self.timers = []
super(Service, self).__init__(host, topic, manager=self.manager)
def start(self):
self.manager.init_host()
super(Service, self).start()
if self.report_interval:
pulse = loopingcall.LoopingCall(self.report_state)
pulse.start(interval=self.report_interval,
initial_delay=self.report_interval)
self.timers.append(pulse)
if self.periodic_interval:
if self.periodic_fuzzy_delay:
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
else:
initial_delay = None
periodic = loopingcall.LoopingCall(self.periodic_tasks)
periodic.start(interval=self.periodic_interval,
initial_delay=initial_delay)
self.timers.append(periodic)
self.manager.after_start()
def __getattr__(self, key):
manager = self.__dict__.get('manager', None)
return getattr(manager, key)
@classmethod
def create(cls, host=None, binary=None, topic=None, manager=None,
report_interval=None, periodic_interval=None,
periodic_fuzzy_delay=None):
"""Instantiates class and passes back application object.
:param host: defaults to CONF.host
:param binary: defaults to basename of executable
:param topic: defaults to bin_name - 'nova-' part
:param manager: defaults to CONF.<topic>_manager
:param report_interval: defaults to CONF.report_interval
:param periodic_interval: defaults to CONF.periodic_interval
:param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
"""
if not host:
host = CONF.host
if not binary:
binary = os.path.basename(inspect.stack()[-1][1])
if not topic:
topic = binary.rpartition('quantum-')[2]
topic = topic.replace("-", "_")
if not manager:
manager = CONF.get('%s_manager' % topic, None)
if report_interval is None:
report_interval = CONF.report_interval
if periodic_interval is None:
periodic_interval = CONF.periodic_interval
if periodic_fuzzy_delay is None:
periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
service_obj = cls(host, binary, topic, manager,
report_interval=report_interval,
periodic_interval=periodic_interval,
periodic_fuzzy_delay=periodic_fuzzy_delay)
return service_obj
def kill(self):
"""Destroy the service object."""
self.stop()
def stop(self):
super(Service, self).stop()
for x in self.timers:
try:
x.stop()
except Exception:
LOG.exception("exception occurs when timer stops")
pass
self.timers = []
def wait(self):
super(Service, self).wait()
for x in self.timers:
try:
x.wait()
except Exception:
LOG.exception("exception occurs when waiting for timer")
pass
def periodic_tasks(self, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
ctxt = context.get_admin_context()
self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
def report_state(self):
"""Update the state of this service."""
# Todo(gongysh) report state to quantum server
pass

View File

@ -445,7 +445,7 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
self._delete('subnets', subnet['subnet']['id']) self._delete('subnets', subnet['subnet']['id'])
@contextlib.contextmanager @contextlib.contextmanager
def port(self, subnet=None, fixed_ips=None, fmt='json', no_delete=False, def port(self, subnet=None, fmt='json', no_delete=False,
**kwargs): **kwargs):
if not subnet: if not subnet:
with self.subnet() as subnet: with self.subnet() as subnet:

View File

@ -16,25 +16,27 @@
# under the License. # under the License.
import copy import copy
import time import unittest2
import unittest
import mock import mock
from quantum.agent.common import config
from quantum.agent import l3_agent from quantum.agent import l3_agent
from quantum.agent.linux import interface from quantum.agent.linux import interface
from quantum.db import l3_db from quantum.common import config as base_config
from quantum.common import constants as l3_constants
from quantum.openstack.common import cfg
from quantum.openstack.common import uuidutils from quantum.openstack.common import uuidutils
_uuid = uuidutils.generate_uuid _uuid = uuidutils.generate_uuid
HOSTNAME = 'myhost'
class TestBasicRouterOperations(unittest.TestCase): class TestBasicRouterOperations(unittest2.TestCase):
def setUp(self): def setUp(self):
self.conf = config.setup_conf() self.conf = cfg.CommonConfigOpts()
self.conf.register_opts(base_config.core_opts)
self.conf.register_opts(l3_agent.L3NATAgent.OPTS) self.conf.register_opts(l3_agent.L3NATAgent.OPTS)
self.conf.register_opts(interface.OPTS) self.conf.register_opts(interface.OPTS)
self.conf.set_override('interface_driver', self.conf.set_override('interface_driver',
@ -65,14 +67,15 @@ class TestBasicRouterOperations(unittest.TestCase):
self.mock_ip = mock.MagicMock() self.mock_ip = mock.MagicMock()
ip_cls.return_value = self.mock_ip ip_cls.return_value = self.mock_ip
self.client_cls_p = mock.patch('quantumclient.v2_0.client.Client') self.l3pluginApi_cls_p = mock.patch(
client_cls = self.client_cls_p.start() 'quantum.agent.l3_agent.L3PluginApi')
self.client_inst = mock.Mock() l3pluginApi_cls = self.l3pluginApi_cls_p.start()
client_cls.return_value = self.client_inst self.plugin_api = mock.Mock()
l3pluginApi_cls.return_value = self.plugin_api
def tearDown(self): def tearDown(self):
self.device_exists_p.stop() self.device_exists_p.stop()
self.client_cls_p.stop() self.l3pluginApi_cls_p.stop()
self.ip_cls_p.stop() self.ip_cls_p.stop()
self.dvr_cls_p.stop() self.dvr_cls_p.stop()
self.utils_exec_p.stop() self.utils_exec_p.stop()
@ -86,7 +89,7 @@ class TestBasicRouterOperations(unittest.TestCase):
self.assertTrue(ri.ns_name().endswith(id)) self.assertTrue(ri.ns_name().endswith(id))
def testAgentCreate(self): def testAgentCreate(self):
agent = l3_agent.L3NATAgent(self.conf) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
def _test_internal_network_action(self, action): def _test_internal_network_action(self, action):
port_id = _uuid() port_id = _uuid()
@ -94,7 +97,7 @@ class TestBasicRouterOperations(unittest.TestCase):
network_id = _uuid() network_id = _uuid()
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
self.conf.use_namespaces) self.conf.use_namespaces)
agent = l3_agent.L3NATAgent(self.conf) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
interface_name = agent.get_internal_device_name(port_id) interface_name = agent.get_internal_device_name(port_id)
cidr = '99.0.1.9/24' cidr = '99.0.1.9/24'
mac = 'ca:fe:de:ad:be:ef' mac = 'ca:fe:de:ad:be:ef'
@ -123,7 +126,7 @@ class TestBasicRouterOperations(unittest.TestCase):
router_id = _uuid() router_id = _uuid()
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
self.conf.use_namespaces) self.conf.use_namespaces)
agent = l3_agent.L3NATAgent(self.conf) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
internal_cidrs = ['100.0.1.0/24', '200.74.0.0/16'] internal_cidrs = ['100.0.1.0/24', '200.74.0.0/16']
ex_gw_port = {'fixed_ips': [{'ip_address': '20.0.0.30', ex_gw_port = {'fixed_ips': [{'ip_address': '20.0.0.30',
'subnet_id': _uuid()}], 'subnet_id': _uuid()}],
@ -167,7 +170,7 @@ class TestBasicRouterOperations(unittest.TestCase):
router_id = _uuid() router_id = _uuid()
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
self.conf.use_namespaces) self.conf.use_namespaces)
agent = l3_agent.L3NATAgent(self.conf) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
floating_ip = '20.0.0.100' floating_ip = '20.0.0.100'
fixed_ip = '10.0.0.23' fixed_ip = '10.0.0.23'
ex_gw_port = {'fixed_ips': [{'ip_address': '20.0.0.30', ex_gw_port = {'fixed_ips': [{'ip_address': '20.0.0.30',
@ -206,101 +209,79 @@ class TestBasicRouterOperations(unittest.TestCase):
def testProcessRouter(self): def testProcessRouter(self):
agent = l3_agent.L3NATAgent(self.conf) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
router_id = _uuid() router_id = _uuid()
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
self.conf.use_namespaces)
# return data so that state is built up
ex_gw_port = {'id': _uuid(), ex_gw_port = {'id': _uuid(),
'network_id': _uuid(), 'network_id': _uuid(),
'fixed_ips': [{'ip_address': '19.4.4.4', 'fixed_ips': [{'ip_address': '19.4.4.4',
'subnet_id': _uuid()}]} 'subnet_id': _uuid()}],
'subnet': {'cidr': '19.4.4.0/24',
'gateway_ip': '19.4.4.1'}}
internal_port = {'id': _uuid(), internal_port = {'id': _uuid(),
'network_id': _uuid(), 'network_id': _uuid(),
'admin_state_up': True, 'admin_state_up': True,
'fixed_ips': [{'ip_address': '35.4.4.4', 'fixed_ips': [{'ip_address': '35.4.4.4',
'subnet_id': _uuid()}], 'subnet_id': _uuid()}],
'mac_address': 'ca:fe:de:ad:be:ef'} 'mac_address': 'ca:fe:de:ad:be:ef',
'subnet': {'cidr': '35.4.4.0/24',
def fake_list_ports1(**kwargs): 'gateway_ip': '35.4.4.1'}}
if kwargs['device_owner'] == l3_db.DEVICE_OWNER_ROUTER_GW:
return {'ports': [ex_gw_port]}
elif kwargs['device_owner'] == l3_db.DEVICE_OWNER_ROUTER_INTF:
return {'ports': [internal_port]}
fake_subnet = {'subnet': {'cidr': '19.4.4.0/24',
'gateway_ip': '19.4.4.1'}}
fake_floatingips1 = {'floatingips': [ fake_floatingips1 = {'floatingips': [
{'id': _uuid(), {'id': _uuid(),
'floating_ip_address': '8.8.8.8', 'floating_ip_address': '8.8.8.8',
'fixed_ip_address': '7.7.7.7', 'fixed_ip_address': '7.7.7.7',
'port_id': _uuid()}]} 'port_id': _uuid()}]}
router = {
self.client_inst.list_ports.side_effect = fake_list_ports1 'id': router_id,
self.client_inst.show_subnet.return_value = fake_subnet l3_constants.FLOATINGIP_KEY: fake_floatingips1['floatingips'],
self.client_inst.list_floatingips.return_value = fake_floatingips1 l3_constants.INTERFACE_KEY: [internal_port],
'gw_port': ex_gw_port}
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
self.conf.use_namespaces, router=router)
agent.process_router(ri) agent.process_router(ri)
# remap floating IP to a new fixed ip # remap floating IP to a new fixed ip
fake_floatingips2 = copy.deepcopy(fake_floatingips1) fake_floatingips2 = copy.deepcopy(fake_floatingips1)
fake_floatingips2['floatingips'][0]['fixed_ip_address'] = '7.7.7.8' fake_floatingips2['floatingips'][0]['fixed_ip_address'] = '7.7.7.8'
self.client_inst.list_floatingips.return_value = fake_floatingips2 router[l3_constants.FLOATINGIP_KEY] = fake_floatingips2['floatingips']
agent.process_router(ri) agent.process_router(ri)
# remove just the floating ips # remove just the floating ips
self.client_inst.list_floatingips.return_value = {'floatingips': []} del router[l3_constants.FLOATINGIP_KEY]
agent.process_router(ri) agent.process_router(ri)
# now return no ports so state is torn down # now no ports so state is torn down
self.client_inst.list_ports.return_value = {'ports': []} del router[l3_constants.INTERFACE_KEY]
del router['gw_port']
agent.process_router(ri) agent.process_router(ri)
def testRoutersWithAdminStateDown(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
self.plugin_api.get_external_network_id.return_value = None
routers = [
{'id': _uuid(),
'admin_state_up': False,
'external_gateway_info': {}}]
agent._process_routers(routers)
self.assertNotIn(routers[0]['id'], agent.router_info)
def testSingleLoopRouterRemoval(self): def testSingleLoopRouterRemoval(self):
agent = l3_agent.L3NATAgent(self.conf) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
router_id = _uuid() self.plugin_api.get_external_network_id.return_value = None
routers = [
self.client_inst.list_ports.return_value = {'ports': []} {'id': _uuid(),
self.client_inst.list_networks.return_value = {'networks': []}
self.client_inst.list_routers.return_value = {'routers': [
{'id': router_id,
'admin_state_up': True, 'admin_state_up': True,
'external_gateway_info': {}}]} 'external_gateway_info': {}}]
agent.do_single_loop() agent._process_routers(routers)
self.client_inst.list_routers.return_value = {'routers': []}
agent.do_single_loop()
self.external_process.assert_has_calls(
[mock.call(agent.conf, router_id, 'sudo', 'qrouter-' + router_id),
mock.call().enable(mock.ANY),
mock.call(agent.conf, router_id, 'sudo', 'qrouter-' + router_id),
mock.call().disable()])
agent.router_deleted(None, routers[0]['id'])
# verify that remove is called # verify that remove is called
self.assertEquals(self.mock_ip.get_devices.call_count, 1) self.assertEquals(self.mock_ip.get_devices.call_count, 1)
self.device_exists.assert_has_calls( self.device_exists.assert_has_calls(
[mock.call(self.conf.external_network_bridge)]) [mock.call(self.conf.external_network_bridge)])
def testDaemonLoop(self):
# just take a pass through the loop, then raise on time.sleep()
time_sleep_p = mock.patch('time.sleep')
time_sleep = time_sleep_p.start()
class ExpectedException(Exception):
pass
time_sleep.side_effect = ExpectedException()
agent = l3_agent.L3NATAgent(self.conf)
self.assertRaises(ExpectedException, agent.daemon_loop)
time_sleep_p.stop()
def testDestroyNamespace(self): def testDestroyNamespace(self):
class FakeDev(object): class FakeDev(object):
@ -311,16 +292,5 @@ class TestBasicRouterOperations(unittest.TestCase):
self.mock_ip.get_devices.return_value = [FakeDev('qr-aaaa'), self.mock_ip.get_devices.return_value = [FakeDev('qr-aaaa'),
FakeDev('qgw-aaaa')] FakeDev('qgw-aaaa')]
agent = l3_agent.L3NATAgent(self.conf) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
agent._destroy_all_router_namespaces() agent._destroy_all_router_namespaces()
def testMain(self):
agent_mock_p = mock.patch('quantum.agent.l3_agent.L3NATAgent')
agent_mock = agent_mock_p.start()
agent_mock.daemon_loop.return_value = None
with mock.patch('quantum.agent.common.config.setup_logging'):
with mock.patch('quantum.agent.l3_agent.sys') as mock_sys:
mock_sys.argv = []
l3_agent.main()
agent_mock_p.stop()

View File

@ -30,11 +30,13 @@ import webtest
from quantum.api import extensions from quantum.api import extensions
from quantum.api.v2 import attributes from quantum.api.v2 import attributes
from quantum.common import config from quantum.common import config
from quantum.common import constants as l3_constants
from quantum.common import exceptions as q_exc from quantum.common import exceptions as q_exc
from quantum.common.test_lib import test_config from quantum.common.test_lib import test_config
from quantum import context from quantum import context
from quantum.db import db_base_plugin_v2 from quantum.db import db_base_plugin_v2
from quantum.db import l3_db from quantum.db import l3_db
from quantum.db import l3_rpc_agent_api
from quantum.db import models_v2 from quantum.db import models_v2
from quantum.extensions import l3 from quantum.extensions import l3
from quantum import manager from quantum import manager
@ -68,7 +70,6 @@ class L3NatExtensionTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
plugin = 'quantum.extensions.l3.RouterPluginBase' plugin = 'quantum.extensions.l3.RouterPluginBase'
# Ensure 'stale' patched copies of the plugin are never returned # Ensure 'stale' patched copies of the plugin are never returned
manager.QuantumManager._instance = None manager.QuantumManager._instance = None
@ -89,7 +90,6 @@ class L3NatExtensionTestCase(unittest.TestCase):
self._plugin_patcher = mock.patch(plugin, autospec=True) self._plugin_patcher = mock.patch(plugin, autospec=True)
self.plugin = self._plugin_patcher.start() self.plugin = self._plugin_patcher.start()
# Instantiate mock plugin and enable the 'router' extension # Instantiate mock plugin and enable the 'router' extension
manager.QuantumManager.get_plugin().supported_extension_aliases = ( manager.QuantumManager.get_plugin().supported_extension_aliases = (
["router"]) ["router"])
@ -118,9 +118,7 @@ class L3NatExtensionTestCase(unittest.TestCase):
instance = self.plugin.return_value instance = self.plugin.return_value
instance.create_router.return_value = return_value instance.create_router.return_value = return_value
instance.get_routers_count.return_value = 0 instance.get_routers_count.return_value = 0
res = self.api.post_json(_get_path('routers'), data) res = self.api.post_json(_get_path('routers'), data)
instance.create_router.assert_called_with(mock.ANY, instance.create_router.assert_called_with(mock.ANY,
router=data) router=data)
self.assertEqual(res.status_int, exc.HTTPCreated.code) self.assertEqual(res.status_int, exc.HTTPCreated.code)
@ -1194,3 +1192,146 @@ class L3NatDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
with self.network(router__external=True) as ext_net: with self.network(router__external=True) as ext_net:
self.assertEqual(ext_net['network'][l3.EXTERNAL], self.assertEqual(ext_net['network'][l3.EXTERNAL],
True) True)
def _test_notify_op_agent(self, target_func, *args):
l3_rpc_agent_api_str = (
'quantum.db.l3_rpc_agent_api.L3AgentNotifyAPI')
oldNotify = l3_rpc_agent_api.L3AgentNofity
try:
with mock.patch(l3_rpc_agent_api_str) as notifyApi:
l3_rpc_agent_api.L3AgentNofity = notifyApi
kargs = [item for item in args]
kargs.append(notifyApi)
target_func(*kargs)
except:
l3_rpc_agent_api.L3AgentNofity = oldNotify
raise
else:
l3_rpc_agent_api.L3AgentNofity = oldNotify
def _test_router_gateway_op_agent(self, notifyApi):
with self.router() as r:
with self.subnet() as s:
self._set_net_external(s['subnet']['network_id'])
self._add_external_gateway_to_router(
r['router']['id'],
s['subnet']['network_id'])
self._remove_external_gateway_from_router(
r['router']['id'],
s['subnet']['network_id'])
self.assertEquals(
2, notifyApi.routers_updated.call_count)
def test_router_gateway_op_agent(self):
self._test_notify_op_agent(self._test_router_gateway_op_agent)
def _test_interfaces_op_agent(self, r, notifyApi):
with self.port(no_delete=True) as p:
self._router_interface_action('add',
r['router']['id'],
None,
p['port']['id'])
# clean-up
self._router_interface_action('remove',
r['router']['id'],
None,
p['port']['id'])
self.assertEquals(2, notifyApi.routers_updated.call_count)
def test_interfaces_op_agent(self):
with self.router() as r:
self._test_notify_op_agent(
self._test_interfaces_op_agent, r)
def _test_floatingips_op_agent(self, notifyApi):
with self.floatingip_with_assoc() as fip:
pass
# add gateway, add interface, associate, deletion of floatingip,
# delete gateway, delete interface
self.assertEquals(6, notifyApi.routers_updated.call_count)
def test_floatingips_op_agent(self):
self._test_notify_op_agent(self._test_floatingips_op_agent)
def test_l3_agent_routers_query_interfaces(self):
with self.router() as r:
with self.port(no_delete=True) as p:
self._router_interface_action('add',
r['router']['id'],
None,
p['port']['id'])
plugin = TestL3NatPlugin()
routers = plugin.get_sync_data(context.get_admin_context(),
None)
self.assertEqual(1, len(routers))
interfaces = routers[0][l3_constants.INTERFACE_KEY]
self.assertEqual(1, len(interfaces))
subnet_id = interfaces[0]['subnet']['id']
wanted_subnetid = p['port']['fixed_ips'][0]['subnet_id']
self.assertEqual(wanted_subnetid, subnet_id)
# clean-up
self._router_interface_action('remove',
r['router']['id'],
None,
p['port']['id'])
def test_l3_agent_routers_query_ignore_interfaces_with_moreThanOneIp(self):
with self.router() as r:
with self.subnet(cidr='9.0.1.0/24') as subnet:
with self.port(subnet=subnet,
no_delete=True,
fixed_ips=[{'ip_address': '9.0.1.3'}]) as p:
self._router_interface_action('add',
r['router']['id'],
None,
p['port']['id'])
port = {'port': {'fixed_ips':
[{'ip_address': '9.0.1.4',
'subnet_id': subnet['subnet']['id']},
{'ip_address': '9.0.1.5',
'subnet_id': subnet['subnet']['id']}]}}
plugin = TestL3NatPlugin()
ctx = context.get_admin_context()
plugin.update_port(ctx, p['port']['id'], port)
routers = plugin.get_sync_data(ctx, None)
self.assertEqual(1, len(routers))
interfaces = routers[0].get(l3_constants.INTERFACE_KEY, [])
self.assertEqual(0, len(interfaces))
# clean-up
self._router_interface_action('remove',
r['router']['id'],
None,
p['port']['id'])
def test_l3_agent_routers_query_gateway(self):
with self.router() as r:
with self.subnet() as s:
self._set_net_external(s['subnet']['network_id'])
self._add_external_gateway_to_router(
r['router']['id'],
s['subnet']['network_id'])
plugin = TestL3NatPlugin()
routers = plugin.get_sync_data(context.get_admin_context(),
[r['router']['id']])
self.assertEqual(1, len(routers))
gw_port = routers[0]['gw_port']
self.assertEquals(s['subnet']['id'], gw_port['subnet']['id'])
self._remove_external_gateway_from_router(
r['router']['id'],
s['subnet']['network_id'])
def test_l3_agent_routers_query_floatingips(self):
with self.floatingip_with_assoc() as fip:
plugin = TestL3NatPlugin()
routers = plugin.get_sync_data(context.get_admin_context(),
[fip['floatingip']['router_id']])
self.assertEqual(1, len(routers))
floatingips = routers[0][l3_constants.FLOATINGIP_KEY]
self.assertEqual(1, len(floatingips))
self.assertEquals(floatingips[0]['id'],
fip['floatingip']['id'])
self.assertEquals(floatingips[0]['port_id'],
fip['floatingip']['port_id'])
self.assertTrue(floatingips[0]['fixed_ip_address'] is not None)
self.assertTrue(floatingips[0]['router_id'] is not None)