plugin/ryu: add tunnel support

blueprint ryu-tunnel-support
This patch adds tunneling support to Ryu plugin.
Ryu supports gre tunneling which requires quantum ryu plugin to manage
key assignment.

Change-Id: I9f8db0913941c3da13045170e1557d333f0c68e2
Signed-off-by: Isaku Yamahata <yamahata@valinux.co.jp>
This commit is contained in:
Isaku Yamahata 2012-08-10 14:53:08 +09:00
parent d64a8469dd
commit 1a85476b6a
9 changed files with 257 additions and 57 deletions

View File

@ -12,6 +12,11 @@ integration_bridge = br-int
openflow_controller = 127.0.0.1:6633 openflow_controller = 127.0.0.1:6633
openflow_rest_api = 127.0.0.1:8080 openflow_rest_api = 127.0.0.1:8080
# tunnel key range: 0 < tunnel_key_min < tunnel_key_max
# VLAN: 12bits, GRE, VXLAN: 24bits
# tunnel_key_min = 1
# tunnel_key_max = 0xffffff
[AGENT] [AGENT]
# Use "sudo quantum-rootwrap /etc/quantum/rootwrap.conf" to use the real # Use "sudo quantum-rootwrap /etc/quantum/rootwrap.conf" to use the real
# root filter facility. # root filter facility.

View File

@ -20,7 +20,6 @@
# under the License. # under the License.
# @author: Isaku Yamahata # @author: Isaku Yamahata
import logging as LOG
import sys import sys
import time import time
@ -33,6 +32,7 @@ from quantum.agent.linux.ovs_lib import VifPort
from quantum.common import config as logging_config from quantum.common import config as logging_config
from quantum.common import constants from quantum.common import constants
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
from quantum.openstack.common import log as LOG
from quantum.plugins.ryu.common import config from quantum.plugins.ryu.common import config

View File

@ -27,6 +27,8 @@ ovs_opts = [
cfg.StrOpt('integration_bridge', default='br-int'), cfg.StrOpt('integration_bridge', default='br-int'),
cfg.StrOpt('openflow_controller', default='127.0.0.1:6633'), cfg.StrOpt('openflow_controller', default='127.0.0.1:6633'),
cfg.StrOpt('openflow_rest_api', default='127.0.0.1:8080'), cfg.StrOpt('openflow_rest_api', default='127.0.0.1:8080'),
cfg.IntOpt('tunnel_key_min', default=1),
cfg.IntOpt('tunnel_key_max', default=0xffffff)
] ]
agent_opts = [ agent_opts = [

View File

@ -14,20 +14,171 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from sqlalchemy import exc as sa_exc
from sqlalchemy import func
from sqlalchemy.orm import exc as orm_exc
from quantum.common import exceptions as q_exc
import quantum.db.api as db import quantum.db.api as db
from quantum.db.models_v2 import Network from quantum.db import models_v2
from quantum.plugins.ryu.db import models_v2 from quantum.openstack.common import log as logging
from quantum.plugins.ryu.db import models_v2 as ryu_models_v2
LOG = logging.getLogger(__name__)
def set_ofp_servers(hosts): def set_ofp_servers(hosts):
session = db.get_session() session = db.get_session()
session.query(models_v2.OFPServer).delete() session.query(ryu_models_v2.OFPServer).delete()
for (host_address, host_type) in hosts: for (host_address, host_type) in hosts:
host = models_v2.OFPServer(host_address, host_type) host = ryu_models_v2.OFPServer(address=host_address,
host_type=host_type)
session.add(host) session.add(host)
session.flush() session.flush()
def network_all_tenant_list(): def network_all_tenant_list():
session = db.get_session() session = db.get_session()
return session.query(Network).all() return session.query(models_v2.Network).all()
class TunnelKey(object):
# VLAN: 12 bits
# GRE, VXLAN: 24bits
# TODO(yamahata): STT: 64bits
_KEY_MIN_HARD = 1
_KEY_MAX_HARD = 0xffffffff
def __init__(self, key_min=_KEY_MIN_HARD, key_max=_KEY_MAX_HARD):
self.key_min = key_min
self.key_max = key_max
if (key_min < self._KEY_MIN_HARD or key_max > self._KEY_MAX_HARD or
key_min > key_max):
raise ValueError('Invalid tunnel key options '
'tunnel_key_min: %d tunnel_key_max: %d. '
'Using default value' % (key_min, key_min))
def _last_key(self, session):
try:
return session.query(ryu_models_v2.TunnelKeyLast).one()
except orm_exc.MultipleResultsFound:
max_key = session.query(
func.max(ryu_models_v2.TunnelKeyLast.last_key))
if max_key > self.key_max:
max_key = self.key_min
session.query(ryu_models_v2.TunnelKeyLast).delete()
last_key = ryu_models_v2.TunnelKeyLast(last_key=max_key)
except orm_exc.NoResultFound:
last_key = ryu_models_v2.TunnelKeyLast(last_key=self.key_min)
session.add(last_key)
session.flush()
return session.query(ryu_models_v2.TunnelKeyLast).one()
def _find_key(self, session, last_key):
"""
Try to find unused tunnel key in TunnelKey table starting
from last_key + 1.
When all keys are used, raise sqlalchemy.orm.exc.NoResultFound
"""
# key 0 is used for special meanings. So don't allocate 0.
# sqlite doesn't support
# '(select order by limit) union all (select order by limit) '
# 'order by limit'
# So do it manually
# new_key = session.query("new_key").from_statement(
# # If last_key + 1 isn't used, it's the result
# 'SELECT new_key '
# 'FROM (SELECT :last_key + 1 AS new_key) q1 '
# 'WHERE NOT EXISTS '
# '(SELECT 1 FROM tunnelkeys WHERE tunnel_key = :last_key + 1) '
#
# 'UNION ALL '
#
# # if last_key + 1 used,
# # find the least unused key from last_key + 1
# '(SELECT t.tunnel_key + 1 AS new_key '
# 'FROM tunnelkeys t '
# 'WHERE NOT EXISTS '
# '(SELECT 1 FROM tunnelkeys ti '
# ' WHERE ti.tunnel_key = t.tunnel_key + 1) '
# 'AND t.tunnel_key >= :last_key '
# 'ORDER BY new_key LIMIT 1) '
#
# 'ORDER BY new_key LIMIT 1'
# ).params(last_key=last_key).one()
try:
new_key = session.query("new_key").from_statement(
# If last_key + 1 isn't used, it's the result
'SELECT new_key '
'FROM (SELECT :last_key + 1 AS new_key) q1 '
'WHERE NOT EXISTS '
'(SELECT 1 FROM tunnelkeys WHERE tunnel_key = :last_key + 1) '
).params(last_key=last_key).one()
except orm_exc.NoResultFound:
new_key = session.query("new_key").from_statement(
# if last_key + 1 used,
# find the least unused key from last_key + 1
'(SELECT t.tunnel_key + 1 AS new_key '
'FROM tunnelkeys t '
'WHERE NOT EXISTS '
'(SELECT 1 FROM tunnelkeys ti '
' WHERE ti.tunnel_key = t.tunnel_key + 1) '
'AND t.tunnel_key >= :last_key '
'ORDER BY new_key LIMIT 1) '
).params(last_key=last_key).one()
new_key = new_key[0] # the result is tuple.
LOG.debug("last_key %s new_key %s", last_key, new_key)
if new_key > self.key_max:
LOG.debug("no key found")
raise orm_exc.NoResultFound()
return new_key
def _allocate(self, session, network_id):
last_key = self._last_key(session)
try:
new_key = self._find_key(session, last_key.last_key)
except orm_exc.NoResultFound:
new_key = self._find_key(session, self.key_min)
tunnel_key = ryu_models_v2.TunnelKey(network_id=network_id,
tunnel_key=new_key)
last_key.last_key = new_key
session.add(tunnel_key)
return new_key
_TRANSACTION_RETRY_MAX = 16
def allocate(self, session, network_id):
count = 0
while True:
session.begin(subtransactions=True)
try:
new_key = self._allocate(session, network_id)
session.commit()
break
except sa_exc.SQLAlchemyError:
session.rollback()
count += 1
if count > self._TRANSACTION_RETRY_MAX:
# if this happens too often, increase _TRANSACTION_RETRY_MAX
LOG.warn("Transaction retry reaches to %d. "
"abandan to allocate tunnel key." % count)
raise q_exc.ResourceExhausted()
return new_key
def delete(self, session, network_id):
session.query(ryu_models_v2.TunnelKey).filter_by(
network_id=network_id).delete()
session.flush()
def all_list(self):
session = db.get_session()
return session.query(ryu_models_v2.TunnelKey).all()

View File

@ -14,24 +14,42 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from sqlalchemy import Column, Integer, String import sqlalchemy as sa
from quantum.db import model_base
from quantum.db import models_v2 from quantum.db import models_v2
class OFPServer(models_v2.model_base.BASEV2): class OFPServer(model_base.BASEV2):
"""Openflow Server/API address""" """Openflow Server/API address."""
__tablename__ = 'ofp_server' __tablename__ = 'ofp_server'
id = Column(Integer, primary_key=True, autoincrement=True) id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
address = Column(String(255)) # netloc <host ip address>:<port> address = sa.Column(sa.String(64)) # netloc <host ip address>:<port>
host_type = Column(String(255)) # server type host_type = sa.Column(sa.String(255)) # server type
# Controller, REST_API # Controller, REST_API
def __init__(self, address, host_type):
self.address = address
self.host_type = host_type
def __repr__(self): def __repr__(self):
return "<OFPServer(%s,%s,%s)>" % (self.id, self.address, return "<OFPServer(%s,%s,%s)>" % (self.id, self.address,
self.host_type) self.host_type)
class TunnelKeyLast(model_base.BASEV2):
"""Lastly allocated Tunnel key. The next key allocation will be started
from this value + 1
"""
last_key = sa.Column(sa.Integer, primary_key=True)
def __repr__(self):
return "<TunnelKeyLast(%x)>" % self.last_key
class TunnelKey(model_base.BASEV2):
"""Netowrk ID <-> tunnel key mapping."""
network_id = sa.Column(sa.String(36), sa.ForeignKey("networks.id"),
nullable=False)
tunnel_key = sa.Column(sa.Integer, primary_key=True,
nullable=False, autoincrement=False)
def __repr__(self):
return "<TunnelKey(%s,%x)>" % (self.network_id, self.tunnel_key)

View File

@ -16,10 +16,9 @@
# under the License. # under the License.
# @author: Isaku Yamahata # @author: Isaku Yamahata
import logging
from ryu.app import client from ryu.app import client
from ryu.app import rest_nw_id from ryu.app import rest_nw_id
from sqlalchemy.orm import exc as sql_exc
from quantum.common import exceptions as q_exc from quantum.common import exceptions as q_exc
from quantum.common import topics from quantum.common import topics
@ -29,12 +28,14 @@ from quantum.db.dhcp_rpc_base import DhcpRpcCallbackMixin
from quantum.db import l3_db from quantum.db import l3_db
from quantum.db import models_v2 from quantum.db import models_v2
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import dispatcher from quantum.openstack.common.rpc import dispatcher
from quantum.plugins.ryu.common import config from quantum.plugins.ryu.common import config
from quantum.plugins.ryu.db import api_v2 as db_api_v2 from quantum.plugins.ryu.db import api_v2 as db_api_v2
from quantum.plugins.ryu import ofp_service_type from quantum.plugins.ryu import ofp_service_type
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -50,6 +51,8 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
options.update({"reconnect_interval": reconnect_interval}) options.update({"reconnect_interval": reconnect_interval})
db.configure_db(options) db.configure_db(options)
self.tunnel_key = db_api_v2.TunnelKey(
cfg.CONF.OVS.tunnel_key_min, cfg.CONF.OVS.tunnel_key_max)
ofp_con_host = cfg.CONF.OVS.openflow_controller ofp_con_host = cfg.CONF.OVS.openflow_controller
ofp_api_host = cfg.CONF.OVS.openflow_rest_api ofp_api_host = cfg.CONF.OVS.openflow_rest_api
@ -61,7 +64,10 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
db_api_v2.set_ofp_servers(hosts) db_api_v2.set_ofp_servers(hosts)
self.client = client.OFPClient(ofp_api_host) self.client = client.OFPClient(ofp_api_host)
self.client.update_network(rest_nw_id.NW_ID_EXTERNAL) self.tun_client = client.TunnelClient(ofp_api_host)
for nw_id in rest_nw_id.RESERVED_NETWORK_IDS:
if nw_id != rest_nw_id.NW_ID_UNKNOWN:
self.client.update_network(nw_id)
self._setup_rpc() self._setup_rpc()
# register known all network list on startup # register known all network list on startup
@ -75,18 +81,36 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
self.conn.consume_in_thread() self.conn.consume_in_thread()
def _create_all_tenant_network(self): def _create_all_tenant_network(self):
networks = db_api_v2.network_all_tenant_list() for net in db_api_v2.network_all_tenant_list():
for net in networks:
self.client.update_network(net.id) self.client.update_network(net.id)
for tun in self.tunnel_key.all_list():
self.tun_client.update_tunnel_key(tun.network_id, tun.tunnel_key)
def _client_create_network(self, net_id, tunnel_key):
self.client.create_network(net_id)
self.tun_client.create_tunnel_key(net_id, tunnel_key)
def _client_delete_network(self, net_id):
client.ignore_http_not_found(
lambda: self.client.delete_network(net_id))
client.ignore_http_not_found(
lambda: self.tun_client.delete_tunnel_key(net_id))
def create_network(self, context, network): def create_network(self, context, network):
session = context.session session = context.session
with session.begin(subtransactions=True): with session.begin(subtransactions=True):
net = super(RyuQuantumPluginV2, self).create_network(context, net = super(RyuQuantumPluginV2, self).create_network(context,
network) network)
self.client.create_network(net['id'])
self._process_l3_create(context, network['network'], net['id']) self._process_l3_create(context, network['network'], net['id'])
self._extend_network_dict_l3(context, net) self._extend_network_dict_l3(context, net)
tunnel_key = self.tunnel_key.allocate(session, net['id'])
try:
self._client_create_network(net['id'], tunnel_key)
except:
self._client_delete_network(net['id'])
raise
return net return net
def update_network(self, context, id, network): def update_network(self, context, id, network):
@ -99,10 +123,11 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
return net return net
def delete_network(self, context, id): def delete_network(self, context, id):
self._client_delete_network(id)
session = context.session session = context.session
with session.begin(subtransactions=True): with session.begin(subtransactions=True):
self.tunnel_key.delete(session, id)
super(RyuQuantumPluginV2, self).delete_network(context, id) super(RyuQuantumPluginV2, self).delete_network(context, id)
self.client.delete_network(id)
def get_network(self, context, id, fields=None): def get_network(self, context, id, fields=None):
net = super(RyuQuantumPluginV2, self).get_network(context, id, None) net = super(RyuQuantumPluginV2, self).get_network(context, id, None)

View File

@ -15,7 +15,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import os import operator
import unittest2 import unittest2
from quantum.db import api as db from quantum.db import api as db
@ -52,3 +52,26 @@ class RyuDBTest(unittest2.TestCase):
self.assertEqual(len(servers), 2) self.assertEqual(len(servers), 2)
for s in servers: for s in servers:
self.assertTrue((s.address, s.host_type) in self.hosts) self.assertTrue((s.address, s.host_type) in self.hosts)
@staticmethod
def _tunnel_key_sort(key_list):
key_list.sort(key=operator.attrgetter('tunnel_key'))
return [(key.network_id, key.tunnel_key) for key in key_list]
def test_key_allocation(self):
tunnel_key = db_api_v2.TunnelKey()
session = db.get_session()
network_id0 = u'network-id-0'
key0 = tunnel_key.allocate(session, network_id0)
network_id1 = u'network-id-1'
key1 = tunnel_key.allocate(session, network_id1)
key_list = tunnel_key.all_list()
self.assertEqual(len(key_list), 2)
expected_list = [(network_id0, key0), (network_id1, key1)]
self.assertEqual(self._tunnel_key_sort(key_list), expected_list)
tunnel_key.delete(session, network_id0)
key_list = tunnel_key.all_list()
self.assertEqual(self._tunnel_key_sort(key_list),
[(network_id1, key1)])

View File

@ -28,7 +28,15 @@ class RyuPluginV2TestCase(test_plugin.QuantumDbPluginV2TestCase):
ryu_app_client = ryu_app_mod.client ryu_app_client = ryu_app_mod.client
rest_nw_id = ryu_app_mod.rest_nw_id rest_nw_id = ryu_app_mod.rest_nw_id
rest_nw_id.NW_ID_EXTERNAL = '__NW_ID_EXTERNAL__' rest_nw_id.NW_ID_EXTERNAL = '__NW_ID_EXTERNAL__'
rest_nw_id.NW_ID_RESERVED = '__NW_ID_RESERVED__'
rest_nw_id.NW_ID_VPORT_GRE = '__NW_ID_VPORT_GRE__'
rest_nw_id.NW_ID_UNKNOWN = '__NW_ID_UNKNOWN__' rest_nw_id.NW_ID_UNKNOWN = '__NW_ID_UNKNOWN__'
rest_nw_id.RESERVED_NETWORK_IDS = [
rest_nw_id.NW_ID_EXTERNAL,
rest_nw_id.NW_ID_RESERVED,
rest_nw_id.NW_ID_VPORT_GRE,
rest_nw_id.NW_ID_UNKNOWN,
]
return mock.patch.dict('sys.modules', return mock.patch.dict('sys.modules',
{'ryu': ryu_mod, {'ryu': ryu_mod,
'ryu.app': ryu_app_mod, 'ryu.app': ryu_app_mod,

View File

@ -1,32 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2012 Isaku Yamahata <yamahata at private email ne jp>
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
def patch_fake_ryu_client():
ryu_mod = mock.Mock()
ryu_app_mod = ryu_mod.app
ryu_app_client = ryu_app_mod.client
rest_nw_id = ryu_app_mod.rest_nw_id
rest_nw_id.NW_ID_EXTERNAL = '__NW_ID_EXTERNAL__'
rest_nw_id.NW_ID_UNKNOWN = '__NW_ID_UNKNOWN__'
return mock.patch.dict('sys.modules',
{'ryu': ryu_mod,
'ryu.app': ryu_app_mod,
'ryu.app.client': ryu_app_client,
'ryu.app.rest_nw_id': rest_nw_id})