sharder: always set state to CLEAVED after cleaving
During cleaving, if the sharder finds that zero object rows are copied from the parent retiring DB to a cleaved shard DB, and if that shard DB appears have been freshly created by the cleaving process, then the sharder skips replicating that shard DB and does not count the shard range as part of the batch (see Related-Change). Previously, any shard range treated in this way would not have its state moved to CLEAVED but would remain in the CREATED state. However, cleaving of following shard ranges does continue, leading to anomalous sets of shard range states, including all other shard ranges moving to ACTIVE but the skipped range remaining in CREATED (until another sharder visitation finds object rows and actually replicates the cleaved shard DB). These anomalies can be avoided by moving the skipped shard range to the CLEAVED state. This is exactly what would happen anyway if the cleaved DB had only one object row copied to it, or if the cleaved DB had zero object rows copied to it but happened to already exist on disk. Related-Change: Id338f6c3187f93454bcdf025a32a073284a4a159 Change-Id: I1ca7bf42ee03a169261d8c6feffc38b53226c97f
This commit is contained in:
parent
2e2251638c
commit
57f7145f73
@ -1660,6 +1660,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
def _cleave_shard_broker(self, broker, cleaving_context, shard_range,
|
||||
own_shard_range, shard_broker, put_timestamp,
|
||||
shard_part, node_id):
|
||||
result = CLEAVE_SUCCESS
|
||||
start = time.time()
|
||||
# only cleave from the retiring db - misplaced objects handler will
|
||||
# deal with any objects in the fresh db
|
||||
@ -1685,15 +1686,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
# This was just created; don't need to replicate this
|
||||
# SR because there was nothing there. So cleanup and
|
||||
# remove the shard_broker from its hand off location.
|
||||
self.delete_db(shard_broker)
|
||||
cleaving_context.range_done(shard_range.upper_str)
|
||||
if shard_range.upper >= own_shard_range.upper:
|
||||
# cleaving complete
|
||||
cleaving_context.cleaving_done = True
|
||||
cleaving_context.store(broker)
|
||||
# Because nothing was here we wont count it in the shard
|
||||
# batch count.
|
||||
return CLEAVE_EMPTY
|
||||
result = CLEAVE_EMPTY
|
||||
# Else, it wasn't newly created by us, and
|
||||
# we don't know what's in it or why. Let it get
|
||||
# replicated and counted in the batch count.
|
||||
@ -1742,40 +1737,46 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
shard_broker.merge_shard_ranges(shard_range)
|
||||
replication_quorum = self.shard_replication_quorum
|
||||
|
||||
self.logger.info(
|
||||
'Replicating new shard container %s for %s',
|
||||
quote(shard_broker.path), own_shard_range)
|
||||
if result == CLEAVE_EMPTY:
|
||||
self.delete_db(shard_broker)
|
||||
else: # result == CLEAVE_SUCCESS:
|
||||
self.logger.info(
|
||||
'Replicating new shard container %s for %s',
|
||||
quote(shard_broker.path), own_shard_range)
|
||||
|
||||
success, responses = self._replicate_object(
|
||||
shard_part, shard_broker.db_file, node_id)
|
||||
success, responses = self._replicate_object(
|
||||
shard_part, shard_broker.db_file, node_id)
|
||||
|
||||
replication_successes = responses.count(True)
|
||||
if (not success and (not responses or
|
||||
replication_successes < replication_quorum)):
|
||||
# insufficient replication or replication not even attempted;
|
||||
# break because we don't want to progress the cleave cursor
|
||||
# until each shard range has been successfully cleaved
|
||||
self.logger.warning(
|
||||
'Failed to sufficiently replicate cleaved shard %s for %s: '
|
||||
'%s successes, %s required.', shard_range, quote(broker.path),
|
||||
replication_successes, replication_quorum)
|
||||
self._increment_stat('cleaved', 'failure', statsd=True)
|
||||
return CLEAVE_FAILED
|
||||
replication_successes = responses.count(True)
|
||||
if (not success and (not responses or
|
||||
replication_successes < replication_quorum)):
|
||||
# insufficient replication or replication not even attempted;
|
||||
# break because we don't want to progress the cleave cursor
|
||||
# until each shard range has been successfully cleaved
|
||||
self.logger.warning(
|
||||
'Failed to sufficiently replicate cleaved shard %s for %s:'
|
||||
' %s successes, %s required.', shard_range,
|
||||
quote(broker.path),
|
||||
replication_successes, replication_quorum)
|
||||
self._increment_stat('cleaved', 'failure', statsd=True)
|
||||
result = CLEAVE_FAILED
|
||||
else:
|
||||
elapsed = round(time.time() - start, 3)
|
||||
self._min_stat('cleaved', 'min_time', elapsed)
|
||||
self._max_stat('cleaved', 'max_time', elapsed)
|
||||
self.logger.info(
|
||||
'Cleaved %s for shard range %s in %gs.',
|
||||
quote(broker.path), shard_range, elapsed)
|
||||
self._increment_stat('cleaved', 'success', statsd=True)
|
||||
|
||||
elapsed = round(time.time() - start, 3)
|
||||
self._min_stat('cleaved', 'min_time', elapsed)
|
||||
self._max_stat('cleaved', 'max_time', elapsed)
|
||||
broker.merge_shard_ranges(shard_range)
|
||||
cleaving_context.range_done(shard_range.upper_str)
|
||||
if shard_range.upper >= own_shard_range.upper:
|
||||
# cleaving complete
|
||||
cleaving_context.cleaving_done = True
|
||||
cleaving_context.store(broker)
|
||||
self.logger.info(
|
||||
'Cleaved %s for shard range %s in %gs.',
|
||||
quote(broker.path), shard_range, elapsed)
|
||||
self._increment_stat('cleaved', 'success', statsd=True)
|
||||
return CLEAVE_SUCCESS
|
||||
if result in (CLEAVE_SUCCESS, CLEAVE_EMPTY):
|
||||
broker.merge_shard_ranges(shard_range)
|
||||
cleaving_context.range_done(shard_range.upper_str)
|
||||
if shard_range.upper >= own_shard_range.upper:
|
||||
# cleaving complete
|
||||
cleaving_context.cleaving_done = True
|
||||
cleaving_context.store(broker)
|
||||
return result
|
||||
|
||||
def _cleave_shard_range(self, broker, cleaving_context, shard_range,
|
||||
own_shard_range):
|
||||
|
@ -1332,6 +1332,81 @@ class TestContainerSharding(BaseAutoContainerSharding):
|
||||
def test_sharded_listing_with_replicators(self):
|
||||
self._test_sharded_listing(run_replicators=True)
|
||||
|
||||
def test_listing_under_populated_replica(self):
|
||||
# the leader node and one other primary have all the objects and will
|
||||
# cleave to 4 shard ranges, but the third primary only has 1 object in
|
||||
# the final shard range
|
||||
obj_names = self._make_object_names(2 * self.max_shard_size)
|
||||
self.brain.servers.stop(number=self.brain.node_numbers[2])
|
||||
self.put_objects(obj_names)
|
||||
self.brain.servers.start(number=self.brain.node_numbers[2])
|
||||
subset_obj_names = [obj_names[-1]]
|
||||
self.put_objects(subset_obj_names)
|
||||
self.brain.servers.stop(number=self.brain.node_numbers[2])
|
||||
|
||||
# sanity check: the first 2 primaries will list all objects
|
||||
self.assert_container_listing(obj_names, req_hdrs={'x-newest': 'true'})
|
||||
|
||||
# Run sharder on the fully populated nodes, starting with the leader
|
||||
client.post_container(self.url, self.admin_token, self.container_name,
|
||||
headers={'X-Container-Sharding': 'on'})
|
||||
self.sharders.once(number=self.brain.node_numbers[0],
|
||||
additional_args='--partitions=%s' % self.brain.part)
|
||||
self.sharders.once(number=self.brain.node_numbers[1],
|
||||
additional_args='--partitions=%s' % self.brain.part)
|
||||
|
||||
# Verify that the first 2 primary nodes have cleaved the first batch of
|
||||
# 2 shard ranges
|
||||
broker = self.get_broker(self.brain.part, self.brain.nodes[0])
|
||||
self.assertEqual('sharding', broker.get_db_state())
|
||||
shard_ranges = [dict(sr) for sr in broker.get_shard_ranges()]
|
||||
self.assertLengthEqual(shard_ranges, 4)
|
||||
self.assertEqual([ShardRange.CLEAVED, ShardRange.CLEAVED,
|
||||
ShardRange.CREATED, ShardRange.CREATED],
|
||||
[sr['state'] for sr in shard_ranges])
|
||||
self.assertEqual(
|
||||
{False},
|
||||
set([ctx.done() for ctx, _ in CleavingContext.load_all(broker)]))
|
||||
|
||||
# listing is complete (from the fully populated primaries at least);
|
||||
# the root serves the listing parts for the last 2 shard ranges which
|
||||
# are not yet cleaved
|
||||
self.assert_container_listing(obj_names, req_hdrs={'x-newest': 'true'})
|
||||
|
||||
# Run the sharder on the under-populated node to get it fully
|
||||
# cleaved.
|
||||
self.brain.servers.start(number=self.brain.node_numbers[2])
|
||||
Manager(['container-replicator']).once(
|
||||
number=self.brain.node_numbers[2])
|
||||
self.sharders.once(number=self.brain.node_numbers[2],
|
||||
additional_args='--partitions=%s' % self.brain.part)
|
||||
|
||||
broker = self.get_broker(self.brain.part, self.brain.nodes[2])
|
||||
self.assertEqual('sharded', broker.get_db_state())
|
||||
shard_ranges = [dict(sr) for sr in broker.get_shard_ranges()]
|
||||
self.assertLengthEqual(shard_ranges, 4)
|
||||
self.assertEqual([ShardRange.ACTIVE, ShardRange.ACTIVE,
|
||||
ShardRange.ACTIVE, ShardRange.ACTIVE],
|
||||
[sr['state'] for sr in shard_ranges])
|
||||
self.assertEqual(
|
||||
{True, False},
|
||||
set([ctx.done() for ctx, _ in CleavingContext.load_all(broker)]))
|
||||
|
||||
# Get a consistent view of shard range states then check listing
|
||||
Manager(['container-replicator']).once(
|
||||
number=self.brain.node_numbers[2])
|
||||
# oops, the listing is incomplete because the last 2 listing parts are
|
||||
# now served by the under-populated shard ranges.
|
||||
self.assert_container_listing(
|
||||
obj_names[:self.max_shard_size] + subset_obj_names,
|
||||
req_hdrs={'x-newest': 'true'})
|
||||
|
||||
# but once another replica has completed cleaving the listing is
|
||||
# complete again
|
||||
self.sharders.once(number=self.brain.node_numbers[1],
|
||||
additional_args='--partitions=%s' % self.brain.part)
|
||||
self.assert_container_listing(obj_names, req_hdrs={'x-newest': 'true'})
|
||||
|
||||
def test_async_pendings(self):
|
||||
obj_names = self._make_object_names(self.max_shard_size * 2)
|
||||
|
||||
|
@ -1663,6 +1663,9 @@ class TestSharder(BaseTestSharder):
|
||||
self.assertEqual(cleaving_context.ranges_todo, 0)
|
||||
self.assertTrue(cleaving_context.cleaving_done)
|
||||
|
||||
self.assertEqual([ShardRange.CLEAVED] * 3,
|
||||
[sr.state for sr in broker.get_shard_ranges()])
|
||||
|
||||
def test_cleave_root_empty_db_with_pre_existing_shard_db_handoff(self):
|
||||
broker = self._make_broker()
|
||||
broker.enable_sharding(Timestamp.now())
|
||||
@ -1695,6 +1698,10 @@ class TestSharder(BaseTestSharder):
|
||||
self.assertEqual(cleaving_context.ranges_todo, 2)
|
||||
self.assertFalse(cleaving_context.cleaving_done)
|
||||
|
||||
self.assertEqual(
|
||||
[ShardRange.CLEAVED, ShardRange.CREATED, ShardRange.CREATED],
|
||||
[sr.state for sr in broker.get_shard_ranges()])
|
||||
|
||||
def test_cleave_shard_range_no_own_shard_range(self):
|
||||
# create an unsharded broker that has shard ranges but no
|
||||
# own_shard_range, verify that it does not cleave...
|
||||
|
Loading…
x
Reference in New Issue
Block a user