Merge "manage-shard-ranges: add gap repair option"
This commit is contained in:
commit
79fe49b129
@ -173,7 +173,7 @@ from swift.container.sharder import make_shard_ranges, sharding_enabled, \
|
|||||||
CleavingContext, process_compactible_shard_sequences, \
|
CleavingContext, process_compactible_shard_sequences, \
|
||||||
find_compactible_shard_sequences, find_overlapping_ranges, \
|
find_compactible_shard_sequences, find_overlapping_ranges, \
|
||||||
find_paths, rank_paths, finalize_shrinking, DEFAULT_SHARDER_CONF, \
|
find_paths, rank_paths, finalize_shrinking, DEFAULT_SHARDER_CONF, \
|
||||||
ContainerSharderConf
|
ContainerSharderConf, find_paths_with_gaps
|
||||||
|
|
||||||
EXIT_SUCCESS = 0
|
EXIT_SUCCESS = 0
|
||||||
EXIT_ERROR = 1
|
EXIT_ERROR = 1
|
||||||
@ -225,8 +225,8 @@ def _print_shard_range(sr, level=0):
|
|||||||
print(indent + '%r' % sr.name)
|
print(indent + '%r' % sr.name)
|
||||||
print(indent + ' objects: %9d, tombstones: %9d, lower: %r'
|
print(indent + ' objects: %9d, tombstones: %9d, lower: %r'
|
||||||
% (sr.object_count, sr.tombstones, sr.lower_str))
|
% (sr.object_count, sr.tombstones, sr.lower_str))
|
||||||
print(indent + ' state: %9s, upper: %r'
|
print(indent + ' state: %9s, deleted: %d upper: %r'
|
||||||
% (sr.state_text, sr.upper_str))
|
% (sr.state_text, sr.deleted, sr.upper_str))
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
@ -596,6 +596,78 @@ def _find_overlapping_donors(shard_ranges, own_sr, args):
|
|||||||
return acceptor_path, overlapping_donors
|
return acceptor_path, overlapping_donors
|
||||||
|
|
||||||
|
|
||||||
|
def _fix_gaps(broker, args, paths_with_gaps):
|
||||||
|
timestamp = Timestamp.now()
|
||||||
|
solutions = []
|
||||||
|
print('Found %d gaps:' % len(paths_with_gaps))
|
||||||
|
for start_path, gap_range, end_path in paths_with_gaps:
|
||||||
|
if end_path[0].state == ShardRange.ACTIVE:
|
||||||
|
expanding_range = end_path[0]
|
||||||
|
solutions.append((gap_range, expanding_range))
|
||||||
|
elif start_path[-1].state == ShardRange.ACTIVE:
|
||||||
|
expanding_range = start_path[-1]
|
||||||
|
solutions.append((gap_range, expanding_range))
|
||||||
|
else:
|
||||||
|
expanding_range = None
|
||||||
|
print(' gap: %r - %r'
|
||||||
|
% (gap_range.lower, gap_range.upper))
|
||||||
|
print(' apparent gap contents:')
|
||||||
|
for sr in broker.get_shard_ranges(marker=gap_range.lower,
|
||||||
|
end_marker=gap_range.upper,
|
||||||
|
include_deleted=True):
|
||||||
|
_print_shard_range(sr, 3)
|
||||||
|
if expanding_range:
|
||||||
|
print(' gap can be fixed by expanding neighbor range:')
|
||||||
|
_print_shard_range(expanding_range, 3)
|
||||||
|
else:
|
||||||
|
print('Warning: cannot fix gap: non-ACTIVE neighbors')
|
||||||
|
|
||||||
|
if args.max_expanding >= 0:
|
||||||
|
solutions = solutions[:args.max_expanding]
|
||||||
|
|
||||||
|
# it's possible that an expanding range is used twice, expanding both down
|
||||||
|
# and up; if so, we only want one copy of it in our merged shard ranges
|
||||||
|
expanding_ranges = {}
|
||||||
|
for gap_range, expanding_range in solutions:
|
||||||
|
expanding_range.expand([gap_range])
|
||||||
|
expanding_range.timestamp = timestamp
|
||||||
|
expanding_ranges[expanding_range.name] = expanding_range
|
||||||
|
|
||||||
|
print('')
|
||||||
|
print('Repairs necessary to fill gaps.')
|
||||||
|
print('The following expanded shard range(s) will be applied to the DB:')
|
||||||
|
for expanding_range in sorted(expanding_ranges.values(),
|
||||||
|
key=lambda s: s.lower):
|
||||||
|
_print_shard_range(expanding_range, 2)
|
||||||
|
print('')
|
||||||
|
print(
|
||||||
|
'It is recommended that no other concurrent changes are made to the \n'
|
||||||
|
'shard ranges while fixing gaps. If necessary, abort this change \n'
|
||||||
|
'and stop any auto-sharding processes before repeating this command.'
|
||||||
|
)
|
||||||
|
print('')
|
||||||
|
|
||||||
|
if not _proceed(args):
|
||||||
|
return EXIT_USER_QUIT
|
||||||
|
|
||||||
|
broker.merge_shard_ranges(list(expanding_ranges.values()))
|
||||||
|
print('Run container-replicator to replicate the changes to other nodes.')
|
||||||
|
print('Run container-sharder on all nodes to fill gaps.')
|
||||||
|
return EXIT_SUCCESS
|
||||||
|
|
||||||
|
|
||||||
|
def repair_gaps(broker, args):
|
||||||
|
shard_ranges = broker.get_shard_ranges()
|
||||||
|
paths_with_gaps = find_paths_with_gaps(shard_ranges)
|
||||||
|
if paths_with_gaps:
|
||||||
|
return _fix_gaps(broker, args, paths_with_gaps)
|
||||||
|
else:
|
||||||
|
print('Found one complete sequence of %d shard ranges with no gaps.'
|
||||||
|
% len(shard_ranges))
|
||||||
|
print('No repairs necessary.')
|
||||||
|
return EXIT_SUCCESS
|
||||||
|
|
||||||
|
|
||||||
def print_repair_solution(acceptor_path, overlapping_donors):
|
def print_repair_solution(acceptor_path, overlapping_donors):
|
||||||
print('Donors:')
|
print('Donors:')
|
||||||
for donor in sorted(overlapping_donors):
|
for donor in sorted(overlapping_donors):
|
||||||
@ -647,12 +719,7 @@ def find_repair_solution(shard_ranges, own_sr, args):
|
|||||||
return acceptor_path, overlapping_donors
|
return acceptor_path, overlapping_donors
|
||||||
|
|
||||||
|
|
||||||
def repair_shard_ranges(broker, args):
|
def repair_overlaps(broker, args):
|
||||||
if not broker.is_root_container():
|
|
||||||
print('WARNING: Shard containers cannot be repaired.')
|
|
||||||
print('This command should be used on a root container.')
|
|
||||||
return EXIT_ERROR
|
|
||||||
|
|
||||||
shard_ranges = broker.get_shard_ranges()
|
shard_ranges = broker.get_shard_ranges()
|
||||||
if not shard_ranges:
|
if not shard_ranges:
|
||||||
print('No shards found, nothing to do.')
|
print('No shards found, nothing to do.')
|
||||||
@ -682,6 +749,17 @@ def repair_shard_ranges(broker, args):
|
|||||||
return EXIT_SUCCESS
|
return EXIT_SUCCESS
|
||||||
|
|
||||||
|
|
||||||
|
def repair_shard_ranges(broker, args):
|
||||||
|
if not broker.is_root_container():
|
||||||
|
print('WARNING: Shard containers cannot be repaired.')
|
||||||
|
print('This command should be used on a root container.')
|
||||||
|
return EXIT_ERROR
|
||||||
|
if args.gaps:
|
||||||
|
return repair_gaps(broker, args)
|
||||||
|
else:
|
||||||
|
return repair_overlaps(broker, args)
|
||||||
|
|
||||||
|
|
||||||
def analyze_shard_ranges(args):
|
def analyze_shard_ranges(args):
|
||||||
shard_data = _load_and_validate_shard_data(args, require_index=False)
|
shard_data = _load_and_validate_shard_data(args, require_index=False)
|
||||||
for data in shard_data:
|
for data in shard_data:
|
||||||
@ -720,13 +798,17 @@ def _add_find_args(parser):
|
|||||||
'than minimum-shard-size rows.')
|
'than minimum-shard-size rows.')
|
||||||
|
|
||||||
|
|
||||||
def _add_replace_args(parser):
|
def _add_account_prefix_arg(parser):
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'--shards_account_prefix', metavar='shards_account_prefix', type=str,
|
'--shards_account_prefix', metavar='shards_account_prefix', type=str,
|
||||||
required=False, default='.shards_',
|
required=False, default='.shards_',
|
||||||
help="Prefix for shards account. The default is '.shards_'. This "
|
help="Prefix for shards account. The default is '.shards_'. This "
|
||||||
"should only be changed if the auto_create_account_prefix option "
|
"should only be changed if the auto_create_account_prefix option "
|
||||||
"has been similarly changed in swift.conf.")
|
"has been similarly changed in swift.conf.")
|
||||||
|
|
||||||
|
|
||||||
|
def _add_replace_args(parser):
|
||||||
|
_add_account_prefix_arg(parser)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'--replace-timeout', type=int, default=600,
|
'--replace-timeout', type=int, default=600,
|
||||||
help='Minimum DB timeout to use when replacing shard ranges.')
|
help='Minimum DB timeout to use when replacing shard ranges.')
|
||||||
@ -756,6 +838,14 @@ def _add_prompt_args(parser):
|
|||||||
'Cannot be used with --yes option.')
|
'Cannot be used with --yes option.')
|
||||||
|
|
||||||
|
|
||||||
|
def _add_max_expanding_arg(parser):
|
||||||
|
parser.add_argument('--max-expanding', nargs='?',
|
||||||
|
type=_positive_int,
|
||||||
|
default=USE_SHARDER_DEFAULT,
|
||||||
|
help='Maximum number of shards that should be '
|
||||||
|
'expanded. Defaults to unlimited.')
|
||||||
|
|
||||||
|
|
||||||
def _make_parser():
|
def _make_parser():
|
||||||
parser = argparse.ArgumentParser(description='Manage shard ranges')
|
parser = argparse.ArgumentParser(description='Manage shard ranges')
|
||||||
parser.add_argument('path_to_file',
|
parser.add_argument('path_to_file',
|
||||||
@ -876,11 +966,7 @@ def _make_parser():
|
|||||||
'than 1 may result in temporary gaps in '
|
'than 1 may result in temporary gaps in '
|
||||||
'object listings until all selected '
|
'object listings until all selected '
|
||||||
'shards have shrunk.')
|
'shards have shrunk.')
|
||||||
compact_parser.add_argument('--max-expanding', nargs='?',
|
_add_max_expanding_arg(compact_parser)
|
||||||
type=_positive_int,
|
|
||||||
default=USE_SHARDER_DEFAULT,
|
|
||||||
help='Maximum number of shards that should be '
|
|
||||||
'expanded. Defaults to unlimited.')
|
|
||||||
compact_parser.set_defaults(func=compact_shard_ranges)
|
compact_parser.set_defaults(func=compact_shard_ranges)
|
||||||
|
|
||||||
# repair
|
# repair
|
||||||
@ -889,6 +975,12 @@ def _make_parser():
|
|||||||
help='Repair overlapping shard ranges. No action will be taken '
|
help='Repair overlapping shard ranges. No action will be taken '
|
||||||
'without user confirmation unless the -y option is used.')
|
'without user confirmation unless the -y option is used.')
|
||||||
_add_prompt_args(repair_parser)
|
_add_prompt_args(repair_parser)
|
||||||
|
# TODO: maybe this should be a separate subcommand given that it needs
|
||||||
|
# some extra options vs repairing overlaps?
|
||||||
|
repair_parser.add_argument(
|
||||||
|
'--gaps', action='store_true', default=False,
|
||||||
|
help='Repair gaps in shard ranges.')
|
||||||
|
_add_max_expanding_arg(repair_parser)
|
||||||
repair_parser.set_defaults(func=repair_shard_ranges)
|
repair_parser.set_defaults(func=repair_shard_ranges)
|
||||||
|
|
||||||
# analyze
|
# analyze
|
||||||
|
@ -80,25 +80,78 @@ def make_shard_ranges(broker, shard_data, shards_account_prefix):
|
|||||||
return shard_ranges
|
return shard_ranges
|
||||||
|
|
||||||
|
|
||||||
def find_missing_ranges(shard_ranges):
|
def _find_discontinuity(paths, start):
|
||||||
"""
|
# select the path that reaches furthest from start into the namespace
|
||||||
Find any ranges in the entire object namespace that are not covered by any
|
start_paths = [path for path in paths if path.lower == start]
|
||||||
shard range in the given list.
|
start_paths.sort(key=lambda p: p.upper)
|
||||||
|
longest_start_path = start_paths[-1]
|
||||||
|
# search for paths that end further into the namespace (note: these must
|
||||||
|
# have a lower that differs from the start_path upper, otherwise they would
|
||||||
|
# be part of the start_path longer!)
|
||||||
|
end_paths = [path for path in paths
|
||||||
|
if path.upper > longest_start_path.upper]
|
||||||
|
if end_paths:
|
||||||
|
# select those that begin nearest the start of the namespace
|
||||||
|
end_paths.sort(key=lambda p: p.lower)
|
||||||
|
end_paths = [p for p in end_paths if p.lower == end_paths[0].lower]
|
||||||
|
# select the longest of those
|
||||||
|
end_paths.sort(key=lambda p: p.upper)
|
||||||
|
longest_end_path = end_paths[-1]
|
||||||
|
else:
|
||||||
|
longest_end_path = None
|
||||||
|
return longest_start_path, longest_end_path
|
||||||
|
|
||||||
:param shard_ranges: A list of :class:`~swift.utils.ShardRange`
|
|
||||||
:return: a list of missing ranges
|
def find_paths_with_gaps(shard_ranges):
|
||||||
"""
|
"""
|
||||||
gaps = []
|
Find gaps in the shard ranges and pairs of shard range paths that lead to
|
||||||
if not shard_ranges:
|
and from those gaps. For each gap a single pair of adjacent paths is
|
||||||
return ((ShardRange.MIN, ShardRange.MAX),)
|
selected. The concatenation of all selected paths and gaps will span the
|
||||||
if shard_ranges[0].lower > ShardRange.MIN:
|
entire namespace with no overlaps.
|
||||||
gaps.append((ShardRange.MIN, shard_ranges[0].lower))
|
|
||||||
for first, second in zip(shard_ranges, shard_ranges[1:]):
|
:param shard_ranges: a list of instances of ShardRange.
|
||||||
if first.upper < second.lower:
|
:return: A list of tuples of ``(start_path, gap_range, end_path)`` where
|
||||||
gaps.append((first.upper, second.lower))
|
``start_path`` is a list of ShardRanges leading to the gap,
|
||||||
if shard_ranges[-1].upper < ShardRange.MAX:
|
``gap_range`` is a ShardRange synthesized to describe the namespace
|
||||||
gaps.append((shard_ranges[-1].upper, ShardRange.MAX))
|
gap, and ``end_path`` is a list of ShardRanges leading from the gap.
|
||||||
return gaps
|
When gaps start or end at the namespace minimum or maximum bounds,
|
||||||
|
``start_path`` and ``end_path`` may be 'null' paths that contain a
|
||||||
|
single ShardRange covering either the minimum or maximum of the
|
||||||
|
namespace.
|
||||||
|
"""
|
||||||
|
timestamp = Timestamp.now()
|
||||||
|
shard_ranges = ShardRangeList(shard_ranges)
|
||||||
|
# note: find_paths results do not include shrinking ranges
|
||||||
|
paths = find_paths(shard_ranges)
|
||||||
|
# add paths covering no namespace at start and end of namespace to ensure
|
||||||
|
# that a start_path and end_path is always found even when there is a gap
|
||||||
|
# at the start or end of the namespace
|
||||||
|
null_start = ShardRange('null/start', timestamp,
|
||||||
|
lower=ShardRange.MIN,
|
||||||
|
upper=ShardRange.MIN,
|
||||||
|
state=ShardRange.FOUND)
|
||||||
|
null_end = ShardRange('null/end', timestamp,
|
||||||
|
lower=ShardRange.MAX,
|
||||||
|
upper=ShardRange.MAX,
|
||||||
|
state=ShardRange.FOUND)
|
||||||
|
paths.extend([ShardRangeList([null_start]), ShardRangeList([null_end])])
|
||||||
|
paths_with_gaps = []
|
||||||
|
start = null_start.lower
|
||||||
|
while True:
|
||||||
|
start_path, end_path = _find_discontinuity(paths, start)
|
||||||
|
if end_path is None:
|
||||||
|
# end of namespace reached
|
||||||
|
break
|
||||||
|
start = end_path.lower
|
||||||
|
if start_path.upper > end_path.lower:
|
||||||
|
# overlap
|
||||||
|
continue
|
||||||
|
gap_range = ShardRange('gap/index_%06d' % len(paths_with_gaps),
|
||||||
|
timestamp,
|
||||||
|
lower=start_path.upper,
|
||||||
|
upper=end_path.lower)
|
||||||
|
paths_with_gaps.append((start_path, gap_range, end_path))
|
||||||
|
return paths_with_gaps
|
||||||
|
|
||||||
|
|
||||||
def find_overlapping_ranges(shard_ranges):
|
def find_overlapping_ranges(shard_ranges):
|
||||||
@ -1040,12 +1093,12 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
if own_shard_range.state in (ShardRange.SHARDING, ShardRange.SHARDED):
|
if own_shard_range.state in (ShardRange.SHARDING, ShardRange.SHARDED):
|
||||||
shard_ranges = [sr for sr in broker.get_shard_ranges()
|
shard_ranges = [sr for sr in broker.get_shard_ranges()
|
||||||
if sr.state != ShardRange.SHRINKING]
|
if sr.state != ShardRange.SHRINKING]
|
||||||
missing_ranges = find_missing_ranges(shard_ranges)
|
paths_with_gaps = find_paths_with_gaps(shard_ranges)
|
||||||
if missing_ranges:
|
if paths_with_gaps:
|
||||||
warnings.append(
|
warnings.append(
|
||||||
'missing range(s): %s' %
|
'missing range(s): %s' %
|
||||||
' '.join(['%s-%s' % (lower, upper)
|
' '.join(['%s-%s' % (gap.lower, gap.upper)
|
||||||
for lower, upper in missing_ranges]))
|
for (_, gap, _) in paths_with_gaps]))
|
||||||
|
|
||||||
for state in ShardRange.STATES:
|
for state in ShardRange.STATES:
|
||||||
if state == ShardRange.SHRINKING:
|
if state == ShardRange.SHRINKING:
|
||||||
@ -1970,6 +2023,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
# hammering the root
|
# hammering the root
|
||||||
own_shard_range.reported = True
|
own_shard_range.reported = True
|
||||||
broker.merge_shard_ranges(own_shard_range)
|
broker.merge_shard_ranges(own_shard_range)
|
||||||
|
self.logger.debug(
|
||||||
|
'updated root objs=%d, tombstones=%s (%s)',
|
||||||
|
own_shard_range.object_count, own_shard_range.tombstones,
|
||||||
|
quote(broker.path))
|
||||||
|
|
||||||
def _process_broker(self, broker, node, part):
|
def _process_broker(self, broker, node, part):
|
||||||
broker.get_info() # make sure account/container are populated
|
broker.get_info() # make sure account/container are populated
|
||||||
|
@ -133,6 +133,7 @@ class BaseTestContainerSharding(ReplProbeTest):
|
|||||||
self.sharders = Manager(['container-sharder'])
|
self.sharders = Manager(['container-sharder'])
|
||||||
self.internal_client = self.make_internal_client()
|
self.internal_client = self.make_internal_client()
|
||||||
self.memcache = MemcacheRing(['127.0.0.1:11211'])
|
self.memcache = MemcacheRing(['127.0.0.1:11211'])
|
||||||
|
self.container_replicators = Manager(['container-replicator'])
|
||||||
|
|
||||||
def init_brain(self, container_name):
|
def init_brain(self, container_name):
|
||||||
self.container_to_shard = container_name
|
self.container_to_shard = container_name
|
||||||
@ -371,9 +372,13 @@ class BaseTestContainerSharding(ReplProbeTest):
|
|||||||
else:
|
else:
|
||||||
self.fail('No shard sysmeta found in %s' % headers)
|
self.fail('No shard sysmeta found in %s' % headers)
|
||||||
|
|
||||||
def assert_container_state(self, node, expected_state, num_shard_ranges):
|
def assert_container_state(self, node, expected_state, num_shard_ranges,
|
||||||
|
account=None, container=None, part=None):
|
||||||
|
account = account or self.account
|
||||||
|
container = container or self.container_to_shard
|
||||||
|
part = part or self.brain.part
|
||||||
headers, shard_ranges = direct_client.direct_get_container(
|
headers, shard_ranges = direct_client.direct_get_container(
|
||||||
node, self.brain.part, self.account, self.container_to_shard,
|
node, part, account, container,
|
||||||
headers={'X-Backend-Record-Type': 'shard'})
|
headers={'X-Backend-Record-Type': 'shard'})
|
||||||
self.assertEqual(num_shard_ranges, len(shard_ranges))
|
self.assertEqual(num_shard_ranges, len(shard_ranges))
|
||||||
self.assertIn('X-Backend-Sharding-State', headers)
|
self.assertIn('X-Backend-Sharding-State', headers)
|
||||||
@ -383,7 +388,7 @@ class BaseTestContainerSharding(ReplProbeTest):
|
|||||||
|
|
||||||
def assert_subprocess_success(self, cmd_args):
|
def assert_subprocess_success(self, cmd_args):
|
||||||
try:
|
try:
|
||||||
subprocess.check_output(cmd_args, stderr=subprocess.STDOUT)
|
return subprocess.check_output(cmd_args, stderr=subprocess.STDOUT)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
# why not 'except CalledProcessError'? because in my py3.6 tests
|
# why not 'except CalledProcessError'? because in my py3.6 tests
|
||||||
# the CalledProcessError wasn't caught by that! despite type(exc)
|
# the CalledProcessError wasn't caught by that! despite type(exc)
|
||||||
@ -3330,3 +3335,174 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
|
|||||||
self.assertEqual(1, len(sharded_shard_ranges), root_shard_ranges)
|
self.assertEqual(1, len(sharded_shard_ranges), root_shard_ranges)
|
||||||
|
|
||||||
self.assert_container_listing(expected_obj_names)
|
self.assert_container_listing(expected_obj_names)
|
||||||
|
|
||||||
|
def test_manage_shard_ranges_repair_root_shrinking_gaps(self):
|
||||||
|
# provoke shrinking/shrunk gaps by prematurely repairing a transient
|
||||||
|
# overlap in root container; repair the gap.
|
||||||
|
# note: be careful not to add a container listing to this test which
|
||||||
|
# would get shard ranges into memcache
|
||||||
|
obj_names = self._make_object_names(4)
|
||||||
|
self.put_objects(obj_names)
|
||||||
|
|
||||||
|
client.post_container(self.url, self.admin_token, self.container_name,
|
||||||
|
headers={'X-Container-Sharding': 'on'})
|
||||||
|
|
||||||
|
# run replicators first time to get sync points set
|
||||||
|
self.container_replicators.once(
|
||||||
|
additional_args='--partitions=%s' % self.brain.part)
|
||||||
|
|
||||||
|
# shard root
|
||||||
|
root_0_db_file = self.get_db_file(self.brain.part, self.brain.nodes[0])
|
||||||
|
self.assert_subprocess_success([
|
||||||
|
'swift-manage-shard-ranges',
|
||||||
|
root_0_db_file,
|
||||||
|
'find_and_replace', '2', '--enable'])
|
||||||
|
self.container_replicators.once(
|
||||||
|
additional_args='--partitions=%s' % self.brain.part)
|
||||||
|
for node in self.brain.nodes:
|
||||||
|
self.assert_container_state(node, 'unsharded', 2)
|
||||||
|
self.sharders_once(additional_args='--partitions=%s' % self.brain.part)
|
||||||
|
# get shards to update state from parent...
|
||||||
|
self.sharders_once()
|
||||||
|
for node in self.brain.nodes:
|
||||||
|
self.assert_container_state(node, 'sharded', 2)
|
||||||
|
|
||||||
|
# sanity check, all is well
|
||||||
|
msg = self.assert_subprocess_success([
|
||||||
|
'swift-manage-shard-ranges', root_0_db_file, 'repair', '--gaps',
|
||||||
|
'--dry-run'])
|
||||||
|
self.assertIn(b'No repairs necessary.', msg)
|
||||||
|
|
||||||
|
# shard first shard into 2 sub-shards while root node 0 is disabled
|
||||||
|
self.stop_container_servers(node_numbers=slice(0, 1))
|
||||||
|
shard_ranges = self.get_container_shard_ranges()
|
||||||
|
shard_brokers = [self.get_shard_broker(shard_ranges[0], node_index=i)
|
||||||
|
for i in range(3)]
|
||||||
|
self.assert_subprocess_success([
|
||||||
|
'swift-manage-shard-ranges',
|
||||||
|
shard_brokers[0].db_file,
|
||||||
|
'find_and_replace', '1', '--enable'])
|
||||||
|
shard_part, shard_nodes = self.brain.ring.get_nodes(
|
||||||
|
shard_ranges[0].account, shard_ranges[0].container)
|
||||||
|
self.container_replicators.once(
|
||||||
|
additional_args='--partitions=%s' % shard_part)
|
||||||
|
# TODO: get this assertion working (node filtering wonky??)
|
||||||
|
# for node in [n for n in shard_nodes if n != self.brain.nodes[0]]:
|
||||||
|
# self.assert_container_state(
|
||||||
|
# node, 'unsharded', 2, account=shard_ranges[0].account,
|
||||||
|
# container=shard_ranges[0].container, part=shard_part)
|
||||||
|
self.sharders_once(additional_args='--partitions=%s' % shard_part)
|
||||||
|
# get shards to update state from parent...
|
||||||
|
self.sharders_once()
|
||||||
|
# TODO: get this assertion working (node filtering wonky??)
|
||||||
|
# for node in [n for n in shard_nodes if n != self.brain.nodes[0]]:
|
||||||
|
# self.assert_container_state(
|
||||||
|
# node, 'sharded', 2, account=shard_ranges[0].account,
|
||||||
|
# container=shard_ranges[0].container, part=shard_part)
|
||||||
|
|
||||||
|
# put an object into the second of the 2 sub-shards so that the shard
|
||||||
|
# will update the root next time the sharder is run; do this before
|
||||||
|
# restarting root node 0 so that the object update is definitely
|
||||||
|
# redirected to a sub-shard by root node 1 or 2.
|
||||||
|
new_obj_name = obj_names[0] + 'a'
|
||||||
|
self.put_objects([new_obj_name])
|
||||||
|
|
||||||
|
# restart root node 0
|
||||||
|
self.brain.servers.start(number=self.brain.node_numbers[0])
|
||||||
|
# node 0 DB doesn't know about the sub-shards
|
||||||
|
root_brokers = [self.get_broker(self.brain.part, node)
|
||||||
|
for node in self.brain.nodes]
|
||||||
|
broker = root_brokers[0]
|
||||||
|
self.assertEqual(
|
||||||
|
[(ShardRange.ACTIVE, False, ShardRange.MIN, obj_names[1]),
|
||||||
|
(ShardRange.ACTIVE, False, obj_names[1], ShardRange.MAX)],
|
||||||
|
[(sr.state, sr.deleted, sr.lower, sr.upper)
|
||||||
|
for sr in broker.get_shard_ranges(include_deleted=True)])
|
||||||
|
|
||||||
|
for broker in root_brokers[1:]:
|
||||||
|
self.assertEqual(
|
||||||
|
[(ShardRange.ACTIVE, False, ShardRange.MIN, obj_names[0]),
|
||||||
|
(ShardRange.ACTIVE, False, obj_names[0], obj_names[1]),
|
||||||
|
(ShardRange.SHARDED, True, ShardRange.MIN, obj_names[1]),
|
||||||
|
(ShardRange.ACTIVE, False, obj_names[1], ShardRange.MAX)],
|
||||||
|
[(sr.state, sr.deleted, sr.lower, sr.upper)
|
||||||
|
for sr in broker.get_shard_ranges(include_deleted=True)])
|
||||||
|
|
||||||
|
sub_shard = root_brokers[1].get_shard_ranges()[1]
|
||||||
|
self.assertEqual(obj_names[0], sub_shard.lower)
|
||||||
|
self.assertEqual(obj_names[1], sub_shard.upper)
|
||||||
|
sub_shard_part, nodes = self.get_part_and_node_numbers(sub_shard)
|
||||||
|
# we want the sub-shard to update root node 0 but not the sharded
|
||||||
|
# shard, but there is a small chance the two will be in same partition
|
||||||
|
# TODO: how can we work around this?
|
||||||
|
self.assertNotEqual(sub_shard_part, shard_part,
|
||||||
|
'You were unlucky, try again')
|
||||||
|
self.sharders_once(additional_args='--partitions=%s' % sub_shard_part)
|
||||||
|
|
||||||
|
# now root node 0 has the original shards plus one of the sub-shards
|
||||||
|
# but all are active :(
|
||||||
|
self.assertEqual(
|
||||||
|
[(ShardRange.ACTIVE, False, ShardRange.MIN, obj_names[1]),
|
||||||
|
# note: overlap!
|
||||||
|
(ShardRange.ACTIVE, False, obj_names[0], obj_names[1]),
|
||||||
|
(ShardRange.ACTIVE, False, obj_names[1], ShardRange.MAX)],
|
||||||
|
[(sr.state, sr.deleted, sr.lower, sr.upper)
|
||||||
|
for sr in root_brokers[0].get_shard_ranges(include_deleted=True)])
|
||||||
|
|
||||||
|
# we are allowed to fix the overlap...
|
||||||
|
msg = self.assert_subprocess_success([
|
||||||
|
'swift-manage-shard-ranges', root_0_db_file, 'repair', '--yes'])
|
||||||
|
self.assertIn(
|
||||||
|
b'Repairs necessary to remove overlapping shard ranges.', msg)
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
[(ShardRange.ACTIVE, False, ShardRange.MIN, obj_names[1]),
|
||||||
|
(ShardRange.SHRINKING, False, obj_names[0], obj_names[1]),
|
||||||
|
(ShardRange.ACTIVE, False, obj_names[1], ShardRange.MAX)],
|
||||||
|
[(sr.state, sr.deleted, sr.lower, sr.upper)
|
||||||
|
for sr in root_brokers[0].get_shard_ranges(include_deleted=True)])
|
||||||
|
|
||||||
|
self.sharders_once()
|
||||||
|
self.sharders_once()
|
||||||
|
self.container_replicators.once()
|
||||||
|
|
||||||
|
# boo :'( ... we made gap
|
||||||
|
for broker in root_brokers:
|
||||||
|
self.assertEqual(
|
||||||
|
[(ShardRange.ACTIVE, False, ShardRange.MIN, obj_names[0]),
|
||||||
|
(ShardRange.SHARDED, True, ShardRange.MIN, obj_names[1]),
|
||||||
|
(ShardRange.SHRUNK, True, obj_names[0], obj_names[1]),
|
||||||
|
(ShardRange.ACTIVE, False, obj_names[1], ShardRange.MAX)],
|
||||||
|
[(sr.state, sr.deleted, sr.lower, sr.upper)
|
||||||
|
for sr in broker.get_shard_ranges(include_deleted=True)])
|
||||||
|
|
||||||
|
msg = self.assert_subprocess_success([
|
||||||
|
'swift-manage-shard-ranges', root_0_db_file, 'repair', '--gaps',
|
||||||
|
'--yes'])
|
||||||
|
self.assertIn(b'Repairs necessary to fill gaps.', msg)
|
||||||
|
|
||||||
|
self.sharders_once()
|
||||||
|
self.sharders_once()
|
||||||
|
self.container_replicators.once()
|
||||||
|
|
||||||
|
# yay! we fixed the gap (without creating an overlap)
|
||||||
|
for broker in root_brokers:
|
||||||
|
self.assertEqual(
|
||||||
|
[(ShardRange.ACTIVE, False, ShardRange.MIN, obj_names[0]),
|
||||||
|
(ShardRange.SHARDED, True, ShardRange.MIN, obj_names[1]),
|
||||||
|
(ShardRange.SHRUNK, True, obj_names[0], obj_names[1]),
|
||||||
|
(ShardRange.ACTIVE, False, obj_names[0], ShardRange.MAX)],
|
||||||
|
[(sr.state, sr.deleted, sr.lower, sr.upper)
|
||||||
|
for sr in broker.get_shard_ranges(include_deleted=True)])
|
||||||
|
|
||||||
|
msg = self.assert_subprocess_success([
|
||||||
|
'swift-manage-shard-ranges', root_0_db_file, 'repair',
|
||||||
|
'--dry-run'])
|
||||||
|
self.assertIn(b'No repairs necessary.', msg)
|
||||||
|
msg = self.assert_subprocess_success([
|
||||||
|
'swift-manage-shard-ranges', root_0_db_file, 'repair', '--gaps',
|
||||||
|
'--dry-run'])
|
||||||
|
self.assertIn(b'No repairs necessary.', msg)
|
||||||
|
|
||||||
|
self.assert_container_listing(
|
||||||
|
[obj_names[0], new_obj_name] + obj_names[1:])
|
||||||
|
@ -62,7 +62,8 @@ class TestManageShardRanges(unittest.TestCase):
|
|||||||
]
|
]
|
||||||
|
|
||||||
self.overlap_shard_data_1 = [
|
self.overlap_shard_data_1 = [
|
||||||
{'index': 0, 'lower': '', 'upper': 'obj10', 'object_count': 1},
|
{'index': 0, 'lower': '', 'upper': 'obj10',
|
||||||
|
'object_count': 1},
|
||||||
{'index': 1, 'lower': 'obj10', 'upper': 'obj20',
|
{'index': 1, 'lower': 'obj10', 'upper': 'obj20',
|
||||||
'object_count': 1},
|
'object_count': 1},
|
||||||
{'index': 2, 'lower': 'obj20', 'upper': 'obj30',
|
{'index': 2, 'lower': 'obj20', 'upper': 'obj30',
|
||||||
@ -79,7 +80,8 @@ class TestManageShardRanges(unittest.TestCase):
|
|||||||
'object_count': 1},
|
'object_count': 1},
|
||||||
{'index': 8, 'lower': 'obj78', 'upper': 'obj88',
|
{'index': 8, 'lower': 'obj78', 'upper': 'obj88',
|
||||||
'object_count': 1},
|
'object_count': 1},
|
||||||
{'index': 9, 'lower': 'obj88', 'upper': '', 'object_count': 1},
|
{'index': 9, 'lower': 'obj88', 'upper': '',
|
||||||
|
'object_count': 1},
|
||||||
]
|
]
|
||||||
|
|
||||||
self.overlap_shard_data_2 = [
|
self.overlap_shard_data_2 = [
|
||||||
@ -1074,22 +1076,22 @@ class TestManageShardRanges(unittest.TestCase):
|
|||||||
'Donor shard range(s) with total of 2018 rows:',
|
'Donor shard range(s) with total of 2018 rows:',
|
||||||
" '.shards_a",
|
" '.shards_a",
|
||||||
" objects: 10, tombstones: 999, lower: 'obj29'",
|
" objects: 10, tombstones: 999, lower: 'obj29'",
|
||||||
" state: active, upper: 'obj39'",
|
" state: active, deleted: 0 upper: 'obj39'",
|
||||||
" '.shards_a",
|
" '.shards_a",
|
||||||
" objects: 10, tombstones: 999, lower: 'obj39'",
|
" objects: 10, tombstones: 999, lower: 'obj39'",
|
||||||
" state: active, upper: 'obj49'",
|
" state: active, deleted: 0 upper: 'obj49'",
|
||||||
'can be compacted into acceptor shard range:',
|
'can be compacted into acceptor shard range:',
|
||||||
" '.shards_a",
|
" '.shards_a",
|
||||||
" objects: 100001, tombstones: 999, lower: 'obj49'",
|
" objects: 100001, tombstones: 999, lower: 'obj49'",
|
||||||
" state: active, upper: 'obj59'",
|
" state: active, deleted: 0 upper: 'obj59'",
|
||||||
'Donor shard range(s) with total of 1009 rows:',
|
'Donor shard range(s) with total of 1009 rows:',
|
||||||
" '.shards_a",
|
" '.shards_a",
|
||||||
" objects: 10, tombstones: 999, lower: 'obj69'",
|
" objects: 10, tombstones: 999, lower: 'obj69'",
|
||||||
" state: active, upper: 'obj79'",
|
" state: active, deleted: 0 upper: 'obj79'",
|
||||||
'can be compacted into acceptor shard range:',
|
'can be compacted into acceptor shard range:',
|
||||||
" '.shards_a",
|
" '.shards_a",
|
||||||
" objects: 100001, tombstones: 999, lower: 'obj79'",
|
" objects: 100001, tombstones: 999, lower: 'obj79'",
|
||||||
" state: active, upper: 'obj89'",
|
" state: active, deleted: 0 upper: 'obj89'",
|
||||||
'Total of 2 shard sequences identified for compaction.',
|
'Total of 2 shard sequences identified for compaction.',
|
||||||
'Once applied to the broker these changes will result in '
|
'Once applied to the broker these changes will result in '
|
||||||
'shard range compaction the next time the sharder runs.',
|
'shard range compaction the next time the sharder runs.',
|
||||||
@ -1634,7 +1636,7 @@ class TestManageShardRanges(unittest.TestCase):
|
|||||||
updated_ranges = broker.get_shard_ranges()
|
updated_ranges = broker.get_shard_ranges()
|
||||||
self.assert_shard_ranges_equal([], updated_ranges)
|
self.assert_shard_ranges_equal([], updated_ranges)
|
||||||
|
|
||||||
def test_repair_gaps_one_incomplete_sequence(self):
|
def test_repair_one_incomplete_sequence(self):
|
||||||
broker = self._make_broker()
|
broker = self._make_broker()
|
||||||
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
|
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
|
||||||
with mock_timestamp_now(next(self.ts_iter)):
|
with mock_timestamp_now(next(self.ts_iter)):
|
||||||
@ -1656,7 +1658,7 @@ class TestManageShardRanges(unittest.TestCase):
|
|||||||
updated_ranges = broker.get_shard_ranges()
|
updated_ranges = broker.get_shard_ranges()
|
||||||
self.assert_shard_ranges_equal(shard_ranges, updated_ranges)
|
self.assert_shard_ranges_equal(shard_ranges, updated_ranges)
|
||||||
|
|
||||||
def test_repair_gaps_overlapping_incomplete_sequences(self):
|
def test_repair_overlapping_incomplete_sequences(self):
|
||||||
broker = self._make_broker()
|
broker = self._make_broker()
|
||||||
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
|
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
|
||||||
with mock_timestamp_now(next(self.ts_iter)):
|
with mock_timestamp_now(next(self.ts_iter)):
|
||||||
@ -1685,6 +1687,374 @@ class TestManageShardRanges(unittest.TestCase):
|
|||||||
key=ShardRange.sort_key)
|
key=ShardRange.sort_key)
|
||||||
self.assert_shard_ranges_equal(expected, updated_ranges)
|
self.assert_shard_ranges_equal(expected, updated_ranges)
|
||||||
|
|
||||||
|
def test_repair_gaps(self):
|
||||||
|
def do_test(missing_index, expander_index, missing_state=None):
|
||||||
|
broker = self._make_broker()
|
||||||
|
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
|
||||||
|
for shard in self.shard_data:
|
||||||
|
shard['state'] = ShardRange.ACTIVE
|
||||||
|
with mock_timestamp_now(next(self.ts_iter)):
|
||||||
|
all_shard_ranges = make_shard_ranges(
|
||||||
|
broker, self.shard_data, '.shards_')
|
||||||
|
shard_ranges = list(all_shard_ranges)
|
||||||
|
if missing_state is None:
|
||||||
|
missing_range = shard_ranges.pop(missing_index)
|
||||||
|
exp_gap_contents = []
|
||||||
|
else:
|
||||||
|
missing_range = shard_ranges[missing_index]
|
||||||
|
missing_range.state = missing_state
|
||||||
|
exp_gap_contents = [
|
||||||
|
" '%s'" % missing_range.name, mock.ANY, mock.ANY]
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
self.assertTrue(broker.is_root_container())
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock_timestamp_now(next(self.ts_iter)) as ts_now, \
|
||||||
|
mock.patch('sys.stdout', out), \
|
||||||
|
mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, 'repair', '--gaps', '--yes'])
|
||||||
|
self.assertEqual(0, ret)
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
expander = all_shard_ranges[expander_index]
|
||||||
|
if missing_index < expander_index:
|
||||||
|
expander.lower = missing_range.lower
|
||||||
|
else:
|
||||||
|
expander.upper = missing_range.upper
|
||||||
|
expander.state_timestamp = expander.timestamp
|
||||||
|
expander.meta_timestamp = expander.timestamp
|
||||||
|
expander.timestamp = ts_now
|
||||||
|
self.assertEqual(
|
||||||
|
['Found 1 gaps:',
|
||||||
|
' gap: %r - %r' % (missing_range.lower, missing_range.upper),
|
||||||
|
' apparent gap contents:']
|
||||||
|
+ exp_gap_contents +
|
||||||
|
[' gap can be fixed by expanding neighbor range:',
|
||||||
|
" '%s'" % expander.name] +
|
||||||
|
[mock.ANY] * 2 +
|
||||||
|
['',
|
||||||
|
'Repairs necessary to fill gaps.',
|
||||||
|
'The following expanded shard range(s) will be applied to '
|
||||||
|
'the DB:',
|
||||||
|
" '%s'" % expander.name] +
|
||||||
|
[mock.ANY] * 2 +
|
||||||
|
['',
|
||||||
|
'It is recommended that no other concurrent changes are made '
|
||||||
|
'to the ',
|
||||||
|
'shard ranges while fixing gaps. If necessary, abort '
|
||||||
|
'this change ',
|
||||||
|
'and stop any auto-sharding processes before repeating '
|
||||||
|
'this command.',
|
||||||
|
'',
|
||||||
|
'Run container-replicator to replicate the changes to '
|
||||||
|
'other nodes.',
|
||||||
|
'Run container-sharder on all nodes to fill gaps.',
|
||||||
|
''],
|
||||||
|
out_lines)
|
||||||
|
updated_ranges = broker.get_shard_ranges()
|
||||||
|
self.assert_shard_ranges_equal(shard_ranges, updated_ranges)
|
||||||
|
os.remove(broker.db_file)
|
||||||
|
|
||||||
|
for i in range(len(self.shard_data) - 1):
|
||||||
|
do_test(i, i + 1)
|
||||||
|
|
||||||
|
do_test(len(self.shard_data) - 1, len(self.shard_data) - 2)
|
||||||
|
|
||||||
|
for i in range(len(self.shard_data) - 1):
|
||||||
|
do_test(i, i + 1, ShardRange.SHRINKING)
|
||||||
|
|
||||||
|
do_test(len(self.shard_data) - 1, len(self.shard_data) - 2,
|
||||||
|
ShardRange.SHRINKING)
|
||||||
|
|
||||||
|
def test_repair_gaps_multiple_missing(self):
|
||||||
|
def do_test(broker, max_expanding):
|
||||||
|
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
|
||||||
|
states = [
|
||||||
|
ShardRange.ACTIVE,
|
||||||
|
ShardRange.SHRINKING,
|
||||||
|
ShardRange.SHRUNK,
|
||||||
|
ShardRange.ACTIVE,
|
||||||
|
ShardRange.SHRUNK,
|
||||||
|
ShardRange.SHRINKING,
|
||||||
|
ShardRange.ACTIVE,
|
||||||
|
ShardRange.SHRINKING,
|
||||||
|
ShardRange.SHRUNK,
|
||||||
|
ShardRange.SHARDED,
|
||||||
|
]
|
||||||
|
for i, shard in enumerate(self.shard_data):
|
||||||
|
shard['state'] = states[i]
|
||||||
|
if states[i] in (ShardRange.SHRUNK, ShardRange.SHARDED):
|
||||||
|
shard['deleted'] = 1
|
||||||
|
with mock_timestamp_now(next(self.ts_iter)):
|
||||||
|
shard_ranges = make_shard_ranges(
|
||||||
|
broker, self.shard_data, '.shards_')
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
self.assertTrue(broker.is_root_container())
|
||||||
|
orig_shard_ranges = broker.get_shard_ranges(include_deleted=True)
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
args = [broker.db_file, 'repair', '--gaps', '--yes']
|
||||||
|
if max_expanding is not None:
|
||||||
|
args.extend(['--max-expanding', str(max_expanding)])
|
||||||
|
with mock_timestamp_now(next(self.ts_iter)) as ts_now, \
|
||||||
|
mock.patch('sys.stdout', out), \
|
||||||
|
mock.patch('sys.stderr', err):
|
||||||
|
ret = main(args)
|
||||||
|
self.assertEqual(0, ret)
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
os.remove(broker.db_file)
|
||||||
|
return orig_shard_ranges, out_lines, ts_now
|
||||||
|
|
||||||
|
# max-expanding 1
|
||||||
|
broker = self._make_broker()
|
||||||
|
orig_shard_ranges, out_lines, ts_now = do_test(broker, 1)
|
||||||
|
orig_shard_ranges[3].timestamp = ts_now
|
||||||
|
orig_shard_ranges[3].lower = orig_shard_ranges[1].lower
|
||||||
|
self.assertEqual(
|
||||||
|
['Found 3 gaps:',
|
||||||
|
' gap: %r - %r' % (orig_shard_ranges[1].lower,
|
||||||
|
orig_shard_ranges[2].upper),
|
||||||
|
' apparent gap contents:']
|
||||||
|
+ [mock.ANY] * 6 +
|
||||||
|
[' gap can be fixed by expanding neighbor range:',
|
||||||
|
" '%s'" % orig_shard_ranges[3].name] +
|
||||||
|
[mock.ANY] * 2 +
|
||||||
|
[' gap: %r - %r' % (orig_shard_ranges[4].lower,
|
||||||
|
orig_shard_ranges[5].upper),
|
||||||
|
' apparent gap contents:'] +
|
||||||
|
[mock.ANY] * 6 +
|
||||||
|
[' gap can be fixed by expanding neighbor range:',
|
||||||
|
" '%s'" % orig_shard_ranges[6].name] +
|
||||||
|
[mock.ANY] * 2 +
|
||||||
|
[' gap: %r - %r' % (orig_shard_ranges[7].lower,
|
||||||
|
orig_shard_ranges[9].upper),
|
||||||
|
' apparent gap contents:'] +
|
||||||
|
[mock.ANY] * 9 +
|
||||||
|
[' gap can be fixed by expanding neighbor range:',
|
||||||
|
" '%s'" % orig_shard_ranges[6].name] +
|
||||||
|
[mock.ANY] * 2 +
|
||||||
|
['',
|
||||||
|
'Repairs necessary to fill gaps.',
|
||||||
|
'The following expanded shard range(s) will be applied to the '
|
||||||
|
'DB:',
|
||||||
|
" '%s'" % orig_shard_ranges[3].name] +
|
||||||
|
[mock.ANY] * 6 +
|
||||||
|
['',
|
||||||
|
'Run container-replicator to replicate the changes to '
|
||||||
|
'other nodes.',
|
||||||
|
'Run container-sharder on all nodes to fill gaps.',
|
||||||
|
''],
|
||||||
|
out_lines)
|
||||||
|
updated_ranges = broker.get_shard_ranges(include_deleted=True)
|
||||||
|
self.assert_shard_ranges_equal(
|
||||||
|
sorted(orig_shard_ranges, key=lambda s: s.name),
|
||||||
|
sorted(updated_ranges, key=lambda s: s.name))
|
||||||
|
|
||||||
|
# max-expanding 2
|
||||||
|
broker = self._make_broker()
|
||||||
|
orig_shard_ranges, out_lines, ts_now = do_test(broker, 2)
|
||||||
|
orig_shard_ranges[3].timestamp = ts_now
|
||||||
|
orig_shard_ranges[3].lower = orig_shard_ranges[1].lower
|
||||||
|
orig_shard_ranges[6].timestamp = ts_now
|
||||||
|
orig_shard_ranges[6].lower = orig_shard_ranges[4].lower
|
||||||
|
self.assertEqual(
|
||||||
|
['Found 3 gaps:',
|
||||||
|
' gap: %r - %r' % (orig_shard_ranges[1].lower,
|
||||||
|
orig_shard_ranges[2].upper),
|
||||||
|
' apparent gap contents:'] +
|
||||||
|
[mock.ANY] * 6 +
|
||||||
|
[' gap can be fixed by expanding neighbor range:',
|
||||||
|
" '%s'" % orig_shard_ranges[3].name] +
|
||||||
|
[mock.ANY] * 2 +
|
||||||
|
[' gap: %r - %r' % (orig_shard_ranges[4].lower,
|
||||||
|
orig_shard_ranges[5].upper),
|
||||||
|
' apparent gap contents:'] +
|
||||||
|
[mock.ANY] * 6 +
|
||||||
|
[' gap can be fixed by expanding neighbor range:',
|
||||||
|
" '%s'" % orig_shard_ranges[6].name] +
|
||||||
|
[mock.ANY] * 2 +
|
||||||
|
[' gap: %r - %r' % (orig_shard_ranges[7].lower,
|
||||||
|
orig_shard_ranges[9].upper),
|
||||||
|
' apparent gap contents:'] +
|
||||||
|
[mock.ANY] * 9 +
|
||||||
|
[' gap can be fixed by expanding neighbor range:',
|
||||||
|
" '%s'" % orig_shard_ranges[6].name] +
|
||||||
|
[mock.ANY] * 2 +
|
||||||
|
['',
|
||||||
|
'Repairs necessary to fill gaps.',
|
||||||
|
'The following expanded shard range(s) will be applied to the '
|
||||||
|
'DB:',
|
||||||
|
" '%s'" % orig_shard_ranges[3].name] +
|
||||||
|
[mock.ANY] * 2 +
|
||||||
|
[" '%s'" % orig_shard_ranges[6].name] +
|
||||||
|
[mock.ANY] * 6 +
|
||||||
|
['',
|
||||||
|
'Run container-replicator to replicate the changes to '
|
||||||
|
'other nodes.',
|
||||||
|
'Run container-sharder on all nodes to fill gaps.',
|
||||||
|
''],
|
||||||
|
out_lines)
|
||||||
|
updated_ranges = broker.get_shard_ranges(include_deleted=True)
|
||||||
|
self.assert_shard_ranges_equal(
|
||||||
|
sorted(orig_shard_ranges, key=lambda s: s.name),
|
||||||
|
sorted(updated_ranges, key=lambda s: s.name))
|
||||||
|
|
||||||
|
# max-expanding unlimited
|
||||||
|
broker = self._make_broker()
|
||||||
|
orig_shard_ranges, out_lines, ts_now = do_test(broker, None)
|
||||||
|
orig_shard_ranges[3].timestamp = ts_now
|
||||||
|
orig_shard_ranges[3].lower = orig_shard_ranges[1].lower
|
||||||
|
orig_shard_ranges[6].timestamp = ts_now
|
||||||
|
orig_shard_ranges[6].lower = orig_shard_ranges[4].lower
|
||||||
|
orig_shard_ranges[6].upper = orig_shard_ranges[9].upper
|
||||||
|
self.assertEqual(
|
||||||
|
['Found 3 gaps:',
|
||||||
|
' gap: %r - %r' % (orig_shard_ranges[1].lower,
|
||||||
|
orig_shard_ranges[2].upper),
|
||||||
|
' apparent gap contents:'] +
|
||||||
|
[mock.ANY] * 6 +
|
||||||
|
[' gap can be fixed by expanding neighbor range:',
|
||||||
|
" '%s'" % orig_shard_ranges[3].name] +
|
||||||
|
[mock.ANY] * 2 +
|
||||||
|
[' gap: %r - %r' % (orig_shard_ranges[4].lower,
|
||||||
|
orig_shard_ranges[5].upper),
|
||||||
|
' apparent gap contents:'] +
|
||||||
|
[mock.ANY] * 6 +
|
||||||
|
[' gap can be fixed by expanding neighbor range:',
|
||||||
|
" '%s'" % orig_shard_ranges[6].name] +
|
||||||
|
[mock.ANY] * 2 +
|
||||||
|
[' gap: %r - %r' % (orig_shard_ranges[7].lower,
|
||||||
|
orig_shard_ranges[9].upper),
|
||||||
|
' apparent gap contents:'] +
|
||||||
|
[mock.ANY] * 9 +
|
||||||
|
[' gap can be fixed by expanding neighbor range:',
|
||||||
|
" '%s'" % orig_shard_ranges[6].name] +
|
||||||
|
[mock.ANY] * 2 +
|
||||||
|
['',
|
||||||
|
'Repairs necessary to fill gaps.',
|
||||||
|
'The following expanded shard range(s) will be applied to the '
|
||||||
|
'DB:',
|
||||||
|
" '%s'" % orig_shard_ranges[3].name] +
|
||||||
|
[mock.ANY] * 2 +
|
||||||
|
[" '%s'" % orig_shard_ranges[6].name] +
|
||||||
|
[mock.ANY] * 6 +
|
||||||
|
['',
|
||||||
|
'Run container-replicator to replicate the changes to '
|
||||||
|
'other nodes.',
|
||||||
|
'Run container-sharder on all nodes to fill gaps.',
|
||||||
|
''],
|
||||||
|
out_lines)
|
||||||
|
updated_ranges = broker.get_shard_ranges(include_deleted=True)
|
||||||
|
self.assert_shard_ranges_equal(
|
||||||
|
sorted(orig_shard_ranges, key=lambda s: s.name),
|
||||||
|
sorted(updated_ranges, key=lambda s: s.name))
|
||||||
|
|
||||||
|
def test_repair_gaps_complete_sequence(self):
|
||||||
|
broker = self._make_broker()
|
||||||
|
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
|
||||||
|
for shard in self.shard_data:
|
||||||
|
shard['state'] = ShardRange.ACTIVE
|
||||||
|
with mock_timestamp_now(next(self.ts_iter)):
|
||||||
|
shard_ranges = make_shard_ranges(
|
||||||
|
broker, self.shard_data, '.shards_')
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
self.assertTrue(broker.is_root_container())
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock_timestamp_now(next(self.ts_iter)), \
|
||||||
|
mock.patch('sys.stdout', out), \
|
||||||
|
mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, 'repair', '--gaps', '--yes'])
|
||||||
|
self.assertEqual(0, ret)
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
self.assertEqual(
|
||||||
|
['Found one complete sequence of %d shard ranges with no gaps.'
|
||||||
|
% len(self.shard_data),
|
||||||
|
'No repairs necessary.'], out_lines[:2])
|
||||||
|
updated_ranges = broker.get_shard_ranges()
|
||||||
|
self.assert_shard_ranges_equal(shard_ranges, updated_ranges)
|
||||||
|
|
||||||
|
def test_repair_gaps_with_overlap(self):
|
||||||
|
# verify that overlaps don't look like gaps
|
||||||
|
broker = self._make_broker()
|
||||||
|
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
|
||||||
|
for shard in self.shard_data:
|
||||||
|
shard['state'] = ShardRange.ACTIVE
|
||||||
|
with mock_timestamp_now(next(self.ts_iter)):
|
||||||
|
shard_ranges = make_shard_ranges(
|
||||||
|
broker, self.shard_data, '.shards_')
|
||||||
|
# create a gap
|
||||||
|
shard_ranges[3].state = ShardRange.SHRINKING
|
||||||
|
# create an overlap
|
||||||
|
shard_ranges[5].lower = 'obj45'
|
||||||
|
self.assertLess(shard_ranges[5].lower, shard_ranges[4].upper)
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
orig_shard_ranges = broker.get_shard_ranges()
|
||||||
|
self.assertTrue(broker.is_root_container())
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock_timestamp_now(next(self.ts_iter)) as ts_now, \
|
||||||
|
mock.patch('sys.stdout', out), \
|
||||||
|
mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, 'repair', '--gaps', '--yes'])
|
||||||
|
self.assertEqual(0, ret)
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
self.assertEqual(
|
||||||
|
['Found 1 gaps:',
|
||||||
|
' gap: %r - %r' % (shard_ranges[3].lower,
|
||||||
|
shard_ranges[3].upper),
|
||||||
|
' apparent gap contents:'] +
|
||||||
|
[mock.ANY] * 3 +
|
||||||
|
[' gap can be fixed by expanding neighbor range:',
|
||||||
|
" '%s'" % shard_ranges[4].name] +
|
||||||
|
[mock.ANY] * 2 +
|
||||||
|
['',
|
||||||
|
'Repairs necessary to fill gaps.',
|
||||||
|
'The following expanded shard range(s) will be applied to the '
|
||||||
|
'DB:',
|
||||||
|
" '%s'" % shard_ranges[4].name] +
|
||||||
|
[mock.ANY] * 6 +
|
||||||
|
['',
|
||||||
|
'Run container-replicator to replicate the changes to '
|
||||||
|
'other nodes.',
|
||||||
|
'Run container-sharder on all nodes to fill gaps.',
|
||||||
|
''],
|
||||||
|
out_lines)
|
||||||
|
orig_shard_ranges[4].lower = shard_ranges[3].lower
|
||||||
|
orig_shard_ranges[4].timestamp = ts_now
|
||||||
|
updated_ranges = broker.get_shard_ranges()
|
||||||
|
self.assert_shard_ranges_equal(orig_shard_ranges, updated_ranges)
|
||||||
|
|
||||||
|
def test_repair_gaps_not_root(self):
|
||||||
|
broker = self._make_broker()
|
||||||
|
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
# make broker appear to not be a root container
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
broker.set_sharding_sysmeta('Quoted-Root', 'not_a/c')
|
||||||
|
self.assertFalse(broker.is_root_container())
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, 'repair', '--gaps'])
|
||||||
|
self.assertEqual(1, ret)
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
self.assertEqual(
|
||||||
|
['WARNING: Shard containers cannot be repaired.',
|
||||||
|
'This command should be used on a root container.'],
|
||||||
|
out_lines[:2]
|
||||||
|
)
|
||||||
|
updated_ranges = broker.get_shard_ranges()
|
||||||
|
self.assert_shard_ranges_equal(shard_ranges, updated_ranges)
|
||||||
|
|
||||||
def test_repair_not_needed(self):
|
def test_repair_not_needed(self):
|
||||||
broker = self._make_broker()
|
broker = self._make_broker()
|
||||||
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
|
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
|
||||||
@ -1978,9 +2348,9 @@ class TestManageShardRanges(unittest.TestCase):
|
|||||||
self.assertEqual(2, cm.exception.code)
|
self.assertEqual(2, cm.exception.code)
|
||||||
err_lines = err.getvalue().split('\n')
|
err_lines = err.getvalue().split('\n')
|
||||||
runner = os.path.basename(sys.argv[0])
|
runner = os.path.basename(sys.argv[0])
|
||||||
self.assertEqual(
|
self.assertIn(
|
||||||
'usage: %s path_to_file repair [-h] [--yes | --dry-run]' % runner,
|
'usage: %s path_to_file repair [-h] [--yes | --dry-run]' % runner,
|
||||||
err_lines[0])
|
err_lines[0])
|
||||||
self.assertIn(
|
self.assertIn(
|
||||||
"argument --yes/-y: not allowed with argument --dry-run/-n",
|
"argument --yes/-y: not allowed with argument --dry-run/-n",
|
||||||
err_lines[1])
|
err_lines[-2], err_lines)
|
||||||
|
@ -42,7 +42,8 @@ from swift.container.sharder import ContainerSharder, sharding_enabled, \
|
|||||||
CleavingContext, DEFAULT_SHARDER_CONF, finalize_shrinking, \
|
CleavingContext, DEFAULT_SHARDER_CONF, finalize_shrinking, \
|
||||||
find_shrinking_candidates, process_compactible_shard_sequences, \
|
find_shrinking_candidates, process_compactible_shard_sequences, \
|
||||||
find_compactible_shard_sequences, is_shrinking_candidate, \
|
find_compactible_shard_sequences, is_shrinking_candidate, \
|
||||||
is_sharding_candidate, find_paths, rank_paths, ContainerSharderConf
|
is_sharding_candidate, find_paths, rank_paths, ContainerSharderConf, \
|
||||||
|
find_paths_with_gaps
|
||||||
from swift.common.utils import ShardRange, Timestamp, hash_path, \
|
from swift.common.utils import ShardRange, Timestamp, hash_path, \
|
||||||
encode_timestamps, parse_db_filename, quorum_size, Everything, md5
|
encode_timestamps, parse_db_filename, quorum_size, Everything, md5
|
||||||
from test import annotate_failure
|
from test import annotate_failure
|
||||||
@ -7446,6 +7447,40 @@ class TestSharderFunctions(BaseTestSharder):
|
|||||||
rank_paths(paths, own_sr)
|
rank_paths(paths, own_sr)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_find_paths_with_gaps(self):
|
||||||
|
bounds = (
|
||||||
|
# gap
|
||||||
|
('a', 'f'),
|
||||||
|
('f', 'm'), # overlap
|
||||||
|
('k', 'p'),
|
||||||
|
# gap
|
||||||
|
('q', 'y')
|
||||||
|
# gap
|
||||||
|
)
|
||||||
|
ranges = self._make_shard_ranges(
|
||||||
|
bounds, ShardRange.ACTIVE,
|
||||||
|
timestamp=next(self.ts_iter), object_count=1)
|
||||||
|
paths_with_gaps = find_paths_with_gaps(ranges)
|
||||||
|
self.assertEqual(3, len(paths_with_gaps))
|
||||||
|
self.assertEqual(
|
||||||
|
[(ShardRange.MIN, ShardRange.MIN),
|
||||||
|
(ShardRange.MIN, 'a'),
|
||||||
|
('a', 'm')],
|
||||||
|
[(r.lower, r.upper) for r in paths_with_gaps[0]]
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
[('k', 'p'),
|
||||||
|
('p', 'q'),
|
||||||
|
('q', 'y')],
|
||||||
|
[(r.lower, r.upper) for r in paths_with_gaps[1]]
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
[('q', 'y'),
|
||||||
|
('y', ShardRange.MAX),
|
||||||
|
(ShardRange.MAX, ShardRange.MAX)],
|
||||||
|
[(r.lower, r.upper) for r in paths_with_gaps[2]]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class TestContainerSharderConf(unittest.TestCase):
|
class TestContainerSharderConf(unittest.TestCase):
|
||||||
def test_default(self):
|
def test_default(self):
|
||||||
|
Loading…
Reference in New Issue
Block a user