Lock tables for update on allocation/deletion

Allocating, creating and deleting port might happen
in parallel and we need to make sure we don't
assign same IP to multiple different requests.

Added treatment for vlan tags and tunnel ID's

Fixes: bug #1110807

Change-Id: Idbb04d3ce6eacd308b05536f1942a35a0792199e
This commit is contained in:
Ante Karamatic 2013-02-07 12:33:49 +01:00 committed by Gary Kotton
parent 71bb9359f0
commit 79b91a500d
3 changed files with 25 additions and 9 deletions

View File

@ -282,7 +282,8 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
@staticmethod @staticmethod
def _hold_ip(context, network_id, subnet_id, port_id, ip_address): def _hold_ip(context, network_id, subnet_id, port_id, ip_address):
alloc_qry = context.session.query(models_v2.IPAllocation) alloc_qry = context.session.query(
models_v2.IPAllocation).with_lockmode('update')
allocated = alloc_qry.filter_by(network_id=network_id, allocated = alloc_qry.filter_by(network_id=network_id,
port_id=port_id, port_id=port_id,
ip_address=ip_address, ip_address=ip_address,
@ -306,7 +307,8 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
if network_id in getattr(context, '_recycled_networks', set()): if network_id in getattr(context, '_recycled_networks', set()):
return return
expired_qry = context.session.query(models_v2.IPAllocation) expired_qry = context.session.query(
models_v2.IPAllocation).with_lockmode('update')
expired_qry = expired_qry.filter_by(network_id=network_id, expired_qry = expired_qry.filter_by(network_id=network_id,
port_id=None) port_id=None)
expired_qry = expired_qry.filter( expired_qry = expired_qry.filter(
@ -329,7 +331,8 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
subnet. subnet.
""" """
# Grab all allocation pools for the subnet # Grab all allocation pools for the subnet
pool_qry = context.session.query(models_v2.IPAllocationPool) pool_qry = context.session.query(
models_v2.IPAllocationPool).with_lockmode('update')
allocation_pools = pool_qry.filter_by(subnet_id=subnet_id).all() allocation_pools = pool_qry.filter_by(subnet_id=subnet_id).all()
# Find the allocation pool for the IP to recycle # Find the allocation pool for the IP to recycle
pool_id = None pool_id = None
@ -350,7 +353,8 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
# If 1 of the above holds true then the specific entry will be # If 1 of the above holds true then the specific entry will be
# modified. If both hold true then the two ranges will be merged. # modified. If both hold true then the two ranges will be merged.
# If there are no entries then a single entry will be added. # If there are no entries then a single entry will be added.
range_qry = context.session.query(models_v2.IPAvailabilityRange) range_qry = context.session.query(
models_v2.IPAvailabilityRange).with_lockmode('update')
ip_first = str(netaddr.IPAddress(ip_address) + 1) ip_first = str(netaddr.IPAddress(ip_address) + 1)
ip_last = str(netaddr.IPAddress(ip_address) - 1) ip_last = str(netaddr.IPAddress(ip_address) - 1)
LOG.debug(_("Recycle %s"), ip_address) LOG.debug(_("Recycle %s"), ip_address)
@ -433,7 +437,8 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
# Delete the IP address from the IPAllocate table # Delete the IP address from the IPAllocate table
LOG.debug(_("Delete allocated IP %(ip_address)s " LOG.debug(_("Delete allocated IP %(ip_address)s "
"(%(network_id)s/%(subnet_id)s)"), locals()) "(%(network_id)s/%(subnet_id)s)"), locals())
alloc_qry = context.session.query(models_v2.IPAllocation) alloc_qry = context.session.query(
models_v2.IPAllocation).with_lockmode('update')
allocated = alloc_qry.filter_by(network_id=network_id, allocated = alloc_qry.filter_by(network_id=network_id,
ip_address=ip_address, ip_address=ip_address,
subnet_id=subnet_id).delete() subnet_id=subnet_id).delete()
@ -447,7 +452,7 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
""" """
range_qry = context.session.query( range_qry = context.session.query(
models_v2.IPAvailabilityRange).join( models_v2.IPAvailabilityRange).join(
models_v2.IPAllocationPool) models_v2.IPAllocationPool).with_lockmode('update')
for subnet in subnets: for subnet in subnets:
range = range_qry.filter_by(subnet_id=subnet['id']).first() range = range_qry.filter_by(subnet_id=subnet['id']).first()
if not range: if not range:
@ -479,7 +484,7 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
range_qry = context.session.query( range_qry = context.session.query(
models_v2.IPAvailabilityRange, models_v2.IPAvailabilityRange,
models_v2.IPAllocationPool).join( models_v2.IPAllocationPool).join(
models_v2.IPAllocationPool) models_v2.IPAllocationPool).with_lockmode('update')
results = range_qry.filter_by(subnet_id=subnet_id).all() results = range_qry.filter_by(subnet_id=subnet_id).all()
for (range, pool) in results: for (range, pool) in results:
first = int(netaddr.IPAddress(range['first_ip'])) first = int(netaddr.IPAddress(range['first_ip']))
@ -1356,7 +1361,8 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
def _delete_port(self, context, id): def _delete_port(self, context, id):
port = self._get_port(context, id) port = self._get_port(context, id)
allocated_qry = context.session.query(models_v2.IPAllocation) allocated_qry = context.session.query(
models_v2.IPAllocation).with_lockmode('update')
# recycle all of the IP's # recycle all of the IP's
allocated = allocated_qry.filter_by(port_id=id).all() allocated = allocated_qry.filter_by(port_id=id).all()
if allocated: if allocated:

View File

@ -111,6 +111,7 @@ def reserve_network(session):
with session.begin(subtransactions=True): with session.begin(subtransactions=True):
state = (session.query(l2network_models_v2.NetworkState). state = (session.query(l2network_models_v2.NetworkState).
filter_by(allocated=False). filter_by(allocated=False).
with_lockmode('update').
first()) first())
if not state: if not state:
raise q_exc.NoNetworkAvailable() raise q_exc.NoNetworkAvailable()
@ -128,6 +129,7 @@ def reserve_specific_network(session, physical_network, vlan_id):
state = (session.query(l2network_models_v2.NetworkState). state = (session.query(l2network_models_v2.NetworkState).
filter_by(physical_network=physical_network, filter_by(physical_network=physical_network,
vlan_id=vlan_id). vlan_id=vlan_id).
with_lockmode('update').
one()) one())
if state.allocated: if state.allocated:
if vlan_id == constants.FLAT_VLAN_ID: if vlan_id == constants.FLAT_VLAN_ID:
@ -153,6 +155,7 @@ def release_network(session, physical_network, vlan_id, network_vlan_ranges):
state = (session.query(l2network_models_v2.NetworkState). state = (session.query(l2network_models_v2.NetworkState).
filter_by(physical_network=physical_network, filter_by(physical_network=physical_network,
vlan_id=vlan_id). vlan_id=vlan_id).
with_lockmode('update').
one()) one())
state.allocated = False state.allocated = False
inside = False inside = False

View File

@ -130,6 +130,7 @@ def reserve_vlan(session):
with session.begin(subtransactions=True): with session.begin(subtransactions=True):
alloc = (session.query(ovs_models_v2.VlanAllocation). alloc = (session.query(ovs_models_v2.VlanAllocation).
filter_by(allocated=False). filter_by(allocated=False).
with_lockmode('update').
first()) first())
if alloc: if alloc:
LOG.debug(_("Reserving vlan %(vlan_id)s on physical network " LOG.debug(_("Reserving vlan %(vlan_id)s on physical network "
@ -147,6 +148,7 @@ def reserve_specific_vlan(session, physical_network, vlan_id):
alloc = (session.query(ovs_models_v2.VlanAllocation). alloc = (session.query(ovs_models_v2.VlanAllocation).
filter_by(physical_network=physical_network, filter_by(physical_network=physical_network,
vlan_id=vlan_id). vlan_id=vlan_id).
with_lockmode('update').
one()) one())
if alloc.allocated: if alloc.allocated:
if vlan_id == constants.FLAT_VLAN_ID: if vlan_id == constants.FLAT_VLAN_ID:
@ -173,6 +175,7 @@ def release_vlan(session, physical_network, vlan_id, network_vlan_ranges):
alloc = (session.query(ovs_models_v2.VlanAllocation). alloc = (session.query(ovs_models_v2.VlanAllocation).
filter_by(physical_network=physical_network, filter_by(physical_network=physical_network,
vlan_id=vlan_id). vlan_id=vlan_id).
with_lockmode('update').
one()) one())
alloc.allocated = False alloc.allocated = False
inside = False inside = False
@ -237,6 +240,7 @@ def get_tunnel_allocation(tunnel_id):
try: try:
alloc = (session.query(ovs_models_v2.TunnelAllocation). alloc = (session.query(ovs_models_v2.TunnelAllocation).
filter_by(tunnel_id=tunnel_id). filter_by(tunnel_id=tunnel_id).
with_lockmode('update').
one()) one())
return alloc return alloc
except exc.NoResultFound: except exc.NoResultFound:
@ -247,6 +251,7 @@ def reserve_tunnel(session):
with session.begin(subtransactions=True): with session.begin(subtransactions=True):
alloc = (session.query(ovs_models_v2.TunnelAllocation). alloc = (session.query(ovs_models_v2.TunnelAllocation).
filter_by(allocated=False). filter_by(allocated=False).
with_lockmode('update').
first()) first())
if alloc: if alloc:
LOG.debug(_("Reserving tunnel %s from pool"), alloc.tunnel_id) LOG.debug(_("Reserving tunnel %s from pool"), alloc.tunnel_id)
@ -260,6 +265,7 @@ def reserve_specific_tunnel(session, tunnel_id):
try: try:
alloc = (session.query(ovs_models_v2.TunnelAllocation). alloc = (session.query(ovs_models_v2.TunnelAllocation).
filter_by(tunnel_id=tunnel_id). filter_by(tunnel_id=tunnel_id).
with_lockmode('update').
one()) one())
if alloc.allocated: if alloc.allocated:
raise q_exc.TunnelIdInUse(tunnel_id=tunnel_id) raise q_exc.TunnelIdInUse(tunnel_id=tunnel_id)
@ -278,6 +284,7 @@ def release_tunnel(session, tunnel_id, tunnel_id_ranges):
try: try:
alloc = (session.query(ovs_models_v2.TunnelAllocation). alloc = (session.query(ovs_models_v2.TunnelAllocation).
filter_by(tunnel_id=tunnel_id). filter_by(tunnel_id=tunnel_id).
with_lockmode('update').
one()) one())
alloc.allocated = False alloc.allocated = False
inside = False inside = False
@ -368,7 +375,7 @@ def add_tunnel_endpoint(ip):
session = db.get_session() session = db.get_session()
try: try:
tunnel = (session.query(ovs_models_v2.TunnelEndpoint). tunnel = (session.query(ovs_models_v2.TunnelEndpoint).
filter_by(ip_address=ip).one()) filter_by(ip_address=ip).with_lockmode('update').one())
except exc.NoResultFound: except exc.NoResultFound:
id = _generate_tunnel_id(session) id = _generate_tunnel_id(session)
tunnel = ovs_models_v2.TunnelEndpoint(ip, id) tunnel = ovs_models_v2.TunnelEndpoint(ip, id)