Improve data access method of ryu-agent

fixes bug #1110174

This patch implement rpc in ryu-agent, instead of accessing a
database directly.
Because it was not necessary to transmit information via database,
therefore the table is eliminated.

Also, I remove openflow controller stuff from a configuration file
of the Ryu plugin because it was not used anymore.

Change-Id: I5e261297c3f92c6a1ac5df229084176e84694e87
This commit is contained in:
Yoshihiro Kaneko 2013-01-30 15:50:41 +09:00
parent 8f4cbb9578
commit dfe7a5ebe0
10 changed files with 101 additions and 124 deletions

View File

@ -16,9 +16,7 @@ sql_connection = sqlite://
[OVS] [OVS]
integration_bridge = br-int integration_bridge = br-int
# openflow_controller = <host IP address of ofp controller>:<port: 6633>
# openflow_rest_api = <host IP address of ofp rest api service>:<port: 8080> # openflow_rest_api = <host IP address of ofp rest api service>:<port: 8080>
openflow_controller = 127.0.0.1:6633
openflow_rest_api = 127.0.0.1:8080 openflow_rest_api = 127.0.0.1:8080
# tunnel key range: 0 < tunnel_key_min < tunnel_key_max # tunnel key range: 0 < tunnel_key_min < tunnel_key_max

View File

@ -0,0 +1,59 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""ryu plugin update
Revision ID: 49332180ca96
Revises: 1149d7de0cfa
Create Date: 2013-01-30 07:52:58.472885
"""
# revision identifiers, used by Alembic.
revision = '49332180ca96'
down_revision = '1149d7de0cfa'
# Change to ['*'] if this migration applies to all plugins
migration_for_plugins = [
'quantum.plugins.ryu.ryu_quantum_plugin.RyuQuantumPluginV2'
]
from alembic import op
import sqlalchemy as sa
from quantum.db import migration
def upgrade(active_plugin=None, options=None):
if not migration.should_run(active_plugin, migration_for_plugins):
return
op.drop_table('ofp_server')
def downgrade(active_plugin=None, options=None):
if not migration.should_run(active_plugin, migration_for_plugins):
return
op.create_table(
'ofp_server',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('address', sa.String(length=255)),
sa.Column('host_type', sa.String(length=255)),
sa.PrimaryKeyConstraint(u'id')
)

View File

@ -28,11 +28,14 @@ import netifaces
from ryu.app import client from ryu.app import client
from ryu.app import conf_switch_key from ryu.app import conf_switch_key
from ryu.app import rest_nw_id from ryu.app import rest_nw_id
from sqlalchemy.ext.sqlsoup import SqlSoup
from quantum.agent.linux import ovs_lib from quantum.agent.linux import ovs_lib
from quantum.agent.linux.ovs_lib import VifPort from quantum.agent.linux.ovs_lib import VifPort
from quantum.agent import rpc as agent_rpc
from quantum.common import config as logging_config from quantum.common import config as logging_config
from quantum.common import exceptions as q_exc
from quantum.common import topics
from quantum import q_context
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
from quantum.openstack.common.cfg import NoSuchGroupError from quantum.openstack.common.cfg import NoSuchGroupError
from quantum.openstack.common.cfg import NoSuchOptError from quantum.openstack.common.cfg import NoSuchOptError
@ -40,7 +43,6 @@ from quantum.openstack.common import log
from quantum.plugins.ryu.common import config from quantum.plugins.ryu.common import config
cfg.CONF.import_opt('sql_connection', 'quantum.db.api', 'DATABASE')
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -148,29 +150,42 @@ class VifPortSet(object):
port.switch.datapath_id, port.ofport) port.switch.datapath_id, port.ofport)
class RyuPluginApi(agent_rpc.PluginApi):
def get_ofp_rest_api_addr(self, context):
LOG.debug(_("Get Ryu rest API address"))
return self.call(context,
self.make_msg('get_ofp_rest_api'),
topic=self.topic)
class OVSQuantumOFPRyuAgent(object): class OVSQuantumOFPRyuAgent(object):
def __init__(self, integ_br, ofp_rest_api_addr, def __init__(self, integ_br, tunnel_ip, ovsdb_ip, ovsdb_port,
tunnel_ip, ovsdb_ip, ovsdb_port,
root_helper): root_helper):
super(OVSQuantumOFPRyuAgent, self).__init__() super(OVSQuantumOFPRyuAgent, self).__init__()
self.int_br = None self._setup_rpc()
self.vif_ports = None self._setup_integration_br(root_helper, integ_br, tunnel_ip,
self._setup_integration_br(root_helper, integ_br, ovsdb_port, ovsdb_ip)
ofp_rest_api_addr,
tunnel_ip, ovsdb_port, ovsdb_ip) def _setup_rpc(self):
self.plugin_rpc = RyuPluginApi(topics.PLUGIN)
self.context = q_context.get_admin_context_without_session()
def _setup_integration_br(self, root_helper, integ_br, def _setup_integration_br(self, root_helper, integ_br,
ofp_rest_api_addr,
tunnel_ip, ovsdb_port, ovsdb_ip): tunnel_ip, ovsdb_port, ovsdb_ip):
self.int_br = OVSBridge(integ_br, root_helper) self.int_br = OVSBridge(integ_br, root_helper)
self.int_br.find_datapath_id() self.int_br.find_datapath_id()
ryu_rest_client = client.OFPClient(ofp_rest_api_addr) rest_api_addr = self.plugin_rpc.get_ofp_rest_api_addr(self.context)
if not rest_api_addr:
raise q_exc.Invalid(_("Ryu rest API port isn't specified"))
LOG.debug(_("Going to ofp controller mode %s"), rest_api_addr)
ryu_rest_client = client.OFPClient(rest_api_addr)
self.vif_ports = VifPortSet(self.int_br, ryu_rest_client) self.vif_ports = VifPortSet(self.int_br, ryu_rest_client)
self.vif_ports.setup() self.vif_ports.setup()
sc_client = client.SwitchConfClient(ofp_rest_api_addr) sc_client = client.SwitchConfClient(rest_api_addr)
sc_client.set_key(self.int_br.datapath_id, sc_client.set_key(self.int_br.datapath_id,
conf_switch_key.OVS_TUNNEL_ADDR, tunnel_ip) conf_switch_key.OVS_TUNNEL_ADDR, tunnel_ip)
@ -180,31 +195,6 @@ class OVSQuantumOFPRyuAgent(object):
'tcp:%s:%d' % (ovsdb_ip, ovsdb_port)) 'tcp:%s:%d' % (ovsdb_ip, ovsdb_port))
def check_ofp_rest_api_addr(db):
LOG.debug(_("Checking db"))
servers = db.ofp_server.all()
ofp_controller_addr = None
ofp_rest_api_addr = None
for serv in servers:
if serv.host_type == "REST_API":
ofp_rest_api_addr = serv.address
elif serv.host_type == "controller":
ofp_controller_addr = serv.address
else:
LOG.warn(_("Ignoring unknown server type %s"), serv)
LOG.debug(_("API %s"), ofp_rest_api_addr)
if ofp_controller_addr:
LOG.warn(_('OF controller parameter is stale %s'), ofp_controller_addr)
if not ofp_rest_api_addr:
raise RuntimeError(_("Ryu rest API port isn't specified"))
LOG.debug(_("Going to ofp controller mode %s"), ofp_rest_api_addr)
return ofp_rest_api_addr
def main(): def main():
cfg.CONF(project='quantum') cfg.CONF(project='quantum')
@ -212,13 +202,6 @@ def main():
integ_br = cfg.CONF.OVS.integration_bridge integ_br = cfg.CONF.OVS.integration_bridge
root_helper = cfg.CONF.AGENT.root_helper root_helper = cfg.CONF.AGENT.root_helper
options = {"sql_connection": cfg.CONF.DATABASE.sql_connection}
db = SqlSoup(options["sql_connection"])
LOG.info(_("Connecting to database \"%(database)s\" on %(host)s"),
{"database": db.engine.url.database,
"host": db.engine.url.host})
ofp_rest_api_addr = check_ofp_rest_api_addr(db)
tunnel_ip = _get_tunnel_ip() tunnel_ip = _get_tunnel_ip()
LOG.debug(_('tunnel_ip %s'), tunnel_ip) LOG.debug(_('tunnel_ip %s'), tunnel_ip)
@ -227,8 +210,8 @@ def main():
ovsdb_ip = _get_ovsdb_ip() ovsdb_ip = _get_ovsdb_ip()
LOG.debug(_('ovsdb_ip %s'), ovsdb_ip) LOG.debug(_('ovsdb_ip %s'), ovsdb_ip)
try: try:
OVSQuantumOFPRyuAgent(integ_br, ofp_rest_api_addr, OVSQuantumOFPRyuAgent(integ_br, tunnel_ip, ovsdb_ip, ovsdb_port,
tunnel_ip, ovsdb_ip, ovsdb_port, root_helper) root_helper)
except httplib.HTTPException, e: except httplib.HTTPException, e:
LOG.error(_("Initialization failed: %s"), e) LOG.error(_("Initialization failed: %s"), e)
sys.exit(1) sys.exit(1)

View File

@ -20,8 +20,6 @@ from quantum.openstack.common import cfg
ovs_opts = [ ovs_opts = [
cfg.StrOpt('integration_bridge', default='br-int', cfg.StrOpt('integration_bridge', default='br-int',
help=_("Integration bridge to use")), help=_("Integration bridge to use")),
cfg.StrOpt('openflow_controller', default='127.0.0.1:6633',
help=_("OpenFlow controller to connect to")),
cfg.StrOpt('openflow_rest_api', default='127.0.0.1:8080', cfg.StrOpt('openflow_rest_api', default='127.0.0.1:8080',
help=_("OpenFlow REST API location")), help=_("OpenFlow REST API location")),
cfg.IntOpt('tunnel_key_min', default=1, cfg.IntOpt('tunnel_key_min', default=1,

View File

@ -28,16 +28,6 @@ from quantum.plugins.ryu.db import models_v2 as ryu_models_v2
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def set_ofp_servers(hosts):
session = db.get_session()
session.query(ryu_models_v2.OFPServer).delete()
for (host_address, host_type) in hosts:
host = ryu_models_v2.OFPServer(address=host_address,
host_type=host_type)
session.add(host)
session.flush()
def network_all_tenant_list(): def network_all_tenant_list():
session = db.get_session() session = db.get_session()
return session.query(models_v2.Network).all() return session.query(models_v2.Network).all()

View File

@ -19,20 +19,6 @@ import sqlalchemy as sa
from quantum.db import model_base from quantum.db import model_base
class OFPServer(model_base.BASEV2):
"""Openflow Server/API address."""
__tablename__ = 'ofp_server'
id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
address = sa.Column(sa.String(64)) # netloc <host ip address>:<port>
host_type = sa.Column(sa.String(255)) # server type
# Controller, REST_API
def __repr__(self):
return "<OFPServer(%s,%s,%s)>" % (self.id, self.address,
self.host_type)
class TunnelKeyLast(model_base.BASEV2): class TunnelKeyLast(model_base.BASEV2):
"""Lastly allocated Tunnel key. The next key allocation will be started """Lastly allocated Tunnel key. The next key allocation will be started
from this value + 1 from this value + 1

View File

@ -1,19 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Isaku Yamahata <yamahata at valinux co jp>
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# @author: Isaku Yamahata
CONTROLLER = 'controller'
REST_API = 'REST_API'

View File

@ -34,7 +34,6 @@ from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc from quantum.openstack.common import rpc
from quantum.plugins.ryu.common import config from quantum.plugins.ryu.common import config
from quantum.plugins.ryu.db import api_v2 as db_api_v2 from quantum.plugins.ryu.db import api_v2 as db_api_v2
from quantum.plugins.ryu import ofp_service_type
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -45,9 +44,16 @@ class RyuRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
RPC_API_VERSION = '1.0' RPC_API_VERSION = '1.0'
def __init__(self, ofp_rest_api_addr):
self.ofp_rest_api_addr = ofp_rest_api_addr
def create_rpc_dispatcher(self): def create_rpc_dispatcher(self):
return q_rpc.PluginRpcDispatcher([self]) return q_rpc.PluginRpcDispatcher([self])
def get_ofp_rest_api(self, context, **kwargs):
LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr)
return self.ofp_rest_api_addr
class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
l3_db.L3_NAT_db_mixin): l3_db.L3_NAT_db_mixin):
@ -59,19 +65,13 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
self.tunnel_key = db_api_v2.TunnelKey( self.tunnel_key = db_api_v2.TunnelKey(
cfg.CONF.OVS.tunnel_key_min, cfg.CONF.OVS.tunnel_key_max) cfg.CONF.OVS.tunnel_key_min, cfg.CONF.OVS.tunnel_key_max)
ofp_con_host = cfg.CONF.OVS.openflow_controller self.ofp_api_host = cfg.CONF.OVS.openflow_rest_api
ofp_api_host = cfg.CONF.OVS.openflow_rest_api if not self.ofp_api_host:
if ofp_con_host is None or ofp_api_host is None:
raise q_exc.Invalid(_('Invalid configuration. check ryu.ini')) raise q_exc.Invalid(_('Invalid configuration. check ryu.ini'))
hosts = [(ofp_con_host, ofp_service_type.CONTROLLER), self.client = client.OFPClient(self.ofp_api_host)
(ofp_api_host, ofp_service_type.REST_API)] self.tun_client = client.TunnelClient(self.ofp_api_host)
db_api_v2.set_ofp_servers(hosts) self.iface_client = client.QuantumIfaceClient(self.ofp_api_host)
self.client = client.OFPClient(ofp_api_host)
self.tun_client = client.TunnelClient(ofp_api_host)
self.iface_client = client.QuantumIfaceClient(ofp_api_host)
for nw_id in rest_nw_id.RESERVED_NETWORK_IDS: for nw_id in rest_nw_id.RESERVED_NETWORK_IDS:
if nw_id != rest_nw_id.NW_ID_UNKNOWN: if nw_id != rest_nw_id.NW_ID_UNKNOWN:
self.client.update_network(nw_id) self.client.update_network(nw_id)
@ -82,7 +82,7 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
def _setup_rpc(self): def _setup_rpc(self):
self.conn = rpc.create_connection(new=True) self.conn = rpc.create_connection(new=True)
self.callbacks = RyuRpcCallbacks() self.callbacks = RyuRpcCallbacks(self.ofp_api_host)
self.dispatcher = self.callbacks.create_rpc_dispatcher() self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(topics.PLUGIN, self.dispatcher, fanout=False) self.conn.create_consumer(topics.PLUGIN, self.dispatcher, fanout=False)
self.conn.consume_in_thread() self.conn.consume_in_thread()

View File

@ -31,5 +31,4 @@ class ConfigurationTest(unittest2.TestCase):
self.assertEqual(2, cfg.CONF.DATABASE.reconnect_interval) self.assertEqual(2, cfg.CONF.DATABASE.reconnect_interval)
self.assertEqual(2, cfg.CONF.AGENT.polling_interval) self.assertEqual(2, cfg.CONF.AGENT.polling_interval)
self.assertEqual('sudo', cfg.CONF.AGENT.root_helper) self.assertEqual('sudo', cfg.CONF.AGENT.root_helper)
self.assertEqual('127.0.0.1:6633', cfg.CONF.OVS.openflow_controller)
self.assertEqual('127.0.0.1:8080', cfg.CONF.OVS.openflow_rest_api) self.assertEqual('127.0.0.1:8080', cfg.CONF.OVS.openflow_rest_api)

View File

@ -24,27 +24,10 @@ from quantum.openstack.common import cfg
from quantum.plugins.ryu.common import config from quantum.plugins.ryu.common import config
from quantum.plugins.ryu.db import api_v2 as db_api_v2 from quantum.plugins.ryu.db import api_v2 as db_api_v2
from quantum.plugins.ryu.db import models_v2 as ryu_models_v2 from quantum.plugins.ryu.db import models_v2 as ryu_models_v2
from quantum.plugins.ryu import ofp_service_type
from quantum.tests.unit import test_db_plugin as test_plugin from quantum.tests.unit import test_db_plugin as test_plugin
class RyuDBTest(test_plugin.QuantumDbPluginV2TestCase): class RyuDBTest(test_plugin.QuantumDbPluginV2TestCase):
def setUp(self):
super(RyuDBTest, self).setUp()
self.hosts = [(cfg.CONF.OVS.openflow_controller,
ofp_service_type.CONTROLLER),
(cfg.CONF.OVS.openflow_rest_api,
ofp_service_type.REST_API)]
db_api_v2.set_ofp_servers(self.hosts)
def test_ofp_server(self):
session = db.get_session()
servers = session.query(ryu_models_v2.OFPServer).all()
print servers
self.assertEqual(len(servers), 2)
for s in servers:
self.assertTrue((s.address, s.host_type) in self.hosts)
@staticmethod @staticmethod
def _tunnel_key_sort(key_list): def _tunnel_key_sort(key_list):
key_list.sort(key=operator.attrgetter('tunnel_key')) key_list.sort(key=operator.attrgetter('tunnel_key'))