Remove database access from agents
A flag on the agent and plugin would indicate that the agent could access the database directly instead of using the RPC interface. This has been removed. Change-Id: I2f596a020f971e850f2c92b6597412523d006154
This commit is contained in:
parent
84f74d1bff
commit
e77879b79d
@ -51,5 +51,3 @@ polling_interval = 2
|
|||||||
# root filter facility.
|
# root filter facility.
|
||||||
# Change to "sudo" to skip the filtering and just run the comand directly
|
# Change to "sudo" to skip the filtering and just run the comand directly
|
||||||
root_helper = "sudo"
|
root_helper = "sudo"
|
||||||
# Use RPC messaging to interface between agent and plugin
|
|
||||||
# rpc = True
|
|
||||||
|
@ -445,180 +445,6 @@ class LinuxBridgeRpcCallbacks():
|
|||||||
return dispatcher.RpcDispatcher([self])
|
return dispatcher.RpcDispatcher([self])
|
||||||
|
|
||||||
|
|
||||||
class LinuxBridgeQuantumAgentDB:
|
|
||||||
|
|
||||||
def __init__(self, interface_mappings, polling_interval,
|
|
||||||
reconnect_interval, root_helper, db_connection_url):
|
|
||||||
self.polling_interval = polling_interval
|
|
||||||
self.root_helper = root_helper
|
|
||||||
self.setup_linux_bridge(interface_mappings)
|
|
||||||
self.reconnect_interval = reconnect_interval
|
|
||||||
self.db_connected = False
|
|
||||||
self.db_connection_url = db_connection_url
|
|
||||||
|
|
||||||
def setup_linux_bridge(self, interface_mappings):
|
|
||||||
self.linux_br = LinuxBridge(interface_mappings, self.root_helper)
|
|
||||||
|
|
||||||
def process_port_binding(self, network_id, interface_id,
|
|
||||||
physical_network, vlan_id):
|
|
||||||
return self.linux_br.add_interface(network_id,
|
|
||||||
physical_network, vlan_id,
|
|
||||||
interface_id)
|
|
||||||
|
|
||||||
def remove_port_binding(self, network_id, interface_id):
|
|
||||||
bridge_name = self.linux_br.get_bridge_name(network_id)
|
|
||||||
tap_device_name = self.linux_br.get_tap_device_name(interface_id)
|
|
||||||
return self.linux_br.remove_interface(bridge_name, tap_device_name)
|
|
||||||
|
|
||||||
def process_unplugged_interfaces(self, plugged_interfaces):
|
|
||||||
"""
|
|
||||||
If there are any tap devices that are not corresponding to the
|
|
||||||
list of attached VIFs, then those are corresponding to recently
|
|
||||||
unplugged VIFs, so we need to remove those tap devices from their
|
|
||||||
current bridge association
|
|
||||||
"""
|
|
||||||
plugged_tap_device_names = []
|
|
||||||
plugged_gateway_device_names = []
|
|
||||||
for interface in plugged_interfaces:
|
|
||||||
if interface.startswith(GATEWAY_INTERFACE_PREFIX):
|
|
||||||
"""
|
|
||||||
The name for the gateway devices is set by the linux net
|
|
||||||
driver, hence we use the name as is
|
|
||||||
"""
|
|
||||||
plugged_gateway_device_names.append(interface)
|
|
||||||
else:
|
|
||||||
tap_device_name = self.linux_br.get_tap_device_name(interface)
|
|
||||||
plugged_tap_device_names.append(tap_device_name)
|
|
||||||
|
|
||||||
LOG.debug("plugged tap device names %s" % plugged_tap_device_names)
|
|
||||||
for tap_device in self.linux_br.get_all_tap_devices():
|
|
||||||
if tap_device not in plugged_tap_device_names:
|
|
||||||
current_bridge_name = (
|
|
||||||
self.linux_br.get_bridge_for_tap_device(tap_device))
|
|
||||||
if current_bridge_name:
|
|
||||||
self.linux_br.remove_interface(current_bridge_name,
|
|
||||||
tap_device)
|
|
||||||
|
|
||||||
for gw_device in self.linux_br.get_all_gateway_devices():
|
|
||||||
if gw_device not in plugged_gateway_device_names:
|
|
||||||
current_bridge_name = (
|
|
||||||
self.linux_br.get_bridge_for_tap_device(gw_device))
|
|
||||||
if current_bridge_name:
|
|
||||||
self.linux_br.remove_interface(current_bridge_name,
|
|
||||||
gw_device)
|
|
||||||
|
|
||||||
def process_deleted_networks(self, vlan_bindings):
|
|
||||||
current_quantum_networks = vlan_bindings.keys()
|
|
||||||
current_quantum_bridge_names = []
|
|
||||||
for network in current_quantum_networks:
|
|
||||||
bridge_name = self.linux_br.get_bridge_name(network)
|
|
||||||
current_quantum_bridge_names.append(bridge_name)
|
|
||||||
|
|
||||||
quantum_bridges_on_this_host = self.linux_br.get_all_quantum_bridges()
|
|
||||||
for bridge in quantum_bridges_on_this_host:
|
|
||||||
if bridge not in current_quantum_bridge_names:
|
|
||||||
self.linux_br.delete_vlan_bridge(bridge)
|
|
||||||
|
|
||||||
def manage_networks_on_host(self, db,
|
|
||||||
old_vlan_bindings,
|
|
||||||
old_port_bindings):
|
|
||||||
vlan_bindings = {}
|
|
||||||
try:
|
|
||||||
network_binds = db.network_bindings.all()
|
|
||||||
except Exception as e:
|
|
||||||
LOG.info("Unable to get network bindings! Exception: %s" % e)
|
|
||||||
self.db_connected = False
|
|
||||||
return {VLAN_BINDINGS: {},
|
|
||||||
PORT_BINDINGS: []}
|
|
||||||
|
|
||||||
vlans_string = ""
|
|
||||||
for bind in network_binds:
|
|
||||||
entry = {'network_id': bind.network_id,
|
|
||||||
'physical_network': bind.physical_network,
|
|
||||||
'vlan_id': bind.vlan_id}
|
|
||||||
vlan_bindings[bind.network_id] = entry
|
|
||||||
vlans_string = "%s %s" % (vlans_string, entry)
|
|
||||||
|
|
||||||
port_bindings = []
|
|
||||||
try:
|
|
||||||
port_binds = db.ports.all()
|
|
||||||
except Exception as e:
|
|
||||||
LOG.info("Unable to get port bindings! Exception: %s" % e)
|
|
||||||
self.db_connected = False
|
|
||||||
return {VLAN_BINDINGS: {},
|
|
||||||
PORT_BINDINGS: []}
|
|
||||||
|
|
||||||
all_bindings = {}
|
|
||||||
for bind in port_binds:
|
|
||||||
append_entry = False
|
|
||||||
all_bindings[bind.id] = bind
|
|
||||||
entry = {'network_id': bind.network_id,
|
|
||||||
'uuid': bind.id,
|
|
||||||
'status': bind.status,
|
|
||||||
'interface_id': bind.id}
|
|
||||||
append_entry = bind.admin_state_up
|
|
||||||
if append_entry:
|
|
||||||
port_bindings.append(entry)
|
|
||||||
|
|
||||||
plugged_interfaces = []
|
|
||||||
ports_string = ""
|
|
||||||
for pb in port_bindings:
|
|
||||||
ports_string = "%s %s" % (ports_string, pb)
|
|
||||||
port_id = pb['uuid']
|
|
||||||
interface_id = pb['interface_id']
|
|
||||||
network_id = pb['network_id']
|
|
||||||
|
|
||||||
physical_network = vlan_bindings[network_id]['physical_network']
|
|
||||||
vlan_id = str(vlan_bindings[network_id]['vlan_id'])
|
|
||||||
if self.process_port_binding(network_id,
|
|
||||||
interface_id,
|
|
||||||
physical_network,
|
|
||||||
vlan_id):
|
|
||||||
all_bindings[port_id].status = constants.PORT_STATUS_ACTIVE
|
|
||||||
|
|
||||||
plugged_interfaces.append(interface_id)
|
|
||||||
|
|
||||||
if old_port_bindings != port_bindings:
|
|
||||||
LOG.debug("Port-bindings: %s" % ports_string)
|
|
||||||
|
|
||||||
self.process_unplugged_interfaces(plugged_interfaces)
|
|
||||||
|
|
||||||
if old_vlan_bindings != vlan_bindings:
|
|
||||||
LOG.debug("VLAN-bindings: %s" % vlans_string)
|
|
||||||
|
|
||||||
self.process_deleted_networks(vlan_bindings)
|
|
||||||
|
|
||||||
try:
|
|
||||||
db.commit()
|
|
||||||
except Exception as e:
|
|
||||||
LOG.info("Unable to update database! Exception: %s" % e)
|
|
||||||
db.rollback()
|
|
||||||
vlan_bindings = {}
|
|
||||||
port_bindings = []
|
|
||||||
|
|
||||||
return {VLAN_BINDINGS: vlan_bindings,
|
|
||||||
PORT_BINDINGS: port_bindings}
|
|
||||||
|
|
||||||
def daemon_loop(self):
|
|
||||||
old_vlan_bindings = {}
|
|
||||||
old_port_bindings = []
|
|
||||||
self.db_connected = False
|
|
||||||
|
|
||||||
while True:
|
|
||||||
if not self.db_connected:
|
|
||||||
time.sleep(self.reconnect_interval)
|
|
||||||
db = SqlSoup(self.db_connection_url)
|
|
||||||
self.db_connected = True
|
|
||||||
LOG.info("Connecting to database \"%s\" on %s" %
|
|
||||||
(db.engine.url.database, db.engine.url.host))
|
|
||||||
bindings = self.manage_networks_on_host(db,
|
|
||||||
old_vlan_bindings,
|
|
||||||
old_port_bindings)
|
|
||||||
old_vlan_bindings = bindings[VLAN_BINDINGS]
|
|
||||||
old_port_bindings = bindings[PORT_BINDINGS]
|
|
||||||
time.sleep(self.polling_interval)
|
|
||||||
|
|
||||||
|
|
||||||
class LinuxBridgeQuantumAgentRPC:
|
class LinuxBridgeQuantumAgentRPC:
|
||||||
|
|
||||||
def __init__(self, interface_mappings, polling_interval,
|
def __init__(self, interface_mappings, polling_interval,
|
||||||
@ -807,20 +633,10 @@ def main():
|
|||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
polling_interval = cfg.CONF.AGENT.polling_interval
|
polling_interval = cfg.CONF.AGENT.polling_interval
|
||||||
reconnect_interval = cfg.CONF.DATABASE.reconnect_interval
|
|
||||||
root_helper = cfg.CONF.AGENT.root_helper
|
root_helper = cfg.CONF.AGENT.root_helper
|
||||||
rpc = cfg.CONF.AGENT.rpc
|
|
||||||
if rpc:
|
|
||||||
plugin = LinuxBridgeQuantumAgentRPC(interface_mappings,
|
plugin = LinuxBridgeQuantumAgentRPC(interface_mappings,
|
||||||
polling_interval,
|
polling_interval,
|
||||||
root_helper)
|
root_helper)
|
||||||
else:
|
|
||||||
db_connection_url = cfg.CONF.DATABASE.sql_connection
|
|
||||||
plugin = LinuxBridgeQuantumAgentDB(interface_mappings,
|
|
||||||
polling_interval,
|
|
||||||
reconnect_interval,
|
|
||||||
root_helper,
|
|
||||||
db_connection_url)
|
|
||||||
LOG.info("Agent initialized successfully, now running... ")
|
LOG.info("Agent initialized successfully, now running... ")
|
||||||
plugin.daemon_loop()
|
plugin.daemon_loop()
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
@ -48,7 +48,6 @@ bridge_opts = [
|
|||||||
agent_opts = [
|
agent_opts = [
|
||||||
cfg.IntOpt('polling_interval', default=2),
|
cfg.IntOpt('polling_interval', default=2),
|
||||||
cfg.StrOpt('root_helper', default='sudo'),
|
cfg.StrOpt('root_helper', default='sudo'),
|
||||||
cfg.BoolOpt('rpc', default=True),
|
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@ -167,7 +167,6 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
|
|||||||
"Service terminated!" %
|
"Service terminated!" %
|
||||||
self.tenant_network_type)
|
self.tenant_network_type)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
self.agent_rpc = cfg.CONF.AGENT.rpc
|
|
||||||
self._setup_rpc()
|
self._setup_rpc()
|
||||||
LOG.debug("Linux Bridge Plugin initialization complete")
|
LOG.debug("Linux Bridge Plugin initialization complete")
|
||||||
|
|
||||||
@ -378,7 +377,6 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
|
|||||||
binding.vlan_id, self.network_vlan_ranges)
|
binding.vlan_id, self.network_vlan_ranges)
|
||||||
# the network_binding record is deleted via cascade from
|
# the network_binding record is deleted via cascade from
|
||||||
# the network record, so explicit removal is not necessary
|
# the network record, so explicit removal is not necessary
|
||||||
if self.agent_rpc:
|
|
||||||
self.notifier.network_delete(self.rpc_context, id)
|
self.notifier.network_delete(self.rpc_context, id)
|
||||||
|
|
||||||
def get_network(self, context, id, fields=None):
|
def get_network(self, context, id, fields=None):
|
||||||
@ -400,11 +398,9 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
|
|||||||
return [self._fields(net, fields) for net in nets]
|
return [self._fields(net, fields) for net in nets]
|
||||||
|
|
||||||
def update_port(self, context, id, port):
|
def update_port(self, context, id, port):
|
||||||
if self.agent_rpc:
|
|
||||||
original_port = super(LinuxBridgePluginV2, self).get_port(context,
|
original_port = super(LinuxBridgePluginV2, self).get_port(context,
|
||||||
id)
|
id)
|
||||||
port = super(LinuxBridgePluginV2, self).update_port(context, id, port)
|
port = super(LinuxBridgePluginV2, self).update_port(context, id, port)
|
||||||
if self.agent_rpc:
|
|
||||||
if original_port['admin_state_up'] != port['admin_state_up']:
|
if original_port['admin_state_up'] != port['admin_state_up']:
|
||||||
binding = db.get_network_binding(context.session,
|
binding = db.get_network_binding(context.session,
|
||||||
port['network_id'])
|
port['network_id'])
|
||||||
|
@ -135,7 +135,7 @@ class OVSQuantumAgent(object):
|
|||||||
|
|
||||||
def __init__(self, integ_br, tun_br, local_ip,
|
def __init__(self, integ_br, tun_br, local_ip,
|
||||||
bridge_mappings, root_helper,
|
bridge_mappings, root_helper,
|
||||||
polling_interval, reconnect_interval, rpc, enable_tunneling):
|
polling_interval, enable_tunneling):
|
||||||
'''Constructor.
|
'''Constructor.
|
||||||
|
|
||||||
:param integ_br: name of the integration bridge.
|
:param integ_br: name of the integration bridge.
|
||||||
@ -144,8 +144,6 @@ class OVSQuantumAgent(object):
|
|||||||
:param bridge_mappings: mappings from phyiscal interface to bridge.
|
:param bridge_mappings: mappings from phyiscal interface to bridge.
|
||||||
:param root_helper: utility to use when running shell cmds.
|
:param root_helper: utility to use when running shell cmds.
|
||||||
:param polling_interval: interval (secs) to poll DB.
|
:param polling_interval: interval (secs) to poll DB.
|
||||||
:param reconnect_internal: retry interval (secs) on DB error.
|
|
||||||
:param rpc: if True use RPC interface to interface with plugin.
|
|
||||||
:param enable_tunneling: if True enable GRE networks.
|
:param enable_tunneling: if True enable GRE networks.
|
||||||
'''
|
'''
|
||||||
self.root_helper = root_helper
|
self.root_helper = root_helper
|
||||||
@ -157,7 +155,6 @@ class OVSQuantumAgent(object):
|
|||||||
self.local_vlan_map = {}
|
self.local_vlan_map = {}
|
||||||
|
|
||||||
self.polling_interval = polling_interval
|
self.polling_interval = polling_interval
|
||||||
self.reconnect_interval = reconnect_interval
|
|
||||||
|
|
||||||
self.enable_tunneling = enable_tunneling
|
self.enable_tunneling = enable_tunneling
|
||||||
self.local_ip = local_ip
|
self.local_ip = local_ip
|
||||||
@ -165,8 +162,6 @@ class OVSQuantumAgent(object):
|
|||||||
if self.enable_tunneling:
|
if self.enable_tunneling:
|
||||||
self.setup_tunnel_br(tun_br)
|
self.setup_tunnel_br(tun_br)
|
||||||
|
|
||||||
self.rpc = rpc
|
|
||||||
if rpc:
|
|
||||||
self.setup_rpc(integ_br)
|
self.setup_rpc(integ_br)
|
||||||
|
|
||||||
def setup_rpc(self, integ_br):
|
def setup_rpc(self, integ_br):
|
||||||
@ -510,150 +505,6 @@ class OVSQuantumAgent(object):
|
|||||||
int_veth.link.set_up()
|
int_veth.link.set_up()
|
||||||
phys_veth.link.set_up()
|
phys_veth.link.set_up()
|
||||||
|
|
||||||
def manage_tunnels(self, tunnel_ips, old_tunnel_ips, db):
|
|
||||||
if self.local_ip in tunnel_ips:
|
|
||||||
tunnel_ips.remove(self.local_ip)
|
|
||||||
else:
|
|
||||||
db.ovs_tunnel_ips.insert(ip_address=self.local_ip)
|
|
||||||
|
|
||||||
new_tunnel_ips = tunnel_ips - old_tunnel_ips
|
|
||||||
if new_tunnel_ips:
|
|
||||||
LOG.info("Adding tunnels to: %s", new_tunnel_ips)
|
|
||||||
for ip in new_tunnel_ips:
|
|
||||||
tun_name = "gre-" + str(self.tunnel_count)
|
|
||||||
self.tun_br.add_tunnel_port(tun_name, ip)
|
|
||||||
self.tunnel_count += 1
|
|
||||||
|
|
||||||
def rollback_until_success(self, db):
|
|
||||||
while True:
|
|
||||||
time.sleep(self.reconnect_interval)
|
|
||||||
try:
|
|
||||||
db.rollback()
|
|
||||||
break
|
|
||||||
except:
|
|
||||||
LOG.exception("Problem connecting to database")
|
|
||||||
|
|
||||||
def db_loop(self, db_connection_url):
|
|
||||||
'''Main processing loop for Tunneling Agent.
|
|
||||||
|
|
||||||
:param options: database information - in the event need to reconnect
|
|
||||||
'''
|
|
||||||
old_local_bindings = {}
|
|
||||||
old_vif_ports = {}
|
|
||||||
old_tunnel_ips = set()
|
|
||||||
|
|
||||||
db = sqlsoup.SqlSoup(db_connection_url)
|
|
||||||
LOG.info("Connecting to database \"%s\" on %s",
|
|
||||||
db.engine.url.database, db.engine.url.host)
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
all_bindings = dict((p.id, Port(p))
|
|
||||||
for p in db.ports.all())
|
|
||||||
all_bindings_vif_port_ids = set(all_bindings)
|
|
||||||
net_bindings = dict((bind.network_id, bind)
|
|
||||||
for bind in
|
|
||||||
db.ovs_network_bindings.all())
|
|
||||||
|
|
||||||
if self.enable_tunneling:
|
|
||||||
tunnel_ips = set(x.ip_address for x in
|
|
||||||
db.ovs_tunnel_ips.all())
|
|
||||||
self.manage_tunnels(tunnel_ips, old_tunnel_ips, db)
|
|
||||||
|
|
||||||
# Get bindings from OVS bridge.
|
|
||||||
vif_ports = self.int_br.get_vif_ports()
|
|
||||||
new_vif_ports = dict([(p.vif_id, p) for p in vif_ports])
|
|
||||||
new_vif_ports_ids = set(new_vif_ports.keys())
|
|
||||||
|
|
||||||
old_vif_ports_ids = set(old_vif_ports.keys())
|
|
||||||
dead_vif_ports_ids = (new_vif_ports_ids -
|
|
||||||
all_bindings_vif_port_ids)
|
|
||||||
dead_vif_ports = [new_vif_ports[p] for p in dead_vif_ports_ids]
|
|
||||||
disappeared_vif_ports_ids = (old_vif_ports_ids -
|
|
||||||
new_vif_ports_ids)
|
|
||||||
new_local_bindings_ids = (all_bindings_vif_port_ids.
|
|
||||||
intersection(new_vif_ports_ids))
|
|
||||||
new_local_bindings = dict([(p, all_bindings.get(p))
|
|
||||||
for p in new_vif_ports_ids])
|
|
||||||
new_bindings = set(
|
|
||||||
(p, old_local_bindings.get(p),
|
|
||||||
new_local_bindings.get(p)) for p in new_vif_ports_ids)
|
|
||||||
changed_bindings = set([b for b in new_bindings
|
|
||||||
if b[2] != b[1]])
|
|
||||||
|
|
||||||
LOG.debug('all_bindings: %s', all_bindings)
|
|
||||||
LOG.debug('net_bindings: %s', net_bindings)
|
|
||||||
LOG.debug('new_vif_ports_ids: %s', new_vif_ports_ids)
|
|
||||||
LOG.debug('dead_vif_ports_ids: %s', dead_vif_ports_ids)
|
|
||||||
LOG.debug('old_vif_ports_ids: %s', old_vif_ports_ids)
|
|
||||||
LOG.debug('new_local_bindings_ids: %s',
|
|
||||||
new_local_bindings_ids)
|
|
||||||
LOG.debug('new_local_bindings: %s', new_local_bindings)
|
|
||||||
LOG.debug('new_bindings: %s', new_bindings)
|
|
||||||
LOG.debug('changed_bindings: %s', changed_bindings)
|
|
||||||
|
|
||||||
# Take action.
|
|
||||||
for p in dead_vif_ports:
|
|
||||||
LOG.info("No quantum binding for port " + str(p)
|
|
||||||
+ "putting on dead vlan")
|
|
||||||
self.port_dead(p)
|
|
||||||
|
|
||||||
for b in changed_bindings:
|
|
||||||
port_id, old_port, new_port = b
|
|
||||||
p = new_vif_ports[port_id]
|
|
||||||
if old_port:
|
|
||||||
old_net_uuid = old_port.network_id
|
|
||||||
LOG.info("Removing binding to net-id = " +
|
|
||||||
old_net_uuid + " for " + str(p)
|
|
||||||
+ " added to dead vlan")
|
|
||||||
self.port_unbound(p.vif_id, old_net_uuid)
|
|
||||||
if p.vif_id in all_bindings:
|
|
||||||
all_bindings[p.vif_id].status = (
|
|
||||||
q_const.PORT_STATUS_DOWN)
|
|
||||||
if not new_port:
|
|
||||||
self.port_dead(p)
|
|
||||||
|
|
||||||
if new_port:
|
|
||||||
new_net_uuid = new_port.network_id
|
|
||||||
if new_net_uuid not in net_bindings:
|
|
||||||
LOG.warn("No network binding found for net-id"
|
|
||||||
" '%s'", new_net_uuid)
|
|
||||||
continue
|
|
||||||
|
|
||||||
bind = net_bindings[new_net_uuid]
|
|
||||||
self.port_bound(p, new_net_uuid,
|
|
||||||
bind.network_type,
|
|
||||||
bind.physical_network,
|
|
||||||
bind.segmentation_id)
|
|
||||||
all_bindings[p.vif_id].status = (
|
|
||||||
q_const.PORT_STATUS_ACTIVE)
|
|
||||||
LOG.info("Port %s on net-id = %s bound to %s ",
|
|
||||||
str(p), new_net_uuid,
|
|
||||||
str(self.local_vlan_map[new_net_uuid]))
|
|
||||||
|
|
||||||
for vif_id in disappeared_vif_ports_ids:
|
|
||||||
LOG.info("Port Disappeared: " + vif_id)
|
|
||||||
if vif_id in all_bindings:
|
|
||||||
all_bindings[vif_id].status = (
|
|
||||||
q_const.PORT_STATUS_DOWN)
|
|
||||||
old_port = old_local_bindings.get(vif_id)
|
|
||||||
if old_port:
|
|
||||||
self.port_unbound(vif_id, old_port.network_id)
|
|
||||||
# commit any DB changes and expire
|
|
||||||
# data loaded from the database
|
|
||||||
db.commit()
|
|
||||||
|
|
||||||
# sleep and re-initialize state for next pass
|
|
||||||
time.sleep(self.polling_interval)
|
|
||||||
if self.enable_tunneling:
|
|
||||||
old_tunnel_ips = tunnel_ips
|
|
||||||
old_vif_ports = new_vif_ports
|
|
||||||
old_local_bindings = new_local_bindings
|
|
||||||
|
|
||||||
except:
|
|
||||||
LOG.exception("Main-loop Exception:")
|
|
||||||
self.rollback_until_success(db)
|
|
||||||
|
|
||||||
def update_ports(self, registered_ports):
|
def update_ports(self, registered_ports):
|
||||||
ports = self.int_br.get_vif_port_set()
|
ports = self.int_br.get_vif_port_set()
|
||||||
if ports == registered_ports:
|
if ports == registered_ports:
|
||||||
@ -786,11 +637,8 @@ class OVSQuantumAgent(object):
|
|||||||
LOG.debug("Loop iteration exceeded interval (%s vs. %s)!",
|
LOG.debug("Loop iteration exceeded interval (%s vs. %s)!",
|
||||||
self.polling_interval, elapsed)
|
self.polling_interval, elapsed)
|
||||||
|
|
||||||
def daemon_loop(self, db_connection_url):
|
def daemon_loop(self):
|
||||||
if self.rpc:
|
|
||||||
self.rpc_loop()
|
self.rpc_loop()
|
||||||
else:
|
|
||||||
self.db_loop(db_connection_url)
|
|
||||||
|
|
||||||
|
|
||||||
def parse_bridge_mappings(bridge_mapping_list):
|
def parse_bridge_mappings(bridge_mapping_list):
|
||||||
@ -827,8 +675,6 @@ def create_agent_config_map(config):
|
|||||||
bridge_mappings=bridge_mappings,
|
bridge_mappings=bridge_mappings,
|
||||||
root_helper=config.AGENT.root_helper,
|
root_helper=config.AGENT.root_helper,
|
||||||
polling_interval=config.AGENT.polling_interval,
|
polling_interval=config.AGENT.polling_interval,
|
||||||
reconnect_interval=config.DATABASE.reconnect_interval,
|
|
||||||
rpc=config.AGENT.rpc,
|
|
||||||
enable_tunneling=config.OVS.enable_tunneling,
|
enable_tunneling=config.OVS.enable_tunneling,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -856,8 +702,7 @@ def main():
|
|||||||
|
|
||||||
# Start everything.
|
# Start everything.
|
||||||
LOG.info("Agent initialized successfully, now running... ")
|
LOG.info("Agent initialized successfully, now running... ")
|
||||||
plugin.daemon_loop(cfg.CONF.DATABASE.sql_connection)
|
plugin.daemon_loop()
|
||||||
|
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
|
|
||||||
|
@ -50,7 +50,6 @@ ovs_opts = [
|
|||||||
agent_opts = [
|
agent_opts = [
|
||||||
cfg.IntOpt('polling_interval', default=2),
|
cfg.IntOpt('polling_interval', default=2),
|
||||||
cfg.StrOpt('root_helper', default='sudo'),
|
cfg.StrOpt('root_helper', default='sudo'),
|
||||||
cfg.BoolOpt('rpc', default=True),
|
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@ -213,7 +213,6 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
|
|||||||
LOG.error("Tunneling disabled but tenant_network_type is 'gre'. "
|
LOG.error("Tunneling disabled but tenant_network_type is 'gre'. "
|
||||||
"Agent terminated!")
|
"Agent terminated!")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
self.agent_rpc = cfg.CONF.AGENT.rpc
|
|
||||||
self.setup_rpc()
|
self.setup_rpc()
|
||||||
|
|
||||||
def setup_rpc(self):
|
def setup_rpc(self):
|
||||||
@ -461,7 +460,6 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
|
|||||||
self.network_vlan_ranges)
|
self.network_vlan_ranges)
|
||||||
# the network_binding record is deleted via cascade from
|
# the network_binding record is deleted via cascade from
|
||||||
# the network record, so explicit removal is not necessary
|
# the network record, so explicit removal is not necessary
|
||||||
if self.agent_rpc:
|
|
||||||
self.notifier.network_delete(self.rpc_context, id)
|
self.notifier.network_delete(self.rpc_context, id)
|
||||||
|
|
||||||
def get_network(self, context, id, fields=None):
|
def get_network(self, context, id, fields=None):
|
||||||
@ -483,11 +481,9 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
|
|||||||
return [self._fields(net, fields) for net in nets]
|
return [self._fields(net, fields) for net in nets]
|
||||||
|
|
||||||
def update_port(self, context, id, port):
|
def update_port(self, context, id, port):
|
||||||
if self.agent_rpc:
|
|
||||||
original_port = super(OVSQuantumPluginV2, self).get_port(context,
|
original_port = super(OVSQuantumPluginV2, self).get_port(context,
|
||||||
id)
|
id)
|
||||||
port = super(OVSQuantumPluginV2, self).update_port(context, id, port)
|
port = super(OVSQuantumPluginV2, self).update_port(context, id, port)
|
||||||
if self.agent_rpc:
|
|
||||||
if original_port['admin_state_up'] != port['admin_state_up']:
|
if original_port['admin_state_up'] != port['admin_state_up']:
|
||||||
binding = ovs_db_v2.get_network_binding(None,
|
binding = ovs_db_v2.get_network_binding(None,
|
||||||
port['network_id'])
|
port['network_id'])
|
||||||
|
@ -30,7 +30,6 @@ class ConfigurationTest(unittest.TestCase):
|
|||||||
self.assertEqual(2, cfg.CONF.DATABASE.reconnect_interval)
|
self.assertEqual(2, cfg.CONF.DATABASE.reconnect_interval)
|
||||||
self.assertEqual(2, cfg.CONF.AGENT.polling_interval)
|
self.assertEqual(2, cfg.CONF.AGENT.polling_interval)
|
||||||
self.assertEqual('sudo', cfg.CONF.AGENT.root_helper)
|
self.assertEqual('sudo', cfg.CONF.AGENT.root_helper)
|
||||||
self.assertTrue(cfg.CONF.AGENT.rpc)
|
|
||||||
self.assertEqual('local', cfg.CONF.OVS.tenant_network_type)
|
self.assertEqual('local', cfg.CONF.OVS.tenant_network_type)
|
||||||
self.assertEqual(0, len(cfg.CONF.OVS.bridge_mappings))
|
self.assertEqual(0, len(cfg.CONF.OVS.bridge_mappings))
|
||||||
self.assertEqual(0, len(cfg.CONF.OVS.network_vlan_ranges))
|
self.assertEqual(0, len(cfg.CONF.OVS.network_vlan_ranges))
|
||||||
|
@ -64,11 +64,14 @@ class TestOvsQuantumAgent(unittest.TestCase):
|
|||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.addCleanup(cfg.CONF.reset)
|
self.addCleanup(cfg.CONF.reset)
|
||||||
# Avoid rpc initialization for unit tests
|
# Avoid rpc initialization for unit tests
|
||||||
cfg.CONF.set_override('rpc', False, group='AGENT')
|
cfg.CONF.set_override('rpc_backend',
|
||||||
|
'quantum.openstack.common.rpc.impl_fake')
|
||||||
kwargs = ovs_quantum_agent.create_agent_config_map(cfg.CONF)
|
kwargs = ovs_quantum_agent.create_agent_config_map(cfg.CONF)
|
||||||
with mock.patch('quantum.plugins.openvswitch.agent.ovs_quantum_agent.'
|
with mock.patch('quantum.plugins.openvswitch.agent.ovs_quantum_agent.'
|
||||||
'OVSQuantumAgent.setup_integration_br',
|
'OVSQuantumAgent.setup_integration_br',
|
||||||
return_value=mock.Mock()):
|
return_value=mock.Mock()):
|
||||||
|
with mock.patch('quantum.agent.linux.utils.get_interface_mac',
|
||||||
|
return_value='000000000001'):
|
||||||
self.agent = ovs_quantum_agent.OVSQuantumAgent(**kwargs)
|
self.agent = ovs_quantum_agent.OVSQuantumAgent(**kwargs)
|
||||||
self.agent.plugin_rpc = mock.Mock()
|
self.agent.plugin_rpc = mock.Mock()
|
||||||
self.agent.context = mock.Mock()
|
self.agent.context = mock.Mock()
|
||||||
@ -78,12 +81,9 @@ class TestOvsQuantumAgent(unittest.TestCase):
|
|||||||
port = mock.Mock()
|
port = mock.Mock()
|
||||||
port.ofport = ofport
|
port.ofport = ofport
|
||||||
net_uuid = 'my-net-uuid'
|
net_uuid = 'my-net-uuid'
|
||||||
with mock.patch.object(self.agent.int_br,
|
|
||||||
'set_db_attribute') as db_func:
|
|
||||||
with mock.patch.object(self.agent.int_br,
|
with mock.patch.object(self.agent.int_br,
|
||||||
'delete_flows') as delete_flows_func:
|
'delete_flows') as delete_flows_func:
|
||||||
self.agent.port_bound(port, net_uuid, 'local', None, None)
|
self.agent.port_bound(port, net_uuid, 'local', None, None)
|
||||||
self.assertTrue(db_func.called)
|
|
||||||
self.assertEqual(delete_flows_func.called, ofport != -1)
|
self.assertEqual(delete_flows_func.called, ofport != -1)
|
||||||
|
|
||||||
def test_port_bound_deletes_flows_for_valid_ofport(self):
|
def test_port_bound_deletes_flows_for_valid_ofport(self):
|
||||||
@ -93,12 +93,9 @@ class TestOvsQuantumAgent(unittest.TestCase):
|
|||||||
self.mock_port_bound(ofport=-1)
|
self.mock_port_bound(ofport=-1)
|
||||||
|
|
||||||
def test_port_dead(self):
|
def test_port_dead(self):
|
||||||
with mock.patch.object(self.agent.int_br,
|
|
||||||
'set_db_attribute') as db_func:
|
|
||||||
with mock.patch.object(self.agent.int_br,
|
with mock.patch.object(self.agent.int_br,
|
||||||
'add_flow') as add_flow_func:
|
'add_flow') as add_flow_func:
|
||||||
self.agent.port_dead(mock.Mock())
|
self.agent.port_dead(mock.Mock())
|
||||||
self.assertTrue(db_func.called)
|
|
||||||
self.assertTrue(add_flow_func.called)
|
self.assertTrue(add_flow_func.called)
|
||||||
|
|
||||||
def mock_update_ports(self, vif_port_set=None, registered_ports=None):
|
def mock_update_ports(self, vif_port_set=None, registered_ports=None):
|
||||||
|
@ -21,6 +21,7 @@ import unittest
|
|||||||
import mox
|
import mox
|
||||||
|
|
||||||
from quantum.agent.linux import ovs_lib
|
from quantum.agent.linux import ovs_lib
|
||||||
|
from quantum.agent.linux import utils
|
||||||
from quantum.plugins.openvswitch.agent import ovs_quantum_agent
|
from quantum.plugins.openvswitch.agent import ovs_quantum_agent
|
||||||
|
|
||||||
# Useful global dummy variables.
|
# Useful global dummy variables.
|
||||||
@ -74,6 +75,9 @@ class TunnelTest(unittest.TestCase):
|
|||||||
self.mock_tun_bridge.remove_all_flows()
|
self.mock_tun_bridge.remove_all_flows()
|
||||||
self.mock_tun_bridge.add_flow(priority=1, actions='drop')
|
self.mock_tun_bridge.add_flow(priority=1, actions='drop')
|
||||||
|
|
||||||
|
self.mox.StubOutWithMock(utils, 'get_interface_mac')
|
||||||
|
utils.get_interface_mac(self.INT_BRIDGE).AndReturn('000000000001')
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
self.mox.UnsetStubs()
|
self.mox.UnsetStubs()
|
||||||
|
|
||||||
@ -83,7 +87,7 @@ class TunnelTest(unittest.TestCase):
|
|||||||
b = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
|
b = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
|
||||||
self.TUN_BRIDGE,
|
self.TUN_BRIDGE,
|
||||||
'10.0.0.1', {},
|
'10.0.0.1', {},
|
||||||
'sudo', 2, 2, False, True)
|
'sudo', 2, True)
|
||||||
self.mox.VerifyAll()
|
self.mox.VerifyAll()
|
||||||
|
|
||||||
def testProvisionLocalVlan(self):
|
def testProvisionLocalVlan(self):
|
||||||
@ -100,7 +104,7 @@ class TunnelTest(unittest.TestCase):
|
|||||||
a = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
|
a = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
|
||||||
self.TUN_BRIDGE,
|
self.TUN_BRIDGE,
|
||||||
'10.0.0.1', {},
|
'10.0.0.1', {},
|
||||||
'sudo', 2, 2, False, True)
|
'sudo', 2, True)
|
||||||
a.available_local_vlans = set([LV_ID])
|
a.available_local_vlans = set([LV_ID])
|
||||||
a.provision_local_vlan(NET_UUID, 'gre', None, LS_ID)
|
a.provision_local_vlan(NET_UUID, 'gre', None, LS_ID)
|
||||||
self.mox.VerifyAll()
|
self.mox.VerifyAll()
|
||||||
@ -114,7 +118,7 @@ class TunnelTest(unittest.TestCase):
|
|||||||
a = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
|
a = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
|
||||||
self.TUN_BRIDGE,
|
self.TUN_BRIDGE,
|
||||||
'10.0.0.1', {},
|
'10.0.0.1', {},
|
||||||
'sudo', 2, 2, False, True)
|
'sudo', 2, True)
|
||||||
a.available_local_vlans = set()
|
a.available_local_vlans = set()
|
||||||
a.local_vlan_map[NET_UUID] = LVM
|
a.local_vlan_map[NET_UUID] = LVM
|
||||||
a.reclaim_local_vlan(NET_UUID, LVM)
|
a.reclaim_local_vlan(NET_UUID, LVM)
|
||||||
@ -135,7 +139,7 @@ class TunnelTest(unittest.TestCase):
|
|||||||
a = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
|
a = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
|
||||||
self.TUN_BRIDGE,
|
self.TUN_BRIDGE,
|
||||||
'10.0.0.1', {},
|
'10.0.0.1', {},
|
||||||
'sudo', 2, 2, False, True)
|
'sudo', 2, True)
|
||||||
a.local_vlan_map[NET_UUID] = LVM
|
a.local_vlan_map[NET_UUID] = LVM
|
||||||
a.port_bound(VIF_PORT, NET_UUID, 'gre', None, LS_ID)
|
a.port_bound(VIF_PORT, NET_UUID, 'gre', None, LS_ID)
|
||||||
self.mox.VerifyAll()
|
self.mox.VerifyAll()
|
||||||
@ -146,7 +150,7 @@ class TunnelTest(unittest.TestCase):
|
|||||||
a = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
|
a = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
|
||||||
self.TUN_BRIDGE,
|
self.TUN_BRIDGE,
|
||||||
'10.0.0.1', {},
|
'10.0.0.1', {},
|
||||||
'sudo', 2, 2, False, True)
|
'sudo', 2, True)
|
||||||
a.available_local_vlans = set([LV_ID])
|
a.available_local_vlans = set([LV_ID])
|
||||||
a.local_vlan_map[NET_UUID] = LVM
|
a.local_vlan_map[NET_UUID] = LVM
|
||||||
a.port_unbound(VIF_ID, NET_UUID)
|
a.port_unbound(VIF_ID, NET_UUID)
|
||||||
@ -163,7 +167,7 @@ class TunnelTest(unittest.TestCase):
|
|||||||
a = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
|
a = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
|
||||||
self.TUN_BRIDGE,
|
self.TUN_BRIDGE,
|
||||||
'10.0.0.1', {},
|
'10.0.0.1', {},
|
||||||
'sudo', 2, 2, False, True)
|
'sudo', 2, True)
|
||||||
a.available_local_vlans = set([LV_ID])
|
a.available_local_vlans = set([LV_ID])
|
||||||
a.local_vlan_map[NET_UUID] = LVM
|
a.local_vlan_map[NET_UUID] = LVM
|
||||||
a.port_dead(VIF_PORT)
|
a.port_dead(VIF_PORT)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user