Merge "Ryu plugin support for v2 Quantum API"

This commit is contained in:
Jenkins 2012-07-18 12:39:38 +00:00 committed by Gerrit Code Review
commit c63c0e51ec
6 changed files with 165 additions and 76 deletions

View File

@ -22,9 +22,6 @@
import logging as LOG import logging as LOG
from optparse import OptionParser from optparse import OptionParser
import shlex
import signal
from subprocess import PIPE, Popen
import sys import sys
import time import time
@ -32,38 +29,17 @@ from ryu.app import rest_nw_id
from ryu.app.client import OFPClient from ryu.app.client import OFPClient
from sqlalchemy.ext.sqlsoup import SqlSoup from sqlalchemy.ext.sqlsoup import SqlSoup
from quantum.agent.linux import utils from quantum.agent.linux import ovs_lib
from quantum.agent.linux.ovs_lib import VifPort
from quantum.plugins.ryu.common import config from quantum.plugins.ryu.common import config
OP_STATUS_UP = "UP" OP_STATUS_UP = "UP"
OP_STATUS_DOWN = "DOWN" OP_STATUS_DOWN = "DOWN"
class VifPort: class OVSBridge(ovs_lib.OVSBridge):
"""
A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac'
attributes set).
"""
def __init__(self, port_name, ofport, vif_id, vif_mac, switch):
self.port_name = port_name
self.ofport = ofport
self.vif_id = vif_id
self.vif_mac = vif_mac
self.switch = switch
def __str__(self):
return ("iface-id=%s, vif_mac=%s, port_name=%s, ofport=%s, "
"bridge name = %s" % (self.vif_id,
self.vif_mac,
self.port_name,
self.ofport,
self.switch.br_name))
class OVSBridge:
def __init__(self, br_name, root_helper): def __init__(self, br_name, root_helper):
self.br_name = br_name ovs_lib.OVSBridge.__init__(self, br_name, root_helper)
self.root_helper = root_helper
self.datapath_id = None self.datapath_id = None
def find_datapath_id(self): def find_datapath_id(self):
@ -74,10 +50,6 @@ class OVSBridge:
dp_id = res.strip().strip('"') dp_id = res.strip().strip('"')
self.datapath_id = dp_id self.datapath_id = dp_id
def run_vsctl(self, args):
full_args = ["ovs-vsctl", "--timeout=2"] + args
return utils.execute(full_args, root_helper=self.root_helper)
def set_controller(self, target): def set_controller(self, target):
methods = ("ssl", "tcp", "unix", "pssl", "ptcp", "punix") methods = ("ssl", "tcp", "unix", "pssl", "ptcp", "punix")
args = target.split(":") args = target.split(":")
@ -85,35 +57,6 @@ class OVSBridge:
target = "tcp:" + target target = "tcp:" + target
self.run_vsctl(["set-controller", self.br_name, target]) self.run_vsctl(["set-controller", self.br_name, target])
def db_get_map(self, table, record, column):
str_ = self.run_vsctl(["get", table, record, column]).rstrip("\n\r")
return self.db_str_to_map(str_)
def db_get_val(self, table, record, column):
return self.run_vsctl(["get", table, record, column]).rstrip("\n\r")
@staticmethod
def db_str_to_map(full_str):
list = full_str.strip("{}").split(", ")
ret = {}
for elem in list:
if elem.find("=") == -1:
continue
arr = elem.split("=")
ret[arr[0]] = arr[1].strip("\"")
return ret
def get_port_name_list(self):
res = self.run_vsctl(["list-ports", self.br_name])
return res.split("\n")[:-1]
def get_xapi_iface_id(self, xs_vif_uuid):
return utils.execute(["xe", "vif-param-get",
"param-name=other-config",
"param-key=nicira-iface-id",
"uuid=%s" % xs_vif_uuid],
root_helper=self.root_helper).strip()
def _vifport(self, name, external_ids): def _vifport(self, name, external_ids):
ofport = self.db_get_val("Interface", name, "ofport") ofport = self.db_get_val("Interface", name, "ofport")
return VifPort(name, ofport, external_ids["iface-id"], return VifPort(name, ofport, external_ids["iface-id"],
@ -186,12 +129,13 @@ def check_ofp_mode(db):
class OVSQuantumOFPRyuAgent: class OVSQuantumOFPRyuAgent:
def __init__(self, integ_br, db, root_helper): def __init__(self, integ_br, db, root_helper, target_v2_api=False):
self.root_helper = root_helper self.root_helper = root_helper
(ofp_controller_addr, ofp_rest_api_addr) = check_ofp_mode(db) (ofp_controller_addr, ofp_rest_api_addr) = check_ofp_mode(db)
self.nw_id_external = rest_nw_id.NW_ID_EXTERNAL self.nw_id_external = rest_nw_id.NW_ID_EXTERNAL
self.api = OFPClient(ofp_rest_api_addr) self.api = OFPClient(ofp_rest_api_addr)
self.target_v2_api = target_v2_api
self._setup_integration_br(integ_br, ofp_controller_addr) self._setup_integration_br(integ_br, ofp_controller_addr)
def _setup_integration_br(self, integ_br, ofp_controller_addr): def _setup_integration_br(self, integ_br, ofp_controller_addr):
@ -206,8 +150,17 @@ class OVSQuantumOFPRyuAgent:
def _all_bindings(self, db): def _all_bindings(self, db):
"""return interface id -> port which include network id bindings""" """return interface id -> port which include network id bindings"""
if self.target_v2_api:
return dict((port.device_id, port) for port in db.ports.all())
else:
return dict((port.interface_id, port) for port in db.ports.all()) return dict((port.interface_id, port) for port in db.ports.all())
def _set_port_status(self, port, status):
if self.target_v2_api:
port.status = status
else:
port.op_status = status
def daemon_loop(self, db): def daemon_loop(self, db):
# on startup, register all existing ports # on startup, register all existing ports
all_bindings = self._all_bindings(db) all_bindings = self._all_bindings(db)
@ -220,7 +173,7 @@ class OVSQuantumOFPRyuAgent:
net_id = all_bindings[port.vif_id].network_id net_id = all_bindings[port.vif_id].network_id
local_bindings[port.vif_id] = net_id local_bindings[port.vif_id] = net_id
self._port_update(net_id, port) self._port_update(net_id, port)
all_bindings[port.vif_id].op_status = OP_STATUS_UP self._set_port_status(all_bindings[port.vif_id], OP_STATUS_UP)
LOG.info("Updating binding to net-id = %s for %s", LOG.info("Updating binding to net-id = %s for %s",
net_id, str(port)) net_id, str(port))
db.commit() db.commit()
@ -248,10 +201,12 @@ class OVSQuantumOFPRyuAgent:
LOG.info("Removing binding to net-id = %s for %s", LOG.info("Removing binding to net-id = %s for %s",
old_b, str(port)) old_b, str(port))
if port.vif_id in all_bindings: if port.vif_id in all_bindings:
all_bindings[port.vif_id].op_status = OP_STATUS_DOWN self._set_port_status(all_bindings[port.vif_id],
OP_STATUS_DOWN)
if not new_b: if not new_b:
if port.vif_id in all_bindings: if port.vif_id in all_bindings:
all_bindings[port.vif_id].op_status = OP_STATUS_UP self._set_port_status(all_bindings[port.vif_id],
OP_STATUS_UP)
LOG.info("Adding binding to net-id = %s for %s", LOG.info("Adding binding to net-id = %s for %s",
new_b, str(port)) new_b, str(port))
@ -259,7 +214,8 @@ class OVSQuantumOFPRyuAgent:
if vif_id not in new_vif_ports: if vif_id not in new_vif_ports:
LOG.info("Port Disappeared: %s", vif_id) LOG.info("Port Disappeared: %s", vif_id)
if vif_id in all_bindings: if vif_id in all_bindings:
all_bindings[vif_id].op_status = OP_STATUS_DOWN self._set_port_status(all_bindings[port.vif_id],
OP_STATUS_DOWN)
old_vif_ports = new_vif_ports old_vif_ports = new_vif_ports
old_local_bindings = new_local_bindings old_local_bindings = new_local_bindings
@ -289,12 +245,13 @@ def main():
conf = config.parse(config_file) conf = config.parse(config_file)
integ_br = conf.OVS.integration_bridge integ_br = conf.OVS.integration_bridge
root_helper = conf.AGENT.root_helper root_helper = conf.AGENT.root_helper
target_v2_api = conf.AGENT.target_v2_api
options = {"sql_connection": conf.DATABASE.sql_connection} options = {"sql_connection": conf.DATABASE.sql_connection}
db = SqlSoup(options["sql_connection"]) db = SqlSoup(options["sql_connection"])
LOG.info("Connecting to database \"%s\" on %s", LOG.info("Connecting to database \"%s\" on %s",
db.engine.url.database, db.engine.url.host) db.engine.url.database, db.engine.url.host)
plugin = OVSQuantumOFPRyuAgent(integ_br, db, root_helper) plugin = OVSQuantumOFPRyuAgent(integ_br, db, root_helper, target_v2_api)
plugin.daemon_loop(db) plugin.daemon_loop(db)
sys.exit(0) sys.exit(0)

View File

@ -30,6 +30,7 @@ ovs_opts = [
] ]
agent_opts = [ agent_opts = [
cfg.BoolOpt('target_v2_api', default=True),
cfg.IntOpt('polling_interval', default=2), cfg.IntOpt('polling_interval', default=2),
cfg.StrOpt('root_helper', default='sudo'), cfg.StrOpt('root_helper', default='sudo'),
] ]

View File

@ -0,0 +1,33 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Isaku Yamahata <yamahata at private email ne jp>
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import quantum.db.api as db
from quantum.db.models_v2 import Network
from quantum.plugins.ryu.db import models_v2
def set_ofp_servers(hosts):
session = db.get_session()
session.query(models_v2.OFPServer).delete()
for (host_address, host_type) in hosts:
host = models_v2.OFPServer(host_address, host_type)
session.add(host)
session.flush()
def network_all_tenant_list():
session = db.get_session()
return session.query(Network).all()

View File

@ -0,0 +1,37 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Isaku Yamahata <yamahata at private email ne jp>
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Column, Integer, String
from quantum.db import models_v2
class OFPServer(models_v2.model_base.BASEV2):
"""Openflow Server/API address"""
__tablename__ = 'ofp_server'
id = Column(Integer, primary_key=True, autoincrement=True)
address = Column(String(255)) # netloc <host ip address>:<port>
host_type = Column(String(255)) # server type
# Controller, REST_API
def __init__(self, address, host_type):
self.address = address
self.host_type = host_type
def __repr__(self):
return "<OFPServer(%s,%s,%s)>" % (self.id, self.address,
self.host_type)

View File

@ -60,14 +60,22 @@ class LibvirtOpenVswitchOFPRyuDriver(libvirt_vif.LibvirtOpenVswitchDriver):
dev = self.get_dev_name(iface_id) dev = self.get_dev_name(iface_id)
return _get_port_no(dev) return _get_port_no(dev)
def plug(self, instance, network, mapping): def plug(self, instance, vif):
result = super(LibvirtOpenVswitchOFPRyuDriver, self).plug( result = super(LibvirtOpenVswitchOFPRyuDriver, self).plug(
instance, network, mapping) instance, vif)
network, mapping = vif
port_no = self._get_port_no(mapping) port_no = self._get_port_no(mapping)
self.ryu_client.create_port(network['id'], self.datapath_id, port_no) try:
self.ryu_client.create_port(network['id'], self.datapath_id,
port_no)
except httplib.HTTPException as e:
res = e.args[0]
if res.status != httplib.CONFLICT:
raise
return result return result
def unplug(self, instance, network, mapping): def unplug(self, instance, vif):
network, mapping = vif
port_no = self._get_port_no(mapping) port_no = self._get_port_no(mapping)
try: try:
self.ryu_client.delete_port(network['id'], self.ryu_client.delete_port(network['id'],
@ -76,5 +84,4 @@ class LibvirtOpenVswitchOFPRyuDriver(libvirt_vif.LibvirtOpenVswitchDriver):
res = e.args[0] res = e.args[0]
if res.status != httplib.NOT_FOUND: if res.status != httplib.NOT_FOUND:
raise raise
super(LibvirtOpenVswitchOFPRyuDriver, self).unplug(instance, network, super(LibvirtOpenVswitchOFPRyuDriver, self).unplug(instance, vif)
mapping)

View File

@ -16,17 +16,24 @@
# under the License. # under the License.
# @author: Isaku Yamahata # @author: Isaku Yamahata
import logging
import os
from ryu.app import client from ryu.app import client
from ryu.app import rest_nw_id from ryu.app import rest_nw_id
from quantum.common import exceptions as q_exc from quantum.common import exceptions as q_exc
from quantum.common.utils import find_config_file from quantum.common.utils import find_config_file
import quantum.db.api as db from quantum.db import api as db
from quantum.db import db_base_plugin_v2
from quantum.db import models_v2
from quantum.plugins.ryu.db import api as db_api from quantum.plugins.ryu.db import api as db_api
from quantum.plugins.ryu.db import api_v2 as db_api_v2
from quantum.plugins.ryu import ofp_service_type from quantum.plugins.ryu import ofp_service_type
from quantum.plugins.ryu import ovs_quantum_plugin_base from quantum.plugins.ryu import ovs_quantum_plugin_base
from quantum.plugins.ryu.common import config
LOG = logging.getLogger(__name__)
CONF_FILE = find_config_file({"plugin": "ryu"}, "ryu.ini") CONF_FILE = find_config_file({"plugin": "ryu"}, "ryu.ini")
@ -65,3 +72,50 @@ class RyuQuantumPlugin(ovs_quantum_plugin_base.OVSQuantumPluginBase):
def __init__(self, configfile=None): def __init__(self, configfile=None):
super(RyuQuantumPlugin, self).__init__(CONF_FILE, __file__, configfile) super(RyuQuantumPlugin, self).__init__(CONF_FILE, __file__, configfile)
self.driver = OFPRyuDriver(self.conf) self.driver = OFPRyuDriver(self.conf)
class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
def __init__(self, configfile=None):
if configfile is None:
if os.path.exists(CONF_FILE):
configfile = CONF_FILE
if configfile is None:
raise Exception("Configuration file \"%s\" doesn't exist" %
(configfile))
LOG.debug("Using configuration file: %s" % configfile)
conf = config.parse(configfile)
options = {"sql_connection": conf.DATABASE.sql_connection}
options.update({'base': models_v2.model_base.BASEV2})
reconnect_interval = conf.DATABASE.reconnect_interval
options.update({"reconnect_interval": reconnect_interval})
db.configure_db(options)
ofp_con_host = conf.OVS.openflow_controller
ofp_api_host = conf.OVS.openflow_rest_api
if ofp_con_host is None or ofp_api_host is None:
raise q_exc.Invalid("invalid configuration. check ryu.ini")
hosts = [(ofp_con_host, ofp_service_type.CONTROLLER),
(ofp_api_host, ofp_service_type.REST_API)]
db_api_v2.set_ofp_servers(hosts)
self.client = client.OFPClient(ofp_api_host)
self.client.update_network(rest_nw_id.NW_ID_EXTERNAL)
# register known all network list on startup
self._create_all_tenant_network()
def _create_all_tenant_network(self):
networks = db_api_v2.network_all_tenant_list()
for net in networks:
self.client.update_network(net.id)
def create_network(self, context, network):
net = super(RyuQuantumPluginV2, self).create_network(context, network)
self.client.create_network(net['id'])
return net
def delete_network(self, context, id):
self.client.delete_network(id)
return super(RyuQuantumPluginV2, self).delete_network(context, id)