From 38271142eb403a1cdf587616b6d26bfc6b7fb382 Mon Sep 17 00:00:00 2001 From: Alistair Coles Date: Thu, 7 Jul 2022 18:29:06 +0100 Subject: [PATCH] sharder: process deleted DBs It is possible for some replicas of a container DB to be sharded, and for the container to then be emptied and deleted, before another replica of the DB has started sharding. Previously, the unsharded replica would remain in the unsharded state because the sharder would not process deleted DBs. This patch modifies the sharder to always process deleted DBs; this will result in the sharder making some additional DB queries for shard ranges in order to determine whether any processing is required for deleted DBs. Auto-sharding will not act on deleted DBs. Change-Id: Ia6ad92042aa9a51e3ddefec0b9b8acae44e9e9d7 --- swift/container/sharder.py | 68 +++++----- test/probe/test_sharder.py | 203 +++++++++++++++++++++++++++- test/unit/container/test_sharder.py | 132 ++++++++++++++++-- 3 files changed, 349 insertions(+), 54 deletions(-) diff --git a/swift/container/sharder.py b/swift/container/sharder.py index 1812de74bb..98ec1bbd3e 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -1004,12 +1004,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): headers=headers, contents=body) except DirectClientException as err: self.logger.warning( - 'Failed to put shard ranges to %s:%s/%s: %s', - node['ip'], node['port'], node['device'], err.http_status) + 'Failed to put shard ranges to %s:%s/%s %s/%s: %s', + node['ip'], node['port'], node['device'], + quote(account), quote(container), err.http_status) except (Exception, Timeout) as err: self.logger.exception( - 'Failed to put shard ranges to %s:%s/%s: %s', - node['ip'], node['port'], node['device'], err) + 'Failed to put shard ranges to %s:%s/%s %s/%s: %s', + node['ip'], node['port'], node['device'], + quote(account), quote(container), err) else: return True return False @@ -2029,8 +2031,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): def _process_broker(self, broker, node, part): broker.get_info() # make sure account/container are populated state = broker.get_db_state() - self.logger.debug('Starting processing %s state %s', - quote(broker.path), state) + is_deleted = broker.is_deleted() + self.logger.debug('Starting processing %s state %s%s', + quote(broker.path), state, + ' (deleted)' if is_deleted else '') if not self._audit_container(broker): return @@ -2038,13 +2042,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # now look and deal with misplaced objects. self._move_misplaced_objects(broker) - if broker.is_deleted(): - # This container is deleted so we can skip it. We still want - # deleted containers to go via misplaced items because they may - # have new objects sitting in them that may need to move. - return - - is_leader = node['index'] == 0 and self.auto_shard + is_leader = node['index'] == 0 and self.auto_shard and not is_deleted if state in (UNSHARDED, COLLAPSED): if is_leader and broker.is_root_container(): # bootstrap sharding of root container @@ -2096,29 +2094,31 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): self.logger.debug('Remaining in sharding state %s', quote(broker.path)) - if state == SHARDED and broker.is_root_container(): - # look for shrink stats - self._identify_shrinking_candidate(broker, node) - if is_leader: - self._find_and_enable_shrinking_candidates(broker) - self._find_and_enable_sharding_candidates(broker) - for shard_range in broker.get_shard_ranges( - states=[ShardRange.SHARDING]): - self._send_shard_ranges( - shard_range.account, shard_range.container, - [shard_range]) + if not broker.is_deleted(): + if state == SHARDED and broker.is_root_container(): + # look for shrink stats + self._identify_shrinking_candidate(broker, node) + if is_leader: + self._find_and_enable_shrinking_candidates(broker) + self._find_and_enable_sharding_candidates(broker) + for shard_range in broker.get_shard_ranges( + states=[ShardRange.SHARDING]): + self._send_shard_ranges( + shard_range.account, shard_range.container, + [shard_range]) - if not broker.is_root_container(): - # Update the root container with this container's shard range - # info; do this even when sharded in case previous attempts - # failed; don't do this if there is no own shard range. When - # sharding a shard, this is when the root will see the new - # shards move to ACTIVE state and the sharded shard - # simultaneously become deleted. - self._update_root_container(broker) + if not broker.is_root_container(): + # Update the root container with this container's shard range + # info; do this even when sharded in case previous attempts + # failed; don't do this if there is no own shard range. When + # sharding a shard, this is when the root will see the new + # shards move to ACTIVE state and the sharded shard + # simultaneously become deleted. + self._update_root_container(broker) - self.logger.debug('Finished processing %s state %s', - quote(broker.path), broker.get_db_state()) + self.logger.debug('Finished processing %s state %s%s', + quote(broker.path), broker.get_db_state(), + ' (deleted)' if is_deleted else '') def _one_shard_cycle(self, devices_to_shard, partitions_to_shard): """ diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py index 22a6d4ed22..46e83556f5 100644 --- a/test/probe/test_sharder.py +++ b/test/probe/test_sharder.py @@ -367,13 +367,17 @@ class BaseTestContainerSharding(ReplProbeTest): self.fail('No shard sysmeta found in %s' % headers) def assert_container_state(self, node, expected_state, num_shard_ranges, - account=None, container=None, part=None): + account=None, container=None, part=None, + override_deleted=False): account = account or self.account container = container or self.container_to_shard part = part or self.brain.part + headers = {'X-Backend-Record-Type': 'shard'} + if override_deleted: + headers['x-backend-override-deleted'] = True headers, shard_ranges = direct_client.direct_get_container( node, part, account, container, - headers={'X-Backend-Record-Type': 'shard'}) + headers=headers) self.assertEqual(num_shard_ranges, len(shard_ranges)) self.assertIn('X-Backend-Sharding-State', headers) self.assertEqual( @@ -401,12 +405,17 @@ class BaseTestContainerSharding(ReplProbeTest): shard_range.account, shard_range.container) return part, [n['id'] + 1 for n in nodes] - def run_sharders(self, shard_ranges): + def run_sharders(self, shard_ranges, exclude_partitions=None): """Run the sharder on partitions for given shard ranges.""" if not isinstance(shard_ranges, (list, tuple, set)): shard_ranges = (shard_ranges,) - partitions = ','.join(str(self.get_part_and_node_numbers(sr)[0]) - for sr in shard_ranges) + exclude_partitions = exclude_partitions or [] + shard_parts = [] + for sr in shard_ranges: + sr_part = self.get_part_and_node_numbers(sr)[0] + if sr_part not in exclude_partitions: + shard_parts.append(str(sr_part)) + partitions = ','.join(shard_parts) self.sharders.once(additional_args='--partitions=%s' % partitions) def run_sharder_sequentially(self, shard_range=None): @@ -3586,3 +3595,187 @@ class TestManagedContainerSharding(BaseTestContainerSharding): self.assert_container_listing( [obj_names[0], new_obj_name] + obj_names[1:]) + + def test_manage_shard_ranges_unsharded_deleted_root(self): + # verify that a deleted DB will still be sharded + + # choose a node that will not be sharded initially + sharded_nodes = [] + unsharded_node = None + for node in self.brain.nodes: + if self.brain.node_numbers[node['index']] \ + in self.brain.handoff_numbers: + unsharded_node = node + else: + sharded_nodes.append(node) + + # put some objects - not enough to trigger auto-sharding + obj_names = self._make_object_names(MIN_SHARD_CONTAINER_THRESHOLD - 1) + self.put_objects(obj_names) + + # run replicators first time to get sync points set and commit updates + self.replicators.once() + + # setup sharding... + self.assert_subprocess_success([ + 'swift-manage-shard-ranges', + self.get_db_file(self.brain.part, sharded_nodes[0]), + 'find_and_replace', '2', '--enable', '--minimum-shard-size', '1']) + + # Run container-replicator to replicate shard ranges + self.container_replicators.once() + self.assert_container_state(sharded_nodes[0], 'unsharded', 2) + self.assert_container_state(sharded_nodes[1], 'unsharded', 2) + self.assert_container_state(unsharded_node, 'unsharded', 2) + + # Run container-sharder to shard the 2 primary replicas that did + # receive the object PUTs + for num in self.brain.primary_numbers: + self.sharders_once( + number=num, + additional_args='--partitions=%s' % self.brain.part) + + # delete the objects - the proxy's will have cached container info with + # out-of-date db_state=unsharded, so updates go to the root DBs + self.delete_objects(obj_names) + # deal with DELETE's being misplaced in root db's... + for num in self.brain.primary_numbers: + self.sharders_once( + number=num, + additional_args='--partitions=%s' % self.brain.part) + + self.assert_container_state(sharded_nodes[0], 'sharded', 2) + self.assert_container_state(sharded_nodes[1], 'sharded', 2) + shard_ranges = self.assert_container_state( + unsharded_node, 'unsharded', 2) + + # get root stats updated - but avoid sharding the remaining root DB + self.run_sharders(shard_ranges, exclude_partitions=[self.brain.part]) + self.assert_container_listing([]) + + # delete the empty container + client.delete_container(self.url, self.admin_token, + self.container_name) + + # sanity check - unsharded DB is deleted + broker = self.get_broker(self.brain.part, unsharded_node, + self.account, self.container_name) + self.assertEqual(UNSHARDED, broker.get_db_state()) + self.assertTrue(broker.is_deleted()) + self.assertEqual(0, broker.get_info()['object_count']) + self.assertEqual(0, broker.get_shard_usage()['object_count']) + + # now shard the final DB + for num in self.brain.handoff_numbers: + self.sharders_once( + number=num, + additional_args='--partitions=%s' % self.brain.part) + + # all DBs should now be sharded and still deleted + for node in self.brain.nodes: + with annotate_failure( + 'node %s in %s' + % (node['index'], [n['index'] for n in self.brain.nodes])): + self.assert_container_state(node, 'sharded', 2, + override_deleted=True) + broker = self.get_broker(self.brain.part, node, + self.account, self.container_name) + self.assertEqual(SHARDED, broker.get_db_state()) + self.assertEqual(0, broker.get_info()['object_count']) + self.assertEqual(0, + broker.get_shard_usage()['object_count']) + self.assertTrue(broker.is_deleted()) + + def test_manage_shard_ranges_unsharded_deleted_root_gets_undeleted(self): + # verify that an apparently deleted DB (no object rows in root db) will + # still be sharded and also become undeleted when objects are + # discovered in the shards + + # choose a node that will not be sharded initially + sharded_nodes = [] + unsharded_node = None + for node in self.brain.nodes: + if self.brain.node_numbers[node['index']] \ + in self.brain.handoff_numbers: + unsharded_node = node + else: + sharded_nodes.append(node) + + # put some objects, but only to 2 replicas - not enough to trigger + # auto-sharding + self.brain.stop_handoff_half() + + obj_names = self._make_object_names(MIN_SHARD_CONTAINER_THRESHOLD - 1) + self.put_objects(obj_names) + # run replicators first time to get sync points set and commit puts + self.replicators.once() + + # setup sharding... + self.assert_subprocess_success([ + 'swift-manage-shard-ranges', + self.get_db_file(self.brain.part, sharded_nodes[0]), + 'find_and_replace', '2', '--enable', '--minimum-shard-size', '1']) + + # Run container-replicator to replicate shard ranges - object rows will + # not be sync'd now there are shard ranges + for num in self.brain.primary_numbers: + self.container_replicators.once(number=num) + self.assert_container_state(sharded_nodes[0], 'unsharded', 2) + self.assert_container_state(sharded_nodes[1], 'unsharded', 2) + + # revive the stopped node + self.brain.start_handoff_half() + self.assert_container_state(unsharded_node, 'unsharded', 0) + + # delete the empty replica + direct_client.direct_delete_container( + unsharded_node, self.brain.part, self.account, + self.container_name) + + # Run container-sharder to shard the 2 primary replicas that did + # receive the object PUTs + for num in self.brain.primary_numbers: + self.sharders_once( + number=num, + additional_args='--partitions=%s' % self.brain.part) + + self.assert_container_state(sharded_nodes[0], 'sharded', 2) + self.assert_container_state(sharded_nodes[1], 'sharded', 2) + # the sharder syncs shard ranges ... + self.assert_container_state(unsharded_node, 'unsharded', 2, + override_deleted=True) + + # sanity check - unsharded DB is empty and deleted + broker = self.get_broker(self.brain.part, unsharded_node, + self.account, self.container_name) + self.assertEqual(UNSHARDED, broker.get_db_state()) + self.assertEqual(0, broker.get_info()['object_count']) + # the shard ranges do have object count but are in CREATED state so + # not reported in shard usage... + self.assertEqual(0, broker.get_shard_usage()['object_count']) + self.assertTrue(broker.is_deleted()) + + # now shard the final DB + for num in self.brain.handoff_numbers: + self.sharders_once( + number=num, + additional_args='--partitions=%s' % self.brain.part) + shard_ranges = self.assert_container_state( + unsharded_node, 'sharded', 2, override_deleted=True) + + # and get roots updated and sync'd + self.container_replicators.once() + self.run_sharders(shard_ranges, exclude_partitions=[self.brain.part]) + + # all DBs should now be sharded and NOT deleted + for node in self.brain.nodes: + with annotate_failure( + 'node %s in %s' + % (node['index'], [n['index'] for n in self.brain.nodes])): + broker = self.get_broker(self.brain.part, node, + self.account, self.container_name) + self.assertEqual(SHARDED, broker.get_db_state()) + self.assertEqual(3, broker.get_info()['object_count']) + self.assertEqual(3, + broker.get_shard_usage()['object_count']) + self.assertFalse(broker.is_deleted()) diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index e0beaceb2b..aab8e57218 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -68,7 +68,7 @@ class BaseTestSharder(unittest.TestCase): [dict(sr) for sr in actual]) def _make_broker(self, account='a', container='c', epoch=None, - device='sda', part=0, hash_=None): + device='sda', part=0, hash_=None, put_timestamp=None): hash_ = hash_ or md5( container.encode('utf-8'), usedforsecurity=False).hexdigest() datadir = os.path.join( @@ -81,7 +81,7 @@ class BaseTestSharder(unittest.TestCase): broker = ContainerBroker( db_file, account=account, container=container, logger=self.logger) - broker.initialize() + broker.initialize(put_timestamp=put_timestamp) return broker def _make_old_style_sharding_broker(self, account='a', container='c', @@ -4784,37 +4784,62 @@ class TestSharder(BaseTestSharder): self.assertFalse(broker.logger.get_lines_for_level('error')) broker.logger.clear() - def _check_process_broker_sharding_no_others(self, state): + def _check_process_broker_sharding_no_others(self, start_state, deleted): # verify that when existing own_shard_range has given state and there - # are other shard ranges then the sharding process will begin - broker = self._make_broker(hash_='hash%s' % state) + # are other shard ranges then the sharding process will complete + broker = self._make_broker(hash_='hash%s%s' % (start_state, deleted)) node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2', 'index': 0} own_sr = broker.get_own_shard_range() - self.assertTrue(own_sr.update_state(state)) - epoch = Timestamp.now() + self.assertTrue(own_sr.update_state(start_state)) + epoch = next(self.ts_iter) own_sr.epoch = epoch shard_ranges = self._make_shard_ranges((('', 'm'), ('m', ''))) broker.merge_shard_ranges([own_sr] + shard_ranges) + if deleted: + broker.delete_db(next(self.ts_iter).internal) with self._mock_sharder() as sharder: with mock.patch.object( - sharder, '_create_shard_containers', return_value=0): - with mock_timestamp_now() as now: + sharder, '_send_shard_ranges', return_value=True): + with mock_timestamp_now_with_iter(self.ts_iter): sharder._audit_container = mock.MagicMock() sharder._process_broker(broker, node, 99) - final_own_sr = broker.get_own_shard_range(no_default=True) - self.assertEqual(dict(own_sr, meta_timestamp=now), - dict(final_own_sr)) - self.assertEqual(SHARDING, broker.get_db_state()) + final_own_sr = broker.get_own_shard_range(no_default=True) + self.assertEqual(SHARDED, broker.get_db_state()) self.assertEqual(epoch.normal, parse_db_filename(broker.db_file)[1]) self.assertFalse(broker.logger.get_lines_for_level('warning')) self.assertFalse(broker.logger.get_lines_for_level('error')) + # self.assertEqual(deleted, broker.is_deleted()) + return own_sr, final_own_sr def test_process_broker_sharding_with_own_shard_range_no_others(self): - self._check_process_broker_sharding_no_others(ShardRange.SHARDING) - self._check_process_broker_sharding_no_others(ShardRange.SHRINKING) + own_sr, final_own_sr = self._check_process_broker_sharding_no_others( + ShardRange.SHARDING, False) + exp_own_sr = dict(own_sr, state=ShardRange.SHARDED, + meta_timestamp=mock.ANY) + self.assertEqual(exp_own_sr, dict(final_own_sr)) + + # verify that deleted DBs will be sharded + own_sr, final_own_sr = self._check_process_broker_sharding_no_others( + ShardRange.SHARDING, True) + exp_own_sr = dict(own_sr, state=ShardRange.SHARDED, + meta_timestamp=mock.ANY) + self.assertEqual(exp_own_sr, dict(final_own_sr)) + + own_sr, final_own_sr = self._check_process_broker_sharding_no_others( + ShardRange.SHRINKING, False) + exp_own_sr = dict(own_sr, state=ShardRange.SHRUNK, + meta_timestamp=mock.ANY) + self.assertEqual(exp_own_sr, dict(final_own_sr)) + + # verify that deleted DBs will be shrunk + own_sr, final_own_sr = self._check_process_broker_sharding_no_others( + ShardRange.SHRINKING, True) + exp_own_sr = dict(own_sr, state=ShardRange.SHRUNK, + meta_timestamp=mock.ANY) + self.assertEqual(exp_own_sr, dict(final_own_sr)) def test_process_broker_not_sharding_others(self): # verify that sharding process will not start when own shard range is @@ -4907,6 +4932,83 @@ class TestSharder(BaseTestSharder): self._check_process_broker_sharding_others(ShardRange.SHRINKING) self._check_process_broker_sharding_others(ShardRange.SHARDED) + def test_process_broker_leader_auto_shard(self): + # verify conditions for acting as auto-shard leader + broker = self._make_broker(put_timestamp=next(self.ts_iter).internal) + objects = [ + ['obj%3d' % i, self.ts_encoded(), i, 'text/plain', + 'etag%s' % i, 0] for i in range(10)] + for obj in objects: + broker.put_object(*obj) + self.assertEqual(10, broker.get_info()['object_count']) + node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2', + 'index': 0} + + def do_process(conf): + with self._mock_sharder(conf) as sharder: + with mock_timestamp_now(): + # we're not testing rest of the process here so prevent any + # attempt to progress shard range states + sharder._create_shard_containers = lambda *args: 0 + sharder._process_broker(broker, node, 99) + + # auto shard disabled + conf = {'shard_container_threshold': 10, + 'rows_per_shard': 5, + 'shrink_threshold': 1, + 'auto_shard': False} + do_process(conf) + self.assertEqual(UNSHARDED, broker.get_db_state()) + own_sr = broker.get_own_shard_range(no_default=True) + self.assertIsNone(own_sr) + + # auto shard enabled, not node 0 + conf['auto_shard'] = True + node['index'] = 1 + do_process(conf) + self.assertEqual(UNSHARDED, broker.get_db_state()) + own_sr = broker.get_own_shard_range(no_default=True) + self.assertIsNone(own_sr) + + # auto shard enabled, node 0 -> start sharding + node['index'] = 0 + do_process(conf) + self.assertEqual(SHARDING, broker.get_db_state()) + own_sr = broker.get_own_shard_range(no_default=True) + self.assertIsNotNone(own_sr) + self.assertEqual(ShardRange.SHARDING, own_sr.state) + self.assertEqual(own_sr.epoch.normal, + parse_db_filename(broker.db_file)[1]) + self.assertEqual(2, len(broker.get_shard_ranges())) + + def test_process_broker_leader_auto_shard_deleted_db(self): + # verify no auto-shard leader if broker is deleted + conf = {'shard_container_threshold': 10, + 'rows_per_shard': 5, + 'shrink_threshold': 1, + 'auto_shard': True} + broker = self._make_broker(put_timestamp=next(self.ts_iter).internal) + broker.delete_db(next(self.ts_iter).internal) + self.assertTrue(broker.is_deleted()) # sanity check + node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2', + 'index': 0} + + with self._mock_sharder(conf) as sharder: + with mock_timestamp_now(): + with mock.patch.object( + sharder, '_find_and_enable_sharding_candidates' + ) as mock_find_and_enable: + sharder._process_broker(broker, node, 99) + + self.assertEqual(UNSHARDED, broker.get_db_state()) + own_sr = broker.get_own_shard_range(no_default=True) + self.assertIsNone(own_sr) + # this is the only concrete assertion that verifies the leader actions + # are not taken; no shard ranges would actually be found for an empty + # deleted db so there's no other way to differentiate from an undeleted + # db being processed... + mock_find_and_enable.assert_not_called() + def check_shard_ranges_sent(self, broker, expected_sent): bodies = [] servers = []