diff --git a/swift/container/sharder.py b/swift/container/sharder.py index 958418274a..1812de74bb 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -1660,6 +1660,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): def _cleave_shard_broker(self, broker, cleaving_context, shard_range, own_shard_range, shard_broker, put_timestamp, shard_part, node_id): + result = CLEAVE_SUCCESS start = time.time() # only cleave from the retiring db - misplaced objects handler will # deal with any objects in the fresh db @@ -1685,15 +1686,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # This was just created; don't need to replicate this # SR because there was nothing there. So cleanup and # remove the shard_broker from its hand off location. - self.delete_db(shard_broker) - cleaving_context.range_done(shard_range.upper_str) - if shard_range.upper >= own_shard_range.upper: - # cleaving complete - cleaving_context.cleaving_done = True - cleaving_context.store(broker) # Because nothing was here we wont count it in the shard # batch count. - return CLEAVE_EMPTY + result = CLEAVE_EMPTY # Else, it wasn't newly created by us, and # we don't know what's in it or why. Let it get # replicated and counted in the batch count. @@ -1742,40 +1737,46 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): shard_broker.merge_shard_ranges(shard_range) replication_quorum = self.shard_replication_quorum - self.logger.info( - 'Replicating new shard container %s for %s', - quote(shard_broker.path), own_shard_range) + if result == CLEAVE_EMPTY: + self.delete_db(shard_broker) + else: # result == CLEAVE_SUCCESS: + self.logger.info( + 'Replicating new shard container %s for %s', + quote(shard_broker.path), own_shard_range) - success, responses = self._replicate_object( - shard_part, shard_broker.db_file, node_id) + success, responses = self._replicate_object( + shard_part, shard_broker.db_file, node_id) - replication_successes = responses.count(True) - if (not success and (not responses or - replication_successes < replication_quorum)): - # insufficient replication or replication not even attempted; - # break because we don't want to progress the cleave cursor - # until each shard range has been successfully cleaved - self.logger.warning( - 'Failed to sufficiently replicate cleaved shard %s for %s: ' - '%s successes, %s required.', shard_range, quote(broker.path), - replication_successes, replication_quorum) - self._increment_stat('cleaved', 'failure', statsd=True) - return CLEAVE_FAILED + replication_successes = responses.count(True) + if (not success and (not responses or + replication_successes < replication_quorum)): + # insufficient replication or replication not even attempted; + # break because we don't want to progress the cleave cursor + # until each shard range has been successfully cleaved + self.logger.warning( + 'Failed to sufficiently replicate cleaved shard %s for %s:' + ' %s successes, %s required.', shard_range, + quote(broker.path), + replication_successes, replication_quorum) + self._increment_stat('cleaved', 'failure', statsd=True) + result = CLEAVE_FAILED + else: + elapsed = round(time.time() - start, 3) + self._min_stat('cleaved', 'min_time', elapsed) + self._max_stat('cleaved', 'max_time', elapsed) + self.logger.info( + 'Cleaved %s for shard range %s in %gs.', + quote(broker.path), shard_range, elapsed) + self._increment_stat('cleaved', 'success', statsd=True) - elapsed = round(time.time() - start, 3) - self._min_stat('cleaved', 'min_time', elapsed) - self._max_stat('cleaved', 'max_time', elapsed) - broker.merge_shard_ranges(shard_range) - cleaving_context.range_done(shard_range.upper_str) - if shard_range.upper >= own_shard_range.upper: - # cleaving complete - cleaving_context.cleaving_done = True - cleaving_context.store(broker) - self.logger.info( - 'Cleaved %s for shard range %s in %gs.', - quote(broker.path), shard_range, elapsed) - self._increment_stat('cleaved', 'success', statsd=True) - return CLEAVE_SUCCESS + if result in (CLEAVE_SUCCESS, CLEAVE_EMPTY): + broker.merge_shard_ranges(shard_range) + cleaving_context.range_done(shard_range.upper_str) + if shard_range.upper >= own_shard_range.upper: + # cleaving complete + cleaving_context.cleaving_done = True + cleaving_context.store(broker) + return result def _cleave_shard_range(self, broker, cleaving_context, shard_range, own_shard_range): diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py index 082f5163e9..22a6d4ed22 100644 --- a/test/probe/test_sharder.py +++ b/test/probe/test_sharder.py @@ -1332,6 +1332,81 @@ class TestContainerSharding(BaseAutoContainerSharding): def test_sharded_listing_with_replicators(self): self._test_sharded_listing(run_replicators=True) + def test_listing_under_populated_replica(self): + # the leader node and one other primary have all the objects and will + # cleave to 4 shard ranges, but the third primary only has 1 object in + # the final shard range + obj_names = self._make_object_names(2 * self.max_shard_size) + self.brain.servers.stop(number=self.brain.node_numbers[2]) + self.put_objects(obj_names) + self.brain.servers.start(number=self.brain.node_numbers[2]) + subset_obj_names = [obj_names[-1]] + self.put_objects(subset_obj_names) + self.brain.servers.stop(number=self.brain.node_numbers[2]) + + # sanity check: the first 2 primaries will list all objects + self.assert_container_listing(obj_names, req_hdrs={'x-newest': 'true'}) + + # Run sharder on the fully populated nodes, starting with the leader + client.post_container(self.url, self.admin_token, self.container_name, + headers={'X-Container-Sharding': 'on'}) + self.sharders.once(number=self.brain.node_numbers[0], + additional_args='--partitions=%s' % self.brain.part) + self.sharders.once(number=self.brain.node_numbers[1], + additional_args='--partitions=%s' % self.brain.part) + + # Verify that the first 2 primary nodes have cleaved the first batch of + # 2 shard ranges + broker = self.get_broker(self.brain.part, self.brain.nodes[0]) + self.assertEqual('sharding', broker.get_db_state()) + shard_ranges = [dict(sr) for sr in broker.get_shard_ranges()] + self.assertLengthEqual(shard_ranges, 4) + self.assertEqual([ShardRange.CLEAVED, ShardRange.CLEAVED, + ShardRange.CREATED, ShardRange.CREATED], + [sr['state'] for sr in shard_ranges]) + self.assertEqual( + {False}, + set([ctx.done() for ctx, _ in CleavingContext.load_all(broker)])) + + # listing is complete (from the fully populated primaries at least); + # the root serves the listing parts for the last 2 shard ranges which + # are not yet cleaved + self.assert_container_listing(obj_names, req_hdrs={'x-newest': 'true'}) + + # Run the sharder on the under-populated node to get it fully + # cleaved. + self.brain.servers.start(number=self.brain.node_numbers[2]) + Manager(['container-replicator']).once( + number=self.brain.node_numbers[2]) + self.sharders.once(number=self.brain.node_numbers[2], + additional_args='--partitions=%s' % self.brain.part) + + broker = self.get_broker(self.brain.part, self.brain.nodes[2]) + self.assertEqual('sharded', broker.get_db_state()) + shard_ranges = [dict(sr) for sr in broker.get_shard_ranges()] + self.assertLengthEqual(shard_ranges, 4) + self.assertEqual([ShardRange.ACTIVE, ShardRange.ACTIVE, + ShardRange.ACTIVE, ShardRange.ACTIVE], + [sr['state'] for sr in shard_ranges]) + self.assertEqual( + {True, False}, + set([ctx.done() for ctx, _ in CleavingContext.load_all(broker)])) + + # Get a consistent view of shard range states then check listing + Manager(['container-replicator']).once( + number=self.brain.node_numbers[2]) + # oops, the listing is incomplete because the last 2 listing parts are + # now served by the under-populated shard ranges. + self.assert_container_listing( + obj_names[:self.max_shard_size] + subset_obj_names, + req_hdrs={'x-newest': 'true'}) + + # but once another replica has completed cleaving the listing is + # complete again + self.sharders.once(number=self.brain.node_numbers[1], + additional_args='--partitions=%s' % self.brain.part) + self.assert_container_listing(obj_names, req_hdrs={'x-newest': 'true'}) + def test_async_pendings(self): obj_names = self._make_object_names(self.max_shard_size * 2) diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index 1ffb0e16a4..e0beaceb2b 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -1663,6 +1663,9 @@ class TestSharder(BaseTestSharder): self.assertEqual(cleaving_context.ranges_todo, 0) self.assertTrue(cleaving_context.cleaving_done) + self.assertEqual([ShardRange.CLEAVED] * 3, + [sr.state for sr in broker.get_shard_ranges()]) + def test_cleave_root_empty_db_with_pre_existing_shard_db_handoff(self): broker = self._make_broker() broker.enable_sharding(Timestamp.now()) @@ -1695,6 +1698,10 @@ class TestSharder(BaseTestSharder): self.assertEqual(cleaving_context.ranges_todo, 2) self.assertFalse(cleaving_context.cleaving_done) + self.assertEqual( + [ShardRange.CLEAVED, ShardRange.CREATED, ShardRange.CREATED], + [sr.state for sr in broker.get_shard_ranges()]) + def test_cleave_shard_range_no_own_shard_range(self): # create an unsharded broker that has shard ranges but no # own_shard_range, verify that it does not cleave...