diff --git a/swift/container/sharder.py b/swift/container/sharder.py index 6205c0bc25..7c00716a20 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -1046,12 +1046,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): headers=headers, contents=body) except DirectClientException as err: self.logger.warning( - 'Failed to put shard ranges to %s:%s/%s: %s', - node['ip'], node['port'], node['device'], err.http_status) + 'Failed to put shard ranges to %s:%s/%s %s/%s: %s', + node['ip'], node['port'], node['device'], + quote(account), quote(container), err.http_status) except (Exception, Timeout) as err: self.logger.exception( - 'Failed to put shard ranges to %s:%s/%s: %s', - node['ip'], node['port'], node['device'], err) + 'Failed to put shard ranges to %s:%s/%s %s/%s: %s', + node['ip'], node['port'], node['device'], + quote(account), quote(container), err) else: return True return False @@ -2071,8 +2073,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): def _process_broker(self, broker, node, part): broker.get_info() # make sure account/container are populated state = broker.get_db_state() - self.logger.debug('Starting processing %s state %s', - quote(broker.path), state) + is_deleted = broker.is_deleted() + self.logger.debug('Starting processing %s state %s%s', + quote(broker.path), state, + ' (deleted)' if is_deleted else '') if not self._audit_container(broker): return @@ -2080,13 +2084,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # now look and deal with misplaced objects. self._move_misplaced_objects(broker) - if broker.is_deleted(): - # This container is deleted so we can skip it. We still want - # deleted containers to go via misplaced items because they may - # have new objects sitting in them that may need to move. - return - - is_leader = node['index'] == 0 and self.auto_shard + is_leader = node['index'] == 0 and self.auto_shard and not is_deleted if state in (UNSHARDED, COLLAPSED): if is_leader and broker.is_root_container(): # bootstrap sharding of root container @@ -2138,29 +2136,31 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): self.logger.debug('Remaining in sharding state %s', quote(broker.path)) - if state == SHARDED and broker.is_root_container(): - # look for shrink stats - self._identify_shrinking_candidate(broker, node) - if is_leader: - self._find_and_enable_shrinking_candidates(broker) - self._find_and_enable_sharding_candidates(broker) - for shard_range in broker.get_shard_ranges( - states=[ShardRange.SHARDING]): - self._send_shard_ranges( - shard_range.account, shard_range.container, - [shard_range]) + if not broker.is_deleted(): + if state == SHARDED and broker.is_root_container(): + # look for shrink stats + self._identify_shrinking_candidate(broker, node) + if is_leader: + self._find_and_enable_shrinking_candidates(broker) + self._find_and_enable_sharding_candidates(broker) + for shard_range in broker.get_shard_ranges( + states=[ShardRange.SHARDING]): + self._send_shard_ranges( + shard_range.account, shard_range.container, + [shard_range]) - if not broker.is_root_container(): - # Update the root container with this container's shard range - # info; do this even when sharded in case previous attempts - # failed; don't do this if there is no own shard range. When - # sharding a shard, this is when the root will see the new - # shards move to ACTIVE state and the sharded shard - # simultaneously become deleted. - self._update_root_container(broker) + if not broker.is_root_container(): + # Update the root container with this container's shard range + # info; do this even when sharded in case previous attempts + # failed; don't do this if there is no own shard range. When + # sharding a shard, this is when the root will see the new + # shards move to ACTIVE state and the sharded shard + # simultaneously become deleted. + self._update_root_container(broker) - self.logger.debug('Finished processing %s state %s', - quote(broker.path), broker.get_db_state()) + self.logger.debug('Finished processing %s state %s%s', + quote(broker.path), broker.get_db_state(), + ' (deleted)' if is_deleted else '') def _one_shard_cycle(self, devices_to_shard, partitions_to_shard): """ diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py index 22a6d4ed22..46e83556f5 100644 --- a/test/probe/test_sharder.py +++ b/test/probe/test_sharder.py @@ -367,13 +367,17 @@ class BaseTestContainerSharding(ReplProbeTest): self.fail('No shard sysmeta found in %s' % headers) def assert_container_state(self, node, expected_state, num_shard_ranges, - account=None, container=None, part=None): + account=None, container=None, part=None, + override_deleted=False): account = account or self.account container = container or self.container_to_shard part = part or self.brain.part + headers = {'X-Backend-Record-Type': 'shard'} + if override_deleted: + headers['x-backend-override-deleted'] = True headers, shard_ranges = direct_client.direct_get_container( node, part, account, container, - headers={'X-Backend-Record-Type': 'shard'}) + headers=headers) self.assertEqual(num_shard_ranges, len(shard_ranges)) self.assertIn('X-Backend-Sharding-State', headers) self.assertEqual( @@ -401,12 +405,17 @@ class BaseTestContainerSharding(ReplProbeTest): shard_range.account, shard_range.container) return part, [n['id'] + 1 for n in nodes] - def run_sharders(self, shard_ranges): + def run_sharders(self, shard_ranges, exclude_partitions=None): """Run the sharder on partitions for given shard ranges.""" if not isinstance(shard_ranges, (list, tuple, set)): shard_ranges = (shard_ranges,) - partitions = ','.join(str(self.get_part_and_node_numbers(sr)[0]) - for sr in shard_ranges) + exclude_partitions = exclude_partitions or [] + shard_parts = [] + for sr in shard_ranges: + sr_part = self.get_part_and_node_numbers(sr)[0] + if sr_part not in exclude_partitions: + shard_parts.append(str(sr_part)) + partitions = ','.join(shard_parts) self.sharders.once(additional_args='--partitions=%s' % partitions) def run_sharder_sequentially(self, shard_range=None): @@ -3586,3 +3595,187 @@ class TestManagedContainerSharding(BaseTestContainerSharding): self.assert_container_listing( [obj_names[0], new_obj_name] + obj_names[1:]) + + def test_manage_shard_ranges_unsharded_deleted_root(self): + # verify that a deleted DB will still be sharded + + # choose a node that will not be sharded initially + sharded_nodes = [] + unsharded_node = None + for node in self.brain.nodes: + if self.brain.node_numbers[node['index']] \ + in self.brain.handoff_numbers: + unsharded_node = node + else: + sharded_nodes.append(node) + + # put some objects - not enough to trigger auto-sharding + obj_names = self._make_object_names(MIN_SHARD_CONTAINER_THRESHOLD - 1) + self.put_objects(obj_names) + + # run replicators first time to get sync points set and commit updates + self.replicators.once() + + # setup sharding... + self.assert_subprocess_success([ + 'swift-manage-shard-ranges', + self.get_db_file(self.brain.part, sharded_nodes[0]), + 'find_and_replace', '2', '--enable', '--minimum-shard-size', '1']) + + # Run container-replicator to replicate shard ranges + self.container_replicators.once() + self.assert_container_state(sharded_nodes[0], 'unsharded', 2) + self.assert_container_state(sharded_nodes[1], 'unsharded', 2) + self.assert_container_state(unsharded_node, 'unsharded', 2) + + # Run container-sharder to shard the 2 primary replicas that did + # receive the object PUTs + for num in self.brain.primary_numbers: + self.sharders_once( + number=num, + additional_args='--partitions=%s' % self.brain.part) + + # delete the objects - the proxy's will have cached container info with + # out-of-date db_state=unsharded, so updates go to the root DBs + self.delete_objects(obj_names) + # deal with DELETE's being misplaced in root db's... + for num in self.brain.primary_numbers: + self.sharders_once( + number=num, + additional_args='--partitions=%s' % self.brain.part) + + self.assert_container_state(sharded_nodes[0], 'sharded', 2) + self.assert_container_state(sharded_nodes[1], 'sharded', 2) + shard_ranges = self.assert_container_state( + unsharded_node, 'unsharded', 2) + + # get root stats updated - but avoid sharding the remaining root DB + self.run_sharders(shard_ranges, exclude_partitions=[self.brain.part]) + self.assert_container_listing([]) + + # delete the empty container + client.delete_container(self.url, self.admin_token, + self.container_name) + + # sanity check - unsharded DB is deleted + broker = self.get_broker(self.brain.part, unsharded_node, + self.account, self.container_name) + self.assertEqual(UNSHARDED, broker.get_db_state()) + self.assertTrue(broker.is_deleted()) + self.assertEqual(0, broker.get_info()['object_count']) + self.assertEqual(0, broker.get_shard_usage()['object_count']) + + # now shard the final DB + for num in self.brain.handoff_numbers: + self.sharders_once( + number=num, + additional_args='--partitions=%s' % self.brain.part) + + # all DBs should now be sharded and still deleted + for node in self.brain.nodes: + with annotate_failure( + 'node %s in %s' + % (node['index'], [n['index'] for n in self.brain.nodes])): + self.assert_container_state(node, 'sharded', 2, + override_deleted=True) + broker = self.get_broker(self.brain.part, node, + self.account, self.container_name) + self.assertEqual(SHARDED, broker.get_db_state()) + self.assertEqual(0, broker.get_info()['object_count']) + self.assertEqual(0, + broker.get_shard_usage()['object_count']) + self.assertTrue(broker.is_deleted()) + + def test_manage_shard_ranges_unsharded_deleted_root_gets_undeleted(self): + # verify that an apparently deleted DB (no object rows in root db) will + # still be sharded and also become undeleted when objects are + # discovered in the shards + + # choose a node that will not be sharded initially + sharded_nodes = [] + unsharded_node = None + for node in self.brain.nodes: + if self.brain.node_numbers[node['index']] \ + in self.brain.handoff_numbers: + unsharded_node = node + else: + sharded_nodes.append(node) + + # put some objects, but only to 2 replicas - not enough to trigger + # auto-sharding + self.brain.stop_handoff_half() + + obj_names = self._make_object_names(MIN_SHARD_CONTAINER_THRESHOLD - 1) + self.put_objects(obj_names) + # run replicators first time to get sync points set and commit puts + self.replicators.once() + + # setup sharding... + self.assert_subprocess_success([ + 'swift-manage-shard-ranges', + self.get_db_file(self.brain.part, sharded_nodes[0]), + 'find_and_replace', '2', '--enable', '--minimum-shard-size', '1']) + + # Run container-replicator to replicate shard ranges - object rows will + # not be sync'd now there are shard ranges + for num in self.brain.primary_numbers: + self.container_replicators.once(number=num) + self.assert_container_state(sharded_nodes[0], 'unsharded', 2) + self.assert_container_state(sharded_nodes[1], 'unsharded', 2) + + # revive the stopped node + self.brain.start_handoff_half() + self.assert_container_state(unsharded_node, 'unsharded', 0) + + # delete the empty replica + direct_client.direct_delete_container( + unsharded_node, self.brain.part, self.account, + self.container_name) + + # Run container-sharder to shard the 2 primary replicas that did + # receive the object PUTs + for num in self.brain.primary_numbers: + self.sharders_once( + number=num, + additional_args='--partitions=%s' % self.brain.part) + + self.assert_container_state(sharded_nodes[0], 'sharded', 2) + self.assert_container_state(sharded_nodes[1], 'sharded', 2) + # the sharder syncs shard ranges ... + self.assert_container_state(unsharded_node, 'unsharded', 2, + override_deleted=True) + + # sanity check - unsharded DB is empty and deleted + broker = self.get_broker(self.brain.part, unsharded_node, + self.account, self.container_name) + self.assertEqual(UNSHARDED, broker.get_db_state()) + self.assertEqual(0, broker.get_info()['object_count']) + # the shard ranges do have object count but are in CREATED state so + # not reported in shard usage... + self.assertEqual(0, broker.get_shard_usage()['object_count']) + self.assertTrue(broker.is_deleted()) + + # now shard the final DB + for num in self.brain.handoff_numbers: + self.sharders_once( + number=num, + additional_args='--partitions=%s' % self.brain.part) + shard_ranges = self.assert_container_state( + unsharded_node, 'sharded', 2, override_deleted=True) + + # and get roots updated and sync'd + self.container_replicators.once() + self.run_sharders(shard_ranges, exclude_partitions=[self.brain.part]) + + # all DBs should now be sharded and NOT deleted + for node in self.brain.nodes: + with annotate_failure( + 'node %s in %s' + % (node['index'], [n['index'] for n in self.brain.nodes])): + broker = self.get_broker(self.brain.part, node, + self.account, self.container_name) + self.assertEqual(SHARDED, broker.get_db_state()) + self.assertEqual(3, broker.get_info()['object_count']) + self.assertEqual(3, + broker.get_shard_usage()['object_count']) + self.assertFalse(broker.is_deleted()) diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index e0beaceb2b..aab8e57218 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -68,7 +68,7 @@ class BaseTestSharder(unittest.TestCase): [dict(sr) for sr in actual]) def _make_broker(self, account='a', container='c', epoch=None, - device='sda', part=0, hash_=None): + device='sda', part=0, hash_=None, put_timestamp=None): hash_ = hash_ or md5( container.encode('utf-8'), usedforsecurity=False).hexdigest() datadir = os.path.join( @@ -81,7 +81,7 @@ class BaseTestSharder(unittest.TestCase): broker = ContainerBroker( db_file, account=account, container=container, logger=self.logger) - broker.initialize() + broker.initialize(put_timestamp=put_timestamp) return broker def _make_old_style_sharding_broker(self, account='a', container='c', @@ -4784,37 +4784,62 @@ class TestSharder(BaseTestSharder): self.assertFalse(broker.logger.get_lines_for_level('error')) broker.logger.clear() - def _check_process_broker_sharding_no_others(self, state): + def _check_process_broker_sharding_no_others(self, start_state, deleted): # verify that when existing own_shard_range has given state and there - # are other shard ranges then the sharding process will begin - broker = self._make_broker(hash_='hash%s' % state) + # are other shard ranges then the sharding process will complete + broker = self._make_broker(hash_='hash%s%s' % (start_state, deleted)) node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2', 'index': 0} own_sr = broker.get_own_shard_range() - self.assertTrue(own_sr.update_state(state)) - epoch = Timestamp.now() + self.assertTrue(own_sr.update_state(start_state)) + epoch = next(self.ts_iter) own_sr.epoch = epoch shard_ranges = self._make_shard_ranges((('', 'm'), ('m', ''))) broker.merge_shard_ranges([own_sr] + shard_ranges) + if deleted: + broker.delete_db(next(self.ts_iter).internal) with self._mock_sharder() as sharder: with mock.patch.object( - sharder, '_create_shard_containers', return_value=0): - with mock_timestamp_now() as now: + sharder, '_send_shard_ranges', return_value=True): + with mock_timestamp_now_with_iter(self.ts_iter): sharder._audit_container = mock.MagicMock() sharder._process_broker(broker, node, 99) - final_own_sr = broker.get_own_shard_range(no_default=True) - self.assertEqual(dict(own_sr, meta_timestamp=now), - dict(final_own_sr)) - self.assertEqual(SHARDING, broker.get_db_state()) + final_own_sr = broker.get_own_shard_range(no_default=True) + self.assertEqual(SHARDED, broker.get_db_state()) self.assertEqual(epoch.normal, parse_db_filename(broker.db_file)[1]) self.assertFalse(broker.logger.get_lines_for_level('warning')) self.assertFalse(broker.logger.get_lines_for_level('error')) + # self.assertEqual(deleted, broker.is_deleted()) + return own_sr, final_own_sr def test_process_broker_sharding_with_own_shard_range_no_others(self): - self._check_process_broker_sharding_no_others(ShardRange.SHARDING) - self._check_process_broker_sharding_no_others(ShardRange.SHRINKING) + own_sr, final_own_sr = self._check_process_broker_sharding_no_others( + ShardRange.SHARDING, False) + exp_own_sr = dict(own_sr, state=ShardRange.SHARDED, + meta_timestamp=mock.ANY) + self.assertEqual(exp_own_sr, dict(final_own_sr)) + + # verify that deleted DBs will be sharded + own_sr, final_own_sr = self._check_process_broker_sharding_no_others( + ShardRange.SHARDING, True) + exp_own_sr = dict(own_sr, state=ShardRange.SHARDED, + meta_timestamp=mock.ANY) + self.assertEqual(exp_own_sr, dict(final_own_sr)) + + own_sr, final_own_sr = self._check_process_broker_sharding_no_others( + ShardRange.SHRINKING, False) + exp_own_sr = dict(own_sr, state=ShardRange.SHRUNK, + meta_timestamp=mock.ANY) + self.assertEqual(exp_own_sr, dict(final_own_sr)) + + # verify that deleted DBs will be shrunk + own_sr, final_own_sr = self._check_process_broker_sharding_no_others( + ShardRange.SHRINKING, True) + exp_own_sr = dict(own_sr, state=ShardRange.SHRUNK, + meta_timestamp=mock.ANY) + self.assertEqual(exp_own_sr, dict(final_own_sr)) def test_process_broker_not_sharding_others(self): # verify that sharding process will not start when own shard range is @@ -4907,6 +4932,83 @@ class TestSharder(BaseTestSharder): self._check_process_broker_sharding_others(ShardRange.SHRINKING) self._check_process_broker_sharding_others(ShardRange.SHARDED) + def test_process_broker_leader_auto_shard(self): + # verify conditions for acting as auto-shard leader + broker = self._make_broker(put_timestamp=next(self.ts_iter).internal) + objects = [ + ['obj%3d' % i, self.ts_encoded(), i, 'text/plain', + 'etag%s' % i, 0] for i in range(10)] + for obj in objects: + broker.put_object(*obj) + self.assertEqual(10, broker.get_info()['object_count']) + node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2', + 'index': 0} + + def do_process(conf): + with self._mock_sharder(conf) as sharder: + with mock_timestamp_now(): + # we're not testing rest of the process here so prevent any + # attempt to progress shard range states + sharder._create_shard_containers = lambda *args: 0 + sharder._process_broker(broker, node, 99) + + # auto shard disabled + conf = {'shard_container_threshold': 10, + 'rows_per_shard': 5, + 'shrink_threshold': 1, + 'auto_shard': False} + do_process(conf) + self.assertEqual(UNSHARDED, broker.get_db_state()) + own_sr = broker.get_own_shard_range(no_default=True) + self.assertIsNone(own_sr) + + # auto shard enabled, not node 0 + conf['auto_shard'] = True + node['index'] = 1 + do_process(conf) + self.assertEqual(UNSHARDED, broker.get_db_state()) + own_sr = broker.get_own_shard_range(no_default=True) + self.assertIsNone(own_sr) + + # auto shard enabled, node 0 -> start sharding + node['index'] = 0 + do_process(conf) + self.assertEqual(SHARDING, broker.get_db_state()) + own_sr = broker.get_own_shard_range(no_default=True) + self.assertIsNotNone(own_sr) + self.assertEqual(ShardRange.SHARDING, own_sr.state) + self.assertEqual(own_sr.epoch.normal, + parse_db_filename(broker.db_file)[1]) + self.assertEqual(2, len(broker.get_shard_ranges())) + + def test_process_broker_leader_auto_shard_deleted_db(self): + # verify no auto-shard leader if broker is deleted + conf = {'shard_container_threshold': 10, + 'rows_per_shard': 5, + 'shrink_threshold': 1, + 'auto_shard': True} + broker = self._make_broker(put_timestamp=next(self.ts_iter).internal) + broker.delete_db(next(self.ts_iter).internal) + self.assertTrue(broker.is_deleted()) # sanity check + node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2', + 'index': 0} + + with self._mock_sharder(conf) as sharder: + with mock_timestamp_now(): + with mock.patch.object( + sharder, '_find_and_enable_sharding_candidates' + ) as mock_find_and_enable: + sharder._process_broker(broker, node, 99) + + self.assertEqual(UNSHARDED, broker.get_db_state()) + own_sr = broker.get_own_shard_range(no_default=True) + self.assertIsNone(own_sr) + # this is the only concrete assertion that verifies the leader actions + # are not taken; no shard ranges would actually be found for an empty + # deleted db so there's no other way to differentiate from an undeleted + # db being processed... + mock_find_and_enable.assert_not_called() + def check_shard_ranges_sent(self, broker, expected_sent): bodies = [] servers = []