diff --git a/swift/cli/manage_shard_ranges.py b/swift/cli/manage_shard_ranges.py index aacd91c607..995c23df35 100644 --- a/swift/cli/manage_shard_ranges.py +++ b/swift/cli/manage_shard_ranges.py @@ -208,9 +208,10 @@ class InvalidSolutionException(ManageShardRangesException): def _print_shard_range(sr, level=0): indent = ' ' * level print(indent + '%r' % sr.name) - print(indent + ' objects: %9d lower: %r' % (sr.object_count, - sr.lower_str)) - print(indent + ' state: %9s upper: %r' % (sr.state_text, sr.upper_str)) + print(indent + ' objects: %9d, tombstones: %9d, lower: %r' + % (sr.object_count, sr.tombstones, sr.lower_str)) + print(indent + ' state: %9s, upper: %r' + % (sr.state_text, sr.upper_str)) @contextmanager @@ -504,8 +505,8 @@ def compact_shard_ranges(broker, args): for sequence in compactible: acceptor = sequence[-1] donors = sequence[:-1] - print('Donor shard range(s) with total of %d objects:' - % donors.object_count) + print('Donor shard range(s) with total of %d rows:' + % donors.row_count) for donor in donors: _print_shard_range(donor, level=1) print('can be compacted into acceptor shard range:') diff --git a/swift/common/db.py b/swift/common/db.py index 69d91db4a0..7962e461f4 100644 --- a/swift/common/db.py +++ b/swift/common/db.py @@ -227,6 +227,82 @@ def get_db_connection(path, timeout=30, logger=None, okay_to_create=False): return conn +class TombstoneReclaimer(object): + """Encapsulates reclamation of deleted rows in a database.""" + def __init__(self, broker, age_timestamp): + """ + Encapsulates reclamation of deleted rows in a database. + + :param broker: an instance of :class:`~swift.common.db.DatabaseBroker`. + :param age_timestamp: a float timestamp: tombstones older than this + time will be deleted. + """ + self.broker = broker + self.age_timestamp = age_timestamp + self.marker = '' + self.remaining_tombstones = self.reclaimed = 0 + self.finished = False + # limit 1 offset N gives back the N+1th matching row; that row is used + # as an exclusive end_marker for a batch of deletes, so a batch + # comprises rows satisfying self.marker <= name < end_marker. + self.batch_query = ''' + SELECT name FROM %s WHERE deleted = 1 + AND name >= ? + ORDER BY NAME LIMIT 1 OFFSET ? + ''' % self.broker.db_contains_type + self.clean_batch_query = ''' + DELETE FROM %s WHERE deleted = 1 + AND name >= ? AND %s < %s + ''' % (self.broker.db_contains_type, self.broker.db_reclaim_timestamp, + self.age_timestamp) + + def _reclaim(self, conn): + curs = conn.execute(self.batch_query, (self.marker, RECLAIM_PAGE_SIZE)) + row = curs.fetchone() + end_marker = row[0] if row else '' + if end_marker: + # do a single book-ended DELETE and bounce out + curs = conn.execute(self.clean_batch_query + ' AND name < ?', + (self.marker, end_marker)) + self.marker = end_marker + self.reclaimed += curs.rowcount + self.remaining_tombstones += RECLAIM_PAGE_SIZE - curs.rowcount + else: + # delete off the end + curs = conn.execute(self.clean_batch_query, (self.marker,)) + self.finished = True + self.reclaimed += curs.rowcount + + def reclaim(self): + """ + Perform reclaim of deleted rows older than ``age_timestamp``. + """ + while not self.finished: + with self.broker.get() as conn: + self._reclaim(conn) + conn.commit() + + def get_tombstone_count(self): + """ + Return the number of remaining tombstones newer than ``age_timestamp``. + Executes the ``reclaim`` method if it has not already been called on + this instance. + + :return: The number of tombstones in the ``broker`` that are newer than + ``age_timestamp``. 
+ """ + if not self.finished: + self.reclaim() + with self.broker.get() as conn: + curs = conn.execute(''' + SELECT COUNT(*) FROM %s WHERE deleted = 1 + AND name >= ? + ''' % (self.broker.db_contains_type,), (self.marker,)) + tombstones = curs.fetchone()[0] + self.remaining_tombstones += tombstones + return self.remaining_tombstones + + class DatabaseBroker(object): """Encapsulates working with a database.""" @@ -988,47 +1064,22 @@ class DatabaseBroker(object): with lock_parent_directory(self.pending_file, self.pending_timeout): self._commit_puts() - marker = '' - finished = False - while not finished: - with self.get() as conn: - marker = self._reclaim(conn, age_timestamp, marker) - if not marker: - finished = True - self._reclaim_other_stuff( - conn, age_timestamp, sync_timestamp) - conn.commit() + + tombstone_reclaimer = TombstoneReclaimer(self, age_timestamp) + tombstone_reclaimer.reclaim() + with self.get() as conn: + self._reclaim_other_stuff(conn, age_timestamp, sync_timestamp) + conn.commit() + return tombstone_reclaimer def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp): """ - This is only called once at the end of reclaim after _reclaim has been - called for each page. + This is only called once at the end of reclaim after tombstone reclaim + has been completed. """ self._reclaim_sync(conn, sync_timestamp) self._reclaim_metadata(conn, age_timestamp) - def _reclaim(self, conn, age_timestamp, marker): - clean_batch_qry = ''' - DELETE FROM %s WHERE deleted = 1 - AND name >= ? AND %s < ? - ''' % (self.db_contains_type, self.db_reclaim_timestamp) - curs = conn.execute(''' - SELECT name FROM %s WHERE deleted = 1 - AND name >= ? - ORDER BY NAME LIMIT 1 OFFSET ? - ''' % (self.db_contains_type,), (marker, RECLAIM_PAGE_SIZE)) - row = curs.fetchone() - if row: - # do a single book-ended DELETE and bounce out - end_marker = row[0] - conn.execute(clean_batch_qry + ' AND name < ?', ( - marker, age_timestamp, end_marker)) - else: - # delete off the end and reset marker to indicate we're done - end_marker = '' - conn.execute(clean_batch_qry, (marker, age_timestamp)) - return end_marker - def _reclaim_sync(self, conn, sync_timestamp): try: conn.execute(''' diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py index fa180b6f47..1150f1c7d3 100644 --- a/swift/common/db_replicator.py +++ b/swift/common/db_replicator.py @@ -566,6 +566,12 @@ class Replicator(Daemon): self.logger.debug('Successfully deleted db %s', broker.db_file) return True + def _reclaim(self, broker, now=None): + if not now: + now = time.time() + return broker.reclaim(now - self.reclaim_age, + now - (self.reclaim_age * 2)) + def _replicate_object(self, partition, object_file, node_id): """ Replicate the db, choosing method based on whether or not it @@ -591,8 +597,7 @@ class Replicator(Daemon): try: broker = self.brokerclass(object_file, pending_timeout=30, logger=self.logger) - broker.reclaim(now - self.reclaim_age, - now - (self.reclaim_age * 2)) + self._reclaim(broker, now) info = broker.get_replication_info() bpart = self.ring.get_part( info['account'], info.get('container')) diff --git a/swift/common/utils.py b/swift/common/utils.py index ac8772721c..8a7be261f7 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -5042,6 +5042,8 @@ class ShardRange(object): sharding was enabled for a container. :param reported: optional indicator that this shard and its stats have been reported to the root container. 
+ :param tombstones: the number of tombstones in the shard range; defaults to + -1 to indicate that the value is unknown. """ FOUND = 10 CREATED = 20 @@ -5079,7 +5081,7 @@ class ShardRange(object): def __init__(self, name, timestamp, lower=MIN, upper=MAX, object_count=0, bytes_used=0, meta_timestamp=None, deleted=False, state=None, state_timestamp=None, epoch=None, - reported=False): + reported=False, tombstones=-1): self.account = self.container = self._timestamp = \ self._meta_timestamp = self._state_timestamp = self._epoch = None self._lower = ShardRange.MIN @@ -5099,6 +5101,7 @@ class ShardRange(object): self.state_timestamp = state_timestamp self.epoch = epoch self.reported = reported + self.tombstones = tombstones @classmethod def sort_key(cls, sr): @@ -5274,6 +5277,24 @@ class ShardRange(object): raise ValueError('bytes_used cannot be < 0') self._bytes = bytes_used + @property + def tombstones(self): + return self._tombstones + + @tombstones.setter + def tombstones(self, tombstones): + self._tombstones = int(tombstones) + + @property + def row_count(self): + """ + Returns the total number of rows in the shard range i.e. the sum of + objects and tombstones. + + :return: the row count + """ + return self.object_count + max(self.tombstones, 0) + def update_meta(self, object_count, bytes_used, meta_timestamp=None): """ Set the object stats metadata to the given values and update the @@ -5300,6 +5321,27 @@ class ShardRange(object): else: self.meta_timestamp = meta_timestamp + def update_tombstones(self, tombstones, meta_timestamp=None): + """ + Set the tombstones metadata to the given values and update the + meta_timestamp to the current time. + + :param tombstones: should be an integer + :param meta_timestamp: timestamp for metadata; if not given the + current time will be set. + :raises ValueError: if ``tombstones`` cannot be cast to an int, or + if meta_timestamp is neither None nor can be cast to a + :class:`~swift.common.utils.Timestamp`. + """ + tombstones = int(tombstones) + if 0 <= tombstones != self.tombstones: + self.tombstones = tombstones + self.reported = False + if meta_timestamp is None: + self.meta_timestamp = Timestamp.now() + else: + self.meta_timestamp = meta_timestamp + def increment_meta(self, object_count, bytes_used): """ Increment the object stats metadata by the given values and update the @@ -5518,6 +5560,7 @@ class ShardRange(object): yield 'state_timestamp', self.state_timestamp.internal yield 'epoch', self.epoch.internal if self.epoch is not None else None yield 'reported', 1 if self.reported else 0 + yield 'tombstones', self.tombstones def copy(self, timestamp=None, **kwargs): """ @@ -5550,7 +5593,7 @@ class ShardRange(object): params['upper'], params['object_count'], params['bytes_used'], params['meta_timestamp'], params['deleted'], params['state'], params['state_timestamp'], params['epoch'], - params.get('reported', 0)) + params.get('reported', 0), params.get('tombstones', -1)) def expand(self, donors): """ @@ -5625,6 +5668,15 @@ class ShardRangeList(UserList): """ return sum(sr.object_count for sr in self) + @property + def row_count(self): + """ + Returns the total number of rows of all items in the list. 
+ + :return: total row count + """ + return sum(sr.row_count for sr in self) + @property def bytes_used(self): """ diff --git a/swift/container/backend.py b/swift/container/backend.py index 92b39a12ea..334f578fbc 100644 --- a/swift/container/backend.py +++ b/swift/container/backend.py @@ -66,7 +66,7 @@ SHARD_AUDITING_STATES = [ShardRange.CREATED, ShardRange.CLEAVED, # tuples and vice-versa SHARD_RANGE_KEYS = ('name', 'timestamp', 'lower', 'upper', 'object_count', 'bytes_used', 'meta_timestamp', 'deleted', 'state', - 'state_timestamp', 'epoch', 'reported') + 'state_timestamp', 'epoch', 'reported', 'tombstones') POLICY_STAT_TABLE_CREATE = ''' CREATE TABLE policy_stat ( @@ -287,6 +287,7 @@ def merge_shards(shard_data, existing): if existing['meta_timestamp'] >= shard_data['meta_timestamp']: for k in ('object_count', 'bytes_used', 'meta_timestamp'): shard_data[k] = existing[k] + shard_data['tombstones'] = existing.get('tombstones', -1) else: new_content = True @@ -294,6 +295,7 @@ def merge_shards(shard_data, existing): if existing['reported'] and \ existing['object_count'] == shard_data['object_count'] and \ existing['bytes_used'] == shard_data['bytes_used'] and \ + existing.get('tombstones', -1) == shard_data['tombstones'] and \ existing['state'] == shard_data['state'] and \ existing['epoch'] == shard_data['epoch']: shard_data['reported'] = 1 @@ -618,7 +620,8 @@ class ContainerBroker(DatabaseBroker): state INTEGER, state_timestamp TEXT, epoch TEXT, - reported INTEGER DEFAULT 0 + reported INTEGER DEFAULT 0, + tombstones INTEGER DEFAULT -1 ); """ % SHARD_RANGE_TABLE) @@ -1450,22 +1453,34 @@ class ContainerBroker(DatabaseBroker): for item in to_add.values())) conn.commit() + migrations = { + 'no such column: reported': + self._migrate_add_shard_range_reported, + 'no such column: tombstones': + self._migrate_add_shard_range_tombstones, + ('no such table: %s' % SHARD_RANGE_TABLE): + self.create_shard_range_table, + } + migrations_done = set() with self.get() as conn: - try: - return _really_merge_items(conn) - except sqlite3.OperationalError as err: - # Without the rollback, new enough (>= py37) python/sqlite3 - # will panic: - # sqlite3.OperationalError: cannot start a transaction - # within a transaction - conn.rollback() - if 'no such column: reported' in str(err): - self._migrate_add_shard_range_reported(conn) + while True: + try: return _really_merge_items(conn) - if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err): - self.create_shard_range_table(conn) - return _really_merge_items(conn) - raise + except sqlite3.OperationalError as err: + # Without the rollback, new enough (>= py37) python/sqlite3 + # will panic: + # sqlite3.OperationalError: cannot start a transaction + # within a transaction + conn.rollback() + for err_str, migration in migrations.items(): + if err_str in migrations_done: + continue + if err_str in str(err): + migration(conn) + migrations_done.add(err_str) + break + else: + raise def get_reconciler_sync(self): with self.get() as conn: @@ -1624,6 +1639,17 @@ class ContainerBroker(DatabaseBroker): COMMIT; ''' % SHARD_RANGE_TABLE) + def _migrate_add_shard_range_tombstones(self, conn): + """ + Add the tombstones column to the 'shard_range' table. 
+ """ + conn.executescript(''' + BEGIN; + ALTER TABLE %s + ADD COLUMN tombstones INTEGER DEFAULT -1; + COMMIT; + ''' % SHARD_RANGE_TABLE) + def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp): super(ContainerBroker, self)._reclaim_other_stuff( conn, age_timestamp, sync_timestamp) @@ -1673,7 +1699,11 @@ class ContainerBroker(DatabaseBroker): elif states is not None: included_states.add(states) - def do_query(conn, use_reported_column=True): + # defaults to be used when legacy db's are missing columns + default_values = {'reported': 0, + 'tombstones': -1} + + def do_query(conn, defaults=None): condition = '' conditions = [] params = [] @@ -1691,10 +1721,13 @@ class ContainerBroker(DatabaseBroker): params.append(self.path) if conditions: condition = ' WHERE ' + ' AND '.join(conditions) - if use_reported_column: - columns = SHARD_RANGE_KEYS - else: - columns = SHARD_RANGE_KEYS[:-1] + ('0 as reported', ) + columns = SHARD_RANGE_KEYS[:-2] + for column in SHARD_RANGE_KEYS[-2:]: + if column in defaults: + columns += (('%s as %s' % + (default_values[column], column)),) + else: + columns += (column,) sql = ''' SELECT %s FROM %s%s; @@ -1704,14 +1737,26 @@ class ContainerBroker(DatabaseBroker): return [row for row in data] with self.maybe_get(connection) as conn: - try: - return do_query(conn) - except sqlite3.OperationalError as err: - if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err): - return [] - if 'no such column: reported' in str(err): - return do_query(conn, use_reported_column=False) - raise + defaults = set() + attempts = len(default_values) + 1 + while attempts: + attempts -= 1 + try: + return do_query(conn, defaults) + except sqlite3.OperationalError as err: + if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err): + return [] + if not attempts: + raise + new_defaults = set() + for column in default_values.keys(): + if 'no such column: %s' % column in str(err): + new_defaults.add(column) + if not new_defaults: + raise + if new_defaults.intersection(defaults): + raise + defaults.update(new_defaults) @classmethod def resolve_shard_range_states(cls, states): diff --git a/swift/container/sharder.py b/swift/container/sharder.py index 971021b90f..0ca8c339c8 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -120,6 +120,8 @@ def find_overlapping_ranges(shard_ranges): def is_sharding_candidate(shard_range, threshold): + # note: use *object* count as the condition for sharding: tombstones will + # eventually be reclaimed so should not trigger sharding return (shard_range.state == ShardRange.ACTIVE and shard_range.object_count >= threshold) @@ -127,10 +129,13 @@ def is_sharding_candidate(shard_range, threshold): def is_shrinking_candidate(shard_range, shrink_threshold, merge_size, states=None): # typically shrink_threshold < merge_size but check both just in case + # note: use *row* count (objects plus tombstones) as the condition for + # shrinking to avoid inadvertently moving large numbers of tombstones into + # an acceptor states = states or (ShardRange.ACTIVE,) return (shard_range.state in states and - shard_range.object_count < shrink_threshold and - shard_range.object_count <= merge_size) + shard_range.row_count < shrink_threshold and + shard_range.row_count <= merge_size) def find_sharding_candidates(broker, threshold, shard_ranges=None): @@ -186,6 +191,8 @@ def find_compactible_shard_sequences(broker, compacted into each acceptor; -1 implies unlimited. :param max_expanding: the maximum number of acceptors to be found (i.e. 
the maximum number of sequences to be returned); -1 implies unlimited. + :param include_shrinking: if True then existing compactible sequences are + included in the results; default is False. :returns: A list of :class:`~swift.common.utils.ShardRangeList` each containing a sequence of neighbouring shard ranges that may be compacted; the final shard range in the list is the acceptor @@ -196,10 +203,6 @@ def find_compactible_shard_sequences(broker, # First cut is simple: assume root container shard usage stats are good # enough to make decision; only merge with upper neighbour so that # upper bounds never change (shard names include upper bound). - # TODO: object counts may well not be the appropriate metric for - # deciding to shrink because a shard with low object_count may have a - # large number of deleted object rows that will need to be merged with - # a neighbour. We may need to expose row count as well as object count. shard_ranges = broker.get_shard_ranges() own_shard_range = broker.get_own_shard_range() @@ -216,7 +219,7 @@ def find_compactible_shard_sequences(broker, sequence[-1], shrink_threshold, merge_size, states=(ShardRange.ACTIVE, ShardRange.SHRINKING)) or 0 < max_shrinking < len(sequence) or - sequence.object_count >= merge_size)): + sequence.row_count >= merge_size)): return True return False @@ -250,7 +253,7 @@ def find_compactible_shard_sequences(broker, if shard_range.state == ShardRange.SHRINKING: # already shrinking: add to sequence unconditionally sequence.append(shard_range) - elif (sequence.object_count + shard_range.object_count + elif (sequence.row_count + shard_range.row_count <= merge_size): # add to sequence: could be a donor or acceptor sequence.append(shard_range) @@ -1825,7 +1828,18 @@ class ContainerSharder(ContainerReplicator): def _update_root_container(self, broker): own_shard_range = broker.get_own_shard_range(no_default=True) - if not own_shard_range or own_shard_range.reported: + if not own_shard_range: + return + + # do a reclaim *now* in order to get best estimate of tombstone count + # that is consistent with the current object_count + reclaimer = self._reclaim(broker) + tombstones = reclaimer.get_tombstone_count() + self.logger.debug('tombstones in %s = %d', + quote(broker.path), tombstones) + own_shard_range.update_tombstones(tombstones) + + if own_shard_range.reported: return # persist the reported shard metadata diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py index 3b46cf8d5a..0b68d31935 100644 --- a/test/probe/test_sharder.py +++ b/test/probe/test_sharder.py @@ -1682,8 +1682,13 @@ class TestContainerSharding(BaseTestContainerSharding): orig_range_data, range_data, excludes=['meta_timestamp', 'state_timestamp']) - # ...until the sharders run and update root - self.run_sharders(orig_shard_ranges[0]) + # ...until the sharders run and update root; reclaim tombstones so + # that the shard is shrinkable + shard_0_part = self.get_part_and_node_numbers( + orig_shard_ranges[0])[0] + for conf_index in self.configs['container-sharder'].keys(): + self.run_custom_sharder(conf_index, {'reclaim_age': 0}, + override_partitions=[shard_0_part]) exp_obj_count = len(second_shard_objects) + 1 self.assert_container_object_count(exp_obj_count) self.assert_container_listing([alpha] + second_shard_objects) @@ -1748,15 +1753,37 @@ class TestContainerSharding(BaseTestContainerSharding): self.assert_container_listing([alpha]) - # runs sharders so second range shrinks away, requires up to 3 - # cycles - self.sharders.once() # shard updates root stats 
+ # run sharders: second range should not shrink away yet because it + # has tombstones + self.sharders.once() # second shard updates root stats self.assert_container_listing([alpha]) self.sharders.once() # root finds shrinkable shard self.assert_container_listing([alpha]) self.sharders.once() # shards shrink themselves self.assert_container_listing([alpha]) + # the acceptor shard is intact... + shard_nodes_data = self.direct_get_container_shard_ranges( + orig_shard_ranges[1].account, orig_shard_ranges[1].container) + obj_count, bytes_used = check_shard_nodes_data(shard_nodes_data) + self.assertEqual(1, obj_count) + + # run sharders to reclaim tombstones so that the second shard is + # shrinkable + shard_1_part = self.get_part_and_node_numbers( + orig_shard_ranges[1])[0] + for conf_index in self.configs['container-sharder'].keys(): + self.run_custom_sharder(conf_index, {'reclaim_age': 0}, + override_partitions=[shard_1_part]) + self.assert_container_listing([alpha]) + + # run sharders so second range shrinks away, requires up to 2 + # cycles + self.sharders.once() # root finds shrinkable shard + self.assert_container_listing([alpha]) + self.sharders.once() # shards shrink themselves + self.assert_container_listing([alpha]) + # the second shard range has sharded and is empty shard_nodes_data = self.direct_get_container_shard_ranges( orig_shard_ranges[1].account, orig_shard_ranges[1].container) @@ -2215,10 +2242,15 @@ class TestContainerSharding(BaseTestContainerSharding): self.assert_container_listing([]) self.assert_container_post_ok('has objects') - # run sharder on shard containers to update root stats + # run sharder on shard containers to update root stats; reclaim + # the tombstones so that the shards appear to be shrinkable shard_ranges = self.get_container_shard_ranges() self.assertLengthEqual(shard_ranges, 2) - self.run_sharders(shard_ranges) + shard_partitions = [self.get_part_and_node_numbers(sr)[0] + for sr in shard_ranges] + for conf_index in self.configs['container-sharder'].keys(): + self.run_custom_sharder(conf_index, {'reclaim_age': 0}, + override_partitions=shard_partitions) self.assert_container_object_count(0) # First, test a misplaced object moving from one shard to another. @@ -2349,8 +2381,12 @@ class TestContainerSharding(BaseTestContainerSharding): self.assert_container_listing(shard_1_objects) self.assert_container_post_ok('has objects') - # run sharder on first shard container to update root stats - self.run_sharders(shard_ranges[0]) + # run sharder on first shard container to update root stats; reclaim + # the tombstones so that the shard appears to be shrinkable + shard_0_part = self.get_part_and_node_numbers(shard_ranges[0])[0] + for conf_index in self.configs['container-sharder'].keys(): + self.run_custom_sharder(conf_index, {'reclaim_age': 0}, + override_partitions=[shard_0_part]) self.assert_container_object_count(len(shard_1_objects)) # First, test a misplaced object moving from one shard to another. @@ -2384,10 +2420,13 @@ class TestContainerSharding(BaseTestContainerSharding): # Now we have just one active shard, test a misplaced object moving # from that shard to the root. 
- # delete most objects from second shard range and run sharder on root - # to discover second shrink candidate + # delete most objects from second shard range, reclaim the tombstones, + # and run sharder on root to discover second shrink candidate self.delete_objects(shard_1_objects) - self.run_sharders(shard_ranges[1]) + shard_1_part = self.get_part_and_node_numbers(shard_ranges[1])[0] + for conf_index in self.configs['container-sharder'].keys(): + self.run_custom_sharder(conf_index, {'reclaim_age': 0}, + override_partitions=[shard_1_part]) self.sharders.once(additional_args='--partitions=%s' % self.brain.part) # then run sharder on the shard node to shrink it to root - note this # moves alpha to the root db @@ -2457,7 +2496,10 @@ class TestContainerSharding(BaseTestContainerSharding): self.assert_container_post_ok('has objects') # run sharder on first shard container to update root stats - self.run_sharders(shard_ranges[0]) + shard_0_part = self.get_part_and_node_numbers(shard_ranges[0])[0] + for conf_index in self.configs['container-sharder'].keys(): + self.run_custom_sharder(conf_index, {'reclaim_age': 0}, + override_partitions=[shard_0_part]) self.assert_container_object_count(len(shard_1_objects)) # First, test a misplaced object moving from one shard to another. diff --git a/test/unit/account/test_backend.py b/test/unit/account/test_backend.py index a284bc246e..8d6d6a954d 100644 --- a/test/unit/account/test_backend.py +++ b/test/unit/account/test_backend.py @@ -36,7 +36,7 @@ import six from swift.account.backend import AccountBroker from swift.common.utils import Timestamp from test.unit import patch_policies, with_tempdir, make_timestamp_iter -from swift.common.db import DatabaseConnectionError +from swift.common.db import DatabaseConnectionError, TombstoneReclaimer from swift.common.request_helpers import get_reserved_name from swift.common.storage_policy import StoragePolicy, POLICIES from swift.common.utils import md5 @@ -218,15 +218,17 @@ class TestAccountBroker(unittest.TestCase): self.assertEqual(count_reclaimable(conn, reclaim_age), num_of_containers / 4) - orig__reclaim = broker._reclaim trace = [] - def tracing_reclaim(conn, age_timestamp, marker): - trace.append((age_timestamp, marker, - count_reclaimable(conn, age_timestamp))) - return orig__reclaim(conn, age_timestamp, marker) + class TracingReclaimer(TombstoneReclaimer): + def _reclaim(self, conn): + trace.append( + (self.age_timestamp, self.marker, + count_reclaimable(conn, self.age_timestamp))) + return super(TracingReclaimer, self)._reclaim(conn) - with mock.patch.object(broker, '_reclaim', new=tracing_reclaim), \ + with mock.patch( + 'swift.common.db.TombstoneReclaimer', TracingReclaimer), \ mock.patch('swift.common.db.RECLAIM_PAGE_SIZE', 10): broker.reclaim(reclaim_age, reclaim_age) with broker.get() as conn: diff --git a/test/unit/cli/test_manage_shard_ranges.py b/test/unit/cli/test_manage_shard_ranges.py index e1a9be9da7..de3e010532 100644 --- a/test/unit/cli/test_manage_shard_ranges.py +++ b/test/unit/cli/test_manage_shard_ranges.py @@ -429,6 +429,7 @@ class TestManageShardRanges(unittest.TestCase): ' "state": "sharding",', ' "state_timestamp": "%s",' % now.internal, ' "timestamp": "%s",' % now.internal, + ' "tombstones": -1,', ' "upper": ""', '}', 'db_state = sharding', @@ -472,6 +473,7 @@ class TestManageShardRanges(unittest.TestCase): ' "state": "sharding",', ' "state_timestamp": "%s",' % now.internal, ' "timestamp": "%s",' % now.internal, + ' "tombstones": -1,', ' "upper": ""', '}', 'db_state = sharded', 
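Note for readers following the shard-range changes: the expected CLI output above reflects the new tombstone accounting added to ShardRange in swift/common/utils.py earlier in this patch. The short sketch below is illustrative only and is not part of the patch; ShardRange and Timestamp are the real classes from swift.common.utils, but the shard range name and counts are invented example values. It shows the documented behaviour: tombstones defaults to -1 (unknown) and contributes nothing to row_count until update_tombstones() records a real count, which also refreshes meta_timestamp and clears the reported flag.

    # Illustrative sketch only -- example values, not taken from the tests.
    from swift.common.utils import ShardRange, Timestamp

    sr = ShardRange('.shards_a/c', Timestamp.now(), lower='l', upper='u',
                    object_count=10, bytes_used=100)
    print(sr.tombstones)      # -1: count unknown until a sharder reports it
    print(sr.row_count)       # 10: an unknown (-1) count adds nothing

    sr.update_tombstones(5)   # sets meta_timestamp, clears 'reported'
    print(sr.tombstones)      # 5
    print(sr.row_count)       # 15: object_count + max(tombstones, 0)

This is also why the sharder changes in swift/container/sharder.py shrink on row_count (objects plus tombstones) but still shard on object_count: tombstones are eventually reclaimed, so they should not trigger a split, but they should prevent large numbers of tombstone rows being merged into an acceptor.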
@@ -861,12 +863,37 @@ class TestManageShardRanges(unittest.TestCase): broker = self._make_broker() shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_') for i, sr in enumerate(shard_ranges): + sr.tombstones = 999 if i not in small_ranges: sr.object_count = 100001 sr.update_state(ShardRange.ACTIVE) broker.merge_shard_ranges(shard_ranges) self._move_broker_to_sharded_state(broker) + expected_base = [ + 'Donor shard range(s) with total of 2018 rows:', + " '.shards_a", + " objects: 10, tombstones: 999, lower: 'obj29'", + " state: active, upper: 'obj39'", + " '.shards_a", + " objects: 10, tombstones: 999, lower: 'obj39'", + " state: active, upper: 'obj49'", + 'can be compacted into acceptor shard range:', + " '.shards_a", + " objects: 100001, tombstones: 999, lower: 'obj49'", + " state: active, upper: 'obj59'", + 'Donor shard range(s) with total of 1009 rows:', + " '.shards_a", + " objects: 10, tombstones: 999, lower: 'obj69'", + " state: active, upper: 'obj79'", + 'can be compacted into acceptor shard range:', + " '.shards_a", + " objects: 100001, tombstones: 999, lower: 'obj79'", + " state: active, upper: 'obj89'", + 'Once applied to the broker these changes will result in ' + 'shard range compaction the next time the sharder runs.', + ] + def do_compact(user_input, exit_code): out = StringIO() err = StringIO() @@ -880,29 +907,7 @@ class TestManageShardRanges(unittest.TestCase): err_lines = err.getvalue().split('\n') self.assert_starts_with(err_lines[0], 'Loaded db broker for ') out_lines = out.getvalue().split('\n') - expected = [ - 'Donor shard range(s) with total of 20 objects:', - " '.shards_a", - " objects: 10 lower: 'obj29'", - " state: active upper: 'obj39'", - " '.shards_a", - " objects: 10 lower: 'obj39'", - " state: active upper: 'obj49'", - 'can be compacted into acceptor shard range:', - " '.shards_a", - " objects: 100001 lower: 'obj49'", - " state: active upper: 'obj59'", - 'Donor shard range(s) with total of 10 objects:', - " '.shards_a", - " objects: 10 lower: 'obj69'", - " state: active upper: 'obj79'", - 'can be compacted into acceptor shard range:', - " '.shards_a", - " objects: 100001 lower: 'obj79'", - " state: active upper: 'obj89'", - 'Once applied to the broker these changes will result in ' - 'shard range compaction the next time the sharder runs.', - ] + expected = list(expected_base) if user_input == 'yes': expected.extend([ 'Updated 2 shard sequences for compaction.', @@ -1334,14 +1339,12 @@ class TestManageShardRanges(unittest.TestCase): ['No shards identified for compaction.'], out_lines[:1]) - def test_compact_shrink_threshold(self): + def _do_test_compact_shrink_threshold(self, broker, shard_ranges): # verify option to set the shrink threshold for compaction; - broker = self._make_broker() - shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_') for i, sr in enumerate(shard_ranges): sr.update_state(ShardRange.ACTIVE) # (n-2)th shard range has one extra object - shard_ranges[-2].object_count = 11 + shard_ranges[-2].object_count = shard_ranges[-2].object_count + 1 broker.merge_shard_ranges(shard_ranges) self._move_broker_to_sharded_state(broker) # with threshold set to 10 no shard ranges can be shrunk @@ -1384,6 +1387,19 @@ class TestManageShardRanges(unittest.TestCase): self.assertEqual([ShardRange.SHRINKING] * 8 + [ShardRange.ACTIVE] * 2, [sr.state for sr in updated_ranges]) + def test_compact_shrink_threshold(self): + broker = self._make_broker() + shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_') + 
self._do_test_compact_shrink_threshold(broker, shard_ranges)
+
+    def test_compact_shrink_threshold_with_tombstones(self):
+        broker = self._make_broker()
+        shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
+        for i, sr in enumerate(shard_ranges):
+            sr.object_count = sr.object_count - i
+            sr.tombstones = i
+        self._do_test_compact_shrink_threshold(broker, shard_ranges)
+
     def test_repair_not_root(self):
         broker = self._make_broker()
         shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
diff --git a/test/unit/common/test_db.py b/test/unit/common/test_db.py
index 8d1531dcce..b7b4aa114b 100644
--- a/test/unit/common/test_db.py
+++ b/test/unit/common/test_db.py
@@ -41,7 +41,7 @@ from swift.common.constraints import \
     MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE
 from swift.common.db import chexor, dict_factory, get_db_connection, \
     DatabaseBroker, DatabaseConnectionError, DatabaseAlreadyExists, \
-    GreenDBConnection, PICKLE_PROTOCOL, zero_like
+    GreenDBConnection, PICKLE_PROTOCOL, zero_like, TombstoneReclaimer
 from swift.common.utils import normalize_timestamp, mkdirs, Timestamp
 from swift.common.exceptions import LockTimeout
 from swift.common.swob import HTTPException
@@ -1093,6 +1093,7 @@ class TestDatabaseBroker(unittest.TestCase):
         broker = DatabaseBroker(':memory:', account='a')
         broker.db_type = 'test'
         broker.db_contains_type = 'test'
+        broker.db_reclaim_timestamp = 'created_at'
         broker_creation = normalize_timestamp(1)
         broker_uuid = str(uuid4())
         broker_metadata = metadata and json.dumps(
@@ -1183,7 +1184,7 @@ class TestDatabaseBroker(unittest.TestCase):
         return broker
 
     # only testing _reclaim_metadata here
-    @patch.object(DatabaseBroker, '_reclaim', return_value='')
+    @patch.object(TombstoneReclaimer, 'reclaim')
     def test_metadata(self, mock_reclaim):
         # Initializes a good broker for us
         broker = self.get_replication_info_tester(metadata=True)
@@ -1569,7 +1570,7 @@ class TestDatabaseBroker(unittest.TestCase):
         self.assertFalse(pending)
 
 
-class TestTombstoneReclaim(unittest.TestCase):
+class TestTombstoneReclaimer(unittest.TestCase):
     def _make_object(self, broker, obj_name, ts, deleted):
         if deleted:
             broker.delete_test(obj_name, ts.internal)
@@ -1586,29 +1587,32 @@ class TestTombstoneReclaim(unittest.TestCase):
         with broker.get() as conn:
             return self._count_reclaimable(conn, reclaim_age)
 
-    def _setup_reclaimable_active(self):
+    def _setup_tombstones(self, reverse_names=True):
         broker = ExampleBroker(':memory:', account='test_account',
                                container='test_container')
         broker.initialize(Timestamp('1').internal, 0)
         now = time.time()
         top_of_the_minute = now - (now % 60)
-        # namespace:
+        # namespace if reverse_names:
         #   a-* has 70 reclaimable followed by 70 'active' tombstones
         #   b-* has 70 reclaimable followed by 70 'active' tombstones
+        # else:
+        #   a-* has 70 'active' tombstones followed by 70 reclaimable
+        #   b-* has 70 'active' tombstones followed by 70 reclaimable
         for i in range(0, 560, 4):
-            self._make_object(broker, 'a_%3d' % (560 - i),
-                              Timestamp(top_of_the_minute - (i * 60)),
-                              True)
-            self._make_object(broker, 'a_%3d' % (559 - i),
-                              Timestamp(top_of_the_minute - ((i + 1) * 60)),
-                              False)
-            self._make_object(broker, 'b_%3d' % (560 - i),
-                              Timestamp(top_of_the_minute - ((i + 2) * 60)),
-                              True)
-            self._make_object(broker, 'b_%3d' % (559 - i),
-                              Timestamp(top_of_the_minute - ((i + 3) * 60)),
-                              False)
+            self._make_object(
+                broker, 'a_%3d' % (560 - i if reverse_names else i),
+                Timestamp(top_of_the_minute - (i * 60)), True)
+            self._make_object(
+                broker, 'a_%3d' % (559 
- i if reverse_names else i + 1), + Timestamp(top_of_the_minute - ((i + 1) * 60)), False) + self._make_object( + broker, 'b_%3d' % (560 - i if reverse_names else i), + Timestamp(top_of_the_minute - ((i + 2) * 60)), True) + self._make_object( + broker, 'b_%3d' % (559 - i if reverse_names else i + 1), + Timestamp(top_of_the_minute - ((i + 3) * 60)), False) broker._commit_puts() # divide the set of timestamps exactly in half for reclaim @@ -1635,11 +1639,12 @@ class TestTombstoneReclaim(unittest.TestCase): yield reclaimable def test_batched_reclaim_several_small_batches(self): - broker, totm, reclaim_age = self._setup_reclaimable_active() + broker, totm, reclaim_age = self._setup_tombstones() with self._mock_broker_get(broker, reclaim_age) as reclaimable: with patch('swift.common.db.RECLAIM_PAGE_SIZE', 50): - broker.reclaim(reclaim_age, reclaim_age) + reclaimer = TombstoneReclaimer(broker, reclaim_age) + reclaimer.reclaim() expected_reclaimable = [140, # 0 rows fetched 90, # 50 rows fetched, 50 reclaimed @@ -1652,11 +1657,12 @@ class TestTombstoneReclaim(unittest.TestCase): self.assertEqual(0, self._get_reclaimable(broker, reclaim_age)) def test_batched_reclaim_exactly_two_batches(self): - broker, totm, reclaim_age = self._setup_reclaimable_active() + broker, totm, reclaim_age = self._setup_tombstones() with self._mock_broker_get(broker, reclaim_age) as reclaimable: with patch('swift.common.db.RECLAIM_PAGE_SIZE', 140): - broker.reclaim(reclaim_age, reclaim_age) + reclaimer = TombstoneReclaimer(broker, reclaim_age) + reclaimer.reclaim() expected_reclaimable = [140, # 0 rows fetched 70, # 140 rows fetched, 70 reclaimed @@ -1665,16 +1671,57 @@ class TestTombstoneReclaim(unittest.TestCase): self.assertEqual(0, self._get_reclaimable(broker, reclaim_age)) def test_batched_reclaim_one_large_batch(self): - broker, totm, reclaim_age = self._setup_reclaimable_active() + broker, totm, reclaim_age = self._setup_tombstones() with self._mock_broker_get(broker, reclaim_age) as reclaimable: with patch('swift.common.db.RECLAIM_PAGE_SIZE', 1000): - broker.reclaim(reclaim_age, reclaim_age) + reclaimer = TombstoneReclaimer(broker, reclaim_age) + reclaimer.reclaim() expected_reclaimable = [140] # 0 rows fetched self.assertEqual(expected_reclaimable, reclaimable) self.assertEqual(0, self._get_reclaimable(broker, reclaim_age)) + def test_reclaim_get_tombstone_count(self): + broker, totm, reclaim_age = self._setup_tombstones(reverse_names=False) + with patch('swift.common.db.RECLAIM_PAGE_SIZE', 122): + reclaimer = TombstoneReclaimer(broker, reclaim_age) + reclaimer.reclaim() + self.assertEqual(0, self._get_reclaimable(broker, reclaim_age)) + tombstones = self._get_reclaimable(broker, totm + 1) + self.assertEqual(140, tombstones) + # in this scenario the reclaim phase finds the remaining tombstone + # count (140) + self.assertEqual(140, reclaimer.remaining_tombstones) + self.assertEqual(140, reclaimer.get_tombstone_count()) + + def test_reclaim_get_tombstone_count_with_leftover(self): + broker, totm, reclaim_age = self._setup_tombstones() + with patch('swift.common.db.RECLAIM_PAGE_SIZE', 122): + reclaimer = TombstoneReclaimer(broker, reclaim_age) + reclaimer.reclaim() + + self.assertEqual(0, self._get_reclaimable(broker, reclaim_age)) + tombstones = self._get_reclaimable(broker, totm + 1) + self.assertEqual(140, tombstones) + # in this scenario the reclaim phase finds a subset (104) of all + # tombstones (140) + self.assertEqual(104, reclaimer.remaining_tombstones) + # get_tombstone_count finds the rest + actual = 
reclaimer.get_tombstone_count() + self.assertEqual(140, actual) + + def test_get_tombstone_count_with_leftover(self): + # verify that a call to get_tombstone_count() will invoke a reclaim if + # reclaim not already invoked + broker, totm, reclaim_age = self._setup_tombstones() + with patch('swift.common.db.RECLAIM_PAGE_SIZE', 122): + reclaimer = TombstoneReclaimer(broker, reclaim_age) + actual = reclaimer.get_tombstone_count() + + self.assertEqual(0, self._get_reclaimable(broker, reclaim_age)) + self.assertEqual(140, actual) + if __name__ == '__main__': unittest.main() diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 2e1938f7fa..6dad72299d 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -7846,7 +7846,7 @@ class TestShardRange(unittest.TestCase): meta_timestamp=ts_1.internal, deleted=0, state=utils.ShardRange.FOUND, state_timestamp=ts_1.internal, epoch=None, - reported=0) + reported=0, tombstones=-1) assert_initialisation_ok(dict(empty_run, name='a/c', timestamp=ts_1), expect) assert_initialisation_ok(dict(name='a/c', timestamp=ts_1), expect) @@ -7856,17 +7856,18 @@ class TestShardRange(unittest.TestCase): meta_timestamp=ts_2, deleted=0, state=utils.ShardRange.CREATED, state_timestamp=ts_3.internal, epoch=ts_4, - reported=0) + reported=0, tombstones=11) expect.update({'lower': 'l', 'upper': 'u', 'object_count': 2, 'bytes_used': 10, 'meta_timestamp': ts_2.internal, 'state': utils.ShardRange.CREATED, 'state_timestamp': ts_3.internal, 'epoch': ts_4, - 'reported': 0}) + 'reported': 0, 'tombstones': 11}) assert_initialisation_ok(good_run.copy(), expect) - # obj count and bytes used as int strings + # obj count, tombstones and bytes used as int strings good_str_run = good_run.copy() - good_str_run.update({'object_count': '2', 'bytes_used': '10'}) + good_str_run.update({'object_count': '2', 'bytes_used': '10', + 'tombstones': '11'}) assert_initialisation_ok(good_str_run, expect) good_no_meta = good_run.copy() @@ -7922,7 +7923,7 @@ class TestShardRange(unittest.TestCase): 'upper': upper, 'object_count': 10, 'bytes_used': 100, 'meta_timestamp': ts_2.internal, 'deleted': 0, 'state': utils.ShardRange.FOUND, 'state_timestamp': ts_3.internal, - 'epoch': ts_4, 'reported': 0} + 'epoch': ts_4, 'reported': 0, 'tombstones': -1} self.assertEqual(expected, sr_dict) self.assertIsInstance(sr_dict['lower'], six.string_types) self.assertIsInstance(sr_dict['upper'], six.string_types) @@ -7937,9 +7938,9 @@ class TestShardRange(unittest.TestCase): for key in sr_dict: bad_dict = dict(sr_dict) bad_dict.pop(key) - if key == 'reported': - # This was added after the fact, and we need to be able to eat - # data from old servers + if key in ('reported', 'tombstones'): + # These were added after the fact, and we need to be able to + # eat data from old servers utils.ShardRange.from_dict(bad_dict) utils.ShardRange(**bad_dict) continue @@ -8053,6 +8054,62 @@ class TestShardRange(unittest.TestCase): check_bad_args('bad', 10) check_bad_args(10, 'bad') + def test_update_tombstones(self): + ts_1 = next(self.ts_iter) + sr = utils.ShardRange('a/test', ts_1, 'l', 'u', 0, 0, None) + self.assertEqual(-1, sr.tombstones) + self.assertFalse(sr.reported) + + with mock_timestamp_now(next(self.ts_iter)) as now: + sr.update_tombstones(1) + self.assertEqual(1, sr.tombstones) + self.assertEqual(now, sr.meta_timestamp) + self.assertFalse(sr.reported) + + sr.reported = True + with mock_timestamp_now(next(self.ts_iter)) as now: + sr.update_tombstones(3, None) + 
self.assertEqual(3, sr.tombstones) + self.assertEqual(now, sr.meta_timestamp) + self.assertFalse(sr.reported) + + sr.reported = True + ts_2 = next(self.ts_iter) + sr.update_tombstones(5, ts_2) + self.assertEqual(5, sr.tombstones) + self.assertEqual(ts_2, sr.meta_timestamp) + self.assertFalse(sr.reported) + + # no change in value -> no change in reported + sr.reported = True + ts_3 = next(self.ts_iter) + sr.update_tombstones(5, ts_3) + self.assertEqual(5, sr.tombstones) + self.assertEqual(ts_3, sr.meta_timestamp) + self.assertTrue(sr.reported) + + sr.update_meta('11', '12') + self.assertEqual(11, sr.object_count) + self.assertEqual(12, sr.bytes_used) + + def check_bad_args(*args): + with self.assertRaises(ValueError): + sr.update_tombstones(*args) + check_bad_args('bad') + check_bad_args(10, 'bad') + + def test_row_count(self): + ts_1 = next(self.ts_iter) + sr = utils.ShardRange('a/test', ts_1, 'l', 'u', 0, 0, None) + self.assertEqual(0, sr.row_count) + + sr.update_meta(11, 123) + self.assertEqual(11, sr.row_count) + sr.update_tombstones(13) + self.assertEqual(24, sr.row_count) + sr.update_meta(0, 0) + self.assertEqual(13, sr.row_count) + def test_state_timestamp_setter(self): ts_1 = next(self.ts_iter) sr = utils.ShardRange('a/test', ts_1, 'l', 'u', 0, 0, None) @@ -8662,9 +8719,9 @@ class TestShardRangeList(unittest.TestCase): self.ts_iter = make_timestamp_iter() self.shard_ranges = [ utils.ShardRange('a/b', self.t1, 'a', 'b', - object_count=2, bytes_used=22), + object_count=2, bytes_used=22, tombstones=222), utils.ShardRange('b/c', self.t2, 'b', 'c', - object_count=4, bytes_used=44), + object_count=4, bytes_used=44, tombstones=444), utils.ShardRange('c/y', self.t1, 'c', 'y', object_count=6, bytes_used=66), ] @@ -8676,6 +8733,7 @@ class TestShardRangeList(unittest.TestCase): self.assertEqual(utils.ShardRange.MIN, srl.upper) self.assertEqual(0, srl.object_count) self.assertEqual(0, srl.bytes_used) + self.assertEqual(0, srl.row_count) def test_init_with_list(self): srl = ShardRangeList(self.shard_ranges[:2]) @@ -8684,6 +8742,7 @@ class TestShardRangeList(unittest.TestCase): self.assertEqual('c', srl.upper) self.assertEqual(6, srl.object_count) self.assertEqual(66, srl.bytes_used) + self.assertEqual(672, srl.row_count) srl.append(self.shard_ranges[2]) self.assertEqual(3, len(srl)) @@ -8691,6 +8750,8 @@ class TestShardRangeList(unittest.TestCase): self.assertEqual('y', srl.upper) self.assertEqual(12, srl.object_count) self.assertEqual(132, srl.bytes_used) + self.assertEqual(-1, self.shard_ranges[2].tombstones) # sanity check + self.assertEqual(678, srl.row_count) # NB: tombstones=-1 not counted def test_pop(self): srl = ShardRangeList(self.shard_ranges[:2]) @@ -8700,6 +8761,7 @@ class TestShardRangeList(unittest.TestCase): self.assertEqual('b', srl.upper) self.assertEqual(2, srl.object_count) self.assertEqual(22, srl.bytes_used) + self.assertEqual(224, srl.row_count) def test_slice(self): srl = ShardRangeList(self.shard_ranges) @@ -8710,6 +8772,7 @@ class TestShardRangeList(unittest.TestCase): self.assertEqual('b', sublist.upper) self.assertEqual(2, sublist.object_count) self.assertEqual(22, sublist.bytes_used) + self.assertEqual(224, sublist.row_count) sublist = srl[1:] self.assertIsInstance(sublist, ShardRangeList) @@ -8718,6 +8781,7 @@ class TestShardRangeList(unittest.TestCase): self.assertEqual('y', sublist.upper) self.assertEqual(10, sublist.object_count) self.assertEqual(110, sublist.bytes_used) + self.assertEqual(454, sublist.row_count) def test_includes(self): srl = 
ShardRangeList(self.shard_ranges) diff --git a/test/unit/container/test_backend.py b/test/unit/container/test_backend.py index b4e75905cd..baa5a07874 100644 --- a/test/unit/container/test_backend.py +++ b/test/unit/container/test_backend.py @@ -36,7 +36,8 @@ from swift.common.exceptions import LockTimeout from swift.container.backend import ContainerBroker, \ update_new_item_from_existing, UNSHARDED, SHARDING, SHARDED, \ COLLAPSED, SHARD_LISTING_STATES, SHARD_UPDATE_STATES -from swift.common.db import DatabaseAlreadyExists, GreenDBConnection +from swift.common.db import DatabaseAlreadyExists, GreenDBConnection, \ + TombstoneReclaimer from swift.common.request_helpers import get_reserved_name from swift.common.utils import Timestamp, encode_timestamps, hash_path, \ ShardRange, make_db_file_path, md5, ShardRangeList @@ -715,15 +716,17 @@ class TestContainerBroker(unittest.TestCase): self.assertEqual(count_reclaimable(conn, reclaim_age), num_of_objects / 4) - orig__reclaim = broker._reclaim trace = [] - def tracing_reclaim(conn, age_timestamp, marker): - trace.append((age_timestamp, marker, - count_reclaimable(conn, age_timestamp))) - return orig__reclaim(conn, age_timestamp, marker) + class TracingReclaimer(TombstoneReclaimer): + def _reclaim(self, conn): + trace.append( + (self.age_timestamp, self.marker, + count_reclaimable(conn, self.age_timestamp))) + return super(TracingReclaimer, self)._reclaim(conn) - with mock.patch.object(broker, '_reclaim', new=tracing_reclaim), \ + with mock.patch( + 'swift.common.db.TombstoneReclaimer', TracingReclaimer), \ mock.patch('swift.common.db.RECLAIM_PAGE_SIZE', 10): broker.reclaim(reclaim_age, reclaim_age) @@ -856,7 +859,8 @@ class TestContainerBroker(unittest.TestCase): TestContainerBrokerBeforeXSync, TestContainerBrokerBeforeSPI, TestContainerBrokerBeforeShardRanges, - TestContainerBrokerBeforeShardRangeReportedColumn): + TestContainerBrokerBeforeShardRangeReportedColumn, + TestContainerBrokerBeforeShardRangeTombstonesColumn): self.assertEqual(info['status_changed_at'], '0') else: self.assertEqual(info['status_changed_at'], @@ -2210,7 +2214,8 @@ class TestContainerBroker(unittest.TestCase): TestContainerBrokerBeforeXSync, TestContainerBrokerBeforeSPI, TestContainerBrokerBeforeShardRanges, - TestContainerBrokerBeforeShardRangeReportedColumn): + TestContainerBrokerBeforeShardRangeReportedColumn, + TestContainerBrokerBeforeShardRangeTombstonesColumn): self.assertEqual(info['status_changed_at'], '0') else: self.assertEqual(info['status_changed_at'], @@ -3509,7 +3514,8 @@ class TestContainerBroker(unittest.TestCase): TestContainerBrokerBeforeXSync, TestContainerBrokerBeforeSPI, TestContainerBrokerBeforeShardRanges, - TestContainerBrokerBeforeShardRangeReportedColumn): + TestContainerBrokerBeforeShardRangeReportedColumn, + TestContainerBrokerBeforeShardRangeTombstonesColumn): self.assertEqual(info['status_changed_at'], '0') else: self.assertEqual(timestamp.internal, info['status_changed_at']) @@ -4843,7 +4849,7 @@ class TestContainerBroker(unittest.TestCase): @with_tempdir def test_merge_shard_ranges(self, tempdir): - ts = [next(self.ts) for _ in range(14)] + ts = [next(self.ts) for _ in range(16)] db_path = os.path.join( tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db') broker = ContainerBroker( @@ -4947,6 +4953,20 @@ class TestContainerBroker(unittest.TestCase): broker.merge_shard_ranges(ShardRangeList([sr_c_13, sr_b_13])) self._assert_shard_ranges( broker, [sr_b_13, sr_c_13]) + # merge with tombstones but same meta_timestamp + 
sr_c_13_tombs = ShardRange('a/c_c', ts[13], lower='b', upper='c', + object_count=10, meta_timestamp=ts[13], + tombstones=999) + broker.merge_shard_ranges(sr_c_13_tombs) + self._assert_shard_ranges( + broker, [sr_b_13, sr_c_13]) + # merge with tombstones at newer meta_timestamp + sr_c_13_tombs = ShardRange('a/c_c', ts[13], lower='b', upper='c', + object_count=1, meta_timestamp=ts[14], + tombstones=999) + broker.merge_shard_ranges(sr_c_13_tombs) + self._assert_shard_ranges( + broker, [sr_b_13, sr_c_13_tombs]) @with_tempdir def test_merge_shard_ranges_state(self, tempdir): @@ -5670,7 +5690,7 @@ class TestContainerBrokerBeforeShardRangeReportedColumn( ContainerBrokerMigrationMixin, TestContainerBroker): """ Tests for ContainerBroker against databases created - before the shard_ranges table was added. + before the shard_ranges table reported column was added. """ # *grumble grumble* This should include container_info/policy_stat :-/ expected_db_tables = {'outgoing_sync', 'incoming_sync', 'object', @@ -5699,6 +5719,234 @@ class TestContainerBrokerBeforeShardRangeReportedColumn( conn.execute('''SELECT reported FROM shard_range''') + @with_tempdir + def test_get_shard_ranges_attempts(self, tempdir): + # verify that old broker handles new sql query for shard range rows + db_path = os.path.join(tempdir, 'container.db') + broker = ContainerBroker(db_path, account='a', container='c') + broker.initialize(next(self.ts).internal, 0) + + @contextmanager + def patch_execute(): + with broker.get() as conn: + mock_conn = mock.MagicMock() + mock_execute = mock.MagicMock() + mock_conn.execute = mock_execute + + @contextmanager + def mock_get(): + yield mock_conn + + with mock.patch.object(broker, 'get', mock_get): + yield mock_execute, conn + + with patch_execute() as (mock_execute, conn): + mock_execute.side_effect = conn.execute + broker.get_shard_ranges() + + expected = [ + mock.call('\n SELECT name, timestamp, lower, upper, ' + 'object_count, bytes_used, meta_timestamp, deleted, ' + 'state, state_timestamp, epoch, reported, ' + 'tombstones\n ' + 'FROM shard_range WHERE deleted=0 AND name != ?;\n' + ' ', ['a/c']), + mock.call('\n SELECT name, timestamp, lower, upper, ' + 'object_count, bytes_used, meta_timestamp, deleted, ' + 'state, state_timestamp, epoch, 0 as reported, ' + 'tombstones\n ' + 'FROM shard_range WHERE deleted=0 AND name != ?;\n' + ' ', ['a/c']), + mock.call('\n SELECT name, timestamp, lower, upper, ' + 'object_count, bytes_used, meta_timestamp, deleted, ' + 'state, state_timestamp, epoch, 0 as reported, ' + '-1 as tombstones\n ' + 'FROM shard_range WHERE deleted=0 AND name != ?;\n' + ' ', ['a/c']), + ] + + self.assertEqual(expected, mock_execute.call_args_list, + mock_execute.call_args_list) + + # if unexpectedly the call to execute continues to fail for reported, + # verify that the exception is raised after a retry + with patch_execute() as (mock_execute, conn): + def mock_execute_handler(*args, **kwargs): + if len(mock_execute.call_args_list) < 3: + return conn.execute(*args, **kwargs) + else: + raise sqlite3.OperationalError('no such column: reported') + mock_execute.side_effect = mock_execute_handler + with self.assertRaises(sqlite3.OperationalError): + broker.get_shard_ranges() + self.assertEqual(expected, mock_execute.call_args_list, + mock_execute.call_args_list) + + # if unexpectedly the call to execute continues to fail for tombstones, + # verify that the exception is raised after a retry + with patch_execute() as (mock_execute, conn): + def mock_execute_handler(*args, **kwargs): + 
if len(mock_execute.call_args_list) < 3: + return conn.execute(*args, **kwargs) + else: + raise sqlite3.OperationalError( + 'no such column: tombstones') + mock_execute.side_effect = mock_execute_handler + with self.assertRaises(sqlite3.OperationalError): + broker.get_shard_ranges() + self.assertEqual(expected, mock_execute.call_args_list, + mock_execute.call_args_list) + + @with_tempdir + def test_merge_shard_ranges_migrates_table(self, tempdir): + # verify that old broker migrates shard range table + db_path = os.path.join(tempdir, 'container.db') + broker = ContainerBroker(db_path, account='a', container='c') + broker.initialize(next(self.ts).internal, 0) + shard_ranges = [ShardRange('.shards_a/c_0', next(self.ts), 'a', 'b'), + ShardRange('.shards_a/c_1', next(self.ts), 'b', 'c')] + + orig_migrate_reported = broker._migrate_add_shard_range_reported + orig_migrate_tombstones = broker._migrate_add_shard_range_tombstones + + with mock.patch.object( + broker, '_migrate_add_shard_range_reported', + side_effect=orig_migrate_reported) as mocked_reported: + with mock.patch.object( + broker, '_migrate_add_shard_range_tombstones', + side_effect=orig_migrate_tombstones) as mocked_tombstones: + broker.merge_shard_ranges(shard_ranges[:1]) + + mocked_reported.assert_called_once_with(mock.ANY) + mocked_tombstones.assert_called_once_with(mock.ANY) + self._assert_shard_ranges(broker, shard_ranges[:1]) + + with mock.patch.object( + broker, '_migrate_add_shard_range_reported', + side_effect=orig_migrate_reported) as mocked_reported: + with mock.patch.object( + broker, '_migrate_add_shard_range_tombstones', + side_effect=orig_migrate_tombstones) as mocked_tombstones: + broker.merge_shard_ranges(shard_ranges[1:]) + + mocked_reported.assert_not_called() + mocked_tombstones.assert_not_called() + self._assert_shard_ranges(broker, shard_ranges) + + @with_tempdir + def test_merge_shard_ranges_fails_to_migrate_table(self, tempdir): + # verify that old broker will raise exception if it unexpectedly fails + # to migrate shard range table + db_path = os.path.join(tempdir, 'container.db') + broker = ContainerBroker(db_path, account='a', container='c') + broker.initialize(next(self.ts).internal, 0) + shard_ranges = [ShardRange('.shards_a/c_0', next(self.ts), 'a', 'b'), + ShardRange('.shards_a/c_1', next(self.ts), 'b', 'c')] + + # unexpected error during migration + with mock.patch.object( + broker, '_migrate_add_shard_range_reported', + side_effect=sqlite3.OperationalError('unexpected')) \ + as mocked_reported: + with self.assertRaises(sqlite3.OperationalError): + broker.merge_shard_ranges(shard_ranges) + + # one failed attempt was made to add reported column + self.assertEqual(1, mocked_reported.call_count) + + # migration silently fails + with mock.patch.object( + broker, '_migrate_add_shard_range_reported') \ + as mocked_reported: + with self.assertRaises(sqlite3.OperationalError): + broker.merge_shard_ranges(shard_ranges) + + # one failed attempt was made to add reported column + self.assertEqual(1, mocked_reported.call_count) + + with mock.patch.object( + broker, '_migrate_add_shard_range_tombstones') \ + as mocked_tombstones: + with self.assertRaises(sqlite3.OperationalError): + broker.merge_shard_ranges(shard_ranges) + + # first migration adds reported column + # one failed attempt was made to add tombstones column + self.assertEqual(1, mocked_tombstones.call_count) + + +def pre_tombstones_create_shard_range_table(self, conn): + """ + Copied from ContainerBroker before the + tombstones column was added; used 
for testing with + TestContainerBrokerBeforeShardRangeTombstonesColumn. + + Create a shard_range table with no 'tombstones' column. + + :param conn: DB connection object + """ + # Use execute (not executescript) so we get the benefits of our + # GreenDBConnection. Creating a table requires a whole-DB lock; + # *any* in-progress cursor will otherwise trip a "database is locked" + # error. + conn.execute(""" + CREATE TABLE shard_range ( + ROWID INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT, + timestamp TEXT, + lower TEXT, + upper TEXT, + object_count INTEGER DEFAULT 0, + bytes_used INTEGER DEFAULT 0, + meta_timestamp TEXT, + deleted INTEGER DEFAULT 0, + state INTEGER, + state_timestamp TEXT, + epoch TEXT, + reported INTEGER DEFAULT 0 + ); + """) + + conn.execute(""" + CREATE TRIGGER shard_range_update BEFORE UPDATE ON shard_range + BEGIN + SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT'); + END; + """) + + +class TestContainerBrokerBeforeShardRangeTombstonesColumn( + ContainerBrokerMigrationMixin, TestContainerBroker): + """ + Tests for ContainerBroker against databases created + before the shard_ranges table tombstones column was added. + """ + expected_db_tables = {'outgoing_sync', 'incoming_sync', 'object', + 'sqlite_sequence', 'container_stat', 'shard_range'} + + def setUp(self): + super(TestContainerBrokerBeforeShardRangeTombstonesColumn, + self).setUp() + ContainerBroker.create_shard_range_table = \ + pre_tombstones_create_shard_range_table + + broker = ContainerBroker(':memory:', account='a', container='c') + broker.initialize(Timestamp('1').internal, 0) + with self.assertRaises(sqlite3.DatabaseError) as raised, \ + broker.get() as conn: + conn.execute('''SELECT tombstones + FROM shard_range''') + self.assertIn('no such column: tombstones', str(raised.exception)) + + def tearDown(self): + super(TestContainerBrokerBeforeShardRangeTombstonesColumn, + self).tearDown() + broker = ContainerBroker(':memory:', account='a', container='c') + broker.initialize(Timestamp('1').internal, 0) + with broker.get() as conn: + conn.execute('''SELECT tombstones + FROM shard_range''') + class TestUpdateNewItemFromExisting(unittest.TestCase): # TODO: add test scenarios that have swift_bytes in content_type diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index 3dfbd27eac..fa77722e5e 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -112,13 +112,13 @@ class BaseTestSharder(unittest.TestCase): return broker def _make_shard_ranges(self, bounds, state=None, object_count=0, - timestamp=Timestamp.now()): + timestamp=Timestamp.now(), **kwargs): if not isinstance(state, (tuple, list)): state = [state] * len(bounds) state_iter = iter(state) return [ShardRange('.shards_a/c_%s' % upper, timestamp, lower, upper, state=next(state_iter), - object_count=object_count) + object_count=object_count, **kwargs) for lower, upper in bounds] def ts_encoded(self): @@ -4628,10 +4628,39 @@ class TestSharder(BaseTestSharder): bytes_used=own_shard_range.bytes_used + 1)] self.check_shard_ranges_sent(broker, expected_sent) + # initialise tombstones + with mock_timestamp_now(next(self.ts_iter)): + own_shard_range = broker.get_own_shard_range() + own_shard_range.update_tombstones(0) + broker.merge_shard_ranges([own_shard_range]) + for state in ShardRange.STATES: with annotate_failure(state): check_only_own_shard_range_sent(state) + def check_tombstones_sent(state): + own_shard_range = broker.get_own_shard_range() + 
+            self.assertTrue(own_shard_range.update_state(
+                state, state_timestamp=next(self.ts_iter)))
+            broker.merge_shard_ranges([own_shard_range])
+            # delete an object, expect to see it reflected in the own shard
+            # range that is sent
+            broker.delete_object(str(own_shard_range.object_count),
+                                 next(self.ts_iter).internal)
+            with mock_timestamp_now() as now:
+                # force own shard range meta updates to be at fixed timestamp
+                expected_sent = [
+                    dict(own_shard_range,
+                         meta_timestamp=now.internal,
+                         object_count=own_shard_range.object_count - 1,
+                         bytes_used=own_shard_range.bytes_used - 1,
+                         tombstones=own_shard_range.tombstones + 1)]
+                self.check_shard_ranges_sent(broker, expected_sent)
+
+        for state in ShardRange.STATES:
+            with annotate_failure(state):
+                check_tombstones_sent(state)
+

     def test_update_root_container_already_reported(self):
         broker = self._make_broker()
@@ -4654,6 +4683,12 @@ class TestSharder(BaseTestSharder):
                 sharder._update_root_container(broker)
             self.assertFalse(mock_conn.requests)

+        # initialise tombstones
+        with mock_timestamp_now(next(self.ts_iter)):
+            own_shard_range = broker.get_own_shard_range()
+            own_shard_range.update_tombstones(0)
+            broker.merge_shard_ranges([own_shard_range])
+
         for state in ShardRange.STATES:
             with annotate_failure(state):
                 check_already_reported_not_sent(state)
@@ -4685,7 +4720,8 @@ class TestSharder(BaseTestSharder):
                 own_shard_range.copy(
                     meta_timestamp=now.internal,
                     object_count=own_shard_range.object_count + 1,
-                    bytes_used=own_shard_range.bytes_used + 1)] +
+                    bytes_used=own_shard_range.bytes_used + 1,
+                    tombstones=0)] +
                 shard_ranges,
                 key=lambda sr: (sr.upper, sr.state, sr.lower))
             self.check_shard_ranges_sent(
@@ -5360,8 +5396,11 @@ class TestSharder(BaseTestSharder):
         shard_bounds = (('', 'here'), ('here', 'there'), ('there', ''))
         size = (DEFAULT_SHARD_SHRINK_POINT *
                 DEFAULT_SHARD_CONTAINER_THRESHOLD / 100)
+
+        # all shard ranges too big to shrink
         shard_ranges = self._make_shard_ranges(
-            shard_bounds, state=ShardRange.ACTIVE, object_count=size)
+            shard_bounds, state=ShardRange.ACTIVE, object_count=size - 1,
+            tombstones=1)
         own_sr = broker.get_own_shard_range()
         own_sr.update_state(ShardRange.SHARDED, Timestamp.now())
         broker.merge_shard_ranges(shard_ranges + [own_sr])
@@ -5373,7 +5412,7 @@ class TestSharder(BaseTestSharder):
                          broker.get_shard_ranges())

         # one range just below threshold
-        shard_ranges[0].update_meta(size - 1, 0)
+        shard_ranges[0].update_meta(size - 2, 0)
         broker.merge_shard_ranges(shard_ranges[0])
         with self._mock_sharder() as sharder:
             with mock_timestamp_now() as now:
@@ -6732,13 +6771,8 @@ class TestSharderFunctions(BaseTestSharder):
         sequences = find_compactible_shard_sequences(broker, 10, 999, 3, 3)
         self.assertEqual([shard_ranges[8:]], sequences)

-    def test_find_compactible_shrink_threshold(self):
+    def _do_test_find_compactible_shrink_threshold(self, broker, shard_ranges):
         # verify option to set the shrink threshold for compaction;
-        broker = self._make_broker()
-        shard_ranges = self._make_shard_ranges(
-            (('', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', 'f'),
-             ('f', 'g'), ('g', 'h'), ('h', 'i'), ('i', 'j'), ('j', '')),
-            state=ShardRange.ACTIVE, object_count=10)
         # (n-2)th shard range has one extra object
         shard_ranges[-2].object_count = 11
         broker.merge_shard_ranges(shard_ranges)
@@ -6750,13 +6784,24 @@ class TestSharderFunctions(BaseTestSharder):
         sequences = find_compactible_shard_sequences(broker, 11, 999, -1, -1)
         self.assertEqual([shard_ranges[:9]], sequences)

-    def test_find_compactible_expansion_limit(self):
-        # verify option to limit the size of each acceptor after compaction
+    def test_find_compactible_shrink_threshold(self):
         broker = self._make_broker()
         shard_ranges = self._make_shard_ranges(
             (('', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', 'f'),
              ('f', 'g'), ('g', 'h'), ('h', 'i'), ('i', 'j'), ('j', '')),
-            state=ShardRange.ACTIVE, object_count=6)
+            state=ShardRange.ACTIVE, object_count=10)
+        self._do_test_find_compactible_shrink_threshold(broker, shard_ranges)
+
+    def test_find_compactible_shrink_threshold_with_tombstones(self):
+        broker = self._make_broker()
+        shard_ranges = self._make_shard_ranges(
+            (('', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', 'f'),
+             ('f', 'g'), ('g', 'h'), ('h', 'i'), ('i', 'j'), ('j', '')),
+            state=ShardRange.ACTIVE, object_count=7, tombstones=3)
+        self._do_test_find_compactible_shrink_threshold(broker, shard_ranges)
+
+    def _do_test_find_compactible_expansion_limit(self, broker, shard_ranges):
+        # verify option to limit the size of each acceptor after compaction
         broker.merge_shard_ranges(shard_ranges)
         sequences = find_compactible_shard_sequences(broker, 10, 33, -1, -1)
         self.assertEqual([shard_ranges[:5], shard_ranges[5:]], sequences)
@@ -6766,11 +6811,30 @@ class TestSharderFunctions(BaseTestSharder):
         sequences = find_compactible_shard_sequences(broker, 10, 33, -1, -1)
         self.assertEqual([shard_ranges[:4], shard_ranges[7:]], sequences)

+    def test_find_compactible_expansion_limit(self):
+        # verify option to limit the size of each acceptor after compaction
+        broker = self._make_broker()
+        shard_ranges = self._make_shard_ranges(
+            (('', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', 'f'),
+             ('f', 'g'), ('g', 'h'), ('h', 'i'), ('i', 'j'), ('j', '')),
+            state=ShardRange.ACTIVE, object_count=6)
+        self._do_test_find_compactible_expansion_limit(broker, shard_ranges)
+
+    def test_find_compactible_expansion_limit_with_tombstones(self):
+        # verify option to limit the size of each acceptor after compaction
+        broker = self._make_broker()
+        shard_ranges = self._make_shard_ranges(
+            (('', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', 'f'),
+             ('f', 'g'), ('g', 'h'), ('h', 'i'), ('i', 'j'), ('j', '')),
+            state=ShardRange.ACTIVE, object_count=1, tombstones=5)
+        self._do_test_find_compactible_expansion_limit(broker, shard_ranges)
+
     def test_is_sharding_candidate(self):
         for state in ShardRange.STATES:
             for object_count in (9, 10, 11):
                 sr = ShardRange('.shards_a/c', next(self.ts_iter), '', '',
-                                state=state, object_count=object_count)
+                                state=state, object_count=object_count,
+                                tombstones=100)  # tombstones not considered
                 with annotate_failure('%s %s' % (state, object_count)):
                     if state == ShardRange.ACTIVE and object_count >= 10:
                         self.assertTrue(is_sharding_candidate(sr, 10))
@@ -6783,6 +6847,10 @@ class TestSharderFunctions(BaseTestSharder):
             sr = ShardRange('.shards_a/c', next(self.ts_iter), '', '',
                             state=state, object_count=9)
             self.assertTrue(is_shrinking_candidate(sr, 10, 9, ok_states))
+            # shard range has 9 rows
+            sr = ShardRange('.shards_a/c', next(self.ts_iter), '', '',
+                            state=state, object_count=4, tombstones=5)
+            self.assertTrue(is_shrinking_candidate(sr, 10, 9, ok_states))

         do_check_true(ShardRange.ACTIVE, (ShardRange.ACTIVE,))
         do_check_true(ShardRange.ACTIVE,
@@ -6790,11 +6858,12 @@ class TestSharderFunctions(BaseTestSharder):
         do_check_true(ShardRange.SHRINKING,
                       (ShardRange.ACTIVE, ShardRange.SHRINKING))

-        def do_check_false(state, object_count):
+        def do_check_false(state, object_count, tombstones):
             states = (ShardRange.ACTIVE, ShardRange.SHRINKING)
             # shard range has 10 objects
             sr = ShardRange('.shards_a/c', next(self.ts_iter), '', '',
-                            state=state, object_count=object_count)
+                            state=state, object_count=object_count,
+                            tombstones=tombstones)
             self.assertFalse(is_shrinking_candidate(sr, 10, 20))
             self.assertFalse(is_shrinking_candidate(sr, 10, 20, states))
             self.assertFalse(is_shrinking_candidate(sr, 10, 9))
@@ -6805,7 +6874,13 @@ class TestSharderFunctions(BaseTestSharder):
         for state in ShardRange.STATES:
             for object_count in (10, 11):
                 with annotate_failure('%s %s' % (state, object_count)):
-                    do_check_false(state, object_count)
+                    do_check_false(state, object_count, 0)
+            for tombstones in (10, 11):
+                with annotate_failure('%s %s' % (state, tombstones)):
+                    do_check_false(state, 0, tombstones)
+            for tombstones in (5, 6):
+                with annotate_failure('%s %s' % (state, tombstones)):
+                    do_check_false(state, 5, tombstones)

     def test_find_and_rank_whole_path_split(self):
         ts_0 = next(self.ts_iter)