diff --git a/swift/container/replicator.py b/swift/container/replicator.py index 235630958c..d4cd835cb6 100644 --- a/swift/container/replicator.py +++ b/swift/container/replicator.py @@ -78,8 +78,9 @@ class ContainerReplicator(db_replicator.Replicator): broker.merge_timestamps(*(remote_info[key] for key in sync_timestamps)) - # Grab remote's shard ranges, too - self._fetch_and_merge_shard_ranges(http, broker) + if 'shard_max_row' in remote_info: + # Grab remote's shard ranges, too + self._fetch_and_merge_shard_ranges(http, broker) return super(ContainerReplicator, self)._handle_sync_response( node, response, info, broker, http, different_region) @@ -101,8 +102,16 @@ class ContainerReplicator(db_replicator.Replicator): def _choose_replication_mode(self, node, rinfo, info, local_sync, broker, http, different_region): - # Always replicate shard ranges - shard_range_success = self._sync_shard_ranges(broker, http, info['id']) + if 'shard_max_row' in rinfo: + # Always replicate shard ranges to new-enough swift + shard_range_success = self._sync_shard_ranges( + broker, http, info['id']) + else: + shard_range_success = False + self.logger.warning( + '%s is unable to replicate shard ranges to peer %s; ' + 'peer may need upgrading', broker.db_file, + '%(ip)s:%(port)s/%(device)s' % node) if broker.sharding_initiated(): self.logger.warning( '%s is able to shard -- refusing to replicate objects to peer ' diff --git a/test/unit/container/test_replicator.py b/test/unit/container/test_replicator.py index 8e399e6512..2cafabf164 100644 --- a/test/unit/container/test_replicator.py +++ b/test/unit/container/test_replicator.py @@ -1860,6 +1860,77 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): self._check_replication_local_sharding_remote_unsharded( {'per_diff': 1}) + def _check_only_sync(self, local_broker, remote_node_index, repl_conf): + daemon, repl_calls, rsync_calls = self.check_replicate( + local_broker, remote_node_index, repl_conf, + expect_success=False) + + # When talking to an old (pre-2.18.0) container server, abort + # replication when we're sharding or sharded. Wait for the + # rolling upgrade that's presumably in-progress to finish instead. + self.assertEqual(1, daemon.stats['deferred']) + self.assertEqual(0, daemon.stats['diff']) + self.assertEqual(0, daemon.stats['rsync']) + self.assertEqual(['sync'], + [call[0] for call in repl_calls]) + self.assertFalse(rsync_calls) + lines = daemon.logger.get_lines_for_level('warning') + self.assertIn('unable to replicate shard ranges', lines[0]) + self.assertIn('refusing to replicate objects', lines[1]) + self.assertFalse(lines[2:]) + # sync + local_id = local_broker.get_info()['id'] + self.assertEqual(local_id, repl_calls[0][1][2]) + remote_broker = self._get_broker( + local_broker.account, local_broker.container, node_index=1) + self.assertNotEqual(local_id, remote_broker.get_info()['id']) + self.assertEqual([], remote_broker.get_shard_ranges()) + + def _check_replication_local_sharding_remote_presharding(self, repl_conf): + local_context = self._setup_replication_test(0) + self._merge_object(index=slice(0, 3), **local_context) + local_broker = local_context['broker'] + epoch = Timestamp.now() + self._goto_sharding_state(local_broker, epoch) + self._merge_shard_range(index=0, **local_context) + self._merge_object(index=slice(3, 11), **local_context) + + remote_context = self._setup_replication_test(1) + self._merge_object(index=11, **remote_context) + + orig_get_remote_info = \ + replicator.ContainerReplicatorRpc._get_synced_replication_info + + def presharding_get_remote_info(*args): + rinfo = orig_get_remote_info(*args) + del rinfo['shard_max_row'] + return rinfo + + with mock.patch('swift.container.replicator.' + 'ContainerReplicatorRpc._get_synced_replication_info', + presharding_get_remote_info): + self._check_only_sync(local_broker, 1, repl_conf) + + remote_broker = self._get_broker('a', 'c', node_index=1) + self.assertEqual( + [remote_broker._db_file], get_db_files(remote_broker.db_file)) + self.assertEqual(remote_context['objects'][11:12], + remote_broker.get_objects()) + + self.assert_info_synced( + local_broker, 1, + mismatches=['db_state', 'object_count', 'bytes_used', + 'status_changed_at', 'hash']) + + self._check_only_sync(local_broker, 1, repl_conf) + + def test_replication_local_sharding_remote_presharding(self): + self._check_replication_local_sharding_remote_presharding({}) + + def test_replication_local_sharding_remote_presharding_large_diff(self): + self._check_replication_local_sharding_remote_presharding( + {'per_diff': 1}) + def _check_replication_local_sharding_remote_sharding(self, repl_conf): local_context = self._setup_replication_test(0) self._merge_object(index=slice(0, 5), **local_context)