sharder: always merge child shard ranges fetched from root
While the sharder is auditing shard container DBs it would previously only merge shard ranges fetched from root into the shard DB if the shard was shrinking; shrinking is the only time when a shard normally *must* receive sub-shards from the root. With this patch the sharder will also merge shard ranges fetched from the root if they are known to be the children of the shard, regardless of the state of the shard. Children shard ranges would previously only have been merged during replication with peers of the shard; merging shard-ranges from the root during audit potentially speeds their propagation to peers that have yet to replicate. Change-Id: I57aafc537ff94b081d0e1ea70e7fb7dd3598c61e
This commit is contained in:
parent
f15b92084f
commit
a46f2324ab
@ -1201,6 +1201,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
shard range is found in ``shard_ranges``.
|
||||
"""
|
||||
own_shard_range_from_root = None
|
||||
children_shard_ranges = []
|
||||
other_shard_ranges = []
|
||||
for shard_range in shard_ranges:
|
||||
# look for this shard range in the list of shard ranges received
|
||||
@ -1222,9 +1223,20 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
self.logger.info(
|
||||
'Updated own shard range from %s to %s',
|
||||
orig_own_shard_range, own_shard_range)
|
||||
elif shard_range.is_child_of(own_shard_range):
|
||||
children_shard_ranges.append(shard_range)
|
||||
else:
|
||||
other_shard_ranges.append(shard_range)
|
||||
|
||||
if children_shard_ranges and not broker.is_sharded():
|
||||
# Merging shard ranges from the root is only necessary until this
|
||||
# DB is fully cleaved and reaches SHARDED DB state, after which it
|
||||
# is useful for debugging for the set of sub-shards to which a
|
||||
# shard has sharded to be frozen.
|
||||
self.logger.debug('Updating %d children shard ranges from root',
|
||||
len(children_shard_ranges))
|
||||
broker.merge_shard_ranges(children_shard_ranges)
|
||||
|
||||
if (other_shard_ranges and
|
||||
own_shard_range.state in ShardRange.SHRINKING_STATES):
|
||||
# If own_shard_range state is shrinking, save off *all* shards
|
||||
|
@ -1106,60 +1106,40 @@ class TestContainerSharding(BaseAutoContainerSharding):
|
||||
|
||||
# but third replica still has no idea it should be sharding
|
||||
self.assertLengthEqual(found_for_shard['normal_dbs'], 3)
|
||||
self.assertEqual(
|
||||
ShardRange.ACTIVE,
|
||||
ContainerBroker(
|
||||
found_for_shard['normal_dbs'][2]).get_own_shard_range().state)
|
||||
broker = ContainerBroker(found_for_shard['normal_dbs'][2])
|
||||
self.assertEqual(ShardRange.ACTIVE, broker.get_own_shard_range().state)
|
||||
|
||||
# ...but once sharder runs on third replica it will learn its state;
|
||||
# note that any root replica on the stopped container server also won't
|
||||
# know about the shards being in sharding state, so leave that server
|
||||
# stopped for now so that shard fetches its state from an up-to-date
|
||||
# root replica
|
||||
# ...but once sharder runs on third replica it will learn its state and
|
||||
# fetch its sub-shard ranges during audit; note that any root replica on
|
||||
# the stopped container server also won't know about the shards being
|
||||
# in sharding state, so leave that server stopped for now so that shard
|
||||
# fetches its state from an up-to-date root replica
|
||||
self.sharders.once(
|
||||
number=shard_1_nodes[2],
|
||||
additional_args='--partitions=%s' % shard_1_part)
|
||||
|
||||
# third replica is sharding but has no sub-shard ranges yet...
|
||||
# third replica is sharding and has sub-shard ranges so can start
|
||||
# cleaving...
|
||||
found_for_shard = self.categorize_container_dir_content(
|
||||
shard_1.account, shard_1.container)
|
||||
self.assertLengthEqual(found_for_shard['shard_dbs'], 2)
|
||||
self.assertLengthEqual(found_for_shard['shard_dbs'], 3)
|
||||
self.assertLengthEqual(found_for_shard['normal_dbs'], 3)
|
||||
broker = ContainerBroker(found_for_shard['normal_dbs'][2])
|
||||
self.assertEqual('unsharded', broker.get_db_state())
|
||||
sharding_broker = ContainerBroker(found_for_shard['normal_dbs'][2])
|
||||
self.assertEqual('sharding', sharding_broker.get_db_state())
|
||||
self.assertEqual(
|
||||
ShardRange.SHARDING, broker.get_own_shard_range().state)
|
||||
self.assertFalse(broker.get_shard_ranges())
|
||||
ShardRange.SHARDING, sharding_broker.get_own_shard_range().state)
|
||||
self.assertEqual(3, len(sharding_broker.get_shard_ranges()))
|
||||
|
||||
contexts = list(CleavingContext.load_all(broker))
|
||||
self.assertEqual([], contexts) # length check
|
||||
|
||||
# ...until sub-shard ranges are replicated from another shard replica;
|
||||
# there may also be a sub-shard replica missing so run replicators on
|
||||
# all nodes to fix that if necessary
|
||||
self.brain.servers.start(number=shard_1_nodes[2])
|
||||
self.replicators.once()
|
||||
|
||||
# Now that the replicators have all run, third replica sees cleaving
|
||||
# contexts for the first two
|
||||
contexts = list(CleavingContext.load_all(broker))
|
||||
self.assertEqual(len(contexts), 2)
|
||||
|
||||
# now run sharder again on third replica
|
||||
self.sharders.once(
|
||||
number=shard_1_nodes[2],
|
||||
additional_args='--partitions=%s' % shard_1_part)
|
||||
sharding_broker = ContainerBroker(found_for_shard['normal_dbs'][2])
|
||||
self.assertEqual('sharding', sharding_broker.get_db_state())
|
||||
|
||||
broker_id = broker.get_info()['id']
|
||||
# Old, unsharded DB doesn't have the context...
|
||||
contexts = list(CleavingContext.load_all(broker))
|
||||
self.assertEqual(len(contexts), 2)
|
||||
self.assertNotIn(broker_id, [ctx[0].ref for ctx in contexts])
|
||||
# ...but the sharding one does
|
||||
# contexts for the first two (plus its own cleaving context)
|
||||
contexts = list(CleavingContext.load_all(sharding_broker))
|
||||
self.assertEqual(len(contexts), 3)
|
||||
broker_id = broker.get_info()['id']
|
||||
self.assertIn(broker_id, [ctx[0].ref for ctx in contexts])
|
||||
|
||||
# check original first shard range state and sub-shards - all replicas
|
||||
|
@ -6026,6 +6026,155 @@ class TestSharder(BaseTestSharder):
|
||||
warning_lines[1])
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
def test_audit_shard_container_children_merged_while_sharding(self):
    # Verify that a sharding shard will always merge children shard ranges
    # fetched from the root during audit, regardless of the children's
    # state or deleted flag (exercised exhaustively by the loop at the
    # bottom of this test).
    def do_test(child_deleted, child_state):
        # Build a root shard range and a parent shard that is SHARDING
        # into two children covering the parent's namespace ('a' - 'd').
        root_sr = ShardRange('a/root', next(self.ts_iter),
                             state=ShardRange.SHARDED)
        parent_path = ShardRange.make_path(
            '.shards_a', 'root', root_sr.container,
            next(self.ts_iter), 2)
        parent_sr = ShardRange(
            parent_path, next(self.ts_iter), 'a', 'd',
            state=ShardRange.SHARDING)
        child_srs = []
        # NOTE(review): both children are created with make_path index 0 —
        # confirm against upstream; paths remain unique because each call
        # uses a fresh timestamp from self.ts_iter.
        for i, lower, upper in ((0, 'a', 'b'), (0, 'b', 'd')):
            child_path = ShardRange.make_path(
                '.shards_a', 'root', parent_sr.container,
                next(self.ts_iter), i)
            child_sr = ShardRange(
                child_path, next(self.ts_iter), lower, upper,
                state=child_state, deleted=child_deleted)
            self.assertTrue(child_sr.is_child_of(parent_sr))
            child_srs.append(child_sr)
        # A same-namespace range that is NOT a child of the parent (it has
        # a different make_path index) must not be merged during audit.
        other_path = ShardRange.make_path(
            '.shards_a', 'root', root_sr.container,
            next(self.ts_iter), 3)  # different index w.r.t. parent
        other_sr = ShardRange(
            other_path, next(self.ts_iter), 'a', 'd',
            state=ShardRange.ACTIVE)
        self.assertFalse(other_sr.is_child_of(parent_sr))

        # the parent is sharding...
        broker = self._make_broker(account=parent_sr.account,
                                   container=parent_sr.container)
        broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
        broker.merge_shard_ranges(parent_sr)
        self.assertEqual(UNSHARDED, broker.get_db_state())
        self.assertEqual([parent_sr],
                         broker.get_shard_ranges(include_own=True))

        # Audit the shard with the root returning children, parent, root
        # and the unrelated 'other' range.
        ranges_from_root = child_srs + [parent_sr, root_sr, other_sr]
        sharder, mock_swift = self.call_audit_container(
            broker, ranges_from_root)
        # The audit GET to the root must ask for deleted shard ranges too,
        # so that deleted children can still be merged.
        expected_headers = {'X-Backend-Record-Type': 'shard',
                            'X-Newest': 'true',
                            'X-Backend-Include-Deleted': 'True',
                            'X-Backend-Override-Deleted': 'true'}
        params = {'format': 'json', 'marker': 'a', 'end_marker': 'd',
                  'states': 'auditing'}
        mock_swift.make_request.assert_called_once_with(
            'GET', '/v1/a/c', expected_headers,
            acceptable_statuses=(2,), params=params)

        # Children were merged into the broker; 'other_sr' was not.
        self._assert_shard_ranges_equal(
            sorted(child_srs + [parent_sr],
                   key=ShardRange.sort_key),
            sorted(broker.get_shard_ranges(
                include_own=True, include_deleted=True),
                key=ShardRange.sort_key))
        expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
        self._assert_stats(expected_stats, sharder, 'audit_shard')
        self.assertFalse(sharder.logger.get_lines_for_level('warning'))
        self.assertFalse(sharder.logger.get_lines_for_level('error'))

    # Exercise every combination of child deleted flag and child state.
    for child_deleted in (False, True):
        for child_state in ShardRange.STATES:
            with annotate_failure('deleted: %s, state: %s'
                                  % (child_deleted, child_state)):
                do_test(child_deleted, child_state)
|
||||
|
||||
def test_audit_shard_container_children_not_merged_once_sharded(self):
    # Verify that a sharding shard will not merge children shard ranges
    # once the DB is sharded (but continues to merge own shard range
    # received from root). The test walks the broker through the
    # UNSHARDED -> SHARDING -> SHARDED DB states, auditing at each step.
    root_sr = ShardRange('a/root', next(self.ts_iter),
                         state=ShardRange.SHARDED)
    ts = next(self.ts_iter)
    parent_path = ShardRange.make_path(
        '.shards_a', 'root', root_sr.container, ts, 2)
    # Parent covers 'a' - 'b' and carries an epoch so the DB can later
    # transition through sharding/sharded states.
    parent_sr = ShardRange(
        parent_path, ts, 'a', 'b', state=ShardRange.ACTIVE, epoch=ts)
    child_srs = []
    # NOTE(review): both children use make_path index 0 — confirm against
    # upstream; paths stay unique via distinct ts_iter timestamps.
    for i, lower, upper in ((0, 'a', 'ab'), (0, 'ab', 'b')):
        child_path = ShardRange.make_path(
            '.shards_a', 'root', parent_sr.container,
            next(self.ts_iter), i)
        child_sr = ShardRange(
            child_path, next(self.ts_iter), lower, upper,
            state=ShardRange.CLEAVED)
        self.assertTrue(child_sr.is_child_of(parent_sr))
        child_srs.append(child_sr)

    # DB is unsharded...
    broker = self._make_broker(account=parent_sr.account,
                               container=parent_sr.container)
    broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
    broker.merge_shard_ranges(parent_sr)
    self.assertEqual(UNSHARDED, broker.get_db_state())

    self.assertTrue(parent_sr.update_state(
        ShardRange.SHARDING, state_timestamp=next(self.ts_iter)))
    ranges_from_root = child_srs + [parent_sr, root_sr]
    sharder, _ = self.call_audit_container(broker, ranges_from_root)

    # children ranges from root are merged
    self._assert_shard_ranges_equal(child_srs, broker.get_shard_ranges())
    # own sr from root is merged
    self.assertEqual(dict(parent_sr, meta_timestamp=mock.ANY),
                     dict(broker.get_own_shard_range()))
    self.assertFalse(sharder.logger.get_lines_for_level('warning'))
    self.assertFalse(sharder.logger.get_lines_for_level('error'))

    # DB is sharding...
    self.assertTrue(broker.set_sharding_state())
    self.assertEqual(SHARDING, broker.get_db_state())
    parent_sr.state_timestamp = next(self.ts_iter)
    # Bump the children to ACTIVE so the next audit has newer child state
    # to merge while the DB is still in the SHARDING state.
    for child_sr in child_srs:
        child_sr.update_state(ShardRange.ACTIVE,
                              state_timestamp=next(self.ts_iter))

    sharder, _ = self.call_audit_container(broker, ranges_from_root)

    # children ranges from root are merged
    self._assert_shard_ranges_equal(child_srs, broker.get_shard_ranges())
    # own sr from root is merged
    self.assertEqual(dict(parent_sr, meta_timestamp=mock.ANY),
                     dict(broker.get_own_shard_range()))
    self.assertFalse(sharder.logger.get_lines_for_level('warning'))
    self.assertFalse(sharder.logger.get_lines_for_level('error'))

    # DB is sharded...
    self.assertTrue(broker.set_sharded_state())
    self.assertEqual(SHARDED, broker.get_db_state())
    self.assertTrue(parent_sr.update_state(
        ShardRange.SHARDED, state_timestamp=next(self.ts_iter)))
    # Newer copies of the children (now SHARDING) offered by the root
    # must NOT be merged once this DB has reached the SHARDED state.
    updated_child_srs = [
        child_sr.copy(state=ShardRange.SHARDING,
                      state_timestamp=next(self.ts_iter))
        for child_sr in child_srs]

    ranges_from_root = updated_child_srs + [parent_sr, root_sr]
    sharder, _ = self.call_audit_container(broker, ranges_from_root)

    # children ranges from root are NOT merged
    self._assert_shard_ranges_equal(child_srs, broker.get_shard_ranges())
    # own sr from root is merged
    self.assertEqual(dict(parent_sr, meta_timestamp=mock.ANY),
                     dict(broker.get_own_shard_range()))
    self.assertFalse(sharder.logger.get_lines_for_level('warning'))
    self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
def test_audit_shard_deleted_range_in_root_container(self):
|
||||
# verify that shard DB is marked deleted when its own shard range is
|
||||
# updated with deleted version from root
|
||||
|
Loading…
x
Reference in New Issue
Block a user