blueprint agent-db-ha

bug 985470
bug 985646

The fixes enable the OVS and linuxbridge agents to "keep alive" when the host running the server/plugin is down.

Fixes after comments. Better logging

Fixes after comments - added reconnect interval + cleanup

Fixes after comments - simplify code + ovs intervals moved to configuration file

Fixes after comments - move int conversion to configuration

Fixes after comments - if either the polling interval or the reconnect interval is not
defined in the relevant ini files then a default value is used.

Fixes after comments and merges with HACKING.rst fixes

Fixes after port binding comments

Fixes after comments from gongysh

Fixes after comments - align comments in agent ini files

Fixes - revert some code

Change-Id: I9194f142478b130e8ef198b019539357a9916d7f
This commit is contained in:
Gary Kotton 2012-04-24 02:02:03 -04:00
parent 4ccd0131a0
commit c3e7d4a7c0
6 changed files with 187 additions and 97 deletions

View File

@ -16,12 +16,14 @@ host = <hostname_or_IP_address_of_Quantum_server>
port = 3306
[LINUX_BRIDGE]
#this is the interface connected to the switch on your Quantum network
# This is the interface connected to the switch on your Quantum network
physical_interface = eth1
[AGENT]
#agent's polling interval in seconds
# Agent's polling interval in seconds
polling_interval = 2
# Agent's database reconnection interval in seconds - used in the event connectivity is lost
reconnect_interval = 2
# Change to "sudo quantum-rootwrap" to limit commands that can be run
# as root.
root_helper = sudo

View File

@ -32,6 +32,10 @@ integration-bridge = br-int
# local-ip = 10.0.0.3
[AGENT]
# Agent's polling interval in seconds
polling_interval = 2
# Agent's database reconnection interval in seconds - used in the event connectivity is lost
reconnect_interval = 2
# Change to "sudo quantum-rootwrap" to limit commands that can be run
# as root.
root_helper = sudo

View File

@ -28,14 +28,13 @@ from optparse import OptionParser
import os
import shlex
import signal
import sqlite3
import subprocess
import sys
import time
import MySQLdb
from sqlalchemy.ext.sqlsoup import SqlSoup
logging.basicConfig()
LOG = logging.getLogger(__name__)
@ -52,7 +51,9 @@ VLAN_BINDINGS = "vlan_bindings"
PORT_BINDINGS = "port_bindings"
OP_STATUS_UP = "UP"
OP_STATUS_DOWN = "DOWN"
DB_CONNECTION = None
# Default interval values
DEFAULT_POLLING_INTERVAL = 2
DEFAULT_RECONNECT_INTERVAL = 2
class LinuxBridge:
@ -288,10 +289,12 @@ class LinuxBridge:
class LinuxBridgeQuantumAgent:
def __init__(self, br_name_prefix, physical_interface, polling_interval,
root_helper):
self.polling_interval = int(polling_interval)
reconnect_interval, root_helper):
self.polling_interval = polling_interval
self.reconnect_interval = reconnect_interval
self.root_helper = root_helper
self.setup_linux_bridge(br_name_prefix, physical_interface)
self.db_connected = False
def setup_linux_bridge(self, br_name_prefix, physical_interface):
self.linux_br = LinuxBridge(br_name_prefix, physical_interface,
@ -350,27 +353,43 @@ class LinuxBridgeQuantumAgent:
if bridge not in current_quantum_bridge_names:
self.linux_br.delete_vlan_bridge(bridge)
def manage_networks_on_host(self, conn, old_vlan_bindings,
def manage_networks_on_host(self, db,
old_vlan_bindings,
old_port_bindings):
if DB_CONNECTION != 'sqlite':
cursor = MySQLdb.cursors.DictCursor(conn)
else:
cursor = conn.cursor()
cursor.execute("SELECT * FROM vlan_bindings")
rows = cursor.fetchall()
cursor.close()
vlan_bindings = {}
try:
vlan_binds = db.vlan_bindings.all()
except Exception as e:
LOG.info("Unable to get vlan bindings! Exception: %s" % e)
self.db_connected = False
return {VLAN_BINDINGS: {},
PORT_BINDINGS: []}
vlans_string = ""
for row in rows:
vlan_bindings[row['network_id']] = row
vlans_string = "%s %s" % (vlans_string, row)
for bind in vlan_binds:
entry = {'network_id': bind.network_id, 'vlan_id': bind.vlan_id}
vlan_bindings[bind.network_id] = entry
vlans_string = "%s %s" % (vlans_string, entry)
port_bindings = []
try:
port_binds = db.ports.all()
except Exception as e:
LOG.info("Unable to get port bindings! Exception: %s" % e)
self.db_connected = False
return {VLAN_BINDINGS: {},
PORT_BINDINGS: []}
all_bindings = {}
for bind in port_binds:
all_bindings[bind.uuid] = bind
entry = {'network_id': bind.network_id, 'state': bind.state,
'op_status': bind.op_status, 'uuid': bind.uuid,
'interface_id': bind.interface_id}
if bind.state == 'ACTIVE':
port_bindings.append(entry)
plugged_interfaces = []
cursor = MySQLdb.cursors.DictCursor(conn)
cursor.execute("SELECT * FROM ports where state = 'ACTIVE'")
port_bindings = cursor.fetchall()
cursor.close()
ports_string = ""
for pb in port_bindings:
ports_string = "%s %s" % (ports_string, pb)
@ -380,10 +399,7 @@ class LinuxBridgeQuantumAgent:
pb['network_id'],
pb['interface_id'],
vlan_id):
cursor = MySQLdb.cursors.DictCursor(conn)
sql = PORT_OPSTATUS_UPDATESQL % (pb['uuid'], OP_STATUS_UP)
cursor.execute(sql)
cursor.close()
all_bindings[pb['uuid']].op_status = OP_STATUS_UP
plugged_interfaces.append(pb['interface_id'])
if old_port_bindings != port_bindings:
@ -396,16 +412,30 @@ class LinuxBridgeQuantumAgent:
self.process_deleted_networks(vlan_bindings)
conn.commit()
try:
db.commit()
except Exception as e:
LOG.info("Unable to update database! Exception: %s" % e)
db.rollback()
vlan_bindings = {}
port_bindings = []
return {VLAN_BINDINGS: vlan_bindings,
PORT_BINDINGS: port_bindings}
def daemon_loop(self, conn):
def daemon_loop(self, db_connection_url):
old_vlan_bindings = {}
old_port_bindings = {}
old_port_bindings = []
self.db_connected = False
while True:
bindings = self.manage_networks_on_host(conn,
if not self.db_connected:
time.sleep(self.reconnect_interval)
db = SqlSoup(db_connection_url)
self.db_connected = True
LOG.info("Connecting to database \"%s\" on %s" %
(db.engine.url.database, db.engine.url.host))
bindings = self.manage_networks_on_host(db,
old_vlan_bindings,
old_port_bindings)
old_vlan_bindings = bindings[VLAN_BINDINGS]
@ -422,9 +452,9 @@ def main():
options, args = parser.parse_args()
if options.verbose:
LOG.basicConfig(level=LOG.DEBUG)
LOG.setLevel(logging.DEBUG)
else:
LOG.basicConfig(level=LOG.WARN)
LOG.setLevel(logging.WARNING)
if len(args) != 1:
parser.print_help()
@ -432,22 +462,28 @@ def main():
config_file = args[0]
config = ConfigParser.ConfigParser()
conn = None
try:
fh = open(config_file)
fh.close()
config.read(config_file)
br_name_prefix = BRIDGE_NAME_PREFIX
physical_interface = config.get("LINUX_BRIDGE", "physical_interface")
polling_interval = config.get("AGENT", "polling_interval")
if config.has_option("AGENT", "polling_interval"):
polling_interval = config.getint("AGENT", "polling_interval")
else:
polling_interval = DEFAULT_POLLING_INTERVAL
LOG.info("Polling interval not defined. Using default.")
if config.has_option("AGENT", "reconnect_interval"):
reconnect_interval = config.getint("AGENT", "reconnect_interval")
else:
reconnect_interval = DEFAULT_RECONNECT_INTERVAL
LOG.info("Reconnect interval not defined. Using default.")
root_helper = config.get("AGENT", "root_helper")
'Establish database connection and load models'
global DB_CONNECTION
DB_CONNECTION = config.get("DATABASE", "connection")
if DB_CONNECTION == 'sqlite':
connection = config.get("DATABASE", "connection")
if connection == 'sqlite':
LOG.info("Connecting to sqlite DB")
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
db_connection_url = "sqlite:///:memory:"
else:
db_name = config.get("DATABASE", "name")
db_user = config.get("DATABASE", "user")
@ -455,21 +491,18 @@ def main():
db_host = config.get("DATABASE", "host")
db_port = int(config.get("DATABASE", "port"))
LOG.info("Connecting to database %s on %s" % (db_name, db_host))
conn = MySQLdb.connect(host=db_host, user=db_user, port=db_port,
passwd=db_pass, db=db_name)
except Exception, e:
LOG.error("Unable to parse config file \"%s\": \nException%s"
% (config_file, str(e)))
db_connection_url = ("%s://%s:%s@%s:%d/%s" %
(connection, db_user, db_pass, db_host, db_port, db_name))
except Exception as e:
LOG.error("Unable to parse config file \"%s\": \nException %s" %
(config_file, str(e)))
sys.exit(1)
try:
plugin = LinuxBridgeQuantumAgent(br_name_prefix, physical_interface,
polling_interval, root_helper)
LOG.info("Agent initialized successfully, now running...")
plugin.daemon_loop(conn)
finally:
if conn:
conn.close()
plugin = LinuxBridgeQuantumAgent(br_name_prefix, physical_interface,
polling_interval, reconnect_interval,
root_helper)
LOG.info("Agent initialized successfully, now running... ")
plugin.daemon_loop(db_connection_url)
sys.exit(0)

View File

@ -34,7 +34,7 @@ from quantum.plugins.linuxbridge.common import constants as lconst
from quantum.plugins.linuxbridge.db import l2network_db as cdb
LOG = logger.getLogger(__name__)
LOG = logging.getLogger(__name__)
class LinuxBridgeAgentTest(unittest.TestCase):

View File

@ -30,10 +30,9 @@ import time
from sqlalchemy.ext.sqlsoup import SqlSoup
logging.basicConfig()
LOG = logging.getLogger(__name__)
# Global constants.
OP_STATUS_UP = "UP"
OP_STATUS_DOWN = "DOWN"
@ -41,7 +40,9 @@ OP_STATUS_DOWN = "DOWN"
# A placeholder for dead vlans.
DEAD_VLAN_TAG = "4095"
REFRESH_INTERVAL = 2
# Default interval values
DEFAULT_POLLING_INTERVAL = 2
DEFAULT_RECONNECT_INTERVAL = 2
# A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac'
@ -215,9 +216,12 @@ class LocalVLANMapping:
class OVSQuantumAgent(object):
def __init__(self, integ_br, root_helper):
def __init__(self, integ_br, root_helper,
polling_interval, reconnect_interval):
self.root_helper = root_helper
self.setup_integration_br(integ_br)
self.polling_interval = polling_interval
self.reconnect_interval = reconnect_interval
def port_bound(self, port, vlan_id):
self.int_br.set_db_attribute("Port", port.port_name,
@ -234,26 +238,39 @@ class OVSQuantumAgent(object):
# switch all traffic using L2 learning
self.int_br.add_flow(priority=1, actions="normal")
def daemon_loop(self, db):
def daemon_loop(self, db_connection_url):
self.local_vlan_map = {}
old_local_bindings = {}
old_vif_ports = {}
db_connected = False
while True:
if not db_connected:
time.sleep(self.reconnect_interval)
db = SqlSoup(db_connection_url)
db_connected = True
LOG.info("Connecting to database \"%s\" on %s" %
(db.engine.url.database, db.engine.url.host))
all_bindings = {}
try:
ports = db.ports.all()
except:
ports = []
except Exception as e:
LOG.info("Unable to get port bindings! Exception: %s" % e)
db_connected = False
continue
for port in ports:
all_bindings[port.interface_id] = port
vlan_bindings = {}
try:
vlan_binds = db.vlan_bindings.all()
except:
vlan_binds = []
except Exception as e:
LOG.info("Unable to get vlan bindings! Exception: %s" % e)
db_connected = False
continue
for bind in vlan_binds:
vlan_bindings[bind.network_id] = bind.vlan_id
@ -306,8 +323,15 @@ class OVSQuantumAgent(object):
old_vif_ports = new_vif_ports
old_local_bindings = new_local_bindings
db.commit()
time.sleep(REFRESH_INTERVAL)
try:
db.commit()
except Exception as e:
LOG.info("Unable to commit to database! Exception: %s" % e)
db.rollback()
old_local_bindings = {}
old_vif_ports = {}
time.sleep(self.polling_interval)
class OVSQuantumTunnelAgent(object):
@ -335,7 +359,7 @@ class OVSQuantumTunnelAgent(object):
MAX_VLAN_TAG = 4094
def __init__(self, integ_br, tun_br, remote_ip_file, local_ip,
root_helper):
root_helper, polling_interval, reconnect_interval):
'''Constructor.
:param integ_br: name of the integration bridge.
@ -349,6 +373,9 @@ class OVSQuantumTunnelAgent(object):
self.setup_integration_br(integ_br)
self.local_vlan_map = {}
self.setup_tunnel_br(tun_br, remote_ip_file, local_ip)
self.db_connected = False
self.polling_interval = polling_interval
self.reconnect_interval = reconnect_interval
def provision_local_vlan(self, net_uuid, lsw_id):
'''Provisions a local VLAN.
@ -466,7 +493,7 @@ class OVSQuantumTunnelAgent(object):
tunnel_ips = (x for x in clean_ips if x != local_ip and x)
for i, remote_ip in enumerate(tunnel_ips):
self.tun_br.add_tunnel_port("gre-" + str(i), remote_ip)
except Exception, e:
except Exception as e:
LOG.error("Error configuring tunnels: '%s' %s" %
(remote_ip_file, str(e)))
raise
@ -485,8 +512,10 @@ class OVSQuantumTunnelAgent(object):
ports = []
try:
ports = db.ports.all()
except Exception, e:
LOG.info("Exception accessing db.ports: %s" % e)
except Exceptioni as e:
LOG.info("Unable to get port bindings! Exception: %s" % e)
self.db_connected = False
return {}
return dict([(port.interface_id, port) for port in ports])
@ -500,25 +529,39 @@ class OVSQuantumTunnelAgent(object):
lsw_id_binds = []
try:
lsw_id_binds.extend(db.vlan_bindings.all())
except Exception, e:
LOG.info("Exception accessing db.vlan_bindings: %s" % e)
except Exception as e:
LOG.info("Unable to get vlan bindings! Exception: %s" % e)
self.db_connected = False
return {}
return dict([(bind.network_id, bind.vlan_id)
for bind in lsw_id_binds])
def daemon_loop(self, db):
def daemon_loop(self, db_connection_url):
'''Main processing loop (not currently used).
:param db: reference to database layer.
:param options: database information - in the event need to reconnect
'''
old_local_bindings = {}
old_vif_ports = {}
self.db_connected = False
while True:
if not self.db_connected:
time.sleep(self.reconnect_interval)
db = SqlSoup(db_connection_url)
self.db_connected = True
LOG.info("Connecting to database \"%s\" on %s" %
(db.engine.url.database, db.engine.url.host))
# Get bindings from db.
all_bindings = self.get_db_port_bindings(db)
if not self.db_connected:
continue
all_bindings_vif_port_ids = set(all_bindings.keys())
lsw_id_bindings = self.get_db_vlan_bindings(db)
if not self.db_connected:
continue
# Get bindings from OVS bridge.
vif_ports = self.int_br.get_vif_ports()
@ -579,7 +622,7 @@ class OVSQuantumTunnelAgent(object):
LOG.info("Port " + str(p) + " on net-id = "
+ new_net_uuid + " bound to " +
str(self.local_vlan_map[new_net_uuid]))
except Exception, e:
except Exception as e:
LOG.info("Unable to bind Port " + str(p) +
" on netid = " + new_net_uuid + " to "
+ str(self.local_vlan_map[new_net_uuid]))
@ -597,7 +640,7 @@ class OVSQuantumTunnelAgent(object):
old_vif_ports = new_vif_ports
old_local_bindings = new_local_bindings
time.sleep(REFRESH_INTERVAL)
time.sleep(self.polling_interval)
def main():
@ -609,9 +652,9 @@ def main():
options, args = parser.parse_args()
if options.verbose:
LOG.basicConfig(level=LOG.DEBUG)
LOG.setLevel(logging.DEBUG)
else:
LOG.basicConfig(level=LOG.WARN)
LOG.setLevel(logging.WARNING)
if len(args) != 1:
parser.print_help()
@ -621,7 +664,7 @@ def main():
config = ConfigParser.ConfigParser()
try:
config.read(config_file)
except Exception, e:
except Exception as e:
LOG.error("Unable to parse config file \"%s\": %s" %
(config_file, str(e)))
raise e
@ -630,7 +673,7 @@ def main():
enable_tunneling = False
try:
enable_tunneling = config.getboolean("OVS", "enable-tunneling")
except Exception, e:
except Exception as e:
pass
# Get common parameters.
@ -643,9 +686,19 @@ def main():
if not len(db_connection_url):
raise Exception('Empty db_connection_url in configuration file.')
if config.has_option("AGENT", "polling_interval"):
polling_interval = config.getint("AGENT", "polling_interval")
else:
polling_interval = DEFAULT_POLLING_INTERVAL
LOG.info("Polling interval not defined. Using default.")
if config.has_option("AGENT", "reconnect_interval"):
reconnect_interval = config.getint("AGENT", "reconnect_interval")
else:
reconnect_interval = DEFAULT_RECONNECT_INTERVAL
LOG.info("Reconnect interval not defined. Using default.")
root_helper = config.get("AGENT", "root_helper")
except Exception, e:
except Exception as e:
LOG.error("Error parsing common params in config_file: '%s': %s" %
(config_file, str(e)))
sys.exit(1)
@ -668,24 +721,22 @@ def main():
local_ip = config.get("OVS", "local-ip")
if not len(local_ip):
raise Exception('Empty local-ip in configuration file.')
except Exception, e:
except Exception as e:
LOG.error("Error parsing tunnel params in config_file: '%s': %s" %
(config_file, str(e)))
sys.exit(1)
plugin = OVSQuantumTunnelAgent(integ_br, tun_br, remote_ip_file,
local_ip, root_helper)
local_ip, root_helper,
polling_interval, reconnect_interval)
else:
# Get parameters for OVSQuantumAgent.
plugin = OVSQuantumAgent(integ_br, root_helper)
plugin = OVSQuantumAgent(integ_br, root_helper,
polling_interval, reconnect_interval)
# Start everything.
options = {"sql_connection": db_connection_url}
db = SqlSoup(options["sql_connection"])
LOG.info("Connecting to database \"%s\" on %s" %
(db.engine.url.database, db.engine.url.host))
plugin.daemon_loop(db)
plugin.daemon_loop(db_connection_url)
sys.exit(0)

View File

@ -93,7 +93,7 @@ class TunnelTest(unittest.TestCase):
self.TUN_BRIDGE,
REMOTE_IP_FILE,
'10.0.0.1',
'sudo')
'sudo', 2, 2)
self.mox.VerifyAll()
def testProvisionLocalVlan(self):
@ -113,7 +113,7 @@ class TunnelTest(unittest.TestCase):
self.TUN_BRIDGE,
REMOTE_IP_FILE,
'10.0.0.1',
'sudo')
'sudo', 2, 2)
a.available_local_vlans = set([LV_ID])
a.provision_local_vlan(NET_UUID, LS_ID)
self.mox.VerifyAll()
@ -130,7 +130,7 @@ class TunnelTest(unittest.TestCase):
self.TUN_BRIDGE,
REMOTE_IP_FILE,
'10.0.0.1',
'sudo')
'sudo', 2, 2)
a.available_local_vlans = set()
a.local_vlan_map[NET_UUID] = LVM
a.reclaim_local_vlan(NET_UUID, LVM)
@ -147,7 +147,7 @@ class TunnelTest(unittest.TestCase):
self.TUN_BRIDGE,
REMOTE_IP_FILE,
'10.0.0.1',
'sudo')
'sudo', 2, 2)
a.local_vlan_map[NET_UUID] = LVM
a.port_bound(VIF_PORT, NET_UUID, LS_ID)
self.mox.VerifyAll()
@ -158,7 +158,7 @@ class TunnelTest(unittest.TestCase):
self.TUN_BRIDGE,
REMOTE_IP_FILE,
'10.0.0.1',
'sudo')
'sudo', 2, 2)
a.available_local_vlans = set([LV_ID])
a.local_vlan_map[NET_UUID] = LVM
a.port_unbound(VIF_PORT, NET_UUID)
@ -177,7 +177,7 @@ class TunnelTest(unittest.TestCase):
self.TUN_BRIDGE,
REMOTE_IP_FILE,
'10.0.0.1',
'sudo')
'sudo', 2, 2)
a.available_local_vlans = set([LV_ID])
a.local_vlan_map[NET_UUID] = LVM
a.port_dead(VIF_PORT)
@ -200,7 +200,7 @@ class TunnelTest(unittest.TestCase):
self.TUN_BRIDGE,
REMOTE_IP_FILE,
'10.0.0.1',
'sudo')
'sudo', 2, 2)
all_bindings = a.get_db_port_bindings(db)
lsw_id_bindings = a.get_db_vlan_bindings(db)