Merge "Combine acc/cont put_* methods and fix their lock problem."

Authored by Jenkins on 2014-09-06 19:51:40 +00:00; committed by Gerrit Code Review
commit 8bb33cd673
3 changed files with 82 additions and 84 deletions
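The lock problem in the subject line is a check-then-act race: the old put_container()/put_object() measured the .pending file before taking lock_parent_directory(), so another process could commit and truncate the file between the size check and the branch that acted on it. Below is a minimal before/after sketch of the pattern, simplified from the diffs that follow; append_pending() is an invented helper standing in for the inline file append, not real Swift code.

    import os
    import cPickle as pickle

    from swift.common.utils import lock_parent_directory
    from swift.common.db import PENDING_CAP, PICKLE_PROTOCOL


    def append_pending(broker, record):
        # Hypothetical stand-in for the inline append in the real methods.
        with open(broker.pending_file, 'a+b') as fp:
            fp.write(':' + pickle.dumps(
                broker.make_tuple_for_pickle(record),
                protocol=PICKLE_PROTOCOL).encode('base64'))


    def put_record_before(broker, record):
        # BEFORE: the size check runs unlocked, so another process can commit
        # and truncate the .pending file before the branch below acts on it.
        if os.path.getsize(broker.pending_file) > PENDING_CAP:
            broker._commit_puts([record])      # took the lock internally
        else:
            with lock_parent_directory(broker.pending_file,
                                       broker.pending_timeout):
                append_pending(broker, record)


    def put_record_after(broker, record):
        # AFTER: one lock covers the size check and both outcomes, so the
        # decision is made against a file that cannot change underneath us.
        with lock_parent_directory(broker.pending_file,
                                   broker.pending_timeout):
            if os.path.getsize(broker.pending_file) > PENDING_CAP:
                broker._commit_puts([record])  # now assumes caller holds lock
            else:
                append_pending(broker, record)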

swift/account/backend.py

@@ -16,17 +16,14 @@
 Pluggable Back-end for Account Server
 """

-import os
 from uuid import uuid4
 import time
 import cPickle as pickle
-import errno
 import sqlite3

-from swift.common.utils import Timestamp, lock_parent_directory
-from swift.common.db import DatabaseBroker, DatabaseConnectionError, \
-    PENDING_CAP, PICKLE_PROTOCOL, utf8encode
+from swift.common.utils import Timestamp
+from swift.common.db import DatabaseBroker, utf8encode

 DATADIR = 'accounts'
@@ -235,6 +232,12 @@ class AccountBroker(DatabaseBroker):
             'SELECT container_count from account_stat').fetchone()
         return (row[0] == 0)

+    def make_tuple_for_pickle(self, record):
+        return (record['name'], record['put_timestamp'],
+                record['delete_timestamp'], record['object_count'],
+                record['bytes_used'], record['deleted'],
+                record['storage_policy_index'])
+
     def put_container(self, name, put_timestamp, delete_timestamp,
                       object_count, bytes_used, storage_policy_index):
         """
@@ -258,31 +261,7 @@
                   'bytes_used': bytes_used,
                   'deleted': deleted,
                   'storage_policy_index': storage_policy_index}
-        if self.db_file == ':memory:':
-            self.merge_items([record])
-            return
-        if not os.path.exists(self.db_file):
-            raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
-        pending_size = 0
-        try:
-            pending_size = os.path.getsize(self.pending_file)
-        except OSError as err:
-            if err.errno != errno.ENOENT:
-                raise
-        if pending_size > PENDING_CAP:
-            self._commit_puts([record])
-        else:
-            with lock_parent_directory(self.pending_file,
-                                       self.pending_timeout):
-                with open(self.pending_file, 'a+b') as fp:
-                    # Colons aren't used in base64 encoding; so they are our
-                    # delimiter
-                    fp.write(':')
-                    fp.write(pickle.dumps(
-                        (name, put_timestamp, delete_timestamp, object_count,
-                         bytes_used, deleted, storage_policy_index),
-                        protocol=PICKLE_PROTOCOL).encode('base64'))
-                    fp.flush()
+        self.put_record(record)

     def _is_deleted_info(self, status, container_count, delete_timestamp,
                          put_timestamp):
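Both brokers now build the same shape of record dict and hand it to the shared DatabaseBroker.put_record(); the only per-service code left is the tuple layout above. As an illustration, here is what one account record becomes on its way into the .pending file, re-created standalone from the code in this diff (the field values are invented):

    import cPickle as pickle

    # The real code imports PICKLE_PROTOCOL from swift.common.db.
    PICKLE_PROTOCOL = 2

    record = {'name': 'images', 'put_timestamp': '1410033100.00000',
              'delete_timestamp': '0', 'object_count': 0, 'bytes_used': 0,
              'deleted': 0, 'storage_policy_index': 0}

    # AccountBroker.make_tuple_for_pickle(), inlined:
    tup = (record['name'], record['put_timestamp'],
           record['delete_timestamp'], record['object_count'],
           record['bytes_used'], record['deleted'],
           record['storage_policy_index'])

    # put_record() writes a ':' delimiter followed by the base64 of the
    # pickle; colons never occur in base64 output, so splitting is safe.
    entry = ':' + pickle.dumps(tup, protocol=PICKLE_PROTOCOL).encode('base64')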

swift/common/db.py

@@ -23,6 +23,7 @@ from uuid import uuid4
 import sys
 import time
 import errno
+import cPickle as pickle
 from swift import gettext_ as _
 from tempfile import mkstemp
@@ -550,10 +551,36 @@ class DatabaseBroker(object):
             curs.row_factory = dict_factory
             return curs.fetchone()

+    def put_record(self, record):
+        if self.db_file == ':memory:':
+            self.merge_items([record])
+            return
+        if not os.path.exists(self.db_file):
+            raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
+        with lock_parent_directory(self.pending_file, self.pending_timeout):
+            pending_size = 0
+            try:
+                pending_size = os.path.getsize(self.pending_file)
+            except OSError as err:
+                if err.errno != errno.ENOENT:
+                    raise
+            if pending_size > PENDING_CAP:
+                self._commit_puts([record])
+            else:
+                with open(self.pending_file, 'a+b') as fp:
+                    # Colons aren't used in base64 encoding; so they are our
+                    # delimiter
+                    fp.write(':')
+                    fp.write(pickle.dumps(
+                        self.make_tuple_for_pickle(record),
+                        protocol=PICKLE_PROTOCOL).encode('base64'))
+                    fp.flush()
+
     def _commit_puts(self, item_list=None):
         """
         Scan for .pending files and commit the found records by feeding them
-        to merge_items().
+        to merge_items(). Assume that lock_parent_directory has already been
+        called.

         :param item_list: A list of items to commit in addition to .pending
         """
@@ -561,36 +588,39 @@ class DatabaseBroker(object):
             return
         if item_list is None:
             item_list = []
-        with lock_parent_directory(self.pending_file, self.pending_timeout):
-            self._preallocate()
-            if not os.path.getsize(self.pending_file):
-                if item_list:
-                    self.merge_items(item_list)
-                return
-            with open(self.pending_file, 'r+b') as fp:
-                for entry in fp.read().split(':'):
-                    if entry:
-                        try:
-                            self._commit_puts_load(item_list, entry)
-                        except Exception:
-                            self.logger.exception(
-                                _('Invalid pending entry %(file)s: %(entry)s'),
-                                {'file': self.pending_file, 'entry': entry})
-                if item_list:
-                    self.merge_items(item_list)
-                try:
-                    os.ftruncate(fp.fileno(), 0)
-                except OSError as err:
-                    if err.errno != errno.ENOENT:
-                        raise
+        self._preallocate()
+        if not os.path.getsize(self.pending_file):
+            if item_list:
+                self.merge_items(item_list)
+            return
+        with open(self.pending_file, 'r+b') as fp:
+            for entry in fp.read().split(':'):
+                if entry:
+                    try:
+                        self._commit_puts_load(item_list, entry)
+                    except Exception:
+                        self.logger.exception(
+                            _('Invalid pending entry %(file)s: %(entry)s'),
+                            {'file': self.pending_file, 'entry': entry})
+            if item_list:
+                self.merge_items(item_list)
+            try:
+                os.ftruncate(fp.fileno(), 0)
+            except OSError as err:
+                if err.errno != errno.ENOENT:
+                    raise

     def _commit_puts_stale_ok(self):
         """
         Catch failures of _commit_puts() if broker is intended for
         reading of stats, and thus does not care for pending updates.
         """
+        if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
+            return
         try:
-            self._commit_puts()
+            with lock_parent_directory(self.pending_file,
+                                       self.pending_timeout):
+                self._commit_puts()
         except LockTimeout:
             if not self.stale_reads_ok:
                 raise
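Because _commit_puts() now assumes lock_parent_directory() has already been called, every call site has to lock first, as the hunks above and below do. A sketch of that contract for some hypothetical new caller (flush_pending() is an invented name, not a Swift method):

    import os

    from swift.common.utils import lock_parent_directory


    def flush_pending(broker):
        # Skip brokers with no on-disk pending file, then take the parent-
        # directory lock before entering _commit_puts(), mirroring the
        # pattern the callers in this diff use.
        if broker.db_file == ':memory:' or \
                not os.path.exists(broker.pending_file):
            return
        with lock_parent_directory(broker.pending_file,
                                   broker.pending_timeout):
            broker._commit_puts()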
@@ -603,6 +633,13 @@ class DatabaseBroker(object):
         """
         raise NotImplementedError

+    def make_tuple_for_pickle(self, record):
+        """
+        Turn this db record dict into the format this service uses for
+        pending pickles.
+        """
+        raise NotImplementedError
+
     def merge_syncs(self, sync_points, incoming=True):
         """
         Merge a list of sync points with the incoming sync table.
@@ -731,7 +768,10 @@ class DatabaseBroker(object):
         :param age_timestamp: max created_at timestamp of object rows to delete
         :param sync_timestamp: max update_at timestamp of sync rows to delete
         """
-        self._commit_puts()
+        if self.db_file != ':memory:' and os.path.exists(self.pending_file):
+            with lock_parent_directory(self.pending_file,
+                                       self.pending_timeout):
+                self._commit_puts()
         with self.get() as conn:
             conn.execute('''
                 DELETE FROM %s WHERE deleted = 1 AND %s < ?

swift/container/backend.py

@@ -20,13 +20,11 @@ import os
 from uuid import uuid4
 import time
 import cPickle as pickle
-import errno
 import sqlite3

-from swift.common.utils import Timestamp, lock_parent_directory
-from swift.common.db import DatabaseBroker, DatabaseConnectionError, \
-    PENDING_CAP, PICKLE_PROTOCOL, utf8encode
+from swift.common.utils import Timestamp
+from swift.common.db import DatabaseBroker, utf8encode

 SQLITE_ARG_LIMIT = 999
@@ -320,6 +318,11 @@ class ContainerBroker(DatabaseBroker):
         self.put_object(name, timestamp, 0, 'application/deleted', 'noetag',
                         deleted=1, storage_policy_index=storage_policy_index)

+    def make_tuple_for_pickle(self, record):
+        return (record['name'], record['created_at'], record['size'],
+                record['content_type'], record['etag'], record['deleted'],
+                record['storage_policy_index'])
+
     def put_object(self, name, timestamp, size, content_type, etag, deleted=0,
                    storage_policy_index=0):
         """
@@ -338,31 +341,7 @@
                   'content_type': content_type, 'etag': etag,
                   'deleted': deleted,
                   'storage_policy_index': storage_policy_index}
-        if self.db_file == ':memory:':
-            self.merge_items([record])
-            return
-        if not os.path.exists(self.db_file):
-            raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
-        pending_size = 0
-        try:
-            pending_size = os.path.getsize(self.pending_file)
-        except OSError as err:
-            if err.errno != errno.ENOENT:
-                raise
-        if pending_size > PENDING_CAP:
-            self._commit_puts([record])
-        else:
-            with lock_parent_directory(self.pending_file,
-                                       self.pending_timeout):
-                with open(self.pending_file, 'a+b') as fp:
-                    # Colons aren't used in base64 encoding; so they are our
-                    # delimiter
-                    fp.write(':')
-                    fp.write(pickle.dumps(
-                        (name, timestamp, size, content_type, etag, deleted,
-                         storage_policy_index),
-                        protocol=PICKLE_PROTOCOL).encode('base64'))
-                    fp.flush()
+        self.put_record(record)

     def _is_deleted_info(self, object_count, put_timestamp, delete_timestamp,
                          **kwargs):
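End to end, a container PUT now flows put_object() -> put_record() -> make_tuple_for_pickle(). A rough usage sketch, under the assumptions that this vintage's initialize() takes a put timestamp and that the pending file lives next to the db as <db_file>.pending (the path, names, and etag below are invented):

    import time

    from swift.common.utils import Timestamp
    from swift.container.backend import ContainerBroker

    broker = ContainerBroker('/tmp/example-container.db',
                             account='AUTH_test', container='c1')
    broker.initialize(Timestamp(time.time()).internal)

    # Appends one colon-delimited pickle to the broker's .pending file; the
    # row reaches the objects table once _commit_puts() runs under the lock.
    broker.put_object('obj1', Timestamp(time.time()).internal, 1024,
                      'text/plain', 'd41d8cd98f00b204e9800998ecf8427e',
                      storage_policy_index=0)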