sharding: don't replace own_shard_range without an epoch

We've observed a root container suddenly think it's unsharded when its
own_shard_range is reset.  This patch blocks a remote OSR with an epoch
of None from overwriting a local epoched OSR.

The only way we've observed this happen is when a new replica or handoff
node creates a container and its new own_shard_range is created without
an epoch and is then replicated to older primaries.
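
For context, a broker that has never stored an own_shard_range hands back a
default one whose epoch is None; that default is what then replicates over
the epoched copy on the older primaries. A rough sketch of that behaviour
(the broker setup here is illustrative only):

    from swift.common.utils import Timestamp
    from swift.container.backend import ContainerBroker

    broker = ContainerBroker(':memory:', account='a', container='c')
    broker.initialize(Timestamp.now().internal, 0)
    own_sr = broker.get_own_shard_range()  # default own shard range
    assert own_sr.epoch is None            # no epoch until sharding starts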

However, if a bad node with a non-epoched OSR is on a primary, its
newer timestamp would prevent it pulling the good OSR from its peers, so
it would be left stuck with its bad one.

When this happens, expect to see a bunch of:
    Ignoring remote osr w/o epoch: x, from: y

When an OSR comes in from a replica without an epoch when it should
have one, we do a pre-flight check to see whether merging it would
remove the local epoch before emitting the error above. We do this
because when sharding is first initiated it's perfectly valid to get
OSRs without epochs from replicas; this is expected and harmless.
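
Conceptually the guard dry-runs the merge on a copy of the incoming shard
dict and only rejects it when the merge would both win and leave the epoch
cleared. A minimal sketch of that idea (the helper name is illustrative;
merge_shards comes from swift.container.backend and mutates its first
argument):

    from swift.container.backend import merge_shards

    def would_clear_epoch(remote_shard, local_own_sr):
        # dry-run the merge on a copy so the real shard dict is untouched
        candidate = dict(remote_shard)
        changed = merge_shards(candidate, dict(local_own_sr))
        # only a merge that wins and still has no epoch gets rejected
        return changed and candidate['epoch'] is None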

Closes-bug: #1980451
Change-Id: I069bdbeb430e89074605e40525d955b3a704a44f
Matthew Oliver 2021-09-08 16:29:30 +10:00 committed by Tim Burke
parent bdbabbb809
commit 8227f4539c
4 changed files with 576 additions and 18 deletions


@@ -20,7 +20,8 @@ from eventlet import Timeout
from random import choice
from swift.container.sync_store import ContainerSyncStore
from swift.container.backend import ContainerBroker, DATADIR, SHARDED
from swift.container.backend import ContainerBroker, DATADIR, SHARDED, \
merge_shards
from swift.container.reconciler import (
MISPLACED_OBJECTS_ACCOUNT, incorrect_policy_index,
get_reconciler_container_name, get_row_to_q_entry_translator)
@@ -31,6 +32,35 @@ from swift.common.http import is_success
from swift.common.utils import Timestamp, majority_size, get_db_files
def check_merge_own_shard_range(shards, broker, logger, source):
    """
    If broker has own_shard_range *with an epoch* then filter out an
    own_shard_range *without an epoch*, and log a warning about it.

    :param shards: a list of candidate ShardRanges to merge
    :param broker: a ContainerBroker
    :param logger: a logger
    :param source: string to log as source of shards
    :return: a list of ShardRanges to actually merge
    """
    # work-around for https://bugs.launchpad.net/swift/+bug/1980451
    own_sr = broker.get_own_shard_range()
    if own_sr.epoch is None:
        return shards
    to_merge = []
    for shard in shards:
        if shard['name'] == own_sr.name and not shard['epoch']:
            shard_copy = dict(shard)
            new_content = merge_shards(shard_copy, dict(own_sr))
            if new_content and shard_copy['epoch'] is None:
                logger.warning(
                    'Ignoring remote osr w/o epoch, own_sr: %r, remote_sr: %r,'
                    ' source: %s', dict(own_sr), shard, source)
                continue
        to_merge.append(shard)
    return to_merge
class ContainerReplicator(db_replicator.Replicator):
server_type = 'container'
brokerclass = ContainerBroker
@@ -138,8 +168,10 @@ class ContainerReplicator(db_replicator.Replicator):
with Timeout(self.node_timeout):
response = http.replicate('get_shard_ranges')
if response and is_success(response.status):
broker.merge_shard_ranges(json.loads(
response.data.decode('ascii')))
shards = json.loads(response.data.decode('ascii'))
shards = check_merge_own_shard_range(
shards, broker, self.logger, '%s%s' % (http.host, http.path))
broker.merge_shard_ranges(shards)
def find_local_handoff_for_part(self, part):
"""
@@ -394,11 +426,15 @@ class ContainerReplicatorRpc(db_replicator.ReplicatorRpc):
def _post_rsync_then_merge_hook(self, existing_broker, new_broker):
# Note the following hook will need to change to using a pointer and
# limit in the future.
new_broker.merge_shard_ranges(
existing_broker.get_all_shard_range_data())
shards = existing_broker.get_all_shard_range_data()
shards = check_merge_own_shard_range(
shards, new_broker, self.logger, 'rsync')
new_broker.merge_shard_ranges(shards)
def merge_shard_ranges(self, broker, args):
broker.merge_shard_ranges(args[0])
shards = check_merge_own_shard_range(
args[0], broker, self.logger, 'repl_req')
broker.merge_shard_ranges(shards)
return HTTPAccepted()
def get_shard_ranges(self, broker, args):


@@ -28,8 +28,8 @@ from swift.common.header_key_dict import HeaderKeyDict
from swift.common.internal_client import UnexpectedResponse
from swift.common.manager import Manager
from swift.common.memcached import MemcacheRing
from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \
quorum_size, config_true_value, Timestamp, md5, Namespace
from swift.common.utils import ShardRange, parse_db_filename, quorum_size, \
config_true_value, Timestamp, md5, Namespace
from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING, \
SHARDED
from swift.container.sharder import CleavingContext, ContainerSharder
@@ -244,9 +244,10 @@ class BaseTestContainerSharding(ReplProbeTest):
def get_db_file(self, part, node, account=None, container=None):
container_dir, container_hash = self.get_storage_dir(
part, node, account=account, container=container)
db_file = os.path.join(container_dir, container_hash + '.db')
self.assertTrue(get_db_files(db_file)) # sanity check
return db_file
for f in os.listdir(container_dir):
path = os.path.join(container_dir, f)
if path.endswith('.db'):
return path
def get_broker(self, part, node, account=None, container=None):
return ContainerBroker(
@@ -259,10 +260,13 @@ class BaseTestContainerSharding(ReplProbeTest):
shard_part, shard_nodes[node_index], shard_range.account,
shard_range.container)
def categorize_container_dir_content(self, account=None, container=None):
def categorize_container_dir_content(self, account=None, container=None,
more_nodes=False):
account = account or self.brain.account
container = container or self.container_name
part, nodes = self.brain.ring.get_nodes(account, container)
if more_nodes:
nodes.extend(self.brain.ring.get_more_nodes(part))
storage_dirs = [
self.get_storage_dir(part, node, account=account,
container=container)[0]
@@ -4050,6 +4054,229 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
broker.get_shard_usage()['object_count'])
self.assertFalse(broker.is_deleted())
def test_handoff_replication_does_not_cause_reset_epoch(self):
obj_names = self._make_object_names(100)
self.put_objects(obj_names)
client.post_container(self.url, self.admin_token, self.container_name,
headers={'X-Container-Sharding': 'on'})
# run replicators first time to get sync points set
self.replicators.once()
# sanity check: we don't have nearly enough objects for this to shard
# automatically
self.sharders_once_non_auto(
number=self.brain.node_numbers[0],
additional_args='--partitions=%s' % self.brain.part)
self.assert_container_state(self.brain.nodes[0], 'unsharded', 0)
self.assert_subprocess_success([
'swift-manage-shard-ranges',
self.get_db_file(self.brain.part, self.brain.nodes[0]),
'find_and_replace', '50', '--enable',
'--minimum-shard-size', '40'])
self.assert_container_state(self.brain.nodes[0], 'unsharded', 2)
# "Run container-replicator to replicate them to other nodes."
self.replicators.once()
# "Run container-sharder on all nodes to shard the container."
self.sharders_once_non_auto(
additional_args='--partitions=%s' % self.brain.part)
# Everybody's settled
self.assert_container_state(self.brain.nodes[0], 'sharded', 2)
self.assert_container_state(self.brain.nodes[1], 'sharded', 2)
self.assert_container_state(self.brain.nodes[2], 'sharded', 2)
self.assert_container_listing(obj_names)
# now let's put the container again and make sure it lands on a handoff
self.brain.stop_primary_half()
self.brain.put_container(policy_index=int(self.policy))
self.brain.start_primary_half()
dir_content = self.categorize_container_dir_content(more_nodes=True)
# the handoff node is considered normal because it doesn't have an
# epoch
self.assertEqual(len(dir_content['normal_dbs']), 1)
self.assertEqual(len(dir_content['shard_dbs']), 3)
# let's replicate
self.replicators.once()
self.sharders_once_non_auto(
additional_args='--partitions=%s' % self.brain.part)
# let's now check the handoff broker; it should have all the shards
handoff_broker = ContainerBroker(dir_content['normal_dbs'][0])
self.assertEqual(len(handoff_broker.get_shard_ranges()), 2)
handoff_osr = handoff_broker.get_own_shard_range(no_default=True)
self.assertIsNotNone(handoff_osr.epoch)
def test_force_replication_of_a_reset_own_shard_range(self):
obj_names = self._make_object_names(100)
self.put_objects(obj_names)
client.post_container(self.url, self.admin_token, self.container_name,
headers={'X-Container-Sharding': 'on'})
# run replicators first time to get sync points set
self.replicators.once()
# sanity check: we don't have nearly enough objects for this to shard
# automatically
self.sharders_once_non_auto(
number=self.brain.node_numbers[0],
additional_args='--partitions=%s' % self.brain.part)
self.assert_container_state(self.brain.nodes[0], 'unsharded', 0)
self.assert_subprocess_success([
'swift-manage-shard-ranges',
self.get_db_file(self.brain.part, self.brain.nodes[0]),
'find_and_replace', '50', '--enable',
'--minimum-shard-size', '40'])
self.assert_container_state(self.brain.nodes[0], 'unsharded', 2)
# "Run container-replicator to replicate them to other nodes."
self.replicators.once()
# "Run container-sharder on all nodes to shard the container."
self.sharders_once_non_auto(
additional_args='--partitions=%s' % self.brain.part)
# Everybody's settled
self.assert_container_state(self.brain.nodes[0], 'sharded', 2)
self.assert_container_state(self.brain.nodes[1], 'sharded', 2)
self.assert_container_state(self.brain.nodes[2], 'sharded', 2)
self.assert_container_listing(obj_names)
# Let's delete a primary to simulate a new primary and force an
# own_shard_range reset.
new_primary = self.brain.nodes[2]
db_file = self.get_db_file(self.brain.part, new_primary)
os.remove(db_file)
# issue a new PUT to create the "new" primary container
self.brain.put_container(policy_index=int(self.policy))
# put a bunch of objects that should land in the primary so it'll be
# shardable (in case this makes any kind of difference).
self.put_objects(obj_names)
# The new primary isn't considered a shard_db because it hasn't
# synced with the other primaries yet.
dir_content = self.categorize_container_dir_content()
self.assertEqual(len(dir_content['normal_dbs']), 1)
self.assertEqual(len(dir_content['shard_dbs']), 2)
# run the sharders in case this triggers a reset OSR
self.sharders_once_non_auto(
additional_args='--partitions=%s' % self.brain.part)
new_primary_broker = self.get_broker(self.brain.part, new_primary)
# Nope, still no default/reset osr
self.assertIsNone(
new_primary_broker.get_own_shard_range(no_default=True))
# Let's reset the osr by hand.
reset_osr = new_primary_broker.get_own_shard_range()
self.assertIsNone(reset_osr.epoch)
self.assertEqual(reset_osr.state, ShardRange.ACTIVE)
new_primary_broker.merge_shard_ranges(reset_osr)
# now let's replicate with the old primaries
self.replicators.once()
# Pull an old primary own_shard_range
dir_content = self.categorize_container_dir_content()
old_broker = ContainerBroker(dir_content['shard_dbs'][0])
old_osr = old_broker.get_own_shard_range()
new_primary_broker = ContainerBroker(dir_content['normal_dbs'][0])
new_osr = new_primary_broker.get_own_shard_range()
# This version stops a remote non-epoched OSR replicating over a local
# epoched OSR, but not the other way around. So the primary with the
# non-epoched OSR gets stuck with it, if it is newer than the other
# epoched versions.
self.assertIsNotNone(old_osr.epoch)
self.assertEqual(old_osr.state, ShardRange.SHARDED)
self.assertIsNone(new_osr.epoch)
self.assertGreater(new_osr.timestamp, old_osr.timestamp)
def test_manage_shard_ranges_missing_epoch_no_false_positives(self):
# when one replica of a shard is sharding before the others, it's epoch
# is not None but it is normal for the other replica to replicate to it
# sending their own shard ranges with epoch=None until they also shard
obj_names = self._make_object_names(4)
self.put_objects(obj_names)
client.post_container(self.url, self.admin_token, self.container_name,
headers={'X-Container-Sharding': 'on'})
# run replicators first time to get sync points set, and get container
# sharded into 2 shards
self.replicators.once()
self.assert_subprocess_success([
'swift-manage-shard-ranges',
self.get_db_file(self.brain.part, self.brain.nodes[0]),
'find_and_replace', '2', '--enable'])
ranges = self.assert_container_state(
self.brain.nodes[0], 'unsharded', 2)
# "Run container-replicator to replicate them to other nodes."
self.replicators.once()
# "Run container-sharder on all nodes to shard the container."
self.sharders_once_non_auto(
additional_args='--partitions=%s' % self.brain.part)
# Run them again, just so the shards themselves can pull down the
# latest sharded versions of their OSRs.
self.sharders_once_non_auto()
# Everybody's settled
self.assert_container_state(self.brain.nodes[0], 'sharded', 2)
self.assert_container_state(self.brain.nodes[1], 'sharded', 2)
ranges = self.assert_container_state(self.brain.nodes[2], 'sharded', 2)
self.assert_container_listing(obj_names)
# Now we need to shard a shard. A shard's OSR always exists and should
# have an epoch of None, so we could get some false positives.
# We'll shard ranges[1], which has a range of objs-0002 - MAX
shard_obj_names = ['objs-0001%d' % i for i in range(2)]
self.put_objects(shard_obj_names)
part, shard_node_numbers = self.get_part_and_node_numbers(ranges[1])
shard_nodes = self.brain.ring.get_part_nodes(part)
shard_broker = self.get_shard_broker(ranges[1], 0)
# set the account, container instance variables
shard_broker.get_info()
self.replicators.once()
self.assert_subprocess_success([
'swift-manage-shard-ranges',
shard_broker.db_file,
'find_and_replace', '2', '--enable'])
self.assert_container_state(
shard_nodes[0], 'unsharded', 2,
shard_broker.account, shard_broker.container, part)
# index 0 has an epoch now but 1 and 2 don't
for idx in 1, 2:
sb = self.get_shard_broker(ranges[1], idx)
osr = sb.get_own_shard_range(no_default=True)
self.assertIsNone(osr.epoch)
expected_false_positive_line_snippet = 'Ignoring remote osr w/o epoch:'
# run the replicator on the node with an epoch; its peers don't have
# an epoch, but merging their OSRs would not clear the local epoch, so
# no false-positive warning is expected.
replicator = self.run_custom_daemon(
ContainerReplicator, 'container-replicator',
shard_node_numbers[0], {})
warnings = replicator.logger.get_lines_for_level('warning')
self.assertFalse([w for w in warnings
if expected_false_positive_line_snippet in w])
# But it does send the new OSR with an epoch so the others should all
# have it now.
for idx in 1, 2:
sb = self.get_shard_broker(ranges[1], idx)
osr = sb.get_own_shard_range(no_default=True)
self.assertIsNotNone(osr.epoch)
def test_manage_shard_ranges_deleted_child_and_parent_gap(self):
# Test to produce a scenario where a parent container is stuck at
# sharding because of a gap in shard ranges. And the gap is caused by


@@ -36,7 +36,8 @@ import six
from swift.common.exceptions import LockTimeout
from swift.container.backend import ContainerBroker, \
update_new_item_from_existing, UNSHARDED, SHARDING, SHARDED, \
COLLAPSED, SHARD_LISTING_STATES, SHARD_UPDATE_STATES, sift_shard_ranges
COLLAPSED, SHARD_LISTING_STATES, SHARD_UPDATE_STATES, sift_shard_ranges, \
merge_shards
from swift.common.db import DatabaseAlreadyExists, GreenDBConnection, \
TombstoneReclaimer, GreenDBCursor
from swift.common.request_helpers import get_reserved_name
@@ -6976,11 +6977,180 @@ class TestModuleFunctions(unittest.TestCase):
class TestModuleFunctions(unittest.TestCase):
def setUp(self):
super(TestModuleFunctions, self).setUp()
self.ts_iter = make_timestamp_iter()
self.ts = [next(self.ts_iter).internal for _ in range(10)]
def test_merge_shards_existing_none(self):
data = dict(ShardRange('a/o', self.ts[1]), reported=True)
exp_data = dict(data)
self.assertTrue(merge_shards(data, None))
self.assertEqual(exp_data, data)
def test_merge_shards_existing_ts_lt(self):
existing = dict(ShardRange('a/o', self.ts[0]))
data = dict(ShardRange('a/o', self.ts[1]), reported=True)
exp_data = dict(data, reported=False)
self.assertTrue(merge_shards(data, existing))
self.assertEqual(exp_data, data)
def test_merge_shards_existing_ts_gt(self):
existing = dict(ShardRange('a/o', self.ts[1]))
data = dict(ShardRange('a/o', self.ts[0]), reported=True)
exp_data = dict(data)
self.assertFalse(merge_shards(data, existing))
self.assertEqual(exp_data, data)
# existing timestamp trumps data state_timestamp
data = dict(ShardRange('a/o', self.ts[0]), state=ShardRange.ACTIVE,
state_timestamp=self.ts[2])
exp_data = dict(data)
self.assertFalse(merge_shards(data, existing))
self.assertEqual(exp_data, data)
# existing timestamp trumps data meta_timestamp
data = dict(ShardRange('a/o', self.ts[0]), state=ShardRange.ACTIVE,
meta_timestamp=self.ts[2])
exp_data = dict(data)
self.assertFalse(merge_shards(data, existing))
self.assertEqual(exp_data, data)
def test_merge_shards_existing_ts_eq_merge_reported(self):
existing = dict(ShardRange('a/o', self.ts[0]))
data = dict(ShardRange('a/o', self.ts[0]), reported=False)
exp_data = dict(data)
self.assertFalse(merge_shards(data, existing))
self.assertEqual(exp_data, data)
data = dict(ShardRange('a/o', self.ts[0]), reported=True)
exp_data = dict(data)
self.assertTrue(merge_shards(data, existing))
self.assertEqual(exp_data, data)
def test_merge_shards_existing_ts_eq_retain_bounds(self):
existing = dict(ShardRange('a/o', self.ts[0]))
data = dict(ShardRange('a/o', self.ts[0]), lower='l', upper='u')
exp_data = dict(data, lower='', upper='')
self.assertFalse(merge_shards(data, existing))
self.assertEqual(exp_data, data)
def test_merge_shards_existing_ts_eq_retain_deleted(self):
existing = dict(ShardRange('a/o', self.ts[0]))
data = dict(ShardRange('a/o', self.ts[0]), deleted=1)
exp_data = dict(data, deleted=0)
self.assertFalse(merge_shards(data, existing))
self.assertEqual(exp_data, data)
def test_merge_shards_existing_ts_eq_meta_ts_gte(self):
existing = dict(
ShardRange('a/o', self.ts[0], meta_timestamp=self.ts[1],
object_count=1, bytes_used=2, tombstones=3))
data = dict(
ShardRange('a/o', self.ts[0], meta_timestamp=self.ts[1],
object_count=10, bytes_used=20, tombstones=30))
exp_data = dict(data, object_count=1, bytes_used=2, tombstones=3)
self.assertFalse(merge_shards(data, existing))
self.assertEqual(exp_data, data)
existing = dict(
ShardRange('a/o', self.ts[0], meta_timestamp=self.ts[2],
object_count=1, bytes_used=2, tombstones=3))
exp_data = dict(data, object_count=1, bytes_used=2, tombstones=3,
meta_timestamp=self.ts[2])
self.assertFalse(merge_shards(data, existing))
self.assertEqual(exp_data, data)
def test_merge_shards_existing_ts_eq_meta_ts_lt(self):
existing = dict(
ShardRange('a/o', self.ts[0], meta_timestamp=self.ts[1],
object_count=1, bytes_used=2, tombstones=3,
epoch=self.ts[3]))
data = dict(
ShardRange('a/o', self.ts[0], meta_timestamp=self.ts[2],
object_count=10, bytes_used=20, tombstones=30,
epoch=None))
exp_data = dict(data, epoch=self.ts[3])
self.assertTrue(merge_shards(data, existing))
self.assertEqual(exp_data, data)
def test_merge_shards_existing_ts_eq_state_ts_eq(self):
# data has more advanced state
existing = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
state=ShardRange.CREATED, epoch=self.ts[4]))
data = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
state=ShardRange.ACTIVE, epoch=self.ts[5]))
exp_data = dict(data)
self.assertTrue(merge_shards(data, existing))
self.assertEqual(exp_data, data)
# data has less advanced state
existing = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
state=ShardRange.CREATED, epoch=self.ts[4]))
data = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
state=ShardRange.FOUND, epoch=self.ts[5]))
exp_data = dict(data, state=ShardRange.CREATED, epoch=self.ts[4])
self.assertFalse(merge_shards(data, existing))
self.assertEqual(exp_data, data)
def test_merge_shards_existing_ts_eq_state_ts_gt(self):
existing = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[2],
state=ShardRange.CREATED, epoch=self.ts[4]))
data = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
state=ShardRange.ACTIVE, epoch=self.ts[5]))
exp_data = dict(data, state_timestamp=self.ts[2],
state=ShardRange.CREATED, epoch=self.ts[4])
self.assertFalse(merge_shards(data, existing))
self.assertEqual(exp_data, data)
def test_merge_shards_existing_ts_eq_state_ts_lt(self):
existing = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[0],
state=ShardRange.CREATED, epoch=self.ts[4]))
data = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
state=ShardRange.ACTIVE, epoch=self.ts[5]))
exp_data = dict(data)
self.assertTrue(merge_shards(data, existing))
self.assertEqual(exp_data, data)
def test_merge_shards_epoch_reset(self):
# not sure if these scenarios are realistic, but we have seen epoch
# resets in prod
# same timestamps, data has more advanced state but no epoch
existing = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
state=ShardRange.CREATED, epoch=self.ts[4]))
data = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
state=ShardRange.ACTIVE, epoch=None))
exp_data = dict(data)
self.assertTrue(merge_shards(data, existing))
self.assertEqual(exp_data, data)
self.assertIsNone(exp_data['epoch'])
# data has more advanced state_timestamp but no epoch
existing = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
state=ShardRange.CREATED, epoch=self.ts[4]))
data = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[2],
state=ShardRange.FOUND, epoch=None))
exp_data = dict(data)
self.assertTrue(merge_shards(data, existing))
self.assertEqual(exp_data, data)
self.assertIsNone(exp_data['epoch'])
def test_sift_shard_ranges(self):
ts_iter = make_timestamp_iter()
existing_shards = {}
sr1 = dict(ShardRange('a/o', next(ts_iter).internal))
sr2 = dict(ShardRange('a/o2', next(ts_iter).internal))
sr1 = dict(ShardRange('a/o', next(self.ts_iter).internal))
sr2 = dict(ShardRange('a/o2', next(self.ts_iter).internal))
new_shard_ranges = [sr1, sr2]
# first, with empty existing shards, it will just add the shards
@@ -6994,7 +7164,7 @@ class TestModuleFunctions(unittest.TestCase):
# if there is a newer version in the existing shards then it won't be
# added to to_add
existing_shards['a/o'] = dict(
ShardRange('a/o', next(ts_iter).internal))
ShardRange('a/o', next(self.ts_iter).internal))
to_add, to_delete = sift_shard_ranges(new_shard_ranges,
existing_shards)
self.assertEqual([sr2], list(to_add))
@@ -7002,7 +7172,7 @@ class TestModuleFunctions(unittest.TestCase):
# But if a newer version is in new_shard_ranges then the old will be
# added to to_delete and new is added to to_add.
sr1['timestamp'] = next(ts_iter).internal
sr1['timestamp'] = next(self.ts_iter).internal
to_add, to_delete = sift_shard_ranges(new_shard_ranges,
existing_shards)
self.assertEqual(2, len(to_add))


@@ -32,6 +32,7 @@ from swift.container.reconciler import (
from swift.common.utils import Timestamp, encode_timestamps, ShardRange, \
get_db_files, make_db_file_path
from swift.common.storage_policy import POLICIES
from test import annotate_failure
from test.debug_logger import debug_logger
from test.unit.common import test_db_replicator
@@ -1432,6 +1433,130 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
daemon.logger.get_lines_for_level('debug'))
daemon.logger.clear()
def test_sync_shard_ranges_merge_remote_osr(self):
def do_test(local_osr, remote_osr, exp_merge, exp_warning,
exp_rpc_warning):
put_timestamp = Timestamp.now().internal
# create "local" broker
broker = self._get_broker('a', 'c', node_index=0)
broker.initialize(put_timestamp, POLICIES.default.idx)
# create "remote" broker
remote_broker = self._get_broker('a', 'c', node_index=1)
remote_broker.initialize(put_timestamp, POLICIES.default.idx)
bounds = (('', 'g'), ('g', 'r'), ('r', ''))
shard_ranges = [
ShardRange('.shards_a/sr-%s' % upper, Timestamp.now(), lower,
upper, i + 1, 10 * (i + 1))
for i, (lower, upper) in enumerate(bounds)
]
for db in (broker, remote_broker):
db.merge_shard_ranges(shard_ranges)
if local_osr:
broker.merge_shard_ranges(ShardRange(**dict(local_osr)))
if remote_osr:
remote_broker.merge_shard_ranges(
ShardRange(**dict(remote_osr)))
daemon = replicator.ContainerReplicator({}, logger=debug_logger())
part, remote_node = self._get_broker_part_node(remote_broker)
part, local_node = self._get_broker_part_node(broker)
info = broker.get_replication_info()
success = daemon._repl_to_node(remote_node, broker, part, info)
self.assertTrue(success)
local_info = self._get_broker(
'a', 'c', node_index=0).get_info()
remote_info = self._get_broker(
'a', 'c', node_index=1).get_info()
for k, v in local_info.items():
if k == 'id':
continue
self.assertEqual(remote_info[k], v,
"mismatch remote %s %r != %r" % (
k, remote_info[k], v))
actual_osr = broker.get_own_shard_range(no_default=True)
actual_osr = dict(actual_osr) if actual_osr else actual_osr
if exp_merge:
exp_osr = (dict(remote_osr, meta_timestamp=mock.ANY)
if remote_osr else remote_osr)
else:
exp_osr = (dict(local_osr, meta_timestamp=mock.ANY)
if local_osr else local_osr)
self.assertEqual(exp_osr, actual_osr)
lines = daemon.logger.get_lines_for_level('warning')
if exp_warning:
self.assertEqual(len(lines), 1, lines)
self.assertIn("Ignoring remote osr w/o epoch", lines[0])
self.assertIn("own_sr: ", lines[0])
self.assertIn("'epoch': '%s'" % local_osr.epoch.normal,
lines[0])
self.assertIn("remote_sr: ", lines[0])
self.assertIn("'epoch': None", lines[0])
hash_ = os.path.splitext(os.path.basename(broker.db_file))[0]
url = "%s/%s/%s/%s" % (
remote_node['ip'], remote_node['device'], part, hash_)
self.assertIn("source: %s" % url, lines[0])
else:
self.assertFalse(lines)
lines = self.rpc.logger.get_lines_for_level('warning')
if exp_rpc_warning:
self.assertEqual(len(lines), 1, lines)
self.assertIn("Ignoring remote osr w/o epoch", lines[0])
self.assertIn("source: repl_req", lines[0])
else:
self.assertFalse(lines)
os.remove(broker.db_file)
os.remove(remote_broker.db_file)
return daemon
# we'll use another broker as a template for building the "default" OSRs
other_broker = self._get_broker('a', 'c', node_index=2)
other_broker.initialize(Timestamp.now().internal, POLICIES.default.idx)
default_osr = other_broker.get_own_shard_range()
self.assertIsNone(default_osr.epoch)
osr_with_epoch = other_broker.get_own_shard_range()
osr_with_epoch.epoch = Timestamp.now()
osr_with_different_epoch = other_broker.get_own_shard_range()
osr_with_different_epoch.epoch = Timestamp.now()
default_osr_newer = ShardRange(**dict(default_osr))
default_osr_newer.timestamp = Timestamp.now()
# local_osr, remote_osr, exp_merge, exp_warning, exp_rpc_warning
tests = (
# First the None case, ie no osrs
(None, None, False, False, False),
# Default and not the other
(None, default_osr, True, False, False),
(default_osr, None, False, False, False),
(default_osr, default_osr, True, False, False),
(default_osr, None, False, False, False),
# With an epoch and no OSR is also fine
(None, osr_with_epoch, True, False, False),
(osr_with_epoch, None, False, False, False),
# even with the same or different epochs
(osr_with_epoch, osr_with_epoch, True, False, False),
(osr_with_epoch, osr_with_different_epoch, True, False, False),
# But if local does have an epoch but the remote doesn't: false
# positive, nothing will merge anyway, no warning.
(osr_with_epoch, default_osr, False, False, False),
# It's also OK if the remote has an epoch but not the local. This
# also works on the RPC side because the merge happens on the local
# side first, which then sends the updated shards to the remote. So if
# the OSR on the remote is newer than the default, the RPC side will
# actually get a merged OSR, i.e. get the remote one back.
(default_osr, osr_with_epoch, True, False, False),
# But if the local default is newer than the epoched remote side,
# we'd get a warning logged on the RPC side, and because the local is
# newer it will fail to merge
(default_osr_newer, osr_with_epoch, False, False, True),
)
for i, params in enumerate(tests):
with annotate_failure((i, params)):
do_test(*params)
def test_sync_shard_ranges(self):
put_timestamp = Timestamp.now().internal
# create "local" broker