plugin/ryu: update for ryu update

blueprint ryu-plugin-update-for-ryu
Now ryu has GRE tunnel support, so update ryu plugin to support it.
This implements/simplifies GRE tunneling support for ryu plugin

Change-Id: I158affcb60c9016753dbbbf5f1ad50286af49c23
Signed-off-by: Isaku Yamahata <yamahata@valinux.co.jp>
This commit is contained in:
Yoshihiro Kaneko 2012-07-24 13:01:07 +00:00 committed by Isaku Yamahata
parent 1a85476b6a
commit cf755b4dba
10 changed files with 337 additions and 340 deletions

View File

@ -17,6 +17,28 @@ openflow_rest_api = 127.0.0.1:8080
# tunnel_key_min = 1 # tunnel_key_min = 1
# tunnel_key_max = 0xffffff # tunnel_key_max = 0xffffff
# tunnel_ip = <ip address for tunneling>
# tunnel_interface = interface for tunneling
# when tunnel_ip is NOT specified, ip address is read
# from this interface
# tunnel_ip =
# tunnel_interface =
tunnel_interface = eth0
# ovsdb_port = port number on which ovsdb is listening
# ryu-agent uses this parameter to setup ovsdb.
# ovs-vsctl set-manager ptcp:<ovsdb_port>
# See set-manager section of man ovs-vsctl for details.
# currently ptcp is only supported.
# ovsdb_ip = <host IP address on which ovsdb is listening>
# ovsdb_interface = interface for ovsdb
# when ovsdb_addr NOT specifiied, ip address is gotten
# from this interface
# ovsdb_port = 6634
# ovsdb_ip =
# ovsdb_interface =
ovsdb_interface = eth0
[AGENT] [AGENT]
# Use "sudo quantum-rootwrap /etc/quantum/rootwrap.conf" to use the real # Use "sudo quantum-rootwrap /etc/quantum/rootwrap.conf" to use the real
# root filter facility. # root filter facility.

View File

@ -20,87 +20,163 @@
# under the License. # under the License.
# @author: Isaku Yamahata # @author: Isaku Yamahata
import httplib
import socket
import sys import sys
import time
from ryu.app.client import OFPClient import netifaces
from ryu.app import client
from ryu.app import conf_switch_key
from ryu.app import rest_nw_id from ryu.app import rest_nw_id
from sqlalchemy.ext.sqlsoup import SqlSoup from sqlalchemy.ext.sqlsoup import SqlSoup
from quantum.agent.linux import ovs_lib from quantum.agent.linux import ovs_lib
from quantum.agent.linux.ovs_lib import VifPort from quantum.agent.linux.ovs_lib import VifPort
from quantum.common import config as logging_config from quantum.common import config as logging_config
from quantum.common import constants
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
from quantum.openstack.common.cfg import NoSuchGroupError
from quantum.openstack.common.cfg import NoSuchOptError
from quantum.openstack.common import log as LOG from quantum.openstack.common import log as LOG
from quantum.plugins.ryu.common import config from quantum.plugins.ryu.common import config
# This is copied of nova.flags._get_my_ip()
# Agent shouldn't depend on nova module
def _get_my_ip():
"""
Returns the actual ip of the local machine.
This code figures out what source address would be used if some traffic
were to be sent out to some well known address on the Internet. In this
case, a Google DNS server is used, but the specific address does not
matter much. No traffic is actually sent.
"""
csock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
csock.connect(('8.8.8.8', 80))
(addr, _port) = csock.getsockname()
csock.close()
return addr
def _get_ip(cfg_ip_str, cfg_interface_str):
ip = None
try:
ip = getattr(cfg.CONF.OVS, cfg_ip_str)
except (NoSuchOptError, NoSuchGroupError):
pass
if ip:
return ip
iface = None
try:
iface = getattr(cfg.CONF.OVS, cfg_interface_str)
except (NoSuchOptError, NoSuchGroupError):
pass
if iface:
iface = netifaces.ifaddresses(iface)[netifaces.AF_INET][0]
return iface['addr']
return _get_my_ip()
def _get_tunnel_ip():
return _get_ip('tunnel_ip', 'tunnel_interface')
def _get_ovsdb_ip():
return _get_ip('ovsdb_ip', 'ovsdb_interface')
class OVSBridge(ovs_lib.OVSBridge): class OVSBridge(ovs_lib.OVSBridge):
def __init__(self, br_name, root_helper): def __init__(self, br_name, root_helper):
ovs_lib.OVSBridge.__init__(self, br_name, root_helper) ovs_lib.OVSBridge.__init__(self, br_name, root_helper)
self.datapath_id = None self.datapath_id = None
def find_datapath_id(self): def find_datapath_id(self):
# ovs-vsctl get Bridge br-int datapath_id self.datapath_id = self.get_datapath_id()
res = self.run_vsctl(["get", "Bridge", self.br_name, "datapath_id"])
# remove preceding/trailing double quotes def set_manager(self, target):
dp_id = res.strip().strip('"') self.run_vsctl(["set-manager", target])
self.datapath_id = dp_id
def set_controller(self, target): def get_ofport(self, name):
methods = ("ssl", "tcp", "unix", "pssl", "ptcp", "punix") return self.db_get_val("Interface", name, "ofport")
args = target.split(":")
if not args[0] in methods:
target = "tcp:" + target
self.run_vsctl(["set-controller", self.br_name, target])
def _vifport(self, name, external_ids):
ofport = self.db_get_val("Interface", name, "ofport")
return VifPort(name, ofport, external_ids["iface-id"],
external_ids["attached-mac"], self)
def _get_ports(self, get_port): def _get_ports(self, get_port):
ports = [] ports = []
port_names = self.get_port_name_list() port_names = self.get_port_name_list()
for name in port_names: for name in port_names:
if self.get_ofport(name) < 0:
continue
port = get_port(name) port = get_port(name)
if port: if port:
ports.append(port) ports.append(port)
return ports return ports
def _get_vif_port(self, name):
external_ids = self.db_get_map("Interface", name, "external_ids")
if "iface-id" in external_ids and "attached-mac" in external_ids:
return self._vifport(name, external_ids)
elif ("xs-vif-uuid" in external_ids and
"attached-mac" in external_ids):
# if this is a xenserver and iface-id is not automatically
# synced to OVS from XAPI, we grab it from XAPI directly
ofport = self.db_get_val("Interface", name, "ofport")
iface_id = self.get_xapi_iface_id(external_ids["xs-vif-uuid"])
return VifPort(name, ofport, iface_id,
external_ids["attached-mac"], self)
def get_vif_ports(self):
"returns a VIF object for each VIF port"
return self._get_ports(self._get_vif_port)
def _get_external_port(self, name): def _get_external_port(self, name):
# exclude vif ports
external_ids = self.db_get_map("Interface", name, "external_ids") external_ids = self.db_get_map("Interface", name, "external_ids")
if external_ids: if external_ids:
return return
ofport = self.db_get_val("Interface", name, "ofport") # exclude tunnel ports
options = self.db_get_map("Interface", name, "options")
if "remote_ip" in options:
return
ofport = self.get_ofport(name)
return VifPort(name, ofport, None, None, self) return VifPort(name, ofport, None, None, self)
def get_external_ports(self): def get_external_ports(self):
return self._get_ports(self._get_external_port) return self._get_ports(self._get_external_port)
def check_ofp_mode(db): class VifPortSet(object):
def __init__(self, int_br, ryu_rest_client):
super(VifPortSet, self).__init__()
self.int_br = int_br
self.api = ryu_rest_client
def setup(self):
for port in self.int_br.get_external_ports():
LOG.debug(_('external port %s'), port)
self.api.update_port(rest_nw_id.NW_ID_EXTERNAL,
port.switch.datapath_id, port.ofport)
class OVSQuantumOFPRyuAgent(object):
def __init__(self, integ_br, ofp_rest_api_addr,
tunnel_ip, ovsdb_ip, ovsdb_port,
root_helper):
super(OVSQuantumOFPRyuAgent, self).__init__()
self.int_br = None
self.vif_ports = None
self._setup_integration_br(root_helper, integ_br,
ofp_rest_api_addr,
tunnel_ip, ovsdb_port, ovsdb_ip)
def _setup_integration_br(self, root_helper, integ_br,
ofp_rest_api_addr,
tunnel_ip, ovsdb_port, ovsdb_ip):
self.int_br = OVSBridge(integ_br, root_helper)
self.int_br.find_datapath_id()
ryu_rest_client = client.OFPClient(ofp_rest_api_addr)
self.vif_ports = VifPortSet(self.int_br, ryu_rest_client)
self.vif_ports.setup()
sc_client = client.SwitchConfClient(ofp_rest_api_addr)
sc_client.set_key(self.int_br.datapath_id,
conf_switch_key.OVS_TUNNEL_ADDR, tunnel_ip)
# Currently Ryu supports only tcp methods. (ssl isn't supported yet)
self.int_br.set_manager('ptcp:%d' % ovsdb_port)
sc_client.set_key(self.int_br.datapath_id, conf_switch_key.OVSDB_ADDR,
'tcp:%s:%d' % (ovsdb_ip, ovsdb_port))
def check_ofp_rest_api_addr(db):
LOG.debug("checking db") LOG.debug("checking db")
servers = db.ofp_server.all() servers = db.ofp_server.all()
@ -113,107 +189,16 @@ def check_ofp_mode(db):
elif serv.host_type == "controller": elif serv.host_type == "controller":
ofp_controller_addr = serv.address ofp_controller_addr = serv.address
else: else:
LOG.warn("ignoring unknown server type %s", serv) LOG.warn(_("ignoring unknown server type %s"), serv)
LOG.debug("controller %s", ofp_controller_addr)
LOG.debug("api %s", ofp_rest_api_addr) LOG.debug("api %s", ofp_rest_api_addr)
if not ofp_controller_addr: if ofp_controller_addr:
raise RuntimeError("OF controller isn't specified") LOG.warn(_('OF controller parameter is stale %s'), ofp_controller_addr)
if not ofp_rest_api_addr: if not ofp_rest_api_addr:
raise RuntimeError("Ryu rest API port isn't specified") raise RuntimeError(_("Ryu rest API port isn't specified"))
LOG.debug("going to ofp controller mode %s %s", LOG.debug(_("going to ofp controller mode %s"), ofp_rest_api_addr)
ofp_controller_addr, ofp_rest_api_addr) return ofp_rest_api_addr
return (ofp_controller_addr, ofp_rest_api_addr)
class OVSQuantumOFPRyuAgent:
def __init__(self, integ_br, db, root_helper):
self.root_helper = root_helper
(ofp_controller_addr, ofp_rest_api_addr) = check_ofp_mode(db)
self.nw_id_external = rest_nw_id.NW_ID_EXTERNAL
self.api = OFPClient(ofp_rest_api_addr)
self._setup_integration_br(integ_br, ofp_controller_addr)
def _setup_integration_br(self, integ_br, ofp_controller_addr):
self.int_br = OVSBridge(integ_br, self.root_helper)
self.int_br.find_datapath_id()
self.int_br.set_controller(ofp_controller_addr)
for port in self.int_br.get_external_ports():
self._port_update(self.nw_id_external, port)
def _port_update(self, network_id, port):
self.api.update_port(network_id, port.switch.datapath_id, port.ofport)
def _all_bindings(self, db):
"""return interface id -> port which include network id bindings"""
return dict((port.id, port) for port in db.ports.all())
def _set_port_status(self, port, status):
port.status = status
def daemon_loop(self, db):
# on startup, register all existing ports
all_bindings = self._all_bindings(db)
local_bindings = {}
vif_ports = {}
for port in self.int_br.get_vif_ports():
vif_ports[port.vif_id] = port
if port.vif_id in all_bindings:
net_id = all_bindings[port.vif_id].network_id
local_bindings[port.vif_id] = net_id
self._port_update(net_id, port)
self._set_port_status(all_bindings[port.vif_id],
constants.PORT_STATUS_ACTIVE)
LOG.info("Updating binding to net-id = %s for %s",
net_id, str(port))
db.commit()
old_vif_ports = vif_ports
old_local_bindings = local_bindings
while True:
all_bindings = self._all_bindings(db)
new_vif_ports = {}
new_local_bindings = {}
for port in self.int_br.get_vif_ports():
new_vif_ports[port.vif_id] = port
if port.vif_id in all_bindings:
net_id = all_bindings[port.vif_id].network_id
new_local_bindings[port.vif_id] = net_id
old_b = old_local_bindings.get(port.vif_id)
new_b = new_local_bindings.get(port.vif_id)
if old_b == new_b:
continue
if old_b:
LOG.info("Removing binding to net-id = %s for %s",
old_b, str(port))
if port.vif_id in all_bindings:
self._set_port_status(all_bindings[port.vif_id],
constants.PORT_STATUS_DOWN)
if new_b:
if port.vif_id in all_bindings:
self._set_port_status(all_bindings[port.vif_id],
constants.PORT_STATUS_ACTIVE)
LOG.info("Adding binding to net-id = %s for %s",
new_b, str(port))
for vif_id in old_vif_ports:
if vif_id not in new_vif_ports:
LOG.info("Port Disappeared: %s", vif_id)
if vif_id in all_bindings:
self._set_port_status(all_bindings[port.vif_id],
constants.PORT_STATUS_DOWN)
old_vif_ports = new_vif_ports
old_local_bindings = new_local_bindings
db.commit()
time.sleep(2)
def main(): def main():
@ -227,11 +212,26 @@ def main():
options = {"sql_connection": cfg.CONF.DATABASE.sql_connection} options = {"sql_connection": cfg.CONF.DATABASE.sql_connection}
db = SqlSoup(options["sql_connection"]) db = SqlSoup(options["sql_connection"])
LOG.info("Connecting to database \"%s\" on %s", LOG.info(_("Connecting to database \"%(database)s\" on %(host)s") %
db.engine.url.database, db.engine.url.host) {"database": db.engine.url.database,
plugin = OVSQuantumOFPRyuAgent(integ_br, db, root_helper) "host": db.engine.url.host})
plugin.daemon_loop(db) ofp_rest_api_addr = check_ofp_rest_api_addr(db)
tunnel_ip = _get_tunnel_ip()
LOG.debug(_('tunnel_ip %s'), tunnel_ip)
ovsdb_port = cfg.CONF.OVS.ovsdb_port
LOG.debug(_('ovsdb_port %s'), ovsdb_port)
ovsdb_ip = _get_ovsdb_ip()
LOG.debug(_('ovsdb_ip %s'), ovsdb_ip)
try:
OVSQuantumOFPRyuAgent(integ_br, ofp_rest_api_addr,
tunnel_ip, ovsdb_ip, ovsdb_port, root_helper)
except httplib.HTTPException, e:
LOG.error(_("initialization failed: %s"), e)
sys.exit(1)
LOG.info(_("Ryu initialization on the node is done."
" Now Ryu agent exits successfully."))
sys.exit(0) sys.exit(0)

View File

@ -28,7 +28,12 @@ ovs_opts = [
cfg.StrOpt('openflow_controller', default='127.0.0.1:6633'), cfg.StrOpt('openflow_controller', default='127.0.0.1:6633'),
cfg.StrOpt('openflow_rest_api', default='127.0.0.1:8080'), cfg.StrOpt('openflow_rest_api', default='127.0.0.1:8080'),
cfg.IntOpt('tunnel_key_min', default=1), cfg.IntOpt('tunnel_key_min', default=1),
cfg.IntOpt('tunnel_key_max', default=0xffffff) cfg.IntOpt('tunnel_key_max', default=0xffffff),
cfg.StrOpt('tunnel_ip', default=None),
cfg.StrOpt('tunnel_interface', default=None),
cfg.IntOpt('ovsdb_port', default=6634),
cfg.StrOpt('ovsdb_ip', default=None),
cfg.StrOpt('ovsdb_interface', default=None),
] ]
agent_opts = [ agent_opts = [

View File

@ -56,9 +56,11 @@ class TunnelKey(object):
if (key_min < self._KEY_MIN_HARD or key_max > self._KEY_MAX_HARD or if (key_min < self._KEY_MIN_HARD or key_max > self._KEY_MAX_HARD or
key_min > key_max): key_min > key_max):
raise ValueError('Invalid tunnel key options ' raise ValueError(_('Invalid tunnel key options '
'tunnel_key_min: %d tunnel_key_max: %d. ' 'tunnel_key_min: %(key_min)d '
'Using default value' % (key_min, key_min)) 'tunnel_key_max: %(key_max)d. '
'Using default value') % {'key_min': key_min,
'key_max': key_max})
def _last_key(self, session): def _last_key(self, session):
try: try:
@ -133,9 +135,10 @@ class TunnelKey(object):
).params(last_key=last_key).one() ).params(last_key=last_key).one()
new_key = new_key[0] # the result is tuple. new_key = new_key[0] # the result is tuple.
LOG.debug("last_key %s new_key %s", last_key, new_key) LOG.debug(_("last_key %(last_key)s new_key %(new_key)s") %
{"last_key": last_key, "new_key": new_key})
if new_key > self.key_max: if new_key > self.key_max:
LOG.debug("no key found") LOG.debug(_("no key found"))
raise orm_exc.NoResultFound() raise orm_exc.NoResultFound()
return new_key return new_key
@ -168,8 +171,8 @@ class TunnelKey(object):
count += 1 count += 1
if count > self._TRANSACTION_RETRY_MAX: if count > self._TRANSACTION_RETRY_MAX:
# if this happens too often, increase _TRANSACTION_RETRY_MAX # if this happens too often, increase _TRANSACTION_RETRY_MAX
LOG.warn("Transaction retry reaches to %d. " LOG.warn(_("Transaction retry reaches to %d. "
"abandan to allocate tunnel key." % count) "abandan to allocate tunnel key."), count)
raise q_exc.ResourceExhausted() raise q_exc.ResourceExhausted()
return new_key return new_key
@ -182,3 +185,53 @@ class TunnelKey(object):
def all_list(self): def all_list(self):
session = db.get_session() session = db.get_session()
return session.query(ryu_models_v2.TunnelKey).all() return session.query(ryu_models_v2.TunnelKey).all()
def port_binding_create(port_id, net_id, dpid, port_no):
session = db.get_session()
session.query(models_v2.Port).filter(
models_v2.Port.network_id == net_id).filter(
models_v2.Port.id == port_id).one() # confirm port exists
with session.begin():
port_binding = ryu_models_v2.PortBinding(net_id, port_id,
dpid, port_no)
session.add(port_binding)
session.flush()
return port_binding
def port_binding_get(port_id, net_id):
session = db.get_session()
session.query(models_v2.Port).filter(
models_v2.Port.network_id == net_id).filter(
models_v2.Port.id == port_id).one() # confirm port exists
return session.query(ryu_models_v2.PortBinding).filter_by(
network_id=net_id).filter_by(port_id=port_id).one()
def port_binding_destroy(session, port_id, net_id):
try:
session.query(models_v2.Port).filter(
models_v2.Port.network_id == net_id).filter(
models_v2.Port.id == port_id).one() # confirm port exists
port_binding = session.query(ryu_models_v2.PortBinding).filter_by(
network_id=net_id).filter_by(port_id=port_id).one()
session.delete(port_binding)
session.flush()
return port_binding
except orm_exc.NoResultFound:
raise q_exc.PortNotFound(port_id=port_id, net_id=net_id)
def port_binding_all_list(session):
return session.query(ryu_models_v2.PortBinding).all()
def set_port_status(session, port_id, status):
try:
port = session.query(models_v2.Port).filter_by(id=port_id).one()
port['status'] = status
session.merge(port)
session.flush()
except orm_exc.NoResultFound:
raise q_exc.PortNotFound(port_id=port_id, net_id=None)

View File

@ -17,7 +17,6 @@
import sqlalchemy as sa import sqlalchemy as sa
from quantum.db import model_base from quantum.db import model_base
from quantum.db import models_v2
class OFPServer(model_base.BASEV2): class OFPServer(model_base.BASEV2):
@ -53,3 +52,28 @@ class TunnelKey(model_base.BASEV2):
def __repr__(self): def __repr__(self):
return "<TunnelKey(%s,%x)>" % (self.network_id, self.tunnel_key) return "<TunnelKey(%s,%x)>" % (self.network_id, self.tunnel_key)
class PortBinding(model_base.BASEV2):
"""Represents Port binding to ovs ports."""
__tablename__ = 'port_binding'
id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
network_id = sa.Column(sa.String(255), sa.ForeignKey("networks.id"),
nullable=False)
port_id = sa.Column(sa.String(255), sa.ForeignKey("ports.id"), unique=True,
nullable=False)
dpid = sa.Column(sa.String(255), nullable=False)
port_no = sa.Column(sa.Integer, nullable=False)
def __init__(self, network_id, port_id, dpid, port_no):
self.network_id = network_id
self.port_id = port_id
self.dpid = dpid
self.port_no = port_no
def __repr__(self):
return "<PortBinding(%s,%s,%s,%s,%s)>" % (self.network_id,
self.port_id,
self.dpid,
self.port_no)

View File

@ -1,29 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 Isaku Yamahata <yamahata at private email ne jp>
# <yamahata at valinux co jp>
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import inspect
from nova.virt import firewall
class NopFirewallDriver(firewall.FirewallDriver):
def __init__(self, *args, **kwargs):
super(NopFirewallDriver, self).__init__()
for key, _val in inspect.getmembers(self, inspect.ismethod):
if key.startswith('__') or key.endswith('__'):
continue
setattr(self, key, (lambda _self, *_args, **_kwargs: True))

View File

@ -1,74 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Isaku Yamahata <yamahata at private email ne jp>
# <yamahata at valinux co jp>
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ryu.app.client import OFPClient
from nova import flags
from nova.network import linux_net
from nova.openstack.common import cfg
from nova.openstack.common import log as logging
from nova import utils
LOG = logging.getLogger(__name__)
ryu_linux_net_opt = cfg.StrOpt('linuxnet_ovs_ryu_api_host',
default='127.0.0.1:8080',
help='Openflow Ryu REST API host:port')
FLAGS = flags.FLAGS
FLAGS.register_opt(ryu_linux_net_opt)
def _get_datapath_id(bridge_name):
out, _err = utils.execute('ovs-vsctl', 'get', 'Bridge',
bridge_name, 'datapath_id', run_as_root=True)
return out.strip().strip('"')
def _get_port_no(dev):
out, _err = utils.execute('ovs-vsctl', 'get', 'Interface', dev,
'ofport', run_as_root=True)
return int(out.strip())
class LinuxOVSRyuInterfaceDriver(linux_net.LinuxOVSInterfaceDriver):
def __init__(self):
super(LinuxOVSRyuInterfaceDriver, self).__init__()
LOG.debug('ryu rest host %s', FLAGS.linuxnet_ovs_ryu_api_host)
self.ryu_client = OFPClient(FLAGS.linuxnet_ovs_ryu_api_host)
self.datapath_id = _get_datapath_id(
FLAGS.linuxnet_ovs_integration_bridge)
if linux_net.binary_name == 'nova-network':
for tables in [linux_net.iptables_manager.ipv4,
linux_net.iptables_manager.ipv6]:
tables['filter'].add_rule(
'FORWARD',
'--in-interface gw-+ --out-interface gw-+ -j DROP')
linux_net.iptables_manager.apply()
def plug(self, network, mac_address, gateway=True):
LOG.debug("network %s mac_adress %s gateway %s",
network, mac_address, gateway)
ret = super(LinuxOVSRyuInterfaceDriver, self).plug(
network, mac_address, gateway)
port_no = _get_port_no(self.get_dev(network))
self.ryu_client.create_port(network['uuid'], self.datapath_id, port_no)
return ret

View File

@ -1,87 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Isaku Yamahata <yamahata at private email ne jp>
# <yamahata at valinux co jp>
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import httplib
from ryu.app.client import OFPClient
from nova import flags
from nova.openstack.common import cfg
from nova.openstack.common import log as logging
from nova import utils
from nova.virt.libvirt import vif as libvirt_vif
LOG = logging.getLogger(__name__)
ryu_libvirt_ovs_driver_opt = cfg.StrOpt('libvirt_ovs_ryu_api_host',
default='127.0.0.1:8080',
help='Openflow Ryu REST API host:port')
FLAGS = flags.FLAGS
FLAGS.register_opt(ryu_libvirt_ovs_driver_opt)
def _get_datapath_id(bridge_name):
out, _err = utils.execute('ovs-vsctl', 'get', 'Bridge',
bridge_name, 'datapath_id', run_as_root=True)
return out.strip().strip('"')
def _get_port_no(dev):
out, _err = utils.execute('ovs-vsctl', 'get', 'Interface', dev,
'ofport', run_as_root=True)
return int(out.strip())
class LibvirtOpenVswitchOFPRyuDriver(libvirt_vif.LibvirtHybridOVSBridgeDriver):
def __init__(self, **kwargs):
super(LibvirtOpenVswitchOFPRyuDriver, self).__init__()
LOG.debug('ryu rest host %s', FLAGS.libvirt_ovs_bridge)
self.ryu_client = OFPClient(FLAGS.libvirt_ovs_ryu_api_host)
self.datapath_id = _get_datapath_id(FLAGS.libvirt_ovs_bridge)
def _get_port_no(self, mapping):
iface_id = mapping['vif_uuid']
_v1_name, v2_name = self.get_veth_pair_names(iface_id)
return _get_port_no(v2_name)
def plug(self, instance, vif):
result = super(LibvirtOpenVswitchOFPRyuDriver, self).plug(
instance, vif)
network, mapping = vif
port_no = self._get_port_no(mapping)
try:
self.ryu_client.create_port(network['id'], self.datapath_id,
port_no)
except httplib.HTTPException as e:
res = e.args[0]
if res.status != httplib.CONFLICT:
raise
return result
def unplug(self, instance, vif):
network, mapping = vif
port_no = self._get_port_no(mapping)
try:
self.ryu_client.delete_port(network['id'],
self.datapath_id, port_no)
except httplib.HTTPException as e:
res = e.args[0]
if res.status != httplib.NOT_FOUND:
raise
super(LibvirtOpenVswitchOFPRyuDriver, self).unplug(instance, vif)

View File

@ -17,9 +17,12 @@
# @author: Isaku Yamahata # @author: Isaku Yamahata
from ryu.app import client from ryu.app import client
from ryu.app.client import ignore_http_not_found
from ryu.app import rest_nw_id from ryu.app import rest_nw_id
from sqlalchemy.orm import exc as sql_exc from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import exc as orm_exc
from quantum.common import constants as q_const
from quantum.common import exceptions as q_exc from quantum.common import exceptions as q_exc
from quantum.common import topics from quantum.common import topics
from quantum.db import api as db from quantum.db import api as db
@ -57,7 +60,7 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
ofp_api_host = cfg.CONF.OVS.openflow_rest_api ofp_api_host = cfg.CONF.OVS.openflow_rest_api
if ofp_con_host is None or ofp_api_host is None: if ofp_con_host is None or ofp_api_host is None:
raise q_exc.Invalid("invalid configuration. check ryu.ini") raise q_exc.Invalid(_('invalid configuration. check ryu.ini'))
hosts = [(ofp_con_host, ofp_service_type.CONTROLLER), hosts = [(ofp_con_host, ofp_service_type.CONTROLLER),
(ofp_api_host, ofp_service_type.REST_API)] (ofp_api_host, ofp_service_type.REST_API)]
@ -85,6 +88,21 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
self.client.update_network(net.id) self.client.update_network(net.id)
for tun in self.tunnel_key.all_list(): for tun in self.tunnel_key.all_list():
self.tun_client.update_tunnel_key(tun.network_id, tun.tunnel_key) self.tun_client.update_tunnel_key(tun.network_id, tun.tunnel_key)
session = db.get_session()
for port_binding in db_api_v2.port_binding_all_list(session):
network_id = port_binding.network_id
dpid = port_binding.dpid
port_no = port_binding.port_no
try:
port = session.query(models_v2.Port).filter(
models_v2.Port.id == port_binding.port_id).one()
except orm_exc.NoResultFound:
continue
except orm_exc.MultipleResultsFound:
continue
self.client.update_port(network_id, dpid, port_no)
self.client.update_mac(network_id, dpid, port_no, port.mac_address)
def _client_create_network(self, net_id, tunnel_key): def _client_create_network(self, net_id, tunnel_key):
self.client.create_network(net_id) self.client.create_network(net_id)
@ -144,9 +162,74 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
return [self._fields(net, fields) for net in nets] return [self._fields(net, fields) for net in nets]
def delete_port(self, context, id, l3_port_check=True): def delete_port(self, context, id, l3_port_check=True):
with context.session.begin(subtransactions=True):
port = self._get_port(context, id)
net_id = port.network_id
try:
port_binding = db_api_v2.port_binding_destroy(context.session,
port.id, net_id)
datapath_id = port_binding.dpid
port_no = port_binding.port_no
ignore_http_not_found(
lambda: self.client.delete_port(net_id, datapath_id,
port_no))
except q_exc.PortNotFound:
pass
# if needed, check to see if this is a port owned by # if needed, check to see if this is a port owned by
# and l3-router. If so, we should prevent deletion. # and l3-router. If so, we should prevent deletion.
if l3_port_check: if l3_port_check:
self.prevent_l3_port_deletion(context, id) self.prevent_l3_port_deletion(context, id)
self.disassociate_floatingips(context, id) self.disassociate_floatingips(context, id)
return super(RyuQuantumPluginV2, self).delete_port(context, id) return super(RyuQuantumPluginV2, self).delete_port(context, id)
def update_port(self, context, id, port):
p = super(RyuQuantumPluginV2, self).update_port(context, id, port)
net_id = p['network_id']
mac_address = p['mac_address']
deleted = port['port'].get('deleted', False)
if deleted:
session = context.session
try:
db_api_v2.port_binding_destroy(session, id, net_id)
except q_exc.PortNotFound:
pass
db_api_v2.set_port_status(session, id, q_const.PORT_STATUS_DOWN)
return p
datapath_id = port['port'].get('datapath_id', None)
port_no = port['port'].get('port_no', None)
if datapath_id is None or port_no is None:
LOG.debug('p %s', p)
return p
try:
port_binding = db_api_v2.port_binding_get(id, net_id)
except orm_exc.NoResultFound:
try:
db_api_v2.port_binding_create(id, net_id, datapath_id, port_no)
except IntegrityError:
# TODO:XXX should do transaction?
return p
else:
self.client.create_port(net_id, datapath_id, port_no)
self.client.create_mac(net_id, datapath_id, port_no,
mac_address)
else:
if (port_binding.dpid != datapath_id or
port_binding.port_no != port_no):
variables = {'datapath_id': datapath_id,
'port_no': port_no,
'port_binding_dpid': port_binding.dpid,
'port_binding_port_no': port_binding.port_no}
raise q_exc.InvalidInput(
error_message=_('invalid (datapath_id, port_no) '
'is requested'
'(%(datapath_id)s, %(port_no)s), acutal'
'(%(port_binding_dpid)s, '
'%(port_binding_port_no)s)') % variables)
self.client.update_network(net_id)
self.client.update_port(net_id, datapath_id, port_no)
self.client.update_mac(net_id, datapath_id, port_no, mac_address)
return p