swift-manage-shard-ranges: add 'compact' command
This patch adds a 'compact' command to swift-manage-shard-ranges that enables sequences of contiguous shards with low object counts to be compacted into another existing shard, or into the root container. Change-Id: Ia8f3297d610b5a5cf5598d076fdaf30211832366
This commit is contained in:
parent
b0c8de699e
commit
12bb4839f0
@ -167,7 +167,14 @@ from six.moves import input
|
|||||||
from swift.common.utils import Timestamp, get_logger, ShardRange
|
from swift.common.utils import Timestamp, get_logger, ShardRange
|
||||||
from swift.container.backend import ContainerBroker, UNSHARDED
|
from swift.container.backend import ContainerBroker, UNSHARDED
|
||||||
from swift.container.sharder import make_shard_ranges, sharding_enabled, \
|
from swift.container.sharder import make_shard_ranges, sharding_enabled, \
|
||||||
CleavingContext
|
CleavingContext, process_compactable_shard_sequences, \
|
||||||
|
find_compactable_shard_sequences, find_overlapping_ranges, \
|
||||||
|
finalize_shrinking
|
||||||
|
|
||||||
|
DEFAULT_ROWS_PER_SHARD = 500000
|
||||||
|
DEFAULT_SHRINK_THRESHOLD = 10000
|
||||||
|
DEFAULT_MAX_SHRINKING = 1
|
||||||
|
DEFAULT_MAX_EXPANDING = -1
|
||||||
|
|
||||||
|
|
||||||
def _load_and_validate_shard_data(args):
|
def _load_and_validate_shard_data(args):
|
||||||
@ -289,6 +296,7 @@ def db_info(broker, args):
|
|||||||
print('Metadata:')
|
print('Metadata:')
|
||||||
for k, (v, t) in broker.metadata.items():
|
for k, (v, t) in broker.metadata.items():
|
||||||
print(' %s = %s' % (k, v))
|
print(' %s = %s' % (k, v))
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
def delete_shard_ranges(broker, args):
|
def delete_shard_ranges(broker, args):
|
||||||
@ -410,8 +418,76 @@ def enable_sharding(broker, args):
|
|||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
def compact_shard_ranges(broker, args):
|
||||||
|
if not broker.is_root_container():
|
||||||
|
print('WARNING: Shard containers cannot be compacted.')
|
||||||
|
print('This command should be used on a root container.')
|
||||||
|
return 2
|
||||||
|
|
||||||
|
if not broker.is_sharded():
|
||||||
|
print('WARNING: Container is not yet sharded so cannot be compacted.')
|
||||||
|
return 2
|
||||||
|
|
||||||
|
shard_ranges = broker.get_shard_ranges()
|
||||||
|
if find_overlapping_ranges([sr for sr in shard_ranges if
|
||||||
|
sr.state != ShardRange.SHRINKING]):
|
||||||
|
print('WARNING: Container has overlapping shard ranges so cannot be '
|
||||||
|
'compacted.')
|
||||||
|
return 2
|
||||||
|
|
||||||
|
compactable = find_compactable_shard_sequences(broker,
|
||||||
|
args.shrink_threshold,
|
||||||
|
args.expansion_limit,
|
||||||
|
args.max_shrinking,
|
||||||
|
args.max_expanding)
|
||||||
|
if not compactable:
|
||||||
|
print('No shards identified for compaction.')
|
||||||
|
return 0
|
||||||
|
|
||||||
|
for sequence in compactable:
|
||||||
|
if sequence[-1].state not in (ShardRange.ACTIVE, ShardRange.SHARDED):
|
||||||
|
print('ERROR: acceptor not in correct state: %s' % sequence[-1],
|
||||||
|
file=sys.stderr)
|
||||||
|
return 1
|
||||||
|
|
||||||
|
if not args.yes:
|
||||||
|
for sequence in compactable:
|
||||||
|
acceptor = sequence[-1]
|
||||||
|
donors = sequence[:-1]
|
||||||
|
print('Shard %s (object count %d) can expand to accept %d objects '
|
||||||
|
'from:' %
|
||||||
|
(acceptor, acceptor.object_count, donors.object_count))
|
||||||
|
for donor in donors:
|
||||||
|
print(' shard %s (object count %d)' %
|
||||||
|
(donor, donor.object_count))
|
||||||
|
print('Once applied to the broker these changes will result in shard '
|
||||||
|
'range compaction the next time the sharder runs.')
|
||||||
|
choice = input('Do you want to apply these changes? [y/N]')
|
||||||
|
if choice != 'y':
|
||||||
|
print('No changes applied')
|
||||||
|
return 0
|
||||||
|
|
||||||
|
timestamp = Timestamp.now()
|
||||||
|
acceptor_ranges, shrinking_ranges = process_compactable_shard_sequences(
|
||||||
|
compactable, timestamp)
|
||||||
|
finalize_shrinking(broker, acceptor_ranges, shrinking_ranges, timestamp)
|
||||||
|
print('Updated %s shard sequences for compaction.' % len(compactable))
|
||||||
|
print('Run container-replicator to replicate the changes to other '
|
||||||
|
'nodes.')
|
||||||
|
print('Run container-sharder on all nodes to compact shards.')
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
def _positive_int(arg):
|
||||||
|
val = int(arg)
|
||||||
|
if val <= 0:
|
||||||
|
raise argparse.ArgumentTypeError('must be > 0')
|
||||||
|
return val
|
||||||
|
|
||||||
|
|
||||||
def _add_find_args(parser):
|
def _add_find_args(parser):
|
||||||
parser.add_argument('rows_per_shard', nargs='?', type=int, default=500000)
|
parser.add_argument('rows_per_shard', nargs='?', type=int,
|
||||||
|
default=DEFAULT_ROWS_PER_SHARD)
|
||||||
|
|
||||||
|
|
||||||
def _add_replace_args(parser):
|
def _add_replace_args(parser):
|
||||||
@ -500,6 +576,50 @@ def _make_parser():
|
|||||||
_add_enable_args(enable_parser)
|
_add_enable_args(enable_parser)
|
||||||
enable_parser.set_defaults(func=enable_sharding)
|
enable_parser.set_defaults(func=enable_sharding)
|
||||||
_add_replace_args(enable_parser)
|
_add_replace_args(enable_parser)
|
||||||
|
|
||||||
|
# compact
|
||||||
|
compact_parser = subparsers.add_parser(
|
||||||
|
'compact',
|
||||||
|
help='Compact shard ranges with less than the shrink-threshold number '
|
||||||
|
'of rows. This command only works on root containers.')
|
||||||
|
compact_parser.add_argument(
|
||||||
|
'--yes', '-y', action='store_true', default=False,
|
||||||
|
help='Apply shard range changes to broker without prompting.')
|
||||||
|
compact_parser.add_argument('--shrink-threshold', nargs='?',
|
||||||
|
type=_positive_int,
|
||||||
|
default=DEFAULT_SHRINK_THRESHOLD,
|
||||||
|
help='The number of rows below which a shard '
|
||||||
|
'can qualify for shrinking. Defaults to '
|
||||||
|
'%d' % DEFAULT_SHRINK_THRESHOLD)
|
||||||
|
compact_parser.add_argument('--expansion-limit', nargs='?',
|
||||||
|
type=_positive_int,
|
||||||
|
default=DEFAULT_ROWS_PER_SHARD,
|
||||||
|
help='Maximum number of rows for an expanding '
|
||||||
|
'shard to have after compaction has '
|
||||||
|
'completed. Defaults to %d' %
|
||||||
|
DEFAULT_ROWS_PER_SHARD)
|
||||||
|
# If just one donor shard is chosen to shrink to an acceptor then the
|
||||||
|
# expanded acceptor will handle object listings as soon as the donor shard
|
||||||
|
# has shrunk. If more than one donor shard are chosen to shrink to an
|
||||||
|
# acceptor then the acceptor may not handle object listings for some donor
|
||||||
|
# shards that have shrunk until *all* donors have shrunk, resulting in
|
||||||
|
# temporary gap(s) in object listings where the shrunk donors are missing.
|
||||||
|
compact_parser.add_argument('--max-shrinking', nargs='?',
|
||||||
|
type=_positive_int,
|
||||||
|
default=DEFAULT_MAX_SHRINKING,
|
||||||
|
help='Maximum number of shards that should be '
|
||||||
|
'shrunk into each expanding shard. '
|
||||||
|
'Defaults to 1. Using values greater '
|
||||||
|
'than 1 may result in temporary gaps in '
|
||||||
|
'object listings until all selected '
|
||||||
|
'shards have shrunk.')
|
||||||
|
compact_parser.add_argument('--max-expanding', nargs='?',
|
||||||
|
type=_positive_int,
|
||||||
|
default=DEFAULT_MAX_EXPANDING,
|
||||||
|
help='Maximum number of shards that should be '
|
||||||
|
'expanded. Defaults to unlimited.')
|
||||||
|
compact_parser.set_defaults(func=compact_shard_ranges)
|
||||||
|
|
||||||
return parser
|
return parser
|
||||||
|
|
||||||
|
|
||||||
|
@ -89,7 +89,7 @@ def print_own_shard_range(node, sr, indent_level):
|
|||||||
indent = indent_level * TAB
|
indent = indent_level * TAB
|
||||||
range = '%r - %r' % (sr.lower, sr.upper)
|
range = '%r - %r' % (sr.lower, sr.upper)
|
||||||
print('%s(%s) %23s, objs: %3s, bytes: %3s, timestamp: %s (%s), '
|
print('%s(%s) %23s, objs: %3s, bytes: %3s, timestamp: %s (%s), '
|
||||||
'modified: %s (%s), %7s: %s (%s), deleted: %s epoch: %s' %
|
'modified: %s (%s), %7s: %s (%s), deleted: %s, epoch: %s' %
|
||||||
(indent, node[1][0], range, sr.object_count, sr.bytes_used,
|
(indent, node[1][0], range, sr.object_count, sr.bytes_used,
|
||||||
sr.timestamp.isoformat, sr.timestamp.internal,
|
sr.timestamp.isoformat, sr.timestamp.internal,
|
||||||
sr.meta_timestamp.isoformat, sr.meta_timestamp.internal,
|
sr.meta_timestamp.isoformat, sr.meta_timestamp.internal,
|
||||||
@ -108,12 +108,13 @@ def print_shard_range(node, sr, indent_level):
|
|||||||
indent = indent_level * TAB
|
indent = indent_level * TAB
|
||||||
range = '%r - %r' % (sr.lower, sr.upper)
|
range = '%r - %r' % (sr.lower, sr.upper)
|
||||||
print('%s(%s) %23s, objs: %3s, bytes: %3s, timestamp: %s (%s), '
|
print('%s(%s) %23s, objs: %3s, bytes: %3s, timestamp: %s (%s), '
|
||||||
'modified: %s (%s), %7s: %s (%s), deleted: %s %s' %
|
'modified: %s (%s), %7s: %s (%s), deleted: %s, epoch: %s %s' %
|
||||||
(indent, node[1][0], range, sr.object_count, sr.bytes_used,
|
(indent, node[1][0], range, sr.object_count, sr.bytes_used,
|
||||||
sr.timestamp.isoformat, sr.timestamp.internal,
|
sr.timestamp.isoformat, sr.timestamp.internal,
|
||||||
sr.meta_timestamp.isoformat, sr.meta_timestamp.internal,
|
sr.meta_timestamp.isoformat, sr.meta_timestamp.internal,
|
||||||
sr.state_text, sr.state_timestamp.isoformat,
|
sr.state_text, sr.state_timestamp.isoformat,
|
||||||
sr.state_timestamp.internal, sr.deleted, sr.name))
|
sr.state_timestamp.internal, sr.deleted,
|
||||||
|
sr.epoch.internal if sr.epoch else None, sr.name))
|
||||||
|
|
||||||
|
|
||||||
def print_shard_range_info(node, shard_ranges, indent_level=0):
|
def print_shard_range_info(node, shard_ranges, indent_level=0):
|
||||||
|
@ -82,6 +82,7 @@ from six.moves.configparser import (ConfigParser, NoSectionError,
|
|||||||
from six.moves import range, http_client
|
from six.moves import range, http_client
|
||||||
from six.moves.urllib.parse import quote as _quote, unquote
|
from six.moves.urllib.parse import quote as _quote, unquote
|
||||||
from six.moves.urllib.parse import urlparse
|
from six.moves.urllib.parse import urlparse
|
||||||
|
from six.moves import UserList
|
||||||
|
|
||||||
from swift import gettext_ as _
|
from swift import gettext_ as _
|
||||||
import swift.common.exceptions
|
import swift.common.exceptions
|
||||||
@ -5483,6 +5484,105 @@ class ShardRange(object):
|
|||||||
params['state_timestamp'], params['epoch'],
|
params['state_timestamp'], params['epoch'],
|
||||||
params.get('reported', 0))
|
params.get('reported', 0))
|
||||||
|
|
||||||
|
def expand(self, donors):
|
||||||
|
"""
|
||||||
|
Expands the bounds as necessary to match the minimum and maximum bounds
|
||||||
|
of the given donors.
|
||||||
|
|
||||||
|
:param donors: A list of :class:`~swift.common.utils.ShardRange`
|
||||||
|
:return: True if the bounds have been modified, False otherwise.
|
||||||
|
"""
|
||||||
|
modified = False
|
||||||
|
new_lower = self.lower
|
||||||
|
new_upper = self.upper
|
||||||
|
for donor in donors:
|
||||||
|
new_lower = min(new_lower, donor.lower)
|
||||||
|
new_upper = max(new_upper, donor.upper)
|
||||||
|
if self.lower > new_lower or self.upper < new_upper:
|
||||||
|
self.lower = new_lower
|
||||||
|
self.upper = new_upper
|
||||||
|
modified = True
|
||||||
|
return modified
|
||||||
|
|
||||||
|
|
||||||
|
class ShardRangeList(UserList):
|
||||||
|
"""
|
||||||
|
This class provides some convenience functions for working with lists of
|
||||||
|
:class:`~swift.common.utils.ShardRange`.
|
||||||
|
|
||||||
|
This class does not enforce ordering or continuity of the list items:
|
||||||
|
callers should ensure that items are added in order as appropriate.
|
||||||
|
"""
|
||||||
|
def __getitem__(self, index):
|
||||||
|
# workaround for py3 - not needed for py2.7,py3.8
|
||||||
|
result = self.data[index]
|
||||||
|
return ShardRangeList(result) if type(result) == list else result
|
||||||
|
|
||||||
|
@property
|
||||||
|
def lower(self):
|
||||||
|
"""
|
||||||
|
Returns the lower bound of the first item in the list. Note: this will
|
||||||
|
only be equal to the lowest bound of all items in the list if the list
|
||||||
|
contents has been sorted.
|
||||||
|
|
||||||
|
:return: lower bound of first item in the list, or ShardRange.MIN
|
||||||
|
if the list is empty.
|
||||||
|
"""
|
||||||
|
if not self:
|
||||||
|
# empty list has range MIN->MIN
|
||||||
|
return ShardRange.MIN
|
||||||
|
return self[0].lower
|
||||||
|
|
||||||
|
@property
|
||||||
|
def upper(self):
|
||||||
|
"""
|
||||||
|
Returns the upper bound of the first item in the list. Note: this will
|
||||||
|
only be equal to the uppermost bound of all items in the list if the
|
||||||
|
list has previously been sorted.
|
||||||
|
|
||||||
|
:return: upper bound of first item in the list, or ShardRange.MIN
|
||||||
|
if the list is empty.
|
||||||
|
"""
|
||||||
|
if not self:
|
||||||
|
# empty list has range MIN->MIN
|
||||||
|
return ShardRange.MIN
|
||||||
|
return self[-1].upper
|
||||||
|
|
||||||
|
@property
|
||||||
|
def object_count(self):
|
||||||
|
"""
|
||||||
|
Returns the total number of objects of all items in the list.
|
||||||
|
|
||||||
|
:return: total object count
|
||||||
|
"""
|
||||||
|
return sum(sr.object_count for sr in self)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def bytes_used(self):
|
||||||
|
"""
|
||||||
|
Returns the total number of bytes in all items in the list.
|
||||||
|
|
||||||
|
:return: total bytes used
|
||||||
|
"""
|
||||||
|
return sum(sr.bytes_used for sr in self)
|
||||||
|
|
||||||
|
def includes(self, other):
|
||||||
|
"""
|
||||||
|
Check if another ShardRange namespace is enclosed between the list's
|
||||||
|
``lower`` and ``upper`` properties. Note: the list's ``lower`` and
|
||||||
|
``upper`` properties will only equal the outermost bounds of all items
|
||||||
|
in the list if the list has previously been sorted.
|
||||||
|
|
||||||
|
Note: the list does not need to contain an item matching ``other`` for
|
||||||
|
this method to return True, although if the list has been sorted and
|
||||||
|
does contain an item matching ``other`` then the method will return
|
||||||
|
True.
|
||||||
|
|
||||||
|
:param other: an instance of :class:`~swift.common.utils.ShardRange`
|
||||||
|
:return: True if other's namespace is enclosed, False otherwise.
|
||||||
|
"""
|
||||||
|
return self.lower <= other.lower and self.upper >= other.upper
|
||||||
|
|
||||||
|
|
||||||
def find_shard_range(item, ranges):
|
def find_shard_range(item, ranges):
|
||||||
"""
|
"""
|
||||||
|
@ -35,7 +35,8 @@ from swift.common.swob import str_to_wsgi
|
|||||||
from swift.common.utils import get_logger, config_true_value, \
|
from swift.common.utils import get_logger, config_true_value, \
|
||||||
dump_recon_cache, whataremyips, Timestamp, ShardRange, GreenAsyncPile, \
|
dump_recon_cache, whataremyips, Timestamp, ShardRange, GreenAsyncPile, \
|
||||||
config_float_value, config_positive_int_value, \
|
config_float_value, config_positive_int_value, \
|
||||||
quorum_size, parse_override_options, Everything, config_auto_int_value
|
quorum_size, parse_override_options, Everything, config_auto_int_value, \
|
||||||
|
ShardRangeList
|
||||||
from swift.container.backend import ContainerBroker, \
|
from swift.container.backend import ContainerBroker, \
|
||||||
RECORD_TYPE_SHARD, UNSHARDED, SHARDING, SHARDED, COLLAPSED, \
|
RECORD_TYPE_SHARD, UNSHARDED, SHARDING, SHARDED, COLLAPSED, \
|
||||||
SHARD_UPDATE_STATES
|
SHARD_UPDATE_STATES
|
||||||
@ -146,6 +147,42 @@ def find_sharding_candidates(broker, threshold, shard_ranges=None):
|
|||||||
|
|
||||||
|
|
||||||
def find_shrinking_candidates(broker, shrink_threshold, merge_size):
|
def find_shrinking_candidates(broker, shrink_threshold, merge_size):
|
||||||
|
# this is only here to preserve a legacy public function signature;
|
||||||
|
# superseded by find_compactable_shard_sequences
|
||||||
|
merge_pairs = {}
|
||||||
|
# restrict search to sequences with one donor
|
||||||
|
results = find_compactable_shard_sequences(broker, shrink_threshold,
|
||||||
|
merge_size, 1, -1)
|
||||||
|
for sequence in results:
|
||||||
|
# map acceptor -> donor list
|
||||||
|
merge_pairs[sequence[-1]] = sequence[-2]
|
||||||
|
return merge_pairs
|
||||||
|
|
||||||
|
|
||||||
|
def find_compactable_shard_sequences(broker,
|
||||||
|
shrink_threshold,
|
||||||
|
merge_size,
|
||||||
|
max_shrinking,
|
||||||
|
max_expanding):
|
||||||
|
"""
|
||||||
|
Find sequences of shard ranges that could be compacted into a single
|
||||||
|
acceptor shard range.
|
||||||
|
|
||||||
|
This function does not modify shard ranges.
|
||||||
|
|
||||||
|
:param broker: A :class:`~swift.container.backend.ContainerBroker`.
|
||||||
|
:param shrink_threshold: the number of rows below which a shard may be
|
||||||
|
considered for shrinking into another shard
|
||||||
|
:param merge_size: the maximum number of rows that an acceptor shard range
|
||||||
|
should have after other shard ranges have been compacted into it
|
||||||
|
:param max_shrinking: the maximum number of shard ranges that should be
|
||||||
|
compacted into each acceptor; -1 implies unlimited.
|
||||||
|
:param max_expanding: the maximum number of acceptors to be found (i.e. the
|
||||||
|
maximum number of sequences to be returned); -1 implies unlimited.
|
||||||
|
:returns: A list of :class:`~swift.common.utils.ShardRangeList` each
|
||||||
|
containing a sequence of neighbouring shard ranges that may be
|
||||||
|
compacted; the final shard range in the list is the acceptor
|
||||||
|
"""
|
||||||
# this should only execute on root containers that have sharded; the
|
# this should only execute on root containers that have sharded; the
|
||||||
# goal is to find small shard containers that could be retired by
|
# goal is to find small shard containers that could be retired by
|
||||||
# merging with a neighbour.
|
# merging with a neighbour.
|
||||||
@ -158,47 +195,125 @@ def find_shrinking_candidates(broker, shrink_threshold, merge_size):
|
|||||||
# a neighbour. We may need to expose row count as well as object count.
|
# a neighbour. We may need to expose row count as well as object count.
|
||||||
shard_ranges = broker.get_shard_ranges()
|
shard_ranges = broker.get_shard_ranges()
|
||||||
own_shard_range = broker.get_own_shard_range()
|
own_shard_range = broker.get_own_shard_range()
|
||||||
if len(shard_ranges) == 1:
|
|
||||||
# special case to enable final shard to shrink into root
|
|
||||||
shard_ranges.append(own_shard_range)
|
|
||||||
|
|
||||||
merge_pairs = {}
|
def sequence_complete(sequence):
|
||||||
for donor, acceptor in zip(shard_ranges, shard_ranges[1:]):
|
# a sequence is considered complete if any of the following are true:
|
||||||
if donor in merge_pairs:
|
# - the final shard range has more objects than the shrink_threshold,
|
||||||
# this range may already have been made an acceptor; if so then
|
# so should not be shrunk (this shard will be the acceptor)
|
||||||
# move on. In principle it might be that even after expansion
|
# - the max number of shard ranges to be compacted (max_shrinking) has
|
||||||
# this range and its donor(s) could all be merged with the next
|
# been reached
|
||||||
# range. In practice it is much easier to reason about a single
|
# - the total number of objects in the sequence has reached the
|
||||||
# donor merging into a single acceptor. Don't fret - eventually
|
# merge_size
|
||||||
# all the small ranges will be retired.
|
if (sequence and
|
||||||
continue
|
(sequence[-1].object_count >= shrink_threshold or
|
||||||
if (acceptor.name != own_shard_range.name and
|
0 < max_shrinking < len(sequence) or
|
||||||
acceptor.state != ShardRange.ACTIVE):
|
sequence.object_count >= merge_size)):
|
||||||
# don't shrink into a range that is not yet ACTIVE
|
return True
|
||||||
continue
|
return False
|
||||||
if donor.state not in (ShardRange.ACTIVE, ShardRange.SHRINKING):
|
|
||||||
# found? created? sharded? don't touch it
|
|
||||||
continue
|
|
||||||
|
|
||||||
proposed_object_count = donor.object_count + acceptor.object_count
|
def find_compactable_sequence(shard_ranges_todo):
|
||||||
if (donor.state == ShardRange.SHRINKING or
|
compactable_sequence = ShardRangeList()
|
||||||
(donor.object_count < shrink_threshold and
|
object_count = 0
|
||||||
proposed_object_count < merge_size)):
|
consumed = 0
|
||||||
# include previously identified merge pairs on presumption that
|
for shard_range in shard_ranges_todo:
|
||||||
# following shrink procedure is idempotent
|
if (compactable_sequence and
|
||||||
merge_pairs[acceptor] = donor
|
compactable_sequence.upper < shard_range.lower):
|
||||||
if donor.update_state(ShardRange.SHRINKING):
|
# found a gap! break before consuming this range because it
|
||||||
# Set donor state to shrinking so that next cycle won't use
|
# could become the first in the next sequence
|
||||||
# it as an acceptor; state_timestamp defines new epoch for
|
break
|
||||||
# donor and new timestamp for the expanded acceptor below.
|
consumed += 1
|
||||||
donor.epoch = donor.state_timestamp = Timestamp.now()
|
if (shard_range.name != own_shard_range.name and
|
||||||
if acceptor.lower != donor.lower:
|
shard_range.state not in (ShardRange.ACTIVE,
|
||||||
# Update the acceptor container with its expanding state to
|
ShardRange.SHRINKING)):
|
||||||
# prevent it treating objects cleaved from the donor
|
# found? created? sharded? don't touch it
|
||||||
# as misplaced.
|
break
|
||||||
acceptor.lower = donor.lower
|
proposed_object_count = object_count + shard_range.object_count
|
||||||
acceptor.timestamp = donor.state_timestamp
|
if (shard_range.state == ShardRange.SHRINKING or
|
||||||
return merge_pairs
|
proposed_object_count <= merge_size):
|
||||||
|
compactable_sequence.append(shard_range)
|
||||||
|
object_count += shard_range.object_count
|
||||||
|
if shard_range.state == ShardRange.SHRINKING:
|
||||||
|
continue
|
||||||
|
if sequence_complete(compactable_sequence):
|
||||||
|
break
|
||||||
|
return compactable_sequence, consumed
|
||||||
|
|
||||||
|
compactable_sequences = []
|
||||||
|
index = 0
|
||||||
|
while ((max_expanding < 0 or
|
||||||
|
len(compactable_sequences) < max_expanding) and
|
||||||
|
index < len(shard_ranges)):
|
||||||
|
sequence, consumed = find_compactable_sequence(shard_ranges[index:])
|
||||||
|
index += consumed
|
||||||
|
if (index == len(shard_ranges) and
|
||||||
|
not compactable_sequences and
|
||||||
|
not sequence_complete(sequence) and
|
||||||
|
sequence.includes(own_shard_range)):
|
||||||
|
# special case: only one sequence has been found, which encompasses
|
||||||
|
# the entire namespace, has no more than merge_size records and
|
||||||
|
# whose shard ranges are all shrinkable; all the shards in the
|
||||||
|
# sequence can be shrunk to the root, so append own_shard_range to
|
||||||
|
# the sequence to act as an acceptor; note: only shrink to the root
|
||||||
|
# when *all* the remaining shard ranges can be simultaneously
|
||||||
|
# shrunk to the root.
|
||||||
|
sequence.append(own_shard_range)
|
||||||
|
compactable_sequences.append(sequence)
|
||||||
|
elif len(sequence) > 1 and sequence[-1].state == ShardRange.ACTIVE:
|
||||||
|
compactable_sequences.append(sequence)
|
||||||
|
# else: this sequence doesn't end with a suitable acceptor shard range
|
||||||
|
|
||||||
|
return compactable_sequences
|
||||||
|
|
||||||
|
|
||||||
|
def finalize_shrinking(broker, acceptor_ranges, donor_ranges, timestamp):
|
||||||
|
"""
|
||||||
|
Update donor shard ranges to shrinking state and merge donors and acceptors
|
||||||
|
to broker.
|
||||||
|
|
||||||
|
:param broker: A :class:`~swift.container.backend.ContainerBroker`.
|
||||||
|
:param acceptor_ranges: A list of :class:`~swift.common.utils.ShardRange`
|
||||||
|
that are to be acceptors.
|
||||||
|
:param donor_ranges: A list of :class:`~swift.common.utils.ShardRange`
|
||||||
|
that are to be donors; these will have their state and timestamp
|
||||||
|
updated.
|
||||||
|
:param timestamp: timestamp to use when updating donor state
|
||||||
|
"""
|
||||||
|
for donor in donor_ranges:
|
||||||
|
if donor.update_state(ShardRange.SHRINKING):
|
||||||
|
# Set donor state to shrinking state_timestamp defines new epoch
|
||||||
|
donor.epoch = donor.state_timestamp = timestamp
|
||||||
|
broker.merge_shard_ranges(acceptor_ranges + donor_ranges)
|
||||||
|
|
||||||
|
|
||||||
|
def process_compactable_shard_sequences(sequences, timestamp):
|
||||||
|
"""
|
||||||
|
Transform the given sequences of shard ranges into a list of acceptors and
|
||||||
|
a list of shrinking donors. For each given sequence the final ShardRange in
|
||||||
|
the sequence (the acceptor) is expanded to accommodate the other
|
||||||
|
ShardRanges in the sequence (the donors).
|
||||||
|
|
||||||
|
:param sequences: A list of :class:`~swift.common.utils.ShardRangeList`
|
||||||
|
:param timestamp: an instance of :class:`~swift.common.utils.Timestamp`
|
||||||
|
that is used when updating acceptor range bounds or state
|
||||||
|
:return: a tuple (acceptor_ranges, shrinking_ranges)
|
||||||
|
"""
|
||||||
|
acceptor_ranges = []
|
||||||
|
shrinking_ranges = []
|
||||||
|
for sequence in sequences:
|
||||||
|
donors = sequence[:-1]
|
||||||
|
shrinking_ranges.extend(donors)
|
||||||
|
# Update the acceptor container with its expanded bounds to prevent it
|
||||||
|
# treating objects cleaved from the donor as misplaced.
|
||||||
|
acceptor = sequence[-1]
|
||||||
|
if acceptor.expand(donors):
|
||||||
|
# Update the acceptor container with its expanded bounds to prevent
|
||||||
|
# it treating objects cleaved from the donor as misplaced.
|
||||||
|
acceptor.timestamp = timestamp
|
||||||
|
if acceptor.update_state(ShardRange.ACTIVE):
|
||||||
|
# Ensure acceptor state is ACTIVE (when acceptor is root)
|
||||||
|
acceptor.state_timestamp = timestamp
|
||||||
|
acceptor_ranges.append(acceptor)
|
||||||
|
return acceptor_ranges, shrinking_ranges
|
||||||
|
|
||||||
|
|
||||||
class CleavingContext(object):
|
class CleavingContext(object):
|
||||||
@ -1509,31 +1624,36 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
quote(broker.path))
|
quote(broker.path))
|
||||||
return
|
return
|
||||||
|
|
||||||
merge_pairs = find_shrinking_candidates(
|
compactable_sequences = find_compactable_shard_sequences(
|
||||||
broker, self.shrink_size, self.merge_size)
|
broker, self.shrink_size, self.merge_size, 1, -1)
|
||||||
self.logger.debug('Found %s shrinking candidates' % len(merge_pairs))
|
self.logger.debug('Found %s compactable sequences of length(s) %s' %
|
||||||
|
(len(compactable_sequences),
|
||||||
|
[len(s) for s in compactable_sequences]))
|
||||||
|
timestamp = Timestamp.now()
|
||||||
|
acceptors, donors = process_compactable_shard_sequences(
|
||||||
|
compactable_sequences, timestamp)
|
||||||
|
finalize_shrinking(broker, acceptors, donors, timestamp)
|
||||||
own_shard_range = broker.get_own_shard_range()
|
own_shard_range = broker.get_own_shard_range()
|
||||||
for acceptor, donor in merge_pairs.items():
|
for sequence in compactable_sequences:
|
||||||
self.logger.debug('shrinking shard range %s into %s in %s' %
|
acceptor = sequence[-1]
|
||||||
(donor, acceptor, broker.db_file))
|
donors = ShardRangeList(sequence[:-1])
|
||||||
broker.merge_shard_ranges([acceptor, donor])
|
self.logger.debug(
|
||||||
|
'shrinking %d objects from %d shard ranges into %s in %s' %
|
||||||
|
(donors.object_count, len(donors), acceptor, broker.db_file))
|
||||||
if acceptor.name != own_shard_range.name:
|
if acceptor.name != own_shard_range.name:
|
||||||
self._send_shard_ranges(
|
self._send_shard_ranges(
|
||||||
acceptor.account, acceptor.container, [acceptor])
|
acceptor.account, acceptor.container, [acceptor])
|
||||||
acceptor.increment_meta(donor.object_count, donor.bytes_used)
|
acceptor.increment_meta(donors.object_count, donors.bytes_used)
|
||||||
else:
|
|
||||||
# no need to change namespace or stats
|
|
||||||
acceptor.update_state(ShardRange.ACTIVE,
|
|
||||||
state_timestamp=Timestamp.now())
|
|
||||||
# Now send a copy of the expanded acceptor, with an updated
|
# Now send a copy of the expanded acceptor, with an updated
|
||||||
# timestamp, to the donor container. This forces the donor to
|
# timestamp, to each donor container. This forces each donor to
|
||||||
# asynchronously cleave its entire contents to the acceptor and
|
# asynchronously cleave its entire contents to the acceptor and
|
||||||
# delete itself. The donor will pass its own deleted shard range to
|
# delete itself. The donor will pass its own deleted shard range to
|
||||||
# the acceptor when cleaving. Subsequent updates from the donor or
|
# the acceptor when cleaving. Subsequent updates from the donor or
|
||||||
# the acceptor will then update the root to have the deleted donor
|
# the acceptor will then update the root to have the deleted donor
|
||||||
# shard range.
|
# shard range.
|
||||||
self._send_shard_ranges(
|
for donor in donors:
|
||||||
donor.account, donor.container, [donor, acceptor])
|
self._send_shard_ranges(
|
||||||
|
donor.account, donor.container, [donor, acceptor])
|
||||||
|
|
||||||
def _update_root_container(self, broker):
|
def _update_root_container(self, broker):
|
||||||
own_shard_range = broker.get_own_shard_range(no_default=True)
|
own_shard_range = broker.get_own_shard_range(no_default=True)
|
||||||
|
@ -2549,28 +2549,79 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
|
|||||||
self.assert_container_state(self.brain.nodes[2], 'sharded', 2)
|
self.assert_container_state(self.brain.nodes[2], 'sharded', 2)
|
||||||
self.assert_container_listing(obj_names)
|
self.assert_container_listing(obj_names)
|
||||||
|
|
||||||
# Let's pretend that some actor in the system has determined that all
|
def test_manage_shard_ranges_compact(self):
|
||||||
# the shard ranges should shrink back to root
|
# verify shard range compaction using swift-manage-shard-ranges
|
||||||
# TODO: replace this db manipulation if/when manage_shard_ranges can
|
obj_names = self._make_object_names(8)
|
||||||
# manage shrinking...
|
self.put_objects(obj_names)
|
||||||
broker = self.get_broker(self.brain.part, self.brain.nodes[0])
|
client.post_container(self.url, self.admin_token, self.container_name,
|
||||||
shard_ranges = broker.get_shard_ranges()
|
headers={'X-Container-Sharding': 'on'})
|
||||||
self.assertEqual(2, len(shard_ranges))
|
# run replicators first time to get sync points set, and get container
|
||||||
for sr in shard_ranges:
|
# sharded into 4 shards
|
||||||
self.assertTrue(sr.update_state(ShardRange.SHRINKING))
|
self.replicators.once()
|
||||||
sr.epoch = sr.state_timestamp = Timestamp.now()
|
subprocess.check_output([
|
||||||
own_sr = broker.get_own_shard_range()
|
'swift-manage-shard-ranges',
|
||||||
own_sr.update_state(ShardRange.ACTIVE, state_timestamp=Timestamp.now())
|
self.get_db_file(self.brain.part, self.brain.nodes[0]),
|
||||||
broker.merge_shard_ranges(shard_ranges + [own_sr])
|
'find_and_replace', '2', '--enable'], stderr=subprocess.STDOUT)
|
||||||
|
self.assert_container_state(self.brain.nodes[0], 'unsharded', 4)
|
||||||
|
self.replicators.once()
|
||||||
|
# run sharders twice to cleave all 4 shard ranges
|
||||||
|
self.sharders_once(additional_args='--partitions=%s' % self.brain.part)
|
||||||
|
self.sharders_once(additional_args='--partitions=%s' % self.brain.part)
|
||||||
|
self.assert_container_state(self.brain.nodes[0], 'sharded', 4)
|
||||||
|
self.assert_container_state(self.brain.nodes[1], 'sharded', 4)
|
||||||
|
self.assert_container_state(self.brain.nodes[2], 'sharded', 4)
|
||||||
|
self.assert_container_listing(obj_names)
|
||||||
|
|
||||||
# replicate and run sharders
|
# now compact some ranges; use --max-shrinking to allow 2 shrinking
|
||||||
|
# shards
|
||||||
|
subprocess.check_output([
|
||||||
|
'swift-manage-shard-ranges',
|
||||||
|
self.get_db_file(self.brain.part, self.brain.nodes[0]),
|
||||||
|
'compact', '--max-expanding', '1', '--max-shrinking', '2',
|
||||||
|
'--yes'],
|
||||||
|
stderr=subprocess.STDOUT)
|
||||||
|
shard_ranges = self.assert_container_state(
|
||||||
|
self.brain.nodes[0], 'sharded', 4)
|
||||||
|
self.assertEqual([ShardRange.SHRINKING] * 2 + [ShardRange.ACTIVE] * 2,
|
||||||
|
[sr.state for sr in shard_ranges])
|
||||||
self.replicators.once()
|
self.replicators.once()
|
||||||
self.sharders_once()
|
self.sharders_once()
|
||||||
|
# check there's now just 2 remaining shard ranges
|
||||||
|
shard_ranges = self.assert_container_state(
|
||||||
|
self.brain.nodes[0], 'sharded', 2)
|
||||||
|
self.assertEqual([ShardRange.ACTIVE] * 2,
|
||||||
|
[sr.state for sr in shard_ranges])
|
||||||
|
self.assert_container_listing(obj_names, req_hdrs={'X-Newest': 'True'})
|
||||||
|
|
||||||
|
# root container own shard range should still be SHARDED
|
||||||
|
for i, node in enumerate(self.brain.nodes):
|
||||||
|
with annotate_failure('node[%d]' % i):
|
||||||
|
broker = self.get_broker(self.brain.part, self.brain.nodes[0])
|
||||||
|
self.assertEqual(ShardRange.SHARDED,
|
||||||
|
broker.get_own_shard_range().state)
|
||||||
|
|
||||||
|
# now compact the final two shard ranges to the root; use
|
||||||
|
# --max-shrinking to allow 2 shrinking shards
|
||||||
|
subprocess.check_output([
|
||||||
|
'swift-manage-shard-ranges',
|
||||||
|
self.get_db_file(self.brain.part, self.brain.nodes[0]),
|
||||||
|
'compact', '--yes', '--max-shrinking', '2'],
|
||||||
|
stderr=subprocess.STDOUT)
|
||||||
|
shard_ranges = self.assert_container_state(
|
||||||
|
self.brain.nodes[0], 'sharded', 2)
|
||||||
|
self.assertEqual([ShardRange.SHRINKING] * 2,
|
||||||
|
[sr.state for sr in shard_ranges])
|
||||||
|
self.replicators.once()
|
||||||
|
self.sharders_once()
|
||||||
self.assert_container_state(self.brain.nodes[0], 'collapsed', 0)
|
self.assert_container_state(self.brain.nodes[0], 'collapsed', 0)
|
||||||
self.assert_container_state(self.brain.nodes[1], 'collapsed', 0)
|
self.assert_container_listing(obj_names, req_hdrs={'X-Newest': 'True'})
|
||||||
self.assert_container_state(self.brain.nodes[2], 'collapsed', 0)
|
|
||||||
self.assert_container_listing(obj_names)
|
# root container own shard range should now be ACTIVE
|
||||||
|
for i, node in enumerate(self.brain.nodes):
|
||||||
|
with annotate_failure('node[%d]' % i):
|
||||||
|
broker = self.get_broker(self.brain.part, self.brain.nodes[0])
|
||||||
|
self.assertEqual(ShardRange.ACTIVE,
|
||||||
|
broker.get_own_shard_range().state)
|
||||||
|
|
||||||
def test_manage_shard_ranges_used_poorly(self):
|
def test_manage_shard_ranges_used_poorly(self):
|
||||||
obj_names = self._make_object_names(8)
|
obj_names = self._make_object_names(8)
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
import mock
|
import mock
|
||||||
from shutil import rmtree
|
from shutil import rmtree
|
||||||
from tempfile import mkdtemp
|
from tempfile import mkdtemp
|
||||||
@ -23,6 +24,7 @@ from swift.cli.manage_shard_ranges import main
|
|||||||
from swift.common import utils
|
from swift.common import utils
|
||||||
from swift.common.utils import Timestamp, ShardRange
|
from swift.common.utils import Timestamp, ShardRange
|
||||||
from swift.container.backend import ContainerBroker
|
from swift.container.backend import ContainerBroker
|
||||||
|
from swift.container.sharder import make_shard_ranges
|
||||||
from test.unit import mock_timestamp_now
|
from test.unit import mock_timestamp_now
|
||||||
|
|
||||||
|
|
||||||
@ -32,7 +34,8 @@ class TestManageShardRanges(unittest.TestCase):
|
|||||||
utils.mkdirs(self.testdir)
|
utils.mkdirs(self.testdir)
|
||||||
rmtree(self.testdir)
|
rmtree(self.testdir)
|
||||||
self.shard_data = [
|
self.shard_data = [
|
||||||
{'index': 0, 'lower': '', 'upper': 'obj09', 'object_count': 10},
|
{'index': 0, 'lower': '', 'upper': 'obj09',
|
||||||
|
'object_count': 10},
|
||||||
{'index': 1, 'lower': 'obj09', 'upper': 'obj19',
|
{'index': 1, 'lower': 'obj09', 'upper': 'obj19',
|
||||||
'object_count': 10},
|
'object_count': 10},
|
||||||
{'index': 2, 'lower': 'obj19', 'upper': 'obj29',
|
{'index': 2, 'lower': 'obj19', 'upper': 'obj29',
|
||||||
@ -49,7 +52,8 @@ class TestManageShardRanges(unittest.TestCase):
|
|||||||
'object_count': 10},
|
'object_count': 10},
|
||||||
{'index': 8, 'lower': 'obj79', 'upper': 'obj89',
|
{'index': 8, 'lower': 'obj79', 'upper': 'obj89',
|
||||||
'object_count': 10},
|
'object_count': 10},
|
||||||
{'index': 9, 'lower': 'obj89', 'upper': '', 'object_count': 10},
|
{'index': 9, 'lower': 'obj89', 'upper': '',
|
||||||
|
'object_count': 10},
|
||||||
]
|
]
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
@ -79,6 +83,16 @@ class TestManageShardRanges(unittest.TestCase):
|
|||||||
broker.initialize()
|
broker.initialize()
|
||||||
return broker
|
return broker
|
||||||
|
|
||||||
|
def _move_broker_to_sharded_state(self, broker):
|
||||||
|
epoch = Timestamp.now()
|
||||||
|
broker.enable_sharding(epoch)
|
||||||
|
self.assertTrue(broker.set_sharding_state())
|
||||||
|
self.assertTrue(broker.set_sharded_state())
|
||||||
|
own_sr = broker.get_own_shard_range()
|
||||||
|
own_sr.update_state(ShardRange.SHARDED, epoch)
|
||||||
|
broker.merge_shard_ranges([own_sr])
|
||||||
|
return epoch
|
||||||
|
|
||||||
def test_find_shard_ranges(self):
|
def test_find_shard_ranges(self):
|
||||||
db_file = os.path.join(self.testdir, 'hash.db')
|
db_file = os.path.join(self.testdir, 'hash.db')
|
||||||
broker = ContainerBroker(db_file)
|
broker = ContainerBroker(db_file)
|
||||||
@ -380,3 +394,547 @@ class TestManageShardRanges(unittest.TestCase):
|
|||||||
self.assertEqual(expected, out.getvalue().splitlines())
|
self.assertEqual(expected, out.getvalue().splitlines())
|
||||||
self.assertEqual(['Loaded db broker for a/c.'],
|
self.assertEqual(['Loaded db broker for a/c.'],
|
||||||
err.getvalue().splitlines())
|
err.getvalue().splitlines())
|
||||||
|
|
||||||
|
def test_compact_bad_args(self):
|
||||||
|
broker = self._make_broker()
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
with self.assertRaises(SystemExit):
|
||||||
|
main([broker.db_file, 'compact', '--shrink-threshold', '0'])
|
||||||
|
with self.assertRaises(SystemExit):
|
||||||
|
main([broker.db_file, 'compact', '--expansion-limit', '0'])
|
||||||
|
with self.assertRaises(SystemExit):
|
||||||
|
main([broker.db_file, 'compact', '--max-shrinking', '0'])
|
||||||
|
with self.assertRaises(SystemExit):
|
||||||
|
main([broker.db_file, 'compact', '--max-expanding', '0'])
|
||||||
|
|
||||||
|
def test_compact_not_root(self):
|
||||||
|
broker = self._make_broker()
|
||||||
|
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
# make broker appear to not be a root container
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
broker.set_sharding_sysmeta('Quoted-Root', 'not_a/c')
|
||||||
|
self.assertFalse(broker.is_root_container())
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, 'compact'])
|
||||||
|
self.assertEqual(2, ret)
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
self.assertEqual(
|
||||||
|
['WARNING: Shard containers cannot be compacted.',
|
||||||
|
'This command should be used on a root container.'],
|
||||||
|
out_lines[:2]
|
||||||
|
)
|
||||||
|
updated_ranges = broker.get_shard_ranges()
|
||||||
|
self.assertEqual(shard_ranges, updated_ranges)
|
||||||
|
self.assertEqual([ShardRange.FOUND] * 10,
|
||||||
|
[sr.state for sr in updated_ranges])
|
||||||
|
|
||||||
|
def test_compact_not_sharded(self):
|
||||||
|
broker = self._make_broker()
|
||||||
|
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
# make broker appear to be a root container but it isn't sharded
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
|
||||||
|
self.assertTrue(broker.is_root_container())
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, 'compact'])
|
||||||
|
self.assertEqual(2, ret)
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
self.assertEqual(
|
||||||
|
['WARNING: Container is not yet sharded so cannot be compacted.'],
|
||||||
|
out_lines[:1])
|
||||||
|
updated_ranges = broker.get_shard_ranges()
|
||||||
|
self.assertEqual(shard_ranges, updated_ranges)
|
||||||
|
self.assertEqual([ShardRange.FOUND] * 10,
|
||||||
|
[sr.state for sr in updated_ranges])
|
||||||
|
|
||||||
|
def test_compact_overlapping_shard_ranges(self):
|
||||||
|
# verify that containers with overlaps will not be compacted
|
||||||
|
broker = self._make_broker()
|
||||||
|
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||||
|
for i, sr in enumerate(shard_ranges):
|
||||||
|
sr.update_state(ShardRange.ACTIVE)
|
||||||
|
shard_ranges[3].upper = shard_ranges[4].upper
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
self._move_broker_to_sharded_state(broker)
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file,
|
||||||
|
'compact', '--yes', '--max-expanding', '10'])
|
||||||
|
self.assertEqual(2, ret)
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
self.assertEqual(
|
||||||
|
['WARNING: Container has overlapping shard ranges so cannot be '
|
||||||
|
'compacted.'],
|
||||||
|
out_lines[:1])
|
||||||
|
updated_ranges = broker.get_shard_ranges()
|
||||||
|
self.assertEqual(shard_ranges, updated_ranges)
|
||||||
|
self.assertEqual([ShardRange.ACTIVE] * 10,
|
||||||
|
[sr.state for sr in updated_ranges])
|
||||||
|
|
||||||
|
def test_compact_shard_ranges_in_found_state(self):
|
||||||
|
broker = self._make_broker()
|
||||||
|
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
self._move_broker_to_sharded_state(broker)
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, 'compact'])
|
||||||
|
self.assertEqual(0, ret)
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
self.assertEqual(
|
||||||
|
['No shards identified for compaction.'],
|
||||||
|
out_lines[:1])
|
||||||
|
updated_ranges = broker.get_shard_ranges()
|
||||||
|
self.assertEqual([ShardRange.FOUND] * 10,
|
||||||
|
[sr.state for sr in updated_ranges])
|
||||||
|
|
||||||
|
def test_compact_user_input(self):
|
||||||
|
# verify user input 'y' or 'n' is respected
|
||||||
|
small_ranges = (3, 4, 7)
|
||||||
|
broker = self._make_broker()
|
||||||
|
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||||
|
for i, sr in enumerate(shard_ranges):
|
||||||
|
if i not in small_ranges:
|
||||||
|
sr.object_count = 100001
|
||||||
|
sr.update_state(ShardRange.ACTIVE)
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
self._move_broker_to_sharded_state(broker)
|
||||||
|
|
||||||
|
def do_compact(user_input):
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock.patch('sys.stdout', out),\
|
||||||
|
mock.patch('sys.stderr', err), \
|
||||||
|
mock.patch('swift.cli.manage_shard_ranges.input',
|
||||||
|
return_value=user_input):
|
||||||
|
ret = main([broker.db_file, 'compact',
|
||||||
|
'--max-shrinking', '99'])
|
||||||
|
self.assertEqual(0, ret)
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
self.assertIn('can expand to accept 20 objects', out_lines[0])
|
||||||
|
self.assertIn('(object count 10)', out_lines[1])
|
||||||
|
self.assertIn('(object count 10)', out_lines[2])
|
||||||
|
self.assertIn('can expand to accept 10 objects', out_lines[3])
|
||||||
|
self.assertIn('(object count 10)', out_lines[4])
|
||||||
|
broker_ranges = broker.get_shard_ranges()
|
||||||
|
return broker_ranges
|
||||||
|
|
||||||
|
broker_ranges = do_compact('n')
|
||||||
|
# expect no changes to shard ranges
|
||||||
|
self.assertEqual(shard_ranges, broker_ranges)
|
||||||
|
for i, sr in enumerate(broker_ranges):
|
||||||
|
self.assertEqual(ShardRange.ACTIVE, sr.state)
|
||||||
|
|
||||||
|
broker_ranges = do_compact('y')
|
||||||
|
# expect updated shard ranges
|
||||||
|
shard_ranges[5].lower = shard_ranges[3].lower
|
||||||
|
shard_ranges[8].lower = shard_ranges[7].lower
|
||||||
|
self.assertEqual(shard_ranges, broker_ranges)
|
||||||
|
for i, sr in enumerate(broker_ranges):
|
||||||
|
if i in small_ranges:
|
||||||
|
self.assertEqual(ShardRange.SHRINKING, sr.state)
|
||||||
|
else:
|
||||||
|
self.assertEqual(ShardRange.ACTIVE, sr.state)
|
||||||
|
|
||||||
|
def test_compact_three_donors_two_acceptors(self):
|
||||||
|
small_ranges = (2, 3, 4, 7)
|
||||||
|
broker = self._make_broker()
|
||||||
|
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||||
|
for i, sr in enumerate(shard_ranges):
|
||||||
|
if i not in small_ranges:
|
||||||
|
sr.object_count = 100001
|
||||||
|
sr.update_state(ShardRange.ACTIVE)
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
self._move_broker_to_sharded_state(broker)
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, 'compact', '--yes',
|
||||||
|
'--max-shrinking', '99'])
|
||||||
|
self.assertEqual(0, ret)
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
self.assertEqual(
|
||||||
|
['Updated 2 shard sequences for compaction.'],
|
||||||
|
out_lines[:1])
|
||||||
|
updated_ranges = broker.get_shard_ranges()
|
||||||
|
for i, sr in enumerate(updated_ranges):
|
||||||
|
if i in small_ranges:
|
||||||
|
self.assertEqual(ShardRange.SHRINKING, sr.state)
|
||||||
|
else:
|
||||||
|
self.assertEqual(ShardRange.ACTIVE, sr.state)
|
||||||
|
shard_ranges[5].lower = shard_ranges[2].lower
|
||||||
|
shard_ranges[8].lower = shard_ranges[7].lower
|
||||||
|
self.assertEqual(shard_ranges, updated_ranges)
|
||||||
|
for i in (5, 8):
|
||||||
|
# acceptors should have updated timestamp
|
||||||
|
self.assertLess(shard_ranges[i].timestamp,
|
||||||
|
updated_ranges[i].timestamp)
|
||||||
|
# check idempotency
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, 'compact', '--yes',
|
||||||
|
'--max-shrinking', '99'])
|
||||||
|
|
||||||
|
self.assertEqual(0, ret)
|
||||||
|
updated_ranges = broker.get_shard_ranges()
|
||||||
|
self.assertEqual(shard_ranges, updated_ranges)
|
||||||
|
for i, sr in enumerate(updated_ranges):
|
||||||
|
if i in small_ranges:
|
||||||
|
self.assertEqual(ShardRange.SHRINKING, sr.state)
|
||||||
|
else:
|
||||||
|
self.assertEqual(ShardRange.ACTIVE, sr.state)
|
||||||
|
|
||||||
|
def test_compact_all_donors_shrink_to_root(self):
|
||||||
|
# by default all shard ranges are small enough to shrink so the root
|
||||||
|
# becomes the acceptor
|
||||||
|
broker = self._make_broker()
|
||||||
|
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||||
|
for i, sr in enumerate(shard_ranges):
|
||||||
|
sr.update_state(ShardRange.ACTIVE)
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
epoch = self._move_broker_to_sharded_state(broker)
|
||||||
|
own_sr = broker.get_own_shard_range(no_default=True)
|
||||||
|
self.assertEqual(epoch, own_sr.state_timestamp) # sanity check
|
||||||
|
self.assertEqual(ShardRange.SHARDED, own_sr.state) # sanity check
|
||||||
|
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, 'compact', '--yes',
|
||||||
|
'--max-shrinking', '99'])
|
||||||
|
self.assertEqual(0, ret, 'stdout:\n%s\nstderr\n%s' %
|
||||||
|
(out.getvalue(), err.getvalue()))
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
self.assertEqual(
|
||||||
|
['Updated 1 shard sequences for compaction.'],
|
||||||
|
out_lines[:1])
|
||||||
|
updated_ranges = broker.get_shard_ranges()
|
||||||
|
self.assertEqual(shard_ranges, updated_ranges)
|
||||||
|
self.assertEqual([ShardRange.SHRINKING] * 10,
|
||||||
|
[sr.state for sr in updated_ranges])
|
||||||
|
updated_own_sr = broker.get_own_shard_range(no_default=True)
|
||||||
|
self.assertEqual(own_sr.timestamp, updated_own_sr.timestamp)
|
||||||
|
self.assertEqual(own_sr.epoch, updated_own_sr.epoch)
|
||||||
|
self.assertLess(own_sr.state_timestamp,
|
||||||
|
updated_own_sr.state_timestamp)
|
||||||
|
self.assertEqual(ShardRange.ACTIVE, updated_own_sr.state)
|
||||||
|
|
||||||
|
def test_compact_single_donor_shrink_to_root(self):
|
||||||
|
# single shard range small enough to shrink so the root becomes the
|
||||||
|
# acceptor
|
||||||
|
broker = self._make_broker()
|
||||||
|
shard_data = [
|
||||||
|
{'index': 0, 'lower': '', 'upper': '', 'object_count': 10}
|
||||||
|
]
|
||||||
|
|
||||||
|
shard_ranges = make_shard_ranges(broker, shard_data, '.shards_')
|
||||||
|
shard_ranges[0].update_state(ShardRange.ACTIVE)
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
epoch = self._move_broker_to_sharded_state(broker)
|
||||||
|
own_sr = broker.get_own_shard_range(no_default=True)
|
||||||
|
self.assertEqual(epoch, own_sr.state_timestamp) # sanity check
|
||||||
|
self.assertEqual(ShardRange.SHARDED, own_sr.state) # sanity check
|
||||||
|
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, 'compact', '--yes'])
|
||||||
|
self.assertEqual(0, ret, 'stdout:\n%s\nstderr\n%s' %
|
||||||
|
(out.getvalue(), err.getvalue()))
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
self.assertEqual(
|
||||||
|
['Updated 1 shard sequences for compaction.'],
|
||||||
|
out_lines[:1])
|
||||||
|
updated_ranges = broker.get_shard_ranges()
|
||||||
|
self.assertEqual(shard_ranges, updated_ranges)
|
||||||
|
self.assertEqual([ShardRange.SHRINKING],
|
||||||
|
[sr.state for sr in updated_ranges])
|
||||||
|
updated_own_sr = broker.get_own_shard_range(no_default=True)
|
||||||
|
self.assertEqual(own_sr.timestamp, updated_own_sr.timestamp)
|
||||||
|
self.assertEqual(own_sr.epoch, updated_own_sr.epoch)
|
||||||
|
self.assertLess(own_sr.state_timestamp,
|
||||||
|
updated_own_sr.state_timestamp)
|
||||||
|
self.assertEqual(ShardRange.ACTIVE, updated_own_sr.state)
|
||||||
|
|
||||||
|
def test_compact_donors_but_no_suitable_acceptor(self):
|
||||||
|
# if shard ranges are already shrinking, check that the final one is
|
||||||
|
# not made into an acceptor if a suitable adjacent acceptor is not
|
||||||
|
# found (unexpected scenario but possible in an overlap situation)
|
||||||
|
broker = self._make_broker()
|
||||||
|
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||||
|
for i, state in enumerate([ShardRange.SHRINKING] * 3 +
|
||||||
|
[ShardRange.SHARDING] +
|
||||||
|
[ShardRange.ACTIVE] * 6):
|
||||||
|
shard_ranges[i].update_state(state)
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
epoch = self._move_broker_to_sharded_state(broker)
|
||||||
|
with mock_timestamp_now(epoch):
|
||||||
|
own_sr = broker.get_own_shard_range(no_default=True)
|
||||||
|
self.assertEqual(epoch, own_sr.state_timestamp) # sanity check
|
||||||
|
self.assertEqual(ShardRange.SHARDED, own_sr.state) # sanity check
|
||||||
|
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, 'compact', '--yes',
|
||||||
|
'--max-shrinking', '99'])
|
||||||
|
self.assertEqual(0, ret, 'stdout:\n%s\nstderr\n%s' %
|
||||||
|
(out.getvalue(), err.getvalue()))
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
self.assertEqual(
|
||||||
|
['Updated 1 shard sequences for compaction.'],
|
||||||
|
out_lines[:1])
|
||||||
|
updated_ranges = broker.get_shard_ranges()
|
||||||
|
shard_ranges[9].lower = shard_ranges[4].lower # expanded acceptor
|
||||||
|
self.assertEqual(shard_ranges, updated_ranges)
|
||||||
|
self.assertEqual([ShardRange.SHRINKING] * 3 + # unchanged
|
||||||
|
[ShardRange.SHARDING] + # unchanged
|
||||||
|
[ShardRange.SHRINKING] * 5 + # moved to shrinking
|
||||||
|
[ShardRange.ACTIVE], # unchanged
|
||||||
|
[sr.state for sr in updated_ranges])
|
||||||
|
with mock_timestamp_now(epoch): # force equal meta-timestamp
|
||||||
|
updated_own_sr = broker.get_own_shard_range(no_default=True)
|
||||||
|
self.assertEqual(dict(own_sr), dict(updated_own_sr))
|
||||||
|
|
||||||
|
def test_compact_no_gaps(self):
|
||||||
|
# verify that compactable sequences do not include gaps
|
||||||
|
broker = self._make_broker()
|
||||||
|
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||||
|
for i, sr in enumerate(shard_ranges):
|
||||||
|
sr.update_state(ShardRange.ACTIVE)
|
||||||
|
gapped_ranges = shard_ranges[:3] + shard_ranges[4:]
|
||||||
|
broker.merge_shard_ranges(gapped_ranges)
|
||||||
|
self._move_broker_to_sharded_state(broker)
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, 'compact', '--yes',
|
||||||
|
'--max-shrinking', '99'])
|
||||||
|
self.assertEqual(0, ret)
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
self.assertEqual(
|
||||||
|
['Updated 2 shard sequences for compaction.'],
|
||||||
|
out_lines[:1])
|
||||||
|
updated_ranges = broker.get_shard_ranges()
|
||||||
|
gapped_ranges[2].lower = gapped_ranges[0].lower
|
||||||
|
gapped_ranges[8].lower = gapped_ranges[3].lower
|
||||||
|
self.assertEqual(gapped_ranges, updated_ranges)
|
||||||
|
self.assertEqual([ShardRange.SHRINKING] * 2 + [ShardRange.ACTIVE] +
|
||||||
|
[ShardRange.SHRINKING] * 5 + [ShardRange.ACTIVE],
|
||||||
|
[sr.state for sr in updated_ranges])
|
||||||
|
|
||||||
|
def test_compact_max_shrinking_default(self):
|
||||||
|
# verify default limit on number of shrinking shards per acceptor
|
||||||
|
broker = self._make_broker()
|
||||||
|
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||||
|
for i, sr in enumerate(shard_ranges):
|
||||||
|
sr.update_state(ShardRange.ACTIVE)
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
self._move_broker_to_sharded_state(broker)
|
||||||
|
|
||||||
|
def do_compact():
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, 'compact', '--yes'])
|
||||||
|
self.assertEqual(0, ret)
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
self.assertEqual(
|
||||||
|
['Updated 5 shard sequences for compaction.'],
|
||||||
|
out_lines[:1])
|
||||||
|
return broker.get_shard_ranges()
|
||||||
|
|
||||||
|
updated_ranges = do_compact()
|
||||||
|
for acceptor in (1, 3, 5, 7, 9):
|
||||||
|
shard_ranges[acceptor].lower = shard_ranges[acceptor - 1].lower
|
||||||
|
self.assertEqual(shard_ranges, updated_ranges)
|
||||||
|
self.assertEqual([ShardRange.SHRINKING, ShardRange.ACTIVE] * 5,
|
||||||
|
[sr.state for sr in updated_ranges])
|
||||||
|
|
||||||
|
# check idempotency
|
||||||
|
updated_ranges = do_compact()
|
||||||
|
self.assertEqual(shard_ranges, updated_ranges)
|
||||||
|
self.assertEqual([ShardRange.SHRINKING, ShardRange.ACTIVE] * 5,
|
||||||
|
[sr.state for sr in updated_ranges])
|
||||||
|
|
||||||
|
def test_compact_max_shrinking(self):
|
||||||
|
# verify option to limit the number of shrinking shards per acceptor
|
||||||
|
broker = self._make_broker()
|
||||||
|
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||||
|
for i, sr in enumerate(shard_ranges):
|
||||||
|
sr.update_state(ShardRange.ACTIVE)
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
self._move_broker_to_sharded_state(broker)
|
||||||
|
|
||||||
|
def do_compact():
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, 'compact', '--yes',
|
||||||
|
'--max-shrinking', '7'])
|
||||||
|
self.assertEqual(0, ret)
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
self.assertEqual(
|
||||||
|
['Updated 2 shard sequences for compaction.'],
|
||||||
|
out_lines[:1])
|
||||||
|
return broker.get_shard_ranges()
|
||||||
|
|
||||||
|
updated_ranges = do_compact()
|
||||||
|
shard_ranges[7].lower = shard_ranges[0].lower
|
||||||
|
shard_ranges[9].lower = shard_ranges[8].lower
|
||||||
|
self.assertEqual(shard_ranges, updated_ranges)
|
||||||
|
self.assertEqual([ShardRange.SHRINKING] * 7 + [ShardRange.ACTIVE] +
|
||||||
|
[ShardRange.SHRINKING] + [ShardRange.ACTIVE],
|
||||||
|
[sr.state for sr in updated_ranges])
|
||||||
|
|
||||||
|
# check idempotency
|
||||||
|
updated_ranges = do_compact()
|
||||||
|
self.assertEqual(shard_ranges, updated_ranges)
|
||||||
|
self.assertEqual([ShardRange.SHRINKING] * 7 + [ShardRange.ACTIVE] +
|
||||||
|
[ShardRange.SHRINKING] + [ShardRange.ACTIVE],
|
||||||
|
[sr.state for sr in updated_ranges])
|
||||||
|
|
||||||
|
def test_compact_max_expanding(self):
|
||||||
|
# verify option to limit the number of expanding shards per acceptor
|
||||||
|
broker = self._make_broker()
|
||||||
|
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||||
|
for i, sr in enumerate(shard_ranges):
|
||||||
|
sr.update_state(ShardRange.ACTIVE)
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
self._move_broker_to_sharded_state(broker)
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
# note: max_shrinking is set to 3 so that there is opportunity for more
|
||||||
|
# than 2 acceptors
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, 'compact', '--yes',
|
||||||
|
'--max-shrinking', '3', '--max-expanding', '2'])
|
||||||
|
self.assertEqual(0, ret)
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
self.assertEqual(
|
||||||
|
['Updated 2 shard sequences for compaction.'],
|
||||||
|
out_lines[:1])
|
||||||
|
updated_ranges = broker.get_shard_ranges()
|
||||||
|
shard_ranges[3].lower = shard_ranges[0].lower
|
||||||
|
shard_ranges[7].lower = shard_ranges[4].lower
|
||||||
|
self.assertEqual(shard_ranges, updated_ranges)
|
||||||
|
self.assertEqual([ShardRange.SHRINKING] * 3 + [ShardRange.ACTIVE] +
|
||||||
|
[ShardRange.SHRINKING] * 3 + [ShardRange.ACTIVE] * 3,
|
||||||
|
[sr.state for sr in updated_ranges])
|
||||||
|
|
||||||
|
def test_compact_expansion_limit(self):
|
||||||
|
# verify option to limit the size of each acceptor after compaction
|
||||||
|
broker = self._make_broker()
|
||||||
|
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||||
|
for i, sr in enumerate(shard_ranges):
|
||||||
|
sr.update_state(ShardRange.ACTIVE)
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
self._move_broker_to_sharded_state(broker)
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, 'compact', '--yes',
|
||||||
|
'--expansion-limit', '20'])
|
||||||
|
self.assertEqual(0, ret, out.getvalue())
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
self.assertEqual(
|
||||||
|
['Updated 5 shard sequences for compaction.'],
|
||||||
|
out_lines[:1])
|
||||||
|
updated_ranges = broker.get_shard_ranges()
|
||||||
|
shard_ranges[1].lower = shard_ranges[0].lower
|
||||||
|
shard_ranges[3].lower = shard_ranges[2].lower
|
||||||
|
shard_ranges[5].lower = shard_ranges[4].lower
|
||||||
|
shard_ranges[7].lower = shard_ranges[6].lower
|
||||||
|
shard_ranges[9].lower = shard_ranges[8].lower
|
||||||
|
self.assertEqual(shard_ranges, updated_ranges)
|
||||||
|
self.assertEqual([ShardRange.SHRINKING] + [ShardRange.ACTIVE] +
|
||||||
|
[ShardRange.SHRINKING] + [ShardRange.ACTIVE] +
|
||||||
|
[ShardRange.SHRINKING] + [ShardRange.ACTIVE] +
|
||||||
|
[ShardRange.SHRINKING] + [ShardRange.ACTIVE] +
|
||||||
|
[ShardRange.SHRINKING] + [ShardRange.ACTIVE],
|
||||||
|
[sr.state for sr in updated_ranges])
|
||||||
|
|
||||||
|
def test_compact_shrink_threshold(self):
|
||||||
|
# verify option to set the shrink threshold for compaction;
|
||||||
|
broker = self._make_broker()
|
||||||
|
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||||
|
for i, sr in enumerate(shard_ranges):
|
||||||
|
sr.update_state(ShardRange.ACTIVE)
|
||||||
|
# (n-2)th shard range has one extra object
|
||||||
|
shard_ranges[-2].object_count = 11
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
self._move_broker_to_sharded_state(broker)
|
||||||
|
# with threshold set to 10 no shard ranges can be shrunk
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, 'compact', '--yes',
|
||||||
|
'--max-shrinking', '99',
|
||||||
|
'--shrink-threshold', '10'])
|
||||||
|
self.assertEqual(0, ret)
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
self.assertEqual(
|
||||||
|
['No shards identified for compaction.'],
|
||||||
|
out_lines[:1])
|
||||||
|
updated_ranges = broker.get_shard_ranges()
|
||||||
|
self.assertEqual(shard_ranges, updated_ranges)
|
||||||
|
self.assertEqual([ShardRange.ACTIVE] * 10,
|
||||||
|
[sr.state for sr in updated_ranges])
|
||||||
|
|
||||||
|
# with threshold == 11 all but the final 2 shard ranges can be shrunk;
|
||||||
|
# note: the (n-1)th shard range is NOT shrunk to root
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, 'compact', '--yes',
|
||||||
|
'--max-shrinking', '99',
|
||||||
|
'--shrink-threshold', '11'])
|
||||||
|
self.assertEqual(0, ret)
|
||||||
|
err_lines = err.getvalue().split('\n')
|
||||||
|
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||||
|
out_lines = out.getvalue().split('\n')
|
||||||
|
self.assertEqual(
|
||||||
|
['Updated 1 shard sequences for compaction.'],
|
||||||
|
out_lines[:1])
|
||||||
|
updated_ranges = broker.get_shard_ranges()
|
||||||
|
shard_ranges[8].lower = shard_ranges[0].lower
|
||||||
|
self.assertEqual(shard_ranges, updated_ranges)
|
||||||
|
self.assertEqual([ShardRange.SHRINKING] * 8 + [ShardRange.ACTIVE] * 2,
|
||||||
|
[sr.state for sr in updated_ranges])
|
||||||
|
@ -74,7 +74,7 @@ from swift.common.exceptions import Timeout, MessageTimeout, \
|
|||||||
MimeInvalid
|
MimeInvalid
|
||||||
from swift.common import utils
|
from swift.common import utils
|
||||||
from swift.common.utils import is_valid_ip, is_valid_ipv4, is_valid_ipv6, \
|
from swift.common.utils import is_valid_ip, is_valid_ipv4, is_valid_ipv6, \
|
||||||
set_swift_dir, md5
|
set_swift_dir, md5, ShardRangeList
|
||||||
from swift.common.container_sync_realms import ContainerSyncRealms
|
from swift.common.container_sync_realms import ContainerSyncRealms
|
||||||
from swift.common.header_key_dict import HeaderKeyDict
|
from swift.common.header_key_dict import HeaderKeyDict
|
||||||
from swift.common.storage_policy import POLICIES, reload_storage_policies
|
from swift.common.storage_policy import POLICIES, reload_storage_policies
|
||||||
@ -8323,6 +8323,135 @@ class TestShardRange(unittest.TestCase):
|
|||||||
self.assertEqual('a/root-%s-%s-foo' % (parent_hash, ts.internal),
|
self.assertEqual('a/root-%s-%s-foo' % (parent_hash, ts.internal),
|
||||||
actual)
|
actual)
|
||||||
|
|
||||||
|
def test_expand(self):
|
||||||
|
bounds = (('', 'd'), ('d', 'k'), ('k', 't'), ('t', ''))
|
||||||
|
donors = [
|
||||||
|
utils.ShardRange('a/c-%d' % i, utils.Timestamp.now(), b[0], b[1])
|
||||||
|
for i, b in enumerate(bounds)
|
||||||
|
]
|
||||||
|
acceptor = utils.ShardRange('a/c-acc', utils.Timestamp.now(), 'f', 's')
|
||||||
|
self.assertTrue(acceptor.expand(donors[:1]))
|
||||||
|
self.assertEqual((utils.ShardRange.MIN, 's'),
|
||||||
|
(acceptor.lower, acceptor.upper))
|
||||||
|
|
||||||
|
acceptor = utils.ShardRange('a/c-acc', utils.Timestamp.now(), 'f', 's')
|
||||||
|
self.assertTrue(acceptor.expand(donors[:2]))
|
||||||
|
self.assertEqual((utils.ShardRange.MIN, 's'),
|
||||||
|
(acceptor.lower, acceptor.upper))
|
||||||
|
|
||||||
|
acceptor = utils.ShardRange('a/c-acc', utils.Timestamp.now(), 'f', 's')
|
||||||
|
self.assertTrue(acceptor.expand(donors[1:3]))
|
||||||
|
self.assertEqual(('d', 't'),
|
||||||
|
(acceptor.lower, acceptor.upper))
|
||||||
|
|
||||||
|
acceptor = utils.ShardRange('a/c-acc', utils.Timestamp.now(), 'f', 's')
|
||||||
|
self.assertTrue(acceptor.expand(donors))
|
||||||
|
self.assertEqual((utils.ShardRange.MIN, utils.ShardRange.MAX),
|
||||||
|
(acceptor.lower, acceptor.upper))
|
||||||
|
|
||||||
|
acceptor = utils.ShardRange('a/c-acc', utils.Timestamp.now(), 'f', 's')
|
||||||
|
self.assertTrue(acceptor.expand(donors[1:2] + donors[3:]))
|
||||||
|
self.assertEqual(('d', utils.ShardRange.MAX),
|
||||||
|
(acceptor.lower, acceptor.upper))
|
||||||
|
|
||||||
|
acceptor = utils.ShardRange('a/c-acc', utils.Timestamp.now(), '', 'd')
|
||||||
|
self.assertFalse(acceptor.expand(donors[:1]))
|
||||||
|
self.assertEqual((utils.ShardRange.MIN, 'd'),
|
||||||
|
(acceptor.lower, acceptor.upper))
|
||||||
|
|
||||||
|
acceptor = utils.ShardRange('a/c-acc', utils.Timestamp.now(), 'b', 'v')
|
||||||
|
self.assertFalse(acceptor.expand(donors[1:3]))
|
||||||
|
self.assertEqual(('b', 'v'),
|
||||||
|
(acceptor.lower, acceptor.upper))
|
||||||
|
|
||||||
|
|
||||||
|
class TestShardRangeList(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.shard_ranges = [
|
||||||
|
utils.ShardRange('a/b', utils.Timestamp.now(), 'a', 'b',
|
||||||
|
object_count=2, bytes_used=22),
|
||||||
|
utils.ShardRange('b/c', utils.Timestamp.now(), 'b', 'c',
|
||||||
|
object_count=4, bytes_used=44),
|
||||||
|
utils.ShardRange('x/y', utils.Timestamp.now(), 'x', 'y',
|
||||||
|
object_count=6, bytes_used=66),
|
||||||
|
]
|
||||||
|
|
||||||
|
def test_init(self):
|
||||||
|
srl = ShardRangeList()
|
||||||
|
self.assertEqual(0, len(srl))
|
||||||
|
self.assertEqual(utils.ShardRange.MIN, srl.lower)
|
||||||
|
self.assertEqual(utils.ShardRange.MIN, srl.upper)
|
||||||
|
self.assertEqual(0, srl.object_count)
|
||||||
|
self.assertEqual(0, srl.bytes_used)
|
||||||
|
|
||||||
|
def test_init_with_list(self):
|
||||||
|
srl = ShardRangeList(self.shard_ranges[:2])
|
||||||
|
self.assertEqual(2, len(srl))
|
||||||
|
self.assertEqual('a', srl.lower)
|
||||||
|
self.assertEqual('c', srl.upper)
|
||||||
|
self.assertEqual(6, srl.object_count)
|
||||||
|
self.assertEqual(66, srl.bytes_used)
|
||||||
|
|
||||||
|
srl.append(self.shard_ranges[2])
|
||||||
|
self.assertEqual(3, len(srl))
|
||||||
|
self.assertEqual('a', srl.lower)
|
||||||
|
self.assertEqual('y', srl.upper)
|
||||||
|
self.assertEqual(12, srl.object_count)
|
||||||
|
self.assertEqual(132, srl.bytes_used)
|
||||||
|
|
||||||
|
def test_pop(self):
|
||||||
|
srl = ShardRangeList(self.shard_ranges[:2])
|
||||||
|
srl.pop()
|
||||||
|
self.assertEqual(1, len(srl))
|
||||||
|
self.assertEqual('a', srl.lower)
|
||||||
|
self.assertEqual('b', srl.upper)
|
||||||
|
self.assertEqual(2, srl.object_count)
|
||||||
|
self.assertEqual(22, srl.bytes_used)
|
||||||
|
|
||||||
|
def test_slice(self):
|
||||||
|
srl = ShardRangeList(self.shard_ranges)
|
||||||
|
sublist = srl[:1]
|
||||||
|
self.assertIsInstance(sublist, ShardRangeList)
|
||||||
|
self.assertEqual(1, len(sublist))
|
||||||
|
self.assertEqual('a', sublist.lower)
|
||||||
|
self.assertEqual('b', sublist.upper)
|
||||||
|
self.assertEqual(2, sublist.object_count)
|
||||||
|
self.assertEqual(22, sublist.bytes_used)
|
||||||
|
|
||||||
|
sublist = srl[1:]
|
||||||
|
self.assertIsInstance(sublist, ShardRangeList)
|
||||||
|
self.assertEqual(2, len(sublist))
|
||||||
|
self.assertEqual('b', sublist.lower)
|
||||||
|
self.assertEqual('y', sublist.upper)
|
||||||
|
self.assertEqual(10, sublist.object_count)
|
||||||
|
self.assertEqual(110, sublist.bytes_used)
|
||||||
|
|
||||||
|
def test_includes(self):
|
||||||
|
srl = ShardRangeList(self.shard_ranges)
|
||||||
|
|
||||||
|
for sr in self.shard_ranges:
|
||||||
|
self.assertTrue(srl.includes(sr))
|
||||||
|
|
||||||
|
self.assertTrue(srl.includes(srl))
|
||||||
|
|
||||||
|
sr = utils.ShardRange('a/a', utils.Timestamp.now(), '', 'a')
|
||||||
|
self.assertFalse(srl.includes(sr))
|
||||||
|
sr = utils.ShardRange('a/a', utils.Timestamp.now(), '', 'b')
|
||||||
|
self.assertFalse(srl.includes(sr))
|
||||||
|
sr = utils.ShardRange('a/z', utils.Timestamp.now(), 'x', 'z')
|
||||||
|
self.assertFalse(srl.includes(sr))
|
||||||
|
sr = utils.ShardRange('a/z', utils.Timestamp.now(), 'y', 'z')
|
||||||
|
self.assertFalse(srl.includes(sr))
|
||||||
|
sr = utils.ShardRange('a/entire', utils.Timestamp.now(), '', '')
|
||||||
|
self.assertFalse(srl.includes(sr))
|
||||||
|
|
||||||
|
# entire range
|
||||||
|
srl_entire = ShardRangeList([sr])
|
||||||
|
self.assertFalse(srl.includes(srl_entire))
|
||||||
|
# make a fresh instance
|
||||||
|
sr = utils.ShardRange('a/entire', utils.Timestamp.now(), '', '')
|
||||||
|
self.assertTrue(srl_entire.includes(sr))
|
||||||
|
|
||||||
|
|
||||||
@patch('ctypes.get_errno')
|
@patch('ctypes.get_errno')
|
||||||
@patch.object(utils, '_sys_posix_fallocate')
|
@patch.object(utils, '_sys_posix_fallocate')
|
||||||
|
@ -39,7 +39,8 @@ from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING, \
|
|||||||
SHARDED, DATADIR
|
SHARDED, DATADIR
|
||||||
from swift.container.sharder import ContainerSharder, sharding_enabled, \
|
from swift.container.sharder import ContainerSharder, sharding_enabled, \
|
||||||
CleavingContext, DEFAULT_SHARD_SHRINK_POINT, \
|
CleavingContext, DEFAULT_SHARD_SHRINK_POINT, \
|
||||||
DEFAULT_SHARD_CONTAINER_THRESHOLD
|
DEFAULT_SHARD_CONTAINER_THRESHOLD, finalize_shrinking, \
|
||||||
|
find_shrinking_candidates, process_compactable_shard_sequences
|
||||||
from swift.common.utils import ShardRange, Timestamp, hash_path, \
|
from swift.common.utils import ShardRange, Timestamp, hash_path, \
|
||||||
encode_timestamps, parse_db_filename, quorum_size, Everything, md5
|
encode_timestamps, parse_db_filename, quorum_size, Everything, md5
|
||||||
from test import annotate_failure
|
from test import annotate_failure
|
||||||
@ -5146,7 +5147,9 @@ class TestSharder(BaseTestSharder):
|
|||||||
DEFAULT_SHARD_CONTAINER_THRESHOLD / 100)
|
DEFAULT_SHARD_CONTAINER_THRESHOLD / 100)
|
||||||
shard_ranges = self._make_shard_ranges(
|
shard_ranges = self._make_shard_ranges(
|
||||||
shard_bounds, state=ShardRange.ACTIVE, object_count=size)
|
shard_bounds, state=ShardRange.ACTIVE, object_count=size)
|
||||||
broker.merge_shard_ranges(shard_ranges)
|
own_sr = broker.get_own_shard_range()
|
||||||
|
own_sr.update_state(ShardRange.SHARDED, Timestamp.now())
|
||||||
|
broker.merge_shard_ranges(shard_ranges + [own_sr])
|
||||||
self.assertTrue(broker.set_sharding_state())
|
self.assertTrue(broker.set_sharding_state())
|
||||||
self.assertTrue(broker.set_sharded_state())
|
self.assertTrue(broker.set_sharded_state())
|
||||||
with self._mock_sharder() as sharder:
|
with self._mock_sharder() as sharder:
|
||||||
@ -5239,6 +5242,53 @@ class TestSharder(BaseTestSharder):
|
|||||||
[final_donor, broker.get_own_shard_range()])]
|
[final_donor, broker.get_own_shard_range()])]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_find_and_enable_multiple_shrinking_candidates(self):
|
||||||
|
broker = self._make_broker()
|
||||||
|
broker.enable_sharding(next(self.ts_iter))
|
||||||
|
shard_bounds = (('', 'a'), ('a', 'b'), ('b', 'c'),
|
||||||
|
('c', 'd'), ('d', 'e'), ('e', ''))
|
||||||
|
size = (DEFAULT_SHARD_SHRINK_POINT *
|
||||||
|
DEFAULT_SHARD_CONTAINER_THRESHOLD / 100)
|
||||||
|
shard_ranges = self._make_shard_ranges(
|
||||||
|
shard_bounds, state=ShardRange.ACTIVE, object_count=size)
|
||||||
|
own_sr = broker.get_own_shard_range()
|
||||||
|
own_sr.update_state(ShardRange.SHARDED, Timestamp.now())
|
||||||
|
broker.merge_shard_ranges(shard_ranges + [own_sr])
|
||||||
|
self.assertTrue(broker.set_sharding_state())
|
||||||
|
self.assertTrue(broker.set_sharded_state())
|
||||||
|
with self._mock_sharder() as sharder:
|
||||||
|
sharder._find_and_enable_shrinking_candidates(broker)
|
||||||
|
self._assert_shard_ranges_equal(shard_ranges,
|
||||||
|
broker.get_shard_ranges())
|
||||||
|
|
||||||
|
# three ranges just below threshold
|
||||||
|
shard_ranges = broker.get_shard_ranges() # get timestamps updated
|
||||||
|
shard_ranges[0].update_meta(size - 1, 0)
|
||||||
|
shard_ranges[1].update_meta(size - 1, 0)
|
||||||
|
shard_ranges[3].update_meta(size - 1, 0)
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
with self._mock_sharder() as sharder:
|
||||||
|
with mock_timestamp_now() as now:
|
||||||
|
sharder._send_shard_ranges = mock.MagicMock()
|
||||||
|
sharder._find_and_enable_shrinking_candidates(broker)
|
||||||
|
# 0 shrinks into 1 (only one donor per acceptor is allowed)
|
||||||
|
shard_ranges[0].update_state(ShardRange.SHRINKING, state_timestamp=now)
|
||||||
|
shard_ranges[0].epoch = now
|
||||||
|
shard_ranges[1].lower = shard_ranges[0].lower
|
||||||
|
shard_ranges[1].timestamp = now
|
||||||
|
# 3 shrinks into 4
|
||||||
|
shard_ranges[3].update_state(ShardRange.SHRINKING, state_timestamp=now)
|
||||||
|
shard_ranges[3].epoch = now
|
||||||
|
shard_ranges[4].lower = shard_ranges[3].lower
|
||||||
|
shard_ranges[4].timestamp = now
|
||||||
|
self._assert_shard_ranges_equal(shard_ranges,
|
||||||
|
broker.get_shard_ranges())
|
||||||
|
for donor, acceptor in (shard_ranges[:2], shard_ranges[3:5]):
|
||||||
|
sharder._send_shard_ranges.assert_has_calls(
|
||||||
|
[mock.call(acceptor.account, acceptor.container, [acceptor]),
|
||||||
|
mock.call(donor.account, donor.container, [donor, acceptor])]
|
||||||
|
)
|
||||||
|
|
||||||
def test_partition_and_device_filters(self):
|
def test_partition_and_device_filters(self):
|
||||||
# verify partitions and devices kwargs result in filtering of processed
|
# verify partitions and devices kwargs result in filtering of processed
|
||||||
# containers but not of the local device ids.
|
# containers but not of the local device ids.
|
||||||
@ -5804,3 +5854,135 @@ class TestCleavingContext(BaseTestSharder):
|
|||||||
self.assertEqual(2, ctx.ranges_done)
|
self.assertEqual(2, ctx.ranges_done)
|
||||||
self.assertEqual(8, ctx.ranges_todo)
|
self.assertEqual(8, ctx.ranges_todo)
|
||||||
self.assertEqual('c', ctx.cursor)
|
self.assertEqual('c', ctx.cursor)
|
||||||
|
|
||||||
|
|
||||||
|
class TestSharderFunctions(BaseTestSharder):
|
||||||
|
def test_find_shrinking_candidates(self):
|
||||||
|
broker = self._make_broker()
|
||||||
|
shard_bounds = (('', 'a'), ('a', 'b'), ('b', 'c'), ('c', 'd'))
|
||||||
|
threshold = (DEFAULT_SHARD_SHRINK_POINT *
|
||||||
|
DEFAULT_SHARD_CONTAINER_THRESHOLD / 100)
|
||||||
|
shard_ranges = self._make_shard_ranges(
|
||||||
|
shard_bounds, state=ShardRange.ACTIVE, object_count=threshold)
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
pairs = find_shrinking_candidates(broker, threshold, threshold * 4)
|
||||||
|
self.assertEqual({}, pairs)
|
||||||
|
|
||||||
|
# one range just below threshold
|
||||||
|
shard_ranges[0].update_meta(threshold - 1, 0)
|
||||||
|
broker.merge_shard_ranges(shard_ranges[0])
|
||||||
|
pairs = find_shrinking_candidates(broker, threshold, threshold * 4)
|
||||||
|
self.assertEqual(1, len(pairs), pairs)
|
||||||
|
for acceptor, donor in pairs.items():
|
||||||
|
self.assertEqual(shard_ranges[1], acceptor)
|
||||||
|
self.assertEqual(shard_ranges[0], donor)
|
||||||
|
|
||||||
|
# two ranges just below threshold
|
||||||
|
shard_ranges[2].update_meta(threshold - 1, 0)
|
||||||
|
broker.merge_shard_ranges(shard_ranges[2])
|
||||||
|
pairs = find_shrinking_candidates(broker, threshold, threshold * 4)
|
||||||
|
# shenanigans to work around dicts with ShardRanges keys not comparing
|
||||||
|
acceptors = []
|
||||||
|
donors = []
|
||||||
|
for acceptor, donor in pairs.items():
|
||||||
|
acceptors.append(acceptor)
|
||||||
|
donors.append(donor)
|
||||||
|
acceptors.sort(key=ShardRange.sort_key)
|
||||||
|
donors.sort(key=ShardRange.sort_key)
|
||||||
|
self.assertEqual([shard_ranges[1], shard_ranges[3]], acceptors)
|
||||||
|
self.assertEqual([shard_ranges[0], shard_ranges[2]], donors)
|
||||||
|
|
||||||
|
def test_finalize_shrinking(self):
|
||||||
|
broker = self._make_broker()
|
||||||
|
broker.enable_sharding(next(self.ts_iter))
|
||||||
|
shard_bounds = (('', 'here'), ('here', 'there'), ('there', ''))
|
||||||
|
ts_0 = next(self.ts_iter)
|
||||||
|
shard_ranges = self._make_shard_ranges(
|
||||||
|
shard_bounds, state=ShardRange.ACTIVE, timestamp=ts_0)
|
||||||
|
self.assertTrue(broker.set_sharding_state())
|
||||||
|
self.assertTrue(broker.set_sharded_state())
|
||||||
|
ts_1 = next(self.ts_iter)
|
||||||
|
finalize_shrinking(broker, shard_ranges[2:], shard_ranges[:2], ts_1)
|
||||||
|
updated_ranges = broker.get_shard_ranges()
|
||||||
|
self.assertEqual(
|
||||||
|
[ShardRange.SHRINKING, ShardRange.SHRINKING, ShardRange.ACTIVE],
|
||||||
|
[sr.state for sr in updated_ranges]
|
||||||
|
)
|
||||||
|
# acceptor is not updated...
|
||||||
|
self.assertEqual(ts_0, updated_ranges[2].timestamp)
|
||||||
|
# donors are updated...
|
||||||
|
self.assertEqual([ts_1] * 2,
|
||||||
|
[sr.state_timestamp for sr in updated_ranges[:2]])
|
||||||
|
self.assertEqual([ts_1] * 2,
|
||||||
|
[sr.epoch for sr in updated_ranges[:2]])
|
||||||
|
# check idempotency
|
||||||
|
ts_2 = next(self.ts_iter)
|
||||||
|
finalize_shrinking(broker, shard_ranges[2:], shard_ranges[:2], ts_2)
|
||||||
|
updated_ranges = broker.get_shard_ranges()
|
||||||
|
self.assertEqual(
|
||||||
|
[ShardRange.SHRINKING, ShardRange.SHRINKING, ShardRange.ACTIVE],
|
||||||
|
[sr.state for sr in updated_ranges]
|
||||||
|
)
|
||||||
|
# acceptor is not updated...
|
||||||
|
self.assertEqual(ts_0, updated_ranges[2].timestamp)
|
||||||
|
# donors are not updated...
|
||||||
|
self.assertEqual([ts_1] * 2,
|
||||||
|
[sr.state_timestamp for sr in updated_ranges[:2]])
|
||||||
|
self.assertEqual([ts_1] * 2,
|
||||||
|
[sr.epoch for sr in updated_ranges[:2]])
|
||||||
|
|
||||||
|
def test_process_compactable(self):
|
||||||
|
ts_0 = next(self.ts_iter)
|
||||||
|
# no sequences...
|
||||||
|
acceptors, donors = process_compactable_shard_sequences([], ts_0)
|
||||||
|
self.assertEqual([], acceptors)
|
||||||
|
self.assertEqual([], donors)
|
||||||
|
|
||||||
|
# two sequences with acceptor bounds needing to be updated
|
||||||
|
sequence_1 = self._make_shard_ranges(
|
||||||
|
(('a', 'b'), ('b', 'c'), ('c', 'd')),
|
||||||
|
state=ShardRange.ACTIVE, timestamp=ts_0)
|
||||||
|
sequence_2 = self._make_shard_ranges(
|
||||||
|
(('x', 'y'), ('y', 'z')),
|
||||||
|
state=ShardRange.ACTIVE, timestamp=ts_0)
|
||||||
|
ts_1 = next(self.ts_iter)
|
||||||
|
acceptors, donors = process_compactable_shard_sequences(
|
||||||
|
[sequence_1, sequence_2], ts_1)
|
||||||
|
expected_donors = sequence_1[:-1] + sequence_2[:-1]
|
||||||
|
expected_acceptors = [sequence_1[-1].copy(lower='a', timestamp=ts_1),
|
||||||
|
sequence_2[-1].copy(lower='x', timestamp=ts_1)]
|
||||||
|
self.assertEqual([dict(sr) for sr in expected_acceptors],
|
||||||
|
[dict(sr) for sr in acceptors])
|
||||||
|
self.assertEqual([dict(sr) for sr in expected_donors],
|
||||||
|
[dict(sr) for sr in donors])
|
||||||
|
|
||||||
|
# sequences have already been processed - acceptors expanded
|
||||||
|
sequence_1 = self._make_shard_ranges(
|
||||||
|
(('a', 'b'), ('b', 'c'), ('a', 'd')),
|
||||||
|
state=ShardRange.ACTIVE, timestamp=ts_0)
|
||||||
|
sequence_2 = self._make_shard_ranges(
|
||||||
|
(('x', 'y'), ('x', 'z')),
|
||||||
|
state=ShardRange.ACTIVE, timestamp=ts_0)
|
||||||
|
acceptors, donors = process_compactable_shard_sequences(
|
||||||
|
[sequence_1, sequence_2], ts_1)
|
||||||
|
expected_donors = sequence_1[:-1] + sequence_2[:-1]
|
||||||
|
expected_acceptors = [sequence_1[-1], sequence_2[-1]]
|
||||||
|
self.assertEqual([dict(sr) for sr in expected_acceptors],
|
||||||
|
[dict(sr) for sr in acceptors])
|
||||||
|
self.assertEqual([dict(sr) for sr in expected_donors],
|
||||||
|
[dict(sr) for sr in donors])
|
||||||
|
|
||||||
|
# acceptor is root - needs state to be updated, but not bounds
|
||||||
|
sequence_1 = self._make_shard_ranges(
|
||||||
|
(('a', 'b'), ('b', 'c'), ('a', 'd'), ('d', ''), ('', '')),
|
||||||
|
state=[ShardRange.ACTIVE] * 4 + [ShardRange.SHARDED],
|
||||||
|
timestamp=ts_0)
|
||||||
|
acceptors, donors = process_compactable_shard_sequences(
|
||||||
|
[sequence_1], ts_1)
|
||||||
|
expected_donors = sequence_1[:-1]
|
||||||
|
expected_acceptors = [sequence_1[-1].copy(state=ShardRange.ACTIVE,
|
||||||
|
state_timestamp=ts_1)]
|
||||||
|
self.assertEqual([dict(sr) for sr in expected_acceptors],
|
||||||
|
[dict(sr) for sr in acceptors])
|
||||||
|
self.assertEqual([dict(sr) for sr in expected_donors],
|
||||||
|
[dict(sr) for sr in donors])
|
||||||
|
Loading…
Reference in New Issue
Block a user