diff --git a/swift/account/backend.py b/swift/account/backend.py index 9fc3c27468..1ad37c22c1 100644 --- a/swift/account/backend.py +++ b/swift/account/backend.py @@ -16,17 +16,14 @@ Pluggable Back-end for Account Server """ -import os from uuid import uuid4 import time import cPickle as pickle -import errno import sqlite3 -from swift.common.utils import Timestamp, lock_parent_directory -from swift.common.db import DatabaseBroker, DatabaseConnectionError, \ - PENDING_CAP, PICKLE_PROTOCOL, utf8encode +from swift.common.utils import Timestamp +from swift.common.db import DatabaseBroker, utf8encode DATADIR = 'accounts' @@ -235,6 +232,12 @@ class AccountBroker(DatabaseBroker): 'SELECT container_count from account_stat').fetchone() return (row[0] == 0) + def make_tuple_for_pickle(self, record): + return (record['name'], record['put_timestamp'], + record['delete_timestamp'], record['object_count'], + record['bytes_used'], record['deleted'], + record['storage_policy_index']) + def put_container(self, name, put_timestamp, delete_timestamp, object_count, bytes_used, storage_policy_index): """ @@ -258,31 +261,7 @@ class AccountBroker(DatabaseBroker): 'bytes_used': bytes_used, 'deleted': deleted, 'storage_policy_index': storage_policy_index} - if self.db_file == ':memory:': - self.merge_items([record]) - return - if not os.path.exists(self.db_file): - raise DatabaseConnectionError(self.db_file, "DB doesn't exist") - pending_size = 0 - try: - pending_size = os.path.getsize(self.pending_file) - except OSError as err: - if err.errno != errno.ENOENT: - raise - if pending_size > PENDING_CAP: - self._commit_puts([record]) - else: - with lock_parent_directory(self.pending_file, - self.pending_timeout): - with open(self.pending_file, 'a+b') as fp: - # Colons aren't used in base64 encoding; so they are our - # delimiter - fp.write(':') - fp.write(pickle.dumps( - (name, put_timestamp, delete_timestamp, object_count, - bytes_used, deleted, storage_policy_index), - protocol=PICKLE_PROTOCOL).encode('base64')) - fp.flush() + self.put_record(record) def _is_deleted_info(self, status, container_count, delete_timestamp, put_timestamp): diff --git a/swift/common/db.py b/swift/common/db.py index c6523817c5..daad589ed4 100644 --- a/swift/common/db.py +++ b/swift/common/db.py @@ -23,6 +23,7 @@ from uuid import uuid4 import sys import time import errno +import cPickle as pickle from swift import gettext_ as _ from tempfile import mkstemp @@ -550,10 +551,36 @@ class DatabaseBroker(object): curs.row_factory = dict_factory return curs.fetchone() + def put_record(self, record): + if self.db_file == ':memory:': + self.merge_items([record]) + return + if not os.path.exists(self.db_file): + raise DatabaseConnectionError(self.db_file, "DB doesn't exist") + with lock_parent_directory(self.pending_file, self.pending_timeout): + pending_size = 0 + try: + pending_size = os.path.getsize(self.pending_file) + except OSError as err: + if err.errno != errno.ENOENT: + raise + if pending_size > PENDING_CAP: + self._commit_puts([record]) + else: + with open(self.pending_file, 'a+b') as fp: + # Colons aren't used in base64 encoding; so they are our + # delimiter + fp.write(':') + fp.write(pickle.dumps( + self.make_tuple_for_pickle(record), + protocol=PICKLE_PROTOCOL).encode('base64')) + fp.flush() + def _commit_puts(self, item_list=None): """ Scan for .pending files and commit the found records by feeding them - to merge_items(). + to merge_items(). Assume that lock_parent_directory has already been + called. :param item_list: A list of items to commit in addition to .pending """ @@ -561,36 +588,39 @@ class DatabaseBroker(object): return if item_list is None: item_list = [] - with lock_parent_directory(self.pending_file, self.pending_timeout): - self._preallocate() - if not os.path.getsize(self.pending_file): - if item_list: - self.merge_items(item_list) - return - with open(self.pending_file, 'r+b') as fp: - for entry in fp.read().split(':'): - if entry: - try: - self._commit_puts_load(item_list, entry) - except Exception: - self.logger.exception( - _('Invalid pending entry %(file)s: %(entry)s'), - {'file': self.pending_file, 'entry': entry}) - if item_list: - self.merge_items(item_list) - try: - os.ftruncate(fp.fileno(), 0) - except OSError as err: - if err.errno != errno.ENOENT: - raise + self._preallocate() + if not os.path.getsize(self.pending_file): + if item_list: + self.merge_items(item_list) + return + with open(self.pending_file, 'r+b') as fp: + for entry in fp.read().split(':'): + if entry: + try: + self._commit_puts_load(item_list, entry) + except Exception: + self.logger.exception( + _('Invalid pending entry %(file)s: %(entry)s'), + {'file': self.pending_file, 'entry': entry}) + if item_list: + self.merge_items(item_list) + try: + os.ftruncate(fp.fileno(), 0) + except OSError as err: + if err.errno != errno.ENOENT: + raise def _commit_puts_stale_ok(self): """ Catch failures of _commit_puts() if broker is intended for reading of stats, and thus does not care for pending updates. """ + if self.db_file == ':memory:' or not os.path.exists(self.pending_file): + return try: - self._commit_puts() + with lock_parent_directory(self.pending_file, + self.pending_timeout): + self._commit_puts() except LockTimeout: if not self.stale_reads_ok: raise @@ -603,6 +633,13 @@ class DatabaseBroker(object): """ raise NotImplementedError + def make_tuple_for_pickle(self, record): + """ + Turn this db record dict into the format this service uses for + pending pickles. + """ + raise NotImplementedError + def merge_syncs(self, sync_points, incoming=True): """ Merge a list of sync points with the incoming sync table. @@ -731,7 +768,10 @@ class DatabaseBroker(object): :param age_timestamp: max created_at timestamp of object rows to delete :param sync_timestamp: max update_at timestamp of sync rows to delete """ - self._commit_puts() + if self.db_file != ':memory:' and os.path.exists(self.pending_file): + with lock_parent_directory(self.pending_file, + self.pending_timeout): + self._commit_puts() with self.get() as conn: conn.execute(''' DELETE FROM %s WHERE deleted = 1 AND %s < ? diff --git a/swift/container/backend.py b/swift/container/backend.py index eb27369507..df67a3ece6 100644 --- a/swift/container/backend.py +++ b/swift/container/backend.py @@ -20,13 +20,11 @@ import os from uuid import uuid4 import time import cPickle as pickle -import errno import sqlite3 -from swift.common.utils import Timestamp, lock_parent_directory -from swift.common.db import DatabaseBroker, DatabaseConnectionError, \ - PENDING_CAP, PICKLE_PROTOCOL, utf8encode +from swift.common.utils import Timestamp +from swift.common.db import DatabaseBroker, utf8encode DATADIR = 'containers' @@ -317,6 +315,11 @@ class ContainerBroker(DatabaseBroker): self.put_object(name, timestamp, 0, 'application/deleted', 'noetag', deleted=1, storage_policy_index=storage_policy_index) + def make_tuple_for_pickle(self, record): + return (record['name'], record['created_at'], record['size'], + record['content_type'], record['etag'], record['deleted'], + record['storage_policy_index']) + def put_object(self, name, timestamp, size, content_type, etag, deleted=0, storage_policy_index=0): """ @@ -335,31 +338,7 @@ class ContainerBroker(DatabaseBroker): 'content_type': content_type, 'etag': etag, 'deleted': deleted, 'storage_policy_index': storage_policy_index} - if self.db_file == ':memory:': - self.merge_items([record]) - return - if not os.path.exists(self.db_file): - raise DatabaseConnectionError(self.db_file, "DB doesn't exist") - pending_size = 0 - try: - pending_size = os.path.getsize(self.pending_file) - except OSError as err: - if err.errno != errno.ENOENT: - raise - if pending_size > PENDING_CAP: - self._commit_puts([record]) - else: - with lock_parent_directory(self.pending_file, - self.pending_timeout): - with open(self.pending_file, 'a+b') as fp: - # Colons aren't used in base64 encoding; so they are our - # delimiter - fp.write(':') - fp.write(pickle.dumps( - (name, timestamp, size, content_type, etag, deleted, - storage_policy_index), - protocol=PICKLE_PROTOCOL).encode('base64')) - fp.flush() + self.put_record(record) def _is_deleted_info(self, object_count, put_timestamp, delete_timestamp, **kwargs):