Only try to fetch or sync shard ranges if the remote supports sharding
Change-Id: I7231e8af310e268484f2075f0194b7783cf1c3ea
This commit is contained in:
parent
c01c43d982
commit
f6a436aeda
@ -78,8 +78,9 @@ class ContainerReplicator(db_replicator.Replicator):
|
||||
broker.merge_timestamps(*(remote_info[key] for key in
|
||||
sync_timestamps))
|
||||
|
||||
# Grab remote's shard ranges, too
|
||||
self._fetch_and_merge_shard_ranges(http, broker)
|
||||
if 'shard_max_row' in remote_info:
|
||||
# Grab remote's shard ranges, too
|
||||
self._fetch_and_merge_shard_ranges(http, broker)
|
||||
|
||||
return super(ContainerReplicator, self)._handle_sync_response(
|
||||
node, response, info, broker, http, different_region)
|
||||
@ -101,8 +102,16 @@ class ContainerReplicator(db_replicator.Replicator):
|
||||
|
||||
def _choose_replication_mode(self, node, rinfo, info, local_sync, broker,
|
||||
http, different_region):
|
||||
# Always replicate shard ranges
|
||||
shard_range_success = self._sync_shard_ranges(broker, http, info['id'])
|
||||
if 'shard_max_row' in rinfo:
|
||||
# Always replicate shard ranges to new-enough swift
|
||||
shard_range_success = self._sync_shard_ranges(
|
||||
broker, http, info['id'])
|
||||
else:
|
||||
shard_range_success = False
|
||||
self.logger.warning(
|
||||
'%s is unable to replicate shard ranges to peer %s; '
|
||||
'peer may need upgrading', broker.db_file,
|
||||
'%(ip)s:%(port)s/%(device)s' % node)
|
||||
if broker.sharding_initiated():
|
||||
self.logger.warning(
|
||||
'%s is able to shard -- refusing to replicate objects to peer '
|
||||
|
@ -1860,6 +1860,77 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
||||
self._check_replication_local_sharding_remote_unsharded(
|
||||
{'per_diff': 1})
|
||||
|
||||
def _check_only_sync(self, local_broker, remote_node_index, repl_conf):
|
||||
daemon, repl_calls, rsync_calls = self.check_replicate(
|
||||
local_broker, remote_node_index, repl_conf,
|
||||
expect_success=False)
|
||||
|
||||
# When talking to an old (pre-2.18.0) container server, abort
|
||||
# replication when we're sharding or sharded. Wait for the
|
||||
# rolling upgrade that's presumably in-progress to finish instead.
|
||||
self.assertEqual(1, daemon.stats['deferred'])
|
||||
self.assertEqual(0, daemon.stats['diff'])
|
||||
self.assertEqual(0, daemon.stats['rsync'])
|
||||
self.assertEqual(['sync'],
|
||||
[call[0] for call in repl_calls])
|
||||
self.assertFalse(rsync_calls)
|
||||
lines = daemon.logger.get_lines_for_level('warning')
|
||||
self.assertIn('unable to replicate shard ranges', lines[0])
|
||||
self.assertIn('refusing to replicate objects', lines[1])
|
||||
self.assertFalse(lines[2:])
|
||||
# sync
|
||||
local_id = local_broker.get_info()['id']
|
||||
self.assertEqual(local_id, repl_calls[0][1][2])
|
||||
remote_broker = self._get_broker(
|
||||
local_broker.account, local_broker.container, node_index=1)
|
||||
self.assertNotEqual(local_id, remote_broker.get_info()['id'])
|
||||
self.assertEqual([], remote_broker.get_shard_ranges())
|
||||
|
||||
def _check_replication_local_sharding_remote_presharding(self, repl_conf):
|
||||
local_context = self._setup_replication_test(0)
|
||||
self._merge_object(index=slice(0, 3), **local_context)
|
||||
local_broker = local_context['broker']
|
||||
epoch = Timestamp.now()
|
||||
self._goto_sharding_state(local_broker, epoch)
|
||||
self._merge_shard_range(index=0, **local_context)
|
||||
self._merge_object(index=slice(3, 11), **local_context)
|
||||
|
||||
remote_context = self._setup_replication_test(1)
|
||||
self._merge_object(index=11, **remote_context)
|
||||
|
||||
orig_get_remote_info = \
|
||||
replicator.ContainerReplicatorRpc._get_synced_replication_info
|
||||
|
||||
def presharding_get_remote_info(*args):
|
||||
rinfo = orig_get_remote_info(*args)
|
||||
del rinfo['shard_max_row']
|
||||
return rinfo
|
||||
|
||||
with mock.patch('swift.container.replicator.'
|
||||
'ContainerReplicatorRpc._get_synced_replication_info',
|
||||
presharding_get_remote_info):
|
||||
self._check_only_sync(local_broker, 1, repl_conf)
|
||||
|
||||
remote_broker = self._get_broker('a', 'c', node_index=1)
|
||||
self.assertEqual(
|
||||
[remote_broker._db_file], get_db_files(remote_broker.db_file))
|
||||
self.assertEqual(remote_context['objects'][11:12],
|
||||
remote_broker.get_objects())
|
||||
|
||||
self.assert_info_synced(
|
||||
local_broker, 1,
|
||||
mismatches=['db_state', 'object_count', 'bytes_used',
|
||||
'status_changed_at', 'hash'])
|
||||
|
||||
self._check_only_sync(local_broker, 1, repl_conf)
|
||||
|
||||
def test_replication_local_sharding_remote_presharding(self):
|
||||
self._check_replication_local_sharding_remote_presharding({})
|
||||
|
||||
def test_replication_local_sharding_remote_presharding_large_diff(self):
|
||||
self._check_replication_local_sharding_remote_presharding(
|
||||
{'per_diff': 1})
|
||||
|
||||
def _check_replication_local_sharding_remote_sharding(self, repl_conf):
|
||||
local_context = self._setup_replication_test(0)
|
||||
self._merge_object(index=slice(0, 5), **local_context)
|
||||
|
Loading…
Reference in New Issue
Block a user