OVS plugin: add tunnel ips to central database

bp simplify-ovs-tunnel-mgmt

Makes the OVS plugin agent report its own IP address to the centralized
quantum database, and build its set of tunnels based on the contents of
this centralized database.  This removes the need for a removes need for
a 'remote-ips' file on each compute node that needs to be updated
when a new host is added.

Also:
- simplifies error handling within tunnel manager daemon_loop
- fixes issues with operational status not working for tunnel-mode
- fixes issue that not-stripping vlan of tunneled packet potentially
crashes OVS, causing flows to get wiped.

Change-Id: I4d285669e29beecf745fe620581fa6bc332a446c
This commit is contained in:
Dan Wendlandt 2012-05-15 12:41:52 -07:00
parent 75c7ee0ec3
commit b4365a2dac
5 changed files with 186 additions and 206 deletions

View File

@ -20,15 +20,9 @@ enable-tunneling = False
# connectivity.
integration-bridge = br-int
# Uncomment this line if enable-tunneling is True above.
# Only used if enable-tunneling (above) is True.
# In most cases, the default value should be fine.
# tunnel-bridge = br-tun
# Uncomment this line if enable-tunneling is True above.
# This file contains a list of IP addresses (one per line) that point to
# hypervisors to which tunnels should be connected. It is best to use
# an absolute path to this file.
# remote-ip-file = /opt/stack/remote-ips.txt
tunnel-bridge = br-tun
# Uncomment this line if enable-tunneling is True above.
# Set local-ip to be the local IP address of this hypervisor.

View File

@ -28,7 +28,10 @@ import subprocess
import sys
import time
from sqlalchemy.ext.sqlsoup import SqlSoup
import sqlalchemy
from sqlalchemy.ext import sqlsoup
from quantum.plugins.openvswitch import ovs_models
logging.basicConfig()
LOG = logging.getLogger(__name__)
@ -100,6 +103,10 @@ class OVSBridge:
full_args = ["ovs-ofctl", cmd, self.br_name] + args
return self.run_cmd(full_args)
def count_flows(self):
flow_list = self.run_ofctl("dump-flows", []).split("\n")[1:]
return len(flow_list) - 1
def remove_all_flows(self):
self.run_ofctl("del-flows", [])
@ -214,6 +221,36 @@ class LocalVLANMapping:
return "lv-id = %s ls-id = %s" % (self.vlan, self.lsw_id)
class Port(object):
'''class stores port data in an ORM-free way,
so attributes are still available even if a
row has been deleted.
'''
def __init__(self, p):
self.uuid = p.uuid
self.network_id = p.network_id
self.interface_id = p.interface_id
self.state = p.state
self.op_status = p.op_status
def __eq__(self, other):
'''compare only fields that will cause us to re-wire
'''
try:
return (self and other
and self.interface_id == other.interface_id
and self.state == other.state)
except:
return False
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash(self.uuid)
class OVSQuantumAgent(object):
def __init__(self, integ_br, root_helper,
@ -239,6 +276,10 @@ class OVSQuantumAgent(object):
self.int_br.add_flow(priority=1, actions="normal")
def daemon_loop(self, db_connection_url):
'''Main processing loop for Non-Tunneling Agent.
:param options: database information - in the event need to reconnect
'''
self.local_vlan_map = {}
old_local_bindings = {}
old_vif_ports = {}
@ -247,7 +288,7 @@ class OVSQuantumAgent(object):
while True:
if not db_connected:
time.sleep(self.reconnect_interval)
db = SqlSoup(db_connection_url)
db = sqlsoup.SqlSoup(db_connection_url)
db_connected = True
LOG.info("Connecting to database \"%s\" on %s" %
(db.engine.url.database, db.engine.url.host))
@ -358,25 +399,30 @@ class OVSQuantumTunnelAgent(object):
# Upper bound on available vlans.
MAX_VLAN_TAG = 4094
def __init__(self, integ_br, tun_br, remote_ip_file, local_ip,
root_helper, polling_interval, reconnect_interval):
def __init__(self, integ_br, tun_br, local_ip, root_helper,
polling_interval, reconnect_interval):
'''Constructor.
:param integ_br: name of the integration bridge.
:param tun_br: name of the tunnel bridge.
:param remote_ip_file: name of file containing list of hypervisor IPs.
:param local_ip: local IP address of this hypervisor.'''
:param local_ip: local IP address of this hypervisor.
:param root_helper: utility to use when running shell cmds.
:param polling_interval: interval (secs) to poll DB.
:param reconnect_internal: retry interval (secs) on DB error.'''
self.root_helper = root_helper
self.available_local_vlans = set(
xrange(OVSQuantumTunnelAgent.MIN_VLAN_TAG,
OVSQuantumTunnelAgent.MAX_VLAN_TAG))
self.setup_integration_br(integ_br)
self.local_vlan_map = {}
self.setup_tunnel_br(tun_br, remote_ip_file, local_ip)
self.db_connected = False
self.polling_interval = polling_interval
self.reconnect_interval = reconnect_interval
self.local_ip = local_ip
self.tunnel_count = 0
self.setup_tunnel_br(tun_br)
def provision_local_vlan(self, net_uuid, lsw_id):
'''Provisions a local VLAN.
@ -391,8 +437,8 @@ class OVSQuantumTunnelAgent(object):
# outbound
self.tun_br.add_flow(priority=4, match="in_port=%s,dl_vlan=%s" %
(self.patch_int_ofport, lvid),
actions="set_tunnel:%s,normal" % (lsw_id))
actions="strip_vlan,set_tunnel:%s,normal" %
(lsw_id))
# inbound
self.tun_br.add_flow(priority=3, match="tun_id=%s" % lsw_id,
actions="mod_vlan_vid:%s,output:%s" %
@ -471,184 +517,159 @@ class OVSQuantumTunnelAgent(object):
# switch all traffic using L2 learning
self.int_br.add_flow(priority=1, actions="normal")
def setup_tunnel_br(self, tun_br, remote_ip_file, local_ip):
def setup_tunnel_br(self, tun_br):
'''Setup the tunnel bridge.
Reads in list of IP addresses. Creates GRE tunnels to each of these
addresses and then clears out existing flows. local_ip is the address
of the local node. A tunnel is not created to this IP address.
Creates tunnel bridge, and links it to the integration bridge
using a patch port.
:param tun_br: the name of the tunnel bridge.
:param remote_ip_file: path to file that contains list of destination
IP addresses.
:param local_ip: the ip address of this node.'''
:param tun_br: the name of the tunnel bridge.'''
self.tun_br = OVSBridge(tun_br, self.root_helper)
self.tun_br.reset_bridge()
self.patch_int_ofport = self.tun_br.add_patch_port("patch-int",
"patch-tun")
try:
with open(remote_ip_file, 'r') as f:
remote_ip_list = f.readlines()
clean_ips = (x.rstrip() for x in remote_ip_list)
tunnel_ips = (x for x in clean_ips if x != local_ip and x)
for i, remote_ip in enumerate(tunnel_ips):
self.tun_br.add_tunnel_port("gre-" + str(i), remote_ip)
except Exception as e:
LOG.error("Error configuring tunnels: '%s' %s" %
(remote_ip_file, str(e)))
raise
self.tun_br.remove_all_flows()
# default drop
self.tun_br.add_flow(priority=1, actions="drop")
def get_db_port_bindings(self, db):
'''Get database port bindings from central Quantum database.
def manage_tunnels(self, tunnel_ips, old_tunnel_ips, db):
if self.local_ip in tunnel_ips:
tunnel_ips.remove(self.local_ip)
else:
db.tunnel_ips.insert(ip_address=self.local_ip)
The central quantum database 'ovs_quantum' resides on the openstack
mysql server.
new_tunnel_ips = tunnel_ips - old_tunnel_ips
if new_tunnel_ips:
LOG.info("adding tunnels to: %s" % new_tunnel_ips)
for ip in new_tunnel_ips:
tun_name = "gre-" + str(self.tunnel_count)
self.tun_br.add_tunnel_port(tun_name, ip)
self.tunnel_count += 1
:returns: a dictionary containing port bindings.'''
ports = []
try:
ports = db.ports.all()
except Exceptioni as e:
LOG.info("Unable to get port bindings! Exception: %s" % e)
self.db_connected = False
return {}
# adding new ports can void flows, so reset flows
self.tun_br.remove_all_flows()
self.tun_br.add_flow(priority=1, actions="drop")
for lv_ojb in self.local_vlan_map.values():
self.add_tun_br_flows_for_local_vlan(lv_obj)
return dict([(port.interface_id, port) for port in ports])
def get_db_vlan_bindings(self, db):
'''Get database vlan bindings from central Quantum database.
The central quantum database 'ovs_quantum' resides on the openstack
mysql server.
:returns: a dictionary containing vlan bindings.'''
lsw_id_binds = []
try:
lsw_id_binds.extend(db.vlan_bindings.all())
except Exception as e:
LOG.info("Unable to get vlan bindings! Exception: %s" % e)
self.db_connected = False
return {}
return dict([(bind.network_id, bind.vlan_id)
for bind in lsw_id_binds])
def rollback_until_success(self, db):
while True:
time.sleep(self.reconnect_interval)
try:
db.rollback()
break
except:
LOG.exception("Problem connecting to database")
def daemon_loop(self, db_connection_url):
'''Main processing loop (not currently used).
'''Main processing loop for Tunneling Agent.
:param options: database information - in the event need to reconnect
'''
old_local_bindings = {}
old_vif_ports = {}
self.db_connected = False
old_tunnel_ips = set()
db = sqlsoup.SqlSoup(db_connection_url)
LOG.info("Connecting to database \"%s\" on %s" %
(db.engine.url.database, db.engine.url.host))
while True:
if not self.db_connected:
time.sleep(self.reconnect_interval)
db = SqlSoup(db_connection_url)
self.db_connected = True
LOG.info("Connecting to database \"%s\" on %s" %
(db.engine.url.database, db.engine.url.host))
try:
all_bindings = dict((p.interface_id, Port(p))
for p in db.ports.all())
all_bindings_vif_port_ids = set(all_bindings)
lsw_id_bindings = dict((bind.network_id, bind.vlan_id)
for bind in db.vlan_bindings.all())
# Get bindings from db.
all_bindings = self.get_db_port_bindings(db)
if not self.db_connected:
continue
all_bindings_vif_port_ids = set(all_bindings.keys())
lsw_id_bindings = self.get_db_vlan_bindings(db)
if not self.db_connected:
continue
tunnel_ips = set(x.ip_address for x in db.tunnel_ips.all())
self.manage_tunnels(tunnel_ips, old_tunnel_ips, db)
# Get bindings from OVS bridge.
vif_ports = self.int_br.get_vif_ports()
new_vif_ports = dict([(p.vif_id, p) for p in vif_ports])
new_vif_ports_ids = set(new_vif_ports.keys())
# Get bindings from OVS bridge.
vif_ports = self.int_br.get_vif_ports()
new_vif_ports = dict([(p.vif_id, p) for p in vif_ports])
new_vif_ports_ids = set(new_vif_ports.keys())
old_vif_ports_ids = set(old_vif_ports.keys())
dead_vif_ports_ids = new_vif_ports_ids - all_bindings_vif_port_ids
dead_vif_ports = [new_vif_ports[p] for p in dead_vif_ports_ids]
disappeared_vif_ports_ids = old_vif_ports_ids - new_vif_ports_ids
new_local_bindings_ids = all_bindings_vif_port_ids.intersection(
new_vif_ports_ids)
new_local_bindings = dict([(p, all_bindings.get(p))
for p in new_vif_ports_ids])
new_bindings = set(
(p, old_local_bindings.get(p),
new_local_bindings.get(p)) for p in new_vif_ports_ids)
changed_bindings = set([b for b in new_bindings if b[2] != b[1]])
old_vif_ports_ids = set(old_vif_ports.keys())
dead_vif_ports_ids = (new_vif_ports_ids -
all_bindings_vif_port_ids)
dead_vif_ports = [new_vif_ports[p] for p in dead_vif_ports_ids]
disappeared_vif_ports_ids = (old_vif_ports_ids -
new_vif_ports_ids)
new_local_bindings_ids = (all_bindings_vif_port_ids.
intersection(new_vif_ports_ids))
new_local_bindings = dict([(p, all_bindings.get(p))
for p in new_vif_ports_ids])
new_bindings = set(
(p, old_local_bindings.get(p),
new_local_bindings.get(p)) for p in new_vif_ports_ids)
changed_bindings = set([b for b in new_bindings
if b[2] != b[1]])
LOG.debug('all_bindings: %s' % all_bindings)
LOG.debug('lsw_id_bindings: %s' % lsw_id_bindings)
LOG.debug('old_vif_ports_ids: %s' % old_vif_ports_ids)
LOG.debug('dead_vif_ports_ids: %s' % dead_vif_ports_ids)
LOG.debug('old_vif_ports_ids: %s' % old_vif_ports_ids)
LOG.debug('new_local_bindings_ids: %s' % new_local_bindings_ids)
LOG.debug('new_local_bindings: %s' % new_local_bindings)
LOG.debug('new_bindings: %s' % new_bindings)
LOG.debug('changed_bindings: %s' % changed_bindings)
LOG.debug('all_bindings: %s', all_bindings)
LOG.debug('lsw_id_bindings: %s', lsw_id_bindings)
LOG.debug('new_vif_ports_ids: %s', new_vif_ports_ids)
LOG.debug('dead_vif_ports_ids: %s', dead_vif_ports_ids)
LOG.debug('old_vif_ports_ids: %s', old_vif_ports_ids)
LOG.debug('new_local_bindings_ids: %s',
new_local_bindings_ids)
LOG.debug('new_local_bindings: %s', new_local_bindings)
LOG.debug('new_bindings: %s', new_bindings)
LOG.debug('changed_bindings: %s', changed_bindings)
# Take action.
for p in dead_vif_ports:
LOG.info("No quantum binding for port " + str(p)
+ "putting on dead vlan")
self.port_dead(p)
# Take action.
for p in dead_vif_ports:
LOG.info("No quantum binding for port " + str(p)
+ "putting on dead vlan")
self.port_dead(p)
for b in changed_bindings:
port_id, old_port, new_port = b
p = new_vif_ports[port_id]
if old_port:
old_net_uuid = old_port.network_id
LOG.info("Removing binding to net-id = " +
old_net_uuid + " for " + str(p)
+ " added to dead vlan")
self.port_unbound(p, old_net_uuid)
if not new_port:
self.port_dead(p)
for b in changed_bindings:
port_id, old_port, new_port = b
p = new_vif_ports[port_id]
if old_port:
old_net_uuid = old_port.network_id
LOG.info("Removing binding to net-id = " +
old_net_uuid + " for " + str(p)
+ " added to dead vlan")
self.port_unbound(p, old_net_uuid)
all_bindings[p.vif_id].op_status = OP_STATUS_DOWN
if not new_port:
self.port_dead(p)
if new_port:
new_net_uuid = new_port.network_id
if new_net_uuid not in lsw_id_bindings:
LOG.warn("No ls-id binding found for net-id '%s'" %
new_net_uuid)
continue
if new_port:
new_net_uuid = new_port.network_id
if new_net_uuid not in lsw_id_bindings:
LOG.warn("No ls-id binding found for net-id '%s'" %
new_net_uuid)
continue
lsw_id = lsw_id_bindings[new_net_uuid]
try:
lsw_id = lsw_id_bindings[new_net_uuid]
self.port_bound(p, new_net_uuid, lsw_id)
all_bindings[p.vif_id].op_status = OP_STATUS_UP
LOG.info("Port " + str(p) + " on net-id = "
+ new_net_uuid + " bound to " +
str(self.local_vlan_map[new_net_uuid]))
except Exception as e:
LOG.info("Unable to bind Port " + str(p) +
" on netid = " + new_net_uuid + " to "
+ str(self.local_vlan_map[new_net_uuid]))
for vif_id in disappeared_vif_ports_ids:
LOG.info("Port Disappeared: " + vif_id)
old_port = old_local_bindings.get(vif_id)
if old_port:
try:
for vif_id in disappeared_vif_ports_ids:
LOG.info("Port Disappeared: " + vif_id)
if vif_id in all_bindings:
all_bindings[vif_id].op_status = OP_STATUS_DOWN
old_port = old_local_bindings.get(vif_id)
if old_port:
self.port_unbound(old_vif_ports[vif_id],
old_port.network_id)
except Exception:
LOG.info("Unable to unbind Port " + str(p) +
" on net-id = " + old_port.network_uuid)
old_vif_ports = new_vif_ports
old_local_bindings = new_local_bindings
try:
# commit any DB changes and expire
# data loaded from the database
db.commit()
except Exception as e:
LOG.info("Unable to commit to database! Exception: %s" % e)
db.rollback()
old_local_bindings = {}
old_vif_ports = {}
time.sleep(self.polling_interval)
# sleep and re-initialize state for next pass
time.sleep(self.polling_interval)
old_tunnel_ips = tunnel_ips
old_vif_ports = new_vif_ports
old_local_bindings = new_local_bindings
except:
LOG.exception("Main-loop Exception:")
self.rollback_until_success(db)
def main():
@ -721,12 +742,6 @@ def main():
raise Exception('Empty tunnel-bridge in configuration file.')
# Mandatory parameter.
remote_ip_file = config.get("OVS", "remote-ip-file")
if not len(remote_ip_file):
raise Exception('Empty remote-ip-file in configuration file.')
# Mandatory parameter.
remote_ip_file = config.get("OVS", "remote-ip-file")
local_ip = config.get("OVS", "local-ip")
if not len(local_ip):
raise Exception('Empty local-ip in configuration file.')
@ -736,8 +751,7 @@ def main():
(config_file, str(e)))
sys.exit(1)
plugin = OVSQuantumTunnelAgent(integ_br, tun_br, remote_ip_file,
local_ip, root_helper,
plugin = OVSQuantumTunnelAgent(integ_br, tun_br, local_ip, root_helper,
polling_interval, reconnect_interval)
else:
# Get parameters for OVSQuantumAgent.

View File

@ -40,3 +40,16 @@ class VlanBinding(BASE):
def __repr__(self):
return "<VlanBinding(%s,%s)>" % \
(self.vlan_id, self.network_id)
class TunnelIP(BASE):
"""Represents a remote IP in tunnel mode"""
__tablename__ = 'tunnel_ips'
ip_address = Column(String(255), primary_key=True)
def __init__(self, ip_address):
self.ip_address = ip_address
def __repr__(self):
return "<TunnelIP(%s)>" % (self.ip_address)

View File

@ -24,9 +24,6 @@ import mox
from quantum.plugins.openvswitch.agent import ovs_quantum_agent
LOCAL_DIR = os.path.dirname(__file__)
REMOTE_IP_FILE = LOCAL_DIR + '/remote-ip-file.txt'
# Useful global dummy variables.
NET_UUID = '3faeebfe-5d37-11e1-a64b-000c29d5f0a7'
LS_ID = '42'
@ -53,7 +50,6 @@ class DummyVlanBinding:
class TunnelTest(unittest.TestCase):
def setUp(self):
print LOCAL_DIR
self.mox = mox.Mox()
self.INT_BRIDGE = 'integration_bridge'
@ -86,7 +82,6 @@ class TunnelTest(unittest.TestCase):
b = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
REMOTE_IP_FILE,
'10.0.0.1',
'sudo', 2, 2)
self.mox.VerifyAll()
@ -106,7 +101,6 @@ class TunnelTest(unittest.TestCase):
a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
REMOTE_IP_FILE,
'10.0.0.1',
'sudo', 2, 2)
a.available_local_vlans = set([LV_ID])
@ -123,7 +117,6 @@ class TunnelTest(unittest.TestCase):
self.mox.ReplayAll()
a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
REMOTE_IP_FILE,
'10.0.0.1',
'sudo', 2, 2)
a.available_local_vlans = set()
@ -140,7 +133,6 @@ class TunnelTest(unittest.TestCase):
self.mox.ReplayAll()
a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
REMOTE_IP_FILE,
'10.0.0.1',
'sudo', 2, 2)
a.local_vlan_map[NET_UUID] = LVM
@ -151,7 +143,6 @@ class TunnelTest(unittest.TestCase):
self.mox.ReplayAll()
a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
REMOTE_IP_FILE,
'10.0.0.1',
'sudo', 2, 2)
a.available_local_vlans = set([LV_ID])
@ -170,41 +161,9 @@ class TunnelTest(unittest.TestCase):
self.mox.ReplayAll()
a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
REMOTE_IP_FILE,
'10.0.0.1',
'sudo', 2, 2)
a.available_local_vlans = set([LV_ID])
a.local_vlan_map[NET_UUID] = LVM
a.port_dead(VIF_PORT)
self.mox.VerifyAll()
def testDbBindings(self):
db = self.mox.CreateMockAnything()
db.ports = self.mox.CreateMockAnything()
interface_ids = ['interface-id-%d' % x for x in range(3)]
db.ports.all().AndReturn([DummyPort(x) for x in interface_ids])
db.vlan_bindings = self.mox.CreateMockAnything()
vlan_bindings = [
['network-id-%d' % x, 'vlan-id-%d' % x] for x in range(3)]
db.vlan_bindings.all().AndReturn(
[DummyVlanBinding(*x) for x in vlan_bindings])
self.mox.ReplayAll()
a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
REMOTE_IP_FILE,
'10.0.0.1',
'sudo', 2, 2)
all_bindings = a.get_db_port_bindings(db)
lsw_id_bindings = a.get_db_vlan_bindings(db)
for interface_id, port in all_bindings.iteritems():
self.assertTrue(interface_id in interface_ids)
for network_id, vlan_id in lsw_id_bindings.iteritems():
self.assertTrue(network_id in [x[0] for x in vlan_bindings])
self.assertTrue(vlan_id in [x[1] for x in vlan_bindings])
self.mox.VerifyAll()