From a44635ca9767d8ee737e0189063cf3b5b3842285 Mon Sep 17 00:00:00 2001 From: Michael Barton Date: Sat, 29 Jan 2011 00:54:12 +0000 Subject: [PATCH 1/9] support WAL journaling instead of .pending files --- swift/common/db.py | 195 +++++++-------------------------------------- 1 file changed, 29 insertions(+), 166 deletions(-) diff --git a/swift/common/db.py b/swift/common/db.py index be96411619..36ef1f3c91 100644 --- a/swift/common/db.py +++ b/swift/common/db.py @@ -33,7 +33,7 @@ import simplejson as json import sqlite3 from swift.common.utils import normalize_timestamp, renamer, \ - mkdirs, lock_parent_directory, fallocate + mkdirs, lock_parent_directory from swift.common.exceptions import LockTimeout @@ -41,8 +41,7 @@ from swift.common.exceptions import LockTimeout BROKER_TIMEOUT = 25 #: Pickle protocol to use PICKLE_PROTOCOL = 2 -#: Max number of pending entries -PENDING_CAP = 131072 +PENDING_COMMIT_TIMEOUT = 900 class DatabaseConnectionError(sqlite3.DatabaseError): @@ -139,7 +138,7 @@ def get_db_connection(path, timeout=30, okay_to_create=False): conn.execute('PRAGMA synchronous = NORMAL') conn.execute('PRAGMA count_changes = OFF') conn.execute('PRAGMA temp_store = MEMORY') - conn.execute('PRAGMA journal_mode = DELETE') + conn.execute('PRAGMA journal_mode = WAL') conn.create_function('chexor', 3, chexor) except sqlite3.DatabaseError: import traceback @@ -152,13 +151,10 @@ class DatabaseBroker(object): """Encapsulates working with a database.""" def __init__(self, db_file, timeout=BROKER_TIMEOUT, logger=None, - account=None, container=None, pending_timeout=10, - stale_reads_ok=False): + account=None, container=None, stale_reads_ok=False): """ Encapsulates working with a database. """ self.conn = None self.db_file = db_file - self.pending_file = self.db_file + '.pending' - self.pending_timeout = pending_timeout self.stale_reads_ok = stale_reads_ok self.db_dir = os.path.dirname(db_file) self.timeout = timeout @@ -233,7 +229,7 @@ class DatabaseBroker(object): conn.close() with open(tmp_db_file, 'r+b') as fp: os.fsync(fp.fileno()) - with lock_parent_directory(self.db_file, self.pending_timeout): + with lock_parent_directory(self.db_file, self.timeout): if os.path.exists(self.db_file): # It's as if there was a "condition" where different parts # of the system were "racing" each other. @@ -348,11 +344,6 @@ class DatabaseBroker(object): :param count: number to get :returns: list of objects between start and end """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise with self.get() as conn: curs = conn.execute(''' SELECT * FROM %s WHERE ROWID > ? ORDER BY ROWID ASC LIMIT ? @@ -401,11 +392,7 @@ class DatabaseBroker(object): :returns: dict containing keys: hash, id, created_at, put_timestamp, delete_timestamp, count, max_row, and metadata """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts() query_part1 = ''' SELECT hash, id, created_at, put_timestamp, delete_timestamp, %s_count AS count, @@ -455,34 +442,6 @@ class DatabaseBroker(object): (rec['sync_point'], rec['remote_id'])) conn.commit() - def _preallocate(self): - """ - The idea is to allocate space in front of an expanding db. If it gets - within 512k of a boundary, it allocates to the next boundary. - Boundaries are 2m, 5m, 10m, 25m, 50m, then every 50m after. 
- """ - if self.db_file == ':memory:': - return - MB = (1024 * 1024) - - def prealloc_points(): - for pm in (1, 2, 5, 10, 25, 50): - yield pm * MB - while True: - pm += 50 - yield pm * MB - - stat = os.stat(self.db_file) - file_size = stat.st_size - allocated_size = stat.st_blocks * 512 - for point in prealloc_points(): - if file_size <= point - MB / 2: - prealloc_size = point - break - if allocated_size < prealloc_size: - with open(self.db_file, 'rb+') as fp: - fallocate(fp.fileno(), int(prealloc_size)) - @property def metadata(self): """ @@ -717,11 +676,6 @@ class ContainerBroker(DatabaseBroker): :returns: True if the database has no active objects, False otherwise """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise with self.get() as conn: row = conn.execute( 'SELECT object_count from container_stat').fetchone() @@ -729,17 +683,16 @@ class ContainerBroker(DatabaseBroker): def _commit_puts(self, item_list=None): """Handles commiting rows in .pending files.""" - if self.db_file == ':memory:' or not os.path.exists(self.pending_file): + pending_file = self.db_file + '.pending' + if self.db_file == ':memory:' or not os.path.exists(pending_file): + return + if not os.path.getsize(pending_file): + os.unlink(pending_file) return if item_list is None: item_list = [] - with lock_parent_directory(self.pending_file, self.pending_timeout): - self._preallocate() - if not os.path.getsize(self.pending_file): - if item_list: - self.merge_items(item_list) - return - with open(self.pending_file, 'r+b') as fp: + with lock_parent_directory(pending_file, PENDING_COMMIT_TIMEOUT): + with open(pending_file, 'r+b') as fp: for entry in fp.read().split(':'): if entry: try: @@ -752,11 +705,11 @@ class ContainerBroker(DatabaseBroker): except Exception: self.logger.exception( _('Invalid pending entry %(file)s: %(entry)s'), - {'file': self.pending_file, 'entry': entry}) + {'file': pending_file, 'entry': entry}) if item_list: self.merge_items(item_list) try: - os.ftruncate(fp.fileno(), 0) + os.unlink(pending_file) except OSError, err: if err.errno != errno.ENOENT: raise @@ -774,7 +727,6 @@ class ContainerBroker(DatabaseBroker): delete :param sync_timestamp: max update_at timestamp of sync rows to delete """ - self._commit_puts() with self.get() as conn: conn.execute(""" DELETE FROM object @@ -818,30 +770,9 @@ class ContainerBroker(DatabaseBroker): record = {'name': name, 'created_at': timestamp, 'size': size, 'content_type': content_type, 'etag': etag, 'deleted': deleted} - if self.db_file == ':memory:': - self.merge_items([record]) - return - if not os.path.exists(self.db_file): + if self.db_file != ':memory:' and not os.path.exists(self.db_file): raise DatabaseConnectionError(self.db_file, "DB doesn't exist") - pending_size = 0 - try: - pending_size = os.path.getsize(self.pending_file) - except OSError, err: - if err.errno != errno.ENOENT: - raise - if pending_size > PENDING_CAP: - self._commit_puts([record]) - else: - with lock_parent_directory( - self.pending_file, self.pending_timeout): - with open(self.pending_file, 'a+b') as fp: - # Colons aren't used in base64 encoding; so they are our - # delimiter - fp.write(':') - fp.write(pickle.dumps( - (name, timestamp, size, content_type, etag, deleted), - protocol=PICKLE_PROTOCOL).encode('base64')) - fp.flush() + self.merge_items([record]) def is_deleted(self, timestamp=None): """ @@ -851,11 +782,6 @@ class ContainerBroker(DatabaseBroker): """ if self.db_file != ':memory:' and not os.path.exists(self.db_file): return True - try: 
- self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise with self.get() as conn: row = conn.execute(''' SELECT put_timestamp, delete_timestamp, object_count @@ -878,11 +804,6 @@ class ContainerBroker(DatabaseBroker): reported_put_timestamp, reported_delete_timestamp, reported_object_count, reported_bytes_used, hash, id) """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise with self.get() as conn: return conn.execute(''' SELECT account, container, created_at, put_timestamp, @@ -919,11 +840,6 @@ class ContainerBroker(DatabaseBroker): :returns: list of object names """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise rv = [] with self.get() as conn: row = conn.execute(''' @@ -960,11 +876,6 @@ class ContainerBroker(DatabaseBroker): :returns: list of tuples of (name, created_at, size, content_type, etag) """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise if path is not None: prefix = path if path: @@ -1193,17 +1104,16 @@ class AccountBroker(DatabaseBroker): def _commit_puts(self, item_list=None): """Handles commiting rows in .pending files.""" - if self.db_file == ':memory:' or not os.path.exists(self.pending_file): + pending_file = self.db_file + '.pending' + if self.db_file == ':memory:' or not os.path.exists(pending_file): + return + if not os.path.getsize(pending_file): + os.unlink(pending_file) return if item_list is None: item_list = [] - with lock_parent_directory(self.pending_file, self.pending_timeout): - self._preallocate() - if not os.path.getsize(self.pending_file): - if item_list: - self.merge_items(item_list) - return - with open(self.pending_file, 'r+b') as fp: + with lock_parent_directory(pending_file, PENDING_COMMIT_TIMEOUT): + with open(pending_file, 'r+b') as fp: for entry in fp.read().split(':'): if entry: try: @@ -1219,11 +1129,11 @@ class AccountBroker(DatabaseBroker): except Exception: self.logger.exception( _('Invalid pending entry %(file)s: %(entry)s'), - {'file': self.pending_file, 'entry': entry}) + {'file': pending_file, 'entry': entry}) if item_list: self.merge_items(item_list) try: - os.ftruncate(fp.fileno(), 0) + os.unlink(pending_file) except OSError, err: if err.errno != errno.ENOENT: raise @@ -1234,11 +1144,6 @@ class AccountBroker(DatabaseBroker): :returns: True if the database has no active containers. 
""" - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise with self.get() as conn: row = conn.execute( 'SELECT container_count from account_stat').fetchone() @@ -1258,7 +1163,6 @@ class AccountBroker(DatabaseBroker): :param sync_timestamp: max update_at timestamp of sync rows to delete """ - self._commit_puts() with self.get() as conn: conn.execute(''' DELETE FROM container WHERE @@ -1286,11 +1190,6 @@ class AccountBroker(DatabaseBroker): :returns: put_timestamp of the container """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise with self.get() as conn: ret = conn.execute(''' SELECT put_timestamp FROM container @@ -1311,6 +1210,8 @@ class AccountBroker(DatabaseBroker): :param object_count: number of objects in the container :param bytes_used: number of bytes used by the container """ + if self.db_file != ':memory:' and not os.path.exists(self.db_file): + raise DatabaseConnectionError(self.db_file, "DB doesn't exist") if delete_timestamp > put_timestamp and \ object_count in (None, '', 0, '0'): deleted = 1 @@ -1321,24 +1222,7 @@ class AccountBroker(DatabaseBroker): 'object_count': object_count, 'bytes_used': bytes_used, 'deleted': deleted} - if self.db_file == ':memory:': - self.merge_items([record]) - return - commit = False - with lock_parent_directory(self.pending_file, self.pending_timeout): - with open(self.pending_file, 'a+b') as fp: - # Colons aren't used in base64 encoding; so they are our - # delimiter - fp.write(':') - fp.write(pickle.dumps( - (name, put_timestamp, delete_timestamp, object_count, - bytes_used, deleted), - protocol=PICKLE_PROTOCOL).encode('base64')) - fp.flush() - if fp.tell() > PENDING_CAP: - commit = True - if commit: - self._commit_puts() + self.merge_items([record]) def can_delete_db(self, cutoff): """ @@ -1346,7 +1230,6 @@ class AccountBroker(DatabaseBroker): :returns: True if the account can be deleted, False otherwise """ - self._commit_puts() with self.get() as conn: row = conn.execute(''' SELECT status, put_timestamp, delete_timestamp, container_count @@ -1372,11 +1255,6 @@ class AccountBroker(DatabaseBroker): """ if self.db_file != ':memory:' and not os.path.exists(self.db_file): return True - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise with self.get() as conn: row = conn.execute(''' SELECT put_timestamp, delete_timestamp, container_count, status @@ -1401,11 +1279,6 @@ class AccountBroker(DatabaseBroker): delete_timestamp, container_count, object_count, bytes_used, hash, id) """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise with self.get() as conn: return conn.execute(''' SELECT account, created_at, put_timestamp, delete_timestamp, @@ -1422,11 +1295,6 @@ class AccountBroker(DatabaseBroker): :returns: list of container names """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise rv = [] with self.get() as conn: row = conn.execute(''' @@ -1460,11 +1328,6 @@ class AccountBroker(DatabaseBroker): :returns: list of tuples of (name, object_count, bytes_used, 0) """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise if delimiter and not prefix: prefix = '' orig_marker = marker From 4e100f6b325cbc5b2d83b4f3b622636ca25b069d Mon Sep 17 00:00:00 2001 From: Michael Barton Date: Sat, 29 Jan 2011 01:23:18 +0000 Subject: [PATCH 2/9] retry connect refactor --- swift/common/db.py | 49 +++++++++++++++++++++++++--------------------- 1 file 
changed, 27 insertions(+), 22 deletions(-) diff --git a/swift/common/db.py b/swift/common/db.py index 36ef1f3c91..ca667edf1a 100644 --- a/swift/common/db.py +++ b/swift/common/db.py @@ -27,6 +27,7 @@ import cPickle as pickle import errno from random import randint from tempfile import mkstemp +import traceback from eventlet import sleep import simplejson as json @@ -41,6 +42,7 @@ from swift.common.exceptions import LockTimeout BROKER_TIMEOUT = 25 #: Pickle protocol to use PICKLE_PROTOCOL = 2 +CONNECT_ATTEMPTS = 4 PENDING_COMMIT_TIMEOUT = 900 @@ -122,29 +124,32 @@ def get_db_connection(path, timeout=30, okay_to_create=False): :param okay_to_create: if True, create the DB if it doesn't exist :returns: DB connection object """ - try: - connect_time = time.time() - conn = sqlite3.connect(path, check_same_thread=False, - factory=GreenDBConnection, timeout=timeout) - if path != ':memory:' and not okay_to_create: + # retry logic to address: + # http://www.mail-archive.com/sqlite-users@sqlite.org/msg57092.html + for tries in xrange(1, CONNECT_ATTEMPTS + 1): + try: + connect_time = time.time() + conn = sqlite3.connect(path, check_same_thread=False, + factory=GreenDBConnection, timeout=timeout) # attempt to detect and fail when connect creates the db file - stat = os.stat(path) - if stat.st_size == 0 and stat.st_ctime >= connect_time: - os.unlink(path) - raise DatabaseConnectionError(path, - 'DB file created by connect?') - conn.row_factory = sqlite3.Row - conn.text_factory = str - conn.execute('PRAGMA synchronous = NORMAL') - conn.execute('PRAGMA count_changes = OFF') - conn.execute('PRAGMA temp_store = MEMORY') - conn.execute('PRAGMA journal_mode = WAL') - conn.create_function('chexor', 3, chexor) - except sqlite3.DatabaseError: - import traceback - raise DatabaseConnectionError(path, traceback.format_exc(), - timeout=timeout) - return conn + if path != ':memory:' and not okay_to_create: + stat = os.stat(path) + if stat.st_size == 0 and stat.st_ctime >= connect_time: + os.unlink(path) + raise DatabaseConnectionError(path, + 'DB file created by connect?') + conn.execute('PRAGMA synchronous = NORMAL') + conn.execute('PRAGMA count_changes = OFF') + conn.execute('PRAGMA temp_store = MEMORY') + conn.execute('PRAGMA journal_mode = WAL') + conn.create_function('chexor', 3, chexor) + conn.row_factory = sqlite3.Row + conn.text_factory = str + return conn + except sqlite3.DatabaseError, e: + if tries == CONNECT_ATTEMPTS or 'locking protocol' not in str(e): + raise DatabaseConnectionError(path, traceback.format_exc(), + timeout=timeout) class DatabaseBroker(object): From d83ce428afec5af180e5f85104da6242f8801fc1 Mon Sep 17 00:00:00 2001 From: Michael Barton Date: Sat, 29 Jan 2011 01:40:55 +0000 Subject: [PATCH 3/9] increase WAL autocheckpoint --- swift/common/db.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/swift/common/db.py b/swift/common/db.py index ca667edf1a..e06739d85f 100644 --- a/swift/common/db.py +++ b/swift/common/db.py @@ -44,6 +44,7 @@ BROKER_TIMEOUT = 25 PICKLE_PROTOCOL = 2 CONNECT_ATTEMPTS = 4 PENDING_COMMIT_TIMEOUT = 900 +AUTOCHECKPOINT = 8192 class DatabaseConnectionError(sqlite3.DatabaseError): @@ -142,6 +143,7 @@ def get_db_connection(path, timeout=30, okay_to_create=False): conn.execute('PRAGMA count_changes = OFF') conn.execute('PRAGMA temp_store = MEMORY') conn.execute('PRAGMA journal_mode = WAL') + conn.execute('PRAGMA wal_autocheckpoint = %s' % AUTOCHECKPOINT) conn.create_function('chexor', 3, chexor) conn.row_factory = sqlite3.Row conn.text_factory = str From 
0649d9cc602baaacdd428e3455df4f6a9254e681 Mon Sep 17 00:00:00 2001 From: Michael Barton Date: Sat, 29 Jan 2011 03:26:26 +0000 Subject: [PATCH 4/9] replication fixes for WAL --- swift/common/db.py | 2 ++ swift/common/db_replicator.py | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/swift/common/db.py b/swift/common/db.py index e06739d85f..0f288b74a1 100644 --- a/swift/common/db.py +++ b/swift/common/db.py @@ -288,6 +288,7 @@ class DatabaseBroker(object): self.conn = None orig_isolation_level = conn.isolation_level conn.isolation_level = None + conn.execute('PRAGMA journal_mode = DELETE') # remove any journal files conn.execute('BEGIN IMMEDIATE') try: yield True @@ -295,6 +296,7 @@ class DatabaseBroker(object): pass try: conn.execute('ROLLBACK') + conn.execute('PRAGMA journal_mode = WAL') # back to WAL mode conn.isolation_level = orig_isolation_level self.conn = conn except Exception: diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py index 49756f1f7b..01a7d202de 100644 --- a/swift/common/db_replicator.py +++ b/swift/common/db_replicator.py @@ -180,7 +180,9 @@ class Replicator(Daemon): return False # perform block-level sync if the db was modified during the first sync if os.path.exists(broker.db_file + '-journal') or \ - os.path.getmtime(broker.db_file) > mtime: + os.path.exists(broker.db_file + '-wal') or \ + os.path.exists(broker.db_file + '-shm') or \ + os.path.getmtime(broker.db_file) > mtime: # grab a lock so nobody else can modify it with broker.lock(): if not self._rsync_file(broker.db_file, remote_file, False): From 68cda9b72446df358120bd9fed00a8804e960375 Mon Sep 17 00:00:00 2001 From: Michael Barton Date: Sat, 29 Jan 2011 18:22:16 +0000 Subject: [PATCH 5/9] refactor db open retry loop slightly --- swift/common/db.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/swift/common/db.py b/swift/common/db.py index 0f288b74a1..1e0057908c 100644 --- a/swift/common/db.py +++ b/swift/common/db.py @@ -127,7 +127,7 @@ def get_db_connection(path, timeout=30, okay_to_create=False): """ # retry logic to address: # http://www.mail-archive.com/sqlite-users@sqlite.org/msg57092.html - for tries in xrange(1, CONNECT_ATTEMPTS + 1): + for attempt in xrange(CONNECT_ATTEMPTS): try: connect_time = time.time() conn = sqlite3.connect(path, check_same_thread=False, @@ -139,19 +139,18 @@ def get_db_connection(path, timeout=30, okay_to_create=False): os.unlink(path) raise DatabaseConnectionError(path, 'DB file created by connect?') + conn.execute('PRAGMA journal_mode = WAL') conn.execute('PRAGMA synchronous = NORMAL') + conn.execute('PRAGMA wal_autocheckpoint = %s' % AUTOCHECKPOINT) conn.execute('PRAGMA count_changes = OFF') conn.execute('PRAGMA temp_store = MEMORY') - conn.execute('PRAGMA journal_mode = WAL') - conn.execute('PRAGMA wal_autocheckpoint = %s' % AUTOCHECKPOINT) conn.create_function('chexor', 3, chexor) conn.row_factory = sqlite3.Row conn.text_factory = str return conn except sqlite3.DatabaseError, e: - if tries == CONNECT_ATTEMPTS or 'locking protocol' not in str(e): - raise DatabaseConnectionError(path, traceback.format_exc(), - timeout=timeout) + errstr = traceback.format_exc() + raise DatabaseConnectionError(path, errstr, timeout=timeout) class DatabaseBroker(object): From 625255da39d0dda986c47f4390343513a34e5943 Mon Sep 17 00:00:00 2001 From: Michael Barton Date: Sat, 29 Jan 2011 19:26:06 +0000 Subject: [PATCH 6/9] remove pending_timeout references --- swift/account/server.py | 7 ------- swift/common/db.py | 3 +-- 
swift/common/db_replicator.py | 2 +- swift/container/server.py | 4 ---- 4 files changed, 2 insertions(+), 14 deletions(-) diff --git a/swift/account/server.py b/swift/account/server.py index 2c83f51cc6..94399eec22 100644 --- a/swift/account/server.py +++ b/swift/account/server.py @@ -86,8 +86,6 @@ class AccountController(object): return Response(status='507 %s is not mounted' % drive) broker = self._get_account_broker(drive, part, account) if container: # put account container - if 'x-cf-trans-id' in req.headers: - broker.pending_timeout = 3 if req.headers.get('x-account-override-deleted', 'no').lower() != \ 'yes' and broker.is_deleted(): return HTTPNotFound(request=req) @@ -140,9 +138,6 @@ class AccountController(object): if self.mount_check and not check_mount(self.root, drive): return Response(status='507 %s is not mounted' % drive) broker = self._get_account_broker(drive, part, account) - if not container: - broker.pending_timeout = 0.1 - broker.stale_reads_ok = True if broker.is_deleted(): return HTTPNotFound(request=req) info = broker.get_info() @@ -171,8 +166,6 @@ class AccountController(object): if self.mount_check and not check_mount(self.root, drive): return Response(status='507 %s is not mounted' % drive) broker = self._get_account_broker(drive, part, account) - broker.pending_timeout = 0.1 - broker.stale_reads_ok = True if broker.is_deleted(): return HTTPNotFound(request=req) info = broker.get_info() diff --git a/swift/common/db.py b/swift/common/db.py index 1e0057908c..7040b2446a 100644 --- a/swift/common/db.py +++ b/swift/common/db.py @@ -157,11 +157,10 @@ class DatabaseBroker(object): """Encapsulates working with a database.""" def __init__(self, db_file, timeout=BROKER_TIMEOUT, logger=None, - account=None, container=None, stale_reads_ok=False): + account=None, container=None): """ Encapsulates working with a database. 
""" self.conn = None self.db_file = db_file - self.stale_reads_ok = stale_reads_ok self.db_dir = os.path.dirname(db_file) self.timeout = timeout self.logger = logger or logging.getLogger() diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py index 01a7d202de..5c4d4ebd8e 100644 --- a/swift/common/db_replicator.py +++ b/swift/common/db_replicator.py @@ -318,7 +318,7 @@ class Replicator(Daemon): self.logger.debug(_('Replicating db %s'), object_file) self.stats['attempted'] += 1 try: - broker = self.brokerclass(object_file, pending_timeout=30) + broker = self.brokerclass(object_file) broker.reclaim(time.time() - self.reclaim_age, time.time() - (self.reclaim_age * 2)) info = broker.get_replication_info() diff --git a/swift/container/server.py b/swift/container/server.py index cfcdded1e4..9a6b4aa210 100644 --- a/swift/container/server.py +++ b/swift/container/server.py @@ -219,8 +219,6 @@ class ContainerController(object): if self.mount_check and not check_mount(self.root, drive): return Response(status='507 %s is not mounted' % drive) broker = self._get_container_broker(drive, part, account, container) - broker.pending_timeout = 0.1 - broker.stale_reads_ok = True if broker.is_deleted(): return HTTPNotFound(request=req) info = broker.get_info() @@ -246,8 +244,6 @@ class ContainerController(object): if self.mount_check and not check_mount(self.root, drive): return Response(status='507 %s is not mounted' % drive) broker = self._get_container_broker(drive, part, account, container) - broker.pending_timeout = 0.1 - broker.stale_reads_ok = True if broker.is_deleted(): return HTTPNotFound(request=req) info = broker.get_info() From ee4a9a85ac8763b14deb9c55e6c9be2a163bb5a8 Mon Sep 17 00:00:00 2001 From: gholt Date: Fri, 4 Feb 2011 11:16:21 -0800 Subject: [PATCH 7/9] Indexing and integrity changes in dbs. --- swift/common/db.py | 125 +++++++++++++++++++++++++-------------------- 1 file changed, 69 insertions(+), 56 deletions(-) diff --git a/swift/common/db.py b/swift/common/db.py index 7040b2446a..4327ffa311 100644 --- a/swift/common/db.py +++ b/swift/common/db.py @@ -166,6 +166,7 @@ class DatabaseBroker(object): self.logger = logger or logging.getLogger() self.account = account self.container = container + self.db_version = -1 def initialize(self, put_timestamp=None): """ @@ -573,7 +574,7 @@ class ContainerBroker(DatabaseBroker): conn.executescript(""" CREATE TABLE object ( ROWID INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT UNIQUE, + name TEXT, created_at TEXT, size INTEGER, content_type TEXT, @@ -581,7 +582,7 @@ class ContainerBroker(DatabaseBroker): deleted INTEGER DEFAULT 0 ); - CREATE INDEX ix_object_deleted ON object (deleted); + CREATE INDEX ix_object_deleted_name ON object (deleted, name); CREATE TRIGGER object_insert AFTER INSERT ON object BEGIN @@ -812,6 +813,12 @@ class ContainerBroker(DatabaseBroker): reported_object_count, reported_bytes_used, hash, id) """ with self.get() as conn: + if self.db_version == -1: + self.db_version = 0 + for row in conn.execute(''' + SELECT name FROM sqlite_master + WHERE name = 'ix_object_deleted_name' '''): + self.db_version = 1 return conn.execute(''' SELECT account, container, created_at, put_timestamp, delete_timestamp, object_count, bytes_used, @@ -906,7 +913,10 @@ class ContainerBroker(DatabaseBroker): elif prefix: query += ' name >= ? AND' query_args.append(prefix) - query += ' +deleted = 0 ORDER BY name LIMIT ?' + if self.db_version < 1: + query += ' +deleted = 0 ORDER BY name LIMIT ?' 
+ else: + query += ' deleted = 0 ORDER BY name LIMIT ?' query_args.append(limit - len(results)) curs = conn.execute(query, query_args) curs.row_factory = None @@ -954,18 +964,19 @@ class ContainerBroker(DatabaseBroker): max_rowid = -1 for rec in item_list: conn.execute(''' - DELETE FROM object WHERE name = ? AND - (created_at < ?) + DELETE FROM object WHERE name = ? AND created_at < ? AND + deleted IN (0, 1) ''', (rec['name'], rec['created_at'])) - try: + if not conn.execute(''' + SELECT name FROM object WHERE name = ? AND + deleted IN (0, 1) + ''', (rec['name'],)).fetchall(): conn.execute(''' INSERT INTO object (name, created_at, size, content_type, etag, deleted) VALUES (?, ?, ?, ?, ?, ?) ''', ([rec['name'], rec['created_at'], rec['size'], rec['content_type'], rec['etag'], rec['deleted']])) - except sqlite3.IntegrityError: - pass if source: max_rowid = max(max_rowid, rec['ROWID']) if source: @@ -1009,7 +1020,7 @@ class AccountBroker(DatabaseBroker): conn.executescript(""" CREATE TABLE container ( ROWID INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT UNIQUE, + name TEXT, put_timestamp TEXT, delete_timestamp TEXT, object_count INTEGER, @@ -1017,8 +1028,9 @@ class AccountBroker(DatabaseBroker): deleted INTEGER DEFAULT 0 ); - CREATE INDEX ix_container_deleted ON container (deleted); - CREATE INDEX ix_container_name ON container (name); + CREATE INDEX ix_container_deleted_name ON + container (deleted, name); + CREATE TRIGGER container_insert AFTER INSERT ON container BEGIN UPDATE account_stat @@ -1287,6 +1299,12 @@ class AccountBroker(DatabaseBroker): bytes_used, hash, id) """ with self.get() as conn: + if self.db_version == -1: + self.db_version = 0 + for row in conn.execute(''' + SELECT name FROM sqlite_master + WHERE name = 'ix_container_deleted_name' '''): + self.db_version = 1 return conn.execute(''' SELECT account, created_at, put_timestamp, delete_timestamp, container_count, object_count, bytes_used, hash, id @@ -1355,7 +1373,10 @@ class AccountBroker(DatabaseBroker): elif prefix: query += ' name >= ? AND' query_args.append(prefix) - query += ' +deleted = 0 ORDER BY name LIMIT ?' + if self.db_version < 1: + query += ' +deleted = 0 ORDER BY name LIMIT ?' + else: + query += ' deleted = 0 ORDER BY name LIMIT ?' query_args.append(limit - len(results)) curs = conn.execute(query, query_args) curs.row_factory = None @@ -1399,51 +1420,43 @@ class AccountBroker(DatabaseBroker): record = [rec['name'], rec['put_timestamp'], rec['delete_timestamp'], rec['object_count'], rec['bytes_used'], rec['deleted']] - try: + curs = conn.execute(''' + SELECT name, put_timestamp, delete_timestamp, + object_count, bytes_used, deleted + FROM container WHERE name = ? AND + (put_timestamp < ? OR delete_timestamp < ? OR + object_count != ? OR bytes_used != ?) AND + deleted IN (0, 1)''', + (rec['name'], rec['put_timestamp'], + rec['delete_timestamp'], rec['object_count'], + rec['bytes_used'])) + curs.row_factory = None + row = curs.fetchone() + if row: + row = list(row) + for i in xrange(5): + if record[i] is None and row[i] is not None: + record[i] = row[i] + if row[1] > record[1]: # Keep newest put_timestamp + record[1] = row[1] + if row[2] > record[2]: # Keep newest delete_timestamp + record[2] = row[2] conn.execute(''' - INSERT INTO container (name, put_timestamp, - delete_timestamp, object_count, bytes_used, - deleted) - VALUES (?, ?, ?, ?, ?, ?) 
- ''', record) - except sqlite3.IntegrityError: - curs = conn.execute(''' - SELECT name, put_timestamp, delete_timestamp, - object_count, bytes_used, deleted - FROM container WHERE name = ? AND - (put_timestamp < ? OR delete_timestamp < ? OR - object_count != ? OR bytes_used != ?)''', - (rec['name'], rec['put_timestamp'], - rec['delete_timestamp'], rec['object_count'], - rec['bytes_used'])) - curs.row_factory = None - row = curs.fetchone() - if row: - row = list(row) - for i in xrange(5): - if record[i] is None and row[i] is not None: - record[i] = row[i] - if row[1] > record[1]: # Keep newest put_timestamp - record[1] = row[1] - if row[2] > record[2]: # Keep newest delete_timestamp - record[2] = row[2] - conn.execute('DELETE FROM container WHERE name = ?', - (record[0],)) - # If deleted, mark as such - if record[2] > record[1] and \ - record[3] in (None, '', 0, '0'): - record[5] = 1 - else: - record[5] = 0 - try: - conn.execute(''' - INSERT INTO container (name, put_timestamp, - delete_timestamp, object_count, bytes_used, - deleted) - VALUES (?, ?, ?, ?, ?, ?) - ''', record) - except sqlite3.IntegrityError: - continue + DELETE FROM container WHERE name = ? AND + deleted IN (0, 1) + ''', (record[0],)) + # If deleted, mark as such + if record[2] > record[1] and \ + record[3] in (None, '', 0, '0'): + record[5] = 1 + else: + record[5] = 0 + conn.execute(''' + INSERT INTO container (name, put_timestamp, + delete_timestamp, object_count, bytes_used, + deleted) + VALUES (?, ?, ?, ?, ?, ?) + ''', record) if source: max_rowid = max(max_rowid, rec['ROWID']) if source: From 2fffdfede24f79df757faffd197afff32ca432eb Mon Sep 17 00:00:00 2001 From: gholt Date: Fri, 4 Feb 2011 11:37:35 -0800 Subject: [PATCH 8/9] Move db version resolution to its own function --- swift/common/db.py | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/swift/common/db.py b/swift/common/db.py index 4327ffa311..2341f8141f 100644 --- a/swift/common/db.py +++ b/swift/common/db.py @@ -166,7 +166,7 @@ class DatabaseBroker(object): self.logger = logger or logging.getLogger() self.account = account self.container = container - self.db_version = -1 + self._db_version = -1 def initialize(self, put_timestamp=None): """ @@ -645,6 +645,15 @@ class ContainerBroker(DatabaseBroker): ''', (self.account, self.container, normalize_timestamp(time.time()), str(uuid4()), put_timestamp)) + def _get_db_version(self, conn): + if self._db_version == -1: + self._db_version = 0 + for row in conn.execute(''' + SELECT name FROM sqlite_master + WHERE name = 'ix_object_deleted_name' '''): + self._db_version = 1 + return self._db_version + def _newid(self, conn): conn.execute(''' UPDATE container_stat @@ -813,12 +822,6 @@ class ContainerBroker(DatabaseBroker): reported_object_count, reported_bytes_used, hash, id) """ with self.get() as conn: - if self.db_version == -1: - self.db_version = 0 - for row in conn.execute(''' - SELECT name FROM sqlite_master - WHERE name = 'ix_object_deleted_name' '''): - self.db_version = 1 return conn.execute(''' SELECT account, container, created_at, put_timestamp, delete_timestamp, object_count, bytes_used, @@ -913,7 +916,7 @@ class ContainerBroker(DatabaseBroker): elif prefix: query += ' name >= ? AND' query_args.append(prefix) - if self.db_version < 1: + if self._get_db_version(conn) < 1: query += ' +deleted = 0 ORDER BY name LIMIT ?' else: query += ' deleted = 0 ORDER BY name LIMIT ?' 
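A note on the query split in the hunk above: SQLite's unary '+' keeps a column
from matching an index, so on the pre-upgrade schema (where name was UNIQUE and
the only index on deleted was ix_object_deleted) the '+deleted = 0' form steers
the planner onto the implicit name index for the ORDER BY, while upgraded
databases can take the new (deleted, name) composite index via the plain
'deleted = 0' form. The version probe is just a lookup in sqlite_master for the
new index name, cached in _db_version. A minimal standalone sketch of that
probe follows; this is not the Swift code itself, and the throwaway in-memory
schema is purely illustrative:

    import sqlite3

    def get_db_version(conn, index_name):
        # Return 1 if the post-upgrade composite index exists, else 0.
        # index_name is 'ix_object_deleted_name' for container DBs and
        # 'ix_container_deleted_name' for account DBs.
        row = conn.execute(
            "SELECT name FROM sqlite_master"
            " WHERE type = 'index' AND name = ?",
            (index_name,)).fetchone()
        return 1 if row else 0

    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE object (name TEXT, deleted INTEGER DEFAULT 0)')
    conn.execute(
        'CREATE INDEX ix_object_deleted_name ON object (deleted, name)')
    assert get_db_version(conn, 'ix_object_deleted_name') == 1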
@@ -1094,6 +1097,15 @@ class AccountBroker(DatabaseBroker): ''', (self.account, normalize_timestamp(time.time()), str(uuid4()), put_timestamp)) + def _get_db_version(self, conn): + if self._db_version == -1: + self._db_version = 0 + for row in conn.execute(''' + SELECT name FROM sqlite_master + WHERE name = 'ix_container_deleted_name' '''): + self._db_version = 1 + return self._db_version + def update_put_timestamp(self, timestamp): """ Update the put_timestamp. Only modifies it if it is greater than @@ -1299,12 +1311,6 @@ class AccountBroker(DatabaseBroker): bytes_used, hash, id) """ with self.get() as conn: - if self.db_version == -1: - self.db_version = 0 - for row in conn.execute(''' - SELECT name FROM sqlite_master - WHERE name = 'ix_container_deleted_name' '''): - self.db_version = 1 return conn.execute(''' SELECT account, created_at, put_timestamp, delete_timestamp, container_count, object_count, bytes_used, hash, id @@ -1373,7 +1379,7 @@ class AccountBroker(DatabaseBroker): elif prefix: query += ' name >= ? AND' query_args.append(prefix) - if self.db_version < 1: + if self._get_db_version(conn) < 1: query += ' +deleted = 0 ORDER BY name LIMIT ?' else: query += ' deleted = 0 ORDER BY name LIMIT ?' From 98090b7217c69bba06b2f9ecb1dfaceb29de877d Mon Sep 17 00:00:00 2001 From: gholt Date: Fri, 4 Feb 2011 11:50:30 -0800 Subject: [PATCH 9/9] Fix account db change --- swift/common/db.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/swift/common/db.py b/swift/common/db.py index 2341f8141f..83cd0e8188 100644 --- a/swift/common/db.py +++ b/swift/common/db.py @@ -1430,12 +1430,8 @@ class AccountBroker(DatabaseBroker): SELECT name, put_timestamp, delete_timestamp, object_count, bytes_used, deleted FROM container WHERE name = ? AND - (put_timestamp < ? OR delete_timestamp < ? OR - object_count != ? OR bytes_used != ?) AND - deleted IN (0, 1)''', - (rec['name'], rec['put_timestamp'], - rec['delete_timestamp'], rec['object_count'], - rec['bytes_used'])) + deleted IN (0, 1) + ''', (rec['name'],)) curs.row_factory = None row = curs.fetchone() if row: @@ -1447,16 +1443,16 @@ class AccountBroker(DatabaseBroker): record[1] = row[1] if row[2] > record[2]: # Keep newest delete_timestamp record[2] = row[2] - conn.execute(''' - DELETE FROM container WHERE name = ? AND - deleted IN (0, 1) - ''', (record[0],)) # If deleted, mark as such if record[2] > record[1] and \ record[3] in (None, '', 0, '0'): record[5] = 1 else: record[5] = 0 + conn.execute(''' + DELETE FROM container WHERE name = ? AND + deleted IN (0, 1) + ''', (record[0],)) conn.execute(''' INSERT INTO container (name, put_timestamp, delete_timestamp, object_count, bytes_used,
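Taken together, the series swaps the rollback-journal/.pending design for
SQLite's write-ahead log. Under WAL, readers no longer block writers, which is
what lets the brokers drop the pending_timeout/stale_reads_ok plumbing and
merge records directly; it is also why patch 4 teaches the replicator to treat
the -wal and -shm sidecar files as part of the database when deciding whether a
block-level sync is needed. For reference, here is a minimal sketch of a
connection opened with the PRAGMA sequence the series converges on by patch 5.
This is not Swift's get_db_connection() itself: the eventlet GreenDBConnection
factory and the locking-protocol retry loop are omitted, and the path in the
usage example is hypothetical.

    import sqlite3

    AUTOCHECKPOINT = 8192  # WAL pages between automatic checkpoints (patch 3)

    def open_wal_db(path, timeout=30):
        conn = sqlite3.connect(path, timeout=timeout)
        conn.execute('PRAGMA journal_mode = WAL')    # readers don't block writers
        conn.execute('PRAGMA synchronous = NORMAL')  # under WAL: fsync at
                                                     # checkpoint, not per commit
        conn.execute('PRAGMA wal_autocheckpoint = %d' % AUTOCHECKPOINT)
        conn.execute('PRAGMA temp_store = MEMORY')
        conn.row_factory = sqlite3.Row
        return conn

    db = open_wal_db('/tmp/example.db')  # hypothetical path
    db.execute('CREATE TABLE IF NOT EXISTS t (x INTEGER)')
    db.execute('INSERT INTO t VALUES (1)')
    db.commit()
    print(db.execute('SELECT x FROM t').fetchone()['x'])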