New index code for the dbs; should improve performance for containers with a lot of recent deletes.

This commit is contained in:
gholt 2011-03-16 23:12:44 +00:00 committed by Tarmac
commit 2976515626

View File

@ -165,6 +165,7 @@ class DatabaseBroker(object):
self.logger = logger or logging.getLogger() self.logger = logger or logging.getLogger()
self.account = account self.account = account
self.container = container self.container = container
self._db_version = -1
def initialize(self, put_timestamp=None): def initialize(self, put_timestamp=None):
""" """
@ -607,7 +608,7 @@ class ContainerBroker(DatabaseBroker):
conn.executescript(""" conn.executescript("""
CREATE TABLE object ( CREATE TABLE object (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT, ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE, name TEXT,
created_at TEXT, created_at TEXT,
size INTEGER, size INTEGER,
content_type TEXT, content_type TEXT,
@ -615,7 +616,7 @@ class ContainerBroker(DatabaseBroker):
deleted INTEGER DEFAULT 0 deleted INTEGER DEFAULT 0
); );
CREATE INDEX ix_object_deleted ON object (deleted); CREATE INDEX ix_object_deleted_name ON object (deleted, name);
CREATE TRIGGER object_insert AFTER INSERT ON object CREATE TRIGGER object_insert AFTER INSERT ON object
BEGIN BEGIN
@ -678,6 +679,15 @@ class ContainerBroker(DatabaseBroker):
''', (self.account, self.container, normalize_timestamp(time.time()), ''', (self.account, self.container, normalize_timestamp(time.time()),
str(uuid4()), put_timestamp)) str(uuid4()), put_timestamp))
def get_db_version(self, conn):
if self._db_version == -1:
self._db_version = 0
for row in conn.execute('''
SELECT name FROM sqlite_master
WHERE name = 'ix_object_deleted_name' '''):
self._db_version = 1
return self._db_version
def _newid(self, conn): def _newid(self, conn):
conn.execute(''' conn.execute('''
UPDATE container_stat UPDATE container_stat
@ -910,37 +920,6 @@ class ContainerBroker(DatabaseBroker):
''', (put_timestamp, delete_timestamp, object_count, bytes_used)) ''', (put_timestamp, delete_timestamp, object_count, bytes_used))
conn.commit() conn.commit()
def get_random_objects(self, max_count=100):
"""
Get random objects from the DB. This is used by the container_auditor
when testing random objects for existence.
:param max_count: maximum number of objects to get
:returns: list of object names
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
rv = []
with self.get() as conn:
row = conn.execute('''
SELECT ROWID FROM object ORDER BY ROWID DESC LIMIT 1
''').fetchone()
if not row:
return []
max_rowid = row['ROWID']
for _junk in xrange(min(max_count, max_rowid)):
row = conn.execute('''
SELECT name FROM object WHERE ROWID >= ? AND +deleted = 0
LIMIT 1
''', (randint(0, max_rowid),)).fetchone()
if row:
rv.append(row['name'])
return list(set(rv))
def list_objects_iter(self, limit, marker, end_marker, prefix, delimiter, def list_objects_iter(self, limit, marker, end_marker, prefix, delimiter,
path=None, format=None): path=None, format=None):
""" """
@ -988,7 +967,11 @@ class ContainerBroker(DatabaseBroker):
elif prefix: elif prefix:
query += ' name >= ? AND' query += ' name >= ? AND'
query_args.append(prefix) query_args.append(prefix)
query += ' +deleted = 0 ORDER BY name LIMIT ?' if self.get_db_version(conn) < 1:
query += ' +deleted = 0'
else:
query += ' deleted = 0'
query += ' ORDER BY name LIMIT ?'
query_args.append(limit - len(results)) query_args.append(limit - len(results))
curs = conn.execute(query, query_args) curs = conn.execute(query, query_args)
curs.row_factory = None curs.row_factory = None
@ -1035,19 +1018,23 @@ class ContainerBroker(DatabaseBroker):
with self.get() as conn: with self.get() as conn:
max_rowid = -1 max_rowid = -1
for rec in item_list: for rec in item_list:
conn.execute(''' query = '''
DELETE FROM object WHERE name = ? AND DELETE FROM object
(created_at < ?) WHERE name = ? AND (created_at < ?)
''', (rec['name'], rec['created_at'])) '''
try: if self.get_db_version(conn) >= 1:
query += ' AND deleted IN (0, 1)'
conn.execute(query, (rec['name'], rec['created_at']))
query = 'SELECT 1 FROM object WHERE name = ?'
if self.get_db_version(conn) >= 1:
query += ' AND deleted IN (0, 1)'
if not conn.execute(query, (rec['name'],)).fetchall():
conn.execute(''' conn.execute('''
INSERT INTO object (name, created_at, size, INSERT INTO object (name, created_at, size,
content_type, etag, deleted) content_type, etag, deleted)
VALUES (?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?)
''', ([rec['name'], rec['created_at'], rec['size'], ''', ([rec['name'], rec['created_at'], rec['size'],
rec['content_type'], rec['etag'], rec['deleted']])) rec['content_type'], rec['etag'], rec['deleted']]))
except sqlite3.IntegrityError:
pass
if source: if source:
max_rowid = max(max_rowid, rec['ROWID']) max_rowid = max(max_rowid, rec['ROWID'])
if source: if source:
@ -1091,7 +1078,7 @@ class AccountBroker(DatabaseBroker):
conn.executescript(""" conn.executescript("""
CREATE TABLE container ( CREATE TABLE container (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT, ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE, name TEXT,
put_timestamp TEXT, put_timestamp TEXT,
delete_timestamp TEXT, delete_timestamp TEXT,
object_count INTEGER, object_count INTEGER,
@ -1099,8 +1086,9 @@ class AccountBroker(DatabaseBroker):
deleted INTEGER DEFAULT 0 deleted INTEGER DEFAULT 0
); );
CREATE INDEX ix_container_deleted ON container (deleted); CREATE INDEX ix_container_deleted_name ON
CREATE INDEX ix_container_name ON container (name); container (deleted, name);
CREATE TRIGGER container_insert AFTER INSERT ON container CREATE TRIGGER container_insert AFTER INSERT ON container
BEGIN BEGIN
UPDATE account_stat UPDATE account_stat
@ -1164,6 +1152,15 @@ class AccountBroker(DatabaseBroker):
''', (self.account, normalize_timestamp(time.time()), str(uuid4()), ''', (self.account, normalize_timestamp(time.time()), str(uuid4()),
put_timestamp)) put_timestamp))
def get_db_version(self, conn):
if self._db_version == -1:
self._db_version = 0
for row in conn.execute('''
SELECT name FROM sqlite_master
WHERE name = 'ix_container_deleted_name' '''):
self._db_version = 1
return self._db_version
def update_put_timestamp(self, timestamp): def update_put_timestamp(self, timestamp):
""" """
Update the put_timestamp. Only modifies it if it is greater than Update the put_timestamp. Only modifies it if it is greater than
@ -1413,38 +1410,6 @@ class AccountBroker(DatabaseBroker):
FROM account_stat FROM account_stat
''').fetchone() ''').fetchone()
def get_random_containers(self, max_count=100):
"""
Get random containers from the DB. This is used by the
account_auditor when testing random containerss for existence.
:param max_count: maximum number of containers to get
:returns: list of container names
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
rv = []
with self.get() as conn:
row = conn.execute('''
SELECT ROWID FROM container ORDER BY ROWID DESC LIMIT 1
''').fetchone()
if not row:
return []
max_rowid = row['ROWID']
for _junk in xrange(min(max_count, max_rowid)):
row = conn.execute('''
SELECT name FROM container WHERE
ROWID >= ? AND +deleted = 0
LIMIT 1
''', (randint(0, max_rowid),)).fetchone()
if row:
rv.append(row['name'])
return list(set(rv))
def list_containers_iter(self, limit, marker, end_marker, prefix, def list_containers_iter(self, limit, marker, end_marker, prefix,
delimiter): delimiter):
""" """
@ -1485,7 +1450,11 @@ class AccountBroker(DatabaseBroker):
elif prefix: elif prefix:
query += ' name >= ? AND' query += ' name >= ? AND'
query_args.append(prefix) query_args.append(prefix)
query += ' +deleted = 0 ORDER BY name LIMIT ?' if self.get_db_version(conn) < 1:
query += ' +deleted = 0'
else:
query += ' deleted = 0'
query += ' ORDER BY name LIMIT ?'
query_args.append(limit - len(results)) query_args.append(limit - len(results))
curs = conn.execute(query, query_args) curs = conn.execute(query, query_args)
curs.row_factory = None curs.row_factory = None
@ -1529,51 +1498,41 @@ class AccountBroker(DatabaseBroker):
record = [rec['name'], rec['put_timestamp'], record = [rec['name'], rec['put_timestamp'],
rec['delete_timestamp'], rec['object_count'], rec['delete_timestamp'], rec['object_count'],
rec['bytes_used'], rec['deleted']] rec['bytes_used'], rec['deleted']]
try: query = '''
conn.execute(''' SELECT name, put_timestamp, delete_timestamp,
INSERT INTO container (name, put_timestamp, object_count, bytes_used, deleted
delete_timestamp, object_count, bytes_used, FROM container WHERE name = ?
deleted) '''
VALUES (?, ?, ?, ?, ?, ?) if self.get_db_version(conn) >= 1:
''', record) query += ' AND deleted IN (0, 1)'
except sqlite3.IntegrityError: curs = conn.execute(query, (rec['name'],))
curs = conn.execute(''' curs.row_factory = None
SELECT name, put_timestamp, delete_timestamp, row = curs.fetchone()
object_count, bytes_used, deleted if row:
FROM container WHERE name = ? AND row = list(row)
(put_timestamp < ? OR delete_timestamp < ? OR for i in xrange(5):
object_count != ? OR bytes_used != ?)''', if record[i] is None and row[i] is not None:
(rec['name'], rec['put_timestamp'], record[i] = row[i]
rec['delete_timestamp'], rec['object_count'], if row[1] > record[1]: # Keep newest put_timestamp
rec['bytes_used'])) record[1] = row[1]
curs.row_factory = None if row[2] > record[2]: # Keep newest delete_timestamp
row = curs.fetchone() record[2] = row[2]
if row: # If deleted, mark as such
row = list(row) if record[2] > record[1] and \
for i in xrange(5): record[3] in (None, '', 0, '0'):
if record[i] is None and row[i] is not None: record[5] = 1
record[i] = row[i] else:
if row[1] > record[1]: # Keep newest put_timestamp record[5] = 0
record[1] = row[1] conn.execute('''
if row[2] > record[2]: # Keep newest delete_timestamp DELETE FROM container WHERE name = ? AND
record[2] = row[2] deleted IN (0, 1)
conn.execute('DELETE FROM container WHERE name = ?', ''', (record[0],))
(record[0],)) conn.execute('''
# If deleted, mark as such INSERT INTO container (name, put_timestamp,
if record[2] > record[1] and \ delete_timestamp, object_count, bytes_used,
record[3] in (None, '', 0, '0'): deleted)
record[5] = 1 VALUES (?, ?, ?, ?, ?, ?)
else: ''', record)
record[5] = 0
try:
conn.execute('''
INSERT INTO container (name, put_timestamp,
delete_timestamp, object_count, bytes_used,
deleted)
VALUES (?, ?, ?, ?, ?, ?)
''', record)
except sqlite3.IntegrityError:
continue
if source: if source:
max_rowid = max(max_rowid, rec['ROWID']) max_rowid = max(max_rowid, rec['ROWID'])
if source: if source: