diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py index a5698a6d51..8f15527ff8 100644 --- a/swift/common/db_replicator.py +++ b/swift/common/db_replicator.py @@ -563,7 +563,7 @@ class Replicator(Daemon): :param partition: partition to be replicated to :param object_file: DB file name to be replicated - :param node_id: node id of the node to be replicated to + :param node_id: node id of the node to be replicated from :returns: a tuple (success, responses). ``success`` is a boolean that is True if the method completed successfully, False otherwise. ``responses`` is a list of booleans each of which indicates the diff --git a/swift/container/replicator.py b/swift/container/replicator.py index 2ffa7c3b28..b1c3f165d6 100644 --- a/swift/container/replicator.py +++ b/swift/container/replicator.py @@ -78,7 +78,7 @@ class ContainerReplicator(db_replicator.Replicator): broker.merge_timestamps(*(remote_info[key] for key in sync_timestamps)) - if 'shard_max_row' in remote_info: + if remote_info.get('shard_max_row', -1) >= 0: # Grab remote's shard ranges, too self._fetch_and_merge_shard_ranges(http, broker) @@ -136,7 +136,8 @@ class ContainerReplicator(db_replicator.Replicator): return shard_range_success and success def _fetch_and_merge_shard_ranges(self, http, broker): - response = http.replicate('get_shard_ranges') + with Timeout(self.node_timeout): + response = http.replicate('get_shard_ranges') if is_success(response.status): broker.merge_shard_ranges(json.loads( response.data.decode('ascii'))) diff --git a/test/unit/container/test_replicator.py b/test/unit/container/test_replicator.py index b2794d18a1..7548ab2f56 100644 --- a/test/unit/container/test_replicator.py +++ b/test/unit/container/test_replicator.py @@ -22,6 +22,8 @@ import mock import random import sqlite3 +from eventlet import sleep + from swift.common import db_replicator from swift.common.swob import HTTPServerError from swift.container import replicator, backend, server, sync_store @@ -1426,7 +1428,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): self.assertFalse(success) # broker only has its own shard range so expect objects to be sync'd self.assertEqual( - ['sync', 'get_shard_ranges', 'merge_shard_ranges', 'merge_items', + ['sync', 'merge_shard_ranges', 'merge_items', 'merge_syncs'], [call[0][0] for call in replicate_hook.call_args_list]) error_lines = daemon.logger.get_lines_for_level('error') @@ -1435,6 +1437,46 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): self.assertEqual(1, daemon.stats['diff']) self.assertEqual(1, daemon.logger.get_increment_counts()['diffs']) + def test_sync_shard_ranges_timeout_in_fetch(self): + # verify that replication is not considered successful if + # merge_shard_ranges fails + put_time = Timestamp.now().internal + broker = self._get_broker('a', 'c', node_index=0) + broker.initialize(put_time, POLICIES.default.idx) + remote_broker = self._get_broker('a', 'c', node_index=1) + remote_broker.initialize(put_time, POLICIES.default.idx) + # get an own shard range into remote broker + remote_broker.enable_sharding(Timestamp.now()) + + replicate_calls = [] + + def replicate_hook(op, *args): + replicate_calls.append(op) + if op == 'get_shard_ranges': + sleep(0.1) + + fake_repl_connection = attach_fake_replication_rpc( + self.rpc, replicate_hook=replicate_hook) + db_replicator.ReplConnection = fake_repl_connection + part, node = self._get_broker_part_node(remote_broker) + daemon = replicator.ContainerReplicator({'node_timeout': '0.001'}) + daemon.logger = FakeLogger() + with mock.patch.object(daemon.ring, 'get_part_nodes', + return_value=[node]), \ + mock.patch.object(daemon, '_post_replicate_hook'): + success, _ = daemon._replicate_object( + part, broker.db_file, node['id']) + self.assertFalse(success) + # broker only has its own shard range so expect objects to be sync'd + self.assertEqual(['sync', 'get_shard_ranges'], replicate_calls) + error_lines = daemon.logger.get_lines_for_level('error') + self.assertIn('ERROR syncing /', error_lines[0]) + self.assertFalse(error_lines[1:]) + self.assertEqual(0, daemon.stats['diff']) + self.assertNotIn('diffs', daemon.logger.get_increment_counts()) + self.assertEqual(1, daemon.stats['failure']) + self.assertEqual(1, daemon.logger.get_increment_counts()['failures']) + def test_sync_shard_ranges_none_to_sync(self): # verify that merge_shard_ranges is not sent if there are no shard # ranges to sync @@ -1456,6 +1498,38 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): daemon = replicator.ContainerReplicator({}) success = daemon._repl_to_node(node, broker, part, info) self.assertTrue(success) + # NB: remote has no shard ranges, so no call to get_shard_ranges + self.assertEqual( + ['sync', 'merge_items', 'merge_syncs'], + [call[0][0] for call in replicate_hook.call_args_list]) + + def test_sync_shard_ranges_trouble_receiving_so_none_to_sync(self): + # verify that merge_shard_ranges is not sent if local has no shard + # ranges to sync + put_time = Timestamp.now().internal + broker = self._get_broker('a', 'c', node_index=0) + broker.initialize(put_time, POLICIES.default.idx) + remote_broker = self._get_broker('a', 'c', node_index=1) + remote_broker.initialize(put_time, POLICIES.default.idx) + # ensure the remote has at least one shard range + remote_broker.enable_sharding(Timestamp.now()) + # put an object into local broker + broker.put_object('obj', Timestamp.now().internal, 0, 'text/plain', + EMPTY_ETAG) + + replicate_hook = mock.MagicMock() + fake_repl_connection = attach_fake_replication_rpc( + self.rpc, errors={'get_shard_ranges': [ + FakeHTTPResponse(HTTPServerError())]}, + replicate_hook=replicate_hook) + db_replicator.ReplConnection = fake_repl_connection + part, node = self._get_broker_part_node(remote_broker) + info = broker.get_replication_info() + daemon = replicator.ContainerReplicator({}) + success = daemon._repl_to_node(node, broker, part, info) + self.assertTrue(success) + # NB: remote had shard ranges, but there was... some sort of issue + # in getting them locally, so no call to merge_shard_ranges self.assertEqual( ['sync', 'get_shard_ranges', 'merge_items', 'merge_syncs'], [call[0][0] for call in replicate_hook.call_args_list]) @@ -1645,6 +1719,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): remote_node_index, repl_conf, expected_shard_ranges, + remote_has_shards=True, expect_success=True): # expected_shard_ranges is expected final list of sync'd ranges daemon, repl_calls, rsync_calls = self.check_replicate( @@ -1655,17 +1730,22 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): self.assertEqual(1, daemon.stats['deferred']) self.assertEqual(0, daemon.stats['diff']) self.assertEqual(0, daemon.stats['rsync']) - self.assertEqual(['sync', 'get_shard_ranges', 'merge_shard_ranges'], - [call[0] for call in repl_calls]) + if remote_has_shards: + exp_calls = ['sync', 'get_shard_ranges', 'merge_shard_ranges'] + else: + exp_calls = ['sync', 'merge_shard_ranges'] + self.assertEqual(exp_calls, [call[0] for call in repl_calls]) self.assertFalse(rsync_calls) # sync local_id = local_broker.get_info()['id'] self.assertEqual(local_id, repl_calls[0][1][2]) # get_shard_ranges - self.assertEqual((), repl_calls[1][1]) + if remote_has_shards: + self.assertEqual((), repl_calls[1][1]) # merge_shard_ranges for sending local shard ranges - self.assertShardRangesEqual(expected_shard_ranges, repl_calls[2][1][0]) - self.assertEqual(local_id, repl_calls[2][1][1]) + self.assertShardRangesEqual(expected_shard_ranges, + repl_calls[-1][1][0]) + self.assertEqual(local_id, repl_calls[-1][1][1]) remote_broker = self._get_broker( local_broker.account, local_broker.container, node_index=1) self.assertNotEqual(local_id, remote_broker.get_info()['id']) @@ -1836,7 +1916,8 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): self._check_only_shard_ranges_replicated( local_broker, 1, repl_conf, - local_broker.get_shard_ranges(include_own=True)) + local_broker.get_shard_ranges(include_own=True), + remote_has_shards=False) remote_broker = self._get_broker('a', 'c', node_index=1) self.assertEqual( @@ -2063,6 +2144,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): self._check_only_shard_ranges_replicated( local_broker, 1, repl_conf, local_broker.get_shard_ranges(include_own=True), + remote_has_shards=False, expect_success=True) # sharded broker takes object count from shard range whereas remote @@ -2081,6 +2163,8 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): self._check_only_shard_ranges_replicated( local_broker, 1, repl_conf, local_broker.get_shard_ranges(include_own=True), + # We just sent shards, so of course remote has some + remote_has_shards=True, expect_success=True) remote_broker = self._get_broker('a', 'c', node_index=1) @@ -2247,7 +2331,9 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): repl_conf={'per_diff': 1}) mock_get_items_since.assert_not_called() - self.assertEqual(['sync', 'get_shard_ranges', 'rsync_then_merge'], + # No call to get_shard_ranges because remote didn't have shard ranges + # when the sync arrived + self.assertEqual(['sync', 'rsync_then_merge'], [call[0] for call in repl_calls]) self.assertEqual(local_broker.db_file, rsync_calls[0][0]) self.assertEqual(local_broker.get_info()['id'], @@ -2286,7 +2372,9 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): repl_conf={'per_diff': 1}) mock_get_items_since.assert_not_called() - self.assertEqual(['sync', 'get_shard_ranges', 'rsync_then_merge'], + # No call to get_shard_ranges because remote didn't have shard ranges + # when the sync arrived + self.assertEqual(['sync', 'rsync_then_merge'], [call[0] for call in repl_calls]) self.assertEqual(local_broker.db_file, rsync_calls[0][0]) self.assertEqual(local_broker.get_info()['id'], @@ -2327,7 +2415,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): local_broker, 1, expect_success=False, repl_conf={'per_diff': 1}) - self.assertEqual(['sync', 'get_shard_ranges', 'rsync_then_merge'], + self.assertEqual(['sync', 'rsync_then_merge'], [call[0] for call in repl_calls]) self.assertEqual(local_broker.db_file, rsync_calls[0][0]) self.assertEqual(local_broker.get_info()['id'], @@ -2370,7 +2458,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): local_broker, 1, expect_success=False, repl_conf={'per_diff': 1}) - self.assertEqual(['sync', 'get_shard_ranges', 'rsync_then_merge'], + self.assertEqual(['sync', 'rsync_then_merge'], [call[0] for call in repl_calls]) self.assertEqual(local_broker.db_file, rsync_calls[0][0]) self.assertEqual(local_broker.get_info()['id'],