Enable shard ranges to be manually shrunk to root container
Shard containers learn about their own shard range by fetching shard ranges from the root container during the sharder audit phase. Since [1], if the shard is shrinking, it may also learn about acceptor shards in the shard ranges fetched from the root. However, the fetched shard ranges do not currently include the root's own shard range, even when the root is to be the acceptor for a shrinking shard. This prevents the mechanism being used to perform shrinking to root. This patch modifies the root container behaviour to include its own shard range in responses to shard containers when the container GET request param 'states' has value 'auditing'. This parameter is used to indicate that a particular GET request is from the sharder during shard audit; the root does not otherwise include its own shard range in GET responses. When the 'states=auditing' parameter is used with a container GET request the response includes all shard ranges except those in the FOUND state. The shard ranges of relevance to a shard are its own shard range and any overlapping shard ranges that may be acceptors if the shard is shrinking. None of these relevant shard ranges should be in state FOUND: the shard itself cannot be in FOUND state since it has been created; acceptor ranges should not be in FOUND state. The FOUND state is therefore excluded from the 'auditing' states to prevent an unintended overlapping FOUND shard range that has not yet been resolved at the root container being fetched by a shrinking shard, which might then proceed to create and cleave to it. The shard only merges the root's shard range (and any other shard ranges) when the shard is shrinking. If the root shard range is ACTIVE then it is the acceptor and will be used when the shard cleaves. If the root shard range is in any other state then it will be ignored when the shard cleaves to other acceptors. The sharder cleave loop is modified to break as soon as cleaving is done i.e. cleaving has been completed up to the shard's upper bound. This prevents misleading logging that cleaving has stopped when in fact cleaving to a non-root acceptor has completed but the shard range list still contains an irrelevant root shard range in SHARDED state. This also prevents cleaving to more than one acceptor in the unexpected case that multiple active acceptors overlap the shrinking shard - cleaving will now complete once the first acceptor has cleaved. [1] Related-Change: I9034a5715406b310c7282f1bec9625fe7acd57b6 Change-Id: I5d48b67217f705ac30bb427ef8d969a90eaad2e5
This commit is contained in:
parent
beb1c3969b
commit
b0c8de699e
@ -54,7 +54,13 @@ SHARD_STATS_STATES = [ShardRange.ACTIVE, ShardRange.SHARDING,
|
||||
SHARD_LISTING_STATES = SHARD_STATS_STATES + [ShardRange.CLEAVED]
|
||||
SHARD_UPDATE_STATES = [ShardRange.CREATED, ShardRange.CLEAVED,
|
||||
ShardRange.ACTIVE, ShardRange.SHARDING]
|
||||
|
||||
# when auditing a shard gets its own shard range, which could be in any state
|
||||
# except FOUND, and any potential acceptors excluding FOUND ranges that may be
|
||||
# unwanted overlaps
|
||||
SHARD_AUDITING_STATES = [ShardRange.CREATED, ShardRange.CLEAVED,
|
||||
ShardRange.ACTIVE, ShardRange.SHARDING,
|
||||
ShardRange.SHARDED, ShardRange.SHRINKING,
|
||||
ShardRange.SHRUNK]
|
||||
|
||||
# attribute names in order used when transforming shard ranges from dicts to
|
||||
# tuples and vice-versa
|
||||
@ -1706,7 +1712,10 @@ class ContainerBroker(DatabaseBroker):
|
||||
|
||||
The following alias values are supported: 'listing' maps to all states
|
||||
that are considered valid when listing objects; 'updating' maps to all
|
||||
states that are considered valid for redirecting an object update.
|
||||
states that are considered valid for redirecting an object update;
|
||||
'auditing' maps to all states that are considered valid for a shard
|
||||
container that is updating its own shard range table from a root (this
|
||||
currently maps to all states except FOUND).
|
||||
|
||||
:param states: a list of values each of which may be the name of a
|
||||
state, the number of a state, or an alias
|
||||
@ -1721,6 +1730,8 @@ class ContainerBroker(DatabaseBroker):
|
||||
resolved_states.update(SHARD_LISTING_STATES)
|
||||
elif state == 'updating':
|
||||
resolved_states.update(SHARD_UPDATE_STATES)
|
||||
elif state == 'auditing':
|
||||
resolved_states.update(SHARD_AUDITING_STATES)
|
||||
else:
|
||||
resolved_states.add(ShardRange.resolve_state(state)[0])
|
||||
return resolved_states
|
||||
|
@ -749,10 +749,14 @@ class ContainerController(BaseStorageServer):
|
||||
marker = end_marker = includes = None
|
||||
reverse = False
|
||||
states = params.get('states')
|
||||
fill_gaps = False
|
||||
fill_gaps = include_own = False
|
||||
if states:
|
||||
states = list_from_csv(states)
|
||||
fill_gaps = any(('listing' in states, 'updating' in states))
|
||||
# 'auditing' is used during shard audit; if the shard is
|
||||
# shrinking then it needs to get acceptor shard ranges, which
|
||||
# may be the root container itself, so use include_own
|
||||
include_own = 'auditing' in states
|
||||
try:
|
||||
states = broker.resolve_shard_range_states(states)
|
||||
except ValueError:
|
||||
@ -761,7 +765,8 @@ class ContainerController(BaseStorageServer):
|
||||
req.headers.get('x-backend-include-deleted', False))
|
||||
container_list = broker.get_shard_ranges(
|
||||
marker, end_marker, includes, reverse, states=states,
|
||||
include_deleted=include_deleted, fill_gaps=fill_gaps)
|
||||
include_deleted=include_deleted, fill_gaps=fill_gaps,
|
||||
include_own=include_own)
|
||||
else:
|
||||
resp_headers = gen_resp_headers(info, is_deleted=is_deleted)
|
||||
if is_deleted:
|
||||
|
@ -317,8 +317,7 @@ class CleavingContext(object):
|
||||
def range_done(self, new_cursor):
|
||||
self.ranges_done += 1
|
||||
self.ranges_todo -= 1
|
||||
if new_cursor is not None:
|
||||
self.cursor = new_cursor
|
||||
self.cursor = new_cursor
|
||||
|
||||
def done(self):
|
||||
return all((self.misplaced_done, self.cleaving_done,
|
||||
@ -744,11 +743,18 @@ class ContainerSharder(ContainerReplicator):
|
||||
shard_ranges = own_shard_range_from_root = None
|
||||
if own_shard_range:
|
||||
# Get the root view of the world, at least that part of the world
|
||||
# that overlaps with this shard's namespace
|
||||
# that overlaps with this shard's namespace. The
|
||||
# 'states=auditing' parameter will cause the root to include
|
||||
# its own shard range in the response, which is necessary for the
|
||||
# particular case when this shard should be shrinking to the root
|
||||
# container; when not shrinking to root, but to another acceptor,
|
||||
# the root range should be in sharded state and will not interfere
|
||||
# with cleaving, listing or updating behaviour.
|
||||
shard_ranges = self._fetch_shard_ranges(
|
||||
broker, newest=True,
|
||||
params={'marker': str_to_wsgi(own_shard_range.lower_str),
|
||||
'end_marker': str_to_wsgi(own_shard_range.upper_str)},
|
||||
'end_marker': str_to_wsgi(own_shard_range.upper_str),
|
||||
'states': 'auditing'},
|
||||
include_deleted=True)
|
||||
if shard_ranges:
|
||||
for shard_range in shard_ranges:
|
||||
@ -1389,10 +1395,17 @@ class ContainerSharder(ContainerReplicator):
|
||||
quote(broker.path))
|
||||
return cleaving_context.misplaced_done
|
||||
|
||||
ranges_todo = broker.get_shard_ranges(marker=cleaving_context.marker)
|
||||
shard_ranges = broker.get_shard_ranges(marker=cleaving_context.marker)
|
||||
# Ignore shrinking shard ranges: we never want to cleave objects to a
|
||||
# shrinking shard. Shrinking shard ranges are to be expected in a root;
|
||||
# shrinking shard ranges (other than own shard range) are not normally
|
||||
# expected in a shard but can occur if there is an overlapping shard
|
||||
# range that has been discovered from the root.
|
||||
ranges_todo = [sr for sr in shard_ranges
|
||||
if sr.state != ShardRange.SHRINKING]
|
||||
if cleaving_context.cursor:
|
||||
# always update ranges_todo in case more ranges have been found
|
||||
# since last visit
|
||||
# always update ranges_todo in case shard ranges have changed since
|
||||
# last visit
|
||||
cleaving_context.ranges_todo = len(ranges_todo)
|
||||
self.logger.debug('Continuing to cleave (%s done, %s todo): %s',
|
||||
cleaving_context.ranges_done,
|
||||
@ -1406,36 +1419,36 @@ class ContainerSharder(ContainerReplicator):
|
||||
|
||||
ranges_done = []
|
||||
for shard_range in ranges_todo:
|
||||
if shard_range.state == ShardRange.SHRINKING:
|
||||
# Ignore shrinking shard ranges: we never want to cleave
|
||||
# objects to a shrinking shard. Shrinking shard ranges are to
|
||||
# be expected in a root; shrinking shard ranges (other than own
|
||||
# shard range) are not normally expected in a shard but can
|
||||
# occur if there is an overlapping shard range that has been
|
||||
# discovered from the root.
|
||||
cleaving_context.range_done(None) # don't move the cursor
|
||||
continue
|
||||
elif shard_range.state in (ShardRange.CREATED,
|
||||
ShardRange.CLEAVED,
|
||||
ShardRange.ACTIVE):
|
||||
cleave_result = self._cleave_shard_range(
|
||||
broker, cleaving_context, shard_range)
|
||||
if cleave_result == CLEAVE_SUCCESS:
|
||||
ranges_done.append(shard_range)
|
||||
if len(ranges_done) == self.cleave_batch_size:
|
||||
break
|
||||
elif cleave_result == CLEAVE_FAILED:
|
||||
break
|
||||
# else, no errors, but no rows found either. keep going,
|
||||
# and don't count it against our batch size
|
||||
else:
|
||||
if cleaving_context.cleaving_done:
|
||||
# note: there may still be ranges_todo, for example: if this
|
||||
# shard is shrinking and has merged a root shard range in
|
||||
# sharded state along with an active acceptor shard range, but
|
||||
# the root range is irrelevant
|
||||
break
|
||||
|
||||
if len(ranges_done) == self.cleave_batch_size:
|
||||
break
|
||||
|
||||
if shard_range.state not in (ShardRange.CREATED,
|
||||
ShardRange.CLEAVED,
|
||||
ShardRange.ACTIVE):
|
||||
self.logger.info('Stopped cleave at unready %s', shard_range)
|
||||
break
|
||||
|
||||
if not ranges_done:
|
||||
# _cleave_shard_range always store()s the context on success; make
|
||||
# sure we *also* do that if we hit a failure right off the bat
|
||||
cleaving_context.store(broker)
|
||||
cleave_result = self._cleave_shard_range(
|
||||
broker, cleaving_context, shard_range)
|
||||
|
||||
if cleave_result == CLEAVE_SUCCESS:
|
||||
ranges_done.append(shard_range)
|
||||
elif cleave_result == CLEAVE_FAILED:
|
||||
break
|
||||
# else: CLEAVE_EMPTY: no errors, but no rows found either. keep
|
||||
# going, and don't count it against our batch size
|
||||
|
||||
# _cleave_shard_range always store()s the context on success; *also* do
|
||||
# that here in case we hit a failure right off the bat or ended loop
|
||||
# with skipped ranges
|
||||
cleaving_context.store(broker)
|
||||
self.logger.debug(
|
||||
'Cleaved %s shard ranges for %s',
|
||||
len(ranges_done), quote(broker.path))
|
||||
|
@ -1441,14 +1441,32 @@ class TestContainerSharding(BaseTestContainerSharding):
|
||||
def test_shrinking(self):
|
||||
int_client = self.make_internal_client()
|
||||
|
||||
def check_node_data(node_data, exp_hdrs, exp_obj_count, exp_shards):
|
||||
def check_node_data(node_data, exp_hdrs, exp_obj_count, exp_shards,
|
||||
exp_sharded_root_range=False):
|
||||
hdrs, range_data = node_data
|
||||
self.assert_dict_contains(exp_hdrs, hdrs)
|
||||
self.assert_shard_ranges_contiguous(exp_shards, range_data)
|
||||
self.assert_total_object_count(exp_obj_count, range_data)
|
||||
sharded_root_range = False
|
||||
other_range_data = []
|
||||
for data in range_data:
|
||||
sr = ShardRange.from_dict(data)
|
||||
if (sr.account == self.account and
|
||||
sr.container == self.container_name and
|
||||
sr.state == ShardRange.SHARDED):
|
||||
# only expect one root range
|
||||
self.assertFalse(sharded_root_range, range_data)
|
||||
sharded_root_range = True
|
||||
self.assertEqual(ShardRange.MIN, sr.lower, sr)
|
||||
self.assertEqual(ShardRange.MAX, sr.upper, sr)
|
||||
else:
|
||||
# include active root range in further assertions
|
||||
other_range_data.append(data)
|
||||
self.assertEqual(exp_sharded_root_range, sharded_root_range)
|
||||
self.assert_shard_ranges_contiguous(exp_shards, other_range_data)
|
||||
self.assert_total_object_count(exp_obj_count, other_range_data)
|
||||
|
||||
def check_shard_nodes_data(node_data, expected_state='unsharded',
|
||||
expected_shards=0, exp_obj_count=0):
|
||||
expected_shards=0, exp_obj_count=0,
|
||||
exp_sharded_root_range=False):
|
||||
# checks that shard range is consistent on all nodes
|
||||
root_path = '%s/%s' % (self.account, self.container_name)
|
||||
exp_shard_hdrs = {
|
||||
@ -1460,7 +1478,7 @@ class TestContainerSharding(BaseTestContainerSharding):
|
||||
with annotate_failure('Node id %s.' % node_id):
|
||||
check_node_data(
|
||||
node_data, exp_shard_hdrs, exp_obj_count,
|
||||
expected_shards)
|
||||
expected_shards, exp_sharded_root_range)
|
||||
hdrs = node_data[0]
|
||||
object_counts.append(int(hdrs['X-Container-Object-Count']))
|
||||
bytes_used.append(int(hdrs['X-Container-Bytes-Used']))
|
||||
@ -1659,10 +1677,13 @@ class TestContainerSharding(BaseTestContainerSharding):
|
||||
donor = orig_shard_ranges[0]
|
||||
shard_nodes_data = self.direct_get_container_shard_ranges(
|
||||
donor.account, donor.container)
|
||||
# the donor's shard range will have the acceptor's projected stats
|
||||
# the donor's shard range will have the acceptor's projected stats;
|
||||
# donor also has copy of root shard range that will be ignored;
|
||||
# note: expected_shards does not include the sharded root range
|
||||
obj_count, bytes_used = check_shard_nodes_data(
|
||||
shard_nodes_data, expected_state='sharded', expected_shards=1,
|
||||
exp_obj_count=len(second_shard_objects) + 1)
|
||||
exp_obj_count=len(second_shard_objects) + 1,
|
||||
exp_sharded_root_range=True)
|
||||
# but the donor is empty and so reports zero stats
|
||||
self.assertEqual(0, obj_count)
|
||||
self.assertEqual(0, bytes_used)
|
||||
@ -2528,6 +2549,29 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
|
||||
self.assert_container_state(self.brain.nodes[2], 'sharded', 2)
|
||||
self.assert_container_listing(obj_names)
|
||||
|
||||
# Let's pretend that some actor in the system has determined that all
|
||||
# the shard ranges should shrink back to root
|
||||
# TODO: replace this db manipulation if/when manage_shard_ranges can
|
||||
# manage shrinking...
|
||||
broker = self.get_broker(self.brain.part, self.brain.nodes[0])
|
||||
shard_ranges = broker.get_shard_ranges()
|
||||
self.assertEqual(2, len(shard_ranges))
|
||||
for sr in shard_ranges:
|
||||
self.assertTrue(sr.update_state(ShardRange.SHRINKING))
|
||||
sr.epoch = sr.state_timestamp = Timestamp.now()
|
||||
own_sr = broker.get_own_shard_range()
|
||||
own_sr.update_state(ShardRange.ACTIVE, state_timestamp=Timestamp.now())
|
||||
broker.merge_shard_ranges(shard_ranges + [own_sr])
|
||||
|
||||
# replicate and run sharders
|
||||
self.replicators.once()
|
||||
self.sharders_once()
|
||||
|
||||
self.assert_container_state(self.brain.nodes[0], 'collapsed', 0)
|
||||
self.assert_container_state(self.brain.nodes[1], 'collapsed', 0)
|
||||
self.assert_container_state(self.brain.nodes[2], 'collapsed', 0)
|
||||
self.assert_container_listing(obj_names)
|
||||
|
||||
def test_manage_shard_ranges_used_poorly(self):
|
||||
obj_names = self._make_object_names(8)
|
||||
self.put_objects(obj_names)
|
||||
|
@ -3830,6 +3830,12 @@ class TestContainerBroker(unittest.TestCase):
|
||||
ContainerBroker.resolve_shard_range_states(
|
||||
['updating', 'listing']))
|
||||
|
||||
self.assertEqual(
|
||||
{ShardRange.CREATED, ShardRange.CLEAVED,
|
||||
ShardRange.ACTIVE, ShardRange.SHARDING, ShardRange.SHARDED,
|
||||
ShardRange.SHRINKING, ShardRange.SHRUNK},
|
||||
ContainerBroker.resolve_shard_range_states(['auditing']))
|
||||
|
||||
def check_bad_value(value):
|
||||
with self.assertRaises(ValueError) as cm:
|
||||
ContainerBroker.resolve_shard_range_states(value)
|
||||
@ -3928,7 +3934,7 @@ class TestContainerBroker(unittest.TestCase):
|
||||
self.assertFalse(actual)
|
||||
|
||||
@with_tempdir
|
||||
def test_overloap_shard_range_order(self, tempdir):
|
||||
def test_overlap_shard_range_order(self, tempdir):
|
||||
db_path = os.path.join(tempdir, 'container.db')
|
||||
broker = ContainerBroker(db_path, account='a', container='c')
|
||||
broker.initialize(next(self.ts).internal, 0)
|
||||
|
@ -3178,6 +3178,98 @@ class TestContainerController(unittest.TestCase):
|
||||
|
||||
do_test({'states': 'bad'}, 404)
|
||||
|
||||
def test_GET_shard_ranges_auditing(self):
|
||||
# verify that states=auditing causes own shard range to be included
|
||||
def put_shard_ranges(shard_ranges):
|
||||
headers = {'X-Timestamp': next(self.ts).normal,
|
||||
'X-Backend-Record-Type': 'shard'}
|
||||
body = json.dumps([dict(sr) for sr in shard_ranges])
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c', method='PUT', headers=headers, body=body)
|
||||
self.assertEqual(202, req.get_response(self.controller).status_int)
|
||||
|
||||
def do_test(ts_now, extra_params):
|
||||
headers = {'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Include-Deleted': 'True'}
|
||||
params = {'format': 'json'}
|
||||
if extra_params:
|
||||
params.update(extra_params)
|
||||
req = Request.blank('/sda1/p/a/c?format=json', method='GET',
|
||||
headers=headers, params=params)
|
||||
with mock_timestamp_now(ts_now):
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.content_type, 'application/json')
|
||||
self.assertIn('X-Backend-Record-Type', resp.headers)
|
||||
self.assertEqual('shard', resp.headers['X-Backend-Record-Type'])
|
||||
return resp
|
||||
|
||||
# initially not all shards are shrinking and root is sharded
|
||||
own_sr = ShardRange('a/c', next(self.ts), '', '',
|
||||
state=ShardRange.SHARDED)
|
||||
shard_bounds = [('', 'f', ShardRange.SHRUNK, True),
|
||||
('f', 't', ShardRange.SHRINKING, False),
|
||||
('t', '', ShardRange.ACTIVE, False)]
|
||||
shard_ranges = [
|
||||
ShardRange('.shards_a/_%s' % upper, next(self.ts),
|
||||
lower, upper, state=state, deleted=deleted)
|
||||
for (lower, upper, state, deleted) in shard_bounds]
|
||||
overlap = ShardRange('.shards_a/c_bad', next(self.ts), '', 'f',
|
||||
state=ShardRange.FOUND)
|
||||
|
||||
# create container and PUT some shard ranges
|
||||
headers = {'X-Timestamp': next(self.ts).normal}
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c', method='PUT', headers=headers)
|
||||
self.assertIn(
|
||||
req.get_response(self.controller).status_int, (201, 202))
|
||||
put_shard_ranges(shard_ranges + [own_sr, overlap])
|
||||
|
||||
# do *not* expect own shard range in default case (no states param)
|
||||
ts_now = next(self.ts)
|
||||
expected = [dict(sr, last_modified=sr.timestamp.isoformat)
|
||||
for sr in [overlap] + shard_ranges]
|
||||
resp = do_test(ts_now, {})
|
||||
self.assertEqual(expected, json.loads(resp.body))
|
||||
|
||||
# expect own shard range to be included when states=auditing
|
||||
expected = [dict(sr, last_modified=sr.timestamp.isoformat)
|
||||
for sr in shard_ranges + [own_sr]]
|
||||
resp = do_test(ts_now, {'states': 'auditing'})
|
||||
self.assertEqual(expected, json.loads(resp.body))
|
||||
|
||||
# expect own shard range to be included, marker/end_marker respected
|
||||
expected = [dict(sr, last_modified=sr.timestamp.isoformat)
|
||||
for sr in shard_ranges[1:2] + [own_sr]]
|
||||
resp = do_test(ts_now, {'marker': 'f', 'end_marker': 't',
|
||||
'states': 'auditing'})
|
||||
self.assertEqual(expected, json.loads(resp.body))
|
||||
|
||||
# update shards to all shrinking and root to active
|
||||
shard_ranges[-1].update_state(ShardRange.SHRINKING, next(self.ts))
|
||||
own_sr.update_state(ShardRange.ACTIVE, next(self.ts))
|
||||
put_shard_ranges(shard_ranges + [own_sr])
|
||||
|
||||
# do *not* expect own shard range in default case (no states param)
|
||||
ts_now = next(self.ts)
|
||||
expected = [dict(sr, last_modified=sr.timestamp.isoformat)
|
||||
for sr in [overlap] + shard_ranges]
|
||||
resp = do_test(ts_now, {})
|
||||
self.assertEqual(expected, json.loads(resp.body))
|
||||
|
||||
# expect own shard range to be included when states=auditing
|
||||
expected = [dict(sr, last_modified=sr.timestamp.isoformat)
|
||||
for sr in shard_ranges[:2] + [own_sr] + shard_ranges[2:]]
|
||||
resp = do_test(ts_now, {'states': 'auditing'})
|
||||
self.assertEqual(expected, json.loads(resp.body))
|
||||
|
||||
# expect own shard range to be included, marker/end_marker respected
|
||||
expected = [dict(sr, last_modified=sr.timestamp.isoformat)
|
||||
for sr in shard_ranges[1:2] + [own_sr]]
|
||||
resp = do_test(ts_now, {'marker': 'f', 'end_marker': 't',
|
||||
'states': 'auditing'})
|
||||
self.assertEqual(expected, json.loads(resp.body))
|
||||
|
||||
def test_GET_auto_record_type(self):
|
||||
# make a container
|
||||
ts_iter = make_timestamp_iter()
|
||||
|
@ -2258,14 +2258,14 @@ class TestSharder(BaseTestSharder):
|
||||
self.assertTrue(broker.set_sharding_state())
|
||||
|
||||
# run cleave - first batch is cleaved, shrinking range doesn't count
|
||||
# towards batch size of 2 but does count towards ranges_done
|
||||
# towards batch size of 2 nor towards ranges_done
|
||||
with self._mock_sharder() as sharder:
|
||||
self.assertFalse(sharder._cleave(broker))
|
||||
context = CleavingContext.load(broker)
|
||||
self.assertTrue(context.misplaced_done)
|
||||
self.assertFalse(context.cleaving_done)
|
||||
self.assertEqual(shard_ranges[2].upper_str, context.cursor)
|
||||
self.assertEqual(3, context.ranges_done)
|
||||
self.assertEqual(2, context.ranges_done)
|
||||
self.assertEqual(2, context.ranges_todo)
|
||||
|
||||
# run cleave - stops at shard range in FOUND state
|
||||
@ -2275,7 +2275,7 @@ class TestSharder(BaseTestSharder):
|
||||
self.assertTrue(context.misplaced_done)
|
||||
self.assertFalse(context.cleaving_done)
|
||||
self.assertEqual(shard_ranges[3].upper_str, context.cursor)
|
||||
self.assertEqual(4, context.ranges_done)
|
||||
self.assertEqual(3, context.ranges_done)
|
||||
self.assertEqual(1, context.ranges_todo)
|
||||
|
||||
# run cleave - final shard range in CREATED state, cleaving proceeds
|
||||
@ -2288,9 +2288,149 @@ class TestSharder(BaseTestSharder):
|
||||
self.assertTrue(context.misplaced_done)
|
||||
self.assertTrue(context.cleaving_done)
|
||||
self.assertEqual(shard_ranges[4].upper_str, context.cursor)
|
||||
self.assertEqual(5, context.ranges_done)
|
||||
self.assertEqual(4, context.ranges_done)
|
||||
self.assertEqual(0, context.ranges_todo)
|
||||
|
||||
def test_cleave_shrinking_to_active_root_range(self):
|
||||
broker = self._make_broker(account='.shards_a', container='shard_c')
|
||||
broker.put_object(
|
||||
'here_a', next(self.ts_iter), 10, 'text/plain', 'etag_a', 0, 0)
|
||||
# a donor previously shrunk to own...
|
||||
deleted_range = ShardRange(
|
||||
'.shards/other', next(self.ts_iter), 'here', 'there', deleted=True,
|
||||
state=ShardRange.SHRUNK, epoch=next(self.ts_iter))
|
||||
own_shard_range = ShardRange(
|
||||
broker.path, next(self.ts_iter), 'here', '',
|
||||
state=ShardRange.SHRINKING, epoch=next(self.ts_iter))
|
||||
# root is the acceptor...
|
||||
root = ShardRange(
|
||||
'a/c', next(self.ts_iter), '', '',
|
||||
state=ShardRange.ACTIVE, epoch=next(self.ts_iter))
|
||||
broker.merge_shard_ranges([deleted_range, own_shard_range, root])
|
||||
broker.set_sharding_sysmeta('Root', 'a/c')
|
||||
self.assertFalse(broker.is_root_container()) # sanity check
|
||||
self.assertTrue(broker.set_sharding_state())
|
||||
|
||||
# expect cleave to the root
|
||||
with self._mock_sharder() as sharder:
|
||||
self.assertTrue(sharder._cleave(broker))
|
||||
context = CleavingContext.load(broker)
|
||||
self.assertTrue(context.misplaced_done)
|
||||
self.assertTrue(context.cleaving_done)
|
||||
self.assertEqual(root.upper_str, context.cursor)
|
||||
self.assertEqual(1, context.ranges_done)
|
||||
self.assertEqual(0, context.ranges_todo)
|
||||
|
||||
def test_cleave_shrinking_to_active_acceptor_with_sharded_root_range(self):
|
||||
broker = self._make_broker(account='.shards_a', container='shard_c')
|
||||
broker.put_object(
|
||||
'here_a', next(self.ts_iter), 10, 'text/plain', 'etag_a', 0, 0)
|
||||
own_shard_range = ShardRange(
|
||||
broker.path, next(self.ts_iter), 'here', 'there',
|
||||
state=ShardRange.SHARDING, epoch=next(self.ts_iter))
|
||||
# the intended acceptor...
|
||||
acceptor = ShardRange(
|
||||
'.shards_a/shard_d', next(self.ts_iter), 'here', '',
|
||||
state=ShardRange.ACTIVE, epoch=next(self.ts_iter))
|
||||
# root range also gets pulled from root during audit...
|
||||
root = ShardRange(
|
||||
'a/c', next(self.ts_iter), '', '',
|
||||
state=ShardRange.SHARDED, epoch=next(self.ts_iter))
|
||||
broker.merge_shard_ranges([own_shard_range, acceptor, root])
|
||||
broker.set_sharding_sysmeta('Root', 'a/c')
|
||||
self.assertFalse(broker.is_root_container()) # sanity check
|
||||
self.assertTrue(broker.set_sharding_state())
|
||||
|
||||
# sharded root range should always sort after an active acceptor so
|
||||
# expect cleave to acceptor first then cleaving completes
|
||||
with self._mock_sharder() as sharder:
|
||||
self.assertTrue(sharder._cleave(broker))
|
||||
context = CleavingContext.load(broker)
|
||||
self.assertTrue(context.misplaced_done)
|
||||
self.assertTrue(context.cleaving_done)
|
||||
self.assertEqual(acceptor.upper_str, context.cursor)
|
||||
self.assertEqual(1, context.ranges_done) # cleaved the acceptor
|
||||
self.assertEqual(1, context.ranges_todo) # never reached sharded root
|
||||
|
||||
def test_cleave_shrinking_to_active_root_range_with_active_acceptor(self):
|
||||
# if shrinking shard has both active root and active other acceptor,
|
||||
# verify that shard only cleaves to one of them;
|
||||
# root will sort before acceptor if acceptor.upper==MAX
|
||||
broker = self._make_broker(account='.shards_a', container='shard_c')
|
||||
broker.put_object(
|
||||
'here_a', next(self.ts_iter), 10, 'text/plain', 'etag_a', 0, 0)
|
||||
own_shard_range = ShardRange(
|
||||
broker.path, next(self.ts_iter), 'here', 'there',
|
||||
state=ShardRange.SHRINKING, epoch=next(self.ts_iter))
|
||||
# active acceptor with upper bound == MAX
|
||||
acceptor = ShardRange(
|
||||
'.shards/other', next(self.ts_iter), 'here', '', deleted=False,
|
||||
state=ShardRange.ACTIVE, epoch=next(self.ts_iter))
|
||||
# root is also active
|
||||
root = ShardRange(
|
||||
'a/c', next(self.ts_iter), '', '',
|
||||
state=ShardRange.ACTIVE, epoch=next(self.ts_iter))
|
||||
broker.merge_shard_ranges([own_shard_range, acceptor, root])
|
||||
broker.set_sharding_sysmeta('Root', 'a/c')
|
||||
self.assertFalse(broker.is_root_container()) # sanity check
|
||||
self.assertTrue(broker.set_sharding_state())
|
||||
|
||||
# expect cleave to the root
|
||||
acceptor.upper = ''
|
||||
acceptor.timestamp = next(self.ts_iter)
|
||||
broker.merge_shard_ranges([acceptor])
|
||||
with self._mock_sharder() as sharder:
|
||||
self.assertTrue(sharder._cleave(broker))
|
||||
context = CleavingContext.load(broker)
|
||||
self.assertTrue(context.misplaced_done)
|
||||
self.assertTrue(context.cleaving_done)
|
||||
self.assertEqual(root.upper_str, context.cursor)
|
||||
self.assertEqual(1, context.ranges_done)
|
||||
self.assertEqual(1, context.ranges_todo)
|
||||
info = [
|
||||
line for line in self.logger.get_lines_for_level('info')
|
||||
if line.startswith('Replicating new shard container a/c')
|
||||
]
|
||||
self.assertEqual(1, len(info))
|
||||
|
||||
def test_cleave_shrinking_to_active_acceptor_with_active_root_range(self):
|
||||
# if shrinking shard has both active root and active other acceptor,
|
||||
# verify that shard only cleaves to one of them;
|
||||
# root will sort after acceptor if acceptor.upper<MAX
|
||||
broker = self._make_broker(account='.shards_a', container='shard_c')
|
||||
broker.put_object(
|
||||
'here_a', next(self.ts_iter), 10, 'text/plain', 'etag_a', 0, 0)
|
||||
own_shard_range = ShardRange(
|
||||
broker.path, next(self.ts_iter), 'here', 'there',
|
||||
state=ShardRange.SHRINKING, epoch=next(self.ts_iter))
|
||||
# active acceptor with upper bound < MAX
|
||||
acceptor = ShardRange(
|
||||
'.shards/other', next(self.ts_iter), 'here', 'where',
|
||||
deleted=False, state=ShardRange.ACTIVE, epoch=next(self.ts_iter))
|
||||
# root is also active
|
||||
root = ShardRange(
|
||||
'a/c', next(self.ts_iter), '', '',
|
||||
state=ShardRange.ACTIVE, epoch=next(self.ts_iter))
|
||||
broker.merge_shard_ranges([own_shard_range, acceptor, root])
|
||||
broker.set_sharding_sysmeta('Root', 'a/c')
|
||||
self.assertFalse(broker.is_root_container()) # sanity check
|
||||
self.assertTrue(broker.set_sharding_state())
|
||||
|
||||
# expect cleave to the acceptor
|
||||
with self._mock_sharder() as sharder:
|
||||
self.assertTrue(sharder._cleave(broker))
|
||||
context = CleavingContext.load(broker)
|
||||
self.assertTrue(context.misplaced_done)
|
||||
self.assertTrue(context.cleaving_done)
|
||||
self.assertEqual(acceptor.upper_str, context.cursor)
|
||||
self.assertEqual(1, context.ranges_done)
|
||||
self.assertEqual(1, context.ranges_todo)
|
||||
info = [
|
||||
line for line in self.logger.get_lines_for_level('info')
|
||||
if line.startswith('Replicating new shard container .shards/other')
|
||||
]
|
||||
self.assertEqual(1, len(info))
|
||||
|
||||
def _check_complete_sharding(self, account, container, shard_bounds):
|
||||
broker = self._make_sharding_broker(
|
||||
account=account, container=container, shard_bounds=shard_bounds)
|
||||
@ -4499,6 +4639,7 @@ class TestSharder(BaseTestSharder):
|
||||
mock_response = mock.MagicMock()
|
||||
mock_response.headers = {'x-backend-record-type':
|
||||
'shard'}
|
||||
shard_ranges.sort(key=ShardRange.sort_key)
|
||||
mock_response.body = json.dumps(
|
||||
[dict(sr) for sr in shard_ranges])
|
||||
mock_swift.make_request.return_value = mock_response
|
||||
@ -4520,7 +4661,8 @@ class TestSharder(BaseTestSharder):
|
||||
'X-Newest': 'true',
|
||||
'X-Backend-Include-Deleted': 'True',
|
||||
'X-Backend-Override-Deleted': 'true'}
|
||||
params = {'format': 'json', 'marker': marker, 'end_marker': end_marker}
|
||||
params = {'format': 'json', 'marker': marker, 'end_marker': end_marker,
|
||||
'states': 'auditing'}
|
||||
mock_swift.make_request.assert_called_once_with(
|
||||
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
|
||||
params=params)
|
||||
@ -4591,7 +4733,8 @@ class TestSharder(BaseTestSharder):
|
||||
'X-Newest': 'true',
|
||||
'X-Backend-Include-Deleted': 'True',
|
||||
'X-Backend-Override-Deleted': 'true'}
|
||||
params = {'format': 'json', 'marker': 'j', 'end_marker': 'k'}
|
||||
params = {'format': 'json', 'marker': 'j', 'end_marker': 'k',
|
||||
'states': 'auditing'}
|
||||
mock_swift.make_request.assert_called_once_with(
|
||||
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
|
||||
params=params)
|
||||
@ -4605,7 +4748,7 @@ class TestSharder(BaseTestSharder):
|
||||
self.assertEqual([own_shard_range],
|
||||
broker.get_shard_ranges(include_own=True))
|
||||
|
||||
# move root shard range to shrinking state
|
||||
# move root version of own shard range to shrinking state
|
||||
root_ts = next(self.ts_iter)
|
||||
self.assertTrue(shard_ranges[1].update_state(ShardRange.SHRINKING,
|
||||
state_timestamp=root_ts))
|
||||
@ -4626,7 +4769,8 @@ class TestSharder(BaseTestSharder):
|
||||
'X-Newest': 'true',
|
||||
'X-Backend-Include-Deleted': 'True',
|
||||
'X-Backend-Override-Deleted': 'true'}
|
||||
params = {'format': 'json', 'marker': 'k', 'end_marker': 't'}
|
||||
params = {'format': 'json', 'marker': 'k', 'end_marker': 't',
|
||||
'states': 'auditing'}
|
||||
mock_swift.make_request.assert_called_once_with(
|
||||
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
|
||||
params=params)
|
||||
@ -4659,7 +4803,8 @@ class TestSharder(BaseTestSharder):
|
||||
self.assertFalse(lines[2:])
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
self.assertFalse(broker.is_deleted())
|
||||
params = {'format': 'json', 'marker': 'j', 'end_marker': 'k'}
|
||||
params = {'format': 'json', 'marker': 'j', 'end_marker': 'k',
|
||||
'states': 'auditing'}
|
||||
mock_swift.make_request.assert_called_once_with(
|
||||
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
|
||||
params=params)
|
||||
@ -4748,7 +4893,8 @@ class TestSharder(BaseTestSharder):
|
||||
'X-Newest': 'true',
|
||||
'X-Backend-Include-Deleted': 'True',
|
||||
'X-Backend-Override-Deleted': 'true'}
|
||||
params = {'format': 'json', 'marker': 'k', 'end_marker': 't'}
|
||||
params = {'format': 'json', 'marker': 'k', 'end_marker': 't',
|
||||
'states': 'auditing'}
|
||||
mock_swift.make_request.assert_called_once_with(
|
||||
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
|
||||
params=params)
|
||||
@ -4805,6 +4951,61 @@ class TestSharder(BaseTestSharder):
|
||||
self._do_test_audit_shard_container_merge_other_ranges('Quoted-Root',
|
||||
'a/c')
|
||||
|
||||
def _do_test_audit_shard_container_with_root_ranges(self, *args):
|
||||
# shards may merge acceptors and the root range when shrinking; verify
|
||||
# that shard audit is ok with merged ranges
|
||||
def check_audit(own_state, acceptor_state, root_state):
|
||||
broker = self._make_broker(
|
||||
account='.shards_a',
|
||||
container='shard_c_%s' % next(self.ts_iter).normal)
|
||||
broker.set_sharding_sysmeta(*args)
|
||||
own_sr = broker.get_own_shard_range().copy(
|
||||
state=own_state, state_timestamp=next(self.ts_iter),
|
||||
lower='a', upper='b', timestamp=next(self.ts_iter))
|
||||
broker.merge_shard_ranges([own_sr])
|
||||
|
||||
# make acceptor and root ranges that overlap with the shard
|
||||
overlaps = self._make_shard_ranges([('a', 'c'), ('', '')],
|
||||
[acceptor_state, root_state])
|
||||
sharder, mock_swift = self.call_audit_container(
|
||||
broker, [own_sr] + overlaps)
|
||||
expected_headers = {'X-Backend-Record-Type': 'shard',
|
||||
'X-Newest': 'true',
|
||||
'X-Backend-Include-Deleted': 'True',
|
||||
'X-Backend-Override-Deleted': 'true'}
|
||||
params = {'format': 'json', 'marker': 'a', 'end_marker': 'b',
|
||||
'states': 'auditing'}
|
||||
mock_swift.make_request.assert_called_once_with(
|
||||
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
|
||||
params=params)
|
||||
if own_state in (ShardRange.SHRINKING, ShardRange.SHRUNK):
|
||||
# check acceptor & root are merged into audited shard
|
||||
self.assertEqual(
|
||||
[dict(sr) for sr in overlaps],
|
||||
[dict(sr) for sr in broker.get_shard_ranges()])
|
||||
return sharder
|
||||
|
||||
def assert_ok(own_state, acceptor_state, root_state):
|
||||
sharder = check_audit(own_state, acceptor_state, root_state)
|
||||
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
|
||||
with annotate_failure('with states %s %s %s'
|
||||
% (own_state, acceptor_state, root_state)):
|
||||
self._assert_stats(expected_stats, sharder, 'audit_shard')
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
for own_state in ShardRange.STATES:
|
||||
for acceptor_state in ShardRange.STATES:
|
||||
for root_state in ShardRange.STATES:
|
||||
assert_ok(own_state, acceptor_state, root_state)
|
||||
|
||||
def test_audit_old_style_shard_container_with_root_ranges(self):
|
||||
self._do_test_audit_shard_container_with_root_ranges('Root', 'a/c')
|
||||
|
||||
def test_audit_shard_container_with_root_ranges(self):
|
||||
self._do_test_audit_shard_container_with_root_ranges('Quoted-Root',
|
||||
'a/c')
|
||||
|
||||
def test_audit_deleted_range_in_root_container(self):
|
||||
broker = self._make_broker(account='.shards_a', container='shard_c')
|
||||
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
|
||||
@ -5598,13 +5799,8 @@ class TestCleavingContext(BaseTestSharder):
|
||||
self.assertEqual(4, ctx.ranges_todo)
|
||||
self.assertEqual('b', ctx.cursor)
|
||||
|
||||
ctx.range_done(None)
|
||||
self.assertEqual(2, ctx.ranges_done)
|
||||
self.assertEqual(3, ctx.ranges_todo)
|
||||
self.assertEqual('b', ctx.cursor)
|
||||
|
||||
ctx.ranges_todo = 9
|
||||
ctx.range_done('c')
|
||||
self.assertEqual(3, ctx.ranges_done)
|
||||
self.assertEqual(2, ctx.ranges_done)
|
||||
self.assertEqual(8, ctx.ranges_todo)
|
||||
self.assertEqual('c', ctx.cursor)
|
||||
|
Loading…
x
Reference in New Issue
Block a user