diff --git a/doc/source/config/container_server_config.rst b/doc/source/config/container_server_config.rst index acbb32310c..85fb9b377c 100644 --- a/doc/source/config/container_server_config.rst +++ b/doc/source/config/container_server_config.rst @@ -363,12 +363,12 @@ shard_shrink_merge_point 75 When auto-sharding is For example, if shard_container_threshold is 1 million, - shard_shrink_point is 5, + shard_shrink_point is 10, and shard_shrink_merge_point is 75 then a shard will be considered for shrinking if it has less - than or equal to 50 + than or equal to 100 thousand objects but will only merge into an acceptor if the combined diff --git a/etc/container-server.conf-sample b/etc/container-server.conf-sample index d03217cbb8..0bcda5af1d 100644 --- a/etc/container-server.conf-sample +++ b/etc/container-server.conf-sample @@ -375,7 +375,7 @@ use = egg:swift#xprofile # When auto-sharding is enabled shard_shrink_point defines the object count # below which a 'donor' shard container will be considered for shrinking into # another 'acceptor' shard container. shard_shrink_point is a percentage of -# shard_container_threshold e.g. the default value of 5 means 5% of the +# shard_container_threshold e.g. the default value of 10 means 10% of the # shard_container_threshold. # shard_shrink_point = 10 # @@ -387,8 +387,8 @@ use = egg:swift#xprofile # for the donor to be allowed to merge into the acceptor. # # For example, if the shard_container_threshold is 1 million, -# shard_shrink_point is 5, and shard_shrink_merge_point is 75 then a shard will -# be considered for shrinking if it has less than or equal to 50 thousand +# shard_shrink_point is 10, and shard_shrink_merge_point is 75 then a shard will +# be considered for shrinking if it has less than or equal to 100 thousand # objects but will only merge into an acceptor if the combined object count # would be less than or equal to 750 thousand objects. # shard_shrink_merge_point = 75 diff --git a/swift/cli/manage_shard_ranges.py b/swift/cli/manage_shard_ranges.py index 34556b0dfe..3ec4f16d64 100644 --- a/swift/cli/manage_shard_ranges.py +++ b/swift/cli/manage_shard_ranges.py @@ -165,26 +165,27 @@ from contextlib import contextmanager from six.moves import input + from swift.common.utils import Timestamp, get_logger, ShardRange, readconf, \ - config_percent_value, config_positive_int_value, ShardRangeList + ShardRangeList from swift.container.backend import ContainerBroker, UNSHARDED from swift.container.sharder import make_shard_ranges, sharding_enabled, \ CleavingContext, process_compactible_shard_sequences, \ find_compactible_shard_sequences, find_overlapping_ranges, \ - find_paths, rank_paths, finalize_shrinking, \ - DEFAULT_MAX_SHRINKING, DEFAULT_MAX_EXPANDING, \ - DEFAULT_SHARD_CONTAINER_THRESHOLD, DEFAULT_SHARD_SHRINK_POINT, \ - DEFAULT_SHARD_MERGE_POINT - -DEFAULT_ROWS_PER_SHARD = DEFAULT_SHARD_CONTAINER_THRESHOLD // 2 -DEFAULT_SHRINK_THRESHOLD = DEFAULT_SHARD_CONTAINER_THRESHOLD * \ - config_percent_value(DEFAULT_SHARD_SHRINK_POINT) + find_paths, rank_paths, finalize_shrinking, DEFAULT_SHARDER_CONF, \ + ContainerSharderConf EXIT_SUCCESS = 0 EXIT_ERROR = 1 EXIT_INVALID_ARGS = 2 # consistent with argparse exit code for invalid args EXIT_USER_QUIT = 3 +# Some CLI options derive their default values from DEFAULT_SHARDER_CONF if +# they have not been set. It is therefore important that the CLI parser +# provides a unique sentinel as a default so that we can detect that no value +# was set on the command line. We use this alias to act as a reminder.
+USE_SHARDER_DEFAULT = object() + class ManageShardRangesException(Exception): pass @@ -702,14 +703,21 @@ def _positive_int(arg): def _add_find_args(parser): - parser.add_argument('rows_per_shard', nargs='?', type=int, - default=None) + parser.add_argument( + 'rows_per_shard', nargs='?', type=int, default=USE_SHARDER_DEFAULT, + help='Target number of rows for newly created shards. ' + 'Default is half of the shard_container_threshold value if that is ' + 'given in a conf file specified with --config, otherwise %s.' + % DEFAULT_SHARDER_CONF['rows_per_shard']) def _add_replace_args(parser): parser.add_argument( '--shards_account_prefix', metavar='shards_account_prefix', type=str, - required=False, help='Prefix for shards account', default='.shards_') + required=False, default='.shards_', + help="Prefix for shards account. The default is '.shards_'. This " + "should only be changed if the auto_create_account_prefix option " + "has been similarly changed in swift.conf.") parser.add_argument( '--replace-timeout', type=int, default=600, help='Minimum DB timeout to use when replacing shard ranges.') @@ -825,19 +833,18 @@ def _make_parser(): help='Compact shard ranges with less than the shrink-threshold number ' 'of rows. This command only works on root containers.') _add_prompt_args(compact_parser) - compact_parser.add_argument('--shrink-threshold', nargs='?', - type=_positive_int, - default=None, - help='The number of rows below which a shard ' - 'can qualify for shrinking. Defaults to ' - '%d' % DEFAULT_SHRINK_THRESHOLD) - compact_parser.add_argument('--expansion-limit', nargs='?', - type=_positive_int, - default=None, - help='Maximum number of rows for an expanding ' - 'shard to have after compaction has ' - 'completed. Defaults to %d' % - DEFAULT_ROWS_PER_SHARD) + compact_parser.add_argument( + '--shrink-threshold', nargs='?', type=_positive_int, + default=USE_SHARDER_DEFAULT, + help='The number of rows below which a shard can qualify for ' + 'shrinking. ' + 'Defaults to %d' % DEFAULT_SHARDER_CONF['shrink_threshold']) + compact_parser.add_argument( + '--expansion-limit', nargs='?', type=_positive_int, + default=USE_SHARDER_DEFAULT, + help='Maximum number of rows for an expanding shard to have after ' + 'compaction has completed. ' + 'Defaults to %d' % DEFAULT_SHARDER_CONF['expansion_limit']) # If just one donor shard is chosen to shrink to an acceptor then the # expanded acceptor will handle object listings as soon as the donor shard # has shrunk. If more than one donor shard are chosen to shrink to an @@ -846,7 +853,7 @@ def _make_parser(): # temporary gap(s) in object listings where the shrunk donors are missing. compact_parser.add_argument('--max-shrinking', nargs='?', type=_positive_int, - default=None, + default=USE_SHARDER_DEFAULT, help='Maximum number of shards that should be ' 'shrunk into each expanding shard. ' 'Defaults to 1. Using values greater ' @@ -855,7 +862,7 @@ def _make_parser(): 'shards have shrunk.') compact_parser.add_argument('--max-expanding', nargs='?', type=_positive_int, - default=None, + default=USE_SHARDER_DEFAULT, help='Maximum number of shards that should be ' 'expanded. Defaults to unlimited.') compact_parser.set_defaults(func=compact_shard_ranges) @@ -878,9 +885,9 @@ def _make_parser(): return parser -def main(args=None): +def main(cli_args=None): parser = _make_parser() - args = parser.parse_args(args) + args = parser.parse_args(cli_args) if not args.subcommand: # On py2, subparsers are required; on py3 they are not; see # https://bugs.python.org/issue9253. 
py37 added a `required` kwarg @@ -891,49 +898,25 @@ def main(args=None): print('\nA sub-command is required.', file=sys.stderr) return EXIT_INVALID_ARGS - conf = {} - rows_per_shard = DEFAULT_ROWS_PER_SHARD - shrink_threshold = DEFAULT_SHRINK_THRESHOLD - expansion_limit = DEFAULT_ROWS_PER_SHARD - if args.conf_file: - try: + try: + # load values from conf file or sharder defaults + conf = {} + if args.conf_file: conf = readconf(args.conf_file, 'container-sharder') - shard_container_threshold = config_positive_int_value(conf.get( - 'shard_container_threshold', - DEFAULT_SHARD_CONTAINER_THRESHOLD)) - if shard_container_threshold: - rows_per_shard = shard_container_threshold // 2 - shrink_threshold = int( - shard_container_threshold * config_percent_value( - conf.get('shard_shrink_point', - DEFAULT_SHARD_SHRINK_POINT))) - expansion_limit = int( - shard_container_threshold * config_percent_value( - conf.get('shard_shrink_merge_point', - DEFAULT_SHARD_MERGE_POINT))) - except (OSError, IOError) as exc: - print('Error opening config file %s: %s' % (args.conf_file, exc), - file=sys.stderr) - return EXIT_ERROR - except (TypeError, ValueError) as exc: - print('Error loading config file %s: %s' % (args.conf_file, exc), - file=sys.stderr) - return EXIT_INVALID_ARGS + conf_args = ContainerSharderConf(conf) + except (OSError, IOError) as exc: + print('Error opening config file %s: %s' % (args.conf_file, exc), + file=sys.stderr) + return EXIT_ERROR + except (TypeError, ValueError) as exc: + print('Error loading config file %s: %s' % (args.conf_file, exc), + file=sys.stderr) + return EXIT_INVALID_ARGS - # seems having sub parsers mean sometimes an arg wont exist in the args - # namespace. But we can check if it is with the 'in' statement. - if "max_shrinking" in args and args.max_shrinking is None: - args.max_shrinking = int(conf.get( - "max_shrinking", DEFAULT_MAX_SHRINKING)) - if "max_expanding" in args and args.max_expanding is None: - args.max_expanding = int(conf.get( - "max_expanding", DEFAULT_MAX_EXPANDING)) - if "shrink_threshold" in args and args.shrink_threshold is None: - args.shrink_threshold = shrink_threshold - if "expansion_limit" in args and args.expansion_limit is None: - args.expansion_limit = expansion_limit - if "rows_per_shard" in args and args.rows_per_shard is None: - args.rows_per_shard = rows_per_shard + for k, v in vars(args).items(): + # set any un-set cli args from conf_args + if v is USE_SHARDER_DEFAULT: + setattr(args, k, getattr(conf_args, k)) if args.func in (analyze_shard_ranges,): args.input = args.path_to_file diff --git a/swift/container/sharder.py b/swift/container/sharder.py index a2d811f376..8153be3b3a 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -126,16 +126,16 @@ def is_sharding_candidate(shard_range, threshold): shard_range.object_count >= threshold) -def is_shrinking_candidate(shard_range, shrink_threshold, merge_size, +def is_shrinking_candidate(shard_range, shrink_threshold, expansion_limit, states=None): - # typically shrink_threshold < merge_size but check both just in case + # typically shrink_threshold < expansion_limit but check both just in case # note: use *row* count (objects plus tombstones) as the condition for # shrinking to avoid inadvertently moving large numbers of tombstones into # an acceptor states = states or (ShardRange.ACTIVE,) return (shard_range.state in states and shard_range.row_count < shrink_threshold and - shard_range.row_count <= merge_size) + shard_range.row_count <= expansion_limit) def 
find_sharding_candidates(broker, threshold, shard_ranges=None): @@ -156,13 +156,13 @@ def find_sharding_candidates(broker, threshold, shard_ranges=None): return candidates -def find_shrinking_candidates(broker, shrink_threshold, merge_size): +def find_shrinking_candidates(broker, shrink_threshold, expansion_limit): # this is only here to preserve a legacy public function signature; # superseded by find_compactible_shard_sequences merge_pairs = {} # restrict search to sequences with one donor results = find_compactible_shard_sequences(broker, shrink_threshold, - merge_size, 1, -1, + expansion_limit, 1, -1, include_shrinking=True) for sequence in results: # map acceptor -> donor list @@ -172,7 +172,7 @@ def find_shrinking_candidates(broker, shrink_threshold, merge_size): def find_compactible_shard_sequences(broker, shrink_threshold, - merge_size, + expansion_limit, max_shrinking, max_expanding, include_shrinking=False): @@ -185,8 +185,8 @@ def find_compactible_shard_sequences(broker, :param broker: A :class:`~swift.container.backend.ContainerBroker`. :param shrink_threshold: the number of rows below which a shard may be considered for shrinking into another shard - :param merge_size: the maximum number of rows that an acceptor shard range - should have after other shard ranges have been compacted into it + :param expansion_limit: the maximum number of rows that an acceptor shard + range should have after other shard ranges have been compacted into it :param max_shrinking: the maximum number of shard ranges that should be compacted into each acceptor; -1 implies unlimited. :param max_expanding: the maximum number of acceptors to be found (i.e. the @@ -213,13 +213,13 @@ def find_compactible_shard_sequences(broker, # - the max number of shard ranges to be compacted (max_shrinking) has # been reached # - the total number of objects in the sequence has reached the - # merge_size + # expansion_limit if (sequence and (not is_shrinking_candidate( - sequence[-1], shrink_threshold, merge_size, + sequence[-1], shrink_threshold, expansion_limit, states=(ShardRange.ACTIVE, ShardRange.SHRINKING)) or 0 < max_shrinking < len(sequence) or - sequence.row_count >= merge_size)): + sequence.row_count >= expansion_limit)): return True return False @@ -229,7 +229,7 @@ def find_compactible_shard_sequences(broker, while ((max_expanding < 0 or expanding < max_expanding) and index < len(shard_ranges)): if not is_shrinking_candidate( - shard_ranges[index], shrink_threshold, merge_size, + shard_ranges[index], shrink_threshold, expansion_limit, states=(ShardRange.ACTIVE, ShardRange.SHRINKING)): # this shard range cannot be the start of a new or existing # compactible sequence, move on @@ -254,7 +254,7 @@ def find_compactible_shard_sequences(broker, # already shrinking: add to sequence unconditionally sequence.append(shard_range) elif (sequence.row_count + shard_range.row_count - <= merge_size): + <= expansion_limit): # add to sequence: could be a donor or acceptor sequence.append(shard_range) if sequence_complete(sequence): @@ -269,7 +269,7 @@ def find_compactible_shard_sequences(broker, sequence.includes(own_shard_range)): # special case: only one sequence has been found, which consumes # all shard ranges, encompasses the entire namespace, has no more - # than merge_size records and whose shard ranges are all + # than expansion_limit records and whose shard ranges are all # shrinkable; all the shards in the sequence can be shrunk to the # root, so append own_shard_range to the sequence to act as an # acceptor; note: 
only shrink to the root when *all* the remaining @@ -572,19 +572,70 @@ class CleavingContext(object): broker.set_sharding_sysmeta('Context-' + self.ref, '') -DEFAULT_SHARD_CONTAINER_THRESHOLD = 1000000 -DEFAULT_SHARD_SHRINK_POINT = 10 -DEFAULT_SHARD_MERGE_POINT = 75 -DEFAULT_MAX_SHRINKING = 1 -DEFAULT_MAX_EXPANDING = -1 +class ContainerSharderConf(object): + def __init__(self, conf=None): + conf = conf if conf else {} + + def get_val(key, validator, default): + """ + Get a value from conf and validate it. + + :param key: key used to look up a value in the ``conf`` dict. + :param validator: A function that will be passed the value from the + ``conf`` dict and should return the value to be set. This + function should raise a ValueError if the ``conf`` value is not + valid. + :param default: value to use if ``key`` is not found in ``conf``. + :raises: ValueError if the value read from ``conf`` is invalid. + :returns: the configuration value. + """ + try: + return validator(conf.get(key, default)) + except ValueError as err: + raise ValueError('Error setting %s: %s' % (key, err)) + + self.shard_container_threshold = get_val( + 'shard_container_threshold', config_positive_int_value, 1000000) + self.max_shrinking = get_val( + 'max_shrinking', int, 1) + self.max_expanding = get_val( + 'max_expanding', int, -1) + self.shard_scanner_batch_size = get_val( + 'shard_scanner_batch_size', config_positive_int_value, 10) + self.cleave_batch_size = get_val( + 'cleave_batch_size', config_positive_int_value, 2) + self.cleave_row_batch_size = get_val( + 'cleave_row_batch_size', config_positive_int_value, 10000) + self.broker_timeout = get_val( + 'broker_timeout', config_positive_int_value, 60) + self.recon_candidates_limit = get_val( + 'recon_candidates_limit', int, 5) + self.recon_sharded_timeout = get_val( + 'recon_sharded_timeout', int, 43200) + self.conn_timeout = get_val( + 'conn_timeout', float, 5) + self.auto_shard = get_val( + 'auto_shard', config_true_value, False) + self.shrink_threshold = get_val( + 'shard_shrink_point', self.percent_of_threshold, 10) + self.expansion_limit = get_val( + 'shard_shrink_merge_point', self.percent_of_threshold, 75) + self.rows_per_shard = self.shard_container_threshold // 2 + + def percent_of_threshold(self, val): + return int(config_percent_value(val) * self.shard_container_threshold) -class ContainerSharder(ContainerReplicator): +DEFAULT_SHARDER_CONF = vars(ContainerSharderConf()) + + +class ContainerSharder(ContainerSharderConf, ContainerReplicator): """Shards containers.""" def __init__(self, conf, logger=None): logger = logger or get_logger(conf, log_route='container-sharder') - super(ContainerSharder, self).__init__(conf, logger=logger) + ContainerReplicator.__init__(self, conf, logger=logger) + ContainerSharderConf.__init__(self, conf) if conf.get('auto_create_account_prefix'): self.logger.warning('Option auto_create_account_prefix is ' 'deprecated.
Configure ' @@ -597,37 +648,8 @@ class ContainerSharder(ContainerReplicator): else: auto_create_account_prefix = AUTO_CREATE_ACCOUNT_PREFIX self.shards_account_prefix = (auto_create_account_prefix + 'shards_') - self.shard_shrink_point = config_percent_value( - conf.get('shard_shrink_point', DEFAULT_SHARD_SHRINK_POINT)) - self.shrink_merge_point = config_percent_value( - conf.get('shard_shrink_merge_point', DEFAULT_SHARD_MERGE_POINT)) - self.shard_container_threshold = config_positive_int_value( - conf.get('shard_container_threshold', - DEFAULT_SHARD_CONTAINER_THRESHOLD)) - self.shrink_size = (self.shard_container_threshold * - self.shard_shrink_point) - self.merge_size = (self.shard_container_threshold * - self.shrink_merge_point) - self.split_size = self.shard_container_threshold // 2 - self.scanner_batch_size = config_positive_int_value( - conf.get('shard_scanner_batch_size', 10)) - self.cleave_batch_size = config_positive_int_value( - conf.get('cleave_batch_size', 2)) - self.cleave_row_batch_size = config_positive_int_value( - conf.get('cleave_row_batch_size', 10000)) - self.max_shrinking = int(conf.get('max_shrinking', - DEFAULT_MAX_SHRINKING)) - self.max_expanding = int(conf.get('max_expanding', - DEFAULT_MAX_EXPANDING)) - self.auto_shard = config_true_value(conf.get('auto_shard', False)) self.sharding_candidates = [] self.shrinking_candidates = [] - self.recon_candidates_limit = int( - conf.get('recon_candidates_limit', 5)) - self.recon_sharded_timeout = int( - conf.get('recon_sharded_timeout', 43200)) - self.broker_timeout = config_positive_int_value( - conf.get('broker_timeout', 60)) replica_count = self.ring.replica_count quorum = quorum_size(replica_count) self.shard_replication_quorum = config_auto_int_value( @@ -649,7 +671,6 @@ class ContainerSharder(ContainerReplicator): self.existing_shard_replication_quorum = replica_count # internal client - self.conn_timeout = float(conf.get('conn_timeout', 5)) request_tries = config_positive_int_value( conf.get('request_tries', 3)) internal_client_conf_path = conf.get('internal_client_conf_path', @@ -728,7 +749,7 @@ class ContainerSharder(ContainerReplicator): def _identify_shrinking_candidate(self, broker, node): sequences = find_compactible_shard_sequences( - broker, self.shrink_size, self.merge_size, + broker, self.shrink_threshold, self.expansion_limit, self.max_shrinking, self.max_expanding) # compactible_ranges are all apart from final acceptor in each sequence compactible_ranges = sum(len(seq) - 1 for seq in sequences) @@ -1449,7 +1470,7 @@ class ContainerSharder(ContainerReplicator): start = time.time() shard_data, last_found = broker.find_shard_ranges( - self.split_size, limit=self.scanner_batch_size, + self.rows_per_shard, limit=self.shard_scanner_batch_size, existing_ranges=shard_ranges) elapsed = time.time() - start @@ -1812,8 +1833,8 @@ class ContainerSharder(ContainerReplicator): return compactible_sequences = find_compactible_shard_sequences( - broker, self.shrink_size, self.merge_size, self.max_shrinking, - self.max_expanding, include_shrinking=True) + broker, self.shrink_threshold, self.expansion_limit, + self.max_shrinking, self.max_expanding, include_shrinking=True) self.logger.debug('Found %s compactible sequences of length(s) %s' % (len(compactible_sequences), [len(s) for s in compactible_sequences])) diff --git a/test/unit/cli/test_manage_shard_ranges.py b/test/unit/cli/test_manage_shard_ranges.py index 9e3fcb01b2..dbc81fa3dc 100644 --- a/test/unit/cli/test_manage_shard_ranges.py +++ 
b/test/unit/cli/test_manage_shard_ranges.py @@ -203,7 +203,7 @@ class TestManageShardRanges(unittest.TestCase): max_expanding=-1, max_shrinking=1, shrink_threshold=100000, - expansion_limit=500000, + expansion_limit=750000, yes=False, dry_run=False) mocked.assert_called_once_with(mock.ANY, expected) @@ -283,6 +283,11 @@ class TestManageShardRanges(unittest.TestCase): dry_run=False) mocked.assert_called_once_with(mock.ANY, expected) + def test_conf_file_invalid(self): + db_file = os.path.join(self.testdir, 'hash.db') + broker = ContainerBroker(db_file, account='a', container='c') + broker.initialize() + # conf file - invalid value for shard_container_threshold conf = """ [container-sharder] @@ -303,8 +308,12 @@ class TestManageShardRanges(unittest.TestCase): self.assertEqual(2, ret) err_lines = err.getvalue().split('\n') self.assert_starts_with(err_lines[0], 'Error loading config file') + self.assertIn('shard_container_threshold', err_lines[0]) - # conf file - cannot open conf file + def test_conf_file_does_not_exist(self): + db_file = os.path.join(self.testdir, 'hash.db') + broker = ContainerBroker(db_file, account='a', container='c') + broker.initialize() conf_file = os.path.join(self.testdir, 'missing_sharder.conf') out = StringIO() err = StringIO() diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index a3cc933d08..9b5b99e919 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -38,11 +38,10 @@ from swift.container import replicator from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING, \ SHARDED, DATADIR from swift.container.sharder import ContainerSharder, sharding_enabled, \ - CleavingContext, DEFAULT_SHARD_SHRINK_POINT, \ - DEFAULT_SHARD_CONTAINER_THRESHOLD, finalize_shrinking, \ + CleavingContext, DEFAULT_SHARDER_CONF, finalize_shrinking, \ find_shrinking_candidates, process_compactible_shard_sequences, \ find_compactible_shard_sequences, is_shrinking_candidate, \ - is_sharding_candidate, find_paths, rank_paths + is_sharding_candidate, find_paths, rank_paths, ContainerSharderConf from swift.common.utils import ShardRange, Timestamp, hash_path, \ encode_timestamps, parse_db_filename, quorum_size, Everything, md5 from test import annotate_failure @@ -152,6 +151,7 @@ class TestSharder(BaseTestSharder): (k, v, getattr(sharder, k))) return sharder, mock_ic + # defaults expected = { 'mount_check': True, 'bind_ip': '0.0.0.0', 'port': 6201, 'per_diff': 1000, 'max_diffs': 100, 'interval': 30, @@ -161,12 +161,12 @@ class TestSharder(BaseTestSharder): 'rsync_compress': False, 'rsync_module': '{replication_ip}::container', 'reclaim_age': 86400 * 7, - 'shard_shrink_point': 0.10, - 'shrink_merge_point': 0.75, 'shard_container_threshold': 1000000, - 'split_size': 500000, + 'rows_per_shard': 500000, + 'shrink_threshold': 100000, + 'expansion_limit': 750000, 'cleave_batch_size': 2, - 'scanner_batch_size': 10, + 'shard_scanner_batch_size': 10, 'rcache': '/var/cache/swift/container.recon', 'shards_account_prefix': '.shards_', 'auto_shard': False, @@ -185,6 +185,7 @@ class TestSharder(BaseTestSharder): allow_modify_pipeline=False, use_replication_network=True) + # non-default conf = { 'mount_check': False, 'bind_ip': '10.11.12.13', 'bind_port': 62010, 'per_diff': 2000, 'max_diffs': 200, 'interval': 60, @@ -209,7 +210,8 @@ class TestSharder(BaseTestSharder): 'shard_replication_quorum': 1, 'existing_shard_replication_quorum': 0, 'max_shrinking': 5, - 'max_expanding': 4 + 'max_expanding': 4, + 
'rows_per_shard': 13, # should be ignored - not configurable } expected = { 'mount_check': False, 'bind_ip': '10.11.12.13', 'port': 62010, @@ -220,12 +222,12 @@ class TestSharder(BaseTestSharder): 'rsync_compress': True, 'rsync_module': '{replication_ip}::container_sda', 'reclaim_age': 86400 * 14, - 'shard_shrink_point': 0.35, - 'shrink_merge_point': 0.85, 'shard_container_threshold': 20000000, - 'split_size': 10000000, + 'rows_per_shard': 10000000, + 'shrink_threshold': 7000000, + 'expansion_limit': 17000000, 'cleave_batch_size': 4, - 'scanner_batch_size': 8, + 'shard_scanner_batch_size': 8, 'rcache': '/var/cache/swift-alt/container.recon', 'shards_account_prefix': '...shards_', 'auto_shard': True, @@ -4090,7 +4092,7 @@ class TestSharder(BaseTestSharder): def _check_old_style_find_shard_ranges_none_found(self, broker, objects): with self._mock_sharder() as sharder: num_found = sharder._find_shard_ranges(broker) - self.assertGreater(sharder.split_size, len(objects)) + self.assertGreater(sharder.rows_per_shard, len(objects)) self.assertEqual(0, num_found) self.assertFalse(broker.get_shard_ranges()) expected_stats = {'attempted': 1, 'success': 0, 'failure': 1, @@ -4102,7 +4104,7 @@ class TestSharder(BaseTestSharder): with self._mock_sharder( conf={'shard_container_threshold': 200}) as sharder: num_found = sharder._find_shard_ranges(broker) - self.assertEqual(sharder.split_size, len(objects)) + self.assertEqual(sharder.rows_per_shard, len(objects)) self.assertEqual(0, num_found) self.assertFalse(broker.get_shard_ranges()) expected_stats = {'attempted': 1, 'success': 0, 'failure': 1, @@ -4143,7 +4145,7 @@ class TestSharder(BaseTestSharder): ) as sharder: with mock_timestamp_now() as now: num_found = sharder._find_shard_ranges(broker) - self.assertEqual(99, sharder.split_size) + self.assertEqual(99, sharder.rows_per_shard) self.assertEqual(2, num_found) check_ranges() expected_stats = {'attempted': 1, 'success': 1, 'failure': 0, @@ -4190,7 +4192,7 @@ class TestSharder(BaseTestSharder): def _check_find_shard_ranges_none_found(self, broker, objects): with self._mock_sharder() as sharder: num_found = sharder._find_shard_ranges(broker) - self.assertGreater(sharder.split_size, len(objects)) + self.assertGreater(sharder.rows_per_shard, len(objects)) self.assertEqual(0, num_found) self.assertFalse(broker.get_shard_ranges()) expected_stats = {'attempted': 1, 'success': 0, 'failure': 1, @@ -4202,7 +4204,7 @@ class TestSharder(BaseTestSharder): with self._mock_sharder( conf={'shard_container_threshold': 200}) as sharder: num_found = sharder._find_shard_ranges(broker) - self.assertEqual(sharder.split_size, len(objects)) + self.assertEqual(sharder.rows_per_shard, len(objects)) self.assertEqual(0, num_found) self.assertFalse(broker.get_shard_ranges()) expected_stats = {'attempted': 1, 'success': 0, 'failure': 1, @@ -4242,7 +4244,7 @@ class TestSharder(BaseTestSharder): ) as sharder: with mock_timestamp_now() as now: num_found = sharder._find_shard_ranges(broker) - self.assertEqual(99, sharder.split_size) + self.assertEqual(99, sharder.rows_per_shard) self.assertEqual(2, num_found) check_ranges() expected_stats = {'attempted': 1, 'success': 1, 'failure': 0, @@ -4293,7 +4295,7 @@ class TestSharder(BaseTestSharder): 'shard_scanner_batch_size': 2}) as sharder: with mock_timestamp_now(now): num_found = sharder._find_shard_ranges(broker) - self.assertEqual(45, sharder.split_size) + self.assertEqual(45, sharder.rows_per_shard) self.assertEqual(2, num_found) self.assertEqual(2, len(broker.get_shard_ranges())) 
self._assert_shard_ranges_equal(expected_ranges[:2], @@ -5505,8 +5507,7 @@ class TestSharder(BaseTestSharder): broker = self._make_broker() broker.enable_sharding(next(self.ts_iter)) shard_bounds = (('', 'here'), ('here', 'there'), ('there', '')) - size = (DEFAULT_SHARD_SHRINK_POINT * - DEFAULT_SHARD_CONTAINER_THRESHOLD / 100) + size = (DEFAULT_SHARDER_CONF['shrink_threshold']) # all shard ranges too big to shrink shard_ranges = self._make_shard_ranges( @@ -5612,8 +5613,7 @@ class TestSharder(BaseTestSharder): broker.enable_sharding(next(self.ts_iter)) shard_bounds = (('', 'a'), ('a', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', '')) - size = (DEFAULT_SHARD_SHRINK_POINT * - DEFAULT_SHARD_CONTAINER_THRESHOLD / 100) + size = (DEFAULT_SHARDER_CONF['shrink_threshold']) shard_ranges = self._make_shard_ranges( shard_bounds, state=ShardRange.ACTIVE, object_count=size) own_sr = broker.get_own_shard_range() @@ -5831,7 +5831,8 @@ class TestSharder(BaseTestSharder): brokers.append(broker) shard_ranges.append(self._make_shard_ranges( shard_bounds, state=ShardRange.ACTIVE, - object_count=(DEFAULT_SHARD_CONTAINER_THRESHOLD / 2), + object_count=( + DEFAULT_SHARDER_CONF['shard_container_threshold'] / 2), timestamp=next(self.ts_iter))) # we want c2 to have 2 shrink pairs @@ -5844,7 +5845,7 @@ class TestSharder(BaseTestSharder): # we want c1 to have the same, but one can't be shrunk shard_ranges[C1][1].object_count = 0 shard_ranges[C1][2].object_count = \ - DEFAULT_SHARD_CONTAINER_THRESHOLD - 1 + DEFAULT_SHARDER_CONF['shard_container_threshold'] - 1 shard_ranges[C1][3].object_count = 0 brokers[C1].merge_shard_ranges(shard_ranges[C1]) brokers[C1].set_sharding_state() @@ -5922,7 +5923,8 @@ class TestSharder(BaseTestSharder): # and no longer appears in stats def shrink_actionable_ranges(broker): compactible = find_compactible_shard_sequences( - broker, sharder.shrink_size, sharder.merge_size, 1, -1) + broker, sharder.shrink_threshold, sharder.expansion_limit, + 1, -1) self.assertNotEqual([], compactible) with mock_timestamp_now(next(self.ts_iter)): process_compactible_shard_sequences(broker, compactible) @@ -6491,8 +6493,7 @@ class TestSharderFunctions(BaseTestSharder): def test_find_shrinking_candidates(self): broker = self._make_broker() shard_bounds = (('', 'a'), ('a', 'b'), ('b', 'c'), ('c', 'd')) - threshold = (DEFAULT_SHARD_SHRINK_POINT * - DEFAULT_SHARD_CONTAINER_THRESHOLD / 100) + threshold = (DEFAULT_SHARDER_CONF['shrink_threshold']) shard_ranges = self._make_shard_ranges( shard_bounds, state=ShardRange.ACTIVE, object_count=threshold, timestamp=next(self.ts_iter)) @@ -7182,3 +7183,70 @@ class TestSharderFunctions(BaseTestSharder): ], rank_paths(paths, own_sr) ) + + +class TestContainerSharderConf(unittest.TestCase): + def test_default(self): + expected = {'shard_container_threshold': 1000000, + 'max_shrinking': 1, + 'max_expanding': -1, + 'shard_scanner_batch_size': 10, + 'cleave_batch_size': 2, + 'cleave_row_batch_size': 10000, + 'broker_timeout': 60, + 'recon_candidates_limit': 5, + 'recon_sharded_timeout': 43200, + 'conn_timeout': 5.0, + 'auto_shard': False, + 'shrink_threshold': 100000, + 'expansion_limit': 750000, + 'rows_per_shard': 500000} + self.assertEqual(expected, vars(ContainerSharderConf())) + self.assertEqual(expected, vars(ContainerSharderConf(None))) + self.assertEqual(expected, DEFAULT_SHARDER_CONF) + + def test_conf(self): + base_conf = {'shard_container_threshold': 2000000, + 'max_shrinking': 2, + 'max_expanding': 3, + 'shard_scanner_batch_size': 11, + 'cleave_batch_size': 4, + 
'cleave_row_batch_size': 50000, + 'broker_timeout': 61, + 'recon_candidates_limit': 6, + 'recon_sharded_timeout': 43201, + 'conn_timeout': 5.1, + 'auto_shard': True} + + percent_conf = {'shard_shrink_point': 9, + 'shard_shrink_merge_point': 71} + expected = dict(base_conf, rows_per_shard=1000000, + shrink_threshold=180000, expansion_limit=1420000) + conf = dict(base_conf) + conf.update(percent_conf) + self.assertEqual(expected, vars(ContainerSharderConf(conf))) + + def test_bad_values(self): + not_positive_int = [0, -1, 'bad'] + not_int = not_float = ['bad'] + not_percent = ['bad', -1, 101, -0.1, 100.1] + bad = {'shard_container_threshold': not_positive_int, + 'max_shrinking': not_int, + 'max_expanding': not_int, + 'shard_scanner_batch_size': not_positive_int, + 'cleave_batch_size': not_positive_int, + 'cleave_row_batch_size': not_positive_int, + 'broker_timeout': not_positive_int, + 'recon_candidates_limit': not_int, + 'recon_sharded_timeout': not_int, + 'conn_timeout': not_float, + # 'auto_shard': anything can be passed to config_true_value + 'shard_shrink_point': not_percent, + 'shard_shrink_merge_point': not_percent} + + for key, bad_values in bad.items(): + for bad_value in bad_values: + with self.assertRaises( + ValueError, msg='{%s : %s}' % (key, bad_value)) as cm: + ContainerSharderConf({key: bad_value}) + self.assertIn('Error setting %s' % key, str(cm.exception))
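The arithmetic behind the revised numbers in the doc and conf-sample hunks can be checked with a short standalone sketch. This is plain Python with no Swift imports; percent_of_threshold below mirrors the method added to ContainerSharderConf (swift's config_percent_value likewise divides a 0-100 value by 100):

    # Sketch of the shrink/merge arithmetic with the defaults in this patch.
    shard_container_threshold = 1000000

    def percent_of_threshold(val):
        # a percentage expressed as 0-100 becomes an absolute row count
        return int(float(val) / 100 * shard_container_threshold)

    shrink_threshold = percent_of_threshold(10)  # shard_shrink_point default
    expansion_limit = percent_of_threshold(75)   # shard_shrink_merge_point default
    rows_per_shard = shard_container_threshold // 2

    assert shrink_threshold == 100000  # "less than or equal to 100 thousand"
    assert expansion_limit == 750000   # "less than or equal to 750 thousand"
    assert rows_per_shard == 500000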
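The USE_SHARDER_DEFAULT sentinel added to manage_shard_ranges.py is a common argparse pattern: give every option a unique object as its default, then back-fill anything still carrying that object after parsing. A minimal sketch outside Swift (USE_DEFAULT and the defaults dict are invented here for illustration):

    import argparse

    USE_DEFAULT = object()  # unique object, so an 'is' check cannot false-match
    defaults = {'max_shrinking': 1, 'max_expanding': -1}

    parser = argparse.ArgumentParser()
    parser.add_argument('--max-shrinking', type=int, default=USE_DEFAULT)
    parser.add_argument('--max-expanding', type=int, default=USE_DEFAULT)
    args = parser.parse_args(['--max-shrinking', '5'])

    # same shape as the back-fill loop at the end of main() in the patch
    for k, v in vars(args).items():
        if v is USE_DEFAULT:
            setattr(args, k, defaults[k])

    assert args.max_shrinking == 5   # set on the command line, kept
    assert args.max_expanding == -1  # left unset, back-filled from defaults

The sentinel survives parsing only when the user supplied nothing, which is exactly the condition the back-fill loop tests.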
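ContainerSharderConf.get_val wraps each validator so that a bad option is reported by name, which is what TestContainerSharderConf.test_bad_values asserts on. A standalone sketch of the same pattern; unlike the closure in the patch, this variant takes the conf dict explicitly, and positive_int is a stand-in for swift's config_positive_int_value:

    def positive_int(val):
        result = int(val)  # raises ValueError for junk like 'bad'
        if result < 1:
            raise ValueError('value must be a positive integer')
        return result

    def get_val(conf, key, validator, default):
        try:
            return validator(conf.get(key, default))
        except ValueError as err:
            # re-raise with the offending option name attached
            raise ValueError('Error setting %s: %s' % (key, err))

    assert get_val({'cleave_batch_size': '4'},
                   'cleave_batch_size', positive_int, 2) == 4
    try:
        get_val({'cleave_batch_size': '0'}, 'cleave_batch_size',
                positive_int, 2)
    except ValueError as err:
        assert 'Error setting cleave_batch_size' in str(err)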
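DEFAULT_SHARDER_CONF = vars(ContainerSharderConf()) works because vars() returns the instance __dict__: constructing the class with an empty conf snapshots every validated default into a plain dict in one step. A toy equivalent (ToyConf is invented for illustration):

    class ToyConf(object):
        def __init__(self, conf=None):
            conf = conf if conf else {}
            self.max_shrinking = int(conf.get('max_shrinking', 1))
            self.max_expanding = int(conf.get('max_expanding', -1))

    assert vars(ToyConf()) == {'max_shrinking': 1, 'max_expanding': -1}

The same class then doubles as a mixin: ContainerSharder(ContainerSharderConf, ContainerReplicator) calls each base __init__ explicitly rather than relying on cooperative super(), so the daemon and the CLI share a single definition of the option names, validators and defaults.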
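Finally, the shrink_threshold/expansion_limit renames make is_shrinking_candidate read as a two-part predicate on *row* count (objects plus tombstones): small enough to give up its rows, and small enough to fit in the acceptor. A simplified sketch that ignores the shard-state check present in the real function:

    def is_shrinking_candidate(row_count, shrink_threshold, expansion_limit):
        # typically shrink_threshold < expansion_limit, but check both
        return row_count < shrink_threshold and row_count <= expansion_limit

    assert is_shrinking_candidate(99999, 100000, 750000)
    assert not is_shrinking_candidate(100000, 100000, 750000)  # at threshold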