Combine acc/cont put_* methods and fix their lock problem.

The account and container backends are supposed to build up a
pending file and, when it reaches a certain size, flush it all at
once into the sqlite db. Before this fix, concurrent threads would
check the pending file's size, without holding the lock, to decide
whether they should flush instead of just appending. The problem is
that many would check, find the file too big, and try to get the
lock. The first one wins and flushes, but all the other waiting
threads still think they have to flush, which is a much slower
operation than just the append. This change takes the lock first,
so merge_items is only called by the thread that actually finds
the pending file full.

Change-Id: I29cfa13a48c8f7d16dd414b2288d50461adbafd2
David Goetz 2014-08-28 14:31:29 -07:00
parent 8d02147d04
commit 849b21a442
3 changed files with 82 additions and 84 deletions
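
In rough pseudocode, the before/after difference looks like the
sketch below. This is not Swift's actual code: the in-memory
pending list, the merge() helper, and the threading.Lock are
stand-ins for the on-disk .pending file, merge_items()/_commit_puts(),
and lock_parent_directory().

    import threading

    PENDING_CAP = 4     # tiny cap so the example "flushes" quickly
    pending = []        # stand-in for the on-disk .pending file
    pending_lock = threading.Lock()   # stand-in for lock_parent_directory()

    def merge(items):
        # stand-in for merge_items()/_commit_puts(): the slow bulk flush
        del pending[:]

    def put_record_before(record):
        # old behaviour: the size check happens outside the lock, so every
        # thread that saw a full pending list does its own slow merge
        if len(pending) > PENDING_CAP:
            with pending_lock:
                merge(pending + [record])
        else:
            with pending_lock:
                pending.append(record)

    def put_record_after(record):
        # new behaviour: take the lock first, then check, so only the one
        # thread that still finds the pending list full pays for the merge
        with pending_lock:
            if len(pending) > PENDING_CAP:
                merge(pending + [record])
            else:
                pending.append(record)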

swift/account/backend.py

@@ -16,17 +16,14 @@
Pluggable Back-end for Account Server
"""
import os
from uuid import uuid4
import time
import cPickle as pickle
import errno
import sqlite3
from swift.common.utils import Timestamp, lock_parent_directory
from swift.common.db import DatabaseBroker, DatabaseConnectionError, \
PENDING_CAP, PICKLE_PROTOCOL, utf8encode
from swift.common.utils import Timestamp
from swift.common.db import DatabaseBroker, utf8encode
DATADIR = 'accounts'
@@ -235,6 +232,12 @@ class AccountBroker(DatabaseBroker):
'SELECT container_count from account_stat').fetchone()
return (row[0] == 0)
def make_tuple_for_pickle(self, record):
return (record['name'], record['put_timestamp'],
record['delete_timestamp'], record['object_count'],
record['bytes_used'], record['deleted'],
record['storage_policy_index'])
def put_container(self, name, put_timestamp, delete_timestamp,
object_count, bytes_used, storage_policy_index):
"""
@@ -258,31 +261,7 @@
'bytes_used': bytes_used,
'deleted': deleted,
'storage_policy_index': storage_policy_index}
if self.db_file == ':memory:':
self.merge_items([record])
return
if not os.path.exists(self.db_file):
raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
pending_size = 0
try:
pending_size = os.path.getsize(self.pending_file)
except OSError as err:
if err.errno != errno.ENOENT:
raise
if pending_size > PENDING_CAP:
self._commit_puts([record])
else:
with lock_parent_directory(self.pending_file,
self.pending_timeout):
with open(self.pending_file, 'a+b') as fp:
# Colons aren't used in base64 encoding; so they are our
# delimiter
fp.write(':')
fp.write(pickle.dumps(
(name, put_timestamp, delete_timestamp, object_count,
bytes_used, deleted, storage_policy_index),
protocol=PICKLE_PROTOCOL).encode('base64'))
fp.flush()
self.put_record(record)
def _is_deleted_info(self, status, container_count, delete_timestamp,
put_timestamp):

swift/common/db.py

@@ -23,6 +23,7 @@ from uuid import uuid4
import sys
import time
import errno
import cPickle as pickle
from swift import gettext_ as _
from tempfile import mkstemp
@@ -550,10 +551,36 @@ class DatabaseBroker(object):
curs.row_factory = dict_factory
return curs.fetchone()
def put_record(self, record):
if self.db_file == ':memory:':
self.merge_items([record])
return
if not os.path.exists(self.db_file):
raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
with lock_parent_directory(self.pending_file, self.pending_timeout):
pending_size = 0
try:
pending_size = os.path.getsize(self.pending_file)
except OSError as err:
if err.errno != errno.ENOENT:
raise
if pending_size > PENDING_CAP:
self._commit_puts([record])
else:
with open(self.pending_file, 'a+b') as fp:
# Colons aren't used in base64 encoding; so they are our
# delimiter
fp.write(':')
fp.write(pickle.dumps(
self.make_tuple_for_pickle(record),
protocol=PICKLE_PROTOCOL).encode('base64'))
fp.flush()
def _commit_puts(self, item_list=None):
"""
Scan for .pending files and commit the found records by feeding them
to merge_items().
to merge_items(). Assume that lock_parent_directory has already been
called.
:param item_list: A list of items to commit in addition to .pending
"""
@@ -561,36 +588,39 @@ class DatabaseBroker(object):
return
if item_list is None:
item_list = []
with lock_parent_directory(self.pending_file, self.pending_timeout):
self._preallocate()
if not os.path.getsize(self.pending_file):
if item_list:
self.merge_items(item_list)
return
with open(self.pending_file, 'r+b') as fp:
for entry in fp.read().split(':'):
if entry:
try:
self._commit_puts_load(item_list, entry)
except Exception:
self.logger.exception(
_('Invalid pending entry %(file)s: %(entry)s'),
{'file': self.pending_file, 'entry': entry})
if item_list:
self.merge_items(item_list)
try:
os.ftruncate(fp.fileno(), 0)
except OSError as err:
if err.errno != errno.ENOENT:
raise
self._preallocate()
if not os.path.getsize(self.pending_file):
if item_list:
self.merge_items(item_list)
return
with open(self.pending_file, 'r+b') as fp:
for entry in fp.read().split(':'):
if entry:
try:
self._commit_puts_load(item_list, entry)
except Exception:
self.logger.exception(
_('Invalid pending entry %(file)s: %(entry)s'),
{'file': self.pending_file, 'entry': entry})
if item_list:
self.merge_items(item_list)
try:
os.ftruncate(fp.fileno(), 0)
except OSError as err:
if err.errno != errno.ENOENT:
raise
def _commit_puts_stale_ok(self):
"""
Catch failures of _commit_puts() if broker is intended for
reading of stats, and thus does not care for pending updates.
"""
if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
return
try:
self._commit_puts()
with lock_parent_directory(self.pending_file,
self.pending_timeout):
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
@@ -603,6 +633,13 @@ class DatabaseBroker(object):
"""
raise NotImplementedError
def make_tuple_for_pickle(self, record):
"""
Turn this db record dict into the format this service uses for
pending pickles.
"""
raise NotImplementedError
def merge_syncs(self, sync_points, incoming=True):
"""
Merge a list of sync points with the incoming sync table.
@@ -731,7 +768,10 @@ class DatabaseBroker(object):
:param age_timestamp: max created_at timestamp of object rows to delete
:param sync_timestamp: max update_at timestamp of sync rows to delete
"""
self._commit_puts()
if self.db_file != ':memory:' and os.path.exists(self.pending_file):
with lock_parent_directory(self.pending_file,
self.pending_timeout):
self._commit_puts()
with self.get() as conn:
conn.execute('''
DELETE FROM %s WHERE deleted = 1 AND %s < ?

swift/container/backend.py

@@ -20,13 +20,11 @@ import os
from uuid import uuid4
import time
import cPickle as pickle
import errno
import sqlite3
from swift.common.utils import Timestamp, lock_parent_directory
from swift.common.db import DatabaseBroker, DatabaseConnectionError, \
PENDING_CAP, PICKLE_PROTOCOL, utf8encode
from swift.common.utils import Timestamp
from swift.common.db import DatabaseBroker, utf8encode
DATADIR = 'containers'
@@ -317,6 +315,11 @@ class ContainerBroker(DatabaseBroker):
self.put_object(name, timestamp, 0, 'application/deleted', 'noetag',
deleted=1, storage_policy_index=storage_policy_index)
def make_tuple_for_pickle(self, record):
return (record['name'], record['created_at'], record['size'],
record['content_type'], record['etag'], record['deleted'],
record['storage_policy_index'])
def put_object(self, name, timestamp, size, content_type, etag, deleted=0,
storage_policy_index=0):
"""
@@ -335,31 +338,7 @@
'content_type': content_type, 'etag': etag,
'deleted': deleted,
'storage_policy_index': storage_policy_index}
if self.db_file == ':memory:':
self.merge_items([record])
return
if not os.path.exists(self.db_file):
raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
pending_size = 0
try:
pending_size = os.path.getsize(self.pending_file)
except OSError as err:
if err.errno != errno.ENOENT:
raise
if pending_size > PENDING_CAP:
self._commit_puts([record])
else:
with lock_parent_directory(self.pending_file,
self.pending_timeout):
with open(self.pending_file, 'a+b') as fp:
# Colons aren't used in base64 encoding; so they are our
# delimiter
fp.write(':')
fp.write(pickle.dumps(
(name, timestamp, size, content_type, etag, deleted,
storage_policy_index),
protocol=PICKLE_PROTOCOL).encode('base64'))
fp.flush()
self.put_record(record)
def _is_deleted_info(self, object_count, put_timestamp, delete_timestamp,
**kwargs):
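
For reference, each entry put_record() appends to the .pending file
is a base64-encoded pickle of the tuple make_tuple_for_pickle()
returns, with ':' as the delimiter. A minimal round-trip sketch
follows (Python 2, to match the cPickle and encode('base64') calls
above; the sample record values and the PICKLE_PROTOCOL constant's
value are assumptions, not taken from this diff):

    import cPickle as pickle

    PICKLE_PROTOCOL = 2   # assumed value of swift.common.db.PICKLE_PROTOCOL

    # a made-up object record, in the order make_tuple_for_pickle() returns:
    # (name, created_at, size, content_type, etag, deleted, storage_policy_index)
    record = ('obj', '0000001409265089.00000', 0, 'text/plain',
              'd41d8cd98f00b204e9800998ecf8427e', 0, 0)

    # what put_record() appends for one record (after the ':' delimiter)
    entry = pickle.dumps(record, protocol=PICKLE_PROTOCOL).encode('base64')

    # what _commit_puts() effectively undoes for each colon-separated entry
    assert pickle.loads(entry.decode('base64')) == record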