container-replicator: Add a timeout for get_shard_ranges

Previously this had no timeout, which meant that the replicator might
hang and fail to make progress indefinitely while trying to receive
shard ranges.

While we're at it, only call get_shard_ranges when the remote indicates
that it has shard ranges for us to sync -- this reduces the number of
requests necessary to bring unsharded replicas in sync.

Change-Id: I32f51f42d76db38271442a261600089404a00f91
Closes-Bug: #1835260
commit 4c4bd778ea (parent dc84c69c65)
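The core of the fix is wrapping the blocking replication RPC in an eventlet Timeout. Below is a minimal standalone sketch of that pattern, not Swift code; slow_fetch is a hypothetical stand-in for the replication request:

    import eventlet


    def slow_fetch():
        # stand-in for a remote that never responds
        eventlet.sleep(10)
        return 'shard ranges'


    try:
        # if the block does not finish within 0.1s, eventlet raises a
        # Timeout exception in this greenthread instead of blocking forever
        with eventlet.Timeout(0.1):
            result = slow_fetch()
    except eventlet.Timeout:
        result = None  # treat as a failed sync and move on

    print(result)  # None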
swift/common/db_replicator.py
@@ -563,7 +563,7 @@ class Replicator(Daemon):
         :param partition: partition to be replicated to
         :param object_file: DB file name to be replicated
-        :param node_id: node id of the node to be replicated to
+        :param node_id: node id of the node to be replicated from
         :returns: a tuple (success, responses). ``success`` is a boolean that
             is True if the method completed successfully, False otherwise.
             ``responses`` is a list of booleans each of which indicates the
swift/container/replicator.py
@@ -78,7 +78,7 @@ class ContainerReplicator(db_replicator.Replicator):
         broker.merge_timestamps(*(remote_info[key] for key in
                                   sync_timestamps))

-        if 'shard_max_row' in remote_info:
+        if remote_info.get('shard_max_row', -1) >= 0:
             # Grab remote's shard ranges, too
             self._fetch_and_merge_shard_ranges(http, broker)
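The new condition keys off the value of shard_max_row rather than the mere presence of the key. A small sketch of the gating, assuming (as Swift's broker does for an empty table) that a remote with no shard ranges reports shard_max_row == -1, and that an older remote may omit the key entirely:

    def remote_has_shard_ranges(remote_info):
        # remote_info is the dict returned by the remote's 'sync' response
        return remote_info.get('shard_max_row', -1) >= 0

    assert not remote_has_shard_ranges({})                     # key absent: old remote
    assert not remote_has_shard_ranges({'shard_max_row': -1})  # empty shard table
    assert remote_has_shard_ranges({'shard_max_row': 7})       # something to fetch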
@@ -136,7 +136,8 @@ class ContainerReplicator(db_replicator.Replicator):
         return shard_range_success and success

     def _fetch_and_merge_shard_ranges(self, http, broker):
-        response = http.replicate('get_shard_ranges')
+        with Timeout(self.node_timeout):
+            response = http.replicate('get_shard_ranges')
         if is_success(response.status):
             broker.merge_shard_ranges(json.loads(
                 response.data.decode('ascii')))
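When the Timeout fires, eventlet raises inside _fetch_and_merge_shard_ranges; as the new test below verifies, the replicator surfaces this as an 'ERROR syncing /' log line, counts the node as a failure, and does not report the replication as successful.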
test/unit/container/test_replicator.py
@@ -22,6 +22,8 @@ import mock
 import random
 import sqlite3

+from eventlet import sleep
+
 from swift.common import db_replicator
+from swift.common.swob import HTTPServerError
 from swift.container import replicator, backend, server, sync_store
@@ -1426,7 +1428,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
         self.assertFalse(success)
         # broker only has its own shard range so expect objects to be sync'd
         self.assertEqual(
-            ['sync', 'get_shard_ranges', 'merge_shard_ranges', 'merge_items',
+            ['sync', 'merge_shard_ranges', 'merge_items',
              'merge_syncs'],
             [call[0][0] for call in replicate_hook.call_args_list])
         error_lines = daemon.logger.get_lines_for_level('error')
@@ -1435,6 +1437,46 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
         self.assertEqual(1, daemon.stats['diff'])
         self.assertEqual(1, daemon.logger.get_increment_counts()['diffs'])

+    def test_sync_shard_ranges_timeout_in_fetch(self):
+        # verify that replication is not considered successful if
+        # get_shard_ranges times out
+        put_time = Timestamp.now().internal
+        broker = self._get_broker('a', 'c', node_index=0)
+        broker.initialize(put_time, POLICIES.default.idx)
+        remote_broker = self._get_broker('a', 'c', node_index=1)
+        remote_broker.initialize(put_time, POLICIES.default.idx)
+        # get an own shard range into remote broker
+        remote_broker.enable_sharding(Timestamp.now())
+
+        replicate_calls = []
+
+        def replicate_hook(op, *args):
+            replicate_calls.append(op)
+            if op == 'get_shard_ranges':
+                sleep(0.1)
+
+        fake_repl_connection = attach_fake_replication_rpc(
+            self.rpc, replicate_hook=replicate_hook)
+        db_replicator.ReplConnection = fake_repl_connection
+        part, node = self._get_broker_part_node(remote_broker)
+        daemon = replicator.ContainerReplicator({'node_timeout': '0.001'})
+        daemon.logger = FakeLogger()
+        with mock.patch.object(daemon.ring, 'get_part_nodes',
+                               return_value=[node]), \
+                mock.patch.object(daemon, '_post_replicate_hook'):
+            success, _ = daemon._replicate_object(
+                part, broker.db_file, node['id'])
+        self.assertFalse(success)
+        # replication stopped after get_shard_ranges timed out
+        self.assertEqual(['sync', 'get_shard_ranges'], replicate_calls)
+        error_lines = daemon.logger.get_lines_for_level('error')
+        self.assertIn('ERROR syncing /', error_lines[0])
+        self.assertFalse(error_lines[1:])
+        self.assertEqual(0, daemon.stats['diff'])
+        self.assertNotIn('diffs', daemon.logger.get_increment_counts())
+        self.assertEqual(1, daemon.stats['failure'])
+        self.assertEqual(1, daemon.logger.get_increment_counts()['failures'])
+
     def test_sync_shard_ranges_none_to_sync(self):
         # verify that merge_shard_ranges is not sent if there are no shard
         # ranges to sync
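Note the deliberately tiny node_timeout of 0.001 seconds in the test above: the hook's sleep(0.1) on get_shard_ranges is then guaranteed to outlast it, so the test exercises the new Timeout path without slowing the suite down.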
@@ -1456,6 +1498,38 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
         daemon = replicator.ContainerReplicator({})
         success = daemon._repl_to_node(node, broker, part, info)
         self.assertTrue(success)
+        # NB: remote has no shard ranges, so no call to get_shard_ranges
         self.assertEqual(
-            ['sync', 'get_shard_ranges', 'merge_items', 'merge_syncs'],
+            ['sync', 'merge_items', 'merge_syncs'],
             [call[0][0] for call in replicate_hook.call_args_list])

+    def test_sync_shard_ranges_trouble_receiving_so_none_to_sync(self):
+        # verify that merge_shard_ranges is not sent if local has no shard
+        # ranges to sync
+        put_time = Timestamp.now().internal
+        broker = self._get_broker('a', 'c', node_index=0)
+        broker.initialize(put_time, POLICIES.default.idx)
+        remote_broker = self._get_broker('a', 'c', node_index=1)
+        remote_broker.initialize(put_time, POLICIES.default.idx)
+        # ensure the remote has at least one shard range
+        remote_broker.enable_sharding(Timestamp.now())
+        # put an object into local broker
+        broker.put_object('obj', Timestamp.now().internal, 0, 'text/plain',
+                          EMPTY_ETAG)
+
+        replicate_hook = mock.MagicMock()
+        fake_repl_connection = attach_fake_replication_rpc(
+            self.rpc, errors={'get_shard_ranges': [
+                FakeHTTPResponse(HTTPServerError())]},
+            replicate_hook=replicate_hook)
+        db_replicator.ReplConnection = fake_repl_connection
+        part, node = self._get_broker_part_node(remote_broker)
+        info = broker.get_replication_info()
+        daemon = replicator.ContainerReplicator({})
+        success = daemon._repl_to_node(node, broker, part, info)
+        self.assertTrue(success)
+        # NB: remote had shard ranges, but there was... some sort of issue
+        # in getting them locally, so no call to merge_shard_ranges
+        self.assertEqual(
+            ['sync', 'get_shard_ranges', 'merge_items', 'merge_syncs'],
+            [call[0][0] for call in replicate_hook.call_args_list])
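A failed fetch is deliberately non-fatal: the merge in _fetch_and_merge_shard_ranges is guarded by is_success(), so a 5xx from the remote just means there is nothing to merge locally while object replication continues. A minimal sketch of that guard; FakeResponse is a hypothetical stand-in for the RPC response object:

    from swift.common.http import is_success


    class FakeResponse(object):
        def __init__(self, status, data=b'[]'):
            self.status = status
            self.data = data


    def maybe_merge(response, merge):
        # mirrors the guard in _fetch_and_merge_shard_ranges
        if is_success(response.status):
            merge(response.data)


    merged = []
    maybe_merge(FakeResponse(500), merged.append)
    assert merged == []        # 5xx: skip the merge, carry on replicating
    maybe_merge(FakeResponse(200), merged.append)
    assert merged == [b'[]']   # 2xx: merge the fetched shard ranges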
@@ -1645,6 +1719,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
                                             remote_node_index,
                                             repl_conf,
                                             expected_shard_ranges,
+                                            remote_has_shards=True,
                                             expect_success=True):
         # expected_shard_ranges is expected final list of sync'd ranges
         daemon, repl_calls, rsync_calls = self.check_replicate(
@@ -1655,17 +1730,22 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
         self.assertEqual(1, daemon.stats['deferred'])
         self.assertEqual(0, daemon.stats['diff'])
         self.assertEqual(0, daemon.stats['rsync'])
-        self.assertEqual(['sync', 'get_shard_ranges', 'merge_shard_ranges'],
-                         [call[0] for call in repl_calls])
+        if remote_has_shards:
+            exp_calls = ['sync', 'get_shard_ranges', 'merge_shard_ranges']
+        else:
+            exp_calls = ['sync', 'merge_shard_ranges']
+        self.assertEqual(exp_calls, [call[0] for call in repl_calls])
         self.assertFalse(rsync_calls)
         # sync
         local_id = local_broker.get_info()['id']
         self.assertEqual(local_id, repl_calls[0][1][2])
-        # get_shard_ranges
-        self.assertEqual((), repl_calls[1][1])
+        if remote_has_shards:
+            # get_shard_ranges
+            self.assertEqual((), repl_calls[1][1])
         # merge_shard_ranges for sending local shard ranges
-        self.assertShardRangesEqual(expected_shard_ranges, repl_calls[2][1][0])
-        self.assertEqual(local_id, repl_calls[2][1][1])
+        self.assertShardRangesEqual(expected_shard_ranges,
+                                    repl_calls[-1][1][0])
+        self.assertEqual(local_id, repl_calls[-1][1][1])
         remote_broker = self._get_broker(
             local_broker.account, local_broker.container, node_index=1)
         self.assertNotEqual(local_id, remote_broker.get_info()['id'])
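Since get_shard_ranges may now be skipped, merge_shard_ranges is no longer guaranteed to sit at index 2 in repl_calls, hence the switch to repl_calls[-1] when checking the merged shard ranges.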
@@ -1836,7 +1916,8 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):

         self._check_only_shard_ranges_replicated(
             local_broker, 1, repl_conf,
-            local_broker.get_shard_ranges(include_own=True))
+            local_broker.get_shard_ranges(include_own=True),
+            remote_has_shards=False)

         remote_broker = self._get_broker('a', 'c', node_index=1)
         self.assertEqual(
@@ -2063,6 +2144,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
         self._check_only_shard_ranges_replicated(
             local_broker, 1, repl_conf,
             local_broker.get_shard_ranges(include_own=True),
+            remote_has_shards=False,
             expect_success=True)

         # sharded broker takes object count from shard range whereas remote
@@ -2081,6 +2163,8 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
         self._check_only_shard_ranges_replicated(
             local_broker, 1, repl_conf,
             local_broker.get_shard_ranges(include_own=True),
+            # We just sent shards, so of course remote has some
+            remote_has_shards=True,
             expect_success=True)

         remote_broker = self._get_broker('a', 'c', node_index=1)
@@ -2247,7 +2331,9 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
             repl_conf={'per_diff': 1})

         mock_get_items_since.assert_not_called()
-        self.assertEqual(['sync', 'get_shard_ranges', 'rsync_then_merge'],
+        # No call to get_shard_ranges because remote didn't have shard ranges
+        # when the sync arrived
+        self.assertEqual(['sync', 'rsync_then_merge'],
                          [call[0] for call in repl_calls])
         self.assertEqual(local_broker.db_file, rsync_calls[0][0])
         self.assertEqual(local_broker.get_info()['id'],
@@ -2286,7 +2372,9 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
             repl_conf={'per_diff': 1})

         mock_get_items_since.assert_not_called()
-        self.assertEqual(['sync', 'get_shard_ranges', 'rsync_then_merge'],
+        # No call to get_shard_ranges because remote didn't have shard ranges
+        # when the sync arrived
+        self.assertEqual(['sync', 'rsync_then_merge'],
                          [call[0] for call in repl_calls])
         self.assertEqual(local_broker.db_file, rsync_calls[0][0])
         self.assertEqual(local_broker.get_info()['id'],
@@ -2327,7 +2415,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
             local_broker, 1, expect_success=False,
             repl_conf={'per_diff': 1})

-        self.assertEqual(['sync', 'get_shard_ranges', 'rsync_then_merge'],
+        self.assertEqual(['sync', 'rsync_then_merge'],
                          [call[0] for call in repl_calls])
         self.assertEqual(local_broker.db_file, rsync_calls[0][0])
         self.assertEqual(local_broker.get_info()['id'],
@@ -2370,7 +2458,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
             local_broker, 1, expect_success=False,
             repl_conf={'per_diff': 1})

-        self.assertEqual(['sync', 'get_shard_ranges', 'rsync_then_merge'],
+        self.assertEqual(['sync', 'rsync_then_merge'],
                          [call[0] for call in repl_calls])
         self.assertEqual(local_broker.db_file, rsync_calls[0][0])
         self.assertEqual(local_broker.get_info()['id'],