RPC support for OVS Plugin and Agent

blueprint scalable-agent-comms

This adds support for the OVS plugin.

Change-Id: I613de63f5c7f374be87520f32a2f7129d86ef109
This commit is contained in:
Gary Kotton 2012-08-06 07:45:34 -04:00
parent 319b05ff5f
commit 2fc8ff9cbf
12 changed files with 743 additions and 50 deletions

View File

@ -89,7 +89,6 @@ class OVSBridge:
def _build_flow_expr_arr(self, **kwargs):
flow_expr_arr = []
is_delete_expr = kwargs.get('delete', False)
if not is_delete_expr:
prefix = ("hard_timeout=%s,idle_timeout=%s,priority=%s" %
(kwargs.get('hard_timeout', '0'),
@ -206,3 +205,24 @@ class OVSBridge:
edge_ports.append(p)
return edge_ports
def get_vif_port_set(self):
edge_ports = set()
port_names = self.get_port_name_list()
for name in port_names:
external_ids = self.db_get_map("Interface", name, "external_ids")
if "iface-id" in external_ids and "attached-mac" in external_ids:
edge_ports.add(external_ids['iface-id'])
elif ("xs-vif-uuid" in external_ids and
"attached-mac" in external_ids):
# if this is a xenserver and iface-id is not automatically
# synced to OVS from XAPI, we grab it from XAPI directly
iface_id = self.get_xapi_iface_id(external_ids["xs-vif-uuid"])
edge_ports.add(iface_id)
return edge_ports
def get_vif_port(self, port_name):
external_ids = self.db_get_map("Interface", port_name, "external_ids")
ofport = self.db_get_val("Interface", port_name, "ofport")
return VifPort(port_name, ofport, external_ids["iface-id"],
external_ids["attached-mac"], self)

View File

@ -15,6 +15,7 @@
from quantum.common import topics
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import proxy
def create_consumers(dispatcher, prefix, topic_details):
@ -34,3 +35,35 @@ def create_consumers(dispatcher, prefix, topic_details):
connection.create_consumer(topic_name, dispatcher, fanout=True)
connection.consume_in_thread()
return connection
class PluginApi(proxy.RpcProxy):
'''Agent side of the rpc API.
API version history:
1.0 - Initial version.
'''
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic):
super(PluginApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
def get_device_details(self, context, device, agent_id):
return self.call(context,
self.make_msg('get_device_details', device=device,
agent_id=agent_id),
topic=self.topic)
def update_device_down(self, context, device, agent_id):
return self.call(context,
self.make_msg('update_device_down', device=device,
agent_id=agent_id),
topic=self.topic)
def tunnel_sync(self, context, tunnel_ip):
return self.call(context,
self.make_msg('tunnel_sync', tunnel_ip=tunnel_ip),
topic=self.topic)

View File

@ -34,14 +34,13 @@ import eventlet
import pyudev
from sqlalchemy.ext.sqlsoup import SqlSoup
from quantum.agent.rpc import create_consumers
from quantum.agent import rpc as agent_rpc
from quantum.common import config as logging_config
from quantum.common import topics
from quantum.openstack.common import cfg
from quantum.openstack.common import context
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import dispatcher
from quantum.openstack.common.rpc import proxy
from quantum.plugins.linuxbridge.common import config
from quantum.agent.linux import utils
@ -320,33 +319,6 @@ class LinuxBridge:
LOG.debug("Done deleting subinterface %s" % interface)
class PluginApi(proxy.RpcProxy):
'''Agent side of the linux bridge rpc API.
API version history:
1.0 - Initial version.
'''
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic):
super(PluginApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
def get_device_details(self, context, device, agent_id):
return self.call(context,
self.make_msg('get_device_details', device=device,
agent_id=agent_id),
topic=self.topic)
def update_device_down(self, context, device, agent_id):
return self.call(context,
self.make_msg('update_device_down', device=device,
agent_id=agent_id),
topic=self.topic)
class LinuxBridgeRpcCallbacks():
# Set RPC API version to 1.0 by default.
@ -578,7 +550,7 @@ class LinuxBridgeQuantumAgentRPC:
mac = utils.get_interface_mac(physical_interface)
self.agent_id = '%s%s' % ('lb', (mac.replace(":", "")))
self.topic = topics.AGENT
self.plugin_rpc = PluginApi(topics.PLUGIN)
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
# RPC network init
self.context = context.RequestContext('quantum', 'quantum',
@ -590,7 +562,8 @@ class LinuxBridgeQuantumAgentRPC:
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE]]
self.connection = create_consumers(self.dispatcher, self.topic,
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.topic,
consumers)
self.udev = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(self.udev)

View File

@ -21,10 +21,10 @@ Unit Tests for linuxbridge rpc
import stubout
import unittest2
from quantum.agent import rpc as agent_rpc
from quantum.common import topics
from quantum.openstack.common import context
from quantum.openstack.common import rpc
from quantum.plugins.linuxbridge.agent import linuxbridge_quantum_agent as alb
from quantum.plugins.linuxbridge import lb_quantum_plugin as plb
@ -77,14 +77,14 @@ class rpcApiTestCase(unittest2.TestCase):
port='fake_port', vlan_id='fake_vlan_id')
def test_device_details(self):
rpcapi = alb.PluginApi(topics.PLUGIN)
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_lb_api(rpcapi, topics.PLUGIN,
'get_device_details', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
def test_update_device_down(self):
rpcapi = alb.PluginApi(topics.PLUGIN)
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_lb_api(rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call',
device='fake_device',

View File

@ -24,11 +24,18 @@ import logging
import sys
import time
import eventlet
from sqlalchemy.ext import sqlsoup
from quantum.agent import rpc as agent_rpc
from quantum.agent.linux import ovs_lib
from quantum.agent.linux import utils
from quantum.common import config as logging_config
from quantum.common import topics
from quantum.openstack.common import cfg
from quantum.openstack.common import context
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import dispatcher
from quantum.plugins.openvswitch.common import config
logging.basicConfig()
@ -120,15 +127,87 @@ class Portv2(object):
return hash(self.id)
class OVSRpcCallbacks():
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'
def __init__(self, context, int_br, local_ip=None, tun_br=None):
self.context = context
self.int_br = int_br
# Tunneling variables
self.local_ip = local_ip
self.tun_br = tun_br
def network_delete(self, context, **kwargs):
LOG.debug("network_delete received")
network_id = kwargs.get('network_id')
# (TODO) garyk delete the bridge interface
LOG.debug("Delete %s", network_id)
def port_update(self, context, **kwargs):
LOG.debug("port_update received")
port = kwargs.get('port')
port_name = 'tap%s' % port['id'][0:11]
vif_port = self.int_br.get_vif_port(port_name)
if port['admin_state_up']:
vlan_id = kwargs.get('vlan_id')
# create the networking for the port
self.int_br.set_db_attribute("Port", vif_port.port_name,
"tag", str(vlan_id))
self.int_br.delete_flows(in_port=vif_port.ofport)
else:
self.int_br.clear_db_attribute("Port", vif_port.port_name, "tag")
def tunnel_update(self, context, **kwargs):
LOG.debug("tunnel_update received")
tunnel_ip = kwargs.get('tunnel_ip')
tunnel_id = kwargs.get('tunnel_id')
if tunnel_ip == self.local_ip:
return
tun_name = 'gre-%s' % tunnel_id
self.tun_br.add_tunnel_port(tun_name, tunnel_ip)
def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager.
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return dispatcher.RpcDispatcher([self])
class OVSQuantumAgent(object):
def __init__(self, integ_br, root_helper, polling_interval,
reconnect_interval, target_v2_api=False):
reconnect_interval, target_v2_api, rpc):
self.root_helper = root_helper
self.setup_integration_br(integ_br)
self.polling_interval = polling_interval
self.reconnect_interval = reconnect_interval
self.target_v2_api = target_v2_api
self.rpc = rpc
if rpc:
self.setup_rpc(integ_br)
def setup_rpc(self, integ_br):
mac = utils.get_interface_mac(integ_br)
self.agent_id = '%s' % (mac.replace(":", ""))
self.topic = topics.AGENT
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
# RPC network init
self.context = context.RequestContext('quantum', 'quantum',
is_admin=False)
# Handle updates from service
self.callbacks = OVSRpcCallbacks(self.context, self.int_br)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE]]
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.topic,
consumers)
def port_bound(self, port, vlan_id):
self.int_br.set_db_attribute("Port", port.port_name,
@ -145,7 +224,7 @@ class OVSQuantumAgent(object):
# switch all traffic using L2 learning
self.int_br.add_flow(priority=1, actions="normal")
def daemon_loop(self, db_connection_url):
def db_loop(self, db_connection_url):
'''Main processing loop for Non-Tunneling Agent.
:param options: database information - in the event need to reconnect
@ -247,6 +326,102 @@ class OVSQuantumAgent(object):
time.sleep(self.polling_interval)
def update_ports(self, registered_ports):
ports = self.int_br.get_vif_port_set()
if ports == registered_ports:
return
added = ports - registered_ports
removed = registered_ports - ports
return {'current': ports,
'added': added,
'removed': removed}
def treat_devices_added(self, devices):
resync = False
for device in devices:
LOG.info("Port %s added", device)
try:
details = self.plugin_rpc.get_device_details(self.context,
device,
self.agent_id)
except Exception as e:
LOG.debug("Unable to get port details for %s: %s", device, e)
resync = True
continue
if 'port_id' in details:
LOG.info("Port %s updated. Details: %s", device, details)
port_name = 'tap%s' % details['port_id'][0:11]
port = self.int_br.get_vif_port(port_name)
if details['admin_state_up']:
self.port_bound(port, details['vlan_id'])
else:
self.port_unbound(port, True)
else:
LOG.debug("Device %s not defined on plugin", device)
return resync
def treat_devices_removed(self, devices):
resync = False
for device in devices:
LOG.info("Attachment %s removed", device)
try:
details = self.plugin_rpc.update_device_down(self.context,
device,
self.agent_id)
except Exception as e:
LOG.debug("port_removed failed for %s: %s", device, e)
resync = True
if details['exists']:
LOG.info("Port %s updated.", device)
# Nothing to do regarding local networking
else:
LOG.debug("Device %s not defined on plugin", device)
return resync
def process_network_ports(self, port_info):
resync_a = False
resync_b = False
if 'added' in port_info:
resync_a = self.treat_devices_added(port_info['added'])
if 'removed' in port_info:
resync_b = self.treat_devices_removed(port_info['removed'])
# If one of the above opertaions fails => resync with plugin
return (resync_a | resync_b)
def rpc_loop(self):
sync = True
ports = set()
while True:
start = time.time()
if sync:
LOG.info("Agent out of sync with plugin!")
ports.clear()
sync = False
port_info = self.update_ports(ports)
# notify plugin about port deltas
if port_info:
LOG.debug("Agent loop has new devices!")
# If treat devices fails - indicates must resync with plugin
sync = self.process_network_ports(port_info)
ports = port_info['current']
# sleep till end of polling interval
elapsed = (time.time() - start)
if (elapsed < self.polling_interval):
time.sleep(self.polling_interval - elapsed)
else:
LOG.debug("Loop iteration exceeded interval (%s vs. %s)!",
self.polling_interval, elapsed)
def daemon_loop(self, db_connection_url):
if self.rpc:
self.rpc_loop()
else:
self.db_loop(db_connection_url)
class OVSQuantumTunnelAgent(object):
'''Implements OVS-based tunneling.
@ -273,7 +448,8 @@ class OVSQuantumTunnelAgent(object):
MAX_VLAN_TAG = 4094
def __init__(self, integ_br, tun_br, local_ip, root_helper,
polling_interval, reconnect_interval, target_v2_api=False):
polling_interval, reconnect_interval, target_v2_api,
rpc):
'''Constructor.
:param integ_br: name of the integration bridge.
@ -283,6 +459,7 @@ class OVSQuantumTunnelAgent(object):
:param polling_interval: interval (secs) to poll DB.
:param reconnect_internal: retry interval (secs) on DB error.
:param target_v2_api: if True use v2 api.
:param rpc: if True use RPC interface to interface with plugin.
'''
self.root_helper = root_helper
self.available_local_vlans = set(
@ -298,6 +475,30 @@ class OVSQuantumTunnelAgent(object):
self.tunnel_count = 0
self.setup_tunnel_br(tun_br)
self.target_v2_api = target_v2_api
self.rpc = rpc
if rpc:
self.setup_rpc(integ_br)
def setup_rpc(self, integ_br):
mac = utils.get_interface_mac(integ_br)
self.agent_id = '%s%s' % ('ovs', (mac.replace(":", "")))
self.topic = topics.AGENT
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
# RPC network init
self.context = context.RequestContext('quantum', 'quantum',
is_admin=False)
# Handle updates from service
self.callbacks = OVSRpcCallbacks(self.context, self.int_br,
self.local_ip, self.tun_br)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE],
[config.TUNNEL, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.topic,
consumers)
def provision_local_vlan(self, net_uuid, lsw_id):
'''Provisions a local VLAN.
@ -431,7 +632,7 @@ class OVSQuantumTunnelAgent(object):
except:
LOG.exception("Problem connecting to database")
def daemon_loop(self, db_connection_url):
def db_loop(self, db_connection_url):
'''Main processing loop for Tunneling Agent.
:param options: database information - in the event need to reconnect
@ -547,6 +748,123 @@ class OVSQuantumTunnelAgent(object):
LOG.exception("Main-loop Exception:")
self.rollback_until_success(db)
def update_ports(self, registered_ports):
ports = self.int_br.get_vif_port_set()
if ports == registered_ports:
return
added = ports - registered_ports
removed = registered_ports - ports
return {'current': ports,
'added': added,
'removed': removed}
def treat_devices_added(self, devices):
resync = False
for device in devices:
LOG.info("Port %s added", device)
try:
details = self.plugin_rpc.get_device_details(self.context,
device,
self.agent_id)
except Exception as e:
LOG.debug("Unable to get port details for %s: %s", device, e)
resync = True
continue
if 'port_id' in details:
LOG.info("Port %s updated. Details: %s", device, details)
port_name = 'tap%s' % details['port_id'][0:11]
port = self.int_br.get_vif_port(port_name)
if details['admin_state_up']:
self.port_bound(port, details['network_id'],
details['vlan_id'])
else:
self.port_unbound(port, details['network_id'])
else:
LOG.debug("Device %s not defined on plugin", device)
return resync
def treat_devices_removed(self, devices):
resync = False
for device in devices:
LOG.info("Attachment %s removed", device)
try:
details = self.plugin_rpc.update_device_down(self.context,
device,
self.agent_id)
except Exception as e:
LOG.debug("port_removed failed for %s: %s", device, e)
resync = True
if details['exists']:
LOG.info("Port %s updated.", device)
# Nothing to do regarding local networking
else:
LOG.debug("Device %s not defined on plugin", device)
return resync
def process_network_ports(self, port_info):
resync_a = False
resync_b = False
if 'added' in port_info:
resync_a = self.treat_devices_added(port_info['added'])
if 'removed' in port_info:
resync_b = self.treat_devices_removed(port_info['removed'])
# If one of the above opertaions fails => resync with plugin
return (resync_a | resync_b)
def tunnel_sync(self):
resync = False
try:
details = self.plugin_rpc.tunnel_sync(self.context, self.local_ip)
tunnels = details['tunnels']
for tunnel in tunnels:
if self.local_ip != tunnel['ip_address']:
tun_name = 'gre-%s' % tunnel['id']
self.tun_br.add_tunnel_port(tun_name, tunnel['ip_address'])
except Exception as e:
LOG.debug("Unable to sync tunnel IP %s: %s", self.local_ip, e)
resync = True
return resync
def rpc_loop(self):
sync = True
ports = set()
tunnel_sync = True
while True:
start = time.time()
if sync:
LOG.info("Agent out of sync with plugin!")
ports.clear()
sync = False
# Notify the plugin of tunnel IP
if tunnel_sync:
LOG.info("Agent tunnel out of sync with plugin!")
tunnel_sync = self.tunnel_sync()
port_info = self.update_ports(ports)
# notify plugin about port deltas
if port_info:
LOG.debug("Agent loop has new devices!")
# If treat devices fails - indicates must resync with plugin
sync = self.process_network_ports(port_info)
ports = port_info['current']
# sleep till end of polling interval
elapsed = (time.time() - start)
if (elapsed < self.polling_interval):
time.sleep(self.polling_interval - elapsed)
else:
LOG.debug("Loop iteration exceeded interval (%s vs. %s)!",
self.polling_interval, elapsed)
def daemon_loop(self, db_connection_url):
if self.rpc:
self.rpc_loop()
else:
self.db_loop(db_connection_url)
def main():
cfg.CONF(args=sys.argv, project='quantum')
@ -561,10 +879,15 @@ def main():
polling_interval = cfg.CONF.AGENT.polling_interval
reconnect_interval = cfg.CONF.DATABASE.reconnect_interval
root_helper = cfg.CONF.AGENT.root_helper
rpc = cfg.CONF.AGENT.rpc
# Determine API Version to use
target_v2_api = cfg.CONF.AGENT.target_v2_api
# RPC only works with v2
if rpc and not target_v2_api:
rpc = False
if enable_tunneling:
# Get parameters for OVSQuantumTunnelAgent
tun_br = cfg.CONF.OVS.tunnel_bridge
@ -572,11 +895,11 @@ def main():
local_ip = cfg.CONF.OVS.local_ip
plugin = OVSQuantumTunnelAgent(integ_br, tun_br, local_ip, root_helper,
polling_interval, reconnect_interval,
target_v2_api)
target_v2_api, rpc)
else:
# Get parameters for OVSQuantumAgent.
plugin = OVSQuantumAgent(integ_br, root_helper, polling_interval,
reconnect_interval, target_v2_api)
reconnect_interval, target_v2_api, rpc)
# Start everything.
plugin.daemon_loop(db_connection_url)
@ -584,4 +907,5 @@ def main():
sys.exit(0)
if __name__ == "__main__":
eventlet.monkey_patch()
main()

View File

@ -17,6 +17,9 @@
from quantum.openstack.common import cfg
# Topic for tunnel notifications between the plugin and agent
TUNNEL = 'tunnel'
database_opts = [
cfg.StrOpt('sql_connection', default='sqlite://'),
cfg.IntOpt('sql_max_retries', default=-1),
@ -37,6 +40,7 @@ agent_opts = [
cfg.IntOpt('polling_interval', default=2),
cfg.StrOpt('root_helper', default='sudo'),
cfg.StrOpt('log_file', default=None),
cfg.BoolOpt('rpc', default=True),
]

View File

@ -20,7 +20,9 @@ import logging
from sqlalchemy.orm import exc
from quantum.api import api_common
from quantum.common import exceptions as q_exc
from quantum.db import models_v2
import quantum.db.api as db
from quantum.openstack.common import cfg
from quantum.plugins.openvswitch import ovs_models_v2
@ -169,3 +171,62 @@ def release_vlan_id(vlan_id):
session.delete(record)
except exc.NoResultFound:
LOG.error("vlan id %s not found in release_vlan_id" % vlan_id)
def get_port(port_id):
session = db.get_session()
try:
port = session.query(models_v2.Port).filter_by(id=port_id).one()
except exc.NoResultFound:
port = None
return port
def set_port_status(port_id, status):
session = db.get_session()
try:
port = session.query(models_v2.Port).filter_by(id=port_id).one()
port['status'] = status
if status == api_common.PORT_STATUS_DOWN:
port['device_id'] = ''
session.merge(port)
session.flush()
except exc.NoResultFound:
raise q_exc.PortNotFound(port_id=port_id)
def get_tunnels():
session = db.get_session()
try:
tunnels = session.query(ovs_models_v2.TunnelInfo).all()
except exc.NoResultFound:
return []
return [{'id': tunnel.id,
'ip_address': tunnel.ip_address} for tunnel in tunnels]
def generate_tunnel_id(session):
try:
tunnels = session.query(ovs_models_v2.TunnelInfo).all()
except exc.NoResultFound:
return 0
tunnel_ids = ([tunnel['id'] for tunnel in tunnels])
if tunnel_ids:
id = max(tunnel_ids)
else:
id = 0
return id + 1
def add_tunnel(ip):
session = db.get_session()
try:
tunnel = (session.query(ovs_models_v2.TunnelInfo).
filter_by(ip_address=ip).one())
except exc.NoResultFound:
# Generate an id for the tunnel
id = generate_tunnel_id(session)
tunnel = ovs_models_v2.TunnelInfo(ip, id)
session.add(tunnel)
session.flush()
return tunnel

View File

@ -65,3 +65,18 @@ class TunnelIP(model_base.BASEV2):
def __repr__(self):
return "<TunnelIP(%s)>" % (self.ip_address)
class TunnelInfo(model_base.BASEV2):
"""Represents remote tunnel information in tunnel mode."""
__tablename__ = 'tunnel_info'
ip_address = Column(String(64), primary_key=True)
id = Column(Integer, nullable=False)
def __init__(self, ip_address, id):
self.ip_address = ip_address
self.id = id
def __repr__(self):
return "<TunnelInfo(%s,%s)>" % (self.ip_address, self.id)

View File

@ -23,19 +23,143 @@
import logging
import os
from quantum.api import api_common
from quantum.api.v2 import attributes
from quantum.common import exceptions as q_exc
from quantum.common import topics
from quantum.common.utils import find_config_file
from quantum.db import api as db
from quantum.db import db_base_plugin_v2
from quantum.db import models_v2
from quantum.openstack.common import context
from quantum.openstack.common import cfg
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import dispatcher
from quantum.openstack.common.rpc import proxy
from quantum.plugins.openvswitch.common import config
from quantum.plugins.openvswitch import ovs_db_v2
from quantum import policy
LOG = logging.getLogger("ovs_quantum_plugin")
LOG = logging.getLogger(__name__)
class OVSRpcCallbacks():
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'
def __init__(self, context, notifier):
self.context = context
self.notifier = notifier
def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager.
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return dispatcher.RpcDispatcher([self])
def get_device_details(self, context, **kwargs):
"""Agent requests device details"""
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
LOG.debug("Device %s details requested from %s", device, agent_id)
port = ovs_db_v2.get_port(device)
if port:
vlan_id = ovs_db_v2.get_vlan(port['network_id'])
entry = {'device': device,
'vlan_id': vlan_id,
'network_id': port['network_id'],
'port_id': port['id'],
'admin_state_up': port['admin_state_up']}
# Set the port status to UP
ovs_db_v2.set_port_status(port['id'], api_common.PORT_STATUS_UP)
else:
entry = {'device': device}
LOG.debug("%s can not be found in database", device)
return entry
def update_device_down(self, context, **kwargs):
"""Device no longer exists on agent"""
# (TODO) garyk - live migration and port status
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
LOG.debug("Device %s no longer exists on %s", device, agent_id)
port = ovs_db_v2.get_port(device)
if port:
entry = {'device': device,
'exists': True}
# Set port status to DOWN
ovs_db_v2.set_port_status(port['id'], api_common.PORT_STATUS_DOWN)
else:
entry = {'device': device,
'exists': False}
LOG.debug("%s can not be found in database", device)
return entry
def tunnel_sync(self, context, **kwargs):
"""Update new tunnel.
Updates the datbase with the tunnel IP. All listening agents will also
be notified about the new tunnel IP.
"""
tunnel_ip = kwargs.get('tunnel_ip')
# Update the database with the IP
tunnel = ovs_db_v2.add_tunnel(tunnel_ip)
tunnels = ovs_db_v2.get_tunnels()
entry = dict()
entry['tunnels'] = tunnels
# Notify all other listening agents
self.notifier.tunnel_update(self.context, tunnel.ip_address,
tunnel.id)
# Return the list of tunnels IP's to the agent
return entry
class AgentNotifierApi(proxy.RpcProxy):
'''Agent side of the linux bridge rpc API.
API version history:
1.0 - Initial version.
'''
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic):
super(AgentNotifierApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.topic_network_delete = topics.get_topic_name(topic,
topics.NETWORK,
topics.DELETE)
self.topic_port_update = topics.get_topic_name(topic,
topics.PORT,
topics.UPDATE)
self.topic_tunnel_update = topics.get_topic_name(topic,
config.TUNNEL,
topics.UPDATE)
def network_delete(self, context, network_id):
self.fanout_cast(context,
self.make_msg('network_delete',
network_id=network_id),
topic=self.topic_network_delete)
def port_update(self, context, port, vlan_id):
self.fanout_cast(context,
self.make_msg('port_update',
port=port,
vlan_id=vlan_id),
topic=self.topic_port_update)
def tunnel_update(self, context, tunnel_ip, tunnel_id):
self.fanout_cast(context,
self.make_msg('tunnel_update',
tunnel_ip=tunnel_ip,
tunnel_id=tunnel_id),
topic=self.topic_tunnel_update)
class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
@ -67,6 +191,25 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
# update the vlan_id table based on current configuration
ovs_db_v2.update_vlan_id_pool()
self.rpc = cfg.CONF.AGENT.rpc
if cfg.CONF.AGENT.rpc and cfg.CONF.AGENT.target_v2_api:
self.setup_rpc()
if not cfg.CONF.AGENT.target_v2_api:
self.rpc = False
def setup_rpc(self):
# RPC support
self.topic = topics.PLUGIN
self.context = context.RequestContext('quantum', 'quantum',
is_admin=False)
self.conn = rpc.create_connection(new=True)
self.notifier = AgentNotifierApi(topics.AGENT)
self.callbacks = OVSRpcCallbacks(self.context, self.notifier)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
fanout=False)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
# TODO(rkukura) Use core mechanism for attribute authorization
# when available.
@ -114,6 +257,8 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
vlan_id = ovs_db_v2.get_vlan(id)
result = super(OVSQuantumPluginV2, self).delete_network(context, id)
ovs_db_v2.release_vlan_id(vlan_id)
if self.rpc:
self.notifier.network_delete(self.context, id)
return result
def get_network(self, context, id, fields=None, verbose=None):
@ -129,3 +274,14 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
self._extend_network_dict(context, net)
# TODO(rkukura): Filter on extended attributes.
return [self._fields(net, fields) for net in nets]
def update_port(self, context, id, port):
if self.rpc:
original_port = super(OVSQuantumPluginV2, self).get_port(context,
id)
port = super(OVSQuantumPluginV2, self).update_port(context, id, port)
if self.rpc:
if original_port['admin_state_up'] != port['admin_state_up']:
vlan_id = ovs_db_v2.get_vlan(port['network_id'])
self.notifier.port_update(self.context, port, vlan_id)
return port

View File

@ -0,0 +1,107 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for openvswitch rpc
"""
import stubout
import unittest2
from quantum.agent import rpc as agent_rpc
from quantum.common import topics
from quantum.openstack.common import context
from quantum.openstack.common import rpc
from quantum.plugins.openvswitch import ovs_quantum_plugin as povs
from quantum.plugins.openvswitch.common import config
class rpcApiTestCase(unittest2.TestCase):
def _test_ovs_api(self, rpcapi, topic, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if method == 'call' else None
expected_msg = rpcapi.make_msg(method, **kwargs)
expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
if rpc_method == 'cast' and method == 'run_instance':
kwargs['call'] = False
self.fake_args = None
self.fake_kwargs = None
def _fake_rpc_method(*args, **kwargs):
self.fake_args = args
self.fake_kwargs = kwargs
if expected_retval:
return expected_retval
self.stubs = stubout.StubOutForTesting()
self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
expected_args = [ctxt, topic, expected_msg]
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(arg, expected_arg)
def test_delete_network(self):
rpcapi = povs.AgentNotifierApi(topics.AGENT)
self._test_ovs_api(rpcapi,
topics.get_topic_name(topics.AGENT,
topics.NETWORK,
topics.DELETE),
'network_delete', rpc_method='fanout_cast',
network_id='fake_request_spec')
def test_port_update(self):
rpcapi = povs.AgentNotifierApi(topics.AGENT)
self._test_ovs_api(rpcapi,
topics.get_topic_name(topics.AGENT,
topics.PORT,
topics.UPDATE),
'port_update', rpc_method='fanout_cast',
port='fake_port', vlan_id='fake_vlan_id')
def test_tunnel_update(self):
rpcapi = povs.AgentNotifierApi(topics.AGENT)
self._test_ovs_api(rpcapi,
topics.get_topic_name(topics.AGENT,
config.TUNNEL,
topics.UPDATE),
'tunnel_update', rpc_method='fanout_cast',
tunnel_ip='fake_ip', tunnel_id='fake_id')
def test_device_details(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_ovs_api(rpcapi, topics.PLUGIN,
'get_device_details', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
def test_update_device_down(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_ovs_api(rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
def test_tunnel_sync(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_ovs_api(rpcapi, topics.PLUGIN,
'tunnel_sync', rpc_method='call',
tunnel_ip='fake_tunnel_ip')

View File

@ -81,7 +81,7 @@ class TunnelTest(unittest.TestCase):
b = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
'10.0.0.1',
'sudo', 2, 2)
'sudo', 2, 2, False, False)
self.mox.VerifyAll()
def testProvisionLocalVlan(self):
@ -98,7 +98,7 @@ class TunnelTest(unittest.TestCase):
a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
'10.0.0.1',
'sudo', 2, 2)
'sudo', 2, 2, False, False)
a.available_local_vlans = set([LV_ID])
a.provision_local_vlan(NET_UUID, LS_ID)
self.mox.VerifyAll()
@ -112,7 +112,7 @@ class TunnelTest(unittest.TestCase):
a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
'10.0.0.1',
'sudo', 2, 2)
'sudo', 2, 2, False, False)
a.available_local_vlans = set()
a.local_vlan_map[NET_UUID] = LVM
a.reclaim_local_vlan(NET_UUID, LVM)
@ -128,7 +128,7 @@ class TunnelTest(unittest.TestCase):
a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
'10.0.0.1',
'sudo', 2, 2)
'sudo', 2, 2, False, False)
a.local_vlan_map[NET_UUID] = LVM
a.port_bound(VIF_PORT, NET_UUID, LS_ID)
self.mox.VerifyAll()
@ -138,7 +138,7 @@ class TunnelTest(unittest.TestCase):
a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
'10.0.0.1',
'sudo', 2, 2)
'sudo', 2, 2, False, False)
a.available_local_vlans = set([LV_ID])
a.local_vlan_map[NET_UUID] = LVM
a.port_unbound(VIF_PORT, NET_UUID)
@ -155,7 +155,7 @@ class TunnelTest(unittest.TestCase):
a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
'10.0.0.1',
'sudo', 2, 2)
'sudo', 2, 2, False, False)
a.available_local_vlans = set([LV_ID])
a.local_vlan_map[NET_UUID] = LVM
a.port_dead(VIF_PORT)

View File

@ -1,7 +1,7 @@
Paste
PasteDeploy==1.5.0
Routes>=1.12.3
eventlet>=0.9.12
eventlet>=0.9.17
httplib2
iso8601>=0.1.4
lxml