diff --git a/swift/account/backend.py b/swift/account/backend.py index 2734548cf0..1ff940d4f6 100644 --- a/swift/account/backend.py +++ b/swift/account/backend.py @@ -22,7 +22,7 @@ import six.moves.cPickle as pickle import sqlite3 from swift.common.utils import Timestamp -from swift.common.db import DatabaseBroker, utf8encode +from swift.common.db import DatabaseBroker, utf8encode, zero_like DATADIR = 'accounts' @@ -233,7 +233,7 @@ class AccountBroker(DatabaseBroker): with self.get() as conn: row = conn.execute( 'SELECT container_count from account_stat').fetchone() - return (row[0] == 0) + return zero_like(row[0]) def make_tuple_for_pickle(self, record): return (record['name'], record['put_timestamp'], @@ -254,7 +254,7 @@ class AccountBroker(DatabaseBroker): :param storage_policy_index: the storage policy for this container """ if Timestamp(delete_timestamp) > Timestamp(put_timestamp) and \ - object_count in (None, '', 0, '0'): + zero_like(object_count): deleted = 1 else: deleted = 0 @@ -273,8 +273,7 @@ class AccountBroker(DatabaseBroker): :returns: True if the DB is considered to be deleted, False otherwise """ - return status == 'DELETED' or ( - container_count in (None, '', 0, '0') and + return status == 'DELETED' or zero_like(container_count) and ( Timestamp(delete_timestamp) > Timestamp(put_timestamp)) def _is_deleted(self, conn): @@ -509,7 +508,7 @@ class AccountBroker(DatabaseBroker): record[2] = row[2] # If deleted, mark as such if Timestamp(record[2]) > Timestamp(record[1]) and \ - record[3] in (None, '', 0, '0'): + zero_like(record[3]): record[5] = 1 else: record[5] = 0 diff --git a/swift/common/db.py b/swift/common/db.py index b05eeb8d11..6425e85034 100644 --- a/swift/common/db.py +++ b/swift/common/db.py @@ -71,6 +71,18 @@ def native_str_keys(metadata): metadata[k.decode('utf-8')] = sv +ZERO_LIKE_VALUES = {None, '', 0, '0'} + + +def zero_like(count): + """ + We've cargo culted our consumers to be tolerant of various expressions of + zero in our databases for backwards compatibility with less disciplined + producers. + """ + return count in ZERO_LIKE_VALUES + + def _db_timeout(timeout, db_file, call): with LockTimeout(timeout, db_file): retry_wait = 0.001 @@ -208,11 +220,27 @@ class DatabaseBroker(object): def __init__(self, db_file, timeout=BROKER_TIMEOUT, logger=None, account=None, container=None, pending_timeout=None, - stale_reads_ok=False): - """Encapsulates working with a database.""" + stale_reads_ok=False, skip_commits=False): + """Encapsulates working with a database. + + :param db_file: path to a database file. + :param timeout: timeout used for database operations. + :param logger: a logger instance. + :param account: name of account. + :param container: name of container. + :param pending_timeout: timeout used when attempting to take a lock to + write to pending file. + :param stale_reads_ok: if True then no error is raised if pending + commits cannot be committed before the database is read, otherwise + an error is raised. + :param skip_commits: if True then this broker instance will never + commit records from the pending file to the database; + :meth:`~swift.common.db.DatabaseBroker.put_record` should not + called on brokers with skip_commits True. + """ self.conn = None - self.db_file = db_file - self.pending_file = self.db_file + '.pending' + self._db_file = db_file + self.pending_file = self._db_file + '.pending' self.pending_timeout = pending_timeout or 10 self.stale_reads_ok = stale_reads_ok self.db_dir = os.path.dirname(db_file) @@ -221,6 +249,7 @@ class DatabaseBroker(object): self.account = account self.container = container self._db_version = -1 + self.skip_commits = skip_commits def __str__(self): """ @@ -240,9 +269,9 @@ class DatabaseBroker(object): :param put_timestamp: internalized timestamp of initial PUT request :param storage_policy_index: only required for containers """ - if self.db_file == ':memory:': + if self._db_file == ':memory:': tmp_db_file = None - conn = get_db_connection(self.db_file, self.timeout) + conn = get_db_connection(self._db_file, self.timeout) else: mkdirs(self.db_dir) fd, tmp_db_file = mkstemp(suffix='.tmp', dir=self.db_dir) @@ -329,15 +358,22 @@ class DatabaseBroker(object): self._delete_db(conn, timestamp) conn.commit() + @property + def db_file(self): + return self._db_file + + def get_device_path(self): + suffix_path = os.path.dirname(self.db_dir) + partition_path = os.path.dirname(suffix_path) + dbs_path = os.path.dirname(partition_path) + return os.path.dirname(dbs_path) + def quarantine(self, reason): """ The database will be quarantined and a sqlite3.DatabaseError will be raised indicating the action taken. """ - prefix_path = os.path.dirname(self.db_dir) - partition_path = os.path.dirname(prefix_path) - dbs_path = os.path.dirname(partition_path) - device_path = os.path.dirname(dbs_path) + device_path = self.get_device_path() quar_path = os.path.join(device_path, 'quarantined', self.db_type + 's', os.path.basename(self.db_dir)) @@ -377,6 +413,20 @@ class DatabaseBroker(object): self.quarantine(exc_hint) + @contextmanager + def updated_timeout(self, new_timeout): + """Use with "with" statement; updates ``timeout`` within the block.""" + old_timeout = self.timeout + try: + self.timeout = new_timeout + if self.conn: + self.conn.timeout = new_timeout + yield old_timeout + finally: + self.timeout = old_timeout + if self.conn: + self.conn.timeout = old_timeout + @contextmanager def get(self): """Use with the "with" statement; returns a database connection.""" @@ -477,6 +527,23 @@ class DatabaseBroker(object): with self.get() as conn: return self._is_deleted(conn) + def empty(self): + """ + Check if the broker abstraction contains any undeleted records. + """ + raise NotImplementedError() + + def is_reclaimable(self, now, reclaim_age): + """ + Check if the broker abstraction is empty, and has been marked deleted + for at least a reclaim age. + """ + info = self.get_replication_info() + return (zero_like(info['count']) and + (Timestamp(now - reclaim_age) > + Timestamp(info['delete_timestamp']) > + Timestamp(info['put_timestamp']))) + def merge_timestamps(self, created_at, put_timestamp, delete_timestamp): """ Used in replication to handle updating timestamps. @@ -548,13 +615,15 @@ class DatabaseBroker(object): result.append({'remote_id': row[0], 'sync_point': row[1]}) return result - def get_max_row(self): + def get_max_row(self, table=None): + if not table: + table = self.db_contains_type query = ''' SELECT SQLITE_SEQUENCE.seq FROM SQLITE_SEQUENCE WHERE SQLITE_SEQUENCE.name == '%s' LIMIT 1 - ''' % (self.db_contains_type) + ''' % (table, ) with self.get() as conn: row = conn.execute(query).fetchone() return row[0] if row else -1 @@ -582,11 +651,26 @@ class DatabaseBroker(object): return curs.fetchone() def put_record(self, record): - if self.db_file == ':memory:': + """ + Put a record into the DB. If the DB has an associated pending file with + space then the record is appended to that file and a commit to the DB + is deferred. If the DB is in-memory or its pending file is full then + the record will be committed immediately. + + :param record: a record to be added to the DB. + :raises DatabaseConnectionError: if the DB file does not exist or if + ``skip_commits`` is True. + :raises LockTimeout: if a timeout occurs while waiting to take a lock + to write to the pending file. + """ + if self._db_file == ':memory:': self.merge_items([record]) return if not os.path.exists(self.db_file): raise DatabaseConnectionError(self.db_file, "DB doesn't exist") + if self.skip_commits: + raise DatabaseConnectionError(self.db_file, + 'commits not accepted') with lock_parent_directory(self.pending_file, self.pending_timeout): pending_size = 0 try: @@ -606,6 +690,10 @@ class DatabaseBroker(object): protocol=PICKLE_PROTOCOL).encode('base64')) fp.flush() + def _skip_commit_puts(self): + return (self._db_file == ':memory:' or self.skip_commits or not + os.path.exists(self.pending_file)) + def _commit_puts(self, item_list=None): """ Scan for .pending files and commit the found records by feeding them @@ -614,7 +702,13 @@ class DatabaseBroker(object): :param item_list: A list of items to commit in addition to .pending """ - if self.db_file == ':memory:' or not os.path.exists(self.pending_file): + if self._skip_commit_puts(): + if item_list: + # this broker instance should not be used to commit records, + # but if it is then raise an error rather than quietly + # discarding the records in item_list. + raise DatabaseConnectionError(self.db_file, + 'commits not accepted') return if item_list is None: item_list = [] @@ -645,7 +739,7 @@ class DatabaseBroker(object): Catch failures of _commit_puts() if broker is intended for reading of stats, and thus does not care for pending updates. """ - if self.db_file == ':memory:' or not os.path.exists(self.pending_file): + if self._skip_commit_puts(): return try: with lock_parent_directory(self.pending_file, @@ -663,6 +757,12 @@ class DatabaseBroker(object): """ raise NotImplementedError + def merge_items(self, item_list, source=None): + """ + Save :param:item_list to the database. + """ + raise NotImplementedError + def make_tuple_for_pickle(self, record): """ Turn this db record dict into the format this service uses for @@ -701,7 +801,7 @@ class DatabaseBroker(object): within 512k of a boundary, it allocates to the next boundary. Boundaries are 2m, 5m, 10m, 25m, 50m, then every 50m after. """ - if not DB_PREALLOCATION or self.db_file == ':memory:': + if not DB_PREALLOCATION or self._db_file == ':memory:': return MB = (1024 * 1024) @@ -830,40 +930,46 @@ class DatabaseBroker(object): def reclaim(self, age_timestamp, sync_timestamp): """ - Delete rows from the db_contains_type table that are marked deleted - and whose created_at timestamp is < age_timestamp. Also deletes rows - from incoming_sync and outgoing_sync where the updated_at timestamp is - < sync_timestamp. + Delete reclaimable rows and metadata from the db. - In addition, this calls the DatabaseBroker's :func:`_reclaim` method. + By default this method will delete rows from the db_contains_type table + that are marked deleted and whose created_at timestamp is < + age_timestamp, and deletes rows from incoming_sync and outgoing_sync + where the updated_at timestamp is < sync_timestamp. In addition, this + calls the :meth:`_reclaim_metadata` method. + + Subclasses may reclaim other items by overriding :meth:`_reclaim`. :param age_timestamp: max created_at timestamp of object rows to delete :param sync_timestamp: max update_at timestamp of sync rows to delete """ - if self.db_file != ':memory:' and os.path.exists(self.pending_file): + if not self._skip_commit_puts(): with lock_parent_directory(self.pending_file, self.pending_timeout): self._commit_puts() with self.get() as conn: - conn.execute(''' - DELETE FROM %s WHERE deleted = 1 AND %s < ? - ''' % (self.db_contains_type, self.db_reclaim_timestamp), - (age_timestamp,)) - try: - conn.execute(''' - DELETE FROM outgoing_sync WHERE updated_at < ? - ''', (sync_timestamp,)) - conn.execute(''' - DELETE FROM incoming_sync WHERE updated_at < ? - ''', (sync_timestamp,)) - except sqlite3.OperationalError as err: - # Old dbs didn't have updated_at in the _sync tables. - if 'no such column: updated_at' not in str(err): - raise - DatabaseBroker._reclaim(self, conn, age_timestamp) + self._reclaim(conn, age_timestamp, sync_timestamp) + self._reclaim_metadata(conn, age_timestamp) conn.commit() - def _reclaim(self, conn, timestamp): + def _reclaim(self, conn, age_timestamp, sync_timestamp): + conn.execute(''' + DELETE FROM %s WHERE deleted = 1 AND %s < ? + ''' % (self.db_contains_type, self.db_reclaim_timestamp), + (age_timestamp,)) + try: + conn.execute(''' + DELETE FROM outgoing_sync WHERE updated_at < ? + ''', (sync_timestamp,)) + conn.execute(''' + DELETE FROM incoming_sync WHERE updated_at < ? + ''', (sync_timestamp,)) + except sqlite3.OperationalError as err: + # Old dbs didn't have updated_at in the _sync tables. + if 'no such column: updated_at' not in str(err): + raise + + def _reclaim_metadata(self, conn, timestamp): """ Removes any empty metadata values older than the timestamp using the given database connection. This function will not call commit on the diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py index c464341b21..c27914bffb 100644 --- a/swift/common/db_replicator.py +++ b/swift/common/db_replicator.py @@ -33,10 +33,11 @@ from swift.common.direct_client import quote from swift.common.utils import get_logger, whataremyips, storage_directory, \ renamer, mkdirs, lock_parent_directory, config_true_value, \ unlink_older_than, dump_recon_cache, rsync_module_interpolation, \ - json, Timestamp, parse_override_options, round_robin_iter, Everything + json, parse_override_options, round_robin_iter, Everything from swift.common import ring from swift.common.ring.utils import is_local_device -from swift.common.http import HTTP_NOT_FOUND, HTTP_INSUFFICIENT_STORAGE +from swift.common.http import HTTP_NOT_FOUND, HTTP_INSUFFICIENT_STORAGE, \ + is_success from swift.common.bufferedhttp import BufferedHTTPConnection from swift.common.exceptions import DriveNotMounted from swift.common.daemon import Daemon @@ -87,11 +88,14 @@ def roundrobin_datadirs(datadirs): found (in their proper places). The partitions within each data dir are walked randomly, however. - :param datadirs: a list of (path, node_id, partition_filter) to walk - :returns: A generator of (partition, path_to_db_file, node_id) + :param datadirs: a list of tuples of (path, context, partition_filter) to + walk. The context may be any object; the context is not + used by this function but is included with each yielded + tuple. + :returns: A generator of (partition, path_to_db_file, context) """ - def walk_datadir(datadir, node_id, part_filter): + def walk_datadir(datadir, context, part_filter): partitions = [pd for pd in os.listdir(datadir) if looks_like_partition(pd) and part_filter(pd)] random.shuffle(partitions) @@ -117,7 +121,7 @@ def roundrobin_datadirs(datadirs): continue object_file = os.path.join(hash_dir, hsh + '.db') if os.path.exists(object_file): - yield (partition, object_file, node_id) + yield (partition, object_file, context) else: try: os.rmdir(hash_dir) @@ -125,8 +129,8 @@ def roundrobin_datadirs(datadirs): if e.errno != errno.ENOTEMPTY: raise - its = [walk_datadir(datadir, node_id, filt) - for datadir, node_id, filt in datadirs] + its = [walk_datadir(datadir, context, filt) + for datadir, context, filt in datadirs] rr_its = round_robin_iter(its) for datadir in rr_its: @@ -312,6 +316,16 @@ class Replicator(Daemon): response = http.replicate(replicate_method, local_id) return response and 200 <= response.status < 300 + def _send_merge_items(self, http, local_id, items): + with Timeout(self.node_timeout): + response = http.replicate('merge_items', items, local_id) + if not response or not is_success(response.status): + if response: + self.logger.error('ERROR Bad response %s from %s', + response.status, http.host) + return False + return True + def _usync_db(self, point, broker, http, remote_id, local_id): """ Sync a db by sending all records since the last sync. @@ -326,26 +340,28 @@ class Replicator(Daemon): """ self.stats['diff'] += 1 self.logger.increment('diffs') - self.logger.debug('Syncing chunks with %s, starting at %s', - http.host, point) + self.logger.debug('%s usyncing chunks to %s, starting at row %s', + broker.db_file, + '%(ip)s:%(port)s/%(device)s' % http.node, + point) + start = time.time() sync_table = broker.get_syncs() objects = broker.get_items_since(point, self.per_diff) diffs = 0 while len(objects) and diffs < self.max_diffs: diffs += 1 - with Timeout(self.node_timeout): - response = http.replicate('merge_items', objects, local_id) - if not response or response.status >= 300 or response.status < 200: - if response: - self.logger.error(_('ERROR Bad response %(status)s from ' - '%(host)s'), - {'status': response.status, - 'host': http.host}) + if not self._send_merge_items(http, local_id, objects): return False # replication relies on db order to send the next merge batch in # order with no gaps point = objects[-1]['ROWID'] objects = broker.get_items_since(point, self.per_diff) + + self.logger.debug('%s usyncing chunks to %s, finished at row %s (%gs)', + broker.db_file, + '%(ip)s:%(port)s/%(device)s' % http.node, + point, time.time() - start) + if objects: self.logger.debug( 'Synchronization for %s has fallen more than ' @@ -449,32 +465,79 @@ class Replicator(Daemon): if rinfo.get('metadata', ''): broker.update_metadata(json.loads(rinfo['metadata'])) if self._in_sync(rinfo, info, broker, local_sync): + self.logger.debug('%s in sync with %s, nothing to do', + broker.db_file, + '%(ip)s:%(port)s/%(device)s' % node) return True - # if the difference in rowids between the two differs by - # more than 50% and the difference is greater than per_diff, - # rsync then do a remote merge. - # NOTE: difference > per_diff stops us from dropping to rsync - # on smaller containers, who have only a few rows to sync. - if rinfo['max_row'] / float(info['max_row']) < 0.5 and \ - info['max_row'] - rinfo['max_row'] > self.per_diff: - self.stats['remote_merge'] += 1 - self.logger.increment('remote_merges') - return self._rsync_db(broker, node, http, info['id'], - replicate_method='rsync_then_merge', - replicate_timeout=(info['count'] / 2000), - different_region=different_region) - # else send diffs over to the remote server - return self._usync_db(max(rinfo['point'], local_sync), - broker, http, rinfo['id'], info['id']) + return self._choose_replication_mode( + node, rinfo, info, local_sync, broker, http, + different_region) + return False + + def _choose_replication_mode(self, node, rinfo, info, local_sync, broker, + http, different_region): + # if the difference in rowids between the two differs by + # more than 50% and the difference is greater than per_diff, + # rsync then do a remote merge. + # NOTE: difference > per_diff stops us from dropping to rsync + # on smaller containers, who have only a few rows to sync. + if (rinfo['max_row'] / float(info['max_row']) < 0.5 and + info['max_row'] - rinfo['max_row'] > self.per_diff): + self.stats['remote_merge'] += 1 + self.logger.increment('remote_merges') + return self._rsync_db(broker, node, http, info['id'], + replicate_method='rsync_then_merge', + replicate_timeout=(info['count'] / 2000), + different_region=different_region) + # else send diffs over to the remote server + return self._usync_db(max(rinfo['point'], local_sync), + broker, http, rinfo['id'], info['id']) def _post_replicate_hook(self, broker, info, responses): """ - :param broker: the container that just replicated + :param broker: broker instance for the database that just replicated :param info: pre-replication full info dict :param responses: a list of bools indicating success from nodes """ pass + def cleanup_post_replicate(self, broker, orig_info, responses): + """ + Cleanup non primary database from disk if needed. + + :param broker: the broker for the database we're replicating + :param orig_info: snapshot of the broker replication info dict taken + before replication + :param responses: a list of boolean success values for each replication + request to other nodes + + :return success: returns False if deletion of the database was + attempted but unsuccessful, otherwise returns True. + """ + log_template = 'Not deleting db %s (%%s)' % broker.db_file + max_row_delta = broker.get_max_row() - orig_info['max_row'] + if max_row_delta < 0: + reason = 'negative max_row_delta: %s' % max_row_delta + self.logger.error(log_template, reason) + return True + if max_row_delta: + reason = '%s new rows' % max_row_delta + self.logger.debug(log_template, reason) + return True + if not (responses and all(responses)): + reason = '%s/%s success' % (responses.count(True), len(responses)) + self.logger.debug(log_template, reason) + return True + # If the db has been successfully synced to all of its peers, it can be + # removed. Callers should have already checked that the db is not on a + # primary node. + if not self.delete_db(broker): + self.logger.debug( + 'Failed to delete db %s', broker.db_file) + return False + self.logger.debug('Successfully deleted db %s', broker.db_file) + return True + def _replicate_object(self, partition, object_file, node_id): """ Replicate the db, choosing method based on whether or not it @@ -483,12 +546,20 @@ class Replicator(Daemon): :param partition: partition to be replicated to :param object_file: DB file name to be replicated :param node_id: node id of the node to be replicated to + :returns: a tuple (success, responses). ``success`` is a boolean that + is True if the method completed successfully, False otherwise. + ``responses`` is a list of booleans each of which indicates the + success or not of replicating to a peer node if replication has + been attempted. ``success`` is False if any of ``responses`` is + False; when ``responses`` is empty, ``success`` may be either True + or False. """ start_time = now = time.time() self.logger.debug('Replicating db %s', object_file) self.stats['attempted'] += 1 self.logger.increment('attempts') shouldbehere = True + responses = [] try: broker = self.brokerclass(object_file, pending_timeout=30) broker.reclaim(now - self.reclaim_age, @@ -518,18 +589,12 @@ class Replicator(Daemon): failure_dev['device']) for failure_dev in nodes]) self.logger.increment('failures') - return - # The db is considered deleted if the delete_timestamp value is greater - # than the put_timestamp, and there are no objects. - delete_timestamp = Timestamp(info.get('delete_timestamp') or 0) - put_timestamp = Timestamp(info.get('put_timestamp') or 0) - if (now - self.reclaim_age) > delete_timestamp > put_timestamp and \ - info['count'] in (None, '', 0, '0'): + return False, responses + if broker.is_reclaimable(now, self.reclaim_age): if self.report_up_to_date(info): self.delete_db(broker) self.logger.timing_since('timing', start_time) - return - responses = [] + return True, responses failure_devs_info = set() nodes = self.ring.get_part_nodes(int(partition)) local_dev = None @@ -587,14 +652,11 @@ class Replicator(Daemon): except (Exception, Timeout): self.logger.exception('UNHANDLED EXCEPTION: in post replicate ' 'hook for %s', broker.db_file) - if not shouldbehere and responses and all(responses): - # If the db shouldn't be on this node and has been successfully - # synced to all of its peers, it can be removed. - if not self.delete_db(broker): + if not shouldbehere: + if not self.cleanup_post_replicate(broker, info, responses): failure_devs_info.update( [(failure_dev['replication_ip'], failure_dev['device']) for failure_dev in repl_nodes]) - target_devs_info = set([(target_dev['replication_ip'], target_dev['device']) for target_dev in repl_nodes]) @@ -602,6 +664,9 @@ class Replicator(Daemon): self._add_failure_stats(failure_devs_info) self.logger.timing_since('timing', start_time) + if shouldbehere: + responses.append(True) + return all(responses), responses def delete_db(self, broker): object_file = broker.db_file @@ -746,6 +811,9 @@ class ReplicatorRpc(object): self.mount_check = mount_check self.logger = logger or get_logger({}, log_route='replicator-rpc') + def _db_file_exists(self, db_path): + return os.path.exists(db_path) + def dispatch(self, replicate_args, args): if not hasattr(args, 'pop'): return HTTPBadRequest(body='Invalid object type') @@ -764,7 +832,7 @@ class ReplicatorRpc(object): # someone might be about to rsync a db to us, # make sure there's a tmp dir to receive it. mkdirs(os.path.join(self.root, drive, 'tmp')) - if not os.path.exists(db_file): + if not self._db_file_exists(db_file): return HTTPNotFound() return getattr(self, op)(self.broker_class(db_file), args) @@ -872,12 +940,17 @@ class ReplicatorRpc(object): renamer(old_filename, db_file) return HTTPNoContent() + def _abort_rsync_then_merge(self, db_file, tmp_filename): + return not (self._db_file_exists(db_file) and + os.path.exists(tmp_filename)) + def rsync_then_merge(self, drive, db_file, args): - old_filename = os.path.join(self.root, drive, 'tmp', args[0]) - if not os.path.exists(db_file) or not os.path.exists(old_filename): + tmp_filename = os.path.join(self.root, drive, 'tmp', args[0]) + if self._abort_rsync_then_merge(db_file, tmp_filename): return HTTPNotFound() - new_broker = self.broker_class(old_filename) + new_broker = self.broker_class(tmp_filename) existing_broker = self.broker_class(db_file) + db_file = existing_broker.db_file point = -1 objects = existing_broker.get_items_since(point, 1000) while len(objects): @@ -885,9 +958,12 @@ class ReplicatorRpc(object): point = objects[-1]['ROWID'] objects = existing_broker.get_items_since(point, 1000) sleep() + new_broker.merge_syncs(existing_broker.get_syncs()) new_broker.newid(args[0]) new_broker.update_metadata(existing_broker.metadata) - renamer(old_filename, db_file) + if self._abort_rsync_then_merge(db_file, tmp_filename): + return HTTPNotFound() + renamer(tmp_filename, db_file) return HTTPNoContent() # Footnote [1]: diff --git a/swift/common/wsgi.py b/swift/common/wsgi.py index 752e8767aa..2a9409d92e 100644 --- a/swift/common/wsgi.py +++ b/swift/common/wsgi.py @@ -45,6 +45,9 @@ from swift.common.utils import capture_stdio, disable_fallocate, \ validate_configuration, get_hub, config_auto_int_value, \ reiterate +SIGNUM_TO_NAME = {getattr(signal, n): n for n in dir(signal) + if n.startswith('SIG') and '_' not in n} + # Set maximum line size of message headers to be accepted. wsgi.MAX_HEADER_LINE = constraints.MAX_HEADER_SIZE @@ -559,7 +562,8 @@ class WorkersStrategy(object): :param int pid: The new worker process' PID """ - self.logger.notice('Started child %s' % pid) + self.logger.notice('Started child %s from parent %s', + pid, os.getpid()) self.children.append(pid) def register_worker_exit(self, pid): @@ -569,7 +573,8 @@ class WorkersStrategy(object): :param int pid: The PID of the worker that exited. """ - self.logger.error('Removing dead child %s' % pid) + self.logger.error('Removing dead child %s from parent %s', + pid, os.getpid()) self.children.remove(pid) def shutdown_sockets(self): @@ -935,24 +940,17 @@ def run_wsgi(conf_path, app_section, *args, **kwargs): run_server(conf, logger, no_fork_sock, global_conf=global_conf) return 0 - def kill_children(*args): - """Kills the entire process group.""" - logger.error('SIGTERM received') - signal.signal(signal.SIGTERM, signal.SIG_IGN) - running[0] = False - os.killpg(0, signal.SIGTERM) + def stop_with_signal(signum, *args): + """Set running flag to False and capture the signum""" + running_context[0] = False + running_context[1] = signum - def hup(*args): - """Shuts down the server, but allows running requests to complete""" - logger.error('SIGHUP received') - signal.signal(signal.SIGHUP, signal.SIG_IGN) - running[0] = False + # context to hold boolean running state and stop signum + running_context = [True, None] + signal.signal(signal.SIGTERM, stop_with_signal) + signal.signal(signal.SIGHUP, stop_with_signal) - running = [True] - signal.signal(signal.SIGTERM, kill_children) - signal.signal(signal.SIGHUP, hup) - - while running[0]: + while running_context[0]: for sock, sock_info in strategy.new_worker_socks(): pid = os.fork() if pid == 0: @@ -992,11 +990,23 @@ def run_wsgi(conf_path, app_section, *args, **kwargs): sleep(0.01) except KeyboardInterrupt: logger.notice('User quit') - running[0] = False + running_context[0] = False break + if running_context[1] is not None: + try: + signame = SIGNUM_TO_NAME[running_context[1]] + except KeyError: + logger.error('Stopping with unexpected signal %r' % + running_context[1]) + else: + logger.error('%s received', signame) + if running_context[1] == signal.SIGTERM: + os.killpg(0, signal.SIGTERM) + strategy.shutdown_sockets() - logger.notice('Exited') + signal.signal(signal.SIGTERM, signal.SIG_IGN) + logger.notice('Exited (%s)', os.getpid()) return 0 diff --git a/swift/container/backend.py b/swift/container/backend.py index bab618286a..c61c633739 100644 --- a/swift/container/backend.py +++ b/swift/container/backend.py @@ -15,7 +15,6 @@ """ Pluggable Back-ends for Container Server """ - import os from uuid import uuid4 @@ -25,9 +24,9 @@ from six.moves import range import sqlite3 from swift.common.utils import Timestamp, encode_timestamps, \ - decode_timestamps, extract_swift_bytes -from swift.common.db import DatabaseBroker, utf8encode - + decode_timestamps, extract_swift_bytes, storage_directory, hash_path +from swift.common.db import DatabaseBroker, utf8encode, \ + zero_like, DatabaseAlreadyExists SQLITE_ARG_LIMIT = 999 @@ -227,6 +226,35 @@ class ContainerBroker(DatabaseBroker): db_contains_type = 'object' db_reclaim_timestamp = 'created_at' + @classmethod + def create_broker(self, device_path, part, account, container, logger=None, + put_timestamp=None, storage_policy_index=None): + """ + Create a ContainerBroker instance. If the db doesn't exist, initialize + the db file. + + :param device_path: device path + :param part: partition number + :param account: account name string + :param container: container name string + :param logger: a logger instance + :param put_timestamp: initial timestamp if broker needs to be + initialized + :param storage_policy_index: the storage policy index + :return: a :class:`swift.container.backend.ContainerBroker` instance + """ + hsh = hash_path(account, container) + db_dir = storage_directory(DATADIR, part, hsh) + db_path = os.path.join(device_path, db_dir, hsh + '.db') + broker = ContainerBroker(db_path, account=account, container=container, + logger=logger) + if not os.path.exists(broker.db_file): + try: + broker.initialize(put_timestamp, storage_policy_index) + except DatabaseAlreadyExists: + pass + return broker + @property def storage_policy_index(self): if not hasattr(self, '_storage_policy_index'): @@ -401,7 +429,7 @@ class ContainerBroker(DatabaseBroker): raise row = conn.execute( 'SELECT object_count from container_stat').fetchone() - return (row[0] == 0) + return zero_like(row[0]) def delete_object(self, name, timestamp, storage_policy_index=0): """ @@ -457,7 +485,7 @@ class ContainerBroker(DatabaseBroker): # The container is considered deleted if the delete_timestamp # value is greater than the put_timestamp, and there are no # objects in the container. - return (object_count in (None, '', 0, '0')) and ( + return zero_like(object_count) and ( Timestamp(delete_timestamp) > Timestamp(put_timestamp)) def _is_deleted(self, conn): @@ -473,6 +501,17 @@ class ContainerBroker(DatabaseBroker): FROM container_stat''').fetchone() return self._is_deleted_info(**info) + def is_reclaimable(self, now, reclaim_age): + with self.get() as conn: + info = conn.execute(''' + SELECT put_timestamp, delete_timestamp + FROM container_stat''').fetchone() + if (Timestamp(now - reclaim_age) > + Timestamp(info['delete_timestamp']) > + Timestamp(info['put_timestamp'])): + return self.empty() + return False + def get_info_is_deleted(self): """ Get the is_deleted status and info for the container. diff --git a/swift/container/replicator.py b/swift/container/replicator.py index 41c048716d..9f3fdb53c7 100644 --- a/swift/container/replicator.py +++ b/swift/container/replicator.py @@ -28,9 +28,7 @@ from swift.common import db_replicator from swift.common.storage_policy import POLICIES from swift.common.exceptions import DeviceUnavailable from swift.common.http import is_success -from swift.common.db import DatabaseAlreadyExists -from swift.common.utils import (Timestamp, hash_path, - storage_directory, majority_size) +from swift.common.utils import Timestamp, majority_size class ContainerReplicator(db_replicator.Replicator): @@ -39,6 +37,10 @@ class ContainerReplicator(db_replicator.Replicator): datadir = DATADIR default_port = 6201 + def __init__(self, conf, logger=None): + super(ContainerReplicator, self).__init__(conf, logger=logger) + self.reconciler_cleanups = self.sync_store = None + def report_up_to_date(self, full_info): reported_key_map = { 'reported_put_timestamp': 'put_timestamp', @@ -61,8 +63,7 @@ class ContainerReplicator(db_replicator.Replicator): return sync_args def _handle_sync_response(self, node, response, info, broker, http, - different_region): - parent = super(ContainerReplicator, self) + different_region=False): if is_success(response.status): remote_info = json.loads(response.data) if incorrect_policy_index(info, remote_info): @@ -75,9 +76,8 @@ class ContainerReplicator(db_replicator.Replicator): if any(info[key] != remote_info[key] for key in sync_timestamps): broker.merge_timestamps(*(remote_info[key] for key in sync_timestamps)) - rv = parent._handle_sync_response( + return super(ContainerReplicator, self)._handle_sync_response( node, response, info, broker, http, different_region) - return rv def find_local_handoff_for_part(self, part): """ @@ -114,15 +114,10 @@ class ContainerReplicator(db_replicator.Replicator): raise DeviceUnavailable( 'No mounted devices found suitable to Handoff reconciler ' 'container %s in partition %s' % (container, part)) - hsh = hash_path(account, container) - db_dir = storage_directory(DATADIR, part, hsh) - db_path = os.path.join(self.root, node['device'], db_dir, hsh + '.db') - broker = ContainerBroker(db_path, account=account, container=container) - if not os.path.exists(broker.db_file): - try: - broker.initialize(timestamp, 0) - except DatabaseAlreadyExists: - pass + broker = ContainerBroker.create_broker( + os.path.join(self.root, node['device']), part, account, container, + logger=self.logger, put_timestamp=timestamp, + storage_policy_index=0) if self.reconciler_containers is not None: self.reconciler_containers[container] = part, broker, node['id'] return broker @@ -217,12 +212,13 @@ class ContainerReplicator(db_replicator.Replicator): # this container shouldn't be here, make sure it's cleaned up self.reconciler_cleanups[broker.container] = broker return - try: - # DB is going to get deleted. Be preemptive about it - self.sync_store.remove_synced_container(broker) - except Exception: - self.logger.exception('Failed to remove sync_store entry %s' % - broker.db_file) + if self.sync_store: + try: + # DB is going to get deleted. Be preemptive about it + self.sync_store.remove_synced_container(broker) + except Exception: + self.logger.exception('Failed to remove sync_store entry %s' % + broker.db_file) return super(ContainerReplicator, self).delete_db(broker) diff --git a/swift/container/server.py b/swift/container/server.py index c7df07ac8e..a3c233b664 100644 --- a/swift/container/server.py +++ b/swift/container/server.py @@ -343,6 +343,40 @@ class ContainerController(BaseStorageServer): broker.update_status_changed_at(timestamp) return recreated + def _maybe_autocreate(self, broker, req_timestamp, account, + policy_index): + created = False + if account.startswith(self.auto_create_account_prefix) and \ + not os.path.exists(broker.db_file): + if policy_index is None: + raise HTTPBadRequest( + 'X-Backend-Storage-Policy-Index header is required') + try: + broker.initialize(req_timestamp.internal, policy_index) + except DatabaseAlreadyExists: + pass + else: + created = True + if not os.path.exists(broker.db_file): + raise HTTPNotFound() + return created + + def _update_metadata(self, req, broker, req_timestamp, method): + metadata = {} + metadata.update( + (key, (value, req_timestamp.internal)) + for key, value in req.headers.items() + if key.lower() in self.save_headers or + is_sys_or_user_meta('container', key)) + if metadata: + if 'X-Container-Sync-To' in metadata: + if 'X-Container-Sync-To' not in broker.metadata or \ + metadata['X-Container-Sync-To'][0] != \ + broker.metadata['X-Container-Sync-To'][0]: + broker.set_x_container_sync_points(-1, -1) + broker.update_metadata(metadata, validate_metadata=True) + self._update_sync_store(broker, method) + @public @timing_stats() def PUT(self, req): @@ -364,14 +398,8 @@ class ContainerController(BaseStorageServer): # obj put expects the policy_index header, default is for # legacy support during upgrade. obj_policy_index = requested_policy_index or 0 - if account.startswith(self.auto_create_account_prefix) and \ - not os.path.exists(broker.db_file): - try: - broker.initialize(req_timestamp.internal, obj_policy_index) - except DatabaseAlreadyExists: - pass - if not os.path.exists(broker.db_file): - return HTTPNotFound() + self._maybe_autocreate(broker, req_timestamp, account, + obj_policy_index) broker.put_object(obj, req_timestamp.internal, int(req.headers['x-size']), req.headers['x-content-type'], @@ -391,20 +419,7 @@ class ContainerController(BaseStorageServer): req_timestamp.internal, new_container_policy, requested_policy_index) - metadata = {} - metadata.update( - (key, (value, req_timestamp.internal)) - for key, value in req.headers.items() - if key.lower() in self.save_headers or - is_sys_or_user_meta('container', key)) - if 'X-Container-Sync-To' in metadata: - if 'X-Container-Sync-To' not in broker.metadata or \ - metadata['X-Container-Sync-To'][0] != \ - broker.metadata['X-Container-Sync-To'][0]: - broker.set_x_container_sync_points(-1, -1) - broker.update_metadata(metadata, validate_metadata=True) - if metadata: - self._update_sync_store(broker, 'PUT') + self._update_metadata(req, broker, req_timestamp, 'PUT') resp = self.account_update(req, account, container, broker) if resp: return resp @@ -562,20 +577,7 @@ class ContainerController(BaseStorageServer): if broker.is_deleted(): return HTTPNotFound(request=req) broker.update_put_timestamp(req_timestamp.internal) - metadata = {} - metadata.update( - (key, (value, req_timestamp.internal)) - for key, value in req.headers.items() - if key.lower() in self.save_headers or - is_sys_or_user_meta('container', key)) - if metadata: - if 'X-Container-Sync-To' in metadata: - if 'X-Container-Sync-To' not in broker.metadata or \ - metadata['X-Container-Sync-To'][0] != \ - broker.metadata['X-Container-Sync-To'][0]: - broker.set_x_container_sync_points(-1, -1) - broker.update_metadata(metadata, validate_metadata=True) - self._update_sync_store(broker, 'POST') + self._update_metadata(req, broker, req_timestamp, 'POST') return HTTPNoContent(request=req) def __call__(self, env, start_response): diff --git a/test/__init__.py b/test/__init__.py index 1a56597158..51e3aa9d82 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -17,7 +17,11 @@ # The code below enables nosetests to work with i18n _() blocks from __future__ import print_function import sys +from contextlib import contextmanager + import os +from six import reraise + try: from unittest.util import safe_repr except ImportError: @@ -86,3 +90,26 @@ def listen_zero(): sock.bind(("127.0.0.1", 0)) sock.listen(50) return sock + + +@contextmanager +def annotate_failure(msg): + """ + Catch AssertionError and annotate it with a message. Useful when making + assertions in a loop where the message can indicate the loop index or + richer context about the failure. + + :param msg: A message to be prefixed to the AssertionError message. + """ + try: + yield + except AssertionError as err: + err_typ, err_val, err_tb = sys.exc_info() + if err_val.args: + msg = '%s Failed with %s' % (msg, err_val.args[0]) + err_val.args = (msg, ) + err_val.args[1:] + else: + # workaround for some IDE's raising custom AssertionErrors + err_val = '%s Failed with %s' % (msg, err) + err_typ = AssertionError + reraise(err_typ, err_val, err_tb) diff --git a/test/probe/brain.py b/test/probe/brain.py index 843754210e..fd597cf6b3 100644 --- a/test/probe/brain.py +++ b/test/probe/brain.py @@ -99,9 +99,11 @@ class BrainSplitter(object): raise ValueError('Unknown server_type: %r' % server_type) self.server_type = server_type - part, nodes = self.ring.get_nodes(self.account, c, o) + self.part, self.nodes = self.ring.get_nodes(self.account, c, o) + + node_ids = [n['id'] for n in self.nodes] + self.node_numbers = [n + 1 for n in node_ids] - node_ids = [n['id'] for n in nodes] if all(n_id in node_ids for n_id in (0, 1)): self.primary_numbers = (1, 2) self.handoff_numbers = (3, 4) diff --git a/test/probe/common.py b/test/probe/common.py index ccb5751f26..5622d71b64 100644 --- a/test/probe/common.py +++ b/test/probe/common.py @@ -14,6 +14,8 @@ # limitations under the License. from __future__ import print_function + +import errno import os from subprocess import Popen, PIPE import sys @@ -125,13 +127,17 @@ def kill_server(ipport, ipport2server): if err: raise Exception('unable to kill %s' % (server if not number else '%s%s' % (server, number))) + return wait_for_server_to_hangup(ipport) + + +def wait_for_server_to_hangup(ipport): try_until = time() + 30 while True: try: conn = HTTPConnection(*ipport) conn.request('GET', '/') conn.getresponse() - except Exception as err: + except Exception: break if time() > try_until: raise Exception( @@ -334,33 +340,35 @@ class ProbeTest(unittest.TestCase): Don't instantiate this directly, use a child class instead. """ + def _load_rings_and_configs(self): + self.ipport2server = {} + self.configs = defaultdict(dict) + self.account_ring = get_ring( + 'account', + self.acct_cont_required_replicas, + self.acct_cont_required_devices, + ipport2server=self.ipport2server, + config_paths=self.configs) + self.container_ring = get_ring( + 'container', + self.acct_cont_required_replicas, + self.acct_cont_required_devices, + ipport2server=self.ipport2server, + config_paths=self.configs) + self.policy = get_policy(**self.policy_requirements) + self.object_ring = get_ring( + self.policy.ring_name, + self.obj_required_replicas, + self.obj_required_devices, + server='object', + ipport2server=self.ipport2server, + config_paths=self.configs) + def setUp(self): resetswift() kill_orphans() + self._load_rings_and_configs() try: - self.ipport2server = {} - self.configs = defaultdict(dict) - self.account_ring = get_ring( - 'account', - self.acct_cont_required_replicas, - self.acct_cont_required_devices, - ipport2server=self.ipport2server, - config_paths=self.configs) - self.container_ring = get_ring( - 'container', - self.acct_cont_required_replicas, - self.acct_cont_required_devices, - ipport2server=self.ipport2server, - config_paths=self.configs) - self.policy = get_policy(**self.policy_requirements) - self.object_ring = get_ring( - self.policy.ring_name, - self.obj_required_replicas, - self.obj_required_devices, - server='object', - ipport2server=self.ipport2server, - config_paths=self.configs) - self.servers_per_port = any( int(readconf(c, section_name='object-replicator').get( 'servers_per_port', '0')) @@ -489,6 +497,49 @@ class ProbeTest(unittest.TestCase): finally: shutil.rmtree(tempdir) + def get_all_object_nodes(self): + """ + Returns a list of all nodes in all object storage policies. + + :return: a list of node dicts. + """ + all_obj_nodes = {} + for policy in ENABLED_POLICIES: + for dev in policy.object_ring.devs: + all_obj_nodes[dev['device']] = dev + return all_obj_nodes.values() + + def gather_async_pendings(self, onodes): + """ + Returns a list of paths to async pending files found on given nodes. + + :param onodes: a list of nodes. + :return: a list of file paths. + """ + async_pendings = [] + for onode in onodes: + device_dir = self.device_dir('', onode) + for ap_pol_dir in os.listdir(device_dir): + if not ap_pol_dir.startswith('async_pending'): + # skip 'objects', 'containers', etc. + continue + async_pending_dir = os.path.join(device_dir, ap_pol_dir) + try: + ap_dirs = os.listdir(async_pending_dir) + except OSError as err: + if err.errno == errno.ENOENT: + pass + else: + raise + else: + for ap_dir in ap_dirs: + ap_dir_fullpath = os.path.join( + async_pending_dir, ap_dir) + async_pendings.extend([ + os.path.join(ap_dir_fullpath, ent) + for ent in os.listdir(ap_dir_fullpath)]) + return async_pendings + class ReplProbeTest(ProbeTest): diff --git a/test/probe/test_object_expirer.py b/test/probe/test_object_expirer.py index 92642f19d6..ad31662730 100644 --- a/test/probe/test_object_expirer.py +++ b/test/probe/test_object_expirer.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import errno -import os import random import time import uuid @@ -143,31 +141,6 @@ class TestObjectExpirer(ReplProbeTest): # tha the object server does not write out any async pendings; this # test asserts that this is the case. - def gather_async_pendings(onodes): - async_pendings = [] - for onode in onodes: - device_dir = self.device_dir('', onode) - for ap_pol_dir in os.listdir(device_dir): - if not ap_pol_dir.startswith('async_pending'): - # skip 'objects', 'containers', etc. - continue - async_pending_dir = os.path.join(device_dir, ap_pol_dir) - try: - ap_dirs = os.listdir(async_pending_dir) - except OSError as err: - if err.errno == errno.ENOENT: - pass - else: - raise - else: - for ap_dir in ap_dirs: - ap_dir_fullpath = os.path.join( - async_pending_dir, ap_dir) - async_pendings.extend([ - os.path.join(ap_dir_fullpath, ent) - for ent in os.listdir(ap_dir_fullpath)]) - return async_pendings - # Make an expiring object in each policy for policy in ENABLED_POLICIES: container_name = "expirer-test-%d" % policy.idx @@ -191,15 +164,12 @@ class TestObjectExpirer(ReplProbeTest): # Make sure there's no async_pendings anywhere. Probe tests only run # on single-node installs anyway, so this set should be small enough # that an exhaustive check doesn't take too long. - all_obj_nodes = {} - for policy in ENABLED_POLICIES: - for dev in policy.object_ring.devs: - all_obj_nodes[dev['device']] = dev - pendings_before = gather_async_pendings(all_obj_nodes.values()) + all_obj_nodes = self.get_all_object_nodes() + pendings_before = self.gather_async_pendings(all_obj_nodes) # expire the objects Manager(['object-expirer']).once() - pendings_after = gather_async_pendings(all_obj_nodes.values()) + pendings_after = self.gather_async_pendings(all_obj_nodes) self.assertEqual(pendings_after, pendings_before) def test_expirer_object_should_not_be_expired(self): diff --git a/test/unit/__init__.py b/test/unit/__init__.py index a07b1b2879..2e611806a4 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -751,6 +751,8 @@ class FakeStatus(object): :param response_sleep: float, time to eventlet sleep during response """ # connect exception + if inspect.isclass(status) and issubclass(status, Exception): + raise status('FakeStatus Error') if isinstance(status, (Exception, eventlet.Timeout)): raise status if isinstance(status, tuple): @@ -1063,6 +1065,15 @@ def make_timestamp_iter(offset=0): for t in itertools.count(int(time.time()) + offset)) +@contextmanager +def mock_timestamp_now(now=None): + if now is None: + now = Timestamp.now() + with mocklib.patch('swift.common.utils.Timestamp.now', + classmethod(lambda c: now)): + yield now + + class Timeout(object): def __init__(self, seconds): self.seconds = seconds @@ -1323,3 +1334,12 @@ def skip_if_no_xattrs(): if not xattr_supported_check(): raise SkipTest('Large xattrs not supported in `%s`. Skipping test' % gettempdir()) + + +def unlink_files(paths): + for path in paths: + try: + os.unlink(path) + except OSError as err: + if err.errno != errno.ENOENT: + raise diff --git a/test/unit/account/test_server.py b/test/unit/account/test_server.py index 2c00773441..4a8f58cb05 100644 --- a/test/unit/account/test_server.py +++ b/test/unit/account/test_server.py @@ -404,7 +404,7 @@ class TestAccountController(unittest.TestCase): elif state[0] == 'race': # Save the original db_file attribute value self._saved_db_file = self.db_file - self.db_file += '.doesnotexist' + self._db_file += '.doesnotexist' def initialize(self, *args, **kwargs): if state[0] == 'initial': @@ -413,7 +413,7 @@ class TestAccountController(unittest.TestCase): elif state[0] == 'race': # Restore the original db_file attribute to get the race # behavior - self.db_file = self._saved_db_file + self._db_file = self._saved_db_file return super(InterceptedAcBr, self).initialize(*args, **kwargs) with mock.patch("swift.account.server.AccountBroker", InterceptedAcBr): diff --git a/test/unit/common/test_db.py b/test/unit/common/test_db.py index f605d0acba..6f723e13a7 100644 --- a/test/unit/common/test_db.py +++ b/test/unit/common/test_db.py @@ -38,7 +38,7 @@ from swift.common.constraints import \ MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE from swift.common.db import chexor, dict_factory, get_db_connection, \ DatabaseBroker, DatabaseConnectionError, DatabaseAlreadyExists, \ - GreenDBConnection, PICKLE_PROTOCOL + GreenDBConnection, PICKLE_PROTOCOL, zero_like from swift.common.utils import normalize_timestamp, mkdirs, Timestamp from swift.common.exceptions import LockTimeout from swift.common.swob import HTTPException @@ -46,6 +46,30 @@ from swift.common.swob import HTTPException from test.unit import with_tempdir +class TestHelperFunctions(unittest.TestCase): + + def test_zero_like(self): + expectations = { + # value => expected + None: True, + True: False, + '': True, + 'asdf': False, + 0: True, + 1: False, + '0': True, + '1': False, + } + errors = [] + for value, expected in expectations.items(): + rv = zero_like(value) + if rv != expected: + errors.append('zero_like(%r) => %r expected %r' % ( + value, rv, expected)) + if errors: + self.fail('Some unexpected return values:\n' + '\n'.join(errors)) + + class TestDatabaseConnectionError(unittest.TestCase): def test_str(self): @@ -989,6 +1013,19 @@ class TestDatabaseBroker(unittest.TestCase): self.assertEqual(broker.get_sync(uuid3), 2) broker.merge_syncs([{'sync_point': 5, 'remote_id': uuid2}]) self.assertEqual(broker.get_sync(uuid2), 5) + # max sync point sticks + broker.merge_syncs([{'sync_point': 5, 'remote_id': uuid2}]) + self.assertEqual(broker.get_sync(uuid2), 5) + self.assertEqual(broker.get_sync(uuid3), 2) + broker.merge_syncs([{'sync_point': 4, 'remote_id': uuid2}]) + self.assertEqual(broker.get_sync(uuid2), 5) + self.assertEqual(broker.get_sync(uuid3), 2) + broker.merge_syncs([{'sync_point': -1, 'remote_id': uuid2}, + {'sync_point': 3, 'remote_id': uuid3}]) + self.assertEqual(broker.get_sync(uuid2), 5) + self.assertEqual(broker.get_sync(uuid3), 3) + self.assertEqual(broker.get_sync(uuid2, incoming=False), 3) + self.assertEqual(broker.get_sync(uuid3, incoming=False), 4) def test_get_replication_info(self): self.get_replication_info_tester(metadata=False) @@ -1089,11 +1126,9 @@ class TestDatabaseBroker(unittest.TestCase): 'max_row': 1, 'id': broker_uuid, 'metadata': broker_metadata}) return broker - def test_metadata(self): - def reclaim(broker, timestamp): - with broker.get() as conn: - broker._reclaim(conn, timestamp) - conn.commit() + # only testing _reclaim_metadata here + @patch.object(DatabaseBroker, '_reclaim') + def test_metadata(self, mock_reclaim): # Initializes a good broker for us broker = self.get_replication_info_tester(metadata=True) # Add our first item @@ -1134,7 +1169,7 @@ class TestDatabaseBroker(unittest.TestCase): self.assertEqual(broker.metadata['Second'], [second_value, second_timestamp]) # Reclaim at point before second item was deleted - reclaim(broker, normalize_timestamp(3)) + broker.reclaim(normalize_timestamp(3), normalize_timestamp(3)) self.assertIn('First', broker.metadata) self.assertEqual(broker.metadata['First'], [first_value, first_timestamp]) @@ -1142,7 +1177,7 @@ class TestDatabaseBroker(unittest.TestCase): self.assertEqual(broker.metadata['Second'], [second_value, second_timestamp]) # Reclaim at point second item was deleted - reclaim(broker, normalize_timestamp(4)) + broker.reclaim(normalize_timestamp(4), normalize_timestamp(4)) self.assertIn('First', broker.metadata) self.assertEqual(broker.metadata['First'], [first_value, first_timestamp]) @@ -1150,11 +1185,18 @@ class TestDatabaseBroker(unittest.TestCase): self.assertEqual(broker.metadata['Second'], [second_value, second_timestamp]) # Reclaim after point second item was deleted - reclaim(broker, normalize_timestamp(5)) + broker.reclaim(normalize_timestamp(5), normalize_timestamp(5)) self.assertIn('First', broker.metadata) self.assertEqual(broker.metadata['First'], [first_value, first_timestamp]) self.assertNotIn('Second', broker.metadata) + # Delete first item (by setting to empty string) + first_timestamp = normalize_timestamp(6) + broker.update_metadata({'First': ['', first_timestamp]}) + self.assertIn('First', broker.metadata) + # Check that sync_timestamp doesn't cause item to be reclaimed + broker.reclaim(normalize_timestamp(5), normalize_timestamp(99)) + self.assertIn('First', broker.metadata) def test_update_metadata_missing_container_info(self): # Test missing container_info/container_stat row @@ -1197,7 +1239,7 @@ class TestDatabaseBroker(unittest.TestCase): exc = None try: with broker.get() as conn: - broker._reclaim(conn, 0) + broker._reclaim_metadata(conn, 0) except Exception as err: exc = err self.assertEqual( @@ -1333,5 +1375,141 @@ class TestDatabaseBroker(unittest.TestCase): else: self.fail('Expected an exception to be raised') + def test_skip_commits(self): + broker = DatabaseBroker(':memory:') + self.assertTrue(broker._skip_commit_puts()) + broker._initialize = MagicMock() + broker.initialize(Timestamp.now()) + self.assertTrue(broker._skip_commit_puts()) + + # not initialized + db_file = os.path.join(self.testdir, '1.db') + broker = DatabaseBroker(db_file) + self.assertFalse(os.path.exists(broker.db_file)) # sanity check + self.assertTrue(broker._skip_commit_puts()) + + # no pending file + broker._initialize = MagicMock() + broker.initialize(Timestamp.now()) + self.assertTrue(os.path.exists(broker.db_file)) # sanity check + self.assertFalse(os.path.exists(broker.pending_file)) # sanity check + self.assertTrue(broker._skip_commit_puts()) + + # pending file exists + with open(broker.pending_file, 'wb'): + pass + self.assertTrue(os.path.exists(broker.pending_file)) # sanity check + self.assertFalse(broker._skip_commit_puts()) + + # skip_commits is True + broker.skip_commits = True + self.assertTrue(broker._skip_commit_puts()) + + # re-init + broker = DatabaseBroker(db_file) + self.assertFalse(broker._skip_commit_puts()) + + # constructor can override + broker = DatabaseBroker(db_file, skip_commits=True) + self.assertTrue(broker._skip_commit_puts()) + + def test_commit_puts(self): + db_file = os.path.join(self.testdir, '1.db') + broker = DatabaseBroker(db_file) + broker._initialize = MagicMock() + broker.initialize(Timestamp.now()) + with open(broker.pending_file, 'wb'): + pass + + # merge given list + with patch.object(broker, 'merge_items') as mock_merge_items: + broker._commit_puts(['test']) + mock_merge_items.assert_called_once_with(['test']) + + # load file and merge + with open(broker.pending_file, 'wb') as fd: + fd.write(':1:2:99') + with patch.object(broker, 'merge_items') as mock_merge_items: + broker._commit_puts_load = lambda l, e: l.append(e) + broker._commit_puts() + mock_merge_items.assert_called_once_with(['1', '2', '99']) + self.assertEqual(0, os.path.getsize(broker.pending_file)) + + # load file and merge with given list + with open(broker.pending_file, 'wb') as fd: + fd.write(':bad') + with patch.object(broker, 'merge_items') as mock_merge_items: + broker._commit_puts_load = lambda l, e: l.append(e) + broker._commit_puts(['not']) + mock_merge_items.assert_called_once_with(['not', 'bad']) + self.assertEqual(0, os.path.getsize(broker.pending_file)) + + # skip_commits True - no merge + db_file = os.path.join(self.testdir, '2.db') + broker = DatabaseBroker(db_file, skip_commits=True) + broker._initialize = MagicMock() + broker.initialize(Timestamp.now()) + with open(broker.pending_file, 'wb') as fd: + fd.write(':ignored') + with patch.object(broker, 'merge_items') as mock_merge_items: + with self.assertRaises(DatabaseConnectionError) as cm: + broker._commit_puts(['hmmm']) + mock_merge_items.assert_not_called() + self.assertIn('commits not accepted', str(cm.exception)) + with open(broker.pending_file, 'rb') as fd: + self.assertEqual(':ignored', fd.read()) + + def test_put_record(self): + db_file = os.path.join(self.testdir, '1.db') + broker = DatabaseBroker(db_file) + broker._initialize = MagicMock() + broker.initialize(Timestamp.now()) + + # pending file created and record written + broker.make_tuple_for_pickle = lambda x: x.upper() + with patch.object(broker, '_commit_puts') as mock_commit_puts: + broker.put_record('pinky') + mock_commit_puts.assert_not_called() + with open(broker.pending_file, 'rb') as fd: + pending = fd.read() + items = pending.split(':') + self.assertEqual(['PINKY'], + [pickle.loads(i.decode('base64')) for i in items[1:]]) + + # record appended + with patch.object(broker, '_commit_puts') as mock_commit_puts: + broker.put_record('perky') + mock_commit_puts.assert_not_called() + with open(broker.pending_file, 'rb') as fd: + pending = fd.read() + items = pending.split(':') + self.assertEqual(['PINKY', 'PERKY'], + [pickle.loads(i.decode('base64')) for i in items[1:]]) + + # pending file above cap + cap = swift.common.db.PENDING_CAP + while os.path.getsize(broker.pending_file) < cap: + with open(broker.pending_file, 'ab') as fd: + fd.write('x' * 100000) + with patch.object(broker, '_commit_puts') as mock_commit_puts: + broker.put_record('direct') + mock_commit_puts.called_once_with(['direct']) + + # records shouldn't be put to brokers with skip_commits True because + # they cannot be accepted if the pending file is full + broker.skip_commits = True + with open(broker.pending_file, 'wb'): + # empty the pending file + pass + with patch.object(broker, '_commit_puts') as mock_commit_puts: + with self.assertRaises(DatabaseConnectionError) as cm: + broker.put_record('unwelcome') + self.assertIn('commits not accepted', str(cm.exception)) + mock_commit_puts.assert_not_called() + with open(broker.pending_file, 'rb') as fd: + pending = fd.read() + self.assertFalse(pending) + + if __name__ == '__main__': unittest.main() diff --git a/test/unit/common/test_db_replicator.py b/test/unit/common/test_db_replicator.py index 7c4143d641..20c5d6738a 100644 --- a/test/unit/common/test_db_replicator.py +++ b/test/unit/common/test_db_replicator.py @@ -16,6 +16,8 @@ from __future__ import print_function import unittest from contextlib import contextmanager + +import eventlet import os import logging import errno @@ -26,6 +28,7 @@ from tempfile import mkdtemp, NamedTemporaryFile import json import mock +from copy import deepcopy from mock import patch, call from six.moves import reload_module @@ -37,6 +40,7 @@ from swift.common.exceptions import DriveNotMounted from swift.common.swob import HTTPException from test import unit +from test.unit import FakeLogger from test.unit.common.test_db import ExampleBroker @@ -160,6 +164,11 @@ class ReplHttp(object): self.set_status = set_status replicated = False host = 'localhost' + node = { + 'ip': '127.0.0.1', + 'port': '6000', + 'device': 'sdb', + } def replicate(self, *args): self.replicated = True @@ -230,11 +239,27 @@ class FakeBroker(object): 'put_timestamp': 1, 'created_at': 1, 'count': 0, + 'max_row': 99, + 'id': 'ID', + 'metadata': {} }) if self.stub_replication_info: info.update(self.stub_replication_info) return info + def get_max_row(self, table=None): + return self.get_replication_info()['max_row'] + + def is_reclaimable(self, now, reclaim_age): + info = self.get_replication_info() + return info['count'] == 0 and ( + (now - reclaim_age) > + info['delete_timestamp'] > + info['put_timestamp']) + + def get_other_replication_items(self): + return None + def reclaim(self, item_timestamp, sync_timestamp): pass @@ -273,6 +298,7 @@ class TestDBReplicator(unittest.TestCase): self.recon_cache = mkdtemp() rmtree(self.recon_cache, ignore_errors=1) os.mkdir(self.recon_cache) + self.logger = unit.debug_logger('test-replicator') def tearDown(self): for patcher in self._patchers: @@ -287,6 +313,7 @@ class TestDBReplicator(unittest.TestCase): def stub_delete_db(self, broker): self.delete_db_calls.append('/path/to/file') + return True def test_creation(self): # later config should be extended to assert more config options @@ -647,11 +674,107 @@ class TestDBReplicator(unittest.TestCase): }) def test_replicate_object(self): + # verify return values from replicate_object db_replicator.ring = FakeRingWithNodes() - replicator = TestReplicator({}) - replicator.delete_db = self.stub_delete_db - replicator._replicate_object('0', '/path/to/file', 'node_id') - self.assertEqual([], self.delete_db_calls) + db_path = '/path/to/file' + replicator = TestReplicator({}, logger=FakeLogger()) + info = FakeBroker().get_replication_info() + # make remote appear to be in sync + rinfo = {'point': info['max_row'], 'id': 'remote_id'} + + class FakeResponse(object): + def __init__(self, status, rinfo): + self._status = status + self.data = json.dumps(rinfo) + + @property + def status(self): + if isinstance(self._status, (Exception, eventlet.Timeout)): + raise self._status + return self._status + + # all requests fail + replicate = 'swift.common.db_replicator.ReplConnection.replicate' + with mock.patch(replicate) as fake_replicate: + fake_replicate.side_effect = [ + FakeResponse(500, None), + FakeResponse(500, None), + FakeResponse(500, None)] + with mock.patch.object(replicator, 'delete_db') as mock_delete: + res = replicator._replicate_object('0', db_path, 'node_id') + self.assertRaises(StopIteration, next, fake_replicate.side_effect) + self.assertEqual((False, [False, False, False]), res) + self.assertEqual(0, mock_delete.call_count) + self.assertFalse(replicator.logger.get_lines_for_level('error')) + self.assertFalse(replicator.logger.get_lines_for_level('warning')) + replicator.logger.clear() + + with mock.patch(replicate) as fake_replicate: + fake_replicate.side_effect = [ + FakeResponse(Exception('ugh'), None), + FakeResponse(eventlet.Timeout(), None), + FakeResponse(200, rinfo)] + with mock.patch.object(replicator, 'delete_db') as mock_delete: + res = replicator._replicate_object('0', db_path, 'node_id') + self.assertRaises(StopIteration, next, fake_replicate.side_effect) + self.assertEqual((False, [False, False, True]), res) + self.assertEqual(0, mock_delete.call_count) + lines = replicator.logger.get_lines_for_level('error') + self.assertIn('ERROR syncing', lines[0]) + self.assertIn('ERROR syncing', lines[1]) + self.assertFalse(lines[2:]) + self.assertFalse(replicator.logger.get_lines_for_level('warning')) + replicator.logger.clear() + + # partial success + with mock.patch(replicate) as fake_replicate: + fake_replicate.side_effect = [ + FakeResponse(200, rinfo), + FakeResponse(200, rinfo), + FakeResponse(500, None)] + with mock.patch.object(replicator, 'delete_db') as mock_delete: + res = replicator._replicate_object('0', db_path, 'node_id') + self.assertRaises(StopIteration, next, fake_replicate.side_effect) + self.assertEqual((False, [True, True, False]), res) + self.assertEqual(0, mock_delete.call_count) + self.assertFalse(replicator.logger.get_lines_for_level('error')) + self.assertFalse(replicator.logger.get_lines_for_level('warning')) + replicator.logger.clear() + + # 507 triggers additional requests + with mock.patch(replicate) as fake_replicate: + fake_replicate.side_effect = [ + FakeResponse(200, rinfo), + FakeResponse(200, rinfo), + FakeResponse(507, None), + FakeResponse(507, None), + FakeResponse(200, rinfo)] + with mock.patch.object(replicator, 'delete_db') as mock_delete: + res = replicator._replicate_object('0', db_path, 'node_id') + self.assertRaises(StopIteration, next, fake_replicate.side_effect) + self.assertEqual((False, [True, True, False, False, True]), res) + self.assertEqual(0, mock_delete.call_count) + lines = replicator.logger.get_lines_for_level('error') + self.assertIn('Remote drive not mounted', lines[0]) + self.assertIn('Remote drive not mounted', lines[1]) + self.assertFalse(lines[2:]) + self.assertFalse(replicator.logger.get_lines_for_level('warning')) + replicator.logger.clear() + + # all requests succeed; node id == 'node_id' causes node to be + # considered a handoff so expect the db to be deleted + with mock.patch(replicate) as fake_replicate: + fake_replicate.side_effect = [ + FakeResponse(200, rinfo), + FakeResponse(200, rinfo), + FakeResponse(200, rinfo)] + with mock.patch.object(replicator, 'delete_db') as mock_delete: + res = replicator._replicate_object('0', db_path, 'node_id') + self.assertRaises(StopIteration, next, fake_replicate.side_effect) + self.assertEqual((True, [True, True, True]), res) + self.assertEqual(1, mock_delete.call_count) + self.assertFalse(replicator.logger.get_lines_for_level('error')) + self.assertFalse(replicator.logger.get_lines_for_level('warning')) def test_replicate_object_quarantine(self): replicator = TestReplicator({}) @@ -695,8 +818,122 @@ class TestDBReplicator(unittest.TestCase): replicator.brokerclass = FakeAccountBroker replicator._repl_to_node = lambda *args: True replicator.delete_db = self.stub_delete_db - replicator._replicate_object('0', '/path/to/file', 'node_id') + orig_cleanup = replicator.cleanup_post_replicate + with mock.patch.object(replicator, 'cleanup_post_replicate', + side_effect=orig_cleanup) as mock_cleanup: + replicator._replicate_object('0', '/path/to/file', 'node_id') + mock_cleanup.assert_called_once_with(mock.ANY, mock.ANY, [True] * 3) + self.assertIsInstance(mock_cleanup.call_args[0][0], + replicator.brokerclass) self.assertEqual(['/path/to/file'], self.delete_db_calls) + self.assertEqual(0, replicator.stats['failure']) + + def test_replicate_object_delete_delegated_to_cleanup_post_replicate(self): + replicator = TestReplicator({}) + replicator.ring = FakeRingWithNodes().Ring('path') + replicator.brokerclass = FakeAccountBroker + replicator._repl_to_node = lambda *args: True + replicator.delete_db = self.stub_delete_db + + # cleanup succeeds + with mock.patch.object(replicator, 'cleanup_post_replicate', + return_value=True) as mock_cleanup: + replicator._replicate_object('0', '/path/to/file', 'node_id') + mock_cleanup.assert_called_once_with(mock.ANY, mock.ANY, [True] * 3) + self.assertIsInstance(mock_cleanup.call_args[0][0], + replicator.brokerclass) + self.assertFalse(self.delete_db_calls) + self.assertEqual(0, replicator.stats['failure']) + self.assertEqual(3, replicator.stats['success']) + + # cleanup fails + replicator._zero_stats() + with mock.patch.object(replicator, 'cleanup_post_replicate', + return_value=False) as mock_cleanup: + replicator._replicate_object('0', '/path/to/file', 'node_id') + mock_cleanup.assert_called_once_with(mock.ANY, mock.ANY, [True] * 3) + self.assertIsInstance(mock_cleanup.call_args[0][0], + replicator.brokerclass) + self.assertFalse(self.delete_db_calls) + self.assertEqual(3, replicator.stats['failure']) + self.assertEqual(0, replicator.stats['success']) + + # shouldbehere True - cleanup not required + replicator._zero_stats() + primary_node_id = replicator.ring.get_part_nodes('0')[0]['id'] + with mock.patch.object(replicator, 'cleanup_post_replicate', + return_value=True) as mock_cleanup: + replicator._replicate_object('0', '/path/to/file', primary_node_id) + mock_cleanup.assert_not_called() + self.assertFalse(self.delete_db_calls) + self.assertEqual(0, replicator.stats['failure']) + self.assertEqual(2, replicator.stats['success']) + + def test_cleanup_post_replicate(self): + replicator = TestReplicator({}, logger=self.logger) + replicator.ring = FakeRingWithNodes().Ring('path') + broker = FakeBroker() + replicator._repl_to_node = lambda *args: True + info = broker.get_replication_info() + + with mock.patch.object(replicator, 'delete_db') as mock_delete_db: + res = replicator.cleanup_post_replicate( + broker, info, [False] * 3) + mock_delete_db.assert_not_called() + self.assertTrue(res) + self.assertEqual(['Not deleting db %s (0/3 success)' % broker.db_file], + replicator.logger.get_lines_for_level('debug')) + replicator.logger.clear() + + with mock.patch.object(replicator, 'delete_db') as mock_delete_db: + res = replicator.cleanup_post_replicate( + broker, info, [True, False, True]) + mock_delete_db.assert_not_called() + self.assertTrue(res) + self.assertEqual(['Not deleting db %s (2/3 success)' % broker.db_file], + replicator.logger.get_lines_for_level('debug')) + replicator.logger.clear() + + broker.stub_replication_info = {'max_row': 101} + with mock.patch.object(replicator, 'delete_db') as mock_delete_db: + res = replicator.cleanup_post_replicate( + broker, info, [True] * 3) + mock_delete_db.assert_not_called() + self.assertTrue(res) + self.assertEqual(['Not deleting db %s (2 new rows)' % broker.db_file], + replicator.logger.get_lines_for_level('debug')) + replicator.logger.clear() + + broker.stub_replication_info = {'max_row': 98} + with mock.patch.object(replicator, 'delete_db') as mock_delete_db: + res = replicator.cleanup_post_replicate( + broker, info, [True] * 3) + mock_delete_db.assert_not_called() + self.assertTrue(res) + broker.stub_replication_info = None + self.assertEqual(['Not deleting db %s (negative max_row_delta: -1)' % + broker.db_file], + replicator.logger.get_lines_for_level('error')) + replicator.logger.clear() + + with mock.patch.object(replicator, 'delete_db') as mock_delete_db: + res = replicator.cleanup_post_replicate( + broker, info, [True] * 3) + mock_delete_db.assert_called_once_with(broker) + self.assertTrue(res) + self.assertEqual(['Successfully deleted db %s' % broker.db_file], + replicator.logger.get_lines_for_level('debug')) + replicator.logger.clear() + + with mock.patch.object(replicator, 'delete_db', + return_value=False) as mock_delete_db: + res = replicator.cleanup_post_replicate( + broker, info, [True] * 3) + mock_delete_db.assert_called_once_with(broker) + self.assertFalse(res) + self.assertEqual(['Failed to delete db %s' % broker.db_file], + replicator.logger.get_lines_for_level('debug')) + replicator.logger.clear() def test_replicate_object_with_exception(self): replicator = TestReplicator({}) @@ -949,6 +1186,8 @@ class TestDBReplicator(unittest.TestCase): response = rpc.dispatch(('drive', 'part', 'hash'), ['rsync_then_merge', 'arg1', 'arg2']) expected_calls = [call('/part/ash/hash/hash.db'), + call('/drive/tmp/arg1'), + call(FakeBroker.db_file), call('/drive/tmp/arg1')] self.assertEqual(mock_os.path.exists.call_args_list, expected_calls) @@ -1010,7 +1249,8 @@ class TestDBReplicator(unittest.TestCase): def mock_renamer(old, new): self.assertEqual('/drive/tmp/arg1', old) - self.assertEqual('/data/db.db', new) + # FakeBroker uses module filename as db_file! + self.assertEqual(__file__, new) self._patch(patch.object, db_replicator, 'renamer', mock_renamer) @@ -1023,7 +1263,7 @@ class TestDBReplicator(unittest.TestCase): self.assertEqual('204 No Content', response.status) self.assertEqual(204, response.status_int) - def test_complete_rsync_db_does_not_exist(self): + def test_complete_rsync_db_exists(self): rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, mount_check=False) @@ -1740,7 +1980,7 @@ class TestReplToNode(unittest.TestCase): def test_repl_to_node_300_status(self): self.http = ReplHttp('{"id": 3, "point": -1}', set_status=300) - self.assertIsNone(self.replicator._repl_to_node( + self.assertFalse(self.replicator._repl_to_node( self.fake_node, FakeBroker(), '0', self.fake_info)) def test_repl_to_node_not_response(self): @@ -1783,7 +2023,7 @@ class FakeHTTPResponse(object): return self.resp.body -def attach_fake_replication_rpc(rpc, replicate_hook=None): +def attach_fake_replication_rpc(rpc, replicate_hook=None, errors=None): class FakeReplConnection(object): def __init__(self, node, partition, hash_, logger): @@ -1795,12 +2035,16 @@ def attach_fake_replication_rpc(rpc, replicate_hook=None): def replicate(self, op, *sync_args): print('REPLICATE: %s, %s, %r' % (self.path, op, sync_args)) - replicate_args = self.path.lstrip('/').split('/') - args = [op] + list(sync_args) - with unit.mock_check_drive(isdir=not rpc.mount_check, - ismount=rpc.mount_check): - swob_response = rpc.dispatch(replicate_args, args) - resp = FakeHTTPResponse(swob_response) + resp = None + if errors and op in errors and errors[op]: + resp = errors[op].pop(0) + if not resp: + replicate_args = self.path.lstrip('/').split('/') + args = [op] + deepcopy(list(sync_args)) + with unit.mock_check_drive(isdir=not rpc.mount_check, + ismount=rpc.mount_check): + swob_response = rpc.dispatch(replicate_args, args) + resp = FakeHTTPResponse(swob_response) if replicate_hook: replicate_hook(op, *sync_args) return resp @@ -1872,15 +2116,19 @@ class TestReplicatorSync(unittest.TestCase): conf.update(conf_updates) return self.replicator_daemon(conf, logger=self.logger) - def _run_once(self, node, conf_updates=None, daemon=None): - daemon = daemon or self._get_daemon(node, conf_updates) - + def _install_fake_rsync_file(self, daemon, captured_calls=None): def _rsync_file(db_file, remote_file, **kwargs): + if captured_calls is not None: + captured_calls.append((db_file, remote_file, kwargs)) remote_server, remote_path = remote_file.split('/', 1) dest_path = os.path.join(self.root, remote_path) copy(db_file, dest_path) return True daemon._rsync_file = _rsync_file + + def _run_once(self, node, conf_updates=None, daemon=None): + daemon = daemon or self._get_daemon(node, conf_updates) + self._install_fake_rsync_file(daemon) with mock.patch('swift.common.db_replicator.whataremyips', new=lambda *a, **kw: [node['replication_ip']]), \ unit.mock_check_drive(isdir=not daemon.mount_check, diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index b9caaabf34..48724eac33 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -1454,6 +1454,15 @@ class TestUtils(unittest.TestCase): with open(testcache_file) as fd: file_dict = json.loads(fd.readline()) self.assertEqual(expect_dict, file_dict) + # nested dict items are not sticky + submit_dict = {'key1': {'key2': {'value3': 3}}} + expect_dict = {'key0': 101, + 'key1': {'key2': {'value3': 3}, + 'value1': 1, 'value2': 2}} + utils.dump_recon_cache(submit_dict, testcache_file, logger) + with open(testcache_file) as fd: + file_dict = json.loads(fd.readline()) + self.assertEqual(expect_dict, file_dict) # cached entries are sticky submit_dict = {} utils.dump_recon_cache(submit_dict, testcache_file, logger) diff --git a/test/unit/common/test_wsgi.py b/test/unit/common/test_wsgi.py index 774fcf84e8..8e88d09b59 100644 --- a/test/unit/common/test_wsgi.py +++ b/test/unit/common/test_wsgi.py @@ -1270,9 +1270,10 @@ class TestWorkersStrategy(unittest.TestCase): pid += 1 sock_count += 1 + mypid = os.getpid() self.assertEqual([ - 'Started child %s' % 88, - 'Started child %s' % 89, + 'Started child %s from parent %s' % (88, mypid), + 'Started child %s from parent %s' % (89, mypid), ], self.logger.get_lines_for_level('notice')) self.assertEqual(2, sock_count) @@ -1282,7 +1283,7 @@ class TestWorkersStrategy(unittest.TestCase): self.strategy.register_worker_exit(88) self.assertEqual([ - 'Removing dead child %s' % 88, + 'Removing dead child %s from parent %s' % (88, mypid) ], self.logger.get_lines_for_level('error')) for s, i in self.strategy.new_worker_socks(): @@ -1294,9 +1295,9 @@ class TestWorkersStrategy(unittest.TestCase): self.assertEqual(1, sock_count) self.assertEqual([ - 'Started child %s' % 88, - 'Started child %s' % 89, - 'Started child %s' % 90, + 'Started child %s from parent %s' % (88, mypid), + 'Started child %s from parent %s' % (89, mypid), + 'Started child %s from parent %s' % (90, mypid), ], self.logger.get_lines_for_level('notice')) def test_post_fork_hook(self): diff --git a/test/unit/container/test_backend.py b/test/unit/container/test_backend.py index 1febf47cfb..9a3d86d4d4 100644 --- a/test/unit/container/test_backend.py +++ b/test/unit/container/test_backend.py @@ -20,7 +20,6 @@ import hashlib import unittest from time import sleep, time from uuid import uuid4 -import itertools import random from collections import defaultdict from contextlib import contextmanager @@ -30,7 +29,7 @@ import json from swift.container.backend import ContainerBroker, \ update_new_item_from_existing -from swift.common.utils import Timestamp, encode_timestamps +from swift.common.utils import Timestamp, encode_timestamps, hash_path from swift.common.storage_policy import POLICIES import mock @@ -46,7 +45,7 @@ class TestContainerBroker(unittest.TestCase): def test_creation(self): # Test ContainerBroker.__init__ broker = ContainerBroker(':memory:', account='a', container='c') - self.assertEqual(broker.db_file, ':memory:') + self.assertEqual(broker._db_file, ':memory:') broker.initialize(Timestamp('1').internal, 0) with broker.get() as conn: curs = conn.cursor() @@ -55,11 +54,11 @@ class TestContainerBroker(unittest.TestCase): @patch_policies def test_storage_policy_property(self): - ts = (Timestamp(t).internal for t in itertools.count(int(time()))) + ts = make_timestamp_iter() for policy in POLICIES: broker = ContainerBroker(':memory:', account='a', container='policy_%s' % policy.name) - broker.initialize(next(ts), policy.idx) + broker.initialize(next(ts).internal, policy.idx) with broker.get() as conn: try: conn.execute('''SELECT storage_policy_index @@ -165,17 +164,17 @@ class TestContainerBroker(unittest.TestCase): broker.delete_db(Timestamp.now().internal) def test_get_info_is_deleted(self): - start = int(time()) - ts = (Timestamp(t).internal for t in itertools.count(start)) + ts = make_timestamp_iter() + start = next(ts) broker = ContainerBroker(':memory:', account='test_account', container='test_container') # create it - broker.initialize(next(ts), POLICIES.default.idx) + broker.initialize(start.internal, POLICIES.default.idx) info, is_deleted = broker.get_info_is_deleted() self.assertEqual(is_deleted, broker.is_deleted()) self.assertEqual(is_deleted, False) # sanity self.assertEqual(info, broker.get_info()) - self.assertEqual(info['put_timestamp'], Timestamp(start).internal) + self.assertEqual(info['put_timestamp'], start.internal) self.assertTrue(Timestamp(info['created_at']) >= start) self.assertEqual(info['delete_timestamp'], '0') if self.__class__ in (TestContainerBrokerBeforeMetadata, @@ -184,28 +183,28 @@ class TestContainerBroker(unittest.TestCase): self.assertEqual(info['status_changed_at'], '0') else: self.assertEqual(info['status_changed_at'], - Timestamp(start).internal) + start.internal) # delete it delete_timestamp = next(ts) - broker.delete_db(delete_timestamp) + broker.delete_db(delete_timestamp.internal) info, is_deleted = broker.get_info_is_deleted() self.assertEqual(is_deleted, True) # sanity self.assertEqual(is_deleted, broker.is_deleted()) self.assertEqual(info, broker.get_info()) - self.assertEqual(info['put_timestamp'], Timestamp(start).internal) + self.assertEqual(info['put_timestamp'], start.internal) self.assertTrue(Timestamp(info['created_at']) >= start) self.assertEqual(info['delete_timestamp'], delete_timestamp) self.assertEqual(info['status_changed_at'], delete_timestamp) # bring back to life - broker.put_object('obj', next(ts), 0, 'text/plain', 'etag', + broker.put_object('obj', next(ts).internal, 0, 'text/plain', 'etag', storage_policy_index=broker.storage_policy_index) info, is_deleted = broker.get_info_is_deleted() self.assertEqual(is_deleted, False) # sanity self.assertEqual(is_deleted, broker.is_deleted()) self.assertEqual(info, broker.get_info()) - self.assertEqual(info['put_timestamp'], Timestamp(start).internal) + self.assertEqual(info['put_timestamp'], start.internal) self.assertTrue(Timestamp(info['created_at']) >= start) self.assertEqual(info['delete_timestamp'], delete_timestamp) self.assertEqual(info['status_changed_at'], delete_timestamp) @@ -559,7 +558,7 @@ class TestContainerBroker(unittest.TestCase): "SELECT deleted FROM object").fetchone()[0], deleted) def _test_put_object_multiple_encoded_timestamps(self, broker): - ts = (Timestamp(t) for t in itertools.count(int(time()))) + ts = make_timestamp_iter() broker.initialize(next(ts).internal, 0) t = [next(ts) for _ in range(9)] @@ -629,7 +628,7 @@ class TestContainerBroker(unittest.TestCase): self._test_put_object_multiple_encoded_timestamps(broker) def _test_put_object_multiple_explicit_timestamps(self, broker): - ts = (Timestamp(t) for t in itertools.count(int(time()))) + ts = make_timestamp_iter() broker.initialize(next(ts).internal, 0) t = [next(ts) for _ in range(11)] @@ -733,7 +732,7 @@ class TestContainerBroker(unittest.TestCase): def test_last_modified_time(self): # Test container listing reports the most recent of data or metadata # timestamp as last-modified time - ts = (Timestamp(t) for t in itertools.count(int(time()))) + ts = make_timestamp_iter() broker = ContainerBroker(':memory:', account='a', container='c') broker.initialize(next(ts).internal, 0) @@ -786,18 +785,17 @@ class TestContainerBroker(unittest.TestCase): @patch_policies def test_put_misplaced_object_does_not_effect_container_stats(self): policy = random.choice(list(POLICIES)) - ts = (Timestamp(t).internal for t in - itertools.count(int(time()))) + ts = make_timestamp_iter() broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(next(ts), policy.idx) + broker.initialize(next(ts).internal, policy.idx) # migration tests may not honor policy on initialize if isinstance(self, ContainerBrokerMigrationMixin): real_storage_policy_index = \ broker.get_info()['storage_policy_index'] policy = [p for p in POLICIES if p.idx == real_storage_policy_index][0] - broker.put_object('correct_o', next(ts), 123, 'text/plain', + broker.put_object('correct_o', next(ts).internal, 123, 'text/plain', '5af83e3196bf99f440f31f2e1a6c9afe', storage_policy_index=policy.idx) info = broker.get_info() @@ -805,7 +803,7 @@ class TestContainerBroker(unittest.TestCase): self.assertEqual(123, info['bytes_used']) other_policy = random.choice([p for p in POLICIES if p is not policy]) - broker.put_object('wrong_o', next(ts), 123, 'text/plain', + broker.put_object('wrong_o', next(ts).internal, 123, 'text/plain', '5af83e3196bf99f440f31f2e1a6c9afe', storage_policy_index=other_policy.idx) self.assertEqual(1, info['object_count']) @@ -814,23 +812,22 @@ class TestContainerBroker(unittest.TestCase): @patch_policies def test_has_multiple_policies(self): policy = random.choice(list(POLICIES)) - ts = (Timestamp(t).internal for t in - itertools.count(int(time()))) + ts = make_timestamp_iter() broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(next(ts), policy.idx) + broker.initialize(next(ts).internal, policy.idx) # migration tests may not honor policy on initialize if isinstance(self, ContainerBrokerMigrationMixin): real_storage_policy_index = \ broker.get_info()['storage_policy_index'] policy = [p for p in POLICIES if p.idx == real_storage_policy_index][0] - broker.put_object('correct_o', next(ts), 123, 'text/plain', + broker.put_object('correct_o', next(ts).internal, 123, 'text/plain', '5af83e3196bf99f440f31f2e1a6c9afe', storage_policy_index=policy.idx) self.assertFalse(broker.has_multiple_policies()) other_policy = [p for p in POLICIES if p is not policy][0] - broker.put_object('wrong_o', next(ts), 123, 'text/plain', + broker.put_object('wrong_o', next(ts).internal, 123, 'text/plain', '5af83e3196bf99f440f31f2e1a6c9afe', storage_policy_index=other_policy.idx) self.assertTrue(broker.has_multiple_policies()) @@ -838,11 +835,10 @@ class TestContainerBroker(unittest.TestCase): @patch_policies def test_get_policy_info(self): policy = random.choice(list(POLICIES)) - ts = (Timestamp(t).internal for t in - itertools.count(int(time()))) + ts = make_timestamp_iter() broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(next(ts), policy.idx) + broker.initialize(next(ts).internal, policy.idx) # migration tests may not honor policy on initialize if isinstance(self, ContainerBrokerMigrationMixin): real_storage_policy_index = \ @@ -854,7 +850,7 @@ class TestContainerBroker(unittest.TestCase): self.assertEqual(policy_stats, expected) # add an object - broker.put_object('correct_o', next(ts), 123, 'text/plain', + broker.put_object('correct_o', next(ts).internal, 123, 'text/plain', '5af83e3196bf99f440f31f2e1a6c9afe', storage_policy_index=policy.idx) policy_stats = broker.get_policy_stats() @@ -864,7 +860,7 @@ class TestContainerBroker(unittest.TestCase): # add a misplaced object other_policy = random.choice([p for p in POLICIES if p is not policy]) - broker.put_object('wrong_o', next(ts), 123, 'text/plain', + broker.put_object('wrong_o', next(ts).internal, 123, 'text/plain', '5af83e3196bf99f440f31f2e1a6c9afe', storage_policy_index=other_policy.idx) policy_stats = broker.get_policy_stats() @@ -876,15 +872,14 @@ class TestContainerBroker(unittest.TestCase): @patch_policies def test_policy_stat_tracking(self): - ts = (Timestamp(t).internal for t in - itertools.count(int(time()))) + ts = make_timestamp_iter() broker = ContainerBroker(':memory:', account='a', container='c') # Note: in subclasses of this TestCase that inherit the # ContainerBrokerMigrationMixin, passing POLICIES.default.idx here has # no effect and broker.get_policy_stats() returns a dict with a single # entry mapping policy index 0 to the container stats - broker.initialize(next(ts), POLICIES.default.idx) + broker.initialize(next(ts).internal, POLICIES.default.idx) stats = defaultdict(dict) def assert_empty_default_policy_stats(policy_stats): @@ -904,7 +899,7 @@ class TestContainerBroker(unittest.TestCase): policy_index = random.randint(0, iters * 0.1) name = 'object-%s' % random.randint(0, iters * 0.1) size = random.randint(0, iters) - broker.put_object(name, next(ts), size, 'text/plain', + broker.put_object(name, next(ts).internal, size, 'text/plain', '5af83e3196bf99f440f31f2e1a6c9afe', storage_policy_index=policy_index) # track the size of the latest timestamp put for each object @@ -1930,12 +1925,11 @@ class TestContainerBroker(unittest.TestCase): self.assertEqual(rec['content_type'], 'text/plain') def test_set_storage_policy_index(self): - ts = (Timestamp(t).internal for t in - itertools.count(int(time()))) + ts = make_timestamp_iter() broker = ContainerBroker(':memory:', account='test_account', container='test_container') timestamp = next(ts) - broker.initialize(timestamp, 0) + broker.initialize(timestamp.internal, 0) info = broker.get_info() self.assertEqual(0, info['storage_policy_index']) # sanity check @@ -1946,39 +1940,40 @@ class TestContainerBroker(unittest.TestCase): TestContainerBrokerBeforeSPI): self.assertEqual(info['status_changed_at'], '0') else: - self.assertEqual(timestamp, info['status_changed_at']) + self.assertEqual(timestamp.internal, info['status_changed_at']) expected = {0: {'object_count': 0, 'bytes_used': 0}} self.assertEqual(expected, broker.get_policy_stats()) timestamp = next(ts) - broker.set_storage_policy_index(111, timestamp) + broker.set_storage_policy_index(111, timestamp.internal) self.assertEqual(broker.storage_policy_index, 111) info = broker.get_info() self.assertEqual(111, info['storage_policy_index']) self.assertEqual(0, info['object_count']) self.assertEqual(0, info['bytes_used']) - self.assertEqual(timestamp, info['status_changed_at']) + self.assertEqual(timestamp.internal, info['status_changed_at']) expected[111] = {'object_count': 0, 'bytes_used': 0} self.assertEqual(expected, broker.get_policy_stats()) timestamp = next(ts) - broker.set_storage_policy_index(222, timestamp) + broker.set_storage_policy_index(222, timestamp.internal) self.assertEqual(broker.storage_policy_index, 222) info = broker.get_info() self.assertEqual(222, info['storage_policy_index']) self.assertEqual(0, info['object_count']) self.assertEqual(0, info['bytes_used']) - self.assertEqual(timestamp, info['status_changed_at']) + self.assertEqual(timestamp.internal, info['status_changed_at']) expected[222] = {'object_count': 0, 'bytes_used': 0} self.assertEqual(expected, broker.get_policy_stats()) old_timestamp, timestamp = timestamp, next(ts) - broker.set_storage_policy_index(222, timestamp) # it's idempotent + # setting again is idempotent + broker.set_storage_policy_index(222, timestamp.internal) info = broker.get_info() self.assertEqual(222, info['storage_policy_index']) self.assertEqual(0, info['object_count']) self.assertEqual(0, info['bytes_used']) - self.assertEqual(old_timestamp, info['status_changed_at']) + self.assertEqual(old_timestamp.internal, info['status_changed_at']) self.assertEqual(expected, broker.get_policy_stats()) def test_set_storage_policy_index_empty(self): @@ -2004,19 +1999,18 @@ class TestContainerBroker(unittest.TestCase): @with_tempdir def test_legacy_pending_files(self, tempdir): - ts = (Timestamp(t).internal for t in - itertools.count(int(time()))) + ts = make_timestamp_iter() db_path = os.path.join(tempdir, 'container.db') # first init an acct DB without the policy_stat table present broker = ContainerBroker(db_path, account='a', container='c') - broker.initialize(next(ts), 1) + broker.initialize(next(ts).internal, 1) # manually make some pending entries lacking storage_policy_index with open(broker.pending_file, 'a+b') as fp: for i in range(10): name, timestamp, size, content_type, etag, deleted = ( - 'o%s' % i, next(ts), 0, 'c', 'e', 0) + 'o%s' % i, next(ts).internal, 0, 'c', 'e', 0) fp.write(':') fp.write(pickle.dumps( (name, timestamp, size, content_type, etag, deleted), @@ -2033,7 +2027,7 @@ class TestContainerBroker(unittest.TestCase): else: size = 2 storage_policy_index = 1 - broker.put_object(name, next(ts), size, 'c', 'e', 0, + broker.put_object(name, next(ts).internal, size, 'c', 'e', 0, storage_policy_index=storage_policy_index) broker._commit_puts_stale_ok() @@ -2049,8 +2043,7 @@ class TestContainerBroker(unittest.TestCase): @with_tempdir def test_get_info_no_stale_reads(self, tempdir): - ts = (Timestamp(t).internal for t in - itertools.count(int(time()))) + ts = make_timestamp_iter() db_path = os.path.join(tempdir, 'container.db') def mock_commit_puts(): @@ -2058,13 +2051,13 @@ class TestContainerBroker(unittest.TestCase): broker = ContainerBroker(db_path, account='a', container='c', stale_reads_ok=False) - broker.initialize(next(ts), 1) + broker.initialize(next(ts).internal, 1) # manually make some pending entries with open(broker.pending_file, 'a+b') as fp: for i in range(10): name, timestamp, size, content_type, etag, deleted = ( - 'o%s' % i, next(ts), 0, 'c', 'e', 0) + 'o%s' % i, next(ts).internal, 0, 'c', 'e', 0) fp.write(':') fp.write(pickle.dumps( (name, timestamp, size, content_type, etag, deleted), @@ -2079,8 +2072,7 @@ class TestContainerBroker(unittest.TestCase): @with_tempdir def test_get_info_stale_read_ok(self, tempdir): - ts = (Timestamp(t).internal for t in - itertools.count(int(time()))) + ts = make_timestamp_iter() db_path = os.path.join(tempdir, 'container.db') def mock_commit_puts(): @@ -2088,13 +2080,13 @@ class TestContainerBroker(unittest.TestCase): broker = ContainerBroker(db_path, account='a', container='c', stale_reads_ok=True) - broker.initialize(next(ts), 1) + broker.initialize(next(ts).internal, 1) # manually make some pending entries with open(broker.pending_file, 'a+b') as fp: for i in range(10): name, timestamp, size, content_type, etag, deleted = ( - 'o%s' % i, next(ts), 0, 'c', 'e', 0) + 'o%s' % i, next(ts).internal, 0, 'c', 'e', 0) fp.write(':') fp.write(pickle.dumps( (name, timestamp, size, content_type, etag, deleted), @@ -2104,6 +2096,26 @@ class TestContainerBroker(unittest.TestCase): broker._commit_puts = mock_commit_puts broker.get_info() + @with_tempdir + def test_create_broker(self, tempdir): + broker = ContainerBroker.create_broker(tempdir, 0, 'a', 'c') + hsh = hash_path('a', 'c') + expected_path = os.path.join( + tempdir, 'containers', '0', hsh[-3:], hsh, hsh + '.db') + self.assertEqual(expected_path, broker.db_file) + self.assertTrue(os.path.isfile(expected_path)) + + ts = Timestamp.now() + broker = ContainerBroker.create_broker(tempdir, 0, 'a', 'c1', + put_timestamp=ts.internal) + hsh = hash_path('a', 'c1') + expected_path = os.path.join( + tempdir, 'containers', '0', hsh[-3:], hsh, hsh + '.db') + self.assertEqual(expected_path, broker.db_file) + self.assertTrue(os.path.isfile(expected_path)) + self.assertEqual(ts.internal, broker.get_info()['put_timestamp']) + self.assertEqual(0, broker.get_info()['storage_policy_index']) + class TestCommonContainerBroker(test_db.TestExampleBroker): diff --git a/test/unit/container/test_server.py b/test/unit/container/test_server.py index 54ce6d973b..8327e8754b 100644 --- a/test/unit/container/test_server.py +++ b/test/unit/container/test_server.py @@ -424,7 +424,7 @@ class TestContainerController(unittest.TestCase): elif state[0] == 'race': # Save the original db_file attribute value self._saved_db_file = self.db_file - self.db_file += '.doesnotexist' + self._db_file += '.doesnotexist' def initialize(self, *args, **kwargs): if state[0] == 'initial': @@ -433,7 +433,7 @@ class TestContainerController(unittest.TestCase): elif state[0] == 'race': # Restore the original db_file attribute to get the race # behavior - self.db_file = self._saved_db_file + self._db_file = self._saved_db_file return super(InterceptedCoBr, self).initialize(*args, **kwargs) with mock.patch("swift.container.server.ContainerBroker",