From 4c4bd778ea8fe8d02a2892524c7918da0ca25ea9 Mon Sep 17 00:00:00 2001 From: Tim Burke Date: Wed, 3 Jul 2019 09:52:41 -0700 Subject: [PATCH] container-replicator: Add a timeout for get_shard_ranges Previously this had no timeout, which meant that the replicator might hang and fail to make progress indefinitely while trying to receive shard ranges. While we're at it, only call get_shard_ranges when the remote indicates that it has shard ranges for us to sync -- this reduces the number of requests necessary to bring unsharded replicas in sync. Change-Id: I32f51f42d76db38271442a261600089404a00f91 Closes-Bug: #1835260 --- swift/common/db_replicator.py | 2 +- swift/container/replicator.py | 5 +- test/unit/container/test_replicator.py | 110 ++++++++++++++++++++++--- 3 files changed, 103 insertions(+), 14 deletions(-) diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py index a5698a6d51..8f15527ff8 100644 --- a/swift/common/db_replicator.py +++ b/swift/common/db_replicator.py @@ -563,7 +563,7 @@ class Replicator(Daemon): :param partition: partition to be replicated to :param object_file: DB file name to be replicated - :param node_id: node id of the node to be replicated to + :param node_id: node id of the node to be replicated from :returns: a tuple (success, responses). ``success`` is a boolean that is True if the method completed successfully, False otherwise. 
``responses`` is a list of booleans each of which indicates the diff --git a/swift/container/replicator.py b/swift/container/replicator.py index 2ffa7c3b28..b1c3f165d6 100644 --- a/swift/container/replicator.py +++ b/swift/container/replicator.py @@ -78,7 +78,7 @@ class ContainerReplicator(db_replicator.Replicator): broker.merge_timestamps(*(remote_info[key] for key in sync_timestamps)) - if 'shard_max_row' in remote_info: + if remote_info.get('shard_max_row', -1) >= 0: # Grab remote's shard ranges, too self._fetch_and_merge_shard_ranges(http, broker) @@ -136,7 +136,8 @@ class ContainerReplicator(db_replicator.Replicator): return shard_range_success and success def _fetch_and_merge_shard_ranges(self, http, broker): - response = http.replicate('get_shard_ranges') + with Timeout(self.node_timeout): + response = http.replicate('get_shard_ranges') if is_success(response.status): broker.merge_shard_ranges(json.loads( response.data.decode('ascii'))) diff --git a/test/unit/container/test_replicator.py b/test/unit/container/test_replicator.py index b2794d18a1..7548ab2f56 100644 --- a/test/unit/container/test_replicator.py +++ b/test/unit/container/test_replicator.py @@ -22,6 +22,8 @@ import mock import random import sqlite3 +from eventlet import sleep + from swift.common import db_replicator from swift.common.swob import HTTPServerError from swift.container import replicator, backend, server, sync_store @@ -1426,7 +1428,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): self.assertFalse(success) # broker only has its own shard range so expect objects to be sync'd self.assertEqual( - ['sync', 'get_shard_ranges', 'merge_shard_ranges', 'merge_items', + ['sync', 'merge_shard_ranges', 'merge_items', 'merge_syncs'], [call[0][0] for call in replicate_hook.call_args_list]) error_lines = daemon.logger.get_lines_for_level('error') @@ -1435,6 +1437,46 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): self.assertEqual(1, daemon.stats['diff']) 
self.assertEqual(1, daemon.logger.get_increment_counts()['diffs']) + def test_sync_shard_ranges_timeout_in_fetch(self): + # verify that replication is not considered successful if + # get_shard_ranges times out + put_time = Timestamp.now().internal + broker = self._get_broker('a', 'c', node_index=0) + broker.initialize(put_time, POLICIES.default.idx) + remote_broker = self._get_broker('a', 'c', node_index=1) + remote_broker.initialize(put_time, POLICIES.default.idx) + # get an own shard range into remote broker + remote_broker.enable_sharding(Timestamp.now()) + + replicate_calls = [] + + def replicate_hook(op, *args): + replicate_calls.append(op) + if op == 'get_shard_ranges': + sleep(0.1) + + fake_repl_connection = attach_fake_replication_rpc( + self.rpc, replicate_hook=replicate_hook) + db_replicator.ReplConnection = fake_repl_connection + part, node = self._get_broker_part_node(remote_broker) + daemon = replicator.ContainerReplicator({'node_timeout': '0.001'}) + daemon.logger = FakeLogger() + with mock.patch.object(daemon.ring, 'get_part_nodes', + return_value=[node]), \ + mock.patch.object(daemon, '_post_replicate_hook'): + success, _ = daemon._replicate_object( + part, broker.db_file, node['id']) + self.assertFalse(success) + # get_shard_ranges timed out, so no further replication calls were made + self.assertEqual(['sync', 'get_shard_ranges'], replicate_calls) + error_lines = daemon.logger.get_lines_for_level('error') + self.assertIn('ERROR syncing /', error_lines[0]) + self.assertFalse(error_lines[1:]) + self.assertEqual(0, daemon.stats['diff']) + self.assertNotIn('diffs', daemon.logger.get_increment_counts()) + self.assertEqual(1, daemon.stats['failure']) + self.assertEqual(1, daemon.logger.get_increment_counts()['failures']) + def test_sync_shard_ranges_none_to_sync(self): # verify that merge_shard_ranges is not sent if there are no shard # ranges to sync @@ -1456,6 +1498,38 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): daemon =
replicator.ContainerReplicator({}) success = daemon._repl_to_node(node, broker, part, info) self.assertTrue(success) + # NB: remote has no shard ranges, so no call to get_shard_ranges + self.assertEqual( + ['sync', 'merge_items', 'merge_syncs'], + [call[0][0] for call in replicate_hook.call_args_list]) + + def test_sync_shard_ranges_trouble_receiving_so_none_to_sync(self): + # verify that merge_shard_ranges is not sent if local has no shard + # ranges to sync + put_time = Timestamp.now().internal + broker = self._get_broker('a', 'c', node_index=0) + broker.initialize(put_time, POLICIES.default.idx) + remote_broker = self._get_broker('a', 'c', node_index=1) + remote_broker.initialize(put_time, POLICIES.default.idx) + # ensure the remote has at least one shard range + remote_broker.enable_sharding(Timestamp.now()) + # put an object into local broker + broker.put_object('obj', Timestamp.now().internal, 0, 'text/plain', + EMPTY_ETAG) + + replicate_hook = mock.MagicMock() + fake_repl_connection = attach_fake_replication_rpc( + self.rpc, errors={'get_shard_ranges': [ + FakeHTTPResponse(HTTPServerError())]}, + replicate_hook=replicate_hook) + db_replicator.ReplConnection = fake_repl_connection + part, node = self._get_broker_part_node(remote_broker) + info = broker.get_replication_info() + daemon = replicator.ContainerReplicator({}) + success = daemon._repl_to_node(node, broker, part, info) + self.assertTrue(success) + # NB: remote had shard ranges, but there was... 
some sort of issue + # in getting them locally, so no call to merge_shard_ranges self.assertEqual( ['sync', 'get_shard_ranges', 'merge_items', 'merge_syncs'], [call[0][0] for call in replicate_hook.call_args_list]) @@ -1645,6 +1719,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): remote_node_index, repl_conf, expected_shard_ranges, + remote_has_shards=True, expect_success=True): # expected_shard_ranges is expected final list of sync'd ranges daemon, repl_calls, rsync_calls = self.check_replicate( @@ -1655,17 +1730,22 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): self.assertEqual(1, daemon.stats['deferred']) self.assertEqual(0, daemon.stats['diff']) self.assertEqual(0, daemon.stats['rsync']) - self.assertEqual(['sync', 'get_shard_ranges', 'merge_shard_ranges'], - [call[0] for call in repl_calls]) + if remote_has_shards: + exp_calls = ['sync', 'get_shard_ranges', 'merge_shard_ranges'] + else: + exp_calls = ['sync', 'merge_shard_ranges'] + self.assertEqual(exp_calls, [call[0] for call in repl_calls]) self.assertFalse(rsync_calls) # sync local_id = local_broker.get_info()['id'] self.assertEqual(local_id, repl_calls[0][1][2]) # get_shard_ranges - self.assertEqual((), repl_calls[1][1]) + if remote_has_shards: + self.assertEqual((), repl_calls[1][1]) # merge_shard_ranges for sending local shard ranges - self.assertShardRangesEqual(expected_shard_ranges, repl_calls[2][1][0]) - self.assertEqual(local_id, repl_calls[2][1][1]) + self.assertShardRangesEqual(expected_shard_ranges, + repl_calls[-1][1][0]) + self.assertEqual(local_id, repl_calls[-1][1][1]) remote_broker = self._get_broker( local_broker.account, local_broker.container, node_index=1) self.assertNotEqual(local_id, remote_broker.get_info()['id']) @@ -1836,7 +1916,8 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): self._check_only_shard_ranges_replicated( local_broker, 1, repl_conf, - local_broker.get_shard_ranges(include_own=True)) + 
local_broker.get_shard_ranges(include_own=True), + remote_has_shards=False) remote_broker = self._get_broker('a', 'c', node_index=1) self.assertEqual( @@ -2063,6 +2144,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): self._check_only_shard_ranges_replicated( local_broker, 1, repl_conf, local_broker.get_shard_ranges(include_own=True), + remote_has_shards=False, expect_success=True) # sharded broker takes object count from shard range whereas remote @@ -2081,6 +2163,8 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): self._check_only_shard_ranges_replicated( local_broker, 1, repl_conf, local_broker.get_shard_ranges(include_own=True), + # We just sent shards, so of course remote has some + remote_has_shards=True, expect_success=True) remote_broker = self._get_broker('a', 'c', node_index=1) @@ -2247,7 +2331,9 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): repl_conf={'per_diff': 1}) mock_get_items_since.assert_not_called() - self.assertEqual(['sync', 'get_shard_ranges', 'rsync_then_merge'], + # No call to get_shard_ranges because remote didn't have shard ranges + # when the sync arrived + self.assertEqual(['sync', 'rsync_then_merge'], [call[0] for call in repl_calls]) self.assertEqual(local_broker.db_file, rsync_calls[0][0]) self.assertEqual(local_broker.get_info()['id'], @@ -2286,7 +2372,9 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): repl_conf={'per_diff': 1}) mock_get_items_since.assert_not_called() - self.assertEqual(['sync', 'get_shard_ranges', 'rsync_then_merge'], + # No call to get_shard_ranges because remote didn't have shard ranges + # when the sync arrived + self.assertEqual(['sync', 'rsync_then_merge'], [call[0] for call in repl_calls]) self.assertEqual(local_broker.db_file, rsync_calls[0][0]) self.assertEqual(local_broker.get_info()['id'], @@ -2327,7 +2415,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): local_broker, 1, expect_success=False, 
repl_conf={'per_diff': 1}) - self.assertEqual(['sync', 'get_shard_ranges', 'rsync_then_merge'], + self.assertEqual(['sync', 'rsync_then_merge'], [call[0] for call in repl_calls]) self.assertEqual(local_broker.db_file, rsync_calls[0][0]) self.assertEqual(local_broker.get_info()['id'], @@ -2370,7 +2458,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): local_broker, 1, expect_success=False, repl_conf={'per_diff': 1}) - self.assertEqual(['sync', 'get_shard_ranges', 'rsync_then_merge'], + self.assertEqual(['sync', 'rsync_then_merge'], [call[0] for call in repl_calls]) self.assertEqual(local_broker.db_file, rsync_calls[0][0]) self.assertEqual(local_broker.get_info()['id'],