Merge "Use ContainerSharderConf class in sharder and manage-shard-ranges"

Zuul 2021-05-24 07:58:17 +00:00 committed by Gerrit Code Review
commit f7f1553edb
6 changed files with 241 additions and 160 deletions

View File

@@ -363,12 +363,12 @@ shard_shrink_merge_point 75 When auto-sharding is
 For example, if
 shard_container_threshold
 is 1 million,
-shard_shrink_point is 5,
+shard_shrink_point is 10,
 and shard_shrink_merge_point
 is 75 then a shard will
 be considered for
 shrinking if it has less
-than or equal to 50
+than or equal to 100
 thousand objects but will
 only merge into an
 acceptor if the combined
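The two options interact as percentages of shard_container_threshold. As a quick illustration of the corrected arithmetic (plain Python; the variable names are local to this sketch, not taken from the files in this commit):

```python
shard_container_threshold = 1000000
shard_shrink_point = 10        # percent: when a donor may shrink
shard_shrink_merge_point = 75  # percent: cap on the merged acceptor

# the thresholds the sharder derives from the percentages
shrink_threshold = shard_container_threshold * shard_shrink_point // 100
expansion_limit = shard_container_threshold * shard_shrink_merge_point // 100

assert shrink_threshold == 100000  # "100 thousand objects"
assert expansion_limit == 750000   # "750 thousand objects"
```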

View File

@@ -375,7 +375,7 @@ use = egg:swift#xprofile
 # When auto-sharding is enabled shard_shrink_point defines the object count
 # below which a 'donor' shard container will be considered for shrinking into
 # another 'acceptor' shard container. shard_shrink_point is a percentage of
-# shard_container_threshold e.g. the default value of 5 means 5% of the
+# shard_container_threshold e.g. the default value of 10 means 10% of the
 # shard_container_threshold.
 # shard_shrink_point = 10
 #
@@ -387,8 +387,8 @@ use = egg:swift#xprofile
 # for the donor to be allowed to merge into the acceptor.
 #
 # For example, if the shard_container_threshold is 1 million,
-# shard_shrink_point is 5, and shard_shrink_merge_point is 75 then a shard will
-# be considered for shrinking if it has less than or equal to 50 thousand
+# shard_shrink_point is 10, and shard_shrink_merge_point is 75 then a shard will
+# be considered for shrinking if it has less than or equal to 100 thousand
 # objects but will only merge into an acceptor if the combined object count
 # would be less than or equal to 750 thousand objects.
 # shard_shrink_merge_point = 75

View File

@@ -165,26 +165,27 @@ from contextlib import contextmanager
 from six.moves import input
 from swift.common.utils import Timestamp, get_logger, ShardRange, readconf, \
-    config_percent_value, config_positive_int_value, ShardRangeList
+    ShardRangeList
 from swift.container.backend import ContainerBroker, UNSHARDED
 from swift.container.sharder import make_shard_ranges, sharding_enabled, \
     CleavingContext, process_compactible_shard_sequences, \
     find_compactible_shard_sequences, find_overlapping_ranges, \
-    find_paths, rank_paths, finalize_shrinking, \
-    DEFAULT_MAX_SHRINKING, DEFAULT_MAX_EXPANDING, \
-    DEFAULT_SHARD_CONTAINER_THRESHOLD, DEFAULT_SHARD_SHRINK_POINT, \
-    DEFAULT_SHARD_MERGE_POINT
-
-DEFAULT_ROWS_PER_SHARD = DEFAULT_SHARD_CONTAINER_THRESHOLD // 2
-DEFAULT_SHRINK_THRESHOLD = DEFAULT_SHARD_CONTAINER_THRESHOLD * \
-    config_percent_value(DEFAULT_SHARD_SHRINK_POINT)
+    find_paths, rank_paths, finalize_shrinking, DEFAULT_SHARDER_CONF, \
+    ContainerSharderConf
 
 EXIT_SUCCESS = 0
 EXIT_ERROR = 1
 EXIT_INVALID_ARGS = 2  # consistent with argparse exit code for invalid args
 EXIT_USER_QUIT = 3
 
+# Some CLI options derive their default values from DEFAULT_SHARDER_CONF if
+# they have not been set. It is therefore important that the CLI parser
+# provides None as a default so that we can detect that no value was set on the
+# command line. We use this alias to act as a reminder.
+USE_SHARDER_DEFAULT = object()
+
 
 class ManageShardRangesException(Exception):
     pass
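USE_SHARDER_DEFAULT is a classic argparse sentinel: no value a user could type compares identical to a fresh object(), so "option not given" stays distinguishable even from falsy inputs like 0. A minimal standalone sketch of the pattern (not code from this commit):

```python
import argparse

USE_DEFAULT = object()  # unique marker, compared with 'is'

parser = argparse.ArgumentParser()
parser.add_argument('--limit', type=int, default=USE_DEFAULT)

# an explicit value, even a falsy one, is preserved ...
assert parser.parse_args(['--limit', '0']).limit == 0
# ... while an omitted option remains detectable for later back-fill
assert parser.parse_args([]).limit is USE_DEFAULT
```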
@@ -702,14 +703,21 @@ def _positive_int(arg):
 def _add_find_args(parser):
-    parser.add_argument('rows_per_shard', nargs='?', type=int,
-                        default=None)
+    parser.add_argument(
+        'rows_per_shard', nargs='?', type=int, default=USE_SHARDER_DEFAULT,
+        help='Target number of rows for newly created shards. '
+        'Default is half of the shard_container_threshold value if that is '
+        'given in a conf file specified with --config, otherwise %s.'
+        % DEFAULT_SHARDER_CONF['rows_per_shard'])
 
 
 def _add_replace_args(parser):
     parser.add_argument(
         '--shards_account_prefix', metavar='shards_account_prefix', type=str,
-        required=False, help='Prefix for shards account', default='.shards_')
+        required=False, default='.shards_',
+        help="Prefix for shards account. The default is '.shards_'. This "
+        "should only be changed if the auto_create_account_prefix option "
+        "has been similarly changed in swift.conf.")
     parser.add_argument(
         '--replace-timeout', type=int, default=600,
         help='Minimum DB timeout to use when replacing shard ranges.')
@@ -825,19 +833,18 @@ def _make_parser():
         help='Compact shard ranges with less than the shrink-threshold number '
              'of rows. This command only works on root containers.')
     _add_prompt_args(compact_parser)
-    compact_parser.add_argument('--shrink-threshold', nargs='?',
-                                type=_positive_int,
-                                default=None,
-                                help='The number of rows below which a shard '
-                                     'can qualify for shrinking. Defaults to '
-                                     '%d' % DEFAULT_SHRINK_THRESHOLD)
-    compact_parser.add_argument('--expansion-limit', nargs='?',
-                                type=_positive_int,
-                                default=None,
-                                help='Maximum number of rows for an expanding '
-                                     'shard to have after compaction has '
-                                     'completed. Defaults to %d' %
-                                DEFAULT_ROWS_PER_SHARD)
+    compact_parser.add_argument(
+        '--shrink-threshold', nargs='?', type=_positive_int,
+        default=USE_SHARDER_DEFAULT,
+        help='The number of rows below which a shard can qualify for '
+        'shrinking. '
+        'Defaults to %d' % DEFAULT_SHARDER_CONF['shrink_threshold'])
+    compact_parser.add_argument(
+        '--expansion-limit', nargs='?', type=_positive_int,
+        default=USE_SHARDER_DEFAULT,
+        help='Maximum number of rows for an expanding shard to have after '
+        'compaction has completed. '
+        'Defaults to %d' % DEFAULT_SHARDER_CONF['expansion_limit'])
     # If just one donor shard is chosen to shrink to an acceptor then the
     # expanded acceptor will handle object listings as soon as the donor shard
     # has shrunk. If more than one donor shard are chosen to shrink to an
@@ -846,7 +853,7 @@ def _make_parser():
     # temporary gap(s) in object listings where the shrunk donors are missing.
     compact_parser.add_argument('--max-shrinking', nargs='?',
                                 type=_positive_int,
-                                default=None,
+                                default=USE_SHARDER_DEFAULT,
                                 help='Maximum number of shards that should be '
                                      'shrunk into each expanding shard. '
                                      'Defaults to 1. Using values greater '
@@ -855,7 +862,7 @@ def _make_parser():
                                      'shards have shrunk.')
     compact_parser.add_argument('--max-expanding', nargs='?',
                                 type=_positive_int,
-                                default=None,
+                                default=USE_SHARDER_DEFAULT,
                                 help='Maximum number of shards that should be '
                                      'expanded. Defaults to unlimited.')
     compact_parser.set_defaults(func=compact_shard_ranges)
@@ -878,9 +885,9 @@ def _make_parser():
     return parser
 
 
-def main(args=None):
+def main(cli_args=None):
     parser = _make_parser()
-    args = parser.parse_args(args)
+    args = parser.parse_args(cli_args)
     if not args.subcommand:
         # On py2, subparsers are required; on py3 they are not; see
         # https://bugs.python.org/issue9253. py37 added a `required` kwarg
@@ -891,26 +898,12 @@ def main(args=None):
         print('\nA sub-command is required.', file=sys.stderr)
         return EXIT_INVALID_ARGS
 
-    conf = {}
-    rows_per_shard = DEFAULT_ROWS_PER_SHARD
-    shrink_threshold = DEFAULT_SHRINK_THRESHOLD
-    expansion_limit = DEFAULT_ROWS_PER_SHARD
-    if args.conf_file:
-        try:
+    try:
+        # load values from conf file or sharder defaults
+        conf = {}
+        if args.conf_file:
             conf = readconf(args.conf_file, 'container-sharder')
-            shard_container_threshold = config_positive_int_value(conf.get(
-                'shard_container_threshold',
-                DEFAULT_SHARD_CONTAINER_THRESHOLD))
-            if shard_container_threshold:
-                rows_per_shard = shard_container_threshold // 2
-                shrink_threshold = int(
-                    shard_container_threshold * config_percent_value(
-                        conf.get('shard_shrink_point',
-                                 DEFAULT_SHARD_SHRINK_POINT)))
-                expansion_limit = int(
-                    shard_container_threshold * config_percent_value(
-                        conf.get('shard_shrink_merge_point',
-                                 DEFAULT_SHARD_MERGE_POINT)))
-        except (OSError, IOError) as exc:
-            print('Error opening config file %s: %s' % (args.conf_file, exc),
-                  file=sys.stderr)
+        conf_args = ContainerSharderConf(conf)
+    except (OSError, IOError) as exc:
+        print('Error opening config file %s: %s' % (args.conf_file, exc),
+              file=sys.stderr)
@@ -920,20 +913,10 @@ def main(args=None):
               file=sys.stderr)
         return EXIT_INVALID_ARGS
 
-    # seems having sub parsers mean sometimes an arg wont exist in the args
-    # namespace. But we can check if it is with the 'in' statement.
-    if "max_shrinking" in args and args.max_shrinking is None:
-        args.max_shrinking = int(conf.get(
-            "max_shrinking", DEFAULT_MAX_SHRINKING))
-    if "max_expanding" in args and args.max_expanding is None:
-        args.max_expanding = int(conf.get(
-            "max_expanding", DEFAULT_MAX_EXPANDING))
-    if "shrink_threshold" in args and args.shrink_threshold is None:
-        args.shrink_threshold = shrink_threshold
-    if "expansion_limit" in args and args.expansion_limit is None:
-        args.expansion_limit = expansion_limit
-    if "rows_per_shard" in args and args.rows_per_shard is None:
-        args.rows_per_shard = rows_per_shard
+    for k, v in vars(args).items():
+        # set any un-set cli args from conf_args
+        if v is USE_SHARDER_DEFAULT:
+            setattr(args, k, getattr(conf_args, k))
 
     if args.func in (analyze_shard_ranges,):
         args.input = args.path_to_file
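The for k, v in vars(args) loop above replaces five hand-written "is this option in the namespace" checks with one generic pass. A condensed, self-contained sketch of that back-fill (the Conf class here is a stand-in for ContainerSharderConf):

```python
import argparse

USE_DEFAULT = object()

class Conf(object):
    # stand-in: attribute names must match the argparse dest names
    max_shrinking = 1
    max_expanding = -1

parser = argparse.ArgumentParser()
parser.add_argument('--max-shrinking', type=int, default=USE_DEFAULT)
parser.add_argument('--max-expanding', type=int, default=USE_DEFAULT)
args = parser.parse_args(['--max-shrinking', '5'])

conf_args = Conf()
for k, v in vars(args).items():
    if v is USE_DEFAULT:  # only back-fill options the user did not set
        setattr(args, k, getattr(conf_args, k))

assert args.max_shrinking == 5   # explicit CLI value wins
assert args.max_expanding == -1  # filled from the conf defaults
```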

View File

@@ -127,16 +127,16 @@ def is_sharding_candidate(shard_range, threshold):
             shard_range.object_count >= threshold)
 
 
-def is_shrinking_candidate(shard_range, shrink_threshold, merge_size,
+def is_shrinking_candidate(shard_range, shrink_threshold, expansion_limit,
                            states=None):
-    # typically shrink_threshold < merge_size but check both just in case
+    # typically shrink_threshold < expansion_limit but check both just in case
     # note: use *row* count (objects plus tombstones) as the condition for
     # shrinking to avoid inadvertently moving large numbers of tombstones into
     # an acceptor
     states = states or (ShardRange.ACTIVE,)
     return (shard_range.state in states and
             shard_range.row_count < shrink_threshold and
-            shard_range.row_count <= merge_size)
+            shard_range.row_count <= expansion_limit)
 
 
 def find_sharding_candidates(broker, threshold, shard_ranges=None):
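The merge_size parameter is renamed to expansion_limit here with no change in behaviour: the predicate only reads state and row_count, which makes it easy to exercise with a duck-typed stand-in (a sketch; FakeRange is not a real ShardRange):

```python
from collections import namedtuple

FakeRange = namedtuple('FakeRange', 'state row_count')
ACTIVE = 'active'  # stand-in for ShardRange.ACTIVE

def is_shrinking_candidate(shard_range, shrink_threshold, expansion_limit,
                           states=(ACTIVE,)):
    # mirrors the predicate above: small enough to shrink, and small
    # enough that it could still fit into an acceptor
    return (shard_range.state in states and
            shard_range.row_count < shrink_threshold and
            shard_range.row_count <= expansion_limit)

assert is_shrinking_candidate(FakeRange(ACTIVE, 99999), 100000, 750000)
assert not is_shrinking_candidate(FakeRange(ACTIVE, 100000), 100000, 750000)
```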
@@ -157,13 +157,13 @@ def find_sharding_candidates(broker, threshold, shard_ranges=None):
     return candidates
 
 
-def find_shrinking_candidates(broker, shrink_threshold, merge_size):
+def find_shrinking_candidates(broker, shrink_threshold, expansion_limit):
     # this is only here to preserve a legacy public function signature;
     # superseded by find_compactible_shard_sequences
     merge_pairs = {}
     # restrict search to sequences with one donor
     results = find_compactible_shard_sequences(broker, shrink_threshold,
-                                               merge_size, 1, -1,
+                                               expansion_limit, 1, -1,
                                                include_shrinking=True)
     for sequence in results:
         # map acceptor -> donor list
@@ -173,7 +173,7 @@ def find_shrinking_candidates(broker, shrink_threshold, merge_size):
 def find_compactible_shard_sequences(broker,
                                      shrink_threshold,
-                                     merge_size,
+                                     expansion_limit,
                                      max_shrinking,
                                      max_expanding,
                                      include_shrinking=False):
@@ -186,8 +186,8 @@ def find_compactible_shard_sequences(broker,
     :param broker: A :class:`~swift.container.backend.ContainerBroker`.
     :param shrink_threshold: the number of rows below which a shard may be
         considered for shrinking into another shard
-    :param merge_size: the maximum number of rows that an acceptor shard range
-        should have after other shard ranges have been compacted into it
+    :param expansion_limit: the maximum number of rows that an acceptor shard
+        range should have after other shard ranges have been compacted into it
     :param max_shrinking: the maximum number of shard ranges that should be
         compacted into each acceptor; -1 implies unlimited.
     :param max_expanding: the maximum number of acceptors to be found (i.e. the
@@ -214,13 +214,13 @@ def find_compactible_shard_sequences(broker,
         #   - the max number of shard ranges to be compacted (max_shrinking) has
         #     been reached
         #   - the total number of objects in the sequence has reached the
-        #     merge_size
+        #     expansion_limit
         if (sequence and
                 (not is_shrinking_candidate(
-                    sequence[-1], shrink_threshold, merge_size,
+                    sequence[-1], shrink_threshold, expansion_limit,
                     states=(ShardRange.ACTIVE, ShardRange.SHRINKING)) or
                  0 < max_shrinking < len(sequence) or
-                 sequence.row_count >= merge_size)):
+                 sequence.row_count >= expansion_limit)):
             return True
         return False
@@ -230,7 +230,7 @@ def find_compactible_shard_sequences(broker,
     while ((max_expanding < 0 or expanding < max_expanding) and
            index < len(shard_ranges)):
         if not is_shrinking_candidate(
-                shard_ranges[index], shrink_threshold, merge_size,
+                shard_ranges[index], shrink_threshold, expansion_limit,
                 states=(ShardRange.ACTIVE, ShardRange.SHRINKING)):
             # this shard range cannot be the start of a new or existing
             # compactible sequence, move on
@@ -255,7 +255,7 @@ def find_compactible_shard_sequences(broker,
             # already shrinking: add to sequence unconditionally
             sequence.append(shard_range)
         elif (sequence.row_count + shard_range.row_count
-              <= merge_size):
+              <= expansion_limit):
             # add to sequence: could be a donor or acceptor
             sequence.append(shard_range)
             if sequence_complete(sequence):
@@ -270,7 +270,7 @@ def find_compactible_shard_sequences(broker,
             sequence.includes(own_shard_range)):
         # special case: only one sequence has been found, which consumes
         # all shard ranges, encompasses the entire namespace, has no more
-        # than merge_size records and whose shard ranges are all
+        # than expansion_limit records and whose shard ranges are all
         # shrinkable; all the shards in the sequence can be shrunk to the
         # root, so append own_shard_range to the sequence to act as an
         # acceptor; note: only shrink to the root when *all* the remaining
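Stripped of broker and ShardRange details, the termination rule described in the comments above reduces to three conditions. A simplified sketch that models each shard in a candidate sequence by its row count alone (an illustration, not the real implementation):

```python
def sequence_complete(row_counts, shrink_threshold, expansion_limit,
                      max_shrinking):
    # a sequence closes when the trailing shard can no longer shrink,
    # the donor limit is hit, or the accumulated rows reach the
    # acceptor's expansion_limit
    if not row_counts:
        return False
    last_can_shrink = (row_counts[-1] < shrink_threshold and
                       row_counts[-1] <= expansion_limit)
    return (not last_can_shrink or
            0 < max_shrinking < len(row_counts) or
            sum(row_counts) >= expansion_limit)

# with shrink_threshold=100000, expansion_limit=750000, unlimited donors:
assert not sequence_complete([50000, 40000], 100000, 750000, -1)
assert sequence_complete([400000, 300000, 60000], 100000, 750000, -1)
```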
@@ -573,19 +573,70 @@ class CleavingContext(object):
         broker.set_sharding_sysmeta('Context-' + self.ref, '')
 
 
-DEFAULT_SHARD_CONTAINER_THRESHOLD = 1000000
-DEFAULT_SHARD_SHRINK_POINT = 10
-DEFAULT_SHARD_MERGE_POINT = 75
-DEFAULT_MAX_SHRINKING = 1
-DEFAULT_MAX_EXPANDING = -1
+class ContainerSharderConf(object):
+    def __init__(self, conf=None):
+        conf = conf if conf else {}
+
+        def get_val(key, validator, default):
+            """
+            Get a value from conf and validate it.
+
+            :param key: key to lookup value in the ``conf`` dict.
+            :param validator: A function that will passed the value from the
+                ``conf`` dict and should return the value to be set. This
+                function should raise a ValueError if the ``conf`` value if not
+                valid.
+            :param default: value to use if ``key`` is not found in ``conf``.
+            :raises: ValueError if the value read from ``conf`` is invalid.
+            :returns: the configuration value.
+            """
+            try:
+                return validator(conf.get(key, default))
+            except ValueError as err:
+                raise ValueError('Error setting %s: %s' % (key, err))
+
+        self.shard_container_threshold = get_val(
+            'shard_container_threshold', config_positive_int_value, 1000000)
+        self.max_shrinking = get_val(
+            'max_shrinking', int, 1)
+        self.max_expanding = get_val(
+            'max_expanding', int, -1)
+        self.shard_scanner_batch_size = get_val(
+            'shard_scanner_batch_size', config_positive_int_value, 10)
+        self.cleave_batch_size = get_val(
+            'cleave_batch_size', config_positive_int_value, 2)
+        self.cleave_row_batch_size = get_val(
+            'cleave_row_batch_size', config_positive_int_value, 10000)
+        self.broker_timeout = get_val(
+            'broker_timeout', config_positive_int_value, 60)
+        self.recon_candidates_limit = get_val(
+            'recon_candidates_limit', int, 5)
+        self.recon_sharded_timeout = get_val(
+            'recon_sharded_timeout', int, 43200)
+        self.conn_timeout = get_val(
+            'conn_timeout', float, 5)
+        self.auto_shard = get_val(
+            'auto_shard', config_true_value, False)
+        self.shrink_threshold = get_val(
+            'shard_shrink_point', self.percent_of_threshold, 10)
+        self.expansion_limit = get_val(
+            'shard_shrink_merge_point', self.percent_of_threshold, 75)
+        self.rows_per_shard = self.shard_container_threshold // 2
+
+    def percent_of_threshold(self, val):
+        return int(config_percent_value(val) * self.shard_container_threshold)
+
+
+DEFAULT_SHARDER_CONF = vars(ContainerSharderConf())
 
 
-class ContainerSharder(ContainerReplicator):
+class ContainerSharder(ContainerSharderConf, ContainerReplicator):
     """Shards containers."""
 
     def __init__(self, conf, logger=None):
         logger = logger or get_logger(conf, log_route='container-sharder')
-        super(ContainerSharder, self).__init__(conf, logger=logger)
+        ContainerReplicator.__init__(self, conf, logger=logger)
+        ContainerSharderConf.__init__(self, conf)
         if conf.get('auto_create_account_prefix'):
             self.logger.warning('Option auto_create_account_prefix is '
                                 'deprecated. Configure '
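Every sharder tunable and its default now lives in this one class; DEFAULT_SHARDER_CONF is literally vars() of a default instance, and the percentage options are resolved against shard_container_threshold at construction time. Assuming swift is importable, usage looks roughly like:

```python
from swift.container.sharder import ContainerSharderConf

conf = ContainerSharderConf({'shard_container_threshold': 2000000,
                             'shard_shrink_point': 5})
assert conf.shrink_threshold == 100000   # 5% of 2 million
assert conf.expansion_limit == 1500000   # default 75% of 2 million
assert conf.rows_per_shard == 1000000    # always threshold // 2

# invalid values fail fast, naming the offending option
try:
    ContainerSharderConf({'shard_container_threshold': 'bad'})
except ValueError as err:
    assert 'Error setting shard_container_threshold' in str(err)
```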
@@ -598,37 +649,8 @@ class ContainerSharder(ContainerReplicator):
         else:
             auto_create_account_prefix = AUTO_CREATE_ACCOUNT_PREFIX
         self.shards_account_prefix = (auto_create_account_prefix + 'shards_')
-        self.shard_shrink_point = config_percent_value(
-            conf.get('shard_shrink_point', DEFAULT_SHARD_SHRINK_POINT))
-        self.shrink_merge_point = config_percent_value(
-            conf.get('shard_shrink_merge_point', DEFAULT_SHARD_MERGE_POINT))
-        self.shard_container_threshold = config_positive_int_value(
-            conf.get('shard_container_threshold',
-                     DEFAULT_SHARD_CONTAINER_THRESHOLD))
-        self.shrink_size = (self.shard_container_threshold *
-                            self.shard_shrink_point)
-        self.merge_size = (self.shard_container_threshold *
-                           self.shrink_merge_point)
-        self.split_size = self.shard_container_threshold // 2
-        self.scanner_batch_size = config_positive_int_value(
-            conf.get('shard_scanner_batch_size', 10))
-        self.cleave_batch_size = config_positive_int_value(
-            conf.get('cleave_batch_size', 2))
-        self.cleave_row_batch_size = config_positive_int_value(
-            conf.get('cleave_row_batch_size', 10000))
-        self.max_shrinking = int(conf.get('max_shrinking',
-                                          DEFAULT_MAX_SHRINKING))
-        self.max_expanding = int(conf.get('max_expanding',
-                                          DEFAULT_MAX_EXPANDING))
-        self.auto_shard = config_true_value(conf.get('auto_shard', False))
         self.sharding_candidates = []
         self.shrinking_candidates = []
-        self.recon_candidates_limit = int(
-            conf.get('recon_candidates_limit', 5))
-        self.recon_sharded_timeout = int(
-            conf.get('recon_sharded_timeout', 43200))
-        self.broker_timeout = config_positive_int_value(
-            conf.get('broker_timeout', 60))
         replica_count = self.ring.replica_count
         quorum = quorum_size(replica_count)
         self.shard_replication_quorum = config_auto_int_value(
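The block of option parsing deleted above is exactly what ContainerSharderConf.__init__ now supplies via the mixin. Note the two explicit base-class calls in the new __init__: the commit invokes each base directly rather than relying on a single super() call, presumably because the bases are not written as a cooperative chain. A generic sketch of that shape (not Swift code):

```python
class ConfMixin(object):
    def __init__(self, conf=None):
        conf = conf or {}
        self.max_shrinking = int(conf.get('max_shrinking', 1))

class Replicator(object):
    def __init__(self, conf, logger=None):
        self.logger = logger  # imagine lots of replicator setup here

class Sharder(ConfMixin, Replicator):
    def __init__(self, conf, logger=None):
        # call each base explicitly: neither base cooperates via
        # super(), so a single super().__init__ would run only
        # ConfMixin.__init__ and never reach Replicator.__init__
        Replicator.__init__(self, conf, logger=logger)
        ConfMixin.__init__(self, conf)

s = Sharder({'max_shrinking': 3})
assert s.max_shrinking == 3 and s.logger is None
```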
@@ -650,7 +672,6 @@ class ContainerSharder(ContainerReplicator):
         self.existing_shard_replication_quorum = replica_count
 
         # internal client
-        self.conn_timeout = float(conf.get('conn_timeout', 5))
         request_tries = config_positive_int_value(
             conf.get('request_tries', 3))
         internal_client_conf_path = conf.get('internal_client_conf_path',
@@ -729,7 +750,7 @@ class ContainerSharder(ContainerReplicator):
 
     def _identify_shrinking_candidate(self, broker, node):
         sequences = find_compactible_shard_sequences(
-            broker, self.shrink_size, self.merge_size,
+            broker, self.shrink_threshold, self.expansion_limit,
             self.max_shrinking, self.max_expanding)
         # compactible_ranges are all apart from final acceptor in each sequence
         compactible_ranges = sum(len(seq) - 1 for seq in sequences)
@@ -1452,7 +1473,7 @@ class ContainerSharder(ContainerReplicator):
             start = time.time()
             shard_data, last_found = broker.find_shard_ranges(
-                self.split_size, limit=self.scanner_batch_size,
+                self.rows_per_shard, limit=self.shard_scanner_batch_size,
                 existing_ranges=shard_ranges)
             elapsed = time.time() - start
@@ -1815,8 +1836,8 @@ class ContainerSharder(ContainerReplicator):
             return
 
         compactible_sequences = find_compactible_shard_sequences(
-            broker, self.shrink_size, self.merge_size, self.max_shrinking,
-            self.max_expanding, include_shrinking=True)
+            broker, self.shrink_threshold, self.expansion_limit,
+            self.max_shrinking, self.max_expanding, include_shrinking=True)
         self.logger.debug('Found %s compactible sequences of length(s) %s' %
                           (len(compactible_sequences),
                            [len(s) for s in compactible_sequences]))

View File

@@ -203,7 +203,7 @@ class TestManageShardRanges(unittest.TestCase):
                 max_expanding=-1,
                 max_shrinking=1,
                 shrink_threshold=100000,
-                expansion_limit=500000,
+                expansion_limit=750000,
                 yes=False,
                 dry_run=False)
             mocked.assert_called_once_with(mock.ANY, expected)
@@ -283,6 +283,11 @@ class TestManageShardRanges(unittest.TestCase):
                 dry_run=False)
             mocked.assert_called_once_with(mock.ANY, expected)
 
+    def test_conf_file_invalid(self):
+        db_file = os.path.join(self.testdir, 'hash.db')
+        broker = ContainerBroker(db_file, account='a', container='c')
+        broker.initialize()
+
         # conf file - invalid value for shard_container_threshold
         conf = """
         [container-sharder]
@@ -303,8 +308,12 @@ class TestManageShardRanges(unittest.TestCase):
         self.assertEqual(2, ret)
         err_lines = err.getvalue().split('\n')
         self.assert_starts_with(err_lines[0], 'Error loading config file')
+        self.assertIn('shard_container_threshold', err_lines[0])
 
-        # conf file - cannot open conf file
+    def test_conf_file_does_not_exist(self):
+        db_file = os.path.join(self.testdir, 'hash.db')
+        broker = ContainerBroker(db_file, account='a', container='c')
+        broker.initialize()
         conf_file = os.path.join(self.testdir, 'missing_sharder.conf')
         out = StringIO()
         err = StringIO()

View File

@@ -38,11 +38,10 @@ from swift.container import replicator
 from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING, \
     SHARDED, DATADIR
 from swift.container.sharder import ContainerSharder, sharding_enabled, \
-    CleavingContext, DEFAULT_SHARD_SHRINK_POINT, \
-    DEFAULT_SHARD_CONTAINER_THRESHOLD, finalize_shrinking, \
+    CleavingContext, DEFAULT_SHARDER_CONF, finalize_shrinking, \
     find_shrinking_candidates, process_compactible_shard_sequences, \
     find_compactible_shard_sequences, is_shrinking_candidate, \
-    is_sharding_candidate, find_paths, rank_paths
+    is_sharding_candidate, find_paths, rank_paths, ContainerSharderConf
 from swift.common.utils import ShardRange, Timestamp, hash_path, \
     encode_timestamps, parse_db_filename, quorum_size, Everything, md5
 from test import annotate_failure
@@ -152,6 +151,7 @@ class TestSharder(BaseTestSharder):
                     (k, v, getattr(sharder, k)))
             return sharder, mock_ic
 
+        # defaults
         expected = {
             'mount_check': True, 'bind_ip': '0.0.0.0', 'port': 6201,
             'per_diff': 1000, 'max_diffs': 100, 'interval': 30,
@@ -161,12 +161,12 @@ class TestSharder(BaseTestSharder):
             'rsync_compress': False,
             'rsync_module': '{replication_ip}::container',
             'reclaim_age': 86400 * 7,
-            'shard_shrink_point': 0.10,
-            'shrink_merge_point': 0.75,
             'shard_container_threshold': 1000000,
-            'split_size': 500000,
+            'rows_per_shard': 500000,
+            'shrink_threshold': 100000,
+            'expansion_limit': 750000,
             'cleave_batch_size': 2,
-            'scanner_batch_size': 10,
+            'shard_scanner_batch_size': 10,
             'rcache': '/var/cache/swift/container.recon',
             'shards_account_prefix': '.shards_',
             'auto_shard': False,
@@ -185,6 +185,7 @@ class TestSharder(BaseTestSharder):
             allow_modify_pipeline=False,
             use_replication_network=True)
 
+        # non-default
         conf = {
             'mount_check': False, 'bind_ip': '10.11.12.13', 'bind_port': 62010,
             'per_diff': 2000, 'max_diffs': 200, 'interval': 60,
@@ -209,7 +210,8 @@ class TestSharder(BaseTestSharder):
             'shard_replication_quorum': 1,
             'existing_shard_replication_quorum': 0,
             'max_shrinking': 5,
-            'max_expanding': 4
+            'max_expanding': 4,
+            'rows_per_shard': 13,  # should be ignored - not configurable
         }
         expected = {
             'mount_check': False, 'bind_ip': '10.11.12.13', 'port': 62010,
@@ -220,12 +222,12 @@ class TestSharder(BaseTestSharder):
             'rsync_compress': True,
             'rsync_module': '{replication_ip}::container_sda',
             'reclaim_age': 86400 * 14,
-            'shard_shrink_point': 0.35,
-            'shrink_merge_point': 0.85,
             'shard_container_threshold': 20000000,
-            'split_size': 10000000,
+            'rows_per_shard': 10000000,
+            'shrink_threshold': 7000000,
+            'expansion_limit': 17000000,
             'cleave_batch_size': 4,
-            'scanner_batch_size': 8,
+            'shard_scanner_batch_size': 8,
             'rcache': '/var/cache/swift-alt/container.recon',
             'shards_account_prefix': '...shards_',
             'auto_shard': True,
@@ -4090,7 +4092,7 @@ class TestSharder(BaseTestSharder):
     def _check_old_style_find_shard_ranges_none_found(self, broker, objects):
         with self._mock_sharder() as sharder:
             num_found = sharder._find_shard_ranges(broker)
-        self.assertGreater(sharder.split_size, len(objects))
+        self.assertGreater(sharder.rows_per_shard, len(objects))
         self.assertEqual(0, num_found)
         self.assertFalse(broker.get_shard_ranges())
         expected_stats = {'attempted': 1, 'success': 0, 'failure': 1,
@@ -4102,7 +4104,7 @@ class TestSharder(BaseTestSharder):
         with self._mock_sharder(
                 conf={'shard_container_threshold': 200}) as sharder:
             num_found = sharder._find_shard_ranges(broker)
-        self.assertEqual(sharder.split_size, len(objects))
+        self.assertEqual(sharder.rows_per_shard, len(objects))
         self.assertEqual(0, num_found)
         self.assertFalse(broker.get_shard_ranges())
         expected_stats = {'attempted': 1, 'success': 0, 'failure': 1,
@@ -4143,7 +4145,7 @@ class TestSharder(BaseTestSharder):
         ) as sharder:
             with mock_timestamp_now() as now:
                 num_found = sharder._find_shard_ranges(broker)
-        self.assertEqual(99, sharder.split_size)
+        self.assertEqual(99, sharder.rows_per_shard)
         self.assertEqual(2, num_found)
         check_ranges()
         expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
@@ -4190,7 +4192,7 @@ class TestSharder(BaseTestSharder):
     def _check_find_shard_ranges_none_found(self, broker, objects):
         with self._mock_sharder() as sharder:
             num_found = sharder._find_shard_ranges(broker)
-        self.assertGreater(sharder.split_size, len(objects))
+        self.assertGreater(sharder.rows_per_shard, len(objects))
         self.assertEqual(0, num_found)
         self.assertFalse(broker.get_shard_ranges())
         expected_stats = {'attempted': 1, 'success': 0, 'failure': 1,
@@ -4202,7 +4204,7 @@ class TestSharder(BaseTestSharder):
         with self._mock_sharder(
                 conf={'shard_container_threshold': 200}) as sharder:
             num_found = sharder._find_shard_ranges(broker)
-        self.assertEqual(sharder.split_size, len(objects))
+        self.assertEqual(sharder.rows_per_shard, len(objects))
         self.assertEqual(0, num_found)
         self.assertFalse(broker.get_shard_ranges())
         expected_stats = {'attempted': 1, 'success': 0, 'failure': 1,
@@ -4242,7 +4244,7 @@ class TestSharder(BaseTestSharder):
         ) as sharder:
             with mock_timestamp_now() as now:
                 num_found = sharder._find_shard_ranges(broker)
-        self.assertEqual(99, sharder.split_size)
+        self.assertEqual(99, sharder.rows_per_shard)
         self.assertEqual(2, num_found)
         check_ranges()
         expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
@@ -4293,7 +4295,7 @@ class TestSharder(BaseTestSharder):
                        'shard_scanner_batch_size': 2}) as sharder:
             with mock_timestamp_now(now):
                 num_found = sharder._find_shard_ranges(broker)
-        self.assertEqual(45, sharder.split_size)
+        self.assertEqual(45, sharder.rows_per_shard)
         self.assertEqual(2, num_found)
         self.assertEqual(2, len(broker.get_shard_ranges()))
         self._assert_shard_ranges_equal(expected_ranges[:2],
@@ -5508,8 +5510,7 @@ class TestSharder(BaseTestSharder):
         broker = self._make_broker()
         broker.enable_sharding(next(self.ts_iter))
         shard_bounds = (('', 'here'), ('here', 'there'), ('there', ''))
-        size = (DEFAULT_SHARD_SHRINK_POINT *
-                DEFAULT_SHARD_CONTAINER_THRESHOLD / 100)
+        size = (DEFAULT_SHARDER_CONF['shrink_threshold'])
 
         # all shard ranges too big to shrink
         shard_ranges = self._make_shard_ranges(
@@ -5615,8 +5616,7 @@ class TestSharder(BaseTestSharder):
         broker.enable_sharding(next(self.ts_iter))
         shard_bounds = (('', 'a'), ('a', 'b'), ('b', 'c'),
                         ('c', 'd'), ('d', 'e'), ('e', ''))
-        size = (DEFAULT_SHARD_SHRINK_POINT *
-                DEFAULT_SHARD_CONTAINER_THRESHOLD / 100)
+        size = (DEFAULT_SHARDER_CONF['shrink_threshold'])
         shard_ranges = self._make_shard_ranges(
             shard_bounds, state=ShardRange.ACTIVE, object_count=size)
         own_sr = broker.get_own_shard_range()
@@ -5834,7 +5834,8 @@ class TestSharder(BaseTestSharder):
             brokers.append(broker)
             shard_ranges.append(self._make_shard_ranges(
                 shard_bounds, state=ShardRange.ACTIVE,
-                object_count=(DEFAULT_SHARD_CONTAINER_THRESHOLD / 2),
+                object_count=(
+                    DEFAULT_SHARDER_CONF['shard_container_threshold'] / 2),
                 timestamp=next(self.ts_iter)))
 
         # we want c2 to have 2 shrink pairs
@@ -5847,7 +5848,7 @@ class TestSharder(BaseTestSharder):
         # we want c1 to have the same, but one can't be shrunk
         shard_ranges[C1][1].object_count = 0
         shard_ranges[C1][2].object_count = \
-            DEFAULT_SHARD_CONTAINER_THRESHOLD - 1
+            DEFAULT_SHARDER_CONF['shard_container_threshold'] - 1
         shard_ranges[C1][3].object_count = 0
         brokers[C1].merge_shard_ranges(shard_ranges[C1])
         brokers[C1].set_sharding_state()
@@ -5925,7 +5926,8 @@ class TestSharder(BaseTestSharder):
         # and no longer appears in stats
         def shrink_actionable_ranges(broker):
             compactible = find_compactible_shard_sequences(
-                broker, sharder.shrink_size, sharder.merge_size, 1, -1)
+                broker, sharder.shrink_threshold, sharder.expansion_limit,
+                1, -1)
             self.assertNotEqual([], compactible)
             with mock_timestamp_now(next(self.ts_iter)):
                 process_compactible_shard_sequences(broker, compactible)
@@ -6494,8 +6496,7 @@ class TestSharderFunctions(BaseTestSharder):
     def test_find_shrinking_candidates(self):
         broker = self._make_broker()
         shard_bounds = (('', 'a'), ('a', 'b'), ('b', 'c'), ('c', 'd'))
-        threshold = (DEFAULT_SHARD_SHRINK_POINT *
-                     DEFAULT_SHARD_CONTAINER_THRESHOLD / 100)
+        threshold = (DEFAULT_SHARDER_CONF['shrink_threshold'])
         shard_ranges = self._make_shard_ranges(
             shard_bounds, state=ShardRange.ACTIVE, object_count=threshold,
             timestamp=next(self.ts_iter))
@@ -7185,3 +7186,70 @@ class TestSharderFunctions(BaseTestSharder):
             ],
             rank_paths(paths, own_sr)
         )
+
+
+class TestContainerSharderConf(unittest.TestCase):
+    def test_default(self):
+        expected = {'shard_container_threshold': 1000000,
+                    'max_shrinking': 1,
+                    'max_expanding': -1,
+                    'shard_scanner_batch_size': 10,
+                    'cleave_batch_size': 2,
+                    'cleave_row_batch_size': 10000,
+                    'broker_timeout': 60,
+                    'recon_candidates_limit': 5,
+                    'recon_sharded_timeout': 43200,
+                    'conn_timeout': 5.0,
+                    'auto_shard': False,
+                    'shrink_threshold': 100000,
+                    'expansion_limit': 750000,
+                    'rows_per_shard': 500000}
+        self.assertEqual(expected, vars(ContainerSharderConf()))
+        self.assertEqual(expected, vars(ContainerSharderConf(None)))
+        self.assertEqual(expected, DEFAULT_SHARDER_CONF)
+
+    def test_conf(self):
+        base_conf = {'shard_container_threshold': 2000000,
+                     'max_shrinking': 2,
+                     'max_expanding': 3,
+                     'shard_scanner_batch_size': 11,
+                     'cleave_batch_size': 4,
+                     'cleave_row_batch_size': 50000,
+                     'broker_timeout': 61,
+                     'recon_candidates_limit': 6,
+                     'recon_sharded_timeout': 43201,
+                     'conn_timeout': 5.1,
+                     'auto_shard': True}
+        percent_conf = {'shard_shrink_point': 9,
+                        'shard_shrink_merge_point': 71}
+        expected = dict(base_conf, rows_per_shard=1000000,
+                        shrink_threshold=180000, expansion_limit=1420000)
+        conf = dict(base_conf)
+        conf.update(percent_conf)
+        self.assertEqual(expected, vars(ContainerSharderConf(conf)))
+
+    def test_bad_values(self):
+        not_positive_int = [0, -1, 'bad']
+        not_int = not_float = ['bad']
+        not_percent = ['bad', -1, 101, -0.1, 100.1]
+        bad = {'shard_container_threshold': not_positive_int,
+               'max_shrinking': not_int,
+               'max_expanding': not_int,
+               'shard_scanner_batch_size': not_positive_int,
+               'cleave_batch_size': not_positive_int,
+               'cleave_row_batch_size': not_positive_int,
+               'broker_timeout': not_positive_int,
+               'recon_candidates_limit': not_int,
+               'recon_sharded_timeout': not_int,
+               'conn_timeout': not_float,
+               # 'auto_shard': anything can be passed to config_true_value
+               'shard_shrink_point': not_percent,
+               'shard_shrink_merge_point': not_percent}
+
+        for key, bad_values in bad.items():
+            for bad_value in bad_values:
+                with self.assertRaises(
+                        ValueError, msg='{%s : %s}' % (key, bad_value)) as cm:
+                    ContainerSharderConf({key: bad_value})
+                self.assertIn('Error setting %s' % key, str(cm.exception))