sharder: avoid small tail shards

A container is typically sharded when it has grown to have an object
count of shard_container_threshold + N, where N <<
shard_container_threshold.  If sharded using the default
rows_per_shard of shard_container_threshold / 2, this would
previously result in three shards, with the tail shard typically
being small, having only N rows. This behaviour generated more
shards than desirable.

This patch adds a minimum-shard-size option to
swift-manage-shard-ranges, and a corresponding option in the sharder
config, which can be used to avoid small tail shards. If set to
greater than one then the final shard range may be extended to more
than rows_per_shard in order to avoid a further shard range with less
than minimum-shard-size rows. In the example given, if
minimum-shard-size is set to M > N then the container would shard
into two shards having rows_per_shard and rows_per_shard + N rows
respectively.

The default value for minimum-shard-size is rows_per_shard // 5. If
all options have their default values this results in
minimum-shard-size being 100000.
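
To make the arithmetic concrete, here is a minimal standalone sketch of
the splitting decision described above. It is not the sharder's actual
code; the helper name and the example numbers are illustrative only.

def plan_shards(object_count, rows_per_shard, minimum_shard_size=1):
    """Return the planned row count of each shard, merging a small tail."""
    minimum_shard_size = max(minimum_shard_size, 1)
    if rows_per_shard + minimum_shard_size > object_count:
        return []  # container not big enough to shard
    sizes = []
    remaining = object_count
    while remaining:
        if rows_per_shard + minimum_shard_size > remaining:
            # fewer than minimum_shard_size rows would be left after
            # another full shard, so extend this final shard instead
            sizes.append(remaining)
            break
        sizes.append(rows_per_shard)
        remaining -= rows_per_shard
    return sizes

# shard_container_threshold = 1000000, N = 5000, default rows_per_shard
print(plan_shards(1005000, 500000))          # [500000, 500000, 5000]
print(plan_shards(1005000, 500000, 100000))  # [500000, 505000]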

Closes-Bug: #1928370
Co-Authored-By: Matthew Oliver <matt@oliver.net.au>
Change-Id: I3baa278c6eaf488e3f390a936eebbec13f2c3e55
Author: Alistair Coles  2021-06-03 13:16:45 +01:00
Commit: 2a593174a5 (parent a87317db6e)
9 changed files with 236 additions and 36 deletions

View File

@ -329,6 +329,18 @@ rows_per_shard 500000 This defines the initial
containers. The default
is shard_container_threshold // 2.
minimum_shard_size 100000 Minimum size of the final
shard range. If this is
greater than one then the
final shard range may be
extended to more than
rows_per_shard in order
to avoid a further shard
range with less than
minimum_shard_size rows.
The default value is
rows_per_shard // 5.
shrink_threshold This defines the
object count below which
a 'donor' shard container

View File

@ -374,6 +374,12 @@ use = egg:swift#xprofile
# default is shard_container_threshold // 2
# rows_per_shard = 500000
#
# Minimum size of the final shard range. If this is greater than one then the
# final shard range may be extended to more than rows_per_shard in order to
# avoid a further shard range with less than minimum_shard_size rows. The
# default value is rows_per_shard // 5.
# minimum_shard_size = 100000
#
# When auto-sharding is enabled shrink_threshold defines the object count
# below which a 'donor' shard container will be considered for shrinking into
# another 'acceptor' shard container. The default is determined by
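
As a usage sketch, the option is set in the [container-sharder] section of
the container server config alongside the other sharder options; the
values below are illustrative only:

[container-sharder]
shard_container_threshold = 1000000
rows_per_shard = 500000
# let the final shard absorb a tail of fewer than 100000 rows
minimum_shard_size = 100000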

View File

@ -301,7 +301,8 @@ def _find_ranges(broker, args, status_file=None):
start = last_report = time.time()
limit = 5 if status_file else -1
shard_data, last_found = broker.find_shard_ranges(
args.rows_per_shard, limit=limit)
args.rows_per_shard, limit=limit,
minimum_shard_size=args.minimum_shard_size)
if shard_data:
while not last_found:
if last_report + 10 < time.time():
@ -311,7 +312,8 @@ def _find_ranges(broker, args, status_file=None):
# prefix doesn't matter since we aren't persisting it
found_ranges = make_shard_ranges(broker, shard_data, '.shards_')
more_shard_data, last_found = broker.find_shard_ranges(
args.rows_per_shard, existing_ranges=found_ranges, limit=5)
args.rows_per_shard, existing_ranges=found_ranges, limit=5,
minimum_shard_size=args.minimum_shard_size)
shard_data.extend(more_shard_data)
return shard_data, time.time() - start
@ -709,6 +711,13 @@ def _add_find_args(parser):
'Default is half of the shard_container_threshold value if that is '
'given in a conf file specified with --config, otherwise %s.'
% DEFAULT_SHARDER_CONF['rows_per_shard'])
parser.add_argument(
'--minimum-shard-size', type=_positive_int,
default=USE_SHARDER_DEFAULT,
help='Minimum size of the final shard range. If this is greater than '
'one then the final shard range may be extended to more than '
'rows_per_shard in order to avoid a further shard range with less '
'than minimum-shard-size rows.')
def _add_replace_args(parser):
@ -906,18 +915,18 @@ def main(cli_args=None):
return EXIT_INVALID_ARGS
try:
# load values from conf file or sharder defaults
conf = {}
if args.conf_file:
conf = readconf(args.conf_file, 'container-sharder')
conf.update(dict((k, v) for k, v in vars(args).items()
if v != USE_SHARDER_DEFAULT))
conf_args = ContainerSharderConf(conf)
except (OSError, IOError) as exc:
print('Error opening config file %s: %s' % (args.conf_file, exc),
file=sys.stderr)
return EXIT_ERROR
except (TypeError, ValueError) as exc:
print('Error loading config file %s: %s' % (args.conf_file, exc),
file=sys.stderr)
print('Error loading config: %s' % exc, file=sys.stderr)
return EXIT_INVALID_ARGS
for k, v in vars(args).items():
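
As a usage sketch (the database path is a placeholder and the values are
illustrative; the argument order matches the invocations exercised in the
tests further down), the new option is given after the find or
find_and_replace subcommand:

swift-manage-shard-ranges <container-db> find 500000 --minimum-shard-size 100000
swift-manage-shard-ranges <container-db> find_and_replace 500000 --enable --minimum-shard-size 100000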

View File

@ -2234,7 +2234,8 @@ class ContainerBroker(DatabaseBroker):
row = connection.execute(sql, args).fetchone()
return row['name'] if row else None
def find_shard_ranges(self, shard_size, limit=-1, existing_ranges=None):
def find_shard_ranges(self, shard_size, limit=-1, existing_ranges=None,
minimum_shard_size=1):
"""
Scans the container db for shard ranges. Scanning will start at the
upper bound of any ``existing_ranges`` that are given, otherwise
@ -2253,6 +2254,10 @@ class ContainerBroker(DatabaseBroker):
given, this list should be sorted in order of upper bounds; the
scan for new shard ranges will start at the upper bound of the last
existing ShardRange.
:param minimum_shard_size: Minimum size of the final shard range. If
this is greater than one then the final shard range may be extended
to more than shard_size in order to avoid a further shard range
with less than minimum_shard_size rows.
:return: a tuple; the first value in the tuple is a list of
dicts each having keys {'index', 'lower', 'upper', 'object_count'}
in order of ascending 'upper'; the second value in the tuple is a
@ -2260,8 +2265,9 @@ class ContainerBroker(DatabaseBroker):
otherwise.
"""
existing_ranges = existing_ranges or []
minimum_shard_size = max(minimum_shard_size, 1)
object_count = self.get_info().get('object_count', 0)
if shard_size >= object_count:
if shard_size + minimum_shard_size > object_count:
# container not big enough to shard
return [], False
@ -2292,9 +2298,10 @@ class ContainerBroker(DatabaseBroker):
sub_broker = self.get_brokers()[0]
index = len(existing_ranges)
while limit is None or limit < 0 or len(found_ranges) < limit:
if progress + shard_size >= object_count:
# next shard point is at or beyond final object name so don't
# bother with db query
if progress + shard_size + minimum_shard_size > object_count:
# next shard point is within minimum_shard_size rows of the final
# object name, or beyond it, so don't bother with db query.
# This shard will have <= shard_size + (minimum_shard_size - 1) rows.
next_shard_upper = None
else:
try:
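
As a hedged, standalone illustration of the extended broker API (the db
path and row counts are made up; the set-up mirrors the CLI unit test
further down):

from swift.common.utils import Timestamp
from swift.container.backend import ContainerBroker

broker = ContainerBroker('/tmp/hash.db', account='a', container='c')
broker.initialize()
ts = Timestamp.now()
# 105 rows with rows_per_shard = 50 leaves a potential 5-row tail
broker.merge_items([
    {'name': 'obj%03d' % i, 'created_at': ts.internal, 'size': 0,
     'content_type': 'application/octet-stream', 'etag': 'not-really',
     'deleted': 0, 'storage_policy_index': 0,
     'ctype_timestamp': ts.internal, 'meta_timestamp': ts.internal}
    for i in range(105)])

shard_data, last_found = broker.find_shard_ranges(
    50, limit=-1, minimum_shard_size=10)
# shard_data is a list of dicts with keys 'index', 'lower', 'upper' and
# 'object_count'; because the 5-row tail would be smaller than
# minimum_shard_size, the final range is extended: two ranges of 50 and
# 55 rows are returned and last_found is True.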

View File

@ -630,10 +630,23 @@ class ContainerSharderConf(object):
self.rows_per_shard = get_val(
'rows_per_shard', config_positive_int_value,
max(self.shard_container_threshold // 2, 1))
self.minimum_shard_size = get_val(
'minimum_shard_size', config_positive_int_value,
max(self.rows_per_shard // 5, 1))
self.validate_conf()
def percent_of_threshold(self, val):
return int(config_percent_value(val) * self.shard_container_threshold)
def validate_conf(self):
keys = ('minimum_shard_size', 'rows_per_shard')
vals = [getattr(self, key) for key in keys]
if not vals[0] <= vals[1]:
raise ValueError(
'%s (%d) must be less than %s (%d)'
% (keys[0], vals[0], keys[1], vals[1]))
DEFAULT_SHARDER_CONF = vars(ContainerSharderConf())
@ -1482,7 +1495,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
start = time.time()
shard_data, last_found = broker.find_shard_ranges(
self.rows_per_shard, limit=self.shard_scanner_batch_size,
existing_ranges=shard_ranges)
existing_ranges=shard_ranges,
minimum_shard_size=self.minimum_shard_size)
elapsed = time.time() - start
if not shard_data:
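
To make the new default and the validation concrete, a short hedged
sketch follows; the asserted values echo the defaults and the error
message exercised by the unit tests further down:

from swift.container.sharder import ContainerSharderConf

conf = ContainerSharderConf()              # all defaults
assert conf.rows_per_shard == 500000       # shard_container_threshold // 2
assert conf.minimum_shard_size == 100000   # rows_per_shard // 5

# validate_conf() rejects a minimum_shard_size larger than rows_per_shard
try:
    ContainerSharderConf({'rows_per_shard': 50, 'minimum_shard_size': 51})
except ValueError as err:
    print(err)  # minimum_shard_size (51) must be less than rows_per_shard (50)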

View File

@ -2789,7 +2789,7 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
self.sharders.once(**kwargs)
def test_manage_shard_ranges(self):
obj_names = self._make_object_names(4)
obj_names = self._make_object_names(7)
self.put_objects(obj_names)
client.post_container(self.url, self.admin_token, self.container_name,
@ -2807,7 +2807,7 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
self.assert_subprocess_success([
'swift-manage-shard-ranges',
self.get_db_file(self.brain.part, self.brain.nodes[0]),
'find_and_replace', '2', '--enable'])
'find_and_replace', '3', '--enable', '--minimum-shard-size', '2'])
self.assert_container_state(self.brain.nodes[0], 'unsharded', 2)
# "Run container-replicator to replicate them to other nodes."

View File

@ -142,6 +142,7 @@ class TestManageShardRanges(unittest.TestCase):
rows_per_shard = 600
max_shrinking = 33
max_expanding = 31
minimum_shard_size = 88
"""
conf_file = os.path.join(self.testdir, 'sharder.conf')
@ -159,7 +160,8 @@ class TestManageShardRanges(unittest.TestCase):
rows_per_shard=500000,
subcommand='find',
force_commits=False,
verbose=0)
verbose=0,
minimum_shard_size=100000)
mocked.assert_called_once_with(mock.ANY, expected)
# conf file
@ -173,13 +175,15 @@ class TestManageShardRanges(unittest.TestCase):
rows_per_shard=600,
subcommand='find',
force_commits=False,
verbose=0)
verbose=0,
minimum_shard_size=88)
mocked.assert_called_once_with(mock.ANY, expected)
# cli options override conf file
with mock.patch('swift.cli.manage_shard_ranges.find_ranges',
return_value=0) as mocked:
ret = main([db_file, '--config', conf_file, 'find', '12345'])
ret = main([db_file, '--config', conf_file, 'find', '12345',
'--minimum-shard-size', '99'])
self.assertEqual(0, ret)
expected = Namespace(conf_file=conf_file,
path_to_file=mock.ANY,
@ -187,7 +191,8 @@ class TestManageShardRanges(unittest.TestCase):
rows_per_shard=12345,
subcommand='find',
force_commits=False,
verbose=0)
verbose=0,
minimum_shard_size=99)
mocked.assert_called_once_with(mock.ANY, expected)
# default values
@ -377,7 +382,7 @@ class TestManageShardRanges(unittest.TestCase):
ret = main([db_file, '--config', conf_file, 'compact'])
self.assertEqual(2, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Error loading config file')
self.assert_starts_with(err_lines[0], 'Error loading config')
self.assertIn('shard_container_threshold', err_lines[0])
def test_conf_file_invalid_deprecated_options(self):
@ -403,7 +408,7 @@ class TestManageShardRanges(unittest.TestCase):
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([db_file, '--config', conf_file, 'compact'])
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Error loading config file')
self.assert_starts_with(err_lines[0], 'Error loading config')
self.assertIn('shard_shrink_point', err_lines[0])
def test_conf_file_does_not_exist(self):
@ -457,7 +462,7 @@ class TestManageShardRanges(unittest.TestCase):
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([db_file, 'find', '99'])
ret = main([db_file, 'find', '99', '--minimum-shard-size', '1'])
self.assertEqual(0, ret)
self.assert_formatted_json(out.getvalue(), [
{'index': 0, 'lower': '', 'upper': 'obj98', 'object_count': 99},
@ -496,6 +501,91 @@ class TestManageShardRanges(unittest.TestCase):
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
self.assert_starts_with(err_lines[1], 'Found 10 ranges in ')
def test_find_shard_ranges_with_minimum_size(self):
db_file = os.path.join(self.testdir, 'hash.db')
broker = ContainerBroker(db_file)
broker.account = 'a'
broker.container = 'c'
broker.initialize()
ts = utils.Timestamp.now()
# with 105 objects and rows_per_shard = 50 there is the potential for a
# tail shard of size 5
broker.merge_items([
{'name': 'obj%03d' % i, 'created_at': ts.internal, 'size': 0,
'content_type': 'application/octet-stream', 'etag': 'not-really',
'deleted': 0, 'storage_policy_index': 0,
'ctype_timestamp': ts.internal, 'meta_timestamp': ts.internal}
for i in range(105)])
def assert_tail_shard_not_extended(minimum):
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([db_file, 'find', '50',
'--minimum-shard-size', str(minimum)])
self.assertEqual(0, ret)
self.assert_formatted_json(out.getvalue(), [
{'index': 0, 'lower': '', 'upper': 'obj049',
'object_count': 50},
{'index': 1, 'lower': 'obj049', 'upper': 'obj099',
'object_count': 50},
{'index': 2, 'lower': 'obj099', 'upper': '',
'object_count': 5},
])
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
self.assert_starts_with(err_lines[1], 'Found 3 ranges in ')
# tail shard size >= minimum
assert_tail_shard_not_extended(1)
assert_tail_shard_not_extended(4)
assert_tail_shard_not_extended(5)
def assert_tail_shard_extended(minimum):
out = StringIO()
err = StringIO()
if minimum is not None:
extra_args = ['--minimum-shard-size', str(minimum)]
else:
extra_args = []
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([db_file, 'find', '50'] + extra_args)
self.assertEqual(0, ret)
err_lines = err.getvalue().split('\n')
self.assert_formatted_json(out.getvalue(), [
{'index': 0, 'lower': '', 'upper': 'obj049',
'object_count': 50},
{'index': 1, 'lower': 'obj049', 'upper': '',
'object_count': 55},
])
self.assert_starts_with(err_lines[1], 'Found 2 ranges in ')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
# sanity check - no minimum specified, defaults to rows_per_shard/5
assert_tail_shard_extended(None)
assert_tail_shard_extended(6)
assert_tail_shard_extended(50)
def assert_too_large_value_handled(minimum):
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([db_file, 'find', '50',
'--minimum-shard-size', str(minimum)])
self.assertEqual(2, ret)
self.assertEqual(
'Error loading config: minimum_shard_size (%s) must be less '
'than rows_per_shard (50)' % minimum, err.getvalue().strip())
assert_too_large_value_handled(51)
assert_too_large_value_handled(52)
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
with self.assertRaises(SystemExit):
main([db_file, 'find', '50', '--minimum-shard-size', '-1'])
def test_info(self):
broker = self._make_broker()
broker.update_metadata({'X-Container-Sysmeta-Sharding':

View File

@ -4340,7 +4340,7 @@ class TestContainerBroker(unittest.TestCase):
container_name = 'test_container'
def do_test(expected_bounds, expected_last_found, shard_size, limit,
start_index=0, existing=None):
start_index=0, existing=None, minimum_size=1):
# expected_bounds is a list of tuples (lower, upper, object_count)
# build expected shard ranges
expected_shard_ranges = [
@ -4352,7 +4352,8 @@ class TestContainerBroker(unittest.TestCase):
with mock.patch('swift.common.utils.time.time',
return_value=float(ts_now.normal)):
ranges, last_found = broker.find_shard_ranges(
shard_size, limit=limit, existing_ranges=existing)
shard_size, limit=limit, existing_ranges=existing,
minimum_shard_size=minimum_size)
self.assertEqual(expected_shard_ranges, ranges)
self.assertEqual(expected_last_found, last_found)
@ -4397,6 +4398,20 @@ class TestContainerBroker(unittest.TestCase):
do_test(expected, True, shard_size=4, limit=4)
do_test(expected, True, shard_size=4, limit=-1)
# check use of minimum_shard_size
expected = [(c_lower, 'obj03', 4), ('obj03', 'obj07', 4),
('obj07', c_upper, 2)]
do_test(expected, True, shard_size=4, limit=None, minimum_size=2)
# crazy values ignored...
do_test(expected, True, shard_size=4, limit=None, minimum_size=0)
do_test(expected, True, shard_size=4, limit=None, minimum_size=-1)
# minimum_size > potential final shard
expected = [(c_lower, 'obj03', 4), ('obj03', c_upper, 6)]
do_test(expected, True, shard_size=4, limit=None, minimum_size=3)
# extended shard size >= object_count
do_test([], False, shard_size=6, limit=None, minimum_size=5)
do_test([], False, shard_size=6, limit=None, minimum_size=500)
# increase object count to 11
broker.put_object(
'obj10', next(self.ts).internal, 0, 'text/plain', 'etag')

View File

@ -4222,6 +4222,7 @@ class TestSharder(BaseTestSharder):
broker, objects = self._setup_old_style_find_ranges(
account, cont, lower, upper)
with self._mock_sharder(conf={'shard_container_threshold': 199,
'minimum_shard_size': 1,
'auto_create_account_prefix': '.int_'}
) as sharder:
with mock_timestamp_now() as now:
@ -4237,6 +4238,7 @@ class TestSharder(BaseTestSharder):
# second invocation finds none
with self._mock_sharder(conf={'shard_container_threshold': 199,
'minimum_shard_size': 1,
'auto_create_account_prefix': '.int_'}
) as sharder:
num_found = sharder._find_shard_ranges(broker)
@ -4317,10 +4319,11 @@ class TestSharder(BaseTestSharder):
self._assert_shard_ranges_equal(expected_ranges,
broker.get_shard_ranges())
# first invocation finds both ranges
# first invocation finds both ranges, sizes 99 and 1
broker, objects = self._setup_find_ranges(
account, cont, lower, upper)
with self._mock_sharder(conf={'shard_container_threshold': 199,
'minimum_shard_size': 1,
'auto_create_account_prefix': '.int_'}
) as sharder:
with mock_timestamp_now() as now:
@ -4371,9 +4374,11 @@ class TestSharder(BaseTestSharder):
now, objects[89][0], upper, 10),
]
# first invocation finds 2 ranges
# (third shard range will be >= minimum_shard_size)
with self._mock_sharder(
conf={'shard_container_threshold': 90,
'shard_scanner_batch_size': 2}) as sharder:
'shard_scanner_batch_size': 2,
'minimum_shard_size': 10}) as sharder:
with mock_timestamp_now(now):
num_found = sharder._find_shard_ranges(broker)
self.assertEqual(45, sharder.rows_per_shard)
@ -4388,8 +4393,9 @@ class TestSharder(BaseTestSharder):
self.assertGreaterEqual(stats['max_time'], stats['min_time'])
# second invocation finds third shard range
with self._mock_sharder(conf={'shard_container_threshold': 199,
'shard_scanner_batch_size': 2}
with self._mock_sharder(conf={'shard_container_threshold': 90,
'shard_scanner_batch_size': 2,
'minimum_shard_size': 10}
) as sharder:
with mock_timestamp_now(now):
num_found = sharder._find_shard_ranges(broker)
@ -4405,7 +4411,8 @@ class TestSharder(BaseTestSharder):
# third invocation finds none
with self._mock_sharder(conf={'shard_container_threshold': 199,
'shard_scanner_batch_size': 2}
'shard_scanner_batch_size': 2,
'minimum_shard_size': 10}
) as sharder:
sharder._send_shard_ranges = mock.MagicMock(return_value=True)
num_found = sharder._find_shard_ranges(broker)
@ -4425,6 +4432,41 @@ class TestSharder(BaseTestSharder):
def test_find_shard_ranges_finds_three_shard(self):
self._check_find_shard_ranges_finds_three('.shards_a', 'c_', 'l', 'u')
def test_find_shard_ranges_with_minimum_size(self):
cont = 'c_'
lower = 'l'
upper = 'u'
broker, objects = self._setup_find_ranges(
'.shards_a', 'c_', lower, upper)
now = Timestamp.now()
expected_ranges = [
ShardRange(
ShardRange.make_path('.shards_a', 'c', cont, now, 0),
now, lower, objects[44][0], 45),
ShardRange(
ShardRange.make_path('.shards_a', 'c', cont, now, 1),
now, objects[44][0], upper, 55),
]
# first invocation finds 2 ranges - second has been extended to avoid
# final shard range < minimum_size
with self._mock_sharder(
conf={'shard_container_threshold': 90,
'shard_scanner_batch_size': 2,
'minimum_shard_size': 11}) as sharder:
with mock_timestamp_now(now):
num_found = sharder._find_shard_ranges(broker)
self.assertEqual(45, sharder.rows_per_shard)
self.assertEqual(11, sharder.minimum_shard_size)
self.assertEqual(2, num_found)
self.assertEqual(2, len(broker.get_shard_ranges()))
self._assert_shard_ranges_equal(expected_ranges[:2],
broker.get_shard_ranges())
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
'found': 2, 'min_time': mock.ANY,
'max_time': mock.ANY}
stats = self._assert_stats(expected_stats, sharder, 'scanned')
self.assertGreaterEqual(stats['max_time'], stats['min_time'])
def test_sharding_enabled(self):
broker = self._make_broker()
self.assertFalse(sharding_enabled(broker))
@ -5561,13 +5603,14 @@ class TestSharder(BaseTestSharder):
def test_find_and_enable_sharding_candidates_bootstrap(self):
broker = self._make_broker()
with self._mock_sharder(
conf={'shard_container_threshold': 1}) as sharder:
conf={'shard_container_threshold': 2}) as sharder:
sharder._find_and_enable_sharding_candidates(broker)
self.assertEqual(ShardRange.ACTIVE, broker.get_own_shard_range().state)
broker.put_object('obj', next(self.ts_iter).internal, 1, '', '')
self.assertEqual(1, broker.get_info()['object_count'])
broker.put_object('obj1', next(self.ts_iter).internal, 1, '', '')
broker.put_object('obj2', next(self.ts_iter).internal, 1, '', '')
self.assertEqual(2, broker.get_info()['object_count'])
with self._mock_sharder(
conf={'shard_container_threshold': 1}) as sharder:
conf={'shard_container_threshold': 2}) as sharder:
with mock_timestamp_now() as now:
sharder._find_and_enable_sharding_candidates(
broker, [broker.get_own_shard_range()])
@ -5578,7 +5621,7 @@ class TestSharder(BaseTestSharder):
# check idempotency
with self._mock_sharder(
conf={'shard_container_threshold': 1}) as sharder:
conf={'shard_container_threshold': 2}) as sharder:
with mock_timestamp_now():
sharder._find_and_enable_sharding_candidates(
broker, [broker.get_own_shard_range()])
@ -7284,7 +7327,8 @@ class TestContainerSharderConf(unittest.TestCase):
'auto_shard': False,
'shrink_threshold': 100000,
'expansion_limit': 750000,
'rows_per_shard': 500000}
'rows_per_shard': 500000,
'minimum_shard_size': 100000}
self.assertEqual(expected, vars(ContainerSharderConf()))
self.assertEqual(expected, vars(ContainerSharderConf(None)))
self.assertEqual(expected, DEFAULT_SHARDER_CONF)
@ -7303,7 +7347,8 @@ class TestContainerSharderConf(unittest.TestCase):
'auto_shard': True,
'shrink_threshold': 100001,
'expansion_limit': 750001,
'rows_per_shard': 500001}
'rows_per_shard': 500001,
'minimum_shard_size': 20}
expected = dict(conf)
conf.update({'unexpected': 'option'})
self.assertEqual(expected, vars(ContainerSharderConf(conf)))
@ -7319,7 +7364,8 @@ class TestContainerSharderConf(unittest.TestCase):
'recon_candidates_limit': 6,
'recon_sharded_timeout': 43201,
'conn_timeout': 5.1,
'auto_shard': True}
'auto_shard': True,
'minimum_shard_size': 1}
# percent options work
deprecated_conf = {'shard_shrink_point': 9,
@ -7357,7 +7403,8 @@ class TestContainerSharderConf(unittest.TestCase):
'shrink_threshold': not_int,
'expansion_limit': not_int,
'shard_shrink_point': not_percent,
'shard_shrink_merge_point': not_percent}
'shard_shrink_merge_point': not_percent,
'minimum_shard_size': not_positive_int}
for key, bad_values in bad.items():
for bad_value in bad_values: