Merge "swift-manage-shard-ranges: add repair and analyze commands"
This commit is contained in:
commit
ce847afa06
@ -165,11 +165,12 @@ import time
|
||||
from six.moves import input
|
||||
|
||||
from swift.common.utils import Timestamp, get_logger, ShardRange, readconf, \
|
||||
config_percent_value, config_positive_int_value
|
||||
config_percent_value, config_positive_int_value, ShardRangeList
|
||||
from swift.container.backend import ContainerBroker, UNSHARDED
|
||||
from swift.container.sharder import make_shard_ranges, sharding_enabled, \
|
||||
CleavingContext, process_compactible_shard_sequences, \
|
||||
find_compactible_shard_sequences, find_overlapping_ranges, \
|
||||
find_paths, rank_paths, finalize_shrinking, \
|
||||
DEFAULT_MAX_SHRINKING, DEFAULT_MAX_EXPANDING, \
|
||||
DEFAULT_SHARD_CONTAINER_THRESHOLD, DEFAULT_SHARD_SHRINK_POINT, \
|
||||
DEFAULT_SHARD_MERGE_POINT
|
||||
@ -179,6 +180,25 @@ DEFAULT_SHRINK_THRESHOLD = DEFAULT_SHARD_CONTAINER_THRESHOLD * \
|
||||
config_percent_value(DEFAULT_SHARD_SHRINK_POINT)
|
||||
|
||||
|
||||
class ManageShardRangesException(Exception):
    """Base class for the exceptions raised while managing shard ranges."""
|
||||
|
||||
|
||||
class GapsFoundException(ManageShardRangesException):
    """Raised when no path of shard ranges spans the required namespace."""
|
||||
|
||||
|
||||
class InvalidStateException(ManageShardRangesException):
    """Raised when shard ranges are in a state that prevents repair."""
|
||||
|
||||
|
||||
class InvalidSolutionException(ManageShardRangesException):
    """
    Raised when a candidate repair solution fails its sanity checks.

    The rejected solution is carried on the exception so that a handler can
    still report it.

    :param msg: a description of why the solution was rejected.
    :param acceptor_path: the path of shard ranges chosen as acceptors.
    :param overlapping_donors: the shard ranges chosen as donors.
    """

    def __init__(self, msg, acceptor_path, overlapping_donors):
        super(InvalidSolutionException, self).__init__(msg)
        self.overlapping_donors = overlapping_donors
        self.acceptor_path = acceptor_path
|
||||
|
||||
|
||||
def _print_shard_range(sr, level=0):
|
||||
indent = ' ' * level
|
||||
print(indent + '%r' % sr.name)
|
||||
@ -187,16 +207,19 @@ def _print_shard_range(sr, level=0):
|
||||
print(indent + ' state: %9s upper: %r' % (sr.state_text, sr.upper_str))
|
||||
|
||||
|
||||
def _load_and_validate_shard_data(args):
|
||||
def _load_and_validate_shard_data(args, require_index=True):
|
||||
required_keys = ['lower', 'upper', 'object_count']
|
||||
if require_index:
|
||||
required_keys.append('index')
|
||||
try:
|
||||
with open(args.input, 'r') as fd:
|
||||
try:
|
||||
data = json.load(fd)
|
||||
if not isinstance(data, list):
|
||||
raise ValueError('Shard data must be a list of dicts')
|
||||
for k in ('lower', 'upper', 'index', 'object_count'):
|
||||
for k in required_keys:
|
||||
for shard in data:
|
||||
shard[k]
|
||||
shard[k] # trigger KeyError for missing required key
|
||||
return data
|
||||
except (TypeError, ValueError, KeyError) as err:
|
||||
print('Failed to load valid shard range data: %r' % err,
|
||||
@ -473,8 +496,8 @@ def compact_shard_ranges(broker, args):
|
||||
_print_shard_range(acceptor, level=1)
|
||||
print('Once applied to the broker these changes will result in shard '
|
||||
'range compaction the next time the sharder runs.')
|
||||
choice = input('Do you want to apply these changes? [y/N]')
|
||||
if choice != 'y':
|
||||
choice = input('Do you want to apply these changes? [yes/N]')
|
||||
if choice != 'yes':
|
||||
print('No changes applied')
|
||||
return 0
|
||||
|
||||
@ -486,6 +509,160 @@ def compact_shard_ranges(broker, args):
|
||||
return 0
|
||||
|
||||
|
||||
def _find_overlapping_donors(shard_ranges, own_sr, args):
    """
    Choose the best complete path through the given shard ranges to act as
    acceptors and identify every other shard range as an overlapping donor.

    :param shard_ranges: the shard ranges to examine.
    :param own_sr: a shard range describing the namespace that the acceptor
        path must include.
    :param args: parsed command line args; not read by this function.
    :raises InvalidStateException: if any shard range is in SHARDING or
        SHRINKING state.
    :raises GapsFoundException: if the highest ranked path does not include
        the namespace of ``own_sr``.
    :raises InvalidSolutionException: if CLEAVED or ACTIVE shard ranges are
        found beyond the point that continuous cleaving has reached in the
        acceptor path.
    :return: a tuple of ``(acceptor_path, overlapping_donors)``.
    """
    shard_ranges = ShardRangeList(shard_ranges)
    found_states = shard_ranges.states
    if ShardRange.SHARDING in found_states:
        # This may be over-cautious, but for now we'll avoid dealing with
        # SHARDING shards (which by design will temporarily overlap with their
        # sub-shards) and require repair to be re-tried once sharding has
        # completed. Note that once a shard range moves from SHARDING to
        # SHARDED state and is deleted, some replicas of the shard may still
        # be in the process of sharding but we cannot detect that at the root.
        raise InvalidStateException('Found shard ranges in sharding state')
    if ShardRange.SHRINKING in found_states:
        # Also stop now if there are SHRINKING shard ranges: we would need to
        # ensure that these were not chosen as acceptors, but for now it is
        # simpler to require repair to be re-tried once shrinking has
        # completed.
        raise InvalidStateException('Found shard ranges in shrinking state')

    ranked_paths = rank_paths(find_paths(shard_ranges), own_sr)
    if not ranked_paths or not ranked_paths[0].includes(own_sr):
        # individual paths do not have gaps within them; if no path spans the
        # entire namespace then there must be a gap in the shard_ranges
        raise GapsFoundException

    # simple repair strategy: choose the highest ranked complete sequence and
    # shrink all other shard ranges into it
    acceptor_path = ranked_paths[0]
    acceptor_names = set(sr.name for sr in acceptor_path)
    overlapping_donors = ShardRangeList(
        [sr for sr in shard_ranges if sr.name not in acceptor_names])

    # check that the solution makes sense: if the acceptor path has the most
    # progressed continuous cleaving, which has reached cleaved_upper, then we
    # don't expect any shard ranges beyond cleaved_upper to be in states
    # CLEAVED or ACTIVE, otherwise there should have been a better acceptor
    # path that reached them.
    cleaved_states = {ShardRange.CLEAVED, ShardRange.ACTIVE}
    cleaved_upper = acceptor_path.find_lower(
        lambda sr: sr.state not in cleaved_states)
    checks = (
        (acceptor_path,
         'Isolated cleaved and/or active shard ranges in acceptor path'),
        (overlapping_donors,
         'Isolated cleaved and/or active shard ranges in donor ranges'),
    )
    # the acceptor path is checked first, then the donors
    for candidates, problem in checks:
        beyond_cleaved = candidates.filter(marker=cleaved_upper)
        if beyond_cleaved.states.intersection(cleaved_states):
            raise InvalidSolutionException(
                problem, acceptor_path, overlapping_donors)

    return acceptor_path, overlapping_donors
|
||||
|
||||
|
||||
def print_repair_solution(acceptor_path, overlapping_donors):
    """
    Print the donor shard ranges, in sorted order, followed by the acceptor
    shard ranges in the order given.

    :param acceptor_path: an iterable of the acceptor shard ranges.
    :param overlapping_donors: an iterable of the donor shard ranges.
    """
    sections = (
        ('Donors:', overlapping_donors, True),
        ('Acceptors:', acceptor_path, False),
    )
    for heading, ranges, needs_sorting in sections:
        print(heading)
        # only the donors are sorted; sorting is deferred until after the
        # heading has been printed
        for shard_range in (sorted(ranges) if needs_sorting else ranges):
            _print_shard_range(shard_range, level=1)
|
||||
|
||||
|
||||
def find_repair_solution(shard_ranges, own_sr, args):
    """
    Find a repair solution for overlapping shard ranges and print a report
    describing it.

    :param shard_ranges: the shard ranges to examine.
    :param own_sr: a shard range describing the namespace that the chosen
        acceptor path must include.
    :param args: parsed command line args; ``args.verbose`` enables printing
        of the full solution.
    :raises ManageShardRangesException: a subclass is re-raised, after an
        explanation has been printed, if no solution could be found.
    :return: a tuple of ``(acceptor_path, overlapping_donors)``, or
        ``(None, None)`` if there are no overlapping shard ranges.
    """
    try:
        acceptor_path, overlapping_donors = _find_overlapping_donors(
            shard_ranges, own_sr, args)
    except GapsFoundException:
        for line in ('Found no complete sequence of shard ranges.',
                     'Repairs necessary to fill gaps.',
                     'Gap filling not supported by this tool. '
                     'No repairs performed.'):
            print(line)
        raise
    except InvalidStateException as err:
        print('WARNING: %s' % err)
        print('No repairs performed.')
        raise
    except InvalidSolutionException as err:
        print('ERROR: %s' % err)
        print_repair_solution(err.acceptor_path, err.overlapping_donors)
        print('No repairs performed.')
        raise

    if not overlapping_donors:
        nothing_to_do = ('Found one complete sequence of %d shard ranges and '
                         'no overlapping shard ranges.')
        print(nothing_to_do % len(acceptor_path))
        print('No repairs necessary.')
        return None, None

    print('Repairs necessary to remove overlapping shard ranges.')
    summary = ('Chosen a complete sequence of %d shard ranges with current '
               'total of %d object records to accept object records from %d '
               'overlapping donor shard ranges.')
    print(summary % (len(acceptor_path), acceptor_path.object_count,
                     len(overlapping_donors)))
    if args.verbose:
        print_repair_solution(acceptor_path, overlapping_donors)

    print('Once applied to the broker these changes will result in:')
    removed = ' %d shard ranges being removed.'
    print(removed % len(overlapping_donors))
    moved = ' %d object records being moved to the chosen shard ranges.'
    print(moved % overlapping_donors.object_count)

    return acceptor_path, overlapping_donors
|
||||
|
||||
|
||||
def repair_shard_ranges(broker, args):
    """
    Repair overlapping shard ranges in a root container DB.

    :param broker: the container broker to repair.
    :param args: parsed command line args; ``args.yes`` skips the
        confirmation prompt.
    :return: 2 if the broker is not for a root container, 1 if no repair
        solution could be found, otherwise 0 (including when there is nothing
        to repair or the user declines the changes).
    """
    if not broker.is_root_container():
        print('WARNING: Shard containers cannot be repaired.')
        print('This command should be used on a root container.')
        return 2

    shard_ranges = broker.get_shard_ranges()
    if not shard_ranges:
        print('No shards found, nothing to do.')
        return 0

    own_sr = broker.get_own_shard_range()
    try:
        acceptor_path, overlapping_donors = find_repair_solution(
            shard_ranges, own_sr, args)
    except ManageShardRangesException:
        return 1

    if not acceptor_path:
        return 0

    # only prompt when --yes was not given
    confirmed = args.yes or input(
        'Do you want to apply these changes to the container '
        'DB? [yes/N]') == 'yes'
    if not confirmed:
        print('No changes applied')
        return 0

    # merge changes to the broker...
    # note: acceptors do not need to be modified since they already span the
    # complete range
    finalize_shrinking(broker, [], overlapping_donors, Timestamp.now())
    print('Updated %s donor shard ranges.' % len(overlapping_donors))
    print('Run container-replicator to replicate the changes to other nodes.')
    print('Run container-sharder on all nodes to repair shards.')
    return 0
|
||||
|
||||
|
||||
def analyze_shard_ranges(args):
    """
    Analyze shard range data loaded from a file and print any repair solution
    that is found; no changes are applied.

    :param args: parsed command line args, passed through to the shard data
        loader and to :func:`find_repair_solution`.
    :return: 1 if no repair solution could be found, otherwise 0.
    """
    shard_ranges = [
        ShardRange.from_dict(data)
        for data in _load_and_validate_shard_data(args, require_index=False)]
    # the loaded shard ranges are analyzed against this placeholder range
    whole_sr = ShardRange('whole/namespace', 0)
    try:
        find_repair_solution(shard_ranges, whole_sr, args)
    except ManageShardRangesException:
        return 1
    else:
        return 0
|
||||
|
||||
|
||||
def _positive_int(arg):
|
||||
val = int(arg)
|
||||
if val <= 0:
|
||||
@ -519,14 +696,29 @@ def _add_enable_args(parser):
|
||||
help='DB timeout to use when enabling sharding.')
|
||||
|
||||
|
||||
def _add_yes_arg(parser):
|
||||
parser.add_argument(
|
||||
'--yes', '-y', action='store_true', default=False,
|
||||
help='Apply shard range changes to broker without prompting.')
|
||||
|
||||
|
||||
def _make_parser():
|
||||
parser = argparse.ArgumentParser(description='Manage shard ranges')
|
||||
parser.add_argument('container_db')
|
||||
parser.add_argument('path_to_file',
|
||||
help='Path to a container DB file or, for the analyze '
|
||||
'subcommand, a shard data file.')
|
||||
parser.add_argument('--config', dest='conf_file', required=False,
|
||||
help='Path to config file with [container-sharder] '
|
||||
'section')
|
||||
parser.add_argument('--verbose', '-v', action='count', default=0,
|
||||
help='Increase output verbosity')
|
||||
# this is useful for probe tests that shard containers with unrealistically
|
||||
# low numbers of objects, of which a significant proportion may still be in
|
||||
# the pending file
|
||||
parser.add_argument(
|
||||
'--force-commits', action='store_true', default=False,
|
||||
help='Force broker to commit pending object updates before finding '
|
||||
'shard ranges. By default the broker will skip commits.')
|
||||
subparsers = parser.add_subparsers(
|
||||
dest='subcommand', help='Sub-command help', title='Sub-commands')
|
||||
|
||||
@ -595,9 +787,7 @@ def _make_parser():
|
||||
'compact',
|
||||
help='Compact shard ranges with less than the shrink-threshold number '
|
||||
'of rows. This command only works on root containers.')
|
||||
compact_parser.add_argument(
|
||||
'--yes', '-y', action='store_true', default=False,
|
||||
help='Apply shard range changes to broker without prompting.')
|
||||
_add_yes_arg(compact_parser)
|
||||
compact_parser.add_argument('--shrink-threshold', nargs='?',
|
||||
type=_positive_int,
|
||||
default=None,
|
||||
@ -633,6 +823,21 @@ def _make_parser():
|
||||
'expanded. Defaults to unlimited.')
|
||||
compact_parser.set_defaults(func=compact_shard_ranges)
|
||||
|
||||
# repair
|
||||
repair_parser = subparsers.add_parser(
|
||||
'repair',
|
||||
help='Repair overlapping shard ranges. No action will be taken '
|
||||
'without user confirmation unless the -y option is used.')
|
||||
_add_yes_arg(repair_parser)
|
||||
repair_parser.set_defaults(func=repair_shard_ranges)
|
||||
|
||||
# analyze
|
||||
analyze_parser = subparsers.add_parser(
|
||||
'analyze',
|
||||
help='Analyze shard range json data read from file. Use -v to see '
|
||||
'more detailed analysis.')
|
||||
analyze_parser.set_defaults(func=analyze_shard_ranges)
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
@ -648,6 +853,7 @@ def main(args=None):
|
||||
parser.print_help()
|
||||
print('\nA sub-command is required.')
|
||||
return 1
|
||||
|
||||
conf = {}
|
||||
rows_per_shard = DEFAULT_ROWS_PER_SHARD
|
||||
shrink_threshold = DEFAULT_SHRINK_THRESHOLD
|
||||
@ -688,16 +894,21 @@ def main(args=None):
|
||||
if "rows_per_shard" in args and args.rows_per_shard is None:
|
||||
args.rows_per_shard = rows_per_shard
|
||||
|
||||
logger = get_logger(conf, name='ContainerBroker', log_to_console=True)
|
||||
broker = ContainerBroker(os.path.realpath(args.container_db),
|
||||
logger=logger, skip_commits=True)
|
||||
if args.func in (analyze_shard_ranges,):
|
||||
args.input = args.path_to_file
|
||||
return args.func(args) or 0
|
||||
|
||||
logger = get_logger({}, name='ContainerBroker', log_to_console=True)
|
||||
broker = ContainerBroker(os.path.realpath(args.path_to_file),
|
||||
logger=logger,
|
||||
skip_commits=not args.force_commits)
|
||||
try:
|
||||
broker.get_info()
|
||||
except Exception as exc:
|
||||
print('Error opening container DB %s: %s' % (args.container_db, exc),
|
||||
print('Error opening container DB %s: %s' % (args.path_to_file, exc),
|
||||
file=sys.stderr)
|
||||
return 2
|
||||
print('Loaded db broker for %s.' % broker.path, file=sys.stderr)
|
||||
print('Loaded db broker for %s' % broker.path, file=sys.stderr)
|
||||
return args.func(broker, args)
|
||||
|
||||
|
||||
|
@ -5325,17 +5325,19 @@ class ShardRange(object):
|
||||
valid state number.
|
||||
"""
|
||||
try:
|
||||
state = state.lower()
|
||||
state_num = cls.STATES_BY_NAME[state]
|
||||
except (KeyError, AttributeError):
|
||||
try:
|
||||
state_name = cls.STATES[state]
|
||||
except KeyError:
|
||||
raise ValueError('Invalid state %r' % state)
|
||||
else:
|
||||
state_num = state
|
||||
else:
|
||||
state_name = state
|
||||
# maybe it's a number
|
||||
float_state = float(state)
|
||||
state_num = int(float_state)
|
||||
if state_num != float_state:
|
||||
raise ValueError('Invalid state %r' % state)
|
||||
state_name = cls.STATES[state_num]
|
||||
except (ValueError, TypeError):
|
||||
# maybe it's a state name
|
||||
state_name = state.lower()
|
||||
state_num = cls.STATES_BY_NAME[state_name]
|
||||
except (KeyError, AttributeError):
|
||||
raise ValueError('Invalid state %r' % state)
|
||||
return state_num, state_name
|
||||
|
||||
@property
|
||||
@ -5344,14 +5346,7 @@ class ShardRange(object):
|
||||
|
||||
@state.setter
|
||||
def state(self, state):
|
||||
try:
|
||||
float_state = float(state)
|
||||
int_state = int(float_state)
|
||||
except (ValueError, TypeError):
|
||||
raise ValueError('Invalid state %r' % state)
|
||||
if int_state != float_state or int_state not in self.STATES:
|
||||
raise ValueError('Invalid state %r' % state)
|
||||
self._state = int_state
|
||||
self._state = self.resolve_state(state)[0]
|
||||
|
||||
@property
|
||||
def state_text(self):
|
||||
@ -5639,6 +5634,14 @@ class ShardRangeList(UserList):
|
||||
"""
|
||||
return sum(sr.bytes_used for sr in self)
|
||||
|
||||
@property
def timestamps(self):
    """
    Returns the set of distinct ``timestamp`` values of the shard ranges in
    the list.
    """
    return {shard_range.timestamp for shard_range in self}
|
||||
|
||||
@property
def states(self):
    """
    Returns the set of distinct ``state`` values of the shard ranges in the
    list.
    """
    return {shard_range.state for shard_range in self}
|
||||
|
||||
def includes(self, other):
|
||||
"""
|
||||
Check if another ShardRange namespace is enclosed between the list's
|
||||
@ -5656,6 +5659,44 @@ class ShardRangeList(UserList):
|
||||
"""
|
||||
return self.lower <= other.lower and self.upper >= other.upper
|
||||
|
||||
def filter(self, includes=None, marker=None, end_marker=None):
    """
    Select the shard ranges in the list whose namespace includes the
    ``includes`` name or any part of the namespace between ``marker`` and
    ``end_marker``. If none of ``includes``, ``marker`` or ``end_marker``
    are specified then all shard ranges will be returned.

    :param includes: a string; if not empty then only the shard range, if
        any, whose namespace includes this string will be returned, and
        ``marker`` and ``end_marker`` will be ignored.
    :param marker: if specified then only shard ranges whose upper bound is
        greater than this value will be returned.
    :param end_marker: if specified then only shard ranges whose lower
        bound is less than this value will be returned.
    :return: A new instance of :class:`~swift.common.utils.ShardRangeList`
        containing the filtered shard ranges.
    """
    selected = filter_shard_ranges(self, includes, marker, end_marker)
    return ShardRangeList(selected)
|
||||
|
||||
def find_lower(self, condition):
    """
    Finds the first shard range that satisfies the given condition and
    returns its lower bound.

    :param condition: A function that must accept a single argument of type
        :class:`~swift.common.utils.ShardRange` and return True if the
        shard range satisfies the condition or False otherwise.
    :return: The lower bound of the first shard range to satisfy the
        condition, or the ``upper`` value of this list if no such shard
        range is found.
    """
    matching_lowers = (
        shard_range.lower for shard_range in self if condition(shard_range))
    for lower in matching_lowers:
        # the generator is lazy, so nothing past the first match is tested
        return lower
    return self.upper
|
||||
|
||||
|
||||
def find_shard_range(item, ranges):
|
||||
"""
|
||||
@ -5674,6 +5715,22 @@ def find_shard_range(item, ranges):
|
||||
|
||||
|
||||
def filter_shard_ranges(shard_ranges, includes, marker, end_marker):
|
||||
"""
|
||||
Filter the given shard ranges to those whose namespace includes the
|
||||
``includes`` name or any part of the namespace between ``marker`` and
|
||||
``end_marker``. If none of ``includes``, ``marker`` or ``end_marker`` are
|
||||
specified then all shard ranges will be returned.
|
||||
|
||||
:param shard_ranges: A list of :class:`~swift.common.utils.ShardRange`.
|
||||
:param includes: a string; if not empty then only the shard range, if any,
|
||||
whose namespace includes this string will be returned, and ``marker``
|
||||
and ``end_marker`` will be ignored.
|
||||
:param marker: if specified then only shard ranges whose upper bound is
|
||||
greater than this value will be returned.
|
||||
:param end_marker: if specified then only shard ranges whose lower bound is
|
||||
less than this value will be returned.
|
||||
:return: A filtered list of :class:`~swift.common.utils.ShardRange`.
|
||||
"""
|
||||
if includes:
|
||||
shard_range = find_shard_range(includes, shard_ranges)
|
||||
return [shard_range] if shard_range else []
|
||||
@ -5689,6 +5746,10 @@ def filter_shard_ranges(shard_ranges, includes, marker, end_marker):
|
||||
if marker or end_marker:
|
||||
return list(filter(shard_range_filter, shard_ranges))
|
||||
|
||||
if marker == ShardRange.MAX or end_marker == ShardRange.MIN:
|
||||
# MIN and MAX are both Falsy so not handled by shard_range_filter
|
||||
return []
|
||||
|
||||
return shard_ranges
|
||||
|
||||
|
||||
|
@ -32,7 +32,7 @@ from swift.common.utils import Timestamp, encode_timestamps, \
|
||||
decode_timestamps, extract_swift_bytes, storage_directory, hash_path, \
|
||||
ShardRange, renamer, MD5_OF_EMPTY_STRING, mkdirs, get_db_files, \
|
||||
parse_db_filename, make_db_file_path, split_path, RESERVED_BYTE, \
|
||||
filter_shard_ranges
|
||||
filter_shard_ranges, ShardRangeList
|
||||
from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT, \
|
||||
zero_like, DatabaseAlreadyExists, SQLITE_ARG_LIMIT
|
||||
|
||||
@ -1388,7 +1388,7 @@ class ContainerBroker(DatabaseBroker):
|
||||
"""
|
||||
if not shard_ranges:
|
||||
return
|
||||
if not isinstance(shard_ranges, list):
|
||||
if not isinstance(shard_ranges, (list, ShardRangeList)):
|
||||
shard_ranges = [shard_ranges]
|
||||
|
||||
item_list = []
|
||||
|
@ -12,7 +12,7 @@
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import collections
|
||||
import errno
|
||||
import json
|
||||
import time
|
||||
@ -339,6 +339,106 @@ def process_compactible_shard_sequences(broker, sequences):
|
||||
finalize_shrinking(broker, acceptor_ranges, shrinking_ranges, timestamp)
|
||||
|
||||
|
||||
def find_paths(shard_ranges):
    """
    Returns a list of all continuous paths through the shard ranges. An
    individual path may not necessarily span the entire namespace, but it will
    span a continuous namespace without gaps.

    Shard ranges in SHRINKING state are never included in a path.

    :param shard_ranges: A list of :class:`~swift.common.utils.ShardRange`.
    :return: A list of :class:`~swift.common.utils.ShardRangeList`.
    """
    # A node is a point in the namespace that is used as a bound of any shard
    # range. Shard ranges form the edges between nodes.

    # First build a dict mapping nodes to a list of edges that leave that node
    # (in other words, shard ranges whose lower bound equals the node)
    node_successors = collections.defaultdict(list)
    for shard_range in shard_ranges:
        if shard_range.state == ShardRange.SHRINKING:
            # shrinking shards are not a viable edge in any path
            continue
        node_successors[shard_range.lower].append(shard_range)

    paths = []

    def clone_path(other=None):
        # create a new path, possibly cloning another path, and add it to the
        # list of all paths through the shards
        path = ShardRangeList() if other is None else ShardRangeList(other)
        paths.append(path)
        return path

    # we need to keep track of every path that ends at each node so that when
    # we visit the node we can extend those paths, or clones of them, with the
    # edges that leave the node
    paths_to_node = collections.defaultdict(list)

    # visit the nodes in ascending order by name; every key in node_successors
    # was created by appending an edge, so each node visited here has at least
    # one edge leaving it
    for node, edges in sorted(node_successors.items()):
        if not paths_to_node[node]:
            # this is either the first node to be visited, or it has no paths
            # leading to it, so we need to start a new path here
            paths_to_node[node].append(clone_path([]))
        for path_to_node in paths_to_node[node]:
            # extend each path that arrives at this node with all of the
            # possible edges that leave the node; if more than one edge leaves
            # the node then we will make clones of the path to the node and
            # extend those clones, adding to the collection of all paths
            # through the shards
            for i, edge in enumerate(edges):
                if i == len(edges) - 1:
                    # the last edge is used to extend the original path to the
                    # node; there is nothing special about the last edge, but
                    # doing this last means the original path to the node can
                    # be cloned for all other edges before being modified here
                    path = path_to_node
                else:
                    # for all but one of the edges leaving the node we need to
                    # make a clone of the original path
                    path = clone_path(path_to_node)
                # extend the path with the edge
                path.append(edge)
                # keep track of which node this path now arrives at
                paths_to_node[edge.upper].append(path)
    return paths
|
||||
|
||||
|
||||
def rank_paths(paths, shard_range_to_span):
    """
    Sorts the given list of paths such that the most preferred path is the
    first item in the list. The list is sorted in place and also returned.

    Paths are preferred, in decreasing order of significance, by: including
    the namespace to span; most cleaving progress; largest object count;
    fewest distinct timestamps; newest timestamp.

    :param paths: A list of :class:`~swift.common.utils.ShardRangeList`; each
        path is expected to contain at least one shard range.
    :param shard_range_to_span: An instance of
        :class:`~swift.common.utils.ShardRange` that describes the namespace
        that would ideally be spanned by a path. Paths that include this
        namespace will be preferred over those that do not.
    :return: A sorted list of :class:`~swift.common.utils.ShardRangeList`.
    """
    def is_not_cleaved(sr):
        return sr.state not in (ShardRange.CLEAVED, ShardRange.ACTIVE)

    def sort_key(path):
        # defines the order of preference for paths through shards; the set
        # of timestamps is built once and reused for the last two criteria
        timestamps = path.timestamps
        return (
            # complete path for the namespace
            path.includes(shard_range_to_span),
            # most cleaving progress
            path.find_lower(is_not_cleaved),
            # largest object count
            path.object_count,
            # fewest timestamps
            -len(timestamps),
            # newest timestamp
            max(timestamps),
        )

    paths.sort(key=sort_key, reverse=True)
    return paths
|
||||
|
||||
|
||||
class CleavingContext(object):
|
||||
def __init__(self, ref, cursor='', max_row=None, cleave_to_row=None,
|
||||
last_cleave_to_row=None, cleaving_done=False,
|
||||
|
@ -169,13 +169,16 @@ class BaseTestContainerSharding(ReplProbeTest):
|
||||
else:
|
||||
conn.delete_object(self.container_name, obj)
|
||||
|
||||
def get_container_shard_ranges(self, account=None, container=None):
|
||||
def get_container_shard_ranges(self, account=None, container=None,
|
||||
include_deleted=False):
|
||||
account = account if account else self.account
|
||||
container = container if container else self.container_to_shard
|
||||
path = self.internal_client.make_path(account, container)
|
||||
headers = {'X-Backend-Record-Type': 'shard'}
|
||||
if include_deleted:
|
||||
headers['X-Backend-Include-Deleted'] = 'true'
|
||||
resp = self.internal_client.make_request(
|
||||
'GET', path + '?format=json', {'X-Backend-Record-Type': 'shard'},
|
||||
[200])
|
||||
'GET', path + '?format=json', headers, [200])
|
||||
return [ShardRange.from_dict(sr) for sr in json.loads(resp.body)]
|
||||
|
||||
def direct_get_container_shard_ranges(self, account=None, container=None,
|
||||
@ -371,6 +374,21 @@ class BaseTestContainerSharding(ReplProbeTest):
|
||||
expected_state, headers['X-Backend-Sharding-State'])
|
||||
return [ShardRange.from_dict(sr) for sr in shard_ranges]
|
||||
|
||||
def assert_subprocess_success(self, cmd_args):
    """
    Run a command and fail the test, reporting the command's output, if
    running it raises an exception.

    :param cmd_args: the list of command args passed to
        ``subprocess.check_output``; stderr is merged into stdout.
    """
    try:
        subprocess.check_output(cmd_args, stderr=subprocess.STDOUT)
    except Exception as err:
        # why not 'except CalledProcessError'? because in my py3.6 tests
        # the CalledProcessError wasn't caught by that! despite type(exc)
        # being a CalledProcessError, isinstance(exc, CalledProcessError)
        # is False and the type has a different hash - could be
        # related to https://github.com/eventlet/eventlet/issues/413
        try:
            # assume this is a CalledProcessError, which has an output attr
            failure = '%s with output:\n%s' % (err, err.output)
            self.fail(failure)
        except AttributeError:
            # not a CalledProcessError after all, so re-raise it as it is
            raise err
|
||||
|
||||
def get_part_and_node_numbers(self, shard_range):
|
||||
"""Return the partition and node numbers for a shard range."""
|
||||
part, nodes = self.brain.ring.get_nodes(
|
||||
@ -2841,7 +2859,8 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
|
||||
self.assertEqual(ShardRange.ACTIVE,
|
||||
broker.get_own_shard_range().state)
|
||||
|
||||
def test_manage_shard_ranges_used_poorly(self):
|
||||
def test_manage_shard_ranges_repair_root(self):
|
||||
# provoke overlaps in root container and repair
|
||||
obj_names = self._make_object_names(8)
|
||||
self.put_objects(obj_names)
|
||||
|
||||
@ -2917,18 +2936,12 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
|
||||
# horribly out of date as more objects are added
|
||||
self.assert_container_listing(obj_names)
|
||||
|
||||
# Let's pretend that some actor in the system has determined that the
|
||||
# second set of 3 shard ranges (1.*) are correct and the first set of 4
|
||||
# (0.*) are not desired, so shrink shard ranges 0.*. We've already
|
||||
# checked they are in cleaved or created state so it's ok to move them
|
||||
# to shrinking.
|
||||
# TODO: replace this db manipulation if/when manage_shard_ranges can
|
||||
# manage shrinking...
|
||||
for sr in shard_ranges_0:
|
||||
self.assertTrue(sr.update_state(ShardRange.SHRINKING))
|
||||
sr.epoch = sr.state_timestamp = Timestamp.now()
|
||||
broker = self.get_broker(self.brain.part, self.brain.nodes[0])
|
||||
broker.merge_shard_ranges(shard_ranges_0)
|
||||
# 'swift-manage-shard-ranges repair' will choose the second set of 3
|
||||
# shard ranges (1.*) with newer timestamp over the first set of 4
|
||||
# (0.*), and shrink shard ranges 0.*.
|
||||
db_file = self.get_db_file(self.brain.part, self.brain.nodes[0])
|
||||
self.assert_subprocess_success(
|
||||
['swift-manage-shard-ranges', db_file, 'repair', '--yes'])
|
||||
|
||||
# make sure all root replicas now sync their shard ranges
|
||||
self.replicators.once()
|
||||
@ -3057,3 +3070,139 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
|
||||
# Finally, with all root replicas in a consistent state, the listing
|
||||
# will be predictably correct
|
||||
self.assert_container_listing(obj_names)
|
||||
|
||||
def test_manage_shard_ranges_repair_shard(self):
    # provoke overlaps in a shard container and repair them
    all_names = self._make_object_names(24)
    initial_obj_names = all_names[::2]
    # put 12 objects in container
    self.put_objects(initial_obj_names)
    sharding_headers = {'X-Container-Sharding': 'on'}
    client.post_container(self.url, self.admin_token, self.container_name,
                          headers=sharding_headers)
    # run replicators first time to get sync points set
    self.replicators.once()
    # find 3 shard ranges on root nodes[0] and get the root sharded
    root_db_file = self.get_db_file(self.brain.part, self.brain.nodes[0])
    subprocess.check_output(
        ['swift-manage-shard-ranges', root_db_file,
         'find_and_replace', '4', '--enable'],
        stderr=subprocess.STDOUT)
    self.replicators.once()
    # two sharder passes over the root partition: the first cleaves the
    # first two shards, the second cleaves the third shard
    root_partition_arg = '--partitions=%s' % self.brain.part
    for _ in range(2):
        self.sharders_once(additional_args=root_partition_arg)
    # ensure all shards learn their ACTIVE state from root
    self.sharders_once()
    for node_index in range(3):
        with annotate_failure('node %d' % node_index):
            node_shard_ranges = self.assert_container_state(
                self.brain.nodes[node_index], 'sharded', 3)
            for shard_range in node_shard_ranges:
                self.assertEqual(ShardRange.ACTIVE, shard_range.state)
    self.assert_container_listing(initial_obj_names)

    # add objects to second shard range so it has 8 objects; this range
    # has bounds (obj-0006,obj-0014]
    root_shard_ranges = self.get_container_shard_ranges()
    self.assertEqual(3, len(root_shard_ranges))
    shard_1 = root_shard_ranges[1]
    self.assertEqual(all_names[6], shard_1.lower)
    self.assertEqual(all_names[14], shard_1.upper)
    more_obj_names = all_names[7:15:2]
    self.put_objects(more_obj_names)
    expected_obj_names = sorted(initial_obj_names + more_obj_names)
    self.assert_container_listing(expected_obj_names)

    shard_1_part, shard_1_nodes = self.brain.ring.get_nodes(
        shard_1.account, shard_1.container)

    # find 3 sub-shards on one shard node; use --force-commits to ensure
    # the recently PUT objects are included when finding the shard range
    # pivot points...
    # ... and mistakenly find 4 shard ranges on a different shard node :(
    conflicting_finds = (
        (shard_1_nodes[1], '3'),
        (shard_1_nodes[2], '2'),
    )
    for shard_node, rows_per_shard in conflicting_finds:
        shard_db_file = self.get_db_file(
            shard_1_part, shard_node, shard_1.account, shard_1.container)
        subprocess.check_output(
            ['swift-manage-shard-ranges', '--force-commits',
             shard_db_file,
             'find_and_replace', rows_per_shard, '--enable'],
            stderr=subprocess.STDOUT)
    # replicate the muddle of shard ranges between shard replicas, merged
    # result is:
    #   '' - 6      shard       ACTIVE
    #   6 - 8       sub-shard   FOUND
    #   6 - 9       sub-shard   FOUND
    #   8 - 10      sub-shard   FOUND
    #   9 - 12      sub-shard   FOUND
    #   10 - 12     sub-shard   FOUND
    #   12 - 14     sub-shard   FOUND
    #   12 - 14     sub-shard   FOUND
    #   6 - 14      shard       SHARDING
    #   14 - ''     shard       ACTIVE
    self.replicators.once()

    # try hard to shard the shard...
    shard_partition_arg = '--partitions=%s' % shard_1_part
    for _ in range(3):
        self.sharders_once(additional_args=shard_partition_arg)
    # sharding hasn't completed and there's overlaps in the shard and root:
    # the sub-shards will have been cleaved in the order listed above, but
    # sub-shards (10 - 12) and one of (12 - 14) will be overlooked because
    # the cleave cursor will have moved past their namespace before they
    # were yielded by the shard range iterator, so we now have:
    #   '' - 6      shard       ACTIVE
    #   6 - 8       sub-shard   ACTIVE
    #   6 - 9       sub-shard   ACTIVE
    #   8 - 10      sub-shard   ACTIVE
    #   10 - 12     sub-shard   CREATED
    #   9 - 12      sub-shard   ACTIVE
    #   12 - 14     sub-shard   CREATED
    #   12 - 14     sub-shard   ACTIVE
    #   14 - ''     shard       ACTIVE
    sub_shard_ranges = self.get_container_shard_ranges(
        shard_1.account, shard_1.container)
    self.assertEqual(7, len(sub_shard_ranges), sub_shard_ranges)
    root_shard_ranges = self.get_container_shard_ranges()
    self.assertEqual(9, len(root_shard_ranges), root_shard_ranges)
    expected_states = (
        [ShardRange.ACTIVE] * 4 +
        [ShardRange.CREATED, ShardRange.ACTIVE] * 2 +
        [ShardRange.ACTIVE])
    self.assertEqual(expected_states,
                     [sr.state for sr in root_shard_ranges])

    # fix the overlaps - a set of 3 ACTIVE sub-shards will be chosen and 4
    # other sub-shards will be shrunk away; apply the fix at the root
    # container
    db_file = self.get_db_file(self.brain.part, self.brain.nodes[0])
    repair_cmd = ['swift-manage-shard-ranges', db_file, 'repair', '--yes']
    self.assert_subprocess_success(repair_cmd)
    self.replicators.once()
    for _ in range(2):
        self.sharders_once()

    # check root now has just 5 shard ranges
    root_shard_ranges = self.get_container_shard_ranges()
    self.assertEqual(5, len(root_shard_ranges), root_shard_ranges)
    self.assertEqual([ShardRange.ACTIVE] * 5,
                     [sr.state for sr in root_shard_ranges])
    # check there are 1 sharded shard and 4 shrunk sub-shard ranges in the
    # root (note, shard_1's shard ranges aren't updated once it has sharded
    # because the sub-shards report their state to the root; we cannot make
    # assertions about shrunk states in shard_1's shard range table)
    root_shard_ranges = self.get_container_shard_ranges(
        include_deleted=True)
    self.assertEqual(10, len(root_shard_ranges), root_shard_ranges)
    shrunk = [sr for sr in root_shard_ranges
              if sr.state == ShardRange.SHRUNK]
    self.assertEqual(4, len(shrunk), root_shard_ranges)
    self.assertEqual([True] * 4, [sr.deleted for sr in shrunk])
    sharded = [sr for sr in root_shard_ranges
               if sr.state == ShardRange.SHARDED]
    self.assertEqual(1, len(sharded), root_shard_ranges)

    self.assert_container_listing(expected_obj_names)
|
||||
|
@ -27,11 +27,12 @@ from swift.common import utils
|
||||
from swift.common.utils import Timestamp, ShardRange
|
||||
from swift.container.backend import ContainerBroker
|
||||
from swift.container.sharder import make_shard_ranges
|
||||
from test.unit import mock_timestamp_now
|
||||
from test.unit import mock_timestamp_now, make_timestamp_iter, with_tempdir
|
||||
|
||||
|
||||
class TestManageShardRanges(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.ts_iter = make_timestamp_iter()
|
||||
self.testdir = os.path.join(mkdtemp(), 'tmp_test_cli_find_shards')
|
||||
utils.mkdirs(self.testdir)
|
||||
rmtree(self.testdir)
|
||||
@ -58,9 +59,40 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
'object_count': 10},
|
||||
]
|
||||
|
||||
self.overlap_shard_data_1 = [
|
||||
{'index': 0, 'lower': '', 'upper': 'obj10', 'object_count': 1},
|
||||
{'index': 1, 'lower': 'obj10', 'upper': 'obj20',
|
||||
'object_count': 1},
|
||||
{'index': 2, 'lower': 'obj20', 'upper': 'obj30',
|
||||
'object_count': 1},
|
||||
{'index': 3, 'lower': 'obj30', 'upper': 'obj39',
|
||||
'object_count': 1},
|
||||
{'index': 4, 'lower': 'obj39', 'upper': 'obj49',
|
||||
'object_count': 1},
|
||||
{'index': 5, 'lower': 'obj49', 'upper': 'obj58',
|
||||
'object_count': 1},
|
||||
{'index': 6, 'lower': 'obj58', 'upper': 'obj68',
|
||||
'object_count': 1},
|
||||
{'index': 7, 'lower': 'obj68', 'upper': 'obj78',
|
||||
'object_count': 1},
|
||||
{'index': 8, 'lower': 'obj78', 'upper': 'obj88',
|
||||
'object_count': 1},
|
||||
{'index': 9, 'lower': 'obj88', 'upper': '', 'object_count': 1},
|
||||
]
|
||||
|
||||
self.overlap_shard_data_2 = [
|
||||
{'index': 0, 'lower': '', 'upper': 'obj11', 'object_count': 1},
|
||||
{'index': 1, 'lower': 'obj11', 'upper': 'obj21',
|
||||
'object_count': 1},
|
||||
]
|
||||
|
||||
def tearDown(self):
    # remove the directory that contains self.testdir, along with
    # everything beneath it
    parent_dir = os.path.dirname(self.testdir)
    rmtree(parent_dir)
|
||||
|
||||
def assert_shard_ranges_equal(self, expected, actual):
    # compare two iterables of shard ranges item by item, using the
    # dict() form of each item
    expected_dicts = list(map(dict, expected))
    actual_dicts = list(map(dict, actual))
    self.assertEqual(expected_dicts, actual_dicts)
|
||||
|
||||
def assert_starts_with(self, value, prefix):
    # assert that ``value`` begins with ``prefix``; the failure message
    # shows the repr of both
    has_prefix = value.startswith(prefix)
    failure_msg = "%r does not start with %r" % (value, prefix)
    self.assertTrue(has_prefix, failure_msg)
|
||||
@ -117,10 +149,11 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
with mock.patch('swift.cli.manage_shard_ranges.find_ranges') as mocked:
|
||||
main([db_file, 'find'])
|
||||
expected = Namespace(conf_file=None,
|
||||
container_db=mock.ANY,
|
||||
path_to_file=mock.ANY,
|
||||
func=mock.ANY,
|
||||
rows_per_shard=500000,
|
||||
subcommand='find',
|
||||
force_commits=False,
|
||||
verbose=0)
|
||||
mocked.assert_called_once_with(mock.ANY, expected)
|
||||
|
||||
@ -128,10 +161,11 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
with mock.patch('swift.cli.manage_shard_ranges.find_ranges') as mocked:
|
||||
main([db_file, '--config', conf_file, 'find'])
|
||||
expected = Namespace(conf_file=conf_file,
|
||||
container_db=mock.ANY,
|
||||
path_to_file=mock.ANY,
|
||||
func=mock.ANY,
|
||||
rows_per_shard=500,
|
||||
subcommand='find',
|
||||
force_commits=False,
|
||||
verbose=0)
|
||||
mocked.assert_called_once_with(mock.ANY, expected)
|
||||
|
||||
@ -139,10 +173,11 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
with mock.patch('swift.cli.manage_shard_ranges.find_ranges') as mocked:
|
||||
main([db_file, '--config', conf_file, 'find', '12345'])
|
||||
expected = Namespace(conf_file=conf_file,
|
||||
container_db=mock.ANY,
|
||||
path_to_file=mock.ANY,
|
||||
func=mock.ANY,
|
||||
rows_per_shard=12345,
|
||||
subcommand='find',
|
||||
force_commits=False,
|
||||
verbose=0)
|
||||
mocked.assert_called_once_with(mock.ANY, expected)
|
||||
|
||||
@ -151,9 +186,10 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
as mocked:
|
||||
main([db_file, 'compact'])
|
||||
expected = Namespace(conf_file=None,
|
||||
container_db=mock.ANY,
|
||||
path_to_file=mock.ANY,
|
||||
func=mock.ANY,
|
||||
subcommand='compact',
|
||||
force_commits=False,
|
||||
verbose=0,
|
||||
max_expanding=-1,
|
||||
max_shrinking=1,
|
||||
@ -167,9 +203,10 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
as mocked:
|
||||
main([db_file, '--config', conf_file, 'compact'])
|
||||
expected = Namespace(conf_file=conf_file,
|
||||
container_db=mock.ANY,
|
||||
path_to_file=mock.ANY,
|
||||
func=mock.ANY,
|
||||
subcommand='compact',
|
||||
force_commits=False,
|
||||
verbose=0,
|
||||
max_expanding=31,
|
||||
max_shrinking=33,
|
||||
@ -197,9 +234,10 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
as mocked:
|
||||
main([db_file, '--config', conf_file, 'compact'])
|
||||
expected = Namespace(conf_file=conf_file,
|
||||
container_db=mock.ANY,
|
||||
path_to_file=mock.ANY,
|
||||
func=mock.ANY,
|
||||
subcommand='compact',
|
||||
force_commits=False,
|
||||
verbose=0,
|
||||
max_expanding=31,
|
||||
max_shrinking=33,
|
||||
@ -217,9 +255,10 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
'--expansion-limit', '3456',
|
||||
'--shrink-threshold', '1234'])
|
||||
expected = Namespace(conf_file=conf_file,
|
||||
container_db=mock.ANY,
|
||||
path_to_file=mock.ANY,
|
||||
func=mock.ANY,
|
||||
subcommand='compact',
|
||||
force_commits=False,
|
||||
verbose=0,
|
||||
max_expanding=11,
|
||||
max_shrinking=22,
|
||||
@ -344,7 +383,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
'Metadata:',
|
||||
' X-Container-Sysmeta-Sharding = True']
|
||||
self.assertEqual(expected, out.getvalue().splitlines())
|
||||
self.assertEqual(['Loaded db broker for a/c.'],
|
||||
self.assertEqual(['Loaded db broker for a/c'],
|
||||
err.getvalue().splitlines())
|
||||
|
||||
retiring_db_id = broker.get_info()['id']
|
||||
@ -391,7 +430,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
# The json.dumps() in py2 produces trailing space, not in py3.
|
||||
result = [x.rstrip() for x in out.getvalue().splitlines()]
|
||||
self.assertEqual(expected, result)
|
||||
self.assertEqual(['Loaded db broker for a/c.'],
|
||||
self.assertEqual(['Loaded db broker for a/c'],
|
||||
err.getvalue().splitlines())
|
||||
|
||||
self.assertTrue(broker.set_sharded_state())
|
||||
@ -420,7 +459,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
' X-Container-Sysmeta-Sharding = True']
|
||||
self.assertEqual(expected,
|
||||
[x.rstrip() for x in out.getvalue().splitlines()])
|
||||
self.assertEqual(['Loaded db broker for a/c.'],
|
||||
self.assertEqual(['Loaded db broker for a/c'],
|
||||
err.getvalue().splitlines())
|
||||
|
||||
def test_show(self):
|
||||
@ -430,7 +469,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||
main([broker.db_file, 'show'])
|
||||
expected = [
|
||||
'Loaded db broker for a/c.',
|
||||
'Loaded db broker for a/c',
|
||||
'No shard data found.',
|
||||
]
|
||||
self.assertEqual(expected, err.getvalue().splitlines())
|
||||
@ -447,7 +486,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||
main([broker.db_file, 'show'])
|
||||
expected = [
|
||||
'Loaded db broker for a/c.',
|
||||
'Loaded db broker for a/c',
|
||||
'Existing shard ranges:',
|
||||
]
|
||||
self.assertEqual(expected, err.getvalue().splitlines())
|
||||
@ -458,7 +497,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||
main([broker.db_file, 'show', '--includes', 'foo'])
|
||||
expected = [
|
||||
'Loaded db broker for a/c.',
|
||||
'Loaded db broker for a/c',
|
||||
'Existing shard ranges:',
|
||||
]
|
||||
self.assertEqual(expected, err.getvalue().splitlines())
|
||||
@ -481,7 +520,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
'Run container-replicator to replicate them to other nodes.',
|
||||
'Use the enable sub-command to enable sharding.']
|
||||
self.assertEqual(expected, out.getvalue().splitlines())
|
||||
self.assertEqual(['Loaded db broker for a/c.'],
|
||||
self.assertEqual(['Loaded db broker for a/c'],
|
||||
err.getvalue().splitlines())
|
||||
self.assertEqual(
|
||||
[(data['lower'], data['upper']) for data in self.shard_data],
|
||||
@ -509,7 +548,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
expected = ["WARNING: invalid shard ranges: ['No shard ranges.'].",
|
||||
'Aborting.']
|
||||
self.assertEqual(expected, out.getvalue().splitlines())
|
||||
self.assertEqual(['Loaded db broker for a/c.'],
|
||||
self.assertEqual(['Loaded db broker for a/c'],
|
||||
err.getvalue().splitlines())
|
||||
|
||||
# success
|
||||
@ -531,7 +570,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
now.internal,
|
||||
'Run container-sharder on all nodes to shard the container.']
|
||||
self.assertEqual(expected, out.getvalue().splitlines())
|
||||
self.assertEqual(['Loaded db broker for a/c.'],
|
||||
self.assertEqual(['Loaded db broker for a/c'],
|
||||
err.getvalue().splitlines())
|
||||
self._assert_enabled(broker, now)
|
||||
|
||||
@ -546,7 +585,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
'No action required.',
|
||||
'Run container-sharder on all nodes to shard the container.']
|
||||
self.assertEqual(expected, out.getvalue().splitlines())
|
||||
self.assertEqual(['Loaded db broker for a/c.'],
|
||||
self.assertEqual(['Loaded db broker for a/c'],
|
||||
err.getvalue().splitlines())
|
||||
self._assert_enabled(broker, now)
|
||||
|
||||
@ -576,7 +615,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
now.internal,
|
||||
'Run container-sharder on all nodes to shard the container.']
|
||||
self.assertEqual(expected, out.getvalue().splitlines())
|
||||
self.assertEqual(['Loaded db broker for a/c.'],
|
||||
self.assertEqual(['Loaded db broker for a/c'],
|
||||
err.getvalue().splitlines())
|
||||
self._assert_enabled(broker, now)
|
||||
found_shard_ranges = broker.get_shard_ranges()
|
||||
@ -597,7 +636,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
self.assertEqual(found_shard_ranges, broker.get_shard_ranges())
|
||||
expected = ['This will delete existing 10 shard ranges.']
|
||||
self.assertEqual(expected, out.getvalue().splitlines())
|
||||
self.assertEqual(['Loaded db broker for a/c.'],
|
||||
self.assertEqual(['Loaded db broker for a/c'],
|
||||
err.getvalue().splitlines())
|
||||
|
||||
def test_compact_bad_args(self):
|
||||
@ -710,7 +749,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
[sr.state for sr in updated_ranges])
|
||||
|
||||
def test_compact_user_input(self):
|
||||
# verify user input 'y' or 'n' is respected
|
||||
# verify user input 'yes' or 'n' is respected
|
||||
small_ranges = (3, 4, 7)
|
||||
broker = self._make_broker()
|
||||
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||
@ -757,7 +796,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
'Once applied to the broker these changes will result in '
|
||||
'shard range compaction the next time the sharder runs.',
|
||||
]
|
||||
if user_input == 'y':
|
||||
if user_input == 'yes':
|
||||
expected.extend([
|
||||
'Updated 2 shard sequences for compaction.',
|
||||
'Run container-replicator to replicate the changes to '
|
||||
@ -779,7 +818,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
for i, sr in enumerate(broker_ranges):
|
||||
self.assertEqual(ShardRange.ACTIVE, sr.state)
|
||||
|
||||
broker_ranges = do_compact('y')
|
||||
broker_ranges = do_compact('yes')
|
||||
# expect updated shard ranges
|
||||
shard_ranges[5].lower = shard_ranges[3].lower
|
||||
shard_ranges[8].lower = shard_ranges[7].lower
|
||||
@ -1237,3 +1276,329 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
self.assertEqual(shard_ranges, updated_ranges)
|
||||
self.assertEqual([ShardRange.SHRINKING] * 8 + [ShardRange.ACTIVE] * 2,
|
||||
[sr.state for sr in updated_ranges])
|
||||
|
||||
def test_repair_not_root(self):
    # repair on a non-root container is expected to exit with 2, print a
    # warning and leave the shard ranges unchanged
    broker = self._make_broker()
    original_ranges = make_shard_ranges(
        broker, self.shard_data, '.shards_')
    broker.merge_shard_ranges(original_ranges)
    # make broker appear to not be a root container
    broker.set_sharding_sysmeta('Quoted-Root', 'not_a/c')
    self.assertFalse(broker.is_root_container())
    stdout, stderr = StringIO(), StringIO()
    with mock.patch('sys.stdout', stdout), \
            mock.patch('sys.stderr', stderr):
        exit_code = main([broker.db_file, 'repair'])
    self.assertEqual(2, exit_code)
    stderr_lines = stderr.getvalue().split('\n')
    self.assert_starts_with(stderr_lines[0], 'Loaded db broker for ')
    stdout_lines = stdout.getvalue().split('\n')
    expected_output = [
        'WARNING: Shard containers cannot be repaired.',
        'This command should be used on a root container.',
    ]
    self.assertEqual(expected_output, stdout_lines[:2])
    self.assert_shard_ranges_equal(
        original_ranges, broker.get_shard_ranges())
|
||||
|
||||
def test_repair_no_shard_ranges(self):
    # repair on a root container with no shard ranges is expected to
    # exit with 0 and report that there is nothing to do
    broker = self._make_broker()
    broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
    self.assertTrue(broker.is_root_container())
    stdout, stderr = StringIO(), StringIO()
    with mock.patch('sys.stdout', stdout), \
            mock.patch('sys.stderr', stderr):
        exit_code = main([broker.db_file, 'repair'])
    self.assertEqual(0, exit_code)
    stderr_lines = stderr.getvalue().split('\n')
    self.assert_starts_with(stderr_lines[0], 'Loaded db broker for ')
    stdout_lines = stdout.getvalue().split('\n')
    self.assertEqual(['No shards found, nothing to do.'],
                     stdout_lines[:1])
    # the broker still has no shard ranges
    self.assert_shard_ranges_equal([], broker.get_shard_ranges())
|
||||
|
||||
def test_repair_gaps_one_incomplete_sequence(self):
    # with the last shard range missing there is no complete sequence:
    # repair is expected to exit with 1 and leave the DB unchanged
    broker = self._make_broker()
    broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
    incomplete_data = self.shard_data[:-1]
    with mock_timestamp_now(next(self.ts_iter)):
        incomplete_ranges = make_shard_ranges(
            broker, incomplete_data, '.shards_')
    broker.merge_shard_ranges(incomplete_ranges)
    self.assertTrue(broker.is_root_container())
    stdout, stderr = StringIO(), StringIO()
    with mock.patch('sys.stdout', stdout), \
            mock.patch('sys.stderr', stderr):
        exit_code = main([broker.db_file, 'repair'])
    self.assertEqual(1, exit_code)
    stderr_lines = stderr.getvalue().split('\n')
    self.assert_starts_with(stderr_lines[0], 'Loaded db broker for ')
    stdout_lines = stdout.getvalue().split('\n')
    self.assertEqual(['Found no complete sequence of shard ranges.'],
                     stdout_lines[:1])
    self.assert_shard_ranges_equal(
        incomplete_ranges, broker.get_shard_ranges())
|
||||
|
||||
def test_repair_gaps_overlapping_incomplete_sequences(self):
    # two overlapping sequences, each with a gap: repair is expected to
    # exit with 1 and leave the DB unchanged
    broker = self._make_broker()
    broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
    with mock_timestamp_now(next(self.ts_iter)):
        first_ranges = make_shard_ranges(
            broker, self.shard_data[:-1], '.shards_')
    # drop the middle of the overlapping data so that it has a gap too
    gappy_overlap_data = (
        self.overlap_shard_data_1[:2] + self.overlap_shard_data_1[6:])
    with mock_timestamp_now(next(self.ts_iter)):
        # use new time to get distinct shard names
        second_ranges = make_shard_ranges(
            broker, gappy_overlap_data, '.shards_')
    broker.merge_shard_ranges(first_ranges + second_ranges)
    stdout, stderr = StringIO(), StringIO()
    with mock.patch('sys.stdout', stdout), \
            mock.patch('sys.stderr', stderr):
        exit_code = main([broker.db_file, 'repair'])
    self.assertEqual(1, exit_code)
    stderr_lines = stderr.getvalue().split('\n')
    self.assert_starts_with(stderr_lines[0], 'Loaded db broker for ')
    stdout_lines = stdout.getvalue().split('\n')
    self.assertEqual(['Found no complete sequence of shard ranges.'],
                     stdout_lines[:1])
    found_ranges = broker.get_shard_ranges()
    unchanged_ranges = sorted(first_ranges + second_ranges,
                              key=ShardRange.sort_key)
    self.assert_shard_ranges_equal(unchanged_ranges, found_ranges)
|
||||
|
||||
def test_repair_not_needed(self):
    # a single complete sequence with no overlaps: repair is expected to
    # exit with 0, report no repairs necessary and leave the DB unchanged
    broker = self._make_broker()
    broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
    complete_ranges = make_shard_ranges(
        broker, self.shard_data, '.shards_')
    broker.merge_shard_ranges(complete_ranges)
    self.assertTrue(broker.is_root_container())
    stdout, stderr = StringIO(), StringIO()
    with mock.patch('sys.stdout', stdout), \
            mock.patch('sys.stderr', stderr):
        exit_code = main([broker.db_file, 'repair'])
    self.assertEqual(0, exit_code)
    stderr_lines = stderr.getvalue().split('\n')
    self.assert_starts_with(stderr_lines[0], 'Loaded db broker for ')
    stdout_lines = stdout.getvalue().split('\n')
    expected_output = [
        'Found one complete sequence of 10 shard ranges and no '
        'overlapping shard ranges.',
        'No repairs necessary.',
    ]
    self.assertEqual(expected_output, stdout_lines[:2])
    self.assert_shard_ranges_equal(
        complete_ranges, broker.get_shard_ranges())
|
||||
|
||||
def _do_test_repair_exits_if_undesirable_state(self, undesirable_state):
    # shared body for tests where one shard range is in the given state:
    # repair is expected to exit with 1, print a warning naming the state
    # and leave the DB unchanged
    broker = self._make_broker()
    broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
    with mock_timestamp_now(next(self.ts_iter)):
        complete_ranges = make_shard_ranges(
            broker, self.shard_data, '.shards_')
    # make one shard be in an undesirable state
    complete_ranges[2].update_state(undesirable_state)
    with mock_timestamp_now(next(self.ts_iter)):
        overlapping_ranges = make_shard_ranges(
            broker, self.overlap_shard_data_2, '.shards_')
    broker.merge_shard_ranges(complete_ranges + overlapping_ranges)
    self.assertTrue(broker.is_root_container())

    stdout, stderr = StringIO(), StringIO()
    with mock.patch('sys.stdout', stdout), \
            mock.patch('sys.stderr', stderr):
        exit_code = main([broker.db_file, 'repair'])
    self.assertEqual(1, exit_code)
    stderr_lines = stderr.getvalue().split('\n')
    self.assert_starts_with(stderr_lines[0], 'Loaded db broker for ')
    stdout_lines = stdout.getvalue().split('\n')
    expected_warning = ('WARNING: Found shard ranges in %s state'
                        % ShardRange.STATES[undesirable_state])
    self.assertEqual([expected_warning], stdout_lines[:1])
    # nothing changed in DB
    self.assert_shard_ranges_equal(
        sorted(complete_ranges + overlapping_ranges,
               key=ShardRange.sort_key),
        broker.get_shard_ranges())
|
||||
|
||||
def test_repair_exits_if_sharding_state(self):
    # run the shared undesirable-state check with a SHARDING shard range
    state = ShardRange.SHARDING
    self._do_test_repair_exits_if_undesirable_state(state)
|
||||
|
||||
def test_repair_exits_if_shrinking_state(self):
    # run the shared undesirable-state check with a SHRINKING shard range
    state = ShardRange.SHRINKING
    self._do_test_repair_exits_if_undesirable_state(state)
|
||||
|
||||
def test_repair_one_complete_sequences_one_incomplete(self):
    # one complete sequence plus a second, incomplete, overlapping
    # sequence: the DB is expected to change only when the user answers
    # 'yes', after which the overlapping ranges are expected in SHRINKING
    # state
    broker = self._make_broker()
    broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
    with mock_timestamp_now(next(self.ts_iter)):
        complete_ranges = make_shard_ranges(
            broker, self.shard_data, '.shards_')
    with mock_timestamp_now(next(self.ts_iter)):
        partial_ranges = make_shard_ranges(
            broker, self.overlap_shard_data_2, '.shards_')
    broker.merge_shard_ranges(complete_ranges + partial_ranges)
    self.assertTrue(broker.is_root_container())

    def do_repair(user_input, ts_now):
        # run 'repair' with the interactive prompt answered by user_input
        # and Timestamp.now() mocked to ts_now
        stdout, stderr = StringIO(), StringIO()
        with mock.patch('sys.stdout', stdout), \
                mock.patch('sys.stderr', stderr), \
                mock_timestamp_now(ts_now), \
                mock.patch('swift.cli.manage_shard_ranges.input',
                           return_value=user_input):
            exit_code = main([broker.db_file, 'repair'])
        self.assertEqual(0, exit_code)
        stderr_lines = stderr.getvalue().split('\n')
        self.assert_starts_with(stderr_lines[0], 'Loaded db broker for ')
        stdout_lines = stdout.getvalue().split('\n')
        self.assertEqual(
            ['Repairs necessary to remove overlapping shard ranges.'],
            stdout_lines[:1])

    # user input 'n'
    ts_declined = next(self.ts_iter)
    do_repair('n', ts_declined)
    found_ranges = broker.get_shard_ranges()
    unchanged_ranges = sorted(complete_ranges + partial_ranges,
                              key=ShardRange.sort_key)
    self.assert_shard_ranges_equal(unchanged_ranges, found_ranges)

    # user input 'yes'
    ts_accepted = next(self.ts_iter)
    do_repair('yes', ts_accepted)
    found_ranges = broker.get_shard_ranges()
    # apply the expected changes to the local copies before comparing
    for overlapping in partial_ranges:
        overlapping.update_state(ShardRange.SHRINKING, ts_accepted)
        overlapping.epoch = ts_accepted
    repaired_ranges = sorted(complete_ranges + partial_ranges,
                             key=ShardRange.sort_key)
    self.assert_shard_ranges_equal(repaired_ranges, found_ranges)
|
||||
|
||||
def test_repair_two_complete_sequences_one_incomplete(self):
    # three overlapping sets of shard ranges, repaired non-interactively
    # with --yes: the first set is expected unchanged and the other two
    # sets are expected in SHRINKING state
    broker = self._make_broker()
    broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
    with mock_timestamp_now(next(self.ts_iter)):
        kept_ranges = make_shard_ranges(
            broker, self.shard_data, '.shards_')
    with mock_timestamp_now(next(self.ts_iter)):
        overlap_ranges_1 = make_shard_ranges(
            broker, self.overlap_shard_data_1, '.shards_')
    with mock_timestamp_now(next(self.ts_iter)):
        overlap_ranges_2 = make_shard_ranges(
            broker, self.overlap_shard_data_2, '.shards_')
    broker.merge_shard_ranges(
        kept_ranges + overlap_ranges_1 + overlap_ranges_2)
    self.assertTrue(broker.is_root_container())
    stdout, stderr = StringIO(), StringIO()
    ts_now = next(self.ts_iter)
    with mock.patch('sys.stdout', stdout), \
            mock.patch('sys.stderr', stderr), \
            mock_timestamp_now(ts_now):
        exit_code = main([broker.db_file, 'repair', '--yes'])
    self.assertEqual(0, exit_code)
    stderr_lines = stderr.getvalue().split('\n')
    self.assert_starts_with(stderr_lines[0], 'Loaded db broker for ')
    stdout_lines = stdout.getvalue().split('\n')
    self.assertEqual(
        ['Repairs necessary to remove overlapping shard ranges.'],
        stdout_lines[:1])
    found_ranges = broker.get_shard_ranges()
    # apply the expected changes to the local copies before comparing
    for overlapping in overlap_ranges_1 + overlap_ranges_2:
        overlapping.update_state(ShardRange.SHRINKING, ts_now)
        overlapping.epoch = ts_now
    repaired_ranges = sorted(
        kept_ranges + overlap_ranges_1 + overlap_ranges_2,
        key=ShardRange.sort_key)
    self.assert_shard_ranges_equal(repaired_ranges, found_ranges)
|
||||
|
||||
@with_tempdir
def test_show_and_analyze(self, tempdir):
    # dump overlapping shard ranges with 'show', write the JSON to a file
    # and check that 'analyze' run on that file reports repairs are
    # necessary, without changing the broker
    broker = self._make_broker()
    broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
    with mock_timestamp_now(next(self.ts_iter)):  # t1
        shard_ranges = make_shard_ranges(
            broker, self.shard_data, '.shards_')
    with mock_timestamp_now(next(self.ts_iter)):
        overlap_shard_ranges_1 = make_shard_ranges(
            broker, self.overlap_shard_data_1, '.shards_')
    with mock_timestamp_now(next(self.ts_iter)):
        overlap_shard_ranges_2 = make_shard_ranges(
            broker, self.overlap_shard_data_2, '.shards_')
    broker.merge_shard_ranges(shard_ranges + overlap_shard_ranges_1 +
                              overlap_shard_ranges_2)
    self.assertTrue(broker.is_root_container())

    def run_cli(argv):
        # run main() with stdout/stderr captured; return the exit code
        # and the captured text
        stdout, stderr = StringIO(), StringIO()
        with mock.patch('sys.stdout', stdout), \
                mock.patch('sys.stderr', stderr):
            exit_code = main(argv)
        return exit_code, stdout.getvalue(), stderr.getvalue()

    def sorted_local_ranges():
        # all locally held shard ranges, re-sorted on every call
        return sorted(
            shard_ranges + overlap_shard_ranges_1 + overlap_shard_ranges_2,
            key=ShardRange.sort_key)

    repairs_needed = [
        'Repairs necessary to remove overlapping shard ranges.']
    shard_file = os.path.join(tempdir, 'shards.json')

    # run show command
    ret, out_text, err_text = run_cli([broker.db_file, 'show'])
    self.assertEqual(0, ret)
    self.assert_starts_with(
        err_text.split('\n')[0], 'Loaded db broker for ')
    shard_json = json.loads(out_text)
    self.assert_shard_ranges_equal(
        sorted_local_ranges(),
        [ShardRange.from_dict(data) for data in shard_json])

    # dump data to a file and then run analyze subcommand
    with open(shard_file, 'w') as fd:
        json.dump(shard_json, fd)
    ret, out_text, err_text = run_cli([shard_file, 'analyze'])
    self.assertEqual(0, ret)
    self.assertEqual('', err_text)
    self.assertEqual(repairs_needed, out_text.split('\n')[:1])

    # no changes made to broker
    found_ranges = broker.get_shard_ranges()
    self.assert_shard_ranges_equal(sorted_local_ranges(), found_ranges)

    # tweak timestamps to make the preferred path include shards from two
    # sets, so that shards to remove have name-timestamps that are also in
    # shards to keep
    t4 = next(self.ts_iter)
    for tweaked in shard_ranges[:5] + overlap_shard_ranges_1[5:]:
        tweaked.timestamp = t4
    broker.merge_shard_ranges(shard_ranges + overlap_shard_ranges_1 +
                              overlap_shard_ranges_2)
    ret, out_text, err_text = run_cli([broker.db_file, 'show'])
    self.assertEqual(0, ret)
    shard_json = json.loads(out_text)
    self.assert_shard_ranges_equal(
        sorted_local_ranges(),
        [ShardRange.from_dict(data) for data in shard_json])
    with open(shard_file, 'w') as fd:
        json.dump(shard_json, fd)
    ret, out_text, err_text = run_cli([shard_file, 'analyze'])
    self.assertEqual(0, ret)
    self.assertEqual('', err_text)
    self.assertEqual(repairs_needed, out_text.split('\n')[:1])
|
||||
|
@ -8077,8 +8077,9 @@ class TestShardRange(unittest.TestCase):
|
||||
self.assertEqual(utils.Timestamp(0), sr.state_timestamp)
|
||||
|
||||
def test_state_setter(self):
|
||||
for state in utils.ShardRange.STATES:
|
||||
for test_value in (state, str(state)):
|
||||
for state, state_name in utils.ShardRange.STATES.items():
|
||||
for test_value in (
|
||||
state, str(state), state_name, state_name.upper()):
|
||||
sr = utils.ShardRange('a/test', next(self.ts_iter), 'l', 'u')
|
||||
sr.state = test_value
|
||||
actual = sr.state
|
||||
@ -8127,6 +8128,8 @@ class TestShardRange(unittest.TestCase):
|
||||
(number, name), utils.ShardRange.resolve_state(name.title()))
|
||||
self.assertEqual(
|
||||
(number, name), utils.ShardRange.resolve_state(number))
|
||||
self.assertEqual(
|
||||
(number, name), utils.ShardRange.resolve_state(str(number)))
|
||||
|
||||
def check_bad_value(value):
|
||||
with self.assertRaises(ValueError) as cm:
|
||||
@ -8647,12 +8650,16 @@ class TestShardRange(unittest.TestCase):
|
||||
|
||||
class TestShardRangeList(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.ts_iter = make_timestamp_iter()
|
||||
self.t1 = next(self.ts_iter)
|
||||
self.t2 = next(self.ts_iter)
|
||||
self.ts_iter = make_timestamp_iter()
|
||||
self.shard_ranges = [
|
||||
utils.ShardRange('a/b', utils.Timestamp.now(), 'a', 'b',
|
||||
utils.ShardRange('a/b', self.t1, 'a', 'b',
|
||||
object_count=2, bytes_used=22),
|
||||
utils.ShardRange('b/c', utils.Timestamp.now(), 'b', 'c',
|
||||
utils.ShardRange('b/c', self.t2, 'b', 'c',
|
||||
object_count=4, bytes_used=44),
|
||||
utils.ShardRange('x/y', utils.Timestamp.now(), 'x', 'y',
|
||||
utils.ShardRange('c/y', self.t1, 'c', 'y',
|
||||
object_count=6, bytes_used=66),
|
||||
]
|
||||
|
||||
@ -8732,6 +8739,86 @@ class TestShardRangeList(unittest.TestCase):
|
||||
sr = utils.ShardRange('a/entire', utils.Timestamp.now(), '', '')
|
||||
self.assertTrue(srl_entire.includes(sr))
|
||||
|
||||
def test_timestamps(self):
|
||||
srl = ShardRangeList(self.shard_ranges)
|
||||
self.assertEqual({self.t1, self.t2}, srl.timestamps)
|
||||
t3 = next(self.ts_iter)
|
||||
self.shard_ranges[2].timestamp = t3
|
||||
self.assertEqual({self.t1, self.t2, t3}, srl.timestamps)
|
||||
srl.pop(0)
|
||||
self.assertEqual({self.t2, t3}, srl.timestamps)
|
||||
|
||||
def test_states(self):
|
||||
srl = ShardRangeList()
|
||||
self.assertEqual(set(), srl.states)
|
||||
|
||||
srl = ShardRangeList(self.shard_ranges)
|
||||
self.shard_ranges[0].update_state(
|
||||
utils.ShardRange.CREATED, next(self.ts_iter))
|
||||
self.shard_ranges[1].update_state(
|
||||
utils.ShardRange.CLEAVED, next(self.ts_iter))
|
||||
self.shard_ranges[2].update_state(
|
||||
utils.ShardRange.ACTIVE, next(self.ts_iter))
|
||||
|
||||
self.assertEqual({utils.ShardRange.CREATED,
|
||||
utils.ShardRange.CLEAVED,
|
||||
utils.ShardRange.ACTIVE},
|
||||
srl.states)
|
||||
|
||||
def test_filter(self):
|
||||
srl = ShardRangeList(self.shard_ranges)
|
||||
self.assertEqual(self.shard_ranges, srl.filter())
|
||||
self.assertEqual(self.shard_ranges,
|
||||
srl.filter(marker='', end_marker=''))
|
||||
self.assertEqual(self.shard_ranges,
|
||||
srl.filter(marker=utils.ShardRange.MIN,
|
||||
end_marker=utils.ShardRange.MAX))
|
||||
self.assertEqual([], srl.filter(marker=utils.ShardRange.MAX,
|
||||
end_marker=utils.ShardRange.MIN))
|
||||
self.assertEqual([], srl.filter(marker=utils.ShardRange.MIN,
|
||||
end_marker=utils.ShardRange.MIN))
|
||||
self.assertEqual([], srl.filter(marker=utils.ShardRange.MAX,
|
||||
end_marker=utils.ShardRange.MAX))
|
||||
self.assertEqual(self.shard_ranges[:1],
|
||||
srl.filter(marker='', end_marker='b'))
|
||||
self.assertEqual(self.shard_ranges[1:3],
|
||||
srl.filter(marker='b', end_marker='y'))
|
||||
self.assertEqual([],
|
||||
srl.filter(marker='y', end_marker='y'))
|
||||
self.assertEqual([],
|
||||
srl.filter(marker='y', end_marker='x'))
|
||||
# includes trumps marker & end_marker
|
||||
self.assertEqual(self.shard_ranges[0:1],
|
||||
srl.filter(includes='b', marker='c', end_marker='y'))
|
||||
self.assertEqual(self.shard_ranges[0:1],
|
||||
srl.filter(includes='b', marker='', end_marker=''))
|
||||
self.assertEqual([], srl.filter(includes='z'))
|
||||
|
||||
def test_find_lower(self):
|
||||
srl = ShardRangeList(self.shard_ranges)
|
||||
self.shard_ranges[0].update_state(
|
||||
utils.ShardRange.CREATED, next(self.ts_iter))
|
||||
self.shard_ranges[1].update_state(
|
||||
utils.ShardRange.CLEAVED, next(self.ts_iter))
|
||||
self.shard_ranges[2].update_state(
|
||||
utils.ShardRange.ACTIVE, next(self.ts_iter))
|
||||
|
||||
def do_test(states):
|
||||
return srl.find_lower(lambda sr: sr.state in states)
|
||||
|
||||
self.assertEqual(srl.upper,
|
||||
do_test([utils.ShardRange.FOUND]))
|
||||
self.assertEqual(self.shard_ranges[0].lower,
|
||||
do_test([utils.ShardRange.CREATED]))
|
||||
self.assertEqual(self.shard_ranges[0].lower,
|
||||
do_test((utils.ShardRange.CREATED,
|
||||
utils.ShardRange.CLEAVED)))
|
||||
self.assertEqual(self.shard_ranges[1].lower,
|
||||
do_test((utils.ShardRange.ACTIVE,
|
||||
utils.ShardRange.CLEAVED)))
|
||||
self.assertEqual(self.shard_ranges[2].lower,
|
||||
do_test([utils.ShardRange.ACTIVE]))
|
||||
|
||||
|
||||
@patch('ctypes.get_errno')
|
||||
@patch.object(utils, '_sys_posix_fallocate')
|
||||
|
@ -39,7 +39,7 @@ from swift.container.backend import ContainerBroker, \
|
||||
from swift.common.db import DatabaseAlreadyExists, GreenDBConnection
|
||||
from swift.common.request_helpers import get_reserved_name
|
||||
from swift.common.utils import Timestamp, encode_timestamps, hash_path, \
|
||||
ShardRange, make_db_file_path, md5
|
||||
ShardRange, make_db_file_path, md5, ShardRangeList
|
||||
from swift.common.storage_policy import POLICIES
|
||||
|
||||
import mock
|
||||
@ -4811,7 +4811,7 @@ class TestContainerBroker(unittest.TestCase):
|
||||
|
||||
@with_tempdir
|
||||
def test_merge_shard_ranges(self, tempdir):
|
||||
ts = [next(self.ts) for _ in range(13)]
|
||||
ts = [next(self.ts) for _ in range(14)]
|
||||
db_path = os.path.join(
|
||||
tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
|
||||
broker = ContainerBroker(
|
||||
@ -4907,6 +4907,15 @@ class TestContainerBroker(unittest.TestCase):
|
||||
self._assert_shard_ranges(
|
||||
broker, [sr_b_2_2_deleted, sr_c_10_10_deleted])
|
||||
|
||||
# merge a ShardRangeList
|
||||
sr_b_13 = ShardRange('a/c_b', ts[13], lower='a', upper='b',
|
||||
object_count=10, meta_timestamp=ts[13])
|
||||
sr_c_13 = ShardRange('a/c_c', ts[13], lower='b', upper='c',
|
||||
object_count=10, meta_timestamp=ts[13])
|
||||
broker.merge_shard_ranges(ShardRangeList([sr_c_13, sr_b_13]))
|
||||
self._assert_shard_ranges(
|
||||
broker, [sr_b_13, sr_c_13])
|
||||
|
||||
@with_tempdir
|
||||
def test_merge_shard_ranges_state(self, tempdir):
|
||||
db_path = os.path.join(
|
||||
|
@ -42,7 +42,7 @@ from swift.container.sharder import ContainerSharder, sharding_enabled, \
|
||||
DEFAULT_SHARD_CONTAINER_THRESHOLD, finalize_shrinking, \
|
||||
find_shrinking_candidates, process_compactible_shard_sequences, \
|
||||
find_compactible_shard_sequences, is_shrinking_candidate, \
|
||||
is_sharding_candidate
|
||||
is_sharding_candidate, find_paths, rank_paths
|
||||
from swift.common.utils import ShardRange, Timestamp, hash_path, \
|
||||
encode_timestamps, parse_db_filename, quorum_size, Everything, md5
|
||||
from test import annotate_failure
|
||||
@ -6783,3 +6783,193 @@ class TestSharderFunctions(BaseTestSharder):
|
||||
for object_count in (10, 11):
|
||||
with annotate_failure('%s %s' % (state, object_count)):
|
||||
do_check_false(state, object_count)
|
||||
|
||||
def test_find_and_rank_whole_path_split(self):
|
||||
ts_0 = next(self.ts_iter)
|
||||
ts_1 = next(self.ts_iter)
|
||||
bounds_0 = (
|
||||
('', 'f'),
|
||||
('f', 'k'),
|
||||
('k', 's'),
|
||||
('s', 'x'),
|
||||
('x', ''),
|
||||
)
|
||||
bounds_1 = (
|
||||
('', 'g'),
|
||||
('g', 'l'),
|
||||
('l', 't'),
|
||||
('t', 'y'),
|
||||
('y', ''),
|
||||
)
|
||||
# path with newer timestamp wins
|
||||
ranges_0 = self._make_shard_ranges(bounds_0, ShardRange.ACTIVE,
|
||||
timestamp=ts_0)
|
||||
ranges_1 = self._make_shard_ranges(bounds_1, ShardRange.ACTIVE,
|
||||
timestamp=ts_1)
|
||||
|
||||
paths = find_paths(ranges_0 + ranges_1)
|
||||
self.assertEqual(2, len(paths))
|
||||
self.assertIn(ranges_0, paths)
|
||||
self.assertIn(ranges_1, paths)
|
||||
own_sr = ShardRange('a/c', Timestamp.now())
|
||||
self.assertEqual(
|
||||
[
|
||||
ranges_1, # complete and newer timestamp
|
||||
ranges_0, # complete
|
||||
],
|
||||
rank_paths(paths, own_sr))
|
||||
|
||||
# but object_count trumps matching timestamp
|
||||
ranges_0 = self._make_shard_ranges(bounds_0, ShardRange.ACTIVE,
|
||||
timestamp=ts_1, object_count=1)
|
||||
paths = find_paths(ranges_0 + ranges_1)
|
||||
self.assertEqual(2, len(paths))
|
||||
self.assertIn(ranges_0, paths)
|
||||
self.assertIn(ranges_1, paths)
|
||||
self.assertEqual(
|
||||
[
|
||||
ranges_0, # complete with more objects
|
||||
ranges_1, # complete
|
||||
],
|
||||
rank_paths(paths, own_sr))
|
||||
|
||||
def test_find_and_rank_two_sub_path_splits(self):
|
||||
ts_0 = next(self.ts_iter)
|
||||
ts_1 = next(self.ts_iter)
|
||||
ts_2 = next(self.ts_iter)
|
||||
bounds_0 = (
|
||||
('', 'a'),
|
||||
('a', 'm'),
|
||||
('m', 'p'),
|
||||
('p', 't'),
|
||||
('t', 'x'),
|
||||
('x', 'y'),
|
||||
('y', ''),
|
||||
)
|
||||
bounds_1 = (
|
||||
('a', 'g'), # split at 'a'
|
||||
('g', 'l'),
|
||||
('l', 'm'), # rejoin at 'm'
|
||||
)
|
||||
bounds_2 = (
|
||||
('t', 'y'), # split at 't', rejoin at 'y'
|
||||
)
|
||||
ranges_0 = self._make_shard_ranges(bounds_0, ShardRange.ACTIVE,
|
||||
timestamp=ts_0)
|
||||
ranges_1 = self._make_shard_ranges(bounds_1, ShardRange.ACTIVE,
|
||||
timestamp=ts_1, object_count=1)
|
||||
ranges_2 = self._make_shard_ranges(bounds_2, ShardRange.ACTIVE,
|
||||
timestamp=ts_2, object_count=1)
|
||||
# all paths are complete
|
||||
mix_path_0 = ranges_0[:1] + ranges_1 + ranges_0[2:] # 3 objects
|
||||
mix_path_1 = ranges_0[:4] + ranges_2 + ranges_0[6:] # 1 object
|
||||
mix_path_2 = (ranges_0[:1] + ranges_1 + ranges_0[2:4] + ranges_2 +
|
||||
ranges_0[6:]) # 4 objects
|
||||
paths = find_paths(ranges_0 + ranges_1 + ranges_2)
|
||||
self.assertEqual(4, len(paths))
|
||||
self.assertIn(ranges_0, paths)
|
||||
self.assertIn(mix_path_0, paths)
|
||||
self.assertIn(mix_path_1, paths)
|
||||
self.assertIn(mix_path_2, paths)
|
||||
own_sr = ShardRange('a/c', Timestamp.now())
|
||||
self.assertEqual(
|
||||
[
|
||||
mix_path_2, # has 4 objects, 3 different timestamps
|
||||
mix_path_0, # has 3 objects, 2 different timestamps
|
||||
mix_path_1, # has 1 object, 2 different timestamps
|
||||
ranges_0, # has 0 objects, 1 timestamp
|
||||
],
|
||||
rank_paths(paths, own_sr)
|
||||
)
|
||||
|
||||
def test_find_and_rank_most_cleave_progress(self):
|
||||
ts_0 = next(self.ts_iter)
|
||||
ts_1 = next(self.ts_iter)
|
||||
ts_2 = next(self.ts_iter)
|
||||
bounds_0 = (
|
||||
('', 'f'),
|
||||
('f', 'k'),
|
||||
('k', 'p'),
|
||||
('p', '')
|
||||
)
|
||||
bounds_1 = (
|
||||
('', 'g'),
|
||||
('g', 'l'),
|
||||
('l', 'q'),
|
||||
('q', '')
|
||||
)
|
||||
bounds_2 = (
|
||||
('', 'r'),
|
||||
('r', '')
|
||||
)
|
||||
ranges_0 = self._make_shard_ranges(
|
||||
bounds_0, [ShardRange.CLEAVED] * 3 + [ShardRange.CREATED],
|
||||
timestamp=ts_1, object_count=1)
|
||||
ranges_1 = self._make_shard_ranges(
|
||||
bounds_1, [ShardRange.CLEAVED] * 4,
|
||||
timestamp=ts_0)
|
||||
ranges_2 = self._make_shard_ranges(
|
||||
bounds_2, [ShardRange.CLEAVED, ShardRange.CREATED],
|
||||
timestamp=ts_2, object_count=1)
|
||||
paths = find_paths(ranges_0 + ranges_1 + ranges_2)
|
||||
self.assertEqual(3, len(paths))
|
||||
own_sr = ShardRange('a/c', Timestamp.now())
|
||||
self.assertEqual(
|
||||
[
|
||||
ranges_1, # cleaved to end
|
||||
ranges_2, # cleaved to r
|
||||
ranges_0, # cleaved to p
|
||||
],
|
||||
rank_paths(paths, own_sr)
|
||||
)
|
||||
ranges_2 = self._make_shard_ranges(
|
||||
bounds_2, [ShardRange.ACTIVE] * 2,
|
||||
timestamp=ts_2, object_count=1)
|
||||
paths = find_paths(ranges_0 + ranges_1 + ranges_2)
|
||||
self.assertEqual(
|
||||
[
|
||||
ranges_2, # active to end, newer timestamp
|
||||
ranges_1, # cleaved to r
|
||||
ranges_0, # cleaved to p
|
||||
],
|
||||
rank_paths(paths, own_sr)
|
||||
)
|
||||
|
||||
def test_find_and_rank_no_complete_path(self):
|
||||
ts_0 = next(self.ts_iter)
|
||||
ts_1 = next(self.ts_iter)
|
||||
ts_2 = next(self.ts_iter)
|
||||
bounds_0 = (
|
||||
('', 'f'),
|
||||
('f', 'k'),
|
||||
('k', 'm'),
|
||||
)
|
||||
bounds_1 = (
|
||||
('', 'g'),
|
||||
('g', 'l'),
|
||||
('l', 'n'),
|
||||
)
|
||||
bounds_2 = (
|
||||
('', 'l'),
|
||||
)
|
||||
ranges_0 = self._make_shard_ranges(bounds_0, ShardRange.ACTIVE,
|
||||
timestamp=ts_0)
|
||||
ranges_1 = self._make_shard_ranges(bounds_1, ShardRange.ACTIVE,
|
||||
timestamp=ts_1, object_count=1)
|
||||
ranges_2 = self._make_shard_ranges(bounds_2, ShardRange.ACTIVE,
|
||||
timestamp=ts_2, object_count=1)
|
||||
mix_path_0 = ranges_2 + ranges_1[2:]
|
||||
paths = find_paths(ranges_0 + ranges_1 + ranges_2)
|
||||
self.assertEqual(3, len(paths))
|
||||
self.assertIn(ranges_0, paths)
|
||||
self.assertIn(ranges_1, paths)
|
||||
self.assertIn(mix_path_0, paths)
|
||||
own_sr = ShardRange('a/c', Timestamp.now())
|
||||
self.assertEqual(
|
||||
[
|
||||
ranges_1, # cleaved to n, one timestamp
|
||||
mix_path_0, # cleaved to n, has two different timestamps
|
||||
ranges_0, # cleaved to m
|
||||
],
|
||||
rank_paths(paths, own_sr)
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user