diff --git a/etc/quantum/plugins/ryu/ryu.ini b/etc/quantum/plugins/ryu/ryu.ini index 42e5525de7..cb376a12a5 100644 --- a/etc/quantum/plugins/ryu/ryu.ini +++ b/etc/quantum/plugins/ryu/ryu.ini @@ -12,6 +12,11 @@ integration_bridge = br-int openflow_controller = 127.0.0.1:6633 openflow_rest_api = 127.0.0.1:8080 +# tunnel key range: 0 < tunnel_key_min < tunnel_key_max +# VLAN: 12bits, GRE, VXLAN: 24bits +# tunnel_key_min = 1 +# tunnel_key_max = 0xffffff + [AGENT] # Use "sudo quantum-rootwrap /etc/quantum/rootwrap.conf" to use the real # root filter facility. diff --git a/quantum/plugins/ryu/agent/ryu_quantum_agent.py b/quantum/plugins/ryu/agent/ryu_quantum_agent.py index 2a9d6e0945..593c53910d 100755 --- a/quantum/plugins/ryu/agent/ryu_quantum_agent.py +++ b/quantum/plugins/ryu/agent/ryu_quantum_agent.py @@ -20,7 +20,6 @@ # under the License. # @author: Isaku Yamahata -import logging as LOG import sys import time @@ -33,6 +32,7 @@ from quantum.agent.linux.ovs_lib import VifPort from quantum.common import config as logging_config from quantum.common import constants from quantum.openstack.common import cfg +from quantum.openstack.common import log as LOG from quantum.plugins.ryu.common import config diff --git a/quantum/plugins/ryu/common/config.py b/quantum/plugins/ryu/common/config.py index 280560d67b..8d771e6da9 100644 --- a/quantum/plugins/ryu/common/config.py +++ b/quantum/plugins/ryu/common/config.py @@ -27,6 +27,8 @@ ovs_opts = [ cfg.StrOpt('integration_bridge', default='br-int'), cfg.StrOpt('openflow_controller', default='127.0.0.1:6633'), cfg.StrOpt('openflow_rest_api', default='127.0.0.1:8080'), + cfg.IntOpt('tunnel_key_min', default=1), + cfg.IntOpt('tunnel_key_max', default=0xffffff) ] agent_opts = [ diff --git a/quantum/plugins/ryu/db/api_v2.py b/quantum/plugins/ryu/db/api_v2.py index 0bd65eb73c..b61416c855 100644 --- a/quantum/plugins/ryu/db/api_v2.py +++ b/quantum/plugins/ryu/db/api_v2.py @@ -14,20 +14,171 @@ # License for the specific language governing permissions and limitations # under the License. +from sqlalchemy import exc as sa_exc +from sqlalchemy import func +from sqlalchemy.orm import exc as orm_exc + +from quantum.common import exceptions as q_exc import quantum.db.api as db -from quantum.db.models_v2 import Network -from quantum.plugins.ryu.db import models_v2 +from quantum.db import models_v2 +from quantum.openstack.common import log as logging +from quantum.plugins.ryu.db import models_v2 as ryu_models_v2 + + +LOG = logging.getLogger(__name__) def set_ofp_servers(hosts): session = db.get_session() - session.query(models_v2.OFPServer).delete() + session.query(ryu_models_v2.OFPServer).delete() for (host_address, host_type) in hosts: - host = models_v2.OFPServer(host_address, host_type) + host = ryu_models_v2.OFPServer(address=host_address, + host_type=host_type) session.add(host) session.flush() def network_all_tenant_list(): session = db.get_session() - return session.query(Network).all() + return session.query(models_v2.Network).all() + + +class TunnelKey(object): + # VLAN: 12 bits + # GRE, VXLAN: 24bits + # TODO(yamahata): STT: 64bits + _KEY_MIN_HARD = 1 + _KEY_MAX_HARD = 0xffffffff + + def __init__(self, key_min=_KEY_MIN_HARD, key_max=_KEY_MAX_HARD): + self.key_min = key_min + self.key_max = key_max + + if (key_min < self._KEY_MIN_HARD or key_max > self._KEY_MAX_HARD or + key_min > key_max): + raise ValueError('Invalid tunnel key options ' + 'tunnel_key_min: %d tunnel_key_max: %d. ' + 'Using default value' % (key_min, key_min)) + + def _last_key(self, session): + try: + return session.query(ryu_models_v2.TunnelKeyLast).one() + except orm_exc.MultipleResultsFound: + max_key = session.query( + func.max(ryu_models_v2.TunnelKeyLast.last_key)) + if max_key > self.key_max: + max_key = self.key_min + + session.query(ryu_models_v2.TunnelKeyLast).delete() + last_key = ryu_models_v2.TunnelKeyLast(last_key=max_key) + except orm_exc.NoResultFound: + last_key = ryu_models_v2.TunnelKeyLast(last_key=self.key_min) + + session.add(last_key) + session.flush() + return session.query(ryu_models_v2.TunnelKeyLast).one() + + def _find_key(self, session, last_key): + """ + Try to find unused tunnel key in TunnelKey table starting + from last_key + 1. + When all keys are used, raise sqlalchemy.orm.exc.NoResultFound + """ + # key 0 is used for special meanings. So don't allocate 0. + + # sqlite doesn't support + # '(select order by limit) union all (select order by limit) ' + # 'order by limit' + # So do it manually + # new_key = session.query("new_key").from_statement( + # # If last_key + 1 isn't used, it's the result + # 'SELECT new_key ' + # 'FROM (SELECT :last_key + 1 AS new_key) q1 ' + # 'WHERE NOT EXISTS ' + # '(SELECT 1 FROM tunnelkeys WHERE tunnel_key = :last_key + 1) ' + # + # 'UNION ALL ' + # + # # if last_key + 1 used, + # # find the least unused key from last_key + 1 + # '(SELECT t.tunnel_key + 1 AS new_key ' + # 'FROM tunnelkeys t ' + # 'WHERE NOT EXISTS ' + # '(SELECT 1 FROM tunnelkeys ti ' + # ' WHERE ti.tunnel_key = t.tunnel_key + 1) ' + # 'AND t.tunnel_key >= :last_key ' + # 'ORDER BY new_key LIMIT 1) ' + # + # 'ORDER BY new_key LIMIT 1' + # ).params(last_key=last_key).one() + try: + new_key = session.query("new_key").from_statement( + # If last_key + 1 isn't used, it's the result + 'SELECT new_key ' + 'FROM (SELECT :last_key + 1 AS new_key) q1 ' + 'WHERE NOT EXISTS ' + '(SELECT 1 FROM tunnelkeys WHERE tunnel_key = :last_key + 1) ' + ).params(last_key=last_key).one() + except orm_exc.NoResultFound: + new_key = session.query("new_key").from_statement( + # if last_key + 1 used, + # find the least unused key from last_key + 1 + '(SELECT t.tunnel_key + 1 AS new_key ' + 'FROM tunnelkeys t ' + 'WHERE NOT EXISTS ' + '(SELECT 1 FROM tunnelkeys ti ' + ' WHERE ti.tunnel_key = t.tunnel_key + 1) ' + 'AND t.tunnel_key >= :last_key ' + 'ORDER BY new_key LIMIT 1) ' + ).params(last_key=last_key).one() + + new_key = new_key[0] # the result is tuple. + LOG.debug("last_key %s new_key %s", last_key, new_key) + if new_key > self.key_max: + LOG.debug("no key found") + raise orm_exc.NoResultFound() + return new_key + + def _allocate(self, session, network_id): + last_key = self._last_key(session) + try: + new_key = self._find_key(session, last_key.last_key) + except orm_exc.NoResultFound: + new_key = self._find_key(session, self.key_min) + + tunnel_key = ryu_models_v2.TunnelKey(network_id=network_id, + tunnel_key=new_key) + last_key.last_key = new_key + session.add(tunnel_key) + return new_key + + _TRANSACTION_RETRY_MAX = 16 + + def allocate(self, session, network_id): + count = 0 + while True: + session.begin(subtransactions=True) + try: + new_key = self._allocate(session, network_id) + session.commit() + break + except sa_exc.SQLAlchemyError: + session.rollback() + + count += 1 + if count > self._TRANSACTION_RETRY_MAX: + # if this happens too often, increase _TRANSACTION_RETRY_MAX + LOG.warn("Transaction retry reaches to %d. " + "abandan to allocate tunnel key." % count) + raise q_exc.ResourceExhausted() + + return new_key + + def delete(self, session, network_id): + session.query(ryu_models_v2.TunnelKey).filter_by( + network_id=network_id).delete() + session.flush() + + def all_list(self): + session = db.get_session() + return session.query(ryu_models_v2.TunnelKey).all() diff --git a/quantum/plugins/ryu/db/models_v2.py b/quantum/plugins/ryu/db/models_v2.py index ce0a8301ba..8a34ee254f 100644 --- a/quantum/plugins/ryu/db/models_v2.py +++ b/quantum/plugins/ryu/db/models_v2.py @@ -14,24 +14,42 @@ # License for the specific language governing permissions and limitations # under the License. -from sqlalchemy import Column, Integer, String +import sqlalchemy as sa +from quantum.db import model_base from quantum.db import models_v2 -class OFPServer(models_v2.model_base.BASEV2): - """Openflow Server/API address""" +class OFPServer(model_base.BASEV2): + """Openflow Server/API address.""" __tablename__ = 'ofp_server' - id = Column(Integer, primary_key=True, autoincrement=True) - address = Column(String(255)) # netloc : - host_type = Column(String(255)) # server type - # Controller, REST_API - - def __init__(self, address, host_type): - self.address = address - self.host_type = host_type + id = sa.Column(sa.Integer, primary_key=True, autoincrement=True) + address = sa.Column(sa.String(64)) # netloc : + host_type = sa.Column(sa.String(255)) # server type + # Controller, REST_API def __repr__(self): return "" % (self.id, self.address, self.host_type) + + +class TunnelKeyLast(model_base.BASEV2): + """Lastly allocated Tunnel key. The next key allocation will be started + from this value + 1 + """ + last_key = sa.Column(sa.Integer, primary_key=True) + + def __repr__(self): + return "" % self.last_key + + +class TunnelKey(model_base.BASEV2): + """Netowrk ID <-> tunnel key mapping.""" + network_id = sa.Column(sa.String(36), sa.ForeignKey("networks.id"), + nullable=False) + tunnel_key = sa.Column(sa.Integer, primary_key=True, + nullable=False, autoincrement=False) + + def __repr__(self): + return "" % (self.network_id, self.tunnel_key) diff --git a/quantum/plugins/ryu/ryu_quantum_plugin.py b/quantum/plugins/ryu/ryu_quantum_plugin.py index 9f95c3b78f..57303e459c 100644 --- a/quantum/plugins/ryu/ryu_quantum_plugin.py +++ b/quantum/plugins/ryu/ryu_quantum_plugin.py @@ -16,10 +16,9 @@ # under the License. # @author: Isaku Yamahata -import logging - from ryu.app import client from ryu.app import rest_nw_id +from sqlalchemy.orm import exc as sql_exc from quantum.common import exceptions as q_exc from quantum.common import topics @@ -29,12 +28,14 @@ from quantum.db.dhcp_rpc_base import DhcpRpcCallbackMixin from quantum.db import l3_db from quantum.db import models_v2 from quantum.openstack.common import cfg +from quantum.openstack.common import log as logging from quantum.openstack.common import rpc from quantum.openstack.common.rpc import dispatcher from quantum.plugins.ryu.common import config from quantum.plugins.ryu.db import api_v2 as db_api_v2 from quantum.plugins.ryu import ofp_service_type + LOG = logging.getLogger(__name__) @@ -50,6 +51,8 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, options.update({"reconnect_interval": reconnect_interval}) db.configure_db(options) + self.tunnel_key = db_api_v2.TunnelKey( + cfg.CONF.OVS.tunnel_key_min, cfg.CONF.OVS.tunnel_key_max) ofp_con_host = cfg.CONF.OVS.openflow_controller ofp_api_host = cfg.CONF.OVS.openflow_rest_api @@ -61,7 +64,10 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, db_api_v2.set_ofp_servers(hosts) self.client = client.OFPClient(ofp_api_host) - self.client.update_network(rest_nw_id.NW_ID_EXTERNAL) + self.tun_client = client.TunnelClient(ofp_api_host) + for nw_id in rest_nw_id.RESERVED_NETWORK_IDS: + if nw_id != rest_nw_id.NW_ID_UNKNOWN: + self.client.update_network(nw_id) self._setup_rpc() # register known all network list on startup @@ -75,18 +81,36 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, self.conn.consume_in_thread() def _create_all_tenant_network(self): - networks = db_api_v2.network_all_tenant_list() - for net in networks: + for net in db_api_v2.network_all_tenant_list(): self.client.update_network(net.id) + for tun in self.tunnel_key.all_list(): + self.tun_client.update_tunnel_key(tun.network_id, tun.tunnel_key) + + def _client_create_network(self, net_id, tunnel_key): + self.client.create_network(net_id) + self.tun_client.create_tunnel_key(net_id, tunnel_key) + + def _client_delete_network(self, net_id): + client.ignore_http_not_found( + lambda: self.client.delete_network(net_id)) + client.ignore_http_not_found( + lambda: self.tun_client.delete_tunnel_key(net_id)) def create_network(self, context, network): session = context.session with session.begin(subtransactions=True): net = super(RyuQuantumPluginV2, self).create_network(context, network) - self.client.create_network(net['id']) self._process_l3_create(context, network['network'], net['id']) self._extend_network_dict_l3(context, net) + + tunnel_key = self.tunnel_key.allocate(session, net['id']) + try: + self._client_create_network(net['id'], tunnel_key) + except: + self._client_delete_network(net['id']) + raise + return net def update_network(self, context, id, network): @@ -99,10 +123,11 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, return net def delete_network(self, context, id): + self._client_delete_network(id) session = context.session with session.begin(subtransactions=True): + self.tunnel_key.delete(session, id) super(RyuQuantumPluginV2, self).delete_network(context, id) - self.client.delete_network(id) def get_network(self, context, id, fields=None): net = super(RyuQuantumPluginV2, self).get_network(context, id, None) diff --git a/quantum/tests/unit/ryu/test_ryu_db.py b/quantum/tests/unit/ryu/test_ryu_db.py index aaf9d7b4c4..f09f95830f 100644 --- a/quantum/tests/unit/ryu/test_ryu_db.py +++ b/quantum/tests/unit/ryu/test_ryu_db.py @@ -15,7 +15,7 @@ # License for the specific language governing permissions and limitations # under the License. -import os +import operator import unittest2 from quantum.db import api as db @@ -52,3 +52,26 @@ class RyuDBTest(unittest2.TestCase): self.assertEqual(len(servers), 2) for s in servers: self.assertTrue((s.address, s.host_type) in self.hosts) + + @staticmethod + def _tunnel_key_sort(key_list): + key_list.sort(key=operator.attrgetter('tunnel_key')) + return [(key.network_id, key.tunnel_key) for key in key_list] + + def test_key_allocation(self): + tunnel_key = db_api_v2.TunnelKey() + session = db.get_session() + network_id0 = u'network-id-0' + key0 = tunnel_key.allocate(session, network_id0) + network_id1 = u'network-id-1' + key1 = tunnel_key.allocate(session, network_id1) + key_list = tunnel_key.all_list() + self.assertEqual(len(key_list), 2) + + expected_list = [(network_id0, key0), (network_id1, key1)] + self.assertEqual(self._tunnel_key_sort(key_list), expected_list) + + tunnel_key.delete(session, network_id0) + key_list = tunnel_key.all_list() + self.assertEqual(self._tunnel_key_sort(key_list), + [(network_id1, key1)]) diff --git a/quantum/tests/unit/ryu/test_ryu_plugin.py b/quantum/tests/unit/ryu/test_ryu_plugin.py index fb508aa34a..366b2fe1fe 100644 --- a/quantum/tests/unit/ryu/test_ryu_plugin.py +++ b/quantum/tests/unit/ryu/test_ryu_plugin.py @@ -28,7 +28,15 @@ class RyuPluginV2TestCase(test_plugin.QuantumDbPluginV2TestCase): ryu_app_client = ryu_app_mod.client rest_nw_id = ryu_app_mod.rest_nw_id rest_nw_id.NW_ID_EXTERNAL = '__NW_ID_EXTERNAL__' + rest_nw_id.NW_ID_RESERVED = '__NW_ID_RESERVED__' + rest_nw_id.NW_ID_VPORT_GRE = '__NW_ID_VPORT_GRE__' rest_nw_id.NW_ID_UNKNOWN = '__NW_ID_UNKNOWN__' + rest_nw_id.RESERVED_NETWORK_IDS = [ + rest_nw_id.NW_ID_EXTERNAL, + rest_nw_id.NW_ID_RESERVED, + rest_nw_id.NW_ID_VPORT_GRE, + rest_nw_id.NW_ID_UNKNOWN, + ] return mock.patch.dict('sys.modules', {'ryu': ryu_mod, 'ryu.app': ryu_app_mod, diff --git a/quantum/tests/unit/ryu/utils.py b/quantum/tests/unit/ryu/utils.py deleted file mode 100644 index eaa5541ebe..0000000000 --- a/quantum/tests/unit/ryu/utils.py +++ /dev/null @@ -1,32 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 -# -# Copyright 2012 Isaku Yamahata -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import mock - - -def patch_fake_ryu_client(): - ryu_mod = mock.Mock() - ryu_app_mod = ryu_mod.app - ryu_app_client = ryu_app_mod.client - rest_nw_id = ryu_app_mod.rest_nw_id - rest_nw_id.NW_ID_EXTERNAL = '__NW_ID_EXTERNAL__' - rest_nw_id.NW_ID_UNKNOWN = '__NW_ID_UNKNOWN__' - return mock.patch.dict('sys.modules', - {'ryu': ryu_mod, - 'ryu.app': ryu_app_mod, - 'ryu.app.client': ryu_app_client, - 'ryu.app.rest_nw_id': rest_nw_id})