diff --git a/etc/quantum/plugins/ryu/ryu.ini b/etc/quantum/plugins/ryu/ryu.ini index cb376a12a5..34f6725ba6 100644 --- a/etc/quantum/plugins/ryu/ryu.ini +++ b/etc/quantum/plugins/ryu/ryu.ini @@ -17,6 +17,28 @@ openflow_rest_api = 127.0.0.1:8080 # tunnel_key_min = 1 # tunnel_key_max = 0xffffff +# tunnel_ip = +# tunnel_interface = interface for tunneling +# when tunnel_ip is NOT specified, ip address is read +# from this interface +# tunnel_ip = +# tunnel_interface = +tunnel_interface = eth0 + +# ovsdb_port = port number on which ovsdb is listening +# ryu-agent uses this parameter to setup ovsdb. +# ovs-vsctl set-manager ptcp: +# See set-manager section of man ovs-vsctl for details. +# currently ptcp is only supported. +# ovsdb_ip = +# ovsdb_interface = interface for ovsdb +# when ovsdb_addr NOT specifiied, ip address is gotten +# from this interface +# ovsdb_port = 6634 +# ovsdb_ip = +# ovsdb_interface = +ovsdb_interface = eth0 + [AGENT] # Use "sudo quantum-rootwrap /etc/quantum/rootwrap.conf" to use the real # root filter facility. diff --git a/quantum/plugins/ryu/agent/ryu_quantum_agent.py b/quantum/plugins/ryu/agent/ryu_quantum_agent.py index 593c53910d..344b1e10b7 100755 --- a/quantum/plugins/ryu/agent/ryu_quantum_agent.py +++ b/quantum/plugins/ryu/agent/ryu_quantum_agent.py @@ -20,87 +20,163 @@ # under the License. # @author: Isaku Yamahata +import httplib +import socket import sys -import time -from ryu.app.client import OFPClient +import netifaces +from ryu.app import client +from ryu.app import conf_switch_key from ryu.app import rest_nw_id from sqlalchemy.ext.sqlsoup import SqlSoup from quantum.agent.linux import ovs_lib from quantum.agent.linux.ovs_lib import VifPort from quantum.common import config as logging_config -from quantum.common import constants from quantum.openstack.common import cfg +from quantum.openstack.common.cfg import NoSuchGroupError +from quantum.openstack.common.cfg import NoSuchOptError from quantum.openstack.common import log as LOG from quantum.plugins.ryu.common import config +# This is copied of nova.flags._get_my_ip() +# Agent shouldn't depend on nova module +def _get_my_ip(): + """ + Returns the actual ip of the local machine. + + This code figures out what source address would be used if some traffic + were to be sent out to some well known address on the Internet. In this + case, a Google DNS server is used, but the specific address does not + matter much. No traffic is actually sent. + """ + csock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + csock.connect(('8.8.8.8', 80)) + (addr, _port) = csock.getsockname() + csock.close() + return addr + + +def _get_ip(cfg_ip_str, cfg_interface_str): + ip = None + try: + ip = getattr(cfg.CONF.OVS, cfg_ip_str) + except (NoSuchOptError, NoSuchGroupError): + pass + if ip: + return ip + + iface = None + try: + iface = getattr(cfg.CONF.OVS, cfg_interface_str) + except (NoSuchOptError, NoSuchGroupError): + pass + if iface: + iface = netifaces.ifaddresses(iface)[netifaces.AF_INET][0] + return iface['addr'] + + return _get_my_ip() + + +def _get_tunnel_ip(): + return _get_ip('tunnel_ip', 'tunnel_interface') + + +def _get_ovsdb_ip(): + return _get_ip('ovsdb_ip', 'ovsdb_interface') + + class OVSBridge(ovs_lib.OVSBridge): def __init__(self, br_name, root_helper): ovs_lib.OVSBridge.__init__(self, br_name, root_helper) self.datapath_id = None def find_datapath_id(self): - # ovs-vsctl get Bridge br-int datapath_id - res = self.run_vsctl(["get", "Bridge", self.br_name, "datapath_id"]) + self.datapath_id = self.get_datapath_id() - # remove preceding/trailing double quotes - dp_id = res.strip().strip('"') - self.datapath_id = dp_id + def set_manager(self, target): + self.run_vsctl(["set-manager", target]) - def set_controller(self, target): - methods = ("ssl", "tcp", "unix", "pssl", "ptcp", "punix") - args = target.split(":") - if not args[0] in methods: - target = "tcp:" + target - self.run_vsctl(["set-controller", self.br_name, target]) - - def _vifport(self, name, external_ids): - ofport = self.db_get_val("Interface", name, "ofport") - return VifPort(name, ofport, external_ids["iface-id"], - external_ids["attached-mac"], self) + def get_ofport(self, name): + return self.db_get_val("Interface", name, "ofport") def _get_ports(self, get_port): ports = [] port_names = self.get_port_name_list() for name in port_names: + if self.get_ofport(name) < 0: + continue port = get_port(name) if port: ports.append(port) return ports - def _get_vif_port(self, name): - external_ids = self.db_get_map("Interface", name, "external_ids") - if "iface-id" in external_ids and "attached-mac" in external_ids: - return self._vifport(name, external_ids) - elif ("xs-vif-uuid" in external_ids and - "attached-mac" in external_ids): - # if this is a xenserver and iface-id is not automatically - # synced to OVS from XAPI, we grab it from XAPI directly - ofport = self.db_get_val("Interface", name, "ofport") - iface_id = self.get_xapi_iface_id(external_ids["xs-vif-uuid"]) - return VifPort(name, ofport, iface_id, - external_ids["attached-mac"], self) - - def get_vif_ports(self): - "returns a VIF object for each VIF port" - return self._get_ports(self._get_vif_port) - def _get_external_port(self, name): + # exclude vif ports external_ids = self.db_get_map("Interface", name, "external_ids") if external_ids: return - ofport = self.db_get_val("Interface", name, "ofport") + # exclude tunnel ports + options = self.db_get_map("Interface", name, "options") + if "remote_ip" in options: + return + + ofport = self.get_ofport(name) return VifPort(name, ofport, None, None, self) def get_external_ports(self): return self._get_ports(self._get_external_port) -def check_ofp_mode(db): +class VifPortSet(object): + def __init__(self, int_br, ryu_rest_client): + super(VifPortSet, self).__init__() + self.int_br = int_br + self.api = ryu_rest_client + + def setup(self): + for port in self.int_br.get_external_ports(): + LOG.debug(_('external port %s'), port) + self.api.update_port(rest_nw_id.NW_ID_EXTERNAL, + port.switch.datapath_id, port.ofport) + + +class OVSQuantumOFPRyuAgent(object): + def __init__(self, integ_br, ofp_rest_api_addr, + tunnel_ip, ovsdb_ip, ovsdb_port, + root_helper): + super(OVSQuantumOFPRyuAgent, self).__init__() + self.int_br = None + self.vif_ports = None + self._setup_integration_br(root_helper, integ_br, + ofp_rest_api_addr, + tunnel_ip, ovsdb_port, ovsdb_ip) + + def _setup_integration_br(self, root_helper, integ_br, + ofp_rest_api_addr, + tunnel_ip, ovsdb_port, ovsdb_ip): + self.int_br = OVSBridge(integ_br, root_helper) + self.int_br.find_datapath_id() + + ryu_rest_client = client.OFPClient(ofp_rest_api_addr) + + self.vif_ports = VifPortSet(self.int_br, ryu_rest_client) + self.vif_ports.setup() + + sc_client = client.SwitchConfClient(ofp_rest_api_addr) + sc_client.set_key(self.int_br.datapath_id, + conf_switch_key.OVS_TUNNEL_ADDR, tunnel_ip) + + # Currently Ryu supports only tcp methods. (ssl isn't supported yet) + self.int_br.set_manager('ptcp:%d' % ovsdb_port) + sc_client.set_key(self.int_br.datapath_id, conf_switch_key.OVSDB_ADDR, + 'tcp:%s:%d' % (ovsdb_ip, ovsdb_port)) + + +def check_ofp_rest_api_addr(db): LOG.debug("checking db") servers = db.ofp_server.all() @@ -113,107 +189,16 @@ def check_ofp_mode(db): elif serv.host_type == "controller": ofp_controller_addr = serv.address else: - LOG.warn("ignoring unknown server type %s", serv) + LOG.warn(_("ignoring unknown server type %s"), serv) - LOG.debug("controller %s", ofp_controller_addr) LOG.debug("api %s", ofp_rest_api_addr) - if not ofp_controller_addr: - raise RuntimeError("OF controller isn't specified") + if ofp_controller_addr: + LOG.warn(_('OF controller parameter is stale %s'), ofp_controller_addr) if not ofp_rest_api_addr: - raise RuntimeError("Ryu rest API port isn't specified") + raise RuntimeError(_("Ryu rest API port isn't specified")) - LOG.debug("going to ofp controller mode %s %s", - ofp_controller_addr, ofp_rest_api_addr) - return (ofp_controller_addr, ofp_rest_api_addr) - - -class OVSQuantumOFPRyuAgent: - def __init__(self, integ_br, db, root_helper): - self.root_helper = root_helper - (ofp_controller_addr, ofp_rest_api_addr) = check_ofp_mode(db) - - self.nw_id_external = rest_nw_id.NW_ID_EXTERNAL - self.api = OFPClient(ofp_rest_api_addr) - self._setup_integration_br(integ_br, ofp_controller_addr) - - def _setup_integration_br(self, integ_br, ofp_controller_addr): - self.int_br = OVSBridge(integ_br, self.root_helper) - self.int_br.find_datapath_id() - self.int_br.set_controller(ofp_controller_addr) - for port in self.int_br.get_external_ports(): - self._port_update(self.nw_id_external, port) - - def _port_update(self, network_id, port): - self.api.update_port(network_id, port.switch.datapath_id, port.ofport) - - def _all_bindings(self, db): - """return interface id -> port which include network id bindings""" - return dict((port.id, port) for port in db.ports.all()) - - def _set_port_status(self, port, status): - port.status = status - - def daemon_loop(self, db): - # on startup, register all existing ports - all_bindings = self._all_bindings(db) - - local_bindings = {} - vif_ports = {} - for port in self.int_br.get_vif_ports(): - vif_ports[port.vif_id] = port - if port.vif_id in all_bindings: - net_id = all_bindings[port.vif_id].network_id - local_bindings[port.vif_id] = net_id - self._port_update(net_id, port) - self._set_port_status(all_bindings[port.vif_id], - constants.PORT_STATUS_ACTIVE) - LOG.info("Updating binding to net-id = %s for %s", - net_id, str(port)) - db.commit() - - old_vif_ports = vif_ports - old_local_bindings = local_bindings - - while True: - all_bindings = self._all_bindings(db) - - new_vif_ports = {} - new_local_bindings = {} - for port in self.int_br.get_vif_ports(): - new_vif_ports[port.vif_id] = port - if port.vif_id in all_bindings: - net_id = all_bindings[port.vif_id].network_id - new_local_bindings[port.vif_id] = net_id - - old_b = old_local_bindings.get(port.vif_id) - new_b = new_local_bindings.get(port.vif_id) - if old_b == new_b: - continue - - if old_b: - LOG.info("Removing binding to net-id = %s for %s", - old_b, str(port)) - if port.vif_id in all_bindings: - self._set_port_status(all_bindings[port.vif_id], - constants.PORT_STATUS_DOWN) - if new_b: - if port.vif_id in all_bindings: - self._set_port_status(all_bindings[port.vif_id], - constants.PORT_STATUS_ACTIVE) - LOG.info("Adding binding to net-id = %s for %s", - new_b, str(port)) - - for vif_id in old_vif_ports: - if vif_id not in new_vif_ports: - LOG.info("Port Disappeared: %s", vif_id) - if vif_id in all_bindings: - self._set_port_status(all_bindings[port.vif_id], - constants.PORT_STATUS_DOWN) - - old_vif_ports = new_vif_ports - old_local_bindings = new_local_bindings - db.commit() - time.sleep(2) + LOG.debug(_("going to ofp controller mode %s"), ofp_rest_api_addr) + return ofp_rest_api_addr def main(): @@ -227,11 +212,26 @@ def main(): options = {"sql_connection": cfg.CONF.DATABASE.sql_connection} db = SqlSoup(options["sql_connection"]) - LOG.info("Connecting to database \"%s\" on %s", - db.engine.url.database, db.engine.url.host) - plugin = OVSQuantumOFPRyuAgent(integ_br, db, root_helper) - plugin.daemon_loop(db) + LOG.info(_("Connecting to database \"%(database)s\" on %(host)s") % + {"database": db.engine.url.database, + "host": db.engine.url.host}) + ofp_rest_api_addr = check_ofp_rest_api_addr(db) + tunnel_ip = _get_tunnel_ip() + LOG.debug(_('tunnel_ip %s'), tunnel_ip) + ovsdb_port = cfg.CONF.OVS.ovsdb_port + LOG.debug(_('ovsdb_port %s'), ovsdb_port) + ovsdb_ip = _get_ovsdb_ip() + LOG.debug(_('ovsdb_ip %s'), ovsdb_ip) + try: + OVSQuantumOFPRyuAgent(integ_br, ofp_rest_api_addr, + tunnel_ip, ovsdb_ip, ovsdb_port, root_helper) + except httplib.HTTPException, e: + LOG.error(_("initialization failed: %s"), e) + sys.exit(1) + + LOG.info(_("Ryu initialization on the node is done." + " Now Ryu agent exits successfully.")) sys.exit(0) diff --git a/quantum/plugins/ryu/common/config.py b/quantum/plugins/ryu/common/config.py index 8d771e6da9..5020ac6053 100644 --- a/quantum/plugins/ryu/common/config.py +++ b/quantum/plugins/ryu/common/config.py @@ -28,7 +28,12 @@ ovs_opts = [ cfg.StrOpt('openflow_controller', default='127.0.0.1:6633'), cfg.StrOpt('openflow_rest_api', default='127.0.0.1:8080'), cfg.IntOpt('tunnel_key_min', default=1), - cfg.IntOpt('tunnel_key_max', default=0xffffff) + cfg.IntOpt('tunnel_key_max', default=0xffffff), + cfg.StrOpt('tunnel_ip', default=None), + cfg.StrOpt('tunnel_interface', default=None), + cfg.IntOpt('ovsdb_port', default=6634), + cfg.StrOpt('ovsdb_ip', default=None), + cfg.StrOpt('ovsdb_interface', default=None), ] agent_opts = [ diff --git a/quantum/plugins/ryu/db/api_v2.py b/quantum/plugins/ryu/db/api_v2.py index b61416c855..79d0e5043d 100644 --- a/quantum/plugins/ryu/db/api_v2.py +++ b/quantum/plugins/ryu/db/api_v2.py @@ -56,9 +56,11 @@ class TunnelKey(object): if (key_min < self._KEY_MIN_HARD or key_max > self._KEY_MAX_HARD or key_min > key_max): - raise ValueError('Invalid tunnel key options ' - 'tunnel_key_min: %d tunnel_key_max: %d. ' - 'Using default value' % (key_min, key_min)) + raise ValueError(_('Invalid tunnel key options ' + 'tunnel_key_min: %(key_min)d ' + 'tunnel_key_max: %(key_max)d. ' + 'Using default value') % {'key_min': key_min, + 'key_max': key_max}) def _last_key(self, session): try: @@ -133,9 +135,10 @@ class TunnelKey(object): ).params(last_key=last_key).one() new_key = new_key[0] # the result is tuple. - LOG.debug("last_key %s new_key %s", last_key, new_key) + LOG.debug(_("last_key %(last_key)s new_key %(new_key)s") % + {"last_key": last_key, "new_key": new_key}) if new_key > self.key_max: - LOG.debug("no key found") + LOG.debug(_("no key found")) raise orm_exc.NoResultFound() return new_key @@ -168,8 +171,8 @@ class TunnelKey(object): count += 1 if count > self._TRANSACTION_RETRY_MAX: # if this happens too often, increase _TRANSACTION_RETRY_MAX - LOG.warn("Transaction retry reaches to %d. " - "abandan to allocate tunnel key." % count) + LOG.warn(_("Transaction retry reaches to %d. " + "abandan to allocate tunnel key."), count) raise q_exc.ResourceExhausted() return new_key @@ -182,3 +185,53 @@ class TunnelKey(object): def all_list(self): session = db.get_session() return session.query(ryu_models_v2.TunnelKey).all() + + +def port_binding_create(port_id, net_id, dpid, port_no): + session = db.get_session() + session.query(models_v2.Port).filter( + models_v2.Port.network_id == net_id).filter( + models_v2.Port.id == port_id).one() # confirm port exists + with session.begin(): + port_binding = ryu_models_v2.PortBinding(net_id, port_id, + dpid, port_no) + session.add(port_binding) + session.flush() + return port_binding + + +def port_binding_get(port_id, net_id): + session = db.get_session() + session.query(models_v2.Port).filter( + models_v2.Port.network_id == net_id).filter( + models_v2.Port.id == port_id).one() # confirm port exists + return session.query(ryu_models_v2.PortBinding).filter_by( + network_id=net_id).filter_by(port_id=port_id).one() + + +def port_binding_destroy(session, port_id, net_id): + try: + session.query(models_v2.Port).filter( + models_v2.Port.network_id == net_id).filter( + models_v2.Port.id == port_id).one() # confirm port exists + port_binding = session.query(ryu_models_v2.PortBinding).filter_by( + network_id=net_id).filter_by(port_id=port_id).one() + session.delete(port_binding) + session.flush() + return port_binding + except orm_exc.NoResultFound: + raise q_exc.PortNotFound(port_id=port_id, net_id=net_id) + + +def port_binding_all_list(session): + return session.query(ryu_models_v2.PortBinding).all() + + +def set_port_status(session, port_id, status): + try: + port = session.query(models_v2.Port).filter_by(id=port_id).one() + port['status'] = status + session.merge(port) + session.flush() + except orm_exc.NoResultFound: + raise q_exc.PortNotFound(port_id=port_id, net_id=None) diff --git a/quantum/plugins/ryu/db/models_v2.py b/quantum/plugins/ryu/db/models_v2.py index 8a34ee254f..e1d25ea12d 100644 --- a/quantum/plugins/ryu/db/models_v2.py +++ b/quantum/plugins/ryu/db/models_v2.py @@ -17,7 +17,6 @@ import sqlalchemy as sa from quantum.db import model_base -from quantum.db import models_v2 class OFPServer(model_base.BASEV2): @@ -53,3 +52,28 @@ class TunnelKey(model_base.BASEV2): def __repr__(self): return "" % (self.network_id, self.tunnel_key) + + +class PortBinding(model_base.BASEV2): + """Represents Port binding to ovs ports.""" + __tablename__ = 'port_binding' + + id = sa.Column(sa.Integer, primary_key=True, autoincrement=True) + network_id = sa.Column(sa.String(255), sa.ForeignKey("networks.id"), + nullable=False) + port_id = sa.Column(sa.String(255), sa.ForeignKey("ports.id"), unique=True, + nullable=False) + dpid = sa.Column(sa.String(255), nullable=False) + port_no = sa.Column(sa.Integer, nullable=False) + + def __init__(self, network_id, port_id, dpid, port_no): + self.network_id = network_id + self.port_id = port_id + self.dpid = dpid + self.port_no = port_no + + def __repr__(self): + return "" % (self.network_id, + self.port_id, + self.dpid, + self.port_no) diff --git a/quantum/plugins/ryu/nova/__init__.py b/quantum/plugins/ryu/nova/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/quantum/plugins/ryu/nova/firewall.py b/quantum/plugins/ryu/nova/firewall.py deleted file mode 100644 index c84f046821..0000000000 --- a/quantum/plugins/ryu/nova/firewall.py +++ /dev/null @@ -1,29 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright (c) 2012 Isaku Yamahata -# -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import inspect - -from nova.virt import firewall - - -class NopFirewallDriver(firewall.FirewallDriver): - def __init__(self, *args, **kwargs): - super(NopFirewallDriver, self).__init__() - for key, _val in inspect.getmembers(self, inspect.ismethod): - if key.startswith('__') or key.endswith('__'): - continue - setattr(self, key, (lambda _self, *_args, **_kwargs: True)) diff --git a/quantum/plugins/ryu/nova/linux_net.py b/quantum/plugins/ryu/nova/linux_net.py deleted file mode 100644 index c2ea4d3726..0000000000 --- a/quantum/plugins/ryu/nova/linux_net.py +++ /dev/null @@ -1,74 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2012 Isaku Yamahata -# -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from ryu.app.client import OFPClient - -from nova import flags -from nova.network import linux_net -from nova.openstack.common import cfg -from nova.openstack.common import log as logging -from nova import utils - - -LOG = logging.getLogger(__name__) - -ryu_linux_net_opt = cfg.StrOpt('linuxnet_ovs_ryu_api_host', - default='127.0.0.1:8080', - help='Openflow Ryu REST API host:port') - -FLAGS = flags.FLAGS -FLAGS.register_opt(ryu_linux_net_opt) - - -def _get_datapath_id(bridge_name): - out, _err = utils.execute('ovs-vsctl', 'get', 'Bridge', - bridge_name, 'datapath_id', run_as_root=True) - return out.strip().strip('"') - - -def _get_port_no(dev): - out, _err = utils.execute('ovs-vsctl', 'get', 'Interface', dev, - 'ofport', run_as_root=True) - return int(out.strip()) - - -class LinuxOVSRyuInterfaceDriver(linux_net.LinuxOVSInterfaceDriver): - def __init__(self): - super(LinuxOVSRyuInterfaceDriver, self).__init__() - - LOG.debug('ryu rest host %s', FLAGS.linuxnet_ovs_ryu_api_host) - self.ryu_client = OFPClient(FLAGS.linuxnet_ovs_ryu_api_host) - self.datapath_id = _get_datapath_id( - FLAGS.linuxnet_ovs_integration_bridge) - - if linux_net.binary_name == 'nova-network': - for tables in [linux_net.iptables_manager.ipv4, - linux_net.iptables_manager.ipv6]: - tables['filter'].add_rule( - 'FORWARD', - '--in-interface gw-+ --out-interface gw-+ -j DROP') - linux_net.iptables_manager.apply() - - def plug(self, network, mac_address, gateway=True): - LOG.debug("network %s mac_adress %s gateway %s", - network, mac_address, gateway) - ret = super(LinuxOVSRyuInterfaceDriver, self).plug( - network, mac_address, gateway) - - port_no = _get_port_no(self.get_dev(network)) - self.ryu_client.create_port(network['uuid'], self.datapath_id, port_no) - return ret diff --git a/quantum/plugins/ryu/nova/vif.py b/quantum/plugins/ryu/nova/vif.py deleted file mode 100644 index 1b92463183..0000000000 --- a/quantum/plugins/ryu/nova/vif.py +++ /dev/null @@ -1,87 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2012 Isaku Yamahata -# -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import httplib - -from ryu.app.client import OFPClient - -from nova import flags -from nova.openstack.common import cfg -from nova.openstack.common import log as logging -from nova import utils -from nova.virt.libvirt import vif as libvirt_vif - - -LOG = logging.getLogger(__name__) - -ryu_libvirt_ovs_driver_opt = cfg.StrOpt('libvirt_ovs_ryu_api_host', - default='127.0.0.1:8080', - help='Openflow Ryu REST API host:port') - -FLAGS = flags.FLAGS -FLAGS.register_opt(ryu_libvirt_ovs_driver_opt) - - -def _get_datapath_id(bridge_name): - out, _err = utils.execute('ovs-vsctl', 'get', 'Bridge', - bridge_name, 'datapath_id', run_as_root=True) - return out.strip().strip('"') - - -def _get_port_no(dev): - out, _err = utils.execute('ovs-vsctl', 'get', 'Interface', dev, - 'ofport', run_as_root=True) - return int(out.strip()) - - -class LibvirtOpenVswitchOFPRyuDriver(libvirt_vif.LibvirtHybridOVSBridgeDriver): - def __init__(self, **kwargs): - super(LibvirtOpenVswitchOFPRyuDriver, self).__init__() - LOG.debug('ryu rest host %s', FLAGS.libvirt_ovs_bridge) - self.ryu_client = OFPClient(FLAGS.libvirt_ovs_ryu_api_host) - self.datapath_id = _get_datapath_id(FLAGS.libvirt_ovs_bridge) - - def _get_port_no(self, mapping): - iface_id = mapping['vif_uuid'] - _v1_name, v2_name = self.get_veth_pair_names(iface_id) - return _get_port_no(v2_name) - - def plug(self, instance, vif): - result = super(LibvirtOpenVswitchOFPRyuDriver, self).plug( - instance, vif) - network, mapping = vif - port_no = self._get_port_no(mapping) - try: - self.ryu_client.create_port(network['id'], self.datapath_id, - port_no) - except httplib.HTTPException as e: - res = e.args[0] - if res.status != httplib.CONFLICT: - raise - return result - - def unplug(self, instance, vif): - network, mapping = vif - port_no = self._get_port_no(mapping) - try: - self.ryu_client.delete_port(network['id'], - self.datapath_id, port_no) - except httplib.HTTPException as e: - res = e.args[0] - if res.status != httplib.NOT_FOUND: - raise - super(LibvirtOpenVswitchOFPRyuDriver, self).unplug(instance, vif) diff --git a/quantum/plugins/ryu/ryu_quantum_plugin.py b/quantum/plugins/ryu/ryu_quantum_plugin.py index 57303e459c..90792ea1f0 100644 --- a/quantum/plugins/ryu/ryu_quantum_plugin.py +++ b/quantum/plugins/ryu/ryu_quantum_plugin.py @@ -17,9 +17,12 @@ # @author: Isaku Yamahata from ryu.app import client +from ryu.app.client import ignore_http_not_found from ryu.app import rest_nw_id -from sqlalchemy.orm import exc as sql_exc +from sqlalchemy.exc import IntegrityError +from sqlalchemy.orm import exc as orm_exc +from quantum.common import constants as q_const from quantum.common import exceptions as q_exc from quantum.common import topics from quantum.db import api as db @@ -57,7 +60,7 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, ofp_api_host = cfg.CONF.OVS.openflow_rest_api if ofp_con_host is None or ofp_api_host is None: - raise q_exc.Invalid("invalid configuration. check ryu.ini") + raise q_exc.Invalid(_('invalid configuration. check ryu.ini')) hosts = [(ofp_con_host, ofp_service_type.CONTROLLER), (ofp_api_host, ofp_service_type.REST_API)] @@ -85,6 +88,21 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, self.client.update_network(net.id) for tun in self.tunnel_key.all_list(): self.tun_client.update_tunnel_key(tun.network_id, tun.tunnel_key) + session = db.get_session() + for port_binding in db_api_v2.port_binding_all_list(session): + network_id = port_binding.network_id + dpid = port_binding.dpid + port_no = port_binding.port_no + try: + port = session.query(models_v2.Port).filter( + models_v2.Port.id == port_binding.port_id).one() + except orm_exc.NoResultFound: + continue + except orm_exc.MultipleResultsFound: + continue + + self.client.update_port(network_id, dpid, port_no) + self.client.update_mac(network_id, dpid, port_no, port.mac_address) def _client_create_network(self, net_id, tunnel_key): self.client.create_network(net_id) @@ -144,9 +162,74 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, return [self._fields(net, fields) for net in nets] def delete_port(self, context, id, l3_port_check=True): + with context.session.begin(subtransactions=True): + port = self._get_port(context, id) + net_id = port.network_id + try: + port_binding = db_api_v2.port_binding_destroy(context.session, + port.id, net_id) + datapath_id = port_binding.dpid + port_no = port_binding.port_no + ignore_http_not_found( + lambda: self.client.delete_port(net_id, datapath_id, + port_no)) + except q_exc.PortNotFound: + pass + # if needed, check to see if this is a port owned by # and l3-router. If so, we should prevent deletion. if l3_port_check: self.prevent_l3_port_deletion(context, id) self.disassociate_floatingips(context, id) return super(RyuQuantumPluginV2, self).delete_port(context, id) + + def update_port(self, context, id, port): + p = super(RyuQuantumPluginV2, self).update_port(context, id, port) + net_id = p['network_id'] + mac_address = p['mac_address'] + + deleted = port['port'].get('deleted', False) + if deleted: + session = context.session + try: + db_api_v2.port_binding_destroy(session, id, net_id) + except q_exc.PortNotFound: + pass + db_api_v2.set_port_status(session, id, q_const.PORT_STATUS_DOWN) + return p + + datapath_id = port['port'].get('datapath_id', None) + port_no = port['port'].get('port_no', None) + if datapath_id is None or port_no is None: + LOG.debug('p %s', p) + return p + + try: + port_binding = db_api_v2.port_binding_get(id, net_id) + except orm_exc.NoResultFound: + try: + db_api_v2.port_binding_create(id, net_id, datapath_id, port_no) + except IntegrityError: + # TODO:XXX should do transaction? + return p + else: + self.client.create_port(net_id, datapath_id, port_no) + self.client.create_mac(net_id, datapath_id, port_no, + mac_address) + else: + if (port_binding.dpid != datapath_id or + port_binding.port_no != port_no): + variables = {'datapath_id': datapath_id, + 'port_no': port_no, + 'port_binding_dpid': port_binding.dpid, + 'port_binding_port_no': port_binding.port_no} + raise q_exc.InvalidInput( + error_message=_('invalid (datapath_id, port_no) ' + 'is requested' + '(%(datapath_id)s, %(port_no)s), acutal' + '(%(port_binding_dpid)s, ' + '%(port_binding_port_no)s)') % variables) + self.client.update_network(net_id) + self.client.update_port(net_id, datapath_id, port_no) + self.client.update_mac(net_id, datapath_id, port_no, mac_address) + return p