Merge "sharder: avoid small tail shards"

Zuul authored 2021-07-08 17:00:52 +00:00; committed by Gerrit Code Review
commit 17489ce7bf
9 changed files with 236 additions and 36 deletions


@ -329,6 +329,18 @@ rows_per_shard 500000 This defines the initial
containers. The default
is shard_container_threshold // 2.
minimum_shard_size 100000 Minimum size of the final
shard range. If this is
greater than one then the
final shard range may be
extended to more than
rows_per_shard in order
to avoid a further shard
range with less than
minimum_shard_size rows.
The default value is
rows_per_shard // 5.
shrink_threshold This defines the
object count below which
a 'donor' shard container


@ -374,6 +374,12 @@ use = egg:swift#xprofile
# default is shard_container_threshold // 2
# rows_per_shard = 500000
#
# Minimum size of the final shard range. If this is greater than one then the
# final shard range may be extended to more than rows_per_shard in order to
# avoid a further shard range with less than minimum_shard_size rows. The
# default value is rows_per_shard // 5.
# minimum_shard_size = 100000
#
# When auto-sharding is enabled shrink_threshold defines the object count
# below which a 'donor' shard container will be considered for shrinking into
# another 'acceptor' shard container. The default is determined by
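To make the interaction of rows_per_shard and minimum_shard_size concrete, here is a worked example using the sample values above (the container size of 1,050,000 rows is purely illustrative): with rows_per_shard = 500000 and minimum_shard_size = 100000, the scanner stops placing shard boundaries once fewer than 500000 + 100000 rows remain, so the container is split into two shard ranges of 500,000 and 550,000 rows. With minimum_shard_size = 1 the same container would have produced a third, tail shard range of only 50,000 rows.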


@ -301,7 +301,8 @@ def _find_ranges(broker, args, status_file=None):
start = last_report = time.time()
limit = 5 if status_file else -1
shard_data, last_found = broker.find_shard_ranges(
args.rows_per_shard, limit=limit)
args.rows_per_shard, limit=limit,
minimum_shard_size=args.minimum_shard_size)
if shard_data:
while not last_found:
if last_report + 10 < time.time():
@ -311,7 +312,8 @@ def _find_ranges(broker, args, status_file=None):
# prefix doesn't matter since we aren't persisting it
found_ranges = make_shard_ranges(broker, shard_data, '.shards_')
more_shard_data, last_found = broker.find_shard_ranges(
args.rows_per_shard, existing_ranges=found_ranges, limit=5)
args.rows_per_shard, existing_ranges=found_ranges, limit=5,
minimum_shard_size=args.minimum_shard_size)
shard_data.extend(more_shard_data)
return shard_data, time.time() - start
@ -709,6 +711,13 @@ def _add_find_args(parser):
'Default is half of the shard_container_threshold value if that is '
'given in a conf file specified with --config, otherwise %s.'
% DEFAULT_SHARDER_CONF['rows_per_shard'])
parser.add_argument(
'--minimum-shard-size', type=_positive_int,
default=USE_SHARDER_DEFAULT,
help='Minimum size of the final shard range. If this is greater than '
'one then the final shard range may be extended to more than '
'rows_per_shard in order to avoid a further shard range with less '
'than minimum-shard-size rows.')
def _add_replace_args(parser):
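A usage sketch for the new option (the database path is a placeholder and the row counts are arbitrary); the flag applies to both the find and find_and_replace subcommands:

    swift-manage-shard-ranges /path/to/container.db find 500000 --minimum-shard-size 100000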
@ -906,18 +915,18 @@ def main(cli_args=None):
return EXIT_INVALID_ARGS
try:
# load values from conf file or sharder defaults
conf = {}
if args.conf_file:
conf = readconf(args.conf_file, 'container-sharder')
conf.update(dict((k, v) for k, v in vars(args).items()
if v != USE_SHARDER_DEFAULT))
conf_args = ContainerSharderConf(conf)
except (OSError, IOError) as exc:
print('Error opening config file %s: %s' % (args.conf_file, exc),
file=sys.stderr)
return EXIT_ERROR
except (TypeError, ValueError) as exc:
print('Error loading config file %s: %s' % (args.conf_file, exc),
file=sys.stderr)
print('Error loading config: %s' % exc, file=sys.stderr)
return EXIT_INVALID_ARGS
for k, v in vars(args).items():


@ -2234,7 +2234,8 @@ class ContainerBroker(DatabaseBroker):
row = connection.execute(sql, args).fetchone()
return row['name'] if row else None
def find_shard_ranges(self, shard_size, limit=-1, existing_ranges=None):
def find_shard_ranges(self, shard_size, limit=-1, existing_ranges=None,
minimum_shard_size=1):
"""
Scans the container db for shard ranges. Scanning will start at the
upper bound of any ``existing_ranges`` that are given, otherwise
@ -2253,6 +2254,10 @@ class ContainerBroker(DatabaseBroker):
given, this list should be sorted in order of upper bounds; the
scan for new shard ranges will start at the upper bound of the last
existing ShardRange.
:param minimum_shard_size: Minimum size of the final shard range. If
this is greater than one then the final shard range may be extended
to more than shard_size in order to avoid a further shard range
with less than minimum_shard_size rows.
:return: a tuple; the first value in the tuple is a list of
dicts each having keys {'index', 'lower', 'upper', 'object_count'}
in order of ascending 'upper'; the second value in the tuple is a
@ -2260,8 +2265,9 @@ class ContainerBroker(DatabaseBroker):
otherwise.
"""
existing_ranges = existing_ranges or []
minimum_shard_size = max(minimum_shard_size, 1)
object_count = self.get_info().get('object_count', 0)
if shard_size >= object_count:
if shard_size + minimum_shard_size > object_count:
# container not big enough to shard
return [], False
@ -2292,9 +2298,10 @@ class ContainerBroker(DatabaseBroker):
sub_broker = self.get_brokers()[0]
index = len(existing_ranges)
while limit is None or limit < 0 or len(found_ranges) < limit:
if progress + shard_size >= object_count:
# next shard point is at or beyond final object name so don't
# bother with db query
if progress + shard_size + minimum_shard_size > object_count:
# next shard point is within minimum_size rows of the final
# object name, or beyond it, so don't bother with db query.
# This shard will have <= shard_size + (minimum_size - 1) rows.
next_shard_upper = None
else:
try:
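The boundary arithmetic above can be illustrated with a minimal standalone sketch; this is not the broker implementation, and the function name and demo values are made up for illustration:

    # Sketch: choose shard sizes the way the loop above does, folding a
    # would-be tail of fewer than minimum_shard_size rows into the final shard.
    def plan_shard_sizes(object_count, shard_size, minimum_shard_size=1):
        minimum_shard_size = max(minimum_shard_size, 1)
        if shard_size + minimum_shard_size > object_count:
            return []  # container not big enough to shard
        sizes = []
        progress = 0
        while True:
            if progress + shard_size + minimum_shard_size > object_count:
                # final shard absorbs the tail:
                # <= shard_size + minimum_shard_size - 1 rows
                sizes.append(object_count - progress)
                return sizes
            sizes.append(shard_size)
            progress += shard_size

    print(plan_shard_sizes(105, 50, 5))   # [50, 50, 5]  tail meets the minimum
    print(plan_shard_sizes(105, 50, 10))  # [50, 55]     tail folded into the last shard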


@ -630,10 +630,23 @@ class ContainerSharderConf(object):
self.rows_per_shard = get_val(
'rows_per_shard', config_positive_int_value,
max(self.shard_container_threshold // 2, 1))
self.minimum_shard_size = get_val(
'minimum_shard_size', config_positive_int_value,
max(self.rows_per_shard // 5, 1))
self.validate_conf()
def percent_of_threshold(self, val):
return int(config_percent_value(val) * self.shard_container_threshold)
def validate_conf(self):
keys = ('minimum_shard_size', 'rows_per_shard')
vals = [getattr(self, key) for key in keys]
if not vals[0] <= vals[1]:
raise ValueError(
'%s (%d) must be less than %s (%d)'
% (keys[0], vals[0], keys[1], vals[1]))
DEFAULT_SHARDER_CONF = vars(ContainerSharderConf())
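Because validate_conf() is called at the end of __init__, an inconsistent pair of values is rejected as soon as the configuration is loaded; a small illustrative sketch (the numbers are arbitrary):

    from swift.container.sharder import ContainerSharderConf

    ContainerSharderConf({'rows_per_shard': 500000, 'minimum_shard_size': 100000})  # passes
    ContainerSharderConf({'rows_per_shard': 100, 'minimum_shard_size': 200})
    # ValueError: minimum_shard_size (200) must be less than rows_per_shard (100)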
@ -1482,7 +1495,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
start = time.time()
shard_data, last_found = broker.find_shard_ranges(
self.rows_per_shard, limit=self.shard_scanner_batch_size,
existing_ranges=shard_ranges)
existing_ranges=shard_ranges,
minimum_shard_size=self.minimum_shard_size)
elapsed = time.time() - start
if not shard_data:


@ -2789,7 +2789,7 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
self.sharders.once(**kwargs)
def test_manage_shard_ranges(self):
obj_names = self._make_object_names(4)
obj_names = self._make_object_names(7)
self.put_objects(obj_names)
client.post_container(self.url, self.admin_token, self.container_name,
@ -2807,7 +2807,7 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
self.assert_subprocess_success([
'swift-manage-shard-ranges',
self.get_db_file(self.brain.part, self.brain.nodes[0]),
'find_and_replace', '2', '--enable'])
'find_and_replace', '3', '--enable', '--minimum-shard-size', '2'])
self.assert_container_state(self.brain.nodes[0], 'unsharded', 2)
# "Run container-replicator to replicate them to other nodes."


@ -142,6 +142,7 @@ class TestManageShardRanges(unittest.TestCase):
rows_per_shard = 600
max_shrinking = 33
max_expanding = 31
minimum_shard_size = 88
"""
conf_file = os.path.join(self.testdir, 'sharder.conf')
@ -159,7 +160,8 @@ class TestManageShardRanges(unittest.TestCase):
rows_per_shard=500000,
subcommand='find',
force_commits=False,
verbose=0)
verbose=0,
minimum_shard_size=100000)
mocked.assert_called_once_with(mock.ANY, expected)
# conf file
@ -173,13 +175,15 @@ class TestManageShardRanges(unittest.TestCase):
rows_per_shard=600,
subcommand='find',
force_commits=False,
verbose=0)
verbose=0,
minimum_shard_size=88)
mocked.assert_called_once_with(mock.ANY, expected)
# cli options override conf file
with mock.patch('swift.cli.manage_shard_ranges.find_ranges',
return_value=0) as mocked:
ret = main([db_file, '--config', conf_file, 'find', '12345'])
ret = main([db_file, '--config', conf_file, 'find', '12345',
'--minimum-shard-size', '99'])
self.assertEqual(0, ret)
expected = Namespace(conf_file=conf_file,
path_to_file=mock.ANY,
@ -187,7 +191,8 @@ class TestManageShardRanges(unittest.TestCase):
rows_per_shard=12345,
subcommand='find',
force_commits=False,
verbose=0)
verbose=0,
minimum_shard_size=99)
mocked.assert_called_once_with(mock.ANY, expected)
# default values
@ -377,7 +382,7 @@ class TestManageShardRanges(unittest.TestCase):
ret = main([db_file, '--config', conf_file, 'compact'])
self.assertEqual(2, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Error loading config file')
self.assert_starts_with(err_lines[0], 'Error loading config')
self.assertIn('shard_container_threshold', err_lines[0])
def test_conf_file_invalid_deprecated_options(self):
@ -403,7 +408,7 @@ class TestManageShardRanges(unittest.TestCase):
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([db_file, '--config', conf_file, 'compact'])
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Error loading config file')
self.assert_starts_with(err_lines[0], 'Error loading config')
self.assertIn('shard_shrink_point', err_lines[0])
def test_conf_file_does_not_exist(self):
@ -457,7 +462,7 @@ class TestManageShardRanges(unittest.TestCase):
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([db_file, 'find', '99'])
ret = main([db_file, 'find', '99', '--minimum-shard-size', '1'])
self.assertEqual(0, ret)
self.assert_formatted_json(out.getvalue(), [
{'index': 0, 'lower': '', 'upper': 'obj98', 'object_count': 99},
@ -496,6 +501,91 @@ class TestManageShardRanges(unittest.TestCase):
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
self.assert_starts_with(err_lines[1], 'Found 10 ranges in ')
def test_find_shard_ranges_with_minimum_size(self):
db_file = os.path.join(self.testdir, 'hash.db')
broker = ContainerBroker(db_file)
broker.account = 'a'
broker.container = 'c'
broker.initialize()
ts = utils.Timestamp.now()
# with 105 objects and rows_per_shard = 50 there is the potential for a
# tail shard of size 5
broker.merge_items([
{'name': 'obj%03d' % i, 'created_at': ts.internal, 'size': 0,
'content_type': 'application/octet-stream', 'etag': 'not-really',
'deleted': 0, 'storage_policy_index': 0,
'ctype_timestamp': ts.internal, 'meta_timestamp': ts.internal}
for i in range(105)])
def assert_tail_shard_not_extended(minimum):
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([db_file, 'find', '50',
'--minimum-shard-size', str(minimum)])
self.assertEqual(0, ret)
self.assert_formatted_json(out.getvalue(), [
{'index': 0, 'lower': '', 'upper': 'obj049',
'object_count': 50},
{'index': 1, 'lower': 'obj049', 'upper': 'obj099',
'object_count': 50},
{'index': 2, 'lower': 'obj099', 'upper': '',
'object_count': 5},
])
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
self.assert_starts_with(err_lines[1], 'Found 3 ranges in ')
# tail shard size >= minimum
assert_tail_shard_not_extended(1)
assert_tail_shard_not_extended(4)
assert_tail_shard_not_extended(5)
def assert_tail_shard_extended(minimum):
out = StringIO()
err = StringIO()
if minimum is not None:
extra_args = ['--minimum-shard-size', str(minimum)]
else:
extra_args = []
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([db_file, 'find', '50'] + extra_args)
self.assertEqual(0, ret)
err_lines = err.getvalue().split('\n')
self.assert_formatted_json(out.getvalue(), [
{'index': 0, 'lower': '', 'upper': 'obj049',
'object_count': 50},
{'index': 1, 'lower': 'obj049', 'upper': '',
'object_count': 55},
])
self.assert_starts_with(err_lines[1], 'Found 2 ranges in ')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
# sanity check - no minimum specified, defaults to rows_per_shard/5
assert_tail_shard_extended(None)
assert_tail_shard_extended(6)
assert_tail_shard_extended(50)
def assert_too_large_value_handled(minimum):
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([db_file, 'find', '50',
'--minimum-shard-size', str(minimum)])
self.assertEqual(2, ret)
self.assertEqual(
'Error loading config: minimum_shard_size (%s) must be less '
'than rows_per_shard (50)' % minimum, err.getvalue().strip())
assert_too_large_value_handled(51)
assert_too_large_value_handled(52)
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
with self.assertRaises(SystemExit):
main([db_file, 'find', '50', '--minimum-shard-size', '-1'])
def test_info(self):
broker = self._make_broker()
broker.update_metadata({'X-Container-Sysmeta-Sharding':


@ -4340,7 +4340,7 @@ class TestContainerBroker(unittest.TestCase):
container_name = 'test_container'
def do_test(expected_bounds, expected_last_found, shard_size, limit,
start_index=0, existing=None):
start_index=0, existing=None, minimum_size=1):
# expected_bounds is a list of tuples (lower, upper, object_count)
# build expected shard ranges
expected_shard_ranges = [
@ -4352,7 +4352,8 @@ class TestContainerBroker(unittest.TestCase):
with mock.patch('swift.common.utils.time.time',
return_value=float(ts_now.normal)):
ranges, last_found = broker.find_shard_ranges(
shard_size, limit=limit, existing_ranges=existing)
shard_size, limit=limit, existing_ranges=existing,
minimum_shard_size=minimum_size)
self.assertEqual(expected_shard_ranges, ranges)
self.assertEqual(expected_last_found, last_found)
@ -4397,6 +4398,20 @@ class TestContainerBroker(unittest.TestCase):
do_test(expected, True, shard_size=4, limit=4)
do_test(expected, True, shard_size=4, limit=-1)
# check use of minimum_shard_size
expected = [(c_lower, 'obj03', 4), ('obj03', 'obj07', 4),
('obj07', c_upper, 2)]
do_test(expected, True, shard_size=4, limit=None, minimum_size=2)
# crazy values ignored...
do_test(expected, True, shard_size=4, limit=None, minimum_size=0)
do_test(expected, True, shard_size=4, limit=None, minimum_size=-1)
# minimum_size > potential final shard
expected = [(c_lower, 'obj03', 4), ('obj03', c_upper, 6)]
do_test(expected, True, shard_size=4, limit=None, minimum_size=3)
# extended shard size >= object_count
do_test([], False, shard_size=6, limit=None, minimum_size=5)
do_test([], False, shard_size=6, limit=None, minimum_size=500)
# increase object count to 11
broker.put_object(
'obj10', next(self.ts).internal, 0, 'text/plain', 'etag')


@ -4222,6 +4222,7 @@ class TestSharder(BaseTestSharder):
broker, objects = self._setup_old_style_find_ranges(
account, cont, lower, upper)
with self._mock_sharder(conf={'shard_container_threshold': 199,
'minimum_shard_size': 1,
'auto_create_account_prefix': '.int_'}
) as sharder:
with mock_timestamp_now() as now:
@ -4237,6 +4238,7 @@ class TestSharder(BaseTestSharder):
# second invocation finds none
with self._mock_sharder(conf={'shard_container_threshold': 199,
'minimum_shard_size': 1,
'auto_create_account_prefix': '.int_'}
) as sharder:
num_found = sharder._find_shard_ranges(broker)
@ -4317,10 +4319,11 @@ class TestSharder(BaseTestSharder):
self._assert_shard_ranges_equal(expected_ranges,
broker.get_shard_ranges())
# first invocation finds both ranges
# first invocation finds both ranges, sizes 99 and 1
broker, objects = self._setup_find_ranges(
account, cont, lower, upper)
with self._mock_sharder(conf={'shard_container_threshold': 199,
'minimum_shard_size': 1,
'auto_create_account_prefix': '.int_'}
) as sharder:
with mock_timestamp_now() as now:
@ -4371,9 +4374,11 @@ class TestSharder(BaseTestSharder):
now, objects[89][0], upper, 10),
]
# first invocation finds 2 ranges
# (third shard range will be >= minimum_shard_size)
with self._mock_sharder(
conf={'shard_container_threshold': 90,
'shard_scanner_batch_size': 2}) as sharder:
'shard_scanner_batch_size': 2,
'minimum_shard_size': 10}) as sharder:
with mock_timestamp_now(now):
num_found = sharder._find_shard_ranges(broker)
self.assertEqual(45, sharder.rows_per_shard)
@ -4388,8 +4393,9 @@ class TestSharder(BaseTestSharder):
self.assertGreaterEqual(stats['max_time'], stats['min_time'])
# second invocation finds third shard range
with self._mock_sharder(conf={'shard_container_threshold': 199,
'shard_scanner_batch_size': 2}
with self._mock_sharder(conf={'shard_container_threshold': 90,
'shard_scanner_batch_size': 2,
'minimum_shard_size': 10}
) as sharder:
with mock_timestamp_now(now):
num_found = sharder._find_shard_ranges(broker)
@ -4405,7 +4411,8 @@ class TestSharder(BaseTestSharder):
# third invocation finds none
with self._mock_sharder(conf={'shard_container_threshold': 199,
'shard_scanner_batch_size': 2}
'shard_scanner_batch_size': 2,
'minimum_shard_size': 10}
) as sharder:
sharder._send_shard_ranges = mock.MagicMock(return_value=True)
num_found = sharder._find_shard_ranges(broker)
@ -4425,6 +4432,41 @@ class TestSharder(BaseTestSharder):
def test_find_shard_ranges_finds_three_shard(self):
self._check_find_shard_ranges_finds_three('.shards_a', 'c_', 'l', 'u')
def test_find_shard_ranges_with_minimum_size(self):
cont = 'c_'
lower = 'l'
upper = 'u'
broker, objects = self._setup_find_ranges(
'.shards_a', 'c_', lower, upper)
now = Timestamp.now()
expected_ranges = [
ShardRange(
ShardRange.make_path('.shards_a', 'c', cont, now, 0),
now, lower, objects[44][0], 45),
ShardRange(
ShardRange.make_path('.shards_a', 'c', cont, now, 1),
now, objects[44][0], upper, 55),
]
# first invocation finds 2 ranges - second has been extended to avoid
# final shard range < minimum_size
with self._mock_sharder(
conf={'shard_container_threshold': 90,
'shard_scanner_batch_size': 2,
'minimum_shard_size': 11}) as sharder:
with mock_timestamp_now(now):
num_found = sharder._find_shard_ranges(broker)
self.assertEqual(45, sharder.rows_per_shard)
self.assertEqual(11, sharder.minimum_shard_size)
self.assertEqual(2, num_found)
self.assertEqual(2, len(broker.get_shard_ranges()))
self._assert_shard_ranges_equal(expected_ranges[:2],
broker.get_shard_ranges())
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
'found': 2, 'min_time': mock.ANY,
'max_time': mock.ANY}
stats = self._assert_stats(expected_stats, sharder, 'scanned')
self.assertGreaterEqual(stats['max_time'], stats['min_time'])
def test_sharding_enabled(self):
broker = self._make_broker()
self.assertFalse(sharding_enabled(broker))
@ -5564,13 +5606,14 @@ class TestSharder(BaseTestSharder):
def test_find_and_enable_sharding_candidates_bootstrap(self):
broker = self._make_broker()
with self._mock_sharder(
conf={'shard_container_threshold': 1}) as sharder:
conf={'shard_container_threshold': 2}) as sharder:
sharder._find_and_enable_sharding_candidates(broker)
self.assertEqual(ShardRange.ACTIVE, broker.get_own_shard_range().state)
broker.put_object('obj', next(self.ts_iter).internal, 1, '', '')
self.assertEqual(1, broker.get_info()['object_count'])
broker.put_object('obj1', next(self.ts_iter).internal, 1, '', '')
broker.put_object('obj2', next(self.ts_iter).internal, 1, '', '')
self.assertEqual(2, broker.get_info()['object_count'])
with self._mock_sharder(
conf={'shard_container_threshold': 1}) as sharder:
conf={'shard_container_threshold': 2}) as sharder:
with mock_timestamp_now() as now:
sharder._find_and_enable_sharding_candidates(
broker, [broker.get_own_shard_range()])
@ -5581,7 +5624,7 @@ class TestSharder(BaseTestSharder):
# check idempotency
with self._mock_sharder(
conf={'shard_container_threshold': 1}) as sharder:
conf={'shard_container_threshold': 2}) as sharder:
with mock_timestamp_now():
sharder._find_and_enable_sharding_candidates(
broker, [broker.get_own_shard_range()])
@ -7287,7 +7330,8 @@ class TestContainerSharderConf(unittest.TestCase):
'auto_shard': False,
'shrink_threshold': 100000,
'expansion_limit': 750000,
'rows_per_shard': 500000}
'rows_per_shard': 500000,
'minimum_shard_size': 100000}
self.assertEqual(expected, vars(ContainerSharderConf()))
self.assertEqual(expected, vars(ContainerSharderConf(None)))
self.assertEqual(expected, DEFAULT_SHARDER_CONF)
@ -7306,7 +7350,8 @@ class TestContainerSharderConf(unittest.TestCase):
'auto_shard': True,
'shrink_threshold': 100001,
'expansion_limit': 750001,
'rows_per_shard': 500001}
'rows_per_shard': 500001,
'minimum_shard_size': 20}
expected = dict(conf)
conf.update({'unexpected': 'option'})
self.assertEqual(expected, vars(ContainerSharderConf(conf)))
@ -7322,7 +7367,8 @@ class TestContainerSharderConf(unittest.TestCase):
'recon_candidates_limit': 6,
'recon_sharded_timeout': 43201,
'conn_timeout': 5.1,
'auto_shard': True}
'auto_shard': True,
'minimum_shard_size': 1}
# percent options work
deprecated_conf = {'shard_shrink_point': 9,
@ -7360,7 +7406,8 @@ class TestContainerSharderConf(unittest.TestCase):
'shrink_threshold': not_int,
'expansion_limit': not_int,
'shard_shrink_point': not_percent,
'shard_shrink_merge_point': not_percent}
'shard_shrink_merge_point': not_percent,
'minimum_shard_size': not_positive_int}
for key, bad_values in bad.items():
for bad_value in bad_values: