Fix a race condition in add_tunnel_endpoint()

If there are multiple OVS agents concurrently executing
'tunnel_sync' RPC call a race condition can occur
leading to insertion of two different TunnelEndpoint
entries having the same 'id' value.

Unfortunately, we can not rely on:
  - @lockutils.synchronized(), because a Neutron installation can use
    more than one API node
  - with_lockmode('update'), because it works differently in PostgreSQL
    comparing to MySQL and doesn't guarantee that no new rows have been
    added to the table since the select query was issued. Please take a
    look at http://www.postgresql.org/files/developer/concurrency.pdf for
    more details.

The proposed fix:
  - ensures there is a unique constraint set for 'id' column
  - wraps creation of a new TunnelEndpoint entry into a
    repeatedly executed transactional block (so even if a concurrent
    DB transaction has been flushed or commited earlier than this one
    we can handle an integrity error and try again, in spite of the
    specified transactions isolation level value)

Fixes bug 1167916

Change-Id: I62dc729d595f090436199d5e1b6b98a884ead7a5
This commit is contained in:
Roman Podolyaka 2013-04-30 15:12:15 +03:00 committed by Mark McClain
parent 74631c6c85
commit 08bec8a2be
4 changed files with 128 additions and 12 deletions

View File

@ -0,0 +1,64 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Add unique constraint for id column of TunnelEndpoint
Revision ID: 63afba73813
Revises: 3c6e57a23db4
Create Date: 2013-04-30 13:53:31.717450
"""
# revision identifiers, used by Alembic.
revision = '63afba73813'
down_revision = '3c6e57a23db4'
# Change to ['*'] if this migration applies to all plugins
migration_for_plugins = [
'neutron.plugins.openvswitch.ovs_neutron_plugin.OVSNeutronPluginV2',
]
from alembic import op
from neutron.db import migration
CONSTRAINT_NAME = 'uniq_ovs_tunnel_endpoints0id'
TABLE_NAME = 'ovs_tunnel_endpoints'
def upgrade(active_plugins=None, options=None):
if not migration.should_run(active_plugins, migration_for_plugins):
return
op.create_unique_constraint(
name=CONSTRAINT_NAME,
source=TABLE_NAME,
local_cols=['id']
)
def downgrade(active_plugins=None, options=None):
if not migration.should_run(active_plugins, migration_for_plugins):
return
op.drop_constraint(
name=CONSTRAINT_NAME,
tablename=TABLE_NAME,
type='unique'
)

View File

@ -16,8 +16,8 @@
# @author: Aaron Rosen, Nicira Networks, Inc.
# @author: Bob Kukura, Red Hat, Inc.
from sqlalchemy import func
from sqlalchemy.orm import exc
from sqlalchemy.sql import func
from neutron.common import exceptions as q_exc
import neutron.db.api as db
@ -25,6 +25,7 @@ from neutron.db import models_v2
from neutron.db import securitygroups_db as sg_db
from neutron.extensions import securitygroup as ext_sg
from neutron import manager
from neutron.openstack.common.db import exception as db_exc
from neutron.openstack.common import log as logging
from neutron.plugins.openvswitch.common import constants
from neutron.plugins.openvswitch import ovs_models_v2
@ -367,14 +368,33 @@ def _generate_tunnel_id(session):
return max_tunnel_id + 1
def add_tunnel_endpoint(ip):
session = db.get_session()
try:
tunnel = (session.query(ovs_models_v2.TunnelEndpoint).
filter_by(ip_address=ip).with_lockmode('update').one())
except exc.NoResultFound:
tunnel_id = _generate_tunnel_id(session)
tunnel = ovs_models_v2.TunnelEndpoint(ip, tunnel_id)
session.add(tunnel)
session.flush()
return tunnel
def add_tunnel_endpoint(ip, max_retries=10):
"""Return the endpoint of the given IP address or generate a new one."""
# NOTE(rpodolyaka): generation of a new tunnel endpoint must be put into a
# repeatedly executed transactional block to ensure it
# doesn't conflict with any other concurrently executed
# DB transactions in spite of the specified transactions
# isolation level value
for i in xrange(max_retries):
LOG.debug(_('Adding a tunnel endpoint for %s'), ip)
try:
session = db.get_session()
with session.begin(subtransactions=True):
tunnel = (session.query(ovs_models_v2.TunnelEndpoint).
filter_by(ip_address=ip).with_lockmode('update').
first())
if tunnel is None:
tunnel_id = _generate_tunnel_id(session)
tunnel = ovs_models_v2.TunnelEndpoint(ip, tunnel_id)
session.add(tunnel)
return tunnel
except db_exc.DBDuplicateEntry:
# a concurrent transaction has been commited, try again
LOG.debug(_('Adding a tunnel endpoint failed due to a concurrent'
'transaction had been commited (%s attempts left)'),
max_retries - (i + 1))
raise q_exc.NeutronException(message='Unable to generate a new tunnel id')

View File

@ -18,6 +18,7 @@
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.schema import UniqueConstraint
from neutron.db.models_v2 import model_base
@ -86,6 +87,9 @@ class NetworkBinding(model_base.BASEV2):
class TunnelEndpoint(model_base.BASEV2):
"""Represents tunnel endpoint in RPC mode."""
__tablename__ = 'ovs_tunnel_endpoints'
__table_args__ = (
UniqueConstraint('id', name='uniq_ovs_tunnel_endpoints0id'),
)
ip_address = Column(String(64), primary_key=True)
id = Column(Integer, nullable=False)

View File

@ -13,12 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import testtools
from testtools import matchers
from neutron.common import exceptions as q_exc
from neutron.db import api as db
from neutron.openstack.common.db import exception as db_exc
from neutron.openstack.common.db.sqlalchemy import session
from neutron.plugins.openvswitch import ovs_db_v2
from neutron.plugins.openvswitch import ovs_models_v2 as ovs_models
from neutron.tests import base
from neutron.tests.unit import test_db_plugin as test_plugin
@ -262,6 +266,30 @@ class TunnelAllocationsTest(base.BaseTestCase):
ovs_db_v2.release_tunnel(self.session, tunnel_id, TUNNEL_RANGES)
self.assertIsNone(ovs_db_v2.get_tunnel_allocation(tunnel_id))
def test_add_tunnel_endpoint_create_new_endpoint(self):
addr = '10.0.0.1'
ovs_db_v2.add_tunnel_endpoint(addr)
self.assertIsNotNone(self.session.query(ovs_models.TunnelEndpoint).
filter_by(ip_address=addr).first())
def test_add_tunnel_endpoint_retrieve_an_existing_endpoint(self):
addr = '10.0.0.1'
self.session.add(ovs_models.TunnelEndpoint(ip_address=addr, id=1))
self.session.flush()
tunnel = ovs_db_v2.add_tunnel_endpoint(addr)
self.assertEquals(tunnel.id, 1)
self.assertEquals(tunnel.ip_address, addr)
def test_add_tunnel_endpoint_handle_duplicate_error(self):
with mock.patch.object(session.Session, 'query') as query_mock:
error = db_exc.DBDuplicateEntry(['id'])
query_mock.side_effect = error
with testtools.ExpectedException(q_exc.NeutronException):
ovs_db_v2.add_tunnel_endpoint('10.0.0.1', 5)
self.assertEquals(query_mock.call_count, 5)
class NetworkBindingsTest(test_plugin.NeutronDbPluginV2TestCase):
def setUp(self):