diff --git a/swift/cli/manage_shard_ranges.py b/swift/cli/manage_shard_ranges.py index e482e7f689..c25ffb89a0 100644 --- a/swift/cli/manage_shard_ranges.py +++ b/swift/cli/manage_shard_ranges.py @@ -167,7 +167,14 @@ from six.moves import input from swift.common.utils import Timestamp, get_logger, ShardRange from swift.container.backend import ContainerBroker, UNSHARDED from swift.container.sharder import make_shard_ranges, sharding_enabled, \ - CleavingContext + CleavingContext, process_compactable_shard_sequences, \ + find_compactable_shard_sequences, find_overlapping_ranges, \ + finalize_shrinking + +DEFAULT_ROWS_PER_SHARD = 500000 +DEFAULT_SHRINK_THRESHOLD = 10000 +DEFAULT_MAX_SHRINKING = 1 +DEFAULT_MAX_EXPANDING = -1 def _load_and_validate_shard_data(args): @@ -289,6 +296,7 @@ def db_info(broker, args): print('Metadata:') for k, (v, t) in broker.metadata.items(): print(' %s = %s' % (k, v)) + return 0 def delete_shard_ranges(broker, args): @@ -410,8 +418,76 @@ def enable_sharding(broker, args): return 0 +def compact_shard_ranges(broker, args): + if not broker.is_root_container(): + print('WARNING: Shard containers cannot be compacted.') + print('This command should be used on a root container.') + return 2 + + if not broker.is_sharded(): + print('WARNING: Container is not yet sharded so cannot be compacted.') + return 2 + + shard_ranges = broker.get_shard_ranges() + if find_overlapping_ranges([sr for sr in shard_ranges if + sr.state != ShardRange.SHRINKING]): + print('WARNING: Container has overlapping shard ranges so cannot be ' + 'compacted.') + return 2 + + compactable = find_compactable_shard_sequences(broker, + args.shrink_threshold, + args.expansion_limit, + args.max_shrinking, + args.max_expanding) + if not compactable: + print('No shards identified for compaction.') + return 0 + + for sequence in compactable: + if sequence[-1].state not in (ShardRange.ACTIVE, ShardRange.SHARDED): + print('ERROR: acceptor not in correct state: %s' % sequence[-1], + file=sys.stderr) + return 1 + + if not args.yes: + for sequence in compactable: + acceptor = sequence[-1] + donors = sequence[:-1] + print('Shard %s (object count %d) can expand to accept %d objects ' + 'from:' % + (acceptor, acceptor.object_count, donors.object_count)) + for donor in donors: + print(' shard %s (object count %d)' % + (donor, donor.object_count)) + print('Once applied to the broker these changes will result in shard ' + 'range compaction the next time the sharder runs.') + choice = input('Do you want to apply these changes? [y/N]') + if choice != 'y': + print('No changes applied') + return 0 + + timestamp = Timestamp.now() + acceptor_ranges, shrinking_ranges = process_compactable_shard_sequences( + compactable, timestamp) + finalize_shrinking(broker, acceptor_ranges, shrinking_ranges, timestamp) + print('Updated %s shard sequences for compaction.' % len(compactable)) + print('Run container-replicator to replicate the changes to other ' + 'nodes.') + print('Run container-sharder on all nodes to compact shards.') + return 0 + + +def _positive_int(arg): + val = int(arg) + if val <= 0: + raise argparse.ArgumentTypeError('must be > 0') + return val + + def _add_find_args(parser): - parser.add_argument('rows_per_shard', nargs='?', type=int, default=500000) + parser.add_argument('rows_per_shard', nargs='?', type=int, + default=DEFAULT_ROWS_PER_SHARD) def _add_replace_args(parser): @@ -500,6 +576,50 @@ def _make_parser(): _add_enable_args(enable_parser) enable_parser.set_defaults(func=enable_sharding) _add_replace_args(enable_parser) + + # compact + compact_parser = subparsers.add_parser( + 'compact', + help='Compact shard ranges with less than the shrink-threshold number ' + 'of rows. This command only works on root containers.') + compact_parser.add_argument( + '--yes', '-y', action='store_true', default=False, + help='Apply shard range changes to broker without prompting.') + compact_parser.add_argument('--shrink-threshold', nargs='?', + type=_positive_int, + default=DEFAULT_SHRINK_THRESHOLD, + help='The number of rows below which a shard ' + 'can qualify for shrinking. Defaults to ' + '%d' % DEFAULT_SHRINK_THRESHOLD) + compact_parser.add_argument('--expansion-limit', nargs='?', + type=_positive_int, + default=DEFAULT_ROWS_PER_SHARD, + help='Maximum number of rows for an expanding ' + 'shard to have after compaction has ' + 'completed. Defaults to %d' % + DEFAULT_ROWS_PER_SHARD) + # If just one donor shard is chosen to shrink to an acceptor then the + # expanded acceptor will handle object listings as soon as the donor shard + # has shrunk. If more than one donor shard are chosen to shrink to an + # acceptor then the acceptor may not handle object listings for some donor + # shards that have shrunk until *all* donors have shrunk, resulting in + # temporary gap(s) in object listings where the shrunk donors are missing. + compact_parser.add_argument('--max-shrinking', nargs='?', + type=_positive_int, + default=DEFAULT_MAX_SHRINKING, + help='Maximum number of shards that should be ' + 'shrunk into each expanding shard. ' + 'Defaults to 1. Using values greater ' + 'than 1 may result in temporary gaps in ' + 'object listings until all selected ' + 'shards have shrunk.') + compact_parser.add_argument('--max-expanding', nargs='?', + type=_positive_int, + default=DEFAULT_MAX_EXPANDING, + help='Maximum number of shards that should be ' + 'expanded. Defaults to unlimited.') + compact_parser.set_defaults(func=compact_shard_ranges) + return parser diff --git a/swift/cli/shard-info.py b/swift/cli/shard-info.py index 01223787f7..fdcfdf5d3e 100644 --- a/swift/cli/shard-info.py +++ b/swift/cli/shard-info.py @@ -89,7 +89,7 @@ def print_own_shard_range(node, sr, indent_level): indent = indent_level * TAB range = '%r - %r' % (sr.lower, sr.upper) print('%s(%s) %23s, objs: %3s, bytes: %3s, timestamp: %s (%s), ' - 'modified: %s (%s), %7s: %s (%s), deleted: %s epoch: %s' % + 'modified: %s (%s), %7s: %s (%s), deleted: %s, epoch: %s' % (indent, node[1][0], range, sr.object_count, sr.bytes_used, sr.timestamp.isoformat, sr.timestamp.internal, sr.meta_timestamp.isoformat, sr.meta_timestamp.internal, @@ -108,12 +108,13 @@ def print_shard_range(node, sr, indent_level): indent = indent_level * TAB range = '%r - %r' % (sr.lower, sr.upper) print('%s(%s) %23s, objs: %3s, bytes: %3s, timestamp: %s (%s), ' - 'modified: %s (%s), %7s: %s (%s), deleted: %s %s' % + 'modified: %s (%s), %7s: %s (%s), deleted: %s, epoch: %s %s' % (indent, node[1][0], range, sr.object_count, sr.bytes_used, sr.timestamp.isoformat, sr.timestamp.internal, sr.meta_timestamp.isoformat, sr.meta_timestamp.internal, sr.state_text, sr.state_timestamp.isoformat, - sr.state_timestamp.internal, sr.deleted, sr.name)) + sr.state_timestamp.internal, sr.deleted, + sr.epoch.internal if sr.epoch else None, sr.name)) def print_shard_range_info(node, shard_ranges, indent_level=0): diff --git a/swift/common/utils.py b/swift/common/utils.py index 6a51e7d05c..6daf22b193 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -82,6 +82,7 @@ from six.moves.configparser import (ConfigParser, NoSectionError, from six.moves import range, http_client from six.moves.urllib.parse import quote as _quote, unquote from six.moves.urllib.parse import urlparse +from six.moves import UserList from swift import gettext_ as _ import swift.common.exceptions @@ -5483,6 +5484,105 @@ class ShardRange(object): params['state_timestamp'], params['epoch'], params.get('reported', 0)) + def expand(self, donors): + """ + Expands the bounds as necessary to match the minimum and maximum bounds + of the given donors. + + :param donors: A list of :class:`~swift.common.utils.ShardRange` + :return: True if the bounds have been modified, False otherwise. + """ + modified = False + new_lower = self.lower + new_upper = self.upper + for donor in donors: + new_lower = min(new_lower, donor.lower) + new_upper = max(new_upper, donor.upper) + if self.lower > new_lower or self.upper < new_upper: + self.lower = new_lower + self.upper = new_upper + modified = True + return modified + + +class ShardRangeList(UserList): + """ + This class provides some convenience functions for working with lists of + :class:`~swift.common.utils.ShardRange`. + + This class does not enforce ordering or continuity of the list items: + callers should ensure that items are added in order as appropriate. + """ + def __getitem__(self, index): + # workaround for py3 - not needed for py2.7,py3.8 + result = self.data[index] + return ShardRangeList(result) if type(result) == list else result + + @property + def lower(self): + """ + Returns the lower bound of the first item in the list. Note: this will + only be equal to the lowest bound of all items in the list if the list + contents has been sorted. + + :return: lower bound of first item in the list, or ShardRange.MIN + if the list is empty. + """ + if not self: + # empty list has range MIN->MIN + return ShardRange.MIN + return self[0].lower + + @property + def upper(self): + """ + Returns the upper bound of the first item in the list. Note: this will + only be equal to the uppermost bound of all items in the list if the + list has previously been sorted. + + :return: upper bound of first item in the list, or ShardRange.MIN + if the list is empty. + """ + if not self: + # empty list has range MIN->MIN + return ShardRange.MIN + return self[-1].upper + + @property + def object_count(self): + """ + Returns the total number of objects of all items in the list. + + :return: total object count + """ + return sum(sr.object_count for sr in self) + + @property + def bytes_used(self): + """ + Returns the total number of bytes in all items in the list. + + :return: total bytes used + """ + return sum(sr.bytes_used for sr in self) + + def includes(self, other): + """ + Check if another ShardRange namespace is enclosed between the list's + ``lower`` and ``upper`` properties. Note: the list's ``lower`` and + ``upper`` properties will only equal the outermost bounds of all items + in the list if the list has previously been sorted. + + Note: the list does not need to contain an item matching ``other`` for + this method to return True, although if the list has been sorted and + does contain an item matching ``other`` then the method will return + True. + + :param other: an instance of :class:`~swift.common.utils.ShardRange` + :return: True if other's namespace is enclosed, False otherwise. + """ + return self.lower <= other.lower and self.upper >= other.upper + def find_shard_range(item, ranges): """ diff --git a/swift/container/sharder.py b/swift/container/sharder.py index 41b091f983..2c51d36bac 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -35,7 +35,8 @@ from swift.common.swob import str_to_wsgi from swift.common.utils import get_logger, config_true_value, \ dump_recon_cache, whataremyips, Timestamp, ShardRange, GreenAsyncPile, \ config_float_value, config_positive_int_value, \ - quorum_size, parse_override_options, Everything, config_auto_int_value + quorum_size, parse_override_options, Everything, config_auto_int_value, \ + ShardRangeList from swift.container.backend import ContainerBroker, \ RECORD_TYPE_SHARD, UNSHARDED, SHARDING, SHARDED, COLLAPSED, \ SHARD_UPDATE_STATES @@ -146,6 +147,42 @@ def find_sharding_candidates(broker, threshold, shard_ranges=None): def find_shrinking_candidates(broker, shrink_threshold, merge_size): + # this is only here to preserve a legacy public function signature; + # superseded by find_compactable_shard_sequences + merge_pairs = {} + # restrict search to sequences with one donor + results = find_compactable_shard_sequences(broker, shrink_threshold, + merge_size, 1, -1) + for sequence in results: + # map acceptor -> donor list + merge_pairs[sequence[-1]] = sequence[-2] + return merge_pairs + + +def find_compactable_shard_sequences(broker, + shrink_threshold, + merge_size, + max_shrinking, + max_expanding): + """ + Find sequences of shard ranges that could be compacted into a single + acceptor shard range. + + This function does not modify shard ranges. + + :param broker: A :class:`~swift.container.backend.ContainerBroker`. + :param shrink_threshold: the number of rows below which a shard may be + considered for shrinking into another shard + :param merge_size: the maximum number of rows that an acceptor shard range + should have after other shard ranges have been compacted into it + :param max_shrinking: the maximum number of shard ranges that should be + compacted into each acceptor; -1 implies unlimited. + :param max_expanding: the maximum number of acceptors to be found (i.e. the + maximum number of sequences to be returned); -1 implies unlimited. + :returns: A list of :class:`~swift.common.utils.ShardRangeList` each + containing a sequence of neighbouring shard ranges that may be + compacted; the final shard range in the list is the acceptor + """ # this should only execute on root containers that have sharded; the # goal is to find small shard containers that could be retired by # merging with a neighbour. @@ -158,47 +195,125 @@ def find_shrinking_candidates(broker, shrink_threshold, merge_size): # a neighbour. We may need to expose row count as well as object count. shard_ranges = broker.get_shard_ranges() own_shard_range = broker.get_own_shard_range() - if len(shard_ranges) == 1: - # special case to enable final shard to shrink into root - shard_ranges.append(own_shard_range) - merge_pairs = {} - for donor, acceptor in zip(shard_ranges, shard_ranges[1:]): - if donor in merge_pairs: - # this range may already have been made an acceptor; if so then - # move on. In principle it might be that even after expansion - # this range and its donor(s) could all be merged with the next - # range. In practice it is much easier to reason about a single - # donor merging into a single acceptor. Don't fret - eventually - # all the small ranges will be retired. - continue - if (acceptor.name != own_shard_range.name and - acceptor.state != ShardRange.ACTIVE): - # don't shrink into a range that is not yet ACTIVE - continue - if donor.state not in (ShardRange.ACTIVE, ShardRange.SHRINKING): - # found? created? sharded? don't touch it - continue + def sequence_complete(sequence): + # a sequence is considered complete if any of the following are true: + # - the final shard range has more objects than the shrink_threshold, + # so should not be shrunk (this shard will be the acceptor) + # - the max number of shard ranges to be compacted (max_shrinking) has + # been reached + # - the total number of objects in the sequence has reached the + # merge_size + if (sequence and + (sequence[-1].object_count >= shrink_threshold or + 0 < max_shrinking < len(sequence) or + sequence.object_count >= merge_size)): + return True + return False - proposed_object_count = donor.object_count + acceptor.object_count - if (donor.state == ShardRange.SHRINKING or - (donor.object_count < shrink_threshold and - proposed_object_count < merge_size)): - # include previously identified merge pairs on presumption that - # following shrink procedure is idempotent - merge_pairs[acceptor] = donor - if donor.update_state(ShardRange.SHRINKING): - # Set donor state to shrinking so that next cycle won't use - # it as an acceptor; state_timestamp defines new epoch for - # donor and new timestamp for the expanded acceptor below. - donor.epoch = donor.state_timestamp = Timestamp.now() - if acceptor.lower != donor.lower: - # Update the acceptor container with its expanding state to - # prevent it treating objects cleaved from the donor - # as misplaced. - acceptor.lower = donor.lower - acceptor.timestamp = donor.state_timestamp - return merge_pairs + def find_compactable_sequence(shard_ranges_todo): + compactable_sequence = ShardRangeList() + object_count = 0 + consumed = 0 + for shard_range in shard_ranges_todo: + if (compactable_sequence and + compactable_sequence.upper < shard_range.lower): + # found a gap! break before consuming this range because it + # could become the first in the next sequence + break + consumed += 1 + if (shard_range.name != own_shard_range.name and + shard_range.state not in (ShardRange.ACTIVE, + ShardRange.SHRINKING)): + # found? created? sharded? don't touch it + break + proposed_object_count = object_count + shard_range.object_count + if (shard_range.state == ShardRange.SHRINKING or + proposed_object_count <= merge_size): + compactable_sequence.append(shard_range) + object_count += shard_range.object_count + if shard_range.state == ShardRange.SHRINKING: + continue + if sequence_complete(compactable_sequence): + break + return compactable_sequence, consumed + + compactable_sequences = [] + index = 0 + while ((max_expanding < 0 or + len(compactable_sequences) < max_expanding) and + index < len(shard_ranges)): + sequence, consumed = find_compactable_sequence(shard_ranges[index:]) + index += consumed + if (index == len(shard_ranges) and + not compactable_sequences and + not sequence_complete(sequence) and + sequence.includes(own_shard_range)): + # special case: only one sequence has been found, which encompasses + # the entire namespace, has no more than merge_size records and + # whose shard ranges are all shrinkable; all the shards in the + # sequence can be shrunk to the root, so append own_shard_range to + # the sequence to act as an acceptor; note: only shrink to the root + # when *all* the remaining shard ranges can be simultaneously + # shrunk to the root. + sequence.append(own_shard_range) + compactable_sequences.append(sequence) + elif len(sequence) > 1 and sequence[-1].state == ShardRange.ACTIVE: + compactable_sequences.append(sequence) + # else: this sequence doesn't end with a suitable acceptor shard range + + return compactable_sequences + + +def finalize_shrinking(broker, acceptor_ranges, donor_ranges, timestamp): + """ + Update donor shard ranges to shrinking state and merge donors and acceptors + to broker. + + :param broker: A :class:`~swift.container.backend.ContainerBroker`. + :param acceptor_ranges: A list of :class:`~swift.common.utils.ShardRange` + that are to be acceptors. + :param donor_ranges: A list of :class:`~swift.common.utils.ShardRange` + that are to be donors; these will have their state and timestamp + updated. + :param timestamp: timestamp to use when updating donor state + """ + for donor in donor_ranges: + if donor.update_state(ShardRange.SHRINKING): + # Set donor state to shrinking state_timestamp defines new epoch + donor.epoch = donor.state_timestamp = timestamp + broker.merge_shard_ranges(acceptor_ranges + donor_ranges) + + +def process_compactable_shard_sequences(sequences, timestamp): + """ + Transform the given sequences of shard ranges into a list of acceptors and + a list of shrinking donors. For each given sequence the final ShardRange in + the sequence (the acceptor) is expanded to accommodate the other + ShardRanges in the sequence (the donors). + + :param sequences: A list of :class:`~swift.common.utils.ShardRangeList` + :param timestamp: an instance of :class:`~swift.common.utils.Timestamp` + that is used when updating acceptor range bounds or state + :return: a tuple (acceptor_ranges, shrinking_ranges) + """ + acceptor_ranges = [] + shrinking_ranges = [] + for sequence in sequences: + donors = sequence[:-1] + shrinking_ranges.extend(donors) + # Update the acceptor container with its expanded bounds to prevent it + # treating objects cleaved from the donor as misplaced. + acceptor = sequence[-1] + if acceptor.expand(donors): + # Update the acceptor container with its expanded bounds to prevent + # it treating objects cleaved from the donor as misplaced. + acceptor.timestamp = timestamp + if acceptor.update_state(ShardRange.ACTIVE): + # Ensure acceptor state is ACTIVE (when acceptor is root) + acceptor.state_timestamp = timestamp + acceptor_ranges.append(acceptor) + return acceptor_ranges, shrinking_ranges class CleavingContext(object): @@ -1509,31 +1624,36 @@ class ContainerSharder(ContainerReplicator): quote(broker.path)) return - merge_pairs = find_shrinking_candidates( - broker, self.shrink_size, self.merge_size) - self.logger.debug('Found %s shrinking candidates' % len(merge_pairs)) + compactable_sequences = find_compactable_shard_sequences( + broker, self.shrink_size, self.merge_size, 1, -1) + self.logger.debug('Found %s compactable sequences of length(s) %s' % + (len(compactable_sequences), + [len(s) for s in compactable_sequences])) + timestamp = Timestamp.now() + acceptors, donors = process_compactable_shard_sequences( + compactable_sequences, timestamp) + finalize_shrinking(broker, acceptors, donors, timestamp) own_shard_range = broker.get_own_shard_range() - for acceptor, donor in merge_pairs.items(): - self.logger.debug('shrinking shard range %s into %s in %s' % - (donor, acceptor, broker.db_file)) - broker.merge_shard_ranges([acceptor, donor]) + for sequence in compactable_sequences: + acceptor = sequence[-1] + donors = ShardRangeList(sequence[:-1]) + self.logger.debug( + 'shrinking %d objects from %d shard ranges into %s in %s' % + (donors.object_count, len(donors), acceptor, broker.db_file)) if acceptor.name != own_shard_range.name: self._send_shard_ranges( acceptor.account, acceptor.container, [acceptor]) - acceptor.increment_meta(donor.object_count, donor.bytes_used) - else: - # no need to change namespace or stats - acceptor.update_state(ShardRange.ACTIVE, - state_timestamp=Timestamp.now()) + acceptor.increment_meta(donors.object_count, donors.bytes_used) # Now send a copy of the expanded acceptor, with an updated - # timestamp, to the donor container. This forces the donor to + # timestamp, to each donor container. This forces each donor to # asynchronously cleave its entire contents to the acceptor and # delete itself. The donor will pass its own deleted shard range to # the acceptor when cleaving. Subsequent updates from the donor or # the acceptor will then update the root to have the deleted donor # shard range. - self._send_shard_ranges( - donor.account, donor.container, [donor, acceptor]) + for donor in donors: + self._send_shard_ranges( + donor.account, donor.container, [donor, acceptor]) def _update_root_container(self, broker): own_shard_range = broker.get_own_shard_range(no_default=True) diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py index e939cc2a8c..7215b38662 100644 --- a/test/probe/test_sharder.py +++ b/test/probe/test_sharder.py @@ -2549,28 +2549,79 @@ class TestManagedContainerSharding(BaseTestContainerSharding): self.assert_container_state(self.brain.nodes[2], 'sharded', 2) self.assert_container_listing(obj_names) - # Let's pretend that some actor in the system has determined that all - # the shard ranges should shrink back to root - # TODO: replace this db manipulation if/when manage_shard_ranges can - # manage shrinking... - broker = self.get_broker(self.brain.part, self.brain.nodes[0]) - shard_ranges = broker.get_shard_ranges() - self.assertEqual(2, len(shard_ranges)) - for sr in shard_ranges: - self.assertTrue(sr.update_state(ShardRange.SHRINKING)) - sr.epoch = sr.state_timestamp = Timestamp.now() - own_sr = broker.get_own_shard_range() - own_sr.update_state(ShardRange.ACTIVE, state_timestamp=Timestamp.now()) - broker.merge_shard_ranges(shard_ranges + [own_sr]) + def test_manage_shard_ranges_compact(self): + # verify shard range compaction using swift-manage-shard-ranges + obj_names = self._make_object_names(8) + self.put_objects(obj_names) + client.post_container(self.url, self.admin_token, self.container_name, + headers={'X-Container-Sharding': 'on'}) + # run replicators first time to get sync points set, and get container + # sharded into 4 shards + self.replicators.once() + subprocess.check_output([ + 'swift-manage-shard-ranges', + self.get_db_file(self.brain.part, self.brain.nodes[0]), + 'find_and_replace', '2', '--enable'], stderr=subprocess.STDOUT) + self.assert_container_state(self.brain.nodes[0], 'unsharded', 4) + self.replicators.once() + # run sharders twice to cleave all 4 shard ranges + self.sharders_once(additional_args='--partitions=%s' % self.brain.part) + self.sharders_once(additional_args='--partitions=%s' % self.brain.part) + self.assert_container_state(self.brain.nodes[0], 'sharded', 4) + self.assert_container_state(self.brain.nodes[1], 'sharded', 4) + self.assert_container_state(self.brain.nodes[2], 'sharded', 4) + self.assert_container_listing(obj_names) - # replicate and run sharders + # now compact some ranges; use --max-shrinking to allow 2 shrinking + # shards + subprocess.check_output([ + 'swift-manage-shard-ranges', + self.get_db_file(self.brain.part, self.brain.nodes[0]), + 'compact', '--max-expanding', '1', '--max-shrinking', '2', + '--yes'], + stderr=subprocess.STDOUT) + shard_ranges = self.assert_container_state( + self.brain.nodes[0], 'sharded', 4) + self.assertEqual([ShardRange.SHRINKING] * 2 + [ShardRange.ACTIVE] * 2, + [sr.state for sr in shard_ranges]) self.replicators.once() self.sharders_once() + # check there's now just 2 remaining shard ranges + shard_ranges = self.assert_container_state( + self.brain.nodes[0], 'sharded', 2) + self.assertEqual([ShardRange.ACTIVE] * 2, + [sr.state for sr in shard_ranges]) + self.assert_container_listing(obj_names, req_hdrs={'X-Newest': 'True'}) + # root container own shard range should still be SHARDED + for i, node in enumerate(self.brain.nodes): + with annotate_failure('node[%d]' % i): + broker = self.get_broker(self.brain.part, self.brain.nodes[0]) + self.assertEqual(ShardRange.SHARDED, + broker.get_own_shard_range().state) + + # now compact the final two shard ranges to the root; use + # --max-shrinking to allow 2 shrinking shards + subprocess.check_output([ + 'swift-manage-shard-ranges', + self.get_db_file(self.brain.part, self.brain.nodes[0]), + 'compact', '--yes', '--max-shrinking', '2'], + stderr=subprocess.STDOUT) + shard_ranges = self.assert_container_state( + self.brain.nodes[0], 'sharded', 2) + self.assertEqual([ShardRange.SHRINKING] * 2, + [sr.state for sr in shard_ranges]) + self.replicators.once() + self.sharders_once() self.assert_container_state(self.brain.nodes[0], 'collapsed', 0) - self.assert_container_state(self.brain.nodes[1], 'collapsed', 0) - self.assert_container_state(self.brain.nodes[2], 'collapsed', 0) - self.assert_container_listing(obj_names) + self.assert_container_listing(obj_names, req_hdrs={'X-Newest': 'True'}) + + # root container own shard range should now be ACTIVE + for i, node in enumerate(self.brain.nodes): + with annotate_failure('node[%d]' % i): + broker = self.get_broker(self.brain.part, self.brain.nodes[0]) + self.assertEqual(ShardRange.ACTIVE, + broker.get_own_shard_range().state) def test_manage_shard_ranges_used_poorly(self): obj_names = self._make_object_names(8) diff --git a/test/unit/cli/test_manage_shard_ranges.py b/test/unit/cli/test_manage_shard_ranges.py index 7f0aa8857d..83b9a1e719 100644 --- a/test/unit/cli/test_manage_shard_ranges.py +++ b/test/unit/cli/test_manage_shard_ranges.py @@ -13,6 +13,7 @@ import json import os import unittest + import mock from shutil import rmtree from tempfile import mkdtemp @@ -23,6 +24,7 @@ from swift.cli.manage_shard_ranges import main from swift.common import utils from swift.common.utils import Timestamp, ShardRange from swift.container.backend import ContainerBroker +from swift.container.sharder import make_shard_ranges from test.unit import mock_timestamp_now @@ -32,7 +34,8 @@ class TestManageShardRanges(unittest.TestCase): utils.mkdirs(self.testdir) rmtree(self.testdir) self.shard_data = [ - {'index': 0, 'lower': '', 'upper': 'obj09', 'object_count': 10}, + {'index': 0, 'lower': '', 'upper': 'obj09', + 'object_count': 10}, {'index': 1, 'lower': 'obj09', 'upper': 'obj19', 'object_count': 10}, {'index': 2, 'lower': 'obj19', 'upper': 'obj29', @@ -49,7 +52,8 @@ class TestManageShardRanges(unittest.TestCase): 'object_count': 10}, {'index': 8, 'lower': 'obj79', 'upper': 'obj89', 'object_count': 10}, - {'index': 9, 'lower': 'obj89', 'upper': '', 'object_count': 10}, + {'index': 9, 'lower': 'obj89', 'upper': '', + 'object_count': 10}, ] def tearDown(self): @@ -79,6 +83,16 @@ class TestManageShardRanges(unittest.TestCase): broker.initialize() return broker + def _move_broker_to_sharded_state(self, broker): + epoch = Timestamp.now() + broker.enable_sharding(epoch) + self.assertTrue(broker.set_sharding_state()) + self.assertTrue(broker.set_sharded_state()) + own_sr = broker.get_own_shard_range() + own_sr.update_state(ShardRange.SHARDED, epoch) + broker.merge_shard_ranges([own_sr]) + return epoch + def test_find_shard_ranges(self): db_file = os.path.join(self.testdir, 'hash.db') broker = ContainerBroker(db_file) @@ -380,3 +394,547 @@ class TestManageShardRanges(unittest.TestCase): self.assertEqual(expected, out.getvalue().splitlines()) self.assertEqual(['Loaded db broker for a/c.'], err.getvalue().splitlines()) + + def test_compact_bad_args(self): + broker = self._make_broker() + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + with self.assertRaises(SystemExit): + main([broker.db_file, 'compact', '--shrink-threshold', '0']) + with self.assertRaises(SystemExit): + main([broker.db_file, 'compact', '--expansion-limit', '0']) + with self.assertRaises(SystemExit): + main([broker.db_file, 'compact', '--max-shrinking', '0']) + with self.assertRaises(SystemExit): + main([broker.db_file, 'compact', '--max-expanding', '0']) + + def test_compact_not_root(self): + broker = self._make_broker() + shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_') + broker.merge_shard_ranges(shard_ranges) + # make broker appear to not be a root container + out = StringIO() + err = StringIO() + broker.set_sharding_sysmeta('Quoted-Root', 'not_a/c') + self.assertFalse(broker.is_root_container()) + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, 'compact']) + self.assertEqual(2, ret) + err_lines = err.getvalue().split('\n') + self.assert_starts_with(err_lines[0], 'Loaded db broker for ') + out_lines = out.getvalue().split('\n') + self.assertEqual( + ['WARNING: Shard containers cannot be compacted.', + 'This command should be used on a root container.'], + out_lines[:2] + ) + updated_ranges = broker.get_shard_ranges() + self.assertEqual(shard_ranges, updated_ranges) + self.assertEqual([ShardRange.FOUND] * 10, + [sr.state for sr in updated_ranges]) + + def test_compact_not_sharded(self): + broker = self._make_broker() + shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_') + broker.merge_shard_ranges(shard_ranges) + # make broker appear to be a root container but it isn't sharded + out = StringIO() + err = StringIO() + broker.set_sharding_sysmeta('Quoted-Root', 'a/c') + self.assertTrue(broker.is_root_container()) + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, 'compact']) + self.assertEqual(2, ret) + err_lines = err.getvalue().split('\n') + self.assert_starts_with(err_lines[0], 'Loaded db broker for ') + out_lines = out.getvalue().split('\n') + self.assertEqual( + ['WARNING: Container is not yet sharded so cannot be compacted.'], + out_lines[:1]) + updated_ranges = broker.get_shard_ranges() + self.assertEqual(shard_ranges, updated_ranges) + self.assertEqual([ShardRange.FOUND] * 10, + [sr.state for sr in updated_ranges]) + + def test_compact_overlapping_shard_ranges(self): + # verify that containers with overlaps will not be compacted + broker = self._make_broker() + shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_') + for i, sr in enumerate(shard_ranges): + sr.update_state(ShardRange.ACTIVE) + shard_ranges[3].upper = shard_ranges[4].upper + broker.merge_shard_ranges(shard_ranges) + self._move_broker_to_sharded_state(broker) + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, + 'compact', '--yes', '--max-expanding', '10']) + self.assertEqual(2, ret) + err_lines = err.getvalue().split('\n') + self.assert_starts_with(err_lines[0], 'Loaded db broker for ') + out_lines = out.getvalue().split('\n') + self.assertEqual( + ['WARNING: Container has overlapping shard ranges so cannot be ' + 'compacted.'], + out_lines[:1]) + updated_ranges = broker.get_shard_ranges() + self.assertEqual(shard_ranges, updated_ranges) + self.assertEqual([ShardRange.ACTIVE] * 10, + [sr.state for sr in updated_ranges]) + + def test_compact_shard_ranges_in_found_state(self): + broker = self._make_broker() + shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_') + broker.merge_shard_ranges(shard_ranges) + self._move_broker_to_sharded_state(broker) + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, 'compact']) + self.assertEqual(0, ret) + err_lines = err.getvalue().split('\n') + self.assert_starts_with(err_lines[0], 'Loaded db broker for ') + out_lines = out.getvalue().split('\n') + self.assertEqual( + ['No shards identified for compaction.'], + out_lines[:1]) + updated_ranges = broker.get_shard_ranges() + self.assertEqual([ShardRange.FOUND] * 10, + [sr.state for sr in updated_ranges]) + + def test_compact_user_input(self): + # verify user input 'y' or 'n' is respected + small_ranges = (3, 4, 7) + broker = self._make_broker() + shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_') + for i, sr in enumerate(shard_ranges): + if i not in small_ranges: + sr.object_count = 100001 + sr.update_state(ShardRange.ACTIVE) + broker.merge_shard_ranges(shard_ranges) + self._move_broker_to_sharded_state(broker) + + def do_compact(user_input): + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out),\ + mock.patch('sys.stderr', err), \ + mock.patch('swift.cli.manage_shard_ranges.input', + return_value=user_input): + ret = main([broker.db_file, 'compact', + '--max-shrinking', '99']) + self.assertEqual(0, ret) + err_lines = err.getvalue().split('\n') + self.assert_starts_with(err_lines[0], 'Loaded db broker for ') + out_lines = out.getvalue().split('\n') + self.assertIn('can expand to accept 20 objects', out_lines[0]) + self.assertIn('(object count 10)', out_lines[1]) + self.assertIn('(object count 10)', out_lines[2]) + self.assertIn('can expand to accept 10 objects', out_lines[3]) + self.assertIn('(object count 10)', out_lines[4]) + broker_ranges = broker.get_shard_ranges() + return broker_ranges + + broker_ranges = do_compact('n') + # expect no changes to shard ranges + self.assertEqual(shard_ranges, broker_ranges) + for i, sr in enumerate(broker_ranges): + self.assertEqual(ShardRange.ACTIVE, sr.state) + + broker_ranges = do_compact('y') + # expect updated shard ranges + shard_ranges[5].lower = shard_ranges[3].lower + shard_ranges[8].lower = shard_ranges[7].lower + self.assertEqual(shard_ranges, broker_ranges) + for i, sr in enumerate(broker_ranges): + if i in small_ranges: + self.assertEqual(ShardRange.SHRINKING, sr.state) + else: + self.assertEqual(ShardRange.ACTIVE, sr.state) + + def test_compact_three_donors_two_acceptors(self): + small_ranges = (2, 3, 4, 7) + broker = self._make_broker() + shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_') + for i, sr in enumerate(shard_ranges): + if i not in small_ranges: + sr.object_count = 100001 + sr.update_state(ShardRange.ACTIVE) + broker.merge_shard_ranges(shard_ranges) + self._move_broker_to_sharded_state(broker) + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, 'compact', '--yes', + '--max-shrinking', '99']) + self.assertEqual(0, ret) + err_lines = err.getvalue().split('\n') + self.assert_starts_with(err_lines[0], 'Loaded db broker for ') + out_lines = out.getvalue().split('\n') + self.assertEqual( + ['Updated 2 shard sequences for compaction.'], + out_lines[:1]) + updated_ranges = broker.get_shard_ranges() + for i, sr in enumerate(updated_ranges): + if i in small_ranges: + self.assertEqual(ShardRange.SHRINKING, sr.state) + else: + self.assertEqual(ShardRange.ACTIVE, sr.state) + shard_ranges[5].lower = shard_ranges[2].lower + shard_ranges[8].lower = shard_ranges[7].lower + self.assertEqual(shard_ranges, updated_ranges) + for i in (5, 8): + # acceptors should have updated timestamp + self.assertLess(shard_ranges[i].timestamp, + updated_ranges[i].timestamp) + # check idempotency + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, 'compact', '--yes', + '--max-shrinking', '99']) + + self.assertEqual(0, ret) + updated_ranges = broker.get_shard_ranges() + self.assertEqual(shard_ranges, updated_ranges) + for i, sr in enumerate(updated_ranges): + if i in small_ranges: + self.assertEqual(ShardRange.SHRINKING, sr.state) + else: + self.assertEqual(ShardRange.ACTIVE, sr.state) + + def test_compact_all_donors_shrink_to_root(self): + # by default all shard ranges are small enough to shrink so the root + # becomes the acceptor + broker = self._make_broker() + shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_') + for i, sr in enumerate(shard_ranges): + sr.update_state(ShardRange.ACTIVE) + broker.merge_shard_ranges(shard_ranges) + epoch = self._move_broker_to_sharded_state(broker) + own_sr = broker.get_own_shard_range(no_default=True) + self.assertEqual(epoch, own_sr.state_timestamp) # sanity check + self.assertEqual(ShardRange.SHARDED, own_sr.state) # sanity check + + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, 'compact', '--yes', + '--max-shrinking', '99']) + self.assertEqual(0, ret, 'stdout:\n%s\nstderr\n%s' % + (out.getvalue(), err.getvalue())) + err_lines = err.getvalue().split('\n') + self.assert_starts_with(err_lines[0], 'Loaded db broker for ') + out_lines = out.getvalue().split('\n') + self.assertEqual( + ['Updated 1 shard sequences for compaction.'], + out_lines[:1]) + updated_ranges = broker.get_shard_ranges() + self.assertEqual(shard_ranges, updated_ranges) + self.assertEqual([ShardRange.SHRINKING] * 10, + [sr.state for sr in updated_ranges]) + updated_own_sr = broker.get_own_shard_range(no_default=True) + self.assertEqual(own_sr.timestamp, updated_own_sr.timestamp) + self.assertEqual(own_sr.epoch, updated_own_sr.epoch) + self.assertLess(own_sr.state_timestamp, + updated_own_sr.state_timestamp) + self.assertEqual(ShardRange.ACTIVE, updated_own_sr.state) + + def test_compact_single_donor_shrink_to_root(self): + # single shard range small enough to shrink so the root becomes the + # acceptor + broker = self._make_broker() + shard_data = [ + {'index': 0, 'lower': '', 'upper': '', 'object_count': 10} + ] + + shard_ranges = make_shard_ranges(broker, shard_data, '.shards_') + shard_ranges[0].update_state(ShardRange.ACTIVE) + broker.merge_shard_ranges(shard_ranges) + epoch = self._move_broker_to_sharded_state(broker) + own_sr = broker.get_own_shard_range(no_default=True) + self.assertEqual(epoch, own_sr.state_timestamp) # sanity check + self.assertEqual(ShardRange.SHARDED, own_sr.state) # sanity check + + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, 'compact', '--yes']) + self.assertEqual(0, ret, 'stdout:\n%s\nstderr\n%s' % + (out.getvalue(), err.getvalue())) + err_lines = err.getvalue().split('\n') + self.assert_starts_with(err_lines[0], 'Loaded db broker for ') + out_lines = out.getvalue().split('\n') + self.assertEqual( + ['Updated 1 shard sequences for compaction.'], + out_lines[:1]) + updated_ranges = broker.get_shard_ranges() + self.assertEqual(shard_ranges, updated_ranges) + self.assertEqual([ShardRange.SHRINKING], + [sr.state for sr in updated_ranges]) + updated_own_sr = broker.get_own_shard_range(no_default=True) + self.assertEqual(own_sr.timestamp, updated_own_sr.timestamp) + self.assertEqual(own_sr.epoch, updated_own_sr.epoch) + self.assertLess(own_sr.state_timestamp, + updated_own_sr.state_timestamp) + self.assertEqual(ShardRange.ACTIVE, updated_own_sr.state) + + def test_compact_donors_but_no_suitable_acceptor(self): + # if shard ranges are already shrinking, check that the final one is + # not made into an acceptor if a suitable adjacent acceptor is not + # found (unexpected scenario but possible in an overlap situation) + broker = self._make_broker() + shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_') + for i, state in enumerate([ShardRange.SHRINKING] * 3 + + [ShardRange.SHARDING] + + [ShardRange.ACTIVE] * 6): + shard_ranges[i].update_state(state) + broker.merge_shard_ranges(shard_ranges) + epoch = self._move_broker_to_sharded_state(broker) + with mock_timestamp_now(epoch): + own_sr = broker.get_own_shard_range(no_default=True) + self.assertEqual(epoch, own_sr.state_timestamp) # sanity check + self.assertEqual(ShardRange.SHARDED, own_sr.state) # sanity check + + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, 'compact', '--yes', + '--max-shrinking', '99']) + self.assertEqual(0, ret, 'stdout:\n%s\nstderr\n%s' % + (out.getvalue(), err.getvalue())) + err_lines = err.getvalue().split('\n') + self.assert_starts_with(err_lines[0], 'Loaded db broker for ') + out_lines = out.getvalue().split('\n') + self.assertEqual( + ['Updated 1 shard sequences for compaction.'], + out_lines[:1]) + updated_ranges = broker.get_shard_ranges() + shard_ranges[9].lower = shard_ranges[4].lower # expanded acceptor + self.assertEqual(shard_ranges, updated_ranges) + self.assertEqual([ShardRange.SHRINKING] * 3 + # unchanged + [ShardRange.SHARDING] + # unchanged + [ShardRange.SHRINKING] * 5 + # moved to shrinking + [ShardRange.ACTIVE], # unchanged + [sr.state for sr in updated_ranges]) + with mock_timestamp_now(epoch): # force equal meta-timestamp + updated_own_sr = broker.get_own_shard_range(no_default=True) + self.assertEqual(dict(own_sr), dict(updated_own_sr)) + + def test_compact_no_gaps(self): + # verify that compactable sequences do not include gaps + broker = self._make_broker() + shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_') + for i, sr in enumerate(shard_ranges): + sr.update_state(ShardRange.ACTIVE) + gapped_ranges = shard_ranges[:3] + shard_ranges[4:] + broker.merge_shard_ranges(gapped_ranges) + self._move_broker_to_sharded_state(broker) + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, 'compact', '--yes', + '--max-shrinking', '99']) + self.assertEqual(0, ret) + err_lines = err.getvalue().split('\n') + self.assert_starts_with(err_lines[0], 'Loaded db broker for ') + out_lines = out.getvalue().split('\n') + self.assertEqual( + ['Updated 2 shard sequences for compaction.'], + out_lines[:1]) + updated_ranges = broker.get_shard_ranges() + gapped_ranges[2].lower = gapped_ranges[0].lower + gapped_ranges[8].lower = gapped_ranges[3].lower + self.assertEqual(gapped_ranges, updated_ranges) + self.assertEqual([ShardRange.SHRINKING] * 2 + [ShardRange.ACTIVE] + + [ShardRange.SHRINKING] * 5 + [ShardRange.ACTIVE], + [sr.state for sr in updated_ranges]) + + def test_compact_max_shrinking_default(self): + # verify default limit on number of shrinking shards per acceptor + broker = self._make_broker() + shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_') + for i, sr in enumerate(shard_ranges): + sr.update_state(ShardRange.ACTIVE) + broker.merge_shard_ranges(shard_ranges) + self._move_broker_to_sharded_state(broker) + + def do_compact(): + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, 'compact', '--yes']) + self.assertEqual(0, ret) + err_lines = err.getvalue().split('\n') + self.assert_starts_with(err_lines[0], 'Loaded db broker for ') + out_lines = out.getvalue().split('\n') + self.assertEqual( + ['Updated 5 shard sequences for compaction.'], + out_lines[:1]) + return broker.get_shard_ranges() + + updated_ranges = do_compact() + for acceptor in (1, 3, 5, 7, 9): + shard_ranges[acceptor].lower = shard_ranges[acceptor - 1].lower + self.assertEqual(shard_ranges, updated_ranges) + self.assertEqual([ShardRange.SHRINKING, ShardRange.ACTIVE] * 5, + [sr.state for sr in updated_ranges]) + + # check idempotency + updated_ranges = do_compact() + self.assertEqual(shard_ranges, updated_ranges) + self.assertEqual([ShardRange.SHRINKING, ShardRange.ACTIVE] * 5, + [sr.state for sr in updated_ranges]) + + def test_compact_max_shrinking(self): + # verify option to limit the number of shrinking shards per acceptor + broker = self._make_broker() + shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_') + for i, sr in enumerate(shard_ranges): + sr.update_state(ShardRange.ACTIVE) + broker.merge_shard_ranges(shard_ranges) + self._move_broker_to_sharded_state(broker) + + def do_compact(): + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, 'compact', '--yes', + '--max-shrinking', '7']) + self.assertEqual(0, ret) + err_lines = err.getvalue().split('\n') + self.assert_starts_with(err_lines[0], 'Loaded db broker for ') + out_lines = out.getvalue().split('\n') + self.assertEqual( + ['Updated 2 shard sequences for compaction.'], + out_lines[:1]) + return broker.get_shard_ranges() + + updated_ranges = do_compact() + shard_ranges[7].lower = shard_ranges[0].lower + shard_ranges[9].lower = shard_ranges[8].lower + self.assertEqual(shard_ranges, updated_ranges) + self.assertEqual([ShardRange.SHRINKING] * 7 + [ShardRange.ACTIVE] + + [ShardRange.SHRINKING] + [ShardRange.ACTIVE], + [sr.state for sr in updated_ranges]) + + # check idempotency + updated_ranges = do_compact() + self.assertEqual(shard_ranges, updated_ranges) + self.assertEqual([ShardRange.SHRINKING] * 7 + [ShardRange.ACTIVE] + + [ShardRange.SHRINKING] + [ShardRange.ACTIVE], + [sr.state for sr in updated_ranges]) + + def test_compact_max_expanding(self): + # verify option to limit the number of expanding shards per acceptor + broker = self._make_broker() + shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_') + for i, sr in enumerate(shard_ranges): + sr.update_state(ShardRange.ACTIVE) + broker.merge_shard_ranges(shard_ranges) + self._move_broker_to_sharded_state(broker) + out = StringIO() + err = StringIO() + # note: max_shrinking is set to 3 so that there is opportunity for more + # than 2 acceptors + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, 'compact', '--yes', + '--max-shrinking', '3', '--max-expanding', '2']) + self.assertEqual(0, ret) + err_lines = err.getvalue().split('\n') + self.assert_starts_with(err_lines[0], 'Loaded db broker for ') + out_lines = out.getvalue().split('\n') + self.assertEqual( + ['Updated 2 shard sequences for compaction.'], + out_lines[:1]) + updated_ranges = broker.get_shard_ranges() + shard_ranges[3].lower = shard_ranges[0].lower + shard_ranges[7].lower = shard_ranges[4].lower + self.assertEqual(shard_ranges, updated_ranges) + self.assertEqual([ShardRange.SHRINKING] * 3 + [ShardRange.ACTIVE] + + [ShardRange.SHRINKING] * 3 + [ShardRange.ACTIVE] * 3, + [sr.state for sr in updated_ranges]) + + def test_compact_expansion_limit(self): + # verify option to limit the size of each acceptor after compaction + broker = self._make_broker() + shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_') + for i, sr in enumerate(shard_ranges): + sr.update_state(ShardRange.ACTIVE) + broker.merge_shard_ranges(shard_ranges) + self._move_broker_to_sharded_state(broker) + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, 'compact', '--yes', + '--expansion-limit', '20']) + self.assertEqual(0, ret, out.getvalue()) + err_lines = err.getvalue().split('\n') + self.assert_starts_with(err_lines[0], 'Loaded db broker for ') + out_lines = out.getvalue().split('\n') + self.assertEqual( + ['Updated 5 shard sequences for compaction.'], + out_lines[:1]) + updated_ranges = broker.get_shard_ranges() + shard_ranges[1].lower = shard_ranges[0].lower + shard_ranges[3].lower = shard_ranges[2].lower + shard_ranges[5].lower = shard_ranges[4].lower + shard_ranges[7].lower = shard_ranges[6].lower + shard_ranges[9].lower = shard_ranges[8].lower + self.assertEqual(shard_ranges, updated_ranges) + self.assertEqual([ShardRange.SHRINKING] + [ShardRange.ACTIVE] + + [ShardRange.SHRINKING] + [ShardRange.ACTIVE] + + [ShardRange.SHRINKING] + [ShardRange.ACTIVE] + + [ShardRange.SHRINKING] + [ShardRange.ACTIVE] + + [ShardRange.SHRINKING] + [ShardRange.ACTIVE], + [sr.state for sr in updated_ranges]) + + def test_compact_shrink_threshold(self): + # verify option to set the shrink threshold for compaction; + broker = self._make_broker() + shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_') + for i, sr in enumerate(shard_ranges): + sr.update_state(ShardRange.ACTIVE) + # (n-2)th shard range has one extra object + shard_ranges[-2].object_count = 11 + broker.merge_shard_ranges(shard_ranges) + self._move_broker_to_sharded_state(broker) + # with threshold set to 10 no shard ranges can be shrunk + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, 'compact', '--yes', + '--max-shrinking', '99', + '--shrink-threshold', '10']) + self.assertEqual(0, ret) + err_lines = err.getvalue().split('\n') + self.assert_starts_with(err_lines[0], 'Loaded db broker for ') + out_lines = out.getvalue().split('\n') + self.assertEqual( + ['No shards identified for compaction.'], + out_lines[:1]) + updated_ranges = broker.get_shard_ranges() + self.assertEqual(shard_ranges, updated_ranges) + self.assertEqual([ShardRange.ACTIVE] * 10, + [sr.state for sr in updated_ranges]) + + # with threshold == 11 all but the final 2 shard ranges can be shrunk; + # note: the (n-1)th shard range is NOT shrunk to root + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, 'compact', '--yes', + '--max-shrinking', '99', + '--shrink-threshold', '11']) + self.assertEqual(0, ret) + err_lines = err.getvalue().split('\n') + self.assert_starts_with(err_lines[0], 'Loaded db broker for ') + out_lines = out.getvalue().split('\n') + self.assertEqual( + ['Updated 1 shard sequences for compaction.'], + out_lines[:1]) + updated_ranges = broker.get_shard_ranges() + shard_ranges[8].lower = shard_ranges[0].lower + self.assertEqual(shard_ranges, updated_ranges) + self.assertEqual([ShardRange.SHRINKING] * 8 + [ShardRange.ACTIVE] * 2, + [sr.state for sr in updated_ranges]) diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 9b5b343d6b..59cf145760 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -74,7 +74,7 @@ from swift.common.exceptions import Timeout, MessageTimeout, \ MimeInvalid from swift.common import utils from swift.common.utils import is_valid_ip, is_valid_ipv4, is_valid_ipv6, \ - set_swift_dir, md5 + set_swift_dir, md5, ShardRangeList from swift.common.container_sync_realms import ContainerSyncRealms from swift.common.header_key_dict import HeaderKeyDict from swift.common.storage_policy import POLICIES, reload_storage_policies @@ -8323,6 +8323,135 @@ class TestShardRange(unittest.TestCase): self.assertEqual('a/root-%s-%s-foo' % (parent_hash, ts.internal), actual) + def test_expand(self): + bounds = (('', 'd'), ('d', 'k'), ('k', 't'), ('t', '')) + donors = [ + utils.ShardRange('a/c-%d' % i, utils.Timestamp.now(), b[0], b[1]) + for i, b in enumerate(bounds) + ] + acceptor = utils.ShardRange('a/c-acc', utils.Timestamp.now(), 'f', 's') + self.assertTrue(acceptor.expand(donors[:1])) + self.assertEqual((utils.ShardRange.MIN, 's'), + (acceptor.lower, acceptor.upper)) + + acceptor = utils.ShardRange('a/c-acc', utils.Timestamp.now(), 'f', 's') + self.assertTrue(acceptor.expand(donors[:2])) + self.assertEqual((utils.ShardRange.MIN, 's'), + (acceptor.lower, acceptor.upper)) + + acceptor = utils.ShardRange('a/c-acc', utils.Timestamp.now(), 'f', 's') + self.assertTrue(acceptor.expand(donors[1:3])) + self.assertEqual(('d', 't'), + (acceptor.lower, acceptor.upper)) + + acceptor = utils.ShardRange('a/c-acc', utils.Timestamp.now(), 'f', 's') + self.assertTrue(acceptor.expand(donors)) + self.assertEqual((utils.ShardRange.MIN, utils.ShardRange.MAX), + (acceptor.lower, acceptor.upper)) + + acceptor = utils.ShardRange('a/c-acc', utils.Timestamp.now(), 'f', 's') + self.assertTrue(acceptor.expand(donors[1:2] + donors[3:])) + self.assertEqual(('d', utils.ShardRange.MAX), + (acceptor.lower, acceptor.upper)) + + acceptor = utils.ShardRange('a/c-acc', utils.Timestamp.now(), '', 'd') + self.assertFalse(acceptor.expand(donors[:1])) + self.assertEqual((utils.ShardRange.MIN, 'd'), + (acceptor.lower, acceptor.upper)) + + acceptor = utils.ShardRange('a/c-acc', utils.Timestamp.now(), 'b', 'v') + self.assertFalse(acceptor.expand(donors[1:3])) + self.assertEqual(('b', 'v'), + (acceptor.lower, acceptor.upper)) + + +class TestShardRangeList(unittest.TestCase): + def setUp(self): + self.shard_ranges = [ + utils.ShardRange('a/b', utils.Timestamp.now(), 'a', 'b', + object_count=2, bytes_used=22), + utils.ShardRange('b/c', utils.Timestamp.now(), 'b', 'c', + object_count=4, bytes_used=44), + utils.ShardRange('x/y', utils.Timestamp.now(), 'x', 'y', + object_count=6, bytes_used=66), + ] + + def test_init(self): + srl = ShardRangeList() + self.assertEqual(0, len(srl)) + self.assertEqual(utils.ShardRange.MIN, srl.lower) + self.assertEqual(utils.ShardRange.MIN, srl.upper) + self.assertEqual(0, srl.object_count) + self.assertEqual(0, srl.bytes_used) + + def test_init_with_list(self): + srl = ShardRangeList(self.shard_ranges[:2]) + self.assertEqual(2, len(srl)) + self.assertEqual('a', srl.lower) + self.assertEqual('c', srl.upper) + self.assertEqual(6, srl.object_count) + self.assertEqual(66, srl.bytes_used) + + srl.append(self.shard_ranges[2]) + self.assertEqual(3, len(srl)) + self.assertEqual('a', srl.lower) + self.assertEqual('y', srl.upper) + self.assertEqual(12, srl.object_count) + self.assertEqual(132, srl.bytes_used) + + def test_pop(self): + srl = ShardRangeList(self.shard_ranges[:2]) + srl.pop() + self.assertEqual(1, len(srl)) + self.assertEqual('a', srl.lower) + self.assertEqual('b', srl.upper) + self.assertEqual(2, srl.object_count) + self.assertEqual(22, srl.bytes_used) + + def test_slice(self): + srl = ShardRangeList(self.shard_ranges) + sublist = srl[:1] + self.assertIsInstance(sublist, ShardRangeList) + self.assertEqual(1, len(sublist)) + self.assertEqual('a', sublist.lower) + self.assertEqual('b', sublist.upper) + self.assertEqual(2, sublist.object_count) + self.assertEqual(22, sublist.bytes_used) + + sublist = srl[1:] + self.assertIsInstance(sublist, ShardRangeList) + self.assertEqual(2, len(sublist)) + self.assertEqual('b', sublist.lower) + self.assertEqual('y', sublist.upper) + self.assertEqual(10, sublist.object_count) + self.assertEqual(110, sublist.bytes_used) + + def test_includes(self): + srl = ShardRangeList(self.shard_ranges) + + for sr in self.shard_ranges: + self.assertTrue(srl.includes(sr)) + + self.assertTrue(srl.includes(srl)) + + sr = utils.ShardRange('a/a', utils.Timestamp.now(), '', 'a') + self.assertFalse(srl.includes(sr)) + sr = utils.ShardRange('a/a', utils.Timestamp.now(), '', 'b') + self.assertFalse(srl.includes(sr)) + sr = utils.ShardRange('a/z', utils.Timestamp.now(), 'x', 'z') + self.assertFalse(srl.includes(sr)) + sr = utils.ShardRange('a/z', utils.Timestamp.now(), 'y', 'z') + self.assertFalse(srl.includes(sr)) + sr = utils.ShardRange('a/entire', utils.Timestamp.now(), '', '') + self.assertFalse(srl.includes(sr)) + + # entire range + srl_entire = ShardRangeList([sr]) + self.assertFalse(srl.includes(srl_entire)) + # make a fresh instance + sr = utils.ShardRange('a/entire', utils.Timestamp.now(), '', '') + self.assertTrue(srl_entire.includes(sr)) + @patch('ctypes.get_errno') @patch.object(utils, '_sys_posix_fallocate') diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index c300b3382a..e5fff6f2c9 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -39,7 +39,8 @@ from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING, \ SHARDED, DATADIR from swift.container.sharder import ContainerSharder, sharding_enabled, \ CleavingContext, DEFAULT_SHARD_SHRINK_POINT, \ - DEFAULT_SHARD_CONTAINER_THRESHOLD + DEFAULT_SHARD_CONTAINER_THRESHOLD, finalize_shrinking, \ + find_shrinking_candidates, process_compactable_shard_sequences from swift.common.utils import ShardRange, Timestamp, hash_path, \ encode_timestamps, parse_db_filename, quorum_size, Everything, md5 from test import annotate_failure @@ -5146,7 +5147,9 @@ class TestSharder(BaseTestSharder): DEFAULT_SHARD_CONTAINER_THRESHOLD / 100) shard_ranges = self._make_shard_ranges( shard_bounds, state=ShardRange.ACTIVE, object_count=size) - broker.merge_shard_ranges(shard_ranges) + own_sr = broker.get_own_shard_range() + own_sr.update_state(ShardRange.SHARDED, Timestamp.now()) + broker.merge_shard_ranges(shard_ranges + [own_sr]) self.assertTrue(broker.set_sharding_state()) self.assertTrue(broker.set_sharded_state()) with self._mock_sharder() as sharder: @@ -5239,6 +5242,53 @@ class TestSharder(BaseTestSharder): [final_donor, broker.get_own_shard_range()])] ) + def test_find_and_enable_multiple_shrinking_candidates(self): + broker = self._make_broker() + broker.enable_sharding(next(self.ts_iter)) + shard_bounds = (('', 'a'), ('a', 'b'), ('b', 'c'), + ('c', 'd'), ('d', 'e'), ('e', '')) + size = (DEFAULT_SHARD_SHRINK_POINT * + DEFAULT_SHARD_CONTAINER_THRESHOLD / 100) + shard_ranges = self._make_shard_ranges( + shard_bounds, state=ShardRange.ACTIVE, object_count=size) + own_sr = broker.get_own_shard_range() + own_sr.update_state(ShardRange.SHARDED, Timestamp.now()) + broker.merge_shard_ranges(shard_ranges + [own_sr]) + self.assertTrue(broker.set_sharding_state()) + self.assertTrue(broker.set_sharded_state()) + with self._mock_sharder() as sharder: + sharder._find_and_enable_shrinking_candidates(broker) + self._assert_shard_ranges_equal(shard_ranges, + broker.get_shard_ranges()) + + # three ranges just below threshold + shard_ranges = broker.get_shard_ranges() # get timestamps updated + shard_ranges[0].update_meta(size - 1, 0) + shard_ranges[1].update_meta(size - 1, 0) + shard_ranges[3].update_meta(size - 1, 0) + broker.merge_shard_ranges(shard_ranges) + with self._mock_sharder() as sharder: + with mock_timestamp_now() as now: + sharder._send_shard_ranges = mock.MagicMock() + sharder._find_and_enable_shrinking_candidates(broker) + # 0 shrinks into 1 (only one donor per acceptor is allowed) + shard_ranges[0].update_state(ShardRange.SHRINKING, state_timestamp=now) + shard_ranges[0].epoch = now + shard_ranges[1].lower = shard_ranges[0].lower + shard_ranges[1].timestamp = now + # 3 shrinks into 4 + shard_ranges[3].update_state(ShardRange.SHRINKING, state_timestamp=now) + shard_ranges[3].epoch = now + shard_ranges[4].lower = shard_ranges[3].lower + shard_ranges[4].timestamp = now + self._assert_shard_ranges_equal(shard_ranges, + broker.get_shard_ranges()) + for donor, acceptor in (shard_ranges[:2], shard_ranges[3:5]): + sharder._send_shard_ranges.assert_has_calls( + [mock.call(acceptor.account, acceptor.container, [acceptor]), + mock.call(donor.account, donor.container, [donor, acceptor])] + ) + def test_partition_and_device_filters(self): # verify partitions and devices kwargs result in filtering of processed # containers but not of the local device ids. @@ -5804,3 +5854,135 @@ class TestCleavingContext(BaseTestSharder): self.assertEqual(2, ctx.ranges_done) self.assertEqual(8, ctx.ranges_todo) self.assertEqual('c', ctx.cursor) + + +class TestSharderFunctions(BaseTestSharder): + def test_find_shrinking_candidates(self): + broker = self._make_broker() + shard_bounds = (('', 'a'), ('a', 'b'), ('b', 'c'), ('c', 'd')) + threshold = (DEFAULT_SHARD_SHRINK_POINT * + DEFAULT_SHARD_CONTAINER_THRESHOLD / 100) + shard_ranges = self._make_shard_ranges( + shard_bounds, state=ShardRange.ACTIVE, object_count=threshold) + broker.merge_shard_ranges(shard_ranges) + pairs = find_shrinking_candidates(broker, threshold, threshold * 4) + self.assertEqual({}, pairs) + + # one range just below threshold + shard_ranges[0].update_meta(threshold - 1, 0) + broker.merge_shard_ranges(shard_ranges[0]) + pairs = find_shrinking_candidates(broker, threshold, threshold * 4) + self.assertEqual(1, len(pairs), pairs) + for acceptor, donor in pairs.items(): + self.assertEqual(shard_ranges[1], acceptor) + self.assertEqual(shard_ranges[0], donor) + + # two ranges just below threshold + shard_ranges[2].update_meta(threshold - 1, 0) + broker.merge_shard_ranges(shard_ranges[2]) + pairs = find_shrinking_candidates(broker, threshold, threshold * 4) + # shenanigans to work around dicts with ShardRanges keys not comparing + acceptors = [] + donors = [] + for acceptor, donor in pairs.items(): + acceptors.append(acceptor) + donors.append(donor) + acceptors.sort(key=ShardRange.sort_key) + donors.sort(key=ShardRange.sort_key) + self.assertEqual([shard_ranges[1], shard_ranges[3]], acceptors) + self.assertEqual([shard_ranges[0], shard_ranges[2]], donors) + + def test_finalize_shrinking(self): + broker = self._make_broker() + broker.enable_sharding(next(self.ts_iter)) + shard_bounds = (('', 'here'), ('here', 'there'), ('there', '')) + ts_0 = next(self.ts_iter) + shard_ranges = self._make_shard_ranges( + shard_bounds, state=ShardRange.ACTIVE, timestamp=ts_0) + self.assertTrue(broker.set_sharding_state()) + self.assertTrue(broker.set_sharded_state()) + ts_1 = next(self.ts_iter) + finalize_shrinking(broker, shard_ranges[2:], shard_ranges[:2], ts_1) + updated_ranges = broker.get_shard_ranges() + self.assertEqual( + [ShardRange.SHRINKING, ShardRange.SHRINKING, ShardRange.ACTIVE], + [sr.state for sr in updated_ranges] + ) + # acceptor is not updated... + self.assertEqual(ts_0, updated_ranges[2].timestamp) + # donors are updated... + self.assertEqual([ts_1] * 2, + [sr.state_timestamp for sr in updated_ranges[:2]]) + self.assertEqual([ts_1] * 2, + [sr.epoch for sr in updated_ranges[:2]]) + # check idempotency + ts_2 = next(self.ts_iter) + finalize_shrinking(broker, shard_ranges[2:], shard_ranges[:2], ts_2) + updated_ranges = broker.get_shard_ranges() + self.assertEqual( + [ShardRange.SHRINKING, ShardRange.SHRINKING, ShardRange.ACTIVE], + [sr.state for sr in updated_ranges] + ) + # acceptor is not updated... + self.assertEqual(ts_0, updated_ranges[2].timestamp) + # donors are not updated... + self.assertEqual([ts_1] * 2, + [sr.state_timestamp for sr in updated_ranges[:2]]) + self.assertEqual([ts_1] * 2, + [sr.epoch for sr in updated_ranges[:2]]) + + def test_process_compactable(self): + ts_0 = next(self.ts_iter) + # no sequences... + acceptors, donors = process_compactable_shard_sequences([], ts_0) + self.assertEqual([], acceptors) + self.assertEqual([], donors) + + # two sequences with acceptor bounds needing to be updated + sequence_1 = self._make_shard_ranges( + (('a', 'b'), ('b', 'c'), ('c', 'd')), + state=ShardRange.ACTIVE, timestamp=ts_0) + sequence_2 = self._make_shard_ranges( + (('x', 'y'), ('y', 'z')), + state=ShardRange.ACTIVE, timestamp=ts_0) + ts_1 = next(self.ts_iter) + acceptors, donors = process_compactable_shard_sequences( + [sequence_1, sequence_2], ts_1) + expected_donors = sequence_1[:-1] + sequence_2[:-1] + expected_acceptors = [sequence_1[-1].copy(lower='a', timestamp=ts_1), + sequence_2[-1].copy(lower='x', timestamp=ts_1)] + self.assertEqual([dict(sr) for sr in expected_acceptors], + [dict(sr) for sr in acceptors]) + self.assertEqual([dict(sr) for sr in expected_donors], + [dict(sr) for sr in donors]) + + # sequences have already been processed - acceptors expanded + sequence_1 = self._make_shard_ranges( + (('a', 'b'), ('b', 'c'), ('a', 'd')), + state=ShardRange.ACTIVE, timestamp=ts_0) + sequence_2 = self._make_shard_ranges( + (('x', 'y'), ('x', 'z')), + state=ShardRange.ACTIVE, timestamp=ts_0) + acceptors, donors = process_compactable_shard_sequences( + [sequence_1, sequence_2], ts_1) + expected_donors = sequence_1[:-1] + sequence_2[:-1] + expected_acceptors = [sequence_1[-1], sequence_2[-1]] + self.assertEqual([dict(sr) for sr in expected_acceptors], + [dict(sr) for sr in acceptors]) + self.assertEqual([dict(sr) for sr in expected_donors], + [dict(sr) for sr in donors]) + + # acceptor is root - needs state to be updated, but not bounds + sequence_1 = self._make_shard_ranges( + (('a', 'b'), ('b', 'c'), ('a', 'd'), ('d', ''), ('', '')), + state=[ShardRange.ACTIVE] * 4 + [ShardRange.SHARDED], + timestamp=ts_0) + acceptors, donors = process_compactable_shard_sequences( + [sequence_1], ts_1) + expected_donors = sequence_1[:-1] + expected_acceptors = [sequence_1[-1].copy(state=ShardRange.ACTIVE, + state_timestamp=ts_1)] + self.assertEqual([dict(sr) for sr in expected_acceptors], + [dict(sr) for sr in acceptors]) + self.assertEqual([dict(sr) for sr in expected_donors], + [dict(sr) for sr in donors])