From 2fc8ff9cbf581a130616bf2f0b5c3dc7e8fde84b Mon Sep 17 00:00:00 2001 From: Gary Kotton Date: Mon, 6 Aug 2012 07:45:34 -0400 Subject: [PATCH] RPC support for OVS Plugin and Agent blueprint scalable-agent-comms This adds support for the OVS plugin. Change-Id: I613de63f5c7f374be87520f32a2f7129d86ef109 --- quantum/agent/linux/ovs_lib.py | 22 +- quantum/agent/rpc.py | 33 ++ .../agent/linuxbridge_quantum_agent.py | 37 +- .../linuxbridge/tests/unit/test_rpcapi.py | 6 +- .../openvswitch/agent/ovs_quantum_agent.py | 336 +++++++++++++++++- quantum/plugins/openvswitch/common/config.py | 4 + quantum/plugins/openvswitch/ovs_db_v2.py | 61 ++++ quantum/plugins/openvswitch/ovs_models_v2.py | 15 + .../plugins/openvswitch/ovs_quantum_plugin.py | 158 +++++++- .../openvswitch/tests/unit/test_rpcapi.py | 107 ++++++ .../openvswitch/tests/unit/test_tunnel.py | 12 +- tools/pip-requires | 2 +- 12 files changed, 743 insertions(+), 50 deletions(-) create mode 100644 quantum/plugins/openvswitch/tests/unit/test_rpcapi.py diff --git a/quantum/agent/linux/ovs_lib.py b/quantum/agent/linux/ovs_lib.py index 204c981572..67f06580e2 100644 --- a/quantum/agent/linux/ovs_lib.py +++ b/quantum/agent/linux/ovs_lib.py @@ -89,7 +89,6 @@ class OVSBridge: def _build_flow_expr_arr(self, **kwargs): flow_expr_arr = [] is_delete_expr = kwargs.get('delete', False) - if not is_delete_expr: prefix = ("hard_timeout=%s,idle_timeout=%s,priority=%s" % (kwargs.get('hard_timeout', '0'), @@ -206,3 +205,24 @@ class OVSBridge: edge_ports.append(p) return edge_ports + + def get_vif_port_set(self): + edge_ports = set() + port_names = self.get_port_name_list() + for name in port_names: + external_ids = self.db_get_map("Interface", name, "external_ids") + if "iface-id" in external_ids and "attached-mac" in external_ids: + edge_ports.add(external_ids['iface-id']) + elif ("xs-vif-uuid" in external_ids and + "attached-mac" in external_ids): + # if this is a xenserver and iface-id is not automatically + # synced to OVS from XAPI, we grab it from XAPI directly + iface_id = self.get_xapi_iface_id(external_ids["xs-vif-uuid"]) + edge_ports.add(iface_id) + return edge_ports + + def get_vif_port(self, port_name): + external_ids = self.db_get_map("Interface", port_name, "external_ids") + ofport = self.db_get_val("Interface", port_name, "ofport") + return VifPort(port_name, ofport, external_ids["iface-id"], + external_ids["attached-mac"], self) diff --git a/quantum/agent/rpc.py b/quantum/agent/rpc.py index 7eb85f10f4..7e7fe791f0 100644 --- a/quantum/agent/rpc.py +++ b/quantum/agent/rpc.py @@ -15,6 +15,7 @@ from quantum.common import topics from quantum.openstack.common import rpc +from quantum.openstack.common.rpc import proxy def create_consumers(dispatcher, prefix, topic_details): @@ -34,3 +35,35 @@ def create_consumers(dispatcher, prefix, topic_details): connection.create_consumer(topic_name, dispatcher, fanout=True) connection.consume_in_thread() return connection + + +class PluginApi(proxy.RpcProxy): + '''Agent side of the rpc API. + + API version history: + 1.0 - Initial version. + + ''' + + BASE_RPC_API_VERSION = '1.0' + + def __init__(self, topic): + super(PluginApi, self).__init__( + topic=topic, default_version=self.BASE_RPC_API_VERSION) + + def get_device_details(self, context, device, agent_id): + return self.call(context, + self.make_msg('get_device_details', device=device, + agent_id=agent_id), + topic=self.topic) + + def update_device_down(self, context, device, agent_id): + return self.call(context, + self.make_msg('update_device_down', device=device, + agent_id=agent_id), + topic=self.topic) + + def tunnel_sync(self, context, tunnel_ip): + return self.call(context, + self.make_msg('tunnel_sync', tunnel_ip=tunnel_ip), + topic=self.topic) diff --git a/quantum/plugins/linuxbridge/agent/linuxbridge_quantum_agent.py b/quantum/plugins/linuxbridge/agent/linuxbridge_quantum_agent.py index 1acae609e0..2679a6474a 100755 --- a/quantum/plugins/linuxbridge/agent/linuxbridge_quantum_agent.py +++ b/quantum/plugins/linuxbridge/agent/linuxbridge_quantum_agent.py @@ -34,14 +34,13 @@ import eventlet import pyudev from sqlalchemy.ext.sqlsoup import SqlSoup -from quantum.agent.rpc import create_consumers +from quantum.agent import rpc as agent_rpc from quantum.common import config as logging_config from quantum.common import topics from quantum.openstack.common import cfg from quantum.openstack.common import context from quantum.openstack.common import rpc from quantum.openstack.common.rpc import dispatcher -from quantum.openstack.common.rpc import proxy from quantum.plugins.linuxbridge.common import config from quantum.agent.linux import utils @@ -320,33 +319,6 @@ class LinuxBridge: LOG.debug("Done deleting subinterface %s" % interface) -class PluginApi(proxy.RpcProxy): - '''Agent side of the linux bridge rpc API. - - API version history: - 1.0 - Initial version. - - ''' - - BASE_RPC_API_VERSION = '1.0' - - def __init__(self, topic): - super(PluginApi, self).__init__( - topic=topic, default_version=self.BASE_RPC_API_VERSION) - - def get_device_details(self, context, device, agent_id): - return self.call(context, - self.make_msg('get_device_details', device=device, - agent_id=agent_id), - topic=self.topic) - - def update_device_down(self, context, device, agent_id): - return self.call(context, - self.make_msg('update_device_down', device=device, - agent_id=agent_id), - topic=self.topic) - - class LinuxBridgeRpcCallbacks(): # Set RPC API version to 1.0 by default. @@ -578,7 +550,7 @@ class LinuxBridgeQuantumAgentRPC: mac = utils.get_interface_mac(physical_interface) self.agent_id = '%s%s' % ('lb', (mac.replace(":", ""))) self.topic = topics.AGENT - self.plugin_rpc = PluginApi(topics.PLUGIN) + self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN) # RPC network init self.context = context.RequestContext('quantum', 'quantum', @@ -590,8 +562,9 @@ class LinuxBridgeQuantumAgentRPC: # Define the listening consumers for the agent consumers = [[topics.PORT, topics.UPDATE], [topics.NETWORK, topics.DELETE]] - self.connection = create_consumers(self.dispatcher, self.topic, - consumers) + self.connection = agent_rpc.create_consumers(self.dispatcher, + self.topic, + consumers) self.udev = pyudev.Context() monitor = pyudev.Monitor.from_netlink(self.udev) monitor.filter_by('net') diff --git a/quantum/plugins/linuxbridge/tests/unit/test_rpcapi.py b/quantum/plugins/linuxbridge/tests/unit/test_rpcapi.py index 22213c5f2d..1fc3f6fd6a 100644 --- a/quantum/plugins/linuxbridge/tests/unit/test_rpcapi.py +++ b/quantum/plugins/linuxbridge/tests/unit/test_rpcapi.py @@ -21,10 +21,10 @@ Unit Tests for linuxbridge rpc import stubout import unittest2 +from quantum.agent import rpc as agent_rpc from quantum.common import topics from quantum.openstack.common import context from quantum.openstack.common import rpc -from quantum.plugins.linuxbridge.agent import linuxbridge_quantum_agent as alb from quantum.plugins.linuxbridge import lb_quantum_plugin as plb @@ -77,14 +77,14 @@ class rpcApiTestCase(unittest2.TestCase): port='fake_port', vlan_id='fake_vlan_id') def test_device_details(self): - rpcapi = alb.PluginApi(topics.PLUGIN) + rpcapi = agent_rpc.PluginApi(topics.PLUGIN) self._test_lb_api(rpcapi, topics.PLUGIN, 'get_device_details', rpc_method='call', device='fake_device', agent_id='fake_agent_id') def test_update_device_down(self): - rpcapi = alb.PluginApi(topics.PLUGIN) + rpcapi = agent_rpc.PluginApi(topics.PLUGIN) self._test_lb_api(rpcapi, topics.PLUGIN, 'update_device_down', rpc_method='call', device='fake_device', diff --git a/quantum/plugins/openvswitch/agent/ovs_quantum_agent.py b/quantum/plugins/openvswitch/agent/ovs_quantum_agent.py index 6a239a1486..cb4b473c5c 100755 --- a/quantum/plugins/openvswitch/agent/ovs_quantum_agent.py +++ b/quantum/plugins/openvswitch/agent/ovs_quantum_agent.py @@ -24,11 +24,18 @@ import logging import sys import time +import eventlet from sqlalchemy.ext import sqlsoup +from quantum.agent import rpc as agent_rpc from quantum.agent.linux import ovs_lib +from quantum.agent.linux import utils from quantum.common import config as logging_config +from quantum.common import topics from quantum.openstack.common import cfg +from quantum.openstack.common import context +from quantum.openstack.common import rpc +from quantum.openstack.common.rpc import dispatcher from quantum.plugins.openvswitch.common import config logging.basicConfig() @@ -120,15 +127,87 @@ class Portv2(object): return hash(self.id) +class OVSRpcCallbacks(): + + # Set RPC API version to 1.0 by default. + RPC_API_VERSION = '1.0' + + def __init__(self, context, int_br, local_ip=None, tun_br=None): + self.context = context + self.int_br = int_br + # Tunneling variables + self.local_ip = local_ip + self.tun_br = tun_br + + def network_delete(self, context, **kwargs): + LOG.debug("network_delete received") + network_id = kwargs.get('network_id') + # (TODO) garyk delete the bridge interface + LOG.debug("Delete %s", network_id) + + def port_update(self, context, **kwargs): + LOG.debug("port_update received") + port = kwargs.get('port') + port_name = 'tap%s' % port['id'][0:11] + vif_port = self.int_br.get_vif_port(port_name) + if port['admin_state_up']: + vlan_id = kwargs.get('vlan_id') + # create the networking for the port + self.int_br.set_db_attribute("Port", vif_port.port_name, + "tag", str(vlan_id)) + self.int_br.delete_flows(in_port=vif_port.ofport) + else: + self.int_br.clear_db_attribute("Port", vif_port.port_name, "tag") + + def tunnel_update(self, context, **kwargs): + LOG.debug("tunnel_update received") + tunnel_ip = kwargs.get('tunnel_ip') + tunnel_id = kwargs.get('tunnel_id') + if tunnel_ip == self.local_ip: + return + tun_name = 'gre-%s' % tunnel_id + self.tun_br.add_tunnel_port(tun_name, tunnel_ip) + + def create_rpc_dispatcher(self): + '''Get the rpc dispatcher for this manager. + + If a manager would like to set an rpc API version, or support more than + one class as the target of rpc messages, override this method. + ''' + return dispatcher.RpcDispatcher([self]) + + class OVSQuantumAgent(object): def __init__(self, integ_br, root_helper, polling_interval, - reconnect_interval, target_v2_api=False): + reconnect_interval, target_v2_api, rpc): self.root_helper = root_helper self.setup_integration_br(integ_br) self.polling_interval = polling_interval self.reconnect_interval = reconnect_interval self.target_v2_api = target_v2_api + self.rpc = rpc + if rpc: + self.setup_rpc(integ_br) + + def setup_rpc(self, integ_br): + mac = utils.get_interface_mac(integ_br) + self.agent_id = '%s' % (mac.replace(":", "")) + self.topic = topics.AGENT + self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN) + + # RPC network init + self.context = context.RequestContext('quantum', 'quantum', + is_admin=False) + # Handle updates from service + self.callbacks = OVSRpcCallbacks(self.context, self.int_br) + self.dispatcher = self.callbacks.create_rpc_dispatcher() + # Define the listening consumers for the agent + consumers = [[topics.PORT, topics.UPDATE], + [topics.NETWORK, topics.DELETE]] + self.connection = agent_rpc.create_consumers(self.dispatcher, + self.topic, + consumers) def port_bound(self, port, vlan_id): self.int_br.set_db_attribute("Port", port.port_name, @@ -145,7 +224,7 @@ class OVSQuantumAgent(object): # switch all traffic using L2 learning self.int_br.add_flow(priority=1, actions="normal") - def daemon_loop(self, db_connection_url): + def db_loop(self, db_connection_url): '''Main processing loop for Non-Tunneling Agent. :param options: database information - in the event need to reconnect @@ -247,6 +326,102 @@ class OVSQuantumAgent(object): time.sleep(self.polling_interval) + def update_ports(self, registered_ports): + ports = self.int_br.get_vif_port_set() + if ports == registered_ports: + return + added = ports - registered_ports + removed = registered_ports - ports + return {'current': ports, + 'added': added, + 'removed': removed} + + def treat_devices_added(self, devices): + resync = False + for device in devices: + LOG.info("Port %s added", device) + try: + details = self.plugin_rpc.get_device_details(self.context, + device, + self.agent_id) + except Exception as e: + LOG.debug("Unable to get port details for %s: %s", device, e) + resync = True + continue + if 'port_id' in details: + LOG.info("Port %s updated. Details: %s", device, details) + port_name = 'tap%s' % details['port_id'][0:11] + port = self.int_br.get_vif_port(port_name) + if details['admin_state_up']: + self.port_bound(port, details['vlan_id']) + else: + self.port_unbound(port, True) + else: + LOG.debug("Device %s not defined on plugin", device) + return resync + + def treat_devices_removed(self, devices): + resync = False + for device in devices: + LOG.info("Attachment %s removed", device) + try: + details = self.plugin_rpc.update_device_down(self.context, + device, + self.agent_id) + except Exception as e: + LOG.debug("port_removed failed for %s: %s", device, e) + resync = True + if details['exists']: + LOG.info("Port %s updated.", device) + # Nothing to do regarding local networking + else: + LOG.debug("Device %s not defined on plugin", device) + return resync + + def process_network_ports(self, port_info): + resync_a = False + resync_b = False + if 'added' in port_info: + resync_a = self.treat_devices_added(port_info['added']) + if 'removed' in port_info: + resync_b = self.treat_devices_removed(port_info['removed']) + # If one of the above opertaions fails => resync with plugin + return (resync_a | resync_b) + + def rpc_loop(self): + sync = True + ports = set() + + while True: + start = time.time() + if sync: + LOG.info("Agent out of sync with plugin!") + ports.clear() + sync = False + + port_info = self.update_ports(ports) + + # notify plugin about port deltas + if port_info: + LOG.debug("Agent loop has new devices!") + # If treat devices fails - indicates must resync with plugin + sync = self.process_network_ports(port_info) + ports = port_info['current'] + + # sleep till end of polling interval + elapsed = (time.time() - start) + if (elapsed < self.polling_interval): + time.sleep(self.polling_interval - elapsed) + else: + LOG.debug("Loop iteration exceeded interval (%s vs. %s)!", + self.polling_interval, elapsed) + + def daemon_loop(self, db_connection_url): + if self.rpc: + self.rpc_loop() + else: + self.db_loop(db_connection_url) + class OVSQuantumTunnelAgent(object): '''Implements OVS-based tunneling. @@ -273,7 +448,8 @@ class OVSQuantumTunnelAgent(object): MAX_VLAN_TAG = 4094 def __init__(self, integ_br, tun_br, local_ip, root_helper, - polling_interval, reconnect_interval, target_v2_api=False): + polling_interval, reconnect_interval, target_v2_api, + rpc): '''Constructor. :param integ_br: name of the integration bridge. @@ -283,6 +459,7 @@ class OVSQuantumTunnelAgent(object): :param polling_interval: interval (secs) to poll DB. :param reconnect_internal: retry interval (secs) on DB error. :param target_v2_api: if True use v2 api. + :param rpc: if True use RPC interface to interface with plugin. ''' self.root_helper = root_helper self.available_local_vlans = set( @@ -298,6 +475,30 @@ class OVSQuantumTunnelAgent(object): self.tunnel_count = 0 self.setup_tunnel_br(tun_br) self.target_v2_api = target_v2_api + self.rpc = rpc + if rpc: + self.setup_rpc(integ_br) + + def setup_rpc(self, integ_br): + mac = utils.get_interface_mac(integ_br) + self.agent_id = '%s%s' % ('ovs', (mac.replace(":", ""))) + self.topic = topics.AGENT + self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN) + + # RPC network init + self.context = context.RequestContext('quantum', 'quantum', + is_admin=False) + # Handle updates from service + self.callbacks = OVSRpcCallbacks(self.context, self.int_br, + self.local_ip, self.tun_br) + self.dispatcher = self.callbacks.create_rpc_dispatcher() + # Define the listening consumers for the agent + consumers = [[topics.PORT, topics.UPDATE], + [topics.NETWORK, topics.DELETE], + [config.TUNNEL, topics.UPDATE]] + self.connection = agent_rpc.create_consumers(self.dispatcher, + self.topic, + consumers) def provision_local_vlan(self, net_uuid, lsw_id): '''Provisions a local VLAN. @@ -431,7 +632,7 @@ class OVSQuantumTunnelAgent(object): except: LOG.exception("Problem connecting to database") - def daemon_loop(self, db_connection_url): + def db_loop(self, db_connection_url): '''Main processing loop for Tunneling Agent. :param options: database information - in the event need to reconnect @@ -547,6 +748,123 @@ class OVSQuantumTunnelAgent(object): LOG.exception("Main-loop Exception:") self.rollback_until_success(db) + def update_ports(self, registered_ports): + ports = self.int_br.get_vif_port_set() + if ports == registered_ports: + return + added = ports - registered_ports + removed = registered_ports - ports + return {'current': ports, + 'added': added, + 'removed': removed} + + def treat_devices_added(self, devices): + resync = False + for device in devices: + LOG.info("Port %s added", device) + try: + details = self.plugin_rpc.get_device_details(self.context, + device, + self.agent_id) + except Exception as e: + LOG.debug("Unable to get port details for %s: %s", device, e) + resync = True + continue + if 'port_id' in details: + LOG.info("Port %s updated. Details: %s", device, details) + port_name = 'tap%s' % details['port_id'][0:11] + port = self.int_br.get_vif_port(port_name) + if details['admin_state_up']: + self.port_bound(port, details['network_id'], + details['vlan_id']) + else: + self.port_unbound(port, details['network_id']) + else: + LOG.debug("Device %s not defined on plugin", device) + return resync + + def treat_devices_removed(self, devices): + resync = False + for device in devices: + LOG.info("Attachment %s removed", device) + try: + details = self.plugin_rpc.update_device_down(self.context, + device, + self.agent_id) + except Exception as e: + LOG.debug("port_removed failed for %s: %s", device, e) + resync = True + if details['exists']: + LOG.info("Port %s updated.", device) + # Nothing to do regarding local networking + else: + LOG.debug("Device %s not defined on plugin", device) + return resync + + def process_network_ports(self, port_info): + resync_a = False + resync_b = False + if 'added' in port_info: + resync_a = self.treat_devices_added(port_info['added']) + if 'removed' in port_info: + resync_b = self.treat_devices_removed(port_info['removed']) + # If one of the above opertaions fails => resync with plugin + return (resync_a | resync_b) + + def tunnel_sync(self): + resync = False + try: + details = self.plugin_rpc.tunnel_sync(self.context, self.local_ip) + tunnels = details['tunnels'] + for tunnel in tunnels: + if self.local_ip != tunnel['ip_address']: + tun_name = 'gre-%s' % tunnel['id'] + self.tun_br.add_tunnel_port(tun_name, tunnel['ip_address']) + except Exception as e: + LOG.debug("Unable to sync tunnel IP %s: %s", self.local_ip, e) + resync = True + return resync + + def rpc_loop(self): + sync = True + ports = set() + tunnel_sync = True + + while True: + start = time.time() + if sync: + LOG.info("Agent out of sync with plugin!") + ports.clear() + sync = False + + # Notify the plugin of tunnel IP + if tunnel_sync: + LOG.info("Agent tunnel out of sync with plugin!") + tunnel_sync = self.tunnel_sync() + + port_info = self.update_ports(ports) + + # notify plugin about port deltas + if port_info: + LOG.debug("Agent loop has new devices!") + # If treat devices fails - indicates must resync with plugin + sync = self.process_network_ports(port_info) + ports = port_info['current'] + + # sleep till end of polling interval + elapsed = (time.time() - start) + if (elapsed < self.polling_interval): + time.sleep(self.polling_interval - elapsed) + else: + LOG.debug("Loop iteration exceeded interval (%s vs. %s)!", + self.polling_interval, elapsed) + + def daemon_loop(self, db_connection_url): + if self.rpc: + self.rpc_loop() + else: + self.db_loop(db_connection_url) + def main(): cfg.CONF(args=sys.argv, project='quantum') @@ -561,10 +879,15 @@ def main(): polling_interval = cfg.CONF.AGENT.polling_interval reconnect_interval = cfg.CONF.DATABASE.reconnect_interval root_helper = cfg.CONF.AGENT.root_helper + rpc = cfg.CONF.AGENT.rpc # Determine API Version to use target_v2_api = cfg.CONF.AGENT.target_v2_api + # RPC only works with v2 + if rpc and not target_v2_api: + rpc = False + if enable_tunneling: # Get parameters for OVSQuantumTunnelAgent tun_br = cfg.CONF.OVS.tunnel_bridge @@ -572,11 +895,11 @@ def main(): local_ip = cfg.CONF.OVS.local_ip plugin = OVSQuantumTunnelAgent(integ_br, tun_br, local_ip, root_helper, polling_interval, reconnect_interval, - target_v2_api) + target_v2_api, rpc) else: # Get parameters for OVSQuantumAgent. plugin = OVSQuantumAgent(integ_br, root_helper, polling_interval, - reconnect_interval, target_v2_api) + reconnect_interval, target_v2_api, rpc) # Start everything. plugin.daemon_loop(db_connection_url) @@ -584,4 +907,5 @@ def main(): sys.exit(0) if __name__ == "__main__": + eventlet.monkey_patch() main() diff --git a/quantum/plugins/openvswitch/common/config.py b/quantum/plugins/openvswitch/common/config.py index e11ac94c7f..36b787bbbb 100644 --- a/quantum/plugins/openvswitch/common/config.py +++ b/quantum/plugins/openvswitch/common/config.py @@ -17,6 +17,9 @@ from quantum.openstack.common import cfg +# Topic for tunnel notifications between the plugin and agent +TUNNEL = 'tunnel' + database_opts = [ cfg.StrOpt('sql_connection', default='sqlite://'), cfg.IntOpt('sql_max_retries', default=-1), @@ -37,6 +40,7 @@ agent_opts = [ cfg.IntOpt('polling_interval', default=2), cfg.StrOpt('root_helper', default='sudo'), cfg.StrOpt('log_file', default=None), + cfg.BoolOpt('rpc', default=True), ] diff --git a/quantum/plugins/openvswitch/ovs_db_v2.py b/quantum/plugins/openvswitch/ovs_db_v2.py index 45a391165a..7298e439d7 100644 --- a/quantum/plugins/openvswitch/ovs_db_v2.py +++ b/quantum/plugins/openvswitch/ovs_db_v2.py @@ -20,7 +20,9 @@ import logging from sqlalchemy.orm import exc +from quantum.api import api_common from quantum.common import exceptions as q_exc +from quantum.db import models_v2 import quantum.db.api as db from quantum.openstack.common import cfg from quantum.plugins.openvswitch import ovs_models_v2 @@ -169,3 +171,62 @@ def release_vlan_id(vlan_id): session.delete(record) except exc.NoResultFound: LOG.error("vlan id %s not found in release_vlan_id" % vlan_id) + + +def get_port(port_id): + session = db.get_session() + try: + port = session.query(models_v2.Port).filter_by(id=port_id).one() + except exc.NoResultFound: + port = None + return port + + +def set_port_status(port_id, status): + session = db.get_session() + try: + port = session.query(models_v2.Port).filter_by(id=port_id).one() + port['status'] = status + if status == api_common.PORT_STATUS_DOWN: + port['device_id'] = '' + session.merge(port) + session.flush() + except exc.NoResultFound: + raise q_exc.PortNotFound(port_id=port_id) + + +def get_tunnels(): + session = db.get_session() + try: + tunnels = session.query(ovs_models_v2.TunnelInfo).all() + except exc.NoResultFound: + return [] + return [{'id': tunnel.id, + 'ip_address': tunnel.ip_address} for tunnel in tunnels] + + +def generate_tunnel_id(session): + try: + tunnels = session.query(ovs_models_v2.TunnelInfo).all() + except exc.NoResultFound: + return 0 + tunnel_ids = ([tunnel['id'] for tunnel in tunnels]) + if tunnel_ids: + id = max(tunnel_ids) + else: + id = 0 + return id + 1 + + +def add_tunnel(ip): + session = db.get_session() + try: + tunnel = (session.query(ovs_models_v2.TunnelInfo). + filter_by(ip_address=ip).one()) + except exc.NoResultFound: + # Generate an id for the tunnel + id = generate_tunnel_id(session) + tunnel = ovs_models_v2.TunnelInfo(ip, id) + session.add(tunnel) + session.flush() + return tunnel diff --git a/quantum/plugins/openvswitch/ovs_models_v2.py b/quantum/plugins/openvswitch/ovs_models_v2.py index 755dc8ee20..aa427a4198 100644 --- a/quantum/plugins/openvswitch/ovs_models_v2.py +++ b/quantum/plugins/openvswitch/ovs_models_v2.py @@ -65,3 +65,18 @@ class TunnelIP(model_base.BASEV2): def __repr__(self): return "" % (self.ip_address) + + +class TunnelInfo(model_base.BASEV2): + """Represents remote tunnel information in tunnel mode.""" + __tablename__ = 'tunnel_info' + + ip_address = Column(String(64), primary_key=True) + id = Column(Integer, nullable=False) + + def __init__(self, ip_address, id): + self.ip_address = ip_address + self.id = id + + def __repr__(self): + return "" % (self.ip_address, self.id) diff --git a/quantum/plugins/openvswitch/ovs_quantum_plugin.py b/quantum/plugins/openvswitch/ovs_quantum_plugin.py index aa24cbdcaa..068f4adc4d 100644 --- a/quantum/plugins/openvswitch/ovs_quantum_plugin.py +++ b/quantum/plugins/openvswitch/ovs_quantum_plugin.py @@ -23,19 +23,143 @@ import logging import os +from quantum.api import api_common from quantum.api.v2 import attributes from quantum.common import exceptions as q_exc +from quantum.common import topics from quantum.common.utils import find_config_file from quantum.db import api as db from quantum.db import db_base_plugin_v2 from quantum.db import models_v2 +from quantum.openstack.common import context from quantum.openstack.common import cfg +from quantum.openstack.common import rpc +from quantum.openstack.common.rpc import dispatcher +from quantum.openstack.common.rpc import proxy from quantum.plugins.openvswitch.common import config from quantum.plugins.openvswitch import ovs_db_v2 from quantum import policy -LOG = logging.getLogger("ovs_quantum_plugin") +LOG = logging.getLogger(__name__) + + +class OVSRpcCallbacks(): + + # Set RPC API version to 1.0 by default. + RPC_API_VERSION = '1.0' + + def __init__(self, context, notifier): + self.context = context + self.notifier = notifier + + def create_rpc_dispatcher(self): + '''Get the rpc dispatcher for this manager. + + If a manager would like to set an rpc API version, or support more than + one class as the target of rpc messages, override this method. + ''' + return dispatcher.RpcDispatcher([self]) + + def get_device_details(self, context, **kwargs): + """Agent requests device details""" + agent_id = kwargs.get('agent_id') + device = kwargs.get('device') + LOG.debug("Device %s details requested from %s", device, agent_id) + port = ovs_db_v2.get_port(device) + if port: + vlan_id = ovs_db_v2.get_vlan(port['network_id']) + entry = {'device': device, + 'vlan_id': vlan_id, + 'network_id': port['network_id'], + 'port_id': port['id'], + 'admin_state_up': port['admin_state_up']} + # Set the port status to UP + ovs_db_v2.set_port_status(port['id'], api_common.PORT_STATUS_UP) + else: + entry = {'device': device} + LOG.debug("%s can not be found in database", device) + return entry + + def update_device_down(self, context, **kwargs): + """Device no longer exists on agent""" + # (TODO) garyk - live migration and port status + agent_id = kwargs.get('agent_id') + device = kwargs.get('device') + LOG.debug("Device %s no longer exists on %s", device, agent_id) + port = ovs_db_v2.get_port(device) + if port: + entry = {'device': device, + 'exists': True} + # Set port status to DOWN + ovs_db_v2.set_port_status(port['id'], api_common.PORT_STATUS_DOWN) + else: + entry = {'device': device, + 'exists': False} + LOG.debug("%s can not be found in database", device) + return entry + + def tunnel_sync(self, context, **kwargs): + """Update new tunnel. + + Updates the datbase with the tunnel IP. All listening agents will also + be notified about the new tunnel IP. + """ + tunnel_ip = kwargs.get('tunnel_ip') + # Update the database with the IP + tunnel = ovs_db_v2.add_tunnel(tunnel_ip) + tunnels = ovs_db_v2.get_tunnels() + entry = dict() + entry['tunnels'] = tunnels + # Notify all other listening agents + self.notifier.tunnel_update(self.context, tunnel.ip_address, + tunnel.id) + # Return the list of tunnels IP's to the agent + return entry + + +class AgentNotifierApi(proxy.RpcProxy): + '''Agent side of the linux bridge rpc API. + + API version history: + 1.0 - Initial version. + + ''' + + BASE_RPC_API_VERSION = '1.0' + + def __init__(self, topic): + super(AgentNotifierApi, self).__init__( + topic=topic, default_version=self.BASE_RPC_API_VERSION) + self.topic_network_delete = topics.get_topic_name(topic, + topics.NETWORK, + topics.DELETE) + self.topic_port_update = topics.get_topic_name(topic, + topics.PORT, + topics.UPDATE) + self.topic_tunnel_update = topics.get_topic_name(topic, + config.TUNNEL, + topics.UPDATE) + + def network_delete(self, context, network_id): + self.fanout_cast(context, + self.make_msg('network_delete', + network_id=network_id), + topic=self.topic_network_delete) + + def port_update(self, context, port, vlan_id): + self.fanout_cast(context, + self.make_msg('port_update', + port=port, + vlan_id=vlan_id), + topic=self.topic_port_update) + + def tunnel_update(self, context, tunnel_ip, tunnel_id): + self.fanout_cast(context, + self.make_msg('tunnel_update', + tunnel_ip=tunnel_ip, + tunnel_id=tunnel_id), + topic=self.topic_tunnel_update) class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2): @@ -67,6 +191,25 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2): # update the vlan_id table based on current configuration ovs_db_v2.update_vlan_id_pool() + self.rpc = cfg.CONF.AGENT.rpc + if cfg.CONF.AGENT.rpc and cfg.CONF.AGENT.target_v2_api: + self.setup_rpc() + if not cfg.CONF.AGENT.target_v2_api: + self.rpc = False + + def setup_rpc(self): + # RPC support + self.topic = topics.PLUGIN + self.context = context.RequestContext('quantum', 'quantum', + is_admin=False) + self.conn = rpc.create_connection(new=True) + self.notifier = AgentNotifierApi(topics.AGENT) + self.callbacks = OVSRpcCallbacks(self.context, self.notifier) + self.dispatcher = self.callbacks.create_rpc_dispatcher() + self.conn.create_consumer(self.topic, self.dispatcher, + fanout=False) + # Consume from all consumers in a thread + self.conn.consume_in_thread() # TODO(rkukura) Use core mechanism for attribute authorization # when available. @@ -114,6 +257,8 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2): vlan_id = ovs_db_v2.get_vlan(id) result = super(OVSQuantumPluginV2, self).delete_network(context, id) ovs_db_v2.release_vlan_id(vlan_id) + if self.rpc: + self.notifier.network_delete(self.context, id) return result def get_network(self, context, id, fields=None, verbose=None): @@ -129,3 +274,14 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2): self._extend_network_dict(context, net) # TODO(rkukura): Filter on extended attributes. return [self._fields(net, fields) for net in nets] + + def update_port(self, context, id, port): + if self.rpc: + original_port = super(OVSQuantumPluginV2, self).get_port(context, + id) + port = super(OVSQuantumPluginV2, self).update_port(context, id, port) + if self.rpc: + if original_port['admin_state_up'] != port['admin_state_up']: + vlan_id = ovs_db_v2.get_vlan(port['network_id']) + self.notifier.port_update(self.context, port, vlan_id) + return port diff --git a/quantum/plugins/openvswitch/tests/unit/test_rpcapi.py b/quantum/plugins/openvswitch/tests/unit/test_rpcapi.py new file mode 100644 index 0000000000..11c3506c00 --- /dev/null +++ b/quantum/plugins/openvswitch/tests/unit/test_rpcapi.py @@ -0,0 +1,107 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Unit Tests for openvswitch rpc +""" + +import stubout +import unittest2 + +from quantum.agent import rpc as agent_rpc +from quantum.common import topics +from quantum.openstack.common import context +from quantum.openstack.common import rpc +from quantum.plugins.openvswitch import ovs_quantum_plugin as povs +from quantum.plugins.openvswitch.common import config + + +class rpcApiTestCase(unittest2.TestCase): + + def _test_ovs_api(self, rpcapi, topic, method, rpc_method, **kwargs): + ctxt = context.RequestContext('fake_user', 'fake_project') + expected_retval = 'foo' if method == 'call' else None + expected_msg = rpcapi.make_msg(method, **kwargs) + expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION + if rpc_method == 'cast' and method == 'run_instance': + kwargs['call'] = False + + self.fake_args = None + self.fake_kwargs = None + + def _fake_rpc_method(*args, **kwargs): + self.fake_args = args + self.fake_kwargs = kwargs + if expected_retval: + return expected_retval + + self.stubs = stubout.StubOutForTesting() + self.stubs.Set(rpc, rpc_method, _fake_rpc_method) + + retval = getattr(rpcapi, method)(ctxt, **kwargs) + + self.assertEqual(retval, expected_retval) + expected_args = [ctxt, topic, expected_msg] + + for arg, expected_arg in zip(self.fake_args, expected_args): + self.assertEqual(arg, expected_arg) + + def test_delete_network(self): + rpcapi = povs.AgentNotifierApi(topics.AGENT) + self._test_ovs_api(rpcapi, + topics.get_topic_name(topics.AGENT, + topics.NETWORK, + topics.DELETE), + 'network_delete', rpc_method='fanout_cast', + network_id='fake_request_spec') + + def test_port_update(self): + rpcapi = povs.AgentNotifierApi(topics.AGENT) + self._test_ovs_api(rpcapi, + topics.get_topic_name(topics.AGENT, + topics.PORT, + topics.UPDATE), + 'port_update', rpc_method='fanout_cast', + port='fake_port', vlan_id='fake_vlan_id') + + def test_tunnel_update(self): + rpcapi = povs.AgentNotifierApi(topics.AGENT) + self._test_ovs_api(rpcapi, + topics.get_topic_name(topics.AGENT, + config.TUNNEL, + topics.UPDATE), + 'tunnel_update', rpc_method='fanout_cast', + tunnel_ip='fake_ip', tunnel_id='fake_id') + + def test_device_details(self): + rpcapi = agent_rpc.PluginApi(topics.PLUGIN) + self._test_ovs_api(rpcapi, topics.PLUGIN, + 'get_device_details', rpc_method='call', + device='fake_device', + agent_id='fake_agent_id') + + def test_update_device_down(self): + rpcapi = agent_rpc.PluginApi(topics.PLUGIN) + self._test_ovs_api(rpcapi, topics.PLUGIN, + 'update_device_down', rpc_method='call', + device='fake_device', + agent_id='fake_agent_id') + + def test_tunnel_sync(self): + rpcapi = agent_rpc.PluginApi(topics.PLUGIN) + self._test_ovs_api(rpcapi, topics.PLUGIN, + 'tunnel_sync', rpc_method='call', + tunnel_ip='fake_tunnel_ip') diff --git a/quantum/plugins/openvswitch/tests/unit/test_tunnel.py b/quantum/plugins/openvswitch/tests/unit/test_tunnel.py index 5fd954591c..6589b6425d 100644 --- a/quantum/plugins/openvswitch/tests/unit/test_tunnel.py +++ b/quantum/plugins/openvswitch/tests/unit/test_tunnel.py @@ -81,7 +81,7 @@ class TunnelTest(unittest.TestCase): b = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE, self.TUN_BRIDGE, '10.0.0.1', - 'sudo', 2, 2) + 'sudo', 2, 2, False, False) self.mox.VerifyAll() def testProvisionLocalVlan(self): @@ -98,7 +98,7 @@ class TunnelTest(unittest.TestCase): a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE, self.TUN_BRIDGE, '10.0.0.1', - 'sudo', 2, 2) + 'sudo', 2, 2, False, False) a.available_local_vlans = set([LV_ID]) a.provision_local_vlan(NET_UUID, LS_ID) self.mox.VerifyAll() @@ -112,7 +112,7 @@ class TunnelTest(unittest.TestCase): a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE, self.TUN_BRIDGE, '10.0.0.1', - 'sudo', 2, 2) + 'sudo', 2, 2, False, False) a.available_local_vlans = set() a.local_vlan_map[NET_UUID] = LVM a.reclaim_local_vlan(NET_UUID, LVM) @@ -128,7 +128,7 @@ class TunnelTest(unittest.TestCase): a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE, self.TUN_BRIDGE, '10.0.0.1', - 'sudo', 2, 2) + 'sudo', 2, 2, False, False) a.local_vlan_map[NET_UUID] = LVM a.port_bound(VIF_PORT, NET_UUID, LS_ID) self.mox.VerifyAll() @@ -138,7 +138,7 @@ class TunnelTest(unittest.TestCase): a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE, self.TUN_BRIDGE, '10.0.0.1', - 'sudo', 2, 2) + 'sudo', 2, 2, False, False) a.available_local_vlans = set([LV_ID]) a.local_vlan_map[NET_UUID] = LVM a.port_unbound(VIF_PORT, NET_UUID) @@ -155,7 +155,7 @@ class TunnelTest(unittest.TestCase): a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE, self.TUN_BRIDGE, '10.0.0.1', - 'sudo', 2, 2) + 'sudo', 2, 2, False, False) a.available_local_vlans = set([LV_ID]) a.local_vlan_map[NET_UUID] = LVM a.port_dead(VIF_PORT) diff --git a/tools/pip-requires b/tools/pip-requires index 4bc0410503..4416579010 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -1,7 +1,7 @@ Paste PasteDeploy==1.5.0 Routes>=1.12.3 -eventlet>=0.9.12 +eventlet>=0.9.17 httplib2 iso8601>=0.1.4 lxml