From 8227f4539cbdf4b9b59def8a344084796622248c Mon Sep 17 00:00:00 2001 From: Matthew Oliver Date: Wed, 8 Sep 2021 16:29:30 +1000 Subject: [PATCH] sharding: don't replace own_shard_range without an epoch We've observed a root container suddenly thinks it's unsharded when it's own_shard_range is reset. This patch blocks a remote osr with an epoch of None from overwriting a local epoched OSR. The only way we've observed this happen is when a new replica or handoff node creates a container and it's new own_shard_range is created without an epoch and then replicated to older primaries. However, if a bad node with a non-epoched OSR is on a primary, it's newer timestamp would prevent pulling the good osr from it's peers. So it'll be left stuck with it's bad one. When this happens expect to see a bunch of: Ignoring remote osr w/o epoch: x, from: y When an OSR comes in from a replica that doesn't have an epoch when it should, we do a pre-flight check to see if it would remove the epoch before emitting the error above. We do this because when sharding is first initiated it's perfectly valid to get OSR's without epochs from replicas. This is expected and harmless. Closes-bug: #1980451 Change-Id: I069bdbeb430e89074605e40525d955b3a704a44f --- swift/container/replicator.py | 48 ++++- test/probe/test_sharder.py | 239 ++++++++++++++++++++++++- test/unit/container/test_backend.py | 182 ++++++++++++++++++- test/unit/container/test_replicator.py | 125 +++++++++++++ 4 files changed, 576 insertions(+), 18 deletions(-) diff --git a/swift/container/replicator.py b/swift/container/replicator.py index 07c6f8bd36..411d297736 100644 --- a/swift/container/replicator.py +++ b/swift/container/replicator.py @@ -20,7 +20,8 @@ from eventlet import Timeout from random import choice from swift.container.sync_store import ContainerSyncStore -from swift.container.backend import ContainerBroker, DATADIR, SHARDED +from swift.container.backend import ContainerBroker, DATADIR, SHARDED, \ + merge_shards from swift.container.reconciler import ( MISPLACED_OBJECTS_ACCOUNT, incorrect_policy_index, get_reconciler_container_name, get_row_to_q_entry_translator) @@ -31,6 +32,35 @@ from swift.common.http import is_success from swift.common.utils import Timestamp, majority_size, get_db_files +def check_merge_own_shard_range(shards, broker, logger, source): + """ + If broker has own_shard_range *with an epoch* then filter out an + own_shard_range *without an epoch*, and log a warning about it. + + :param shards: a list of candidate ShardRanges to merge + :param broker: a ContainerBroker + :param logger: a logger + :param source: string to log as source of shards + :return: a list of ShardRanges to actually merge + """ + # work-around for https://bugs.launchpad.net/swift/+bug/1980451 + own_sr = broker.get_own_shard_range() + if own_sr.epoch is None: + return shards + to_merge = [] + for shard in shards: + if shard['name'] == own_sr.name and not shard['epoch']: + shard_copy = dict(shard) + new_content = merge_shards(shard_copy, dict(own_sr)) + if new_content and shard_copy['epoch'] is None: + logger.warning( + 'Ignoring remote osr w/o epoch, own_sr: %r, remote_sr: %r,' + ' source: %s', dict(own_sr), shard, source) + continue + to_merge.append(shard) + return to_merge + + class ContainerReplicator(db_replicator.Replicator): server_type = 'container' brokerclass = ContainerBroker @@ -138,8 +168,10 @@ class ContainerReplicator(db_replicator.Replicator): with Timeout(self.node_timeout): response = http.replicate('get_shard_ranges') if response and is_success(response.status): - broker.merge_shard_ranges(json.loads( - response.data.decode('ascii'))) + shards = json.loads(response.data.decode('ascii')) + shards = check_merge_own_shard_range( + shards, broker, self.logger, '%s%s' % (http.host, http.path)) + broker.merge_shard_ranges(shards) def find_local_handoff_for_part(self, part): """ @@ -394,11 +426,15 @@ class ContainerReplicatorRpc(db_replicator.ReplicatorRpc): def _post_rsync_then_merge_hook(self, existing_broker, new_broker): # Note the following hook will need to change to using a pointer and # limit in the future. - new_broker.merge_shard_ranges( - existing_broker.get_all_shard_range_data()) + shards = existing_broker.get_all_shard_range_data() + shards = check_merge_own_shard_range( + shards, new_broker, self.logger, 'rsync') + new_broker.merge_shard_ranges(shards) def merge_shard_ranges(self, broker, args): - broker.merge_shard_ranges(args[0]) + shards = check_merge_own_shard_range( + args[0], broker, self.logger, 'repl_req') + broker.merge_shard_ranges(shards) return HTTPAccepted() def get_shard_ranges(self, broker, args): diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py index 14a83746e6..dc1a5b54da 100644 --- a/test/probe/test_sharder.py +++ b/test/probe/test_sharder.py @@ -28,8 +28,8 @@ from swift.common.header_key_dict import HeaderKeyDict from swift.common.internal_client import UnexpectedResponse from swift.common.manager import Manager from swift.common.memcached import MemcacheRing -from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \ - quorum_size, config_true_value, Timestamp, md5, Namespace +from swift.common.utils import ShardRange, parse_db_filename, quorum_size, \ + config_true_value, Timestamp, md5, Namespace from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING, \ SHARDED from swift.container.sharder import CleavingContext, ContainerSharder @@ -244,9 +244,10 @@ class BaseTestContainerSharding(ReplProbeTest): def get_db_file(self, part, node, account=None, container=None): container_dir, container_hash = self.get_storage_dir( part, node, account=account, container=container) - db_file = os.path.join(container_dir, container_hash + '.db') - self.assertTrue(get_db_files(db_file)) # sanity check - return db_file + for f in os.listdir(container_dir): + path = os.path.join(container_dir, f) + if path.endswith('.db'): + return path def get_broker(self, part, node, account=None, container=None): return ContainerBroker( @@ -259,10 +260,13 @@ class BaseTestContainerSharding(ReplProbeTest): shard_part, shard_nodes[node_index], shard_range.account, shard_range.container) - def categorize_container_dir_content(self, account=None, container=None): + def categorize_container_dir_content(self, account=None, container=None, + more_nodes=False): account = account or self.brain.account container = container or self.container_name part, nodes = self.brain.ring.get_nodes(account, container) + if more_nodes: + nodes.extend(self.brain.ring.get_more_nodes(part)) storage_dirs = [ self.get_storage_dir(part, node, account=account, container=container)[0] @@ -4050,6 +4054,229 @@ class TestManagedContainerSharding(BaseTestContainerSharding): broker.get_shard_usage()['object_count']) self.assertFalse(broker.is_deleted()) + def test_handoff_replication_does_not_cause_reset_epoch(self): + obj_names = self._make_object_names(100) + self.put_objects(obj_names) + + client.post_container(self.url, self.admin_token, self.container_name, + headers={'X-Container-Sharding': 'on'}) + + # run replicators first time to get sync points set + self.replicators.once() + + # sanity check: we don't have nearly enough objects for this to shard + # automatically + self.sharders_once_non_auto( + number=self.brain.node_numbers[0], + additional_args='--partitions=%s' % self.brain.part) + self.assert_container_state(self.brain.nodes[0], 'unsharded', 0) + + self.assert_subprocess_success([ + 'swift-manage-shard-ranges', + self.get_db_file(self.brain.part, self.brain.nodes[0]), + 'find_and_replace', '50', '--enable', + '--minimum-shard-size', '40']) + self.assert_container_state(self.brain.nodes[0], 'unsharded', 2) + + # "Run container-replicator to replicate them to other nodes." + self.replicators.once() + # "Run container-sharder on all nodes to shard the container." + self.sharders_once_non_auto( + additional_args='--partitions=%s' % self.brain.part) + + # Everybody's settled + self.assert_container_state(self.brain.nodes[0], 'sharded', 2) + self.assert_container_state(self.brain.nodes[1], 'sharded', 2) + self.assert_container_state(self.brain.nodes[2], 'sharded', 2) + self.assert_container_listing(obj_names) + + # now lets put the container again and make sure it lands on a handoff + self.brain.stop_primary_half() + self.brain.put_container(policy_index=int(self.policy)) + self.brain.start_primary_half() + + dir_content = self.categorize_container_dir_content(more_nodes=True) + # the handoff node is considered normal because it doesn't have an + # epoch + self.assertEqual(len(dir_content['normal_dbs']), 1) + self.assertEqual(len(dir_content['shard_dbs']), 3) + + # let's replicate + self.replicators.once() + self.sharders_once_non_auto( + additional_args='--partitions=%s' % self.brain.part) + + # let's now check the handoff broker it should have all the shards + handoff_broker = ContainerBroker(dir_content['normal_dbs'][0]) + self.assertEqual(len(handoff_broker.get_shard_ranges()), 2) + handoff_osr = handoff_broker.get_own_shard_range(no_default=True) + self.assertIsNotNone(handoff_osr.epoch) + + def test_force_replication_of_a_reset_own_shard_range(self): + obj_names = self._make_object_names(100) + self.put_objects(obj_names) + + client.post_container(self.url, self.admin_token, self.container_name, + headers={'X-Container-Sharding': 'on'}) + + # run replicators first time to get sync points set + self.replicators.once() + + # sanity check: we don't have nearly enough objects for this to shard + # automatically + self.sharders_once_non_auto( + number=self.brain.node_numbers[0], + additional_args='--partitions=%s' % self.brain.part) + self.assert_container_state(self.brain.nodes[0], 'unsharded', 0) + + self.assert_subprocess_success([ + 'swift-manage-shard-ranges', + self.get_db_file(self.brain.part, self.brain.nodes[0]), + 'find_and_replace', '50', '--enable', + '--minimum-shard-size', '40']) + self.assert_container_state(self.brain.nodes[0], 'unsharded', 2) + + # "Run container-replicator to replicate them to other nodes." + self.replicators.once() + # "Run container-sharder on all nodes to shard the container." + self.sharders_once_non_auto( + additional_args='--partitions=%s' % self.brain.part) + + # Everybody's settled + self.assert_container_state(self.brain.nodes[0], 'sharded', 2) + self.assert_container_state(self.brain.nodes[1], 'sharded', 2) + self.assert_container_state(self.brain.nodes[2], 'sharded', 2) + self.assert_container_listing(obj_names) + + # Lets delete a primary to simulate a new primary and force an + # own_shard_range reset. + new_primary = self.brain.nodes[2] + db_file = self.get_db_file(self.brain.part, new_primary) + os.remove(db_file) + + # issue a new PUT to create the "new" primary container + self.brain.put_container(policy_index=int(self.policy)) + + # put a bunch of objects that should land in the primary so it'll be + # shardable (in case this makes any kind of difference). + self.put_objects(obj_names) + + # The new primary isn't considered a shard_db because it hasn't + # sunk with the other primaries yet. + dir_content = self.categorize_container_dir_content() + self.assertEqual(len(dir_content['normal_dbs']), 1) + self.assertEqual(len(dir_content['shard_dbs']), 2) + + # run the sharders incase this will trigger a reset osr + self.sharders_once_non_auto( + additional_args='--partitions=%s' % self.brain.part) + new_primary_broker = self.get_broker(self.brain.part, new_primary) + # Nope, still no default/reset osr + self.assertIsNone( + new_primary_broker.get_own_shard_range(no_default=True)) + + # Let's reset the osr by hand. + reset_osr = new_primary_broker.get_own_shard_range() + self.assertIsNone(reset_osr.epoch) + self.assertEqual(reset_osr.state, ShardRange.ACTIVE) + new_primary_broker.merge_shard_ranges(reset_osr) + + # now let's replicate with the old primaries + self.replicators.once() + # Pull an old primary own_shard_range + dir_content = self.categorize_container_dir_content() + old_broker = ContainerBroker(dir_content['shard_dbs'][0]) + old_osr = old_broker.get_own_shard_range() + new_primary_broker = ContainerBroker(dir_content['normal_dbs'][0]) + new_osr = new_primary_broker.get_own_shard_range() + + # This version stops replicating a remote non-epoch osr over a local + # epoched osr. But it doesn't do the other way. So it means the + # primary with non-epoched OSR get's stuck with it, if it is newer then + # the other epoched versions. + self.assertIsNotNone(old_osr.epoch) + self.assertEqual(old_osr.state, ShardRange.SHARDED) + + self.assertIsNone(new_osr.epoch) + self.assertGreater(new_osr.timestamp, old_osr.timestamp) + + def test_manage_shard_ranges_missing_epoch_no_false_positives(self): + # when one replica of a shard is sharding before the others, it's epoch + # is not None but it is normal for the other replica to replicate to it + # sending their own shard ranges with epoch=None until they also shard + obj_names = self._make_object_names(4) + self.put_objects(obj_names) + client.post_container(self.url, self.admin_token, self.container_name, + headers={'X-Container-Sharding': 'on'}) + # run replicators first time to get sync points set, and get container + # sharded into 4 shards + self.replicators.once() + self.assert_subprocess_success([ + 'swift-manage-shard-ranges', + self.get_db_file(self.brain.part, self.brain.nodes[0]), + 'find_and_replace', '2', '--enable']) + ranges = self.assert_container_state( + self.brain.nodes[0], 'unsharded', 2) + + # "Run container-replicator to replicate them to other nodes." + self.replicators.once() + # "Run container-sharder on all nodes to shard the container." + self.sharders_once_non_auto( + additional_args='--partitions=%s' % self.brain.part) + # Run them again, just so the shards themselves can pull down the + # latest sharded versions of their OSRs. + self.sharders_once_non_auto() + + # Everybody's settled + self.assert_container_state(self.brain.nodes[0], 'sharded', 2) + self.assert_container_state(self.brain.nodes[1], 'sharded', 2) + ranges = self.assert_container_state(self.brain.nodes[2], 'sharded', 2) + self.assert_container_listing(obj_names) + + # Now we need to shard a shard. A shard's OSR always exist and should + # have an epoch of None, so we should get some false positives. + # we'll shard ranges[1] which have a range of objs-0002 - MAX + shard_obj_names = ['objs-0001%d' % i for i in range(2)] + self.put_objects(shard_obj_names) + + part, shard_node_numbers = self.get_part_and_node_numbers(ranges[1]) + shard_nodes = self.brain.ring.get_part_nodes(part) + shard_broker = self.get_shard_broker(ranges[1], 0) + # set the account, container instance variables + shard_broker.get_info() + self.replicators.once() + self.assert_subprocess_success([ + 'swift-manage-shard-ranges', + shard_broker.db_file, + 'find_and_replace', '2', '--enable']) + self.assert_container_state( + shard_nodes[0], 'unsharded', 2, + shard_broker.account, shard_broker.container, part) + + # index 0 has an epoch now but 1 and 2 don't + for idx in 1, 2: + sb = self.get_shard_broker(ranges[1], idx) + osr = sb.get_own_shard_range(no_default=True) + self.assertIsNone(osr.epoch) + + expected_false_positive_line_snippet = 'Ignoring remote osr w/o epoch:' + # run the replicator on the node with an epoch and it'll complain the + # others dont have an epoch and not set it. + replicator = self.run_custom_daemon( + ContainerReplicator, 'container-replicator', + shard_node_numbers[0], {}) + warnings = replicator.logger.get_lines_for_level('warning') + + self.assertFalse([w for w in warnings + if expected_false_positive_line_snippet in w]) + + # But it does send the new OSR with an epoch so the others should all + # have it now. + for idx in 1, 2: + sb = self.get_shard_broker(ranges[1], idx) + osr = sb.get_own_shard_range(no_default=True) + self.assertIsNotNone(osr.epoch) + def test_manage_shard_ranges_deleted_child_and_parent_gap(self): # Test to produce a scenario where a parent container is stuck at # sharding because of a gap in shard ranges. And the gap is caused by diff --git a/test/unit/container/test_backend.py b/test/unit/container/test_backend.py index 7a14e88251..097f711dec 100644 --- a/test/unit/container/test_backend.py +++ b/test/unit/container/test_backend.py @@ -36,7 +36,8 @@ import six from swift.common.exceptions import LockTimeout from swift.container.backend import ContainerBroker, \ update_new_item_from_existing, UNSHARDED, SHARDING, SHARDED, \ - COLLAPSED, SHARD_LISTING_STATES, SHARD_UPDATE_STATES, sift_shard_ranges + COLLAPSED, SHARD_LISTING_STATES, SHARD_UPDATE_STATES, sift_shard_ranges, \ + merge_shards from swift.common.db import DatabaseAlreadyExists, GreenDBConnection, \ TombstoneReclaimer, GreenDBCursor from swift.common.request_helpers import get_reserved_name @@ -6976,11 +6977,180 @@ class TestUpdateNewItemFromExisting(unittest.TestCase): class TestModuleFunctions(unittest.TestCase): + def setUp(self): + super(TestModuleFunctions, self).setUp() + self.ts_iter = make_timestamp_iter() + self.ts = [next(self.ts_iter).internal for _ in range(10)] + + def test_merge_shards_existing_none(self): + data = dict(ShardRange('a/o', self.ts[1]), reported=True) + exp_data = dict(data) + self.assertTrue(merge_shards(data, None)) + self.assertEqual(exp_data, data) + + def test_merge_shards_existing_ts_lt(self): + existing = dict(ShardRange('a/o', self.ts[0])) + data = dict(ShardRange('a/o', self.ts[1]), reported=True) + exp_data = dict(data, reported=False) + self.assertTrue(merge_shards(data, existing)) + self.assertEqual(exp_data, data) + + def test_merge_shards_existing_ts_gt(self): + existing = dict(ShardRange('a/o', self.ts[1])) + data = dict(ShardRange('a/o', self.ts[0]), reported=True) + exp_data = dict(data) + self.assertFalse(merge_shards(data, existing)) + self.assertEqual(exp_data, data) + + # existing timestamp trumps data state_timestamp + data = dict(ShardRange('a/o', self.ts[0]), state=ShardRange.ACTIVE, + state_timestamp=self.ts[2]) + exp_data = dict(data) + self.assertFalse(merge_shards(data, existing)) + self.assertEqual(exp_data, data) + + # existing timestamp trumps data meta_timestamp + data = dict(ShardRange('a/o', self.ts[0]), state=ShardRange.ACTIVE, + meta_timestamp=self.ts[2]) + exp_data = dict(data) + self.assertFalse(merge_shards(data, existing)) + self.assertEqual(exp_data, data) + + def test_merge_shards_existing_ts_eq_merge_reported(self): + existing = dict(ShardRange('a/o', self.ts[0])) + data = dict(ShardRange('a/o', self.ts[0]), reported=False) + exp_data = dict(data) + self.assertFalse(merge_shards(data, existing)) + self.assertEqual(exp_data, data) + + data = dict(ShardRange('a/o', self.ts[0]), reported=True) + exp_data = dict(data) + self.assertTrue(merge_shards(data, existing)) + self.assertEqual(exp_data, data) + + def test_merge_shards_existing_ts_eq_retain_bounds(self): + existing = dict(ShardRange('a/o', self.ts[0])) + data = dict(ShardRange('a/o', self.ts[0]), lower='l', upper='u') + exp_data = dict(data, lower='', upper='') + self.assertFalse(merge_shards(data, existing)) + self.assertEqual(exp_data, data) + + def test_merge_shards_existing_ts_eq_retain_deleted(self): + existing = dict(ShardRange('a/o', self.ts[0])) + data = dict(ShardRange('a/o', self.ts[0]), deleted=1) + exp_data = dict(data, deleted=0) + self.assertFalse(merge_shards(data, existing)) + self.assertEqual(exp_data, data) + + def test_merge_shards_existing_ts_eq_meta_ts_gte(self): + existing = dict( + ShardRange('a/o', self.ts[0], meta_timestamp=self.ts[1], + object_count=1, bytes_used=2, tombstones=3)) + data = dict( + ShardRange('a/o', self.ts[0], meta_timestamp=self.ts[1], + object_count=10, bytes_used=20, tombstones=30)) + exp_data = dict(data, object_count=1, bytes_used=2, tombstones=3) + self.assertFalse(merge_shards(data, existing)) + self.assertEqual(exp_data, data) + + existing = dict( + ShardRange('a/o', self.ts[0], meta_timestamp=self.ts[2], + object_count=1, bytes_used=2, tombstones=3)) + exp_data = dict(data, object_count=1, bytes_used=2, tombstones=3, + meta_timestamp=self.ts[2]) + self.assertFalse(merge_shards(data, existing)) + self.assertEqual(exp_data, data) + + def test_merge_shards_existing_ts_eq_meta_ts_lt(self): + existing = dict( + ShardRange('a/o', self.ts[0], meta_timestamp=self.ts[1], + object_count=1, bytes_used=2, tombstones=3, + epoch=self.ts[3])) + data = dict( + ShardRange('a/o', self.ts[0], meta_timestamp=self.ts[2], + object_count=10, bytes_used=20, tombstones=30, + epoch=None)) + exp_data = dict(data, epoch=self.ts[3]) + self.assertTrue(merge_shards(data, existing)) + self.assertEqual(exp_data, data) + + def test_merge_shards_existing_ts_eq_state_ts_eq(self): + # data has more advanced state + existing = dict( + ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1], + state=ShardRange.CREATED, epoch=self.ts[4])) + data = dict( + ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1], + state=ShardRange.ACTIVE, epoch=self.ts[5])) + exp_data = dict(data) + self.assertTrue(merge_shards(data, existing)) + self.assertEqual(exp_data, data) + + # data has less advanced state + existing = dict( + ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1], + state=ShardRange.CREATED, epoch=self.ts[4])) + data = dict( + ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1], + state=ShardRange.FOUND, epoch=self.ts[5])) + exp_data = dict(data, state=ShardRange.CREATED, epoch=self.ts[4]) + self.assertFalse(merge_shards(data, existing)) + self.assertEqual(exp_data, data) + + def test_merge_shards_existing_ts_eq_state_ts_gt(self): + existing = dict( + ShardRange('a/o', self.ts[0], state_timestamp=self.ts[2], + state=ShardRange.CREATED, epoch=self.ts[4])) + data = dict( + ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1], + state=ShardRange.ACTIVE, epoch=self.ts[5])) + exp_data = dict(data, state_timestamp=self.ts[2], + state=ShardRange.CREATED, epoch=self.ts[4]) + self.assertFalse(merge_shards(data, existing)) + self.assertEqual(exp_data, data) + + def test_merge_shards_existing_ts_eq_state_ts_lt(self): + existing = dict( + ShardRange('a/o', self.ts[0], state_timestamp=self.ts[0], + state=ShardRange.CREATED, epoch=self.ts[4])) + data = dict( + ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1], + state=ShardRange.ACTIVE, epoch=self.ts[5])) + exp_data = dict(data) + self.assertTrue(merge_shards(data, existing)) + self.assertEqual(exp_data, data) + + def test_merge_shards_epoch_reset(self): + # not sure if these scenarios are realistic, but we have seen epoch + # resets in prod + # same timestamps, data has more advanced state but no epoch + existing = dict( + ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1], + state=ShardRange.CREATED, epoch=self.ts[4])) + data = dict( + ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1], + state=ShardRange.ACTIVE, epoch=None)) + exp_data = dict(data) + self.assertTrue(merge_shards(data, existing)) + self.assertEqual(exp_data, data) + self.assertIsNone(exp_data['epoch']) + + # data has more advanced state_timestamp but no epoch + existing = dict( + ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1], + state=ShardRange.CREATED, epoch=self.ts[4])) + data = dict( + ShardRange('a/o', self.ts[0], state_timestamp=self.ts[2], + state=ShardRange.FOUND, epoch=None)) + exp_data = dict(data) + self.assertTrue(merge_shards(data, existing)) + self.assertEqual(exp_data, data) + self.assertIsNone(exp_data['epoch']) + def test_sift_shard_ranges(self): - ts_iter = make_timestamp_iter() existing_shards = {} - sr1 = dict(ShardRange('a/o', next(ts_iter).internal)) - sr2 = dict(ShardRange('a/o2', next(ts_iter).internal)) + sr1 = dict(ShardRange('a/o', next(self.ts_iter).internal)) + sr2 = dict(ShardRange('a/o2', next(self.ts_iter).internal)) new_shard_ranges = [sr1, sr2] # first empty existing shards will just add the shards @@ -6994,7 +7164,7 @@ class TestModuleFunctions(unittest.TestCase): # if there is a newer version in the existing shards then it won't be # added to to_add existing_shards['a/o'] = dict( - ShardRange('a/o', next(ts_iter).internal)) + ShardRange('a/o', next(self.ts_iter).internal)) to_add, to_delete = sift_shard_ranges(new_shard_ranges, existing_shards) self.assertEqual([sr2], list(to_add)) @@ -7002,7 +7172,7 @@ class TestModuleFunctions(unittest.TestCase): # But if a newer version is in new_shard_ranges then the old will be # added to to_delete and new is added to to_add. - sr1['timestamp'] = next(ts_iter).internal + sr1['timestamp'] = next(self.ts_iter).internal to_add, to_delete = sift_shard_ranges(new_shard_ranges, existing_shards) self.assertEqual(2, len(to_add)) diff --git a/test/unit/container/test_replicator.py b/test/unit/container/test_replicator.py index 2cb139dc3c..4c84fcb9dd 100644 --- a/test/unit/container/test_replicator.py +++ b/test/unit/container/test_replicator.py @@ -32,6 +32,7 @@ from swift.container.reconciler import ( from swift.common.utils import Timestamp, encode_timestamps, ShardRange, \ get_db_files, make_db_file_path from swift.common.storage_policy import POLICIES +from test import annotate_failure from test.debug_logger import debug_logger from test.unit.common import test_db_replicator @@ -1432,6 +1433,130 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): daemon.logger.get_lines_for_level('debug')) daemon.logger.clear() + def test_sync_shard_ranges_merge_remote_osr(self): + def do_test(local_osr, remote_osr, exp_merge, exp_warning, + exp_rpc_warning): + put_timestamp = Timestamp.now().internal + # create "local" broker + broker = self._get_broker('a', 'c', node_index=0) + broker.initialize(put_timestamp, POLICIES.default.idx) + # create "remote" broker + remote_broker = self._get_broker('a', 'c', node_index=1) + remote_broker.initialize(put_timestamp, POLICIES.default.idx) + + bounds = (('', 'g'), ('g', 'r'), ('r', '')) + shard_ranges = [ + ShardRange('.shards_a/sr-%s' % upper, Timestamp.now(), lower, + upper, i + 1, 10 * (i + 1)) + for i, (lower, upper) in enumerate(bounds) + ] + + for db in (broker, remote_broker): + db.merge_shard_ranges(shard_ranges) + + if local_osr: + broker.merge_shard_ranges(ShardRange(**dict(local_osr))) + if remote_osr: + remote_broker.merge_shard_ranges( + ShardRange(**dict(remote_osr))) + + daemon = replicator.ContainerReplicator({}, logger=debug_logger()) + part, remote_node = self._get_broker_part_node(remote_broker) + part, local_node = self._get_broker_part_node(broker) + info = broker.get_replication_info() + success = daemon._repl_to_node(remote_node, broker, part, info) + self.assertTrue(success) + local_info = self._get_broker( + 'a', 'c', node_index=0).get_info() + remote_info = self._get_broker( + 'a', 'c', node_index=1).get_info() + for k, v in local_info.items(): + if k == 'id': + continue + self.assertEqual(remote_info[k], v, + "mismatch remote %s %r != %r" % ( + k, remote_info[k], v)) + actual_osr = broker.get_own_shard_range(no_default=True) + actual_osr = dict(actual_osr) if actual_osr else actual_osr + if exp_merge: + exp_osr = (dict(remote_osr, meta_timestamp=mock.ANY) + if remote_osr else remote_osr) + else: + exp_osr = (dict(local_osr, meta_timestamp=mock.ANY) + if local_osr else local_osr) + self.assertEqual(exp_osr, actual_osr) + lines = daemon.logger.get_lines_for_level('warning') + if exp_warning: + self.assertEqual(len(lines), 1, lines) + self.assertIn("Ignoring remote osr w/o epoch", lines[0]) + self.assertIn("own_sr: ", lines[0]) + self.assertIn("'epoch': '%s'" % local_osr.epoch.normal, + lines[0]) + self.assertIn("remote_sr: ", lines[0]) + self.assertIn("'epoch': None", lines[0]) + hash_ = os.path.splitext(os.path.basename(broker.db_file))[0] + url = "%s/%s/%s/%s" % ( + remote_node['ip'], remote_node['device'], part, hash_) + self.assertIn("source: %s" % url, lines[0]) + else: + self.assertFalse(lines) + lines = self.rpc.logger.get_lines_for_level('warning') + if exp_rpc_warning: + self.assertEqual(len(lines), 1, lines) + self.assertIn("Ignoring remote osr w/o epoch", lines[0]) + self.assertIn("source: repl_req", lines[0]) + else: + self.assertFalse(lines) + + os.remove(broker.db_file) + os.remove(remote_broker.db_file) + return daemon + + # we'll use other broker as a template to use the "default" osrs + other_broker = self._get_broker('a', 'c', node_index=2) + other_broker.initialize(Timestamp.now().internal, POLICIES.default.idx) + default_osr = other_broker.get_own_shard_range() + self.assertIsNone(default_osr.epoch) + osr_with_epoch = other_broker.get_own_shard_range() + osr_with_epoch.epoch = Timestamp.now() + osr_with_different_epoch = other_broker.get_own_shard_range() + osr_with_different_epoch.epoch = Timestamp.now() + default_osr_newer = ShardRange(**dict(default_osr)) + default_osr_newer.timestamp = Timestamp.now() + + # local_osr, remote_osr, exp_merge, exp_warning, exp_rpc_warning + tests = ( + # First the None case, ie no osrs + (None, None, False, False, False), + # Default and not the other + (None, default_osr, True, False, False), + (default_osr, None, False, False, False), + (default_osr, default_osr, True, False, False), + (default_osr, None, False, False, False), + # With an epoch and no OSR is also fine + (None, osr_with_epoch, True, False, False), + (osr_with_epoch, None, False, False, False), + # even with the same or different epochs + (osr_with_epoch, osr_with_epoch, True, False, False), + (osr_with_epoch, osr_with_different_epoch, True, False, False), + # But if local does have an epoch but the remote doesn't: false + # positive, nothing will merge anyway, no warning. + (osr_with_epoch, default_osr, False, False, False), + # It's also OK if the remote has an epoch but not the local, + # this also works on the RPC side because merge_shards happen on + # to local then sends updated shards to the remote. So if the + # OSR on the remote is newer then the default the RPC side will + # actually get a merged OSR, ie get the remote one back. + (default_osr, osr_with_epoch, True, False, False), + # But if the local default is newer then the epoched remote side + # we'd get an error logged on the RPC side and the local is newer + # so wil fail to merge + (default_osr_newer, osr_with_epoch, False, False, True), + ) + for i, params in enumerate(tests): + with annotate_failure((i, params)): + do_test(*params) + def test_sync_shard_ranges(self): put_timestamp = Timestamp.now().internal # create "local" broker