Fixed concurrent PUT requests to accounts or containers

When concurrent PUT requests come in for an account or container, the
resulting DB access will try to lock the DB for writing. Normal access
will retry when it encounters a locked DB, change 0fdad0d9 introduced
a cursor for doing the initialization which did not have the retry
capability, resulting in a hard failure.

Fixes bug 1243973

Change-Id: I73b219e0f5eacf314d87b4d5e56c03daf51b2eca
This commit is contained in:
Steven Lang 2013-10-23 11:35:57 -07:00
parent 8255810e7d
commit ef7f9e27a2
2 changed files with 47 additions and 4 deletions

View File

@ -89,22 +89,38 @@ class GreenDBConnection(sqlite3.Connection):
self.db_file = args[0] if args else'-'
sqlite3.Connection.__init__(self, *args, **kwargs)
def cursor(self, cls=None):
if cls is None:
cls = GreenDBCursor
return sqlite3.Connection.cursor(self, cls)
class GreenDBCursor(sqlite3.Cursor):
"""SQLite Cursor handler that plays well with eventlet."""
def __init__(self, *args, **kwargs):
self.timeout = args[0].timeout
self.db_file = args[0].db_file
sqlite3.Cursor.__init__(self, *args, **kwargs)
def _timeout(self, call):
with LockTimeout(self.timeout, self.db_file):
retry_wait = 0.001
while True:
try:
return call()
except sqlite3.OperationalError as e:
if 'locked' not in str(e):
raise
sleep(0.05)
sleep(retry_wait)
retry_wait = min(retry_wait * 2, 0.05)
def execute(self, *args, **kwargs):
return self._timeout(lambda: sqlite3.Connection.execute(
return self._timeout(lambda: sqlite3.Cursor.execute(
self, *args, **kwargs))
def commit(self):
return self._timeout(lambda: sqlite3.Connection.commit(self))
return self._timeout(lambda: sqlite3.Cursor.commit(self))
def dict_factory(crs, row):

View File

@ -23,7 +23,9 @@ from uuid import uuid4
import simplejson
import sqlite3
from mock import patch
from mock import patch, MagicMock
from eventlet.timeout import Timeout
import swift.common.db
from swift.common.db import chexor, dict_factory, get_db_connection, \
@ -90,6 +92,31 @@ class TestGetDBConnection(unittest.TestCase):
self.assertRaises(DatabaseConnectionError, get_db_connection,
'invalid database path / name')
def test_locked_db(self):
# This test is dependant on the code under test calling execute and
# commit as sqlite3.<Connection/Cursor>.<execute/commit> in a subclass.
class InterceptConnection(sqlite3.Connection):
pass
class InterceptCursor(sqlite3.Cursor):
pass
db_error = sqlite3.OperationalError('database is locked')
mock_db_cmd = MagicMock(side_effect=db_error)
InterceptConnection.execute = mock_db_cmd
InterceptConnection.commit = mock_db_cmd
InterceptCursor.execute = mock_db_cmd
InterceptCursor.commit = mock_db_cmd
with patch.multiple('sqlite3', Connection=InterceptConnection,
Cursor=InterceptCursor):
self.assertRaises(Timeout, get_db_connection, ':memory:',
timeout=0.1)
self.assertTrue(mock_db_cmd.called)
self.assertEqual(mock_db_cmd.call_args_list,
list((mock_db_cmd.call_args,) *
mock_db_cmd.call_count))
class TestDatabaseBroker(unittest.TestCase):