Merge "container-replicator: Add a timeout for get_shard_ranges"
commit 392c967af5
swift/common/db_replicator.py
@@ -563,7 +563,7 @@ class Replicator(Daemon):
         :param partition: partition to be replicated to
         :param object_file: DB file name to be replicated
-        :param node_id: node id of the node to be replicated to
+        :param node_id: node id of the node to be replicated from
         :returns: a tuple (success, responses). ``success`` is a boolean that
             is True if the method completed successfully, False otherwise.
             ``responses`` is a list of booleans each of which indicates the
swift/container/replicator.py
@@ -78,7 +78,7 @@ class ContainerReplicator(db_replicator.Replicator):
         broker.merge_timestamps(*(remote_info[key] for key in
                                   sync_timestamps))
 
-        if 'shard_max_row' in remote_info:
+        if remote_info.get('shard_max_row', -1) >= 0:
             # Grab remote's shard ranges, too
             self._fetch_and_merge_shard_ranges(http, broker)
 
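The rewritten guard above changes when the replicator fetches remote shard ranges at all: older remotes omit shard_max_row, and a remote whose shard-ranges table is empty reports it as -1, so both cases now skip the extra get_shard_ranges round trip. A minimal standalone sketch of the guard (plain dicts standing in for remote_info):

    # Plain-Python illustration of the new guard; the dicts stand in for
    # the remote_info mapping returned by the sync request.
    def should_fetch(remote_info):
        # fetch only when the remote reports at least one shard-range row
        return remote_info.get('shard_max_row', -1) >= 0

    assert not should_fetch({})                     # old remote: key missing
    assert not should_fetch({'shard_max_row': -1})  # remote has no shard ranges
    assert should_fetch({'shard_max_row': 7})       # remote has shard ranges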
@@ -136,7 +136,8 @@ class ContainerReplicator(db_replicator.Replicator):
         return shard_range_success and success
 
     def _fetch_and_merge_shard_ranges(self, http, broker):
-        response = http.replicate('get_shard_ranges')
+        with Timeout(self.node_timeout):
+            response = http.replicate('get_shard_ranges')
         if is_success(response.status):
             broker.merge_shard_ranges(json.loads(
                 response.data.decode('ascii')))
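For context, a minimal sketch of the eventlet.Timeout pattern the new lines use (illustrative names, not Swift code): the context manager raises Timeout in the calling greenthread if the block does not finish within node_timeout seconds, and the replicator's existing error handling then logs the error and counts the node as failed, as the new test below verifies.

    import eventlet
    from eventlet import Timeout

    def fetch_shard_ranges(node_timeout=0.001):
        def slow_rpc():
            # hypothetical stand-in for http.replicate('get_shard_ranges')
            eventlet.sleep(0.1)  # pretend the remote hangs
            return 'shard ranges'
        try:
            with Timeout(node_timeout):
                return slow_rpc()
        except Timeout:
            return None  # caller treats this node as failed

    print(fetch_shard_ranges())  # None: the fake RPC outlived node_timeout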
test/unit/container/test_replicator.py
@@ -22,6 +22,8 @@ import mock
 import random
 import sqlite3
 
+from eventlet import sleep
 
 from swift.common import db_replicator
+from swift.common.swob import HTTPServerError
 from swift.container import replicator, backend, server, sync_store
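A note on the new eventlet import: Timeout can only fire when the greenthread yields to the event hub, which eventlet's sleep() does; an ordinary time.sleep() in the test's hook would block the hub and the timeout would never trigger. A short demonstration:

    from eventlet import sleep, Timeout

    try:
        with Timeout(0.01):
            sleep(1)          # cooperative: yields to the hub, so Timeout fires
    except Timeout:
        print('timed out')    # reached after ~0.01s, not 1s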
@@ -1426,7 +1428,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
         self.assertFalse(success)
         # broker only has its own shard range so expect objects to be sync'd
         self.assertEqual(
-            ['sync', 'get_shard_ranges', 'merge_shard_ranges', 'merge_items',
+            ['sync', 'merge_shard_ranges', 'merge_items',
              'merge_syncs'],
             [call[0][0] for call in replicate_hook.call_args_list])
         error_lines = daemon.logger.get_lines_for_level('error')
@@ -1435,6 +1437,46 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
         self.assertEqual(1, daemon.stats['diff'])
         self.assertEqual(1, daemon.logger.get_increment_counts()['diffs'])
 
+    def test_sync_shard_ranges_timeout_in_fetch(self):
+        # verify that replication is not considered successful if
+        # get_shard_ranges times out
+        put_time = Timestamp.now().internal
+        broker = self._get_broker('a', 'c', node_index=0)
+        broker.initialize(put_time, POLICIES.default.idx)
+        remote_broker = self._get_broker('a', 'c', node_index=1)
+        remote_broker.initialize(put_time, POLICIES.default.idx)
+        # get an own shard range into remote broker
+        remote_broker.enable_sharding(Timestamp.now())
+
+        replicate_calls = []
+
+        def replicate_hook(op, *args):
+            replicate_calls.append(op)
+            if op == 'get_shard_ranges':
+                sleep(0.1)
+
+        fake_repl_connection = attach_fake_replication_rpc(
+            self.rpc, replicate_hook=replicate_hook)
+        db_replicator.ReplConnection = fake_repl_connection
+        part, node = self._get_broker_part_node(remote_broker)
+        daemon = replicator.ContainerReplicator({'node_timeout': '0.001'})
+        daemon.logger = FakeLogger()
+        with mock.patch.object(daemon.ring, 'get_part_nodes',
+                               return_value=[node]), \
+                mock.patch.object(daemon, '_post_replicate_hook'):
+            success, _ = daemon._replicate_object(
+                part, broker.db_file, node['id'])
+        self.assertFalse(success)
+        # replication fails at get_shard_ranges, so no further calls are made
+        self.assertEqual(['sync', 'get_shard_ranges'], replicate_calls)
+        error_lines = daemon.logger.get_lines_for_level('error')
+        self.assertIn('ERROR syncing /', error_lines[0])
+        self.assertFalse(error_lines[1:])
+        self.assertEqual(0, daemon.stats['diff'])
+        self.assertNotIn('diffs', daemon.logger.get_increment_counts())
+        self.assertEqual(1, daemon.stats['failure'])
+        self.assertEqual(1, daemon.logger.get_increment_counts()['failures'])
+
     def test_sync_shard_ranges_none_to_sync(self):
         # verify that merge_shard_ranges is not sent if there are no shard
         # ranges to sync
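The test above passes node_timeout as the string '0.001', just as a value read from a config file would arrive. Sub-second timeouts only work if the daemon coerces that value to a float rather than an int; a sketch of the kind of parsing this relies on (parse_node_timeout is a hypothetical helper, the real coercion happens in the daemon's constructor):

    def parse_node_timeout(conf, default=10):
        # hypothetical helper mirroring daemon config coercion
        return float(conf.get('node_timeout', default))

    print(parse_node_timeout({'node_timeout': '0.001'}))  # 0.001
    print(parse_node_timeout({}))                         # 10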
@@ -1456,6 +1498,38 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
         daemon = replicator.ContainerReplicator({})
         success = daemon._repl_to_node(node, broker, part, info)
         self.assertTrue(success)
+        # NB: remote has no shard ranges, so no call to get_shard_ranges
         self.assertEqual(
             ['sync', 'merge_items', 'merge_syncs'],
             [call[0][0] for call in replicate_hook.call_args_list])
 
+    def test_sync_shard_ranges_trouble_receiving_so_none_to_sync(self):
+        # verify that merge_shard_ranges is not sent if local has no shard
+        # ranges to sync
+        put_time = Timestamp.now().internal
+        broker = self._get_broker('a', 'c', node_index=0)
+        broker.initialize(put_time, POLICIES.default.idx)
+        remote_broker = self._get_broker('a', 'c', node_index=1)
+        remote_broker.initialize(put_time, POLICIES.default.idx)
+        # ensure the remote has at least one shard range
+        remote_broker.enable_sharding(Timestamp.now())
+        # put an object into local broker
+        broker.put_object('obj', Timestamp.now().internal, 0, 'text/plain',
+                          EMPTY_ETAG)
+
+        replicate_hook = mock.MagicMock()
+        fake_repl_connection = attach_fake_replication_rpc(
+            self.rpc, errors={'get_shard_ranges': [
+                FakeHTTPResponse(HTTPServerError())]},
+            replicate_hook=replicate_hook)
+        db_replicator.ReplConnection = fake_repl_connection
+        part, node = self._get_broker_part_node(remote_broker)
+        info = broker.get_replication_info()
+        daemon = replicator.ContainerReplicator({})
+        success = daemon._repl_to_node(node, broker, part, info)
+        self.assertTrue(success)
+        # NB: remote had shard ranges, but there was... some sort of issue
+        # in getting them locally, so no call to merge_shard_ranges
+        self.assertEqual(
+            ['sync', 'get_shard_ranges', 'merge_items', 'merge_syncs'],
+            [call[0][0] for call in replicate_hook.call_args_list])
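Note what this second new test pins down: a 5xx from get_shard_ranges does not fail replication, because _fetch_and_merge_shard_ranges merges only on a success status and otherwise moves on. A self-contained sketch of that guard (is_success reimplemented here with the same semantics as Swift's swift.common.http helper):

    def is_success(status):
        # 2xx statuses only, matching swift.common.http.is_success
        return 200 <= status <= 299

    def maybe_merge(status, merge):
        # mirrors the guard in _fetch_and_merge_shard_ranges
        if is_success(status):
            merge()

    calls = []
    maybe_merge(200, lambda: calls.append('merged'))
    maybe_merge(500, lambda: calls.append('merged'))
    print(calls)  # ['merged'] -- the 500 response was skipped, not fatal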
@@ -1645,6 +1719,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
                                              remote_node_index,
                                              repl_conf,
                                              expected_shard_ranges,
+                                             remote_has_shards=True,
                                              expect_success=True):
         # expected_shard_ranges is expected final list of sync'd ranges
         daemon, repl_calls, rsync_calls = self.check_replicate(
@@ -1655,17 +1730,22 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
         self.assertEqual(1, daemon.stats['deferred'])
         self.assertEqual(0, daemon.stats['diff'])
         self.assertEqual(0, daemon.stats['rsync'])
-        self.assertEqual(['sync', 'get_shard_ranges', 'merge_shard_ranges'],
-                         [call[0] for call in repl_calls])
+        if remote_has_shards:
+            exp_calls = ['sync', 'get_shard_ranges', 'merge_shard_ranges']
+        else:
+            exp_calls = ['sync', 'merge_shard_ranges']
+        self.assertEqual(exp_calls, [call[0] for call in repl_calls])
         self.assertFalse(rsync_calls)
         # sync
         local_id = local_broker.get_info()['id']
         self.assertEqual(local_id, repl_calls[0][1][2])
         # get_shard_ranges
-        self.assertEqual((), repl_calls[1][1])
+        if remote_has_shards:
+            self.assertEqual((), repl_calls[1][1])
         # merge_shard_ranges for sending local shard ranges
-        self.assertShardRangesEqual(expected_shard_ranges, repl_calls[2][1][0])
-        self.assertEqual(local_id, repl_calls[2][1][1])
+        self.assertShardRangesEqual(expected_shard_ranges,
+                                    repl_calls[-1][1][0])
+        self.assertEqual(local_id, repl_calls[-1][1][1])
         remote_broker = self._get_broker(
             local_broker.account, local_broker.container, node_index=1)
         self.assertNotEqual(local_id, remote_broker.get_info()['id'])
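The switch from repl_calls[2] to repl_calls[-1] is what lets the helper cover both branches: merge_shard_ranges is the last RPC either way, but its index shifts when get_shard_ranges is skipped. In miniature:

    with_shards = ['sync', 'get_shard_ranges', 'merge_shard_ranges']
    without_shards = ['sync', 'merge_shard_ranges']
    # merge_shard_ranges sits at index 2 or 1, but is always last
    assert with_shards[-1] == without_shards[-1] == 'merge_shard_ranges'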
@@ -1836,7 +1916,8 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
 
         self._check_only_shard_ranges_replicated(
             local_broker, 1, repl_conf,
-            local_broker.get_shard_ranges(include_own=True))
+            local_broker.get_shard_ranges(include_own=True),
+            remote_has_shards=False)
 
         remote_broker = self._get_broker('a', 'c', node_index=1)
         self.assertEqual(
@@ -2063,6 +2144,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
         self._check_only_shard_ranges_replicated(
             local_broker, 1, repl_conf,
             local_broker.get_shard_ranges(include_own=True),
+            remote_has_shards=False,
             expect_success=True)
 
         # sharded broker takes object count from shard range whereas remote
@@ -2081,6 +2163,8 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
         self._check_only_shard_ranges_replicated(
             local_broker, 1, repl_conf,
             local_broker.get_shard_ranges(include_own=True),
+            # We just sent shards, so of course remote has some
+            remote_has_shards=True,
             expect_success=True)
 
         remote_broker = self._get_broker('a', 'c', node_index=1)
@@ -2247,7 +2331,9 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
             repl_conf={'per_diff': 1})
 
         mock_get_items_since.assert_not_called()
-        self.assertEqual(['sync', 'get_shard_ranges', 'rsync_then_merge'],
+        # No call to get_shard_ranges because remote didn't have shard ranges
+        # when the sync arrived
+        self.assertEqual(['sync', 'rsync_then_merge'],
                          [call[0] for call in repl_calls])
         self.assertEqual(local_broker.db_file, rsync_calls[0][0])
         self.assertEqual(local_broker.get_info()['id'],
@@ -2286,7 +2372,9 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
             repl_conf={'per_diff': 1})
 
         mock_get_items_since.assert_not_called()
-        self.assertEqual(['sync', 'get_shard_ranges', 'rsync_then_merge'],
+        # No call to get_shard_ranges because remote didn't have shard ranges
+        # when the sync arrived
+        self.assertEqual(['sync', 'rsync_then_merge'],
                          [call[0] for call in repl_calls])
         self.assertEqual(local_broker.db_file, rsync_calls[0][0])
         self.assertEqual(local_broker.get_info()['id'],
@@ -2327,7 +2415,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
             local_broker, 1, expect_success=False,
             repl_conf={'per_diff': 1})
 
-        self.assertEqual(['sync', 'get_shard_ranges', 'rsync_then_merge'],
+        self.assertEqual(['sync', 'rsync_then_merge'],
                          [call[0] for call in repl_calls])
         self.assertEqual(local_broker.db_file, rsync_calls[0][0])
         self.assertEqual(local_broker.get_info()['id'],
@@ -2370,7 +2458,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
             local_broker, 1, expect_success=False,
             repl_conf={'per_diff': 1})
 
-        self.assertEqual(['sync', 'get_shard_ranges', 'rsync_then_merge'],
+        self.assertEqual(['sync', 'rsync_then_merge'],
                          [call[0] for call in repl_calls])
         self.assertEqual(local_broker.db_file, rsync_calls[0][0])
         self.assertEqual(local_broker.get_info()['id'],