sharder: process deleted DBs

It is possible for some replicas of a container DB to be sharded, and
for the container to then be emptied and deleted, before another
replica of the DB has started sharding. Previously, the unsharded
replica would remain in the unsharded state because the sharder would
not process deleted DBs.

This patch modifies the sharder to always process deleted DBs; this
will result in the sharder making some additional DB queries for shard
ranges in order to determine whether any processing is required for
deleted DBs.

Auto-sharding will not act on deleted DBs.

Change-Id: Ia6ad92042aa9a51e3ddefec0b9b8acae44e9e9d7
This commit is contained in:
Alistair Coles 2022-07-07 18:29:06 +01:00
parent c3aa1ce66c
commit 38271142eb
3 changed files with 349 additions and 54 deletions

View File

@ -1004,12 +1004,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
headers=headers, contents=body)
except DirectClientException as err:
self.logger.warning(
'Failed to put shard ranges to %s:%s/%s: %s',
node['ip'], node['port'], node['device'], err.http_status)
'Failed to put shard ranges to %s:%s/%s %s/%s: %s',
node['ip'], node['port'], node['device'],
quote(account), quote(container), err.http_status)
except (Exception, Timeout) as err:
self.logger.exception(
'Failed to put shard ranges to %s:%s/%s: %s',
node['ip'], node['port'], node['device'], err)
'Failed to put shard ranges to %s:%s/%s %s/%s: %s',
node['ip'], node['port'], node['device'],
quote(account), quote(container), err)
else:
return True
return False
@ -2029,8 +2031,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
def _process_broker(self, broker, node, part):
broker.get_info() # make sure account/container are populated
state = broker.get_db_state()
self.logger.debug('Starting processing %s state %s',
quote(broker.path), state)
is_deleted = broker.is_deleted()
self.logger.debug('Starting processing %s state %s%s',
quote(broker.path), state,
' (deleted)' if is_deleted else '')
if not self._audit_container(broker):
return
@ -2038,13 +2042,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# now look and deal with misplaced objects.
self._move_misplaced_objects(broker)
if broker.is_deleted():
# This container is deleted so we can skip it. We still want
# deleted containers to go via misplaced items because they may
# have new objects sitting in them that may need to move.
return
is_leader = node['index'] == 0 and self.auto_shard
is_leader = node['index'] == 0 and self.auto_shard and not is_deleted
if state in (UNSHARDED, COLLAPSED):
if is_leader and broker.is_root_container():
# bootstrap sharding of root container
@ -2096,29 +2094,31 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
self.logger.debug('Remaining in sharding state %s',
quote(broker.path))
if state == SHARDED and broker.is_root_container():
# look for shrink stats
self._identify_shrinking_candidate(broker, node)
if is_leader:
self._find_and_enable_shrinking_candidates(broker)
self._find_and_enable_sharding_candidates(broker)
for shard_range in broker.get_shard_ranges(
states=[ShardRange.SHARDING]):
self._send_shard_ranges(
shard_range.account, shard_range.container,
[shard_range])
if not broker.is_deleted():
if state == SHARDED and broker.is_root_container():
# look for shrink stats
self._identify_shrinking_candidate(broker, node)
if is_leader:
self._find_and_enable_shrinking_candidates(broker)
self._find_and_enable_sharding_candidates(broker)
for shard_range in broker.get_shard_ranges(
states=[ShardRange.SHARDING]):
self._send_shard_ranges(
shard_range.account, shard_range.container,
[shard_range])
if not broker.is_root_container():
# Update the root container with this container's shard range
# info; do this even when sharded in case previous attempts
# failed; don't do this if there is no own shard range. When
# sharding a shard, this is when the root will see the new
# shards move to ACTIVE state and the sharded shard
# simultaneously become deleted.
self._update_root_container(broker)
if not broker.is_root_container():
# Update the root container with this container's shard range
# info; do this even when sharded in case previous attempts
# failed; don't do this if there is no own shard range. When
# sharding a shard, this is when the root will see the new
# shards move to ACTIVE state and the sharded shard
# simultaneously become deleted.
self._update_root_container(broker)
self.logger.debug('Finished processing %s state %s',
quote(broker.path), broker.get_db_state())
self.logger.debug('Finished processing %s state %s%s',
quote(broker.path), broker.get_db_state(),
' (deleted)' if is_deleted else '')
def _one_shard_cycle(self, devices_to_shard, partitions_to_shard):
"""

View File

@ -367,13 +367,17 @@ class BaseTestContainerSharding(ReplProbeTest):
self.fail('No shard sysmeta found in %s' % headers)
def assert_container_state(self, node, expected_state, num_shard_ranges,
account=None, container=None, part=None):
account=None, container=None, part=None,
override_deleted=False):
account = account or self.account
container = container or self.container_to_shard
part = part or self.brain.part
headers = {'X-Backend-Record-Type': 'shard'}
if override_deleted:
headers['x-backend-override-deleted'] = True
headers, shard_ranges = direct_client.direct_get_container(
node, part, account, container,
headers={'X-Backend-Record-Type': 'shard'})
headers=headers)
self.assertEqual(num_shard_ranges, len(shard_ranges))
self.assertIn('X-Backend-Sharding-State', headers)
self.assertEqual(
@ -401,12 +405,17 @@ class BaseTestContainerSharding(ReplProbeTest):
shard_range.account, shard_range.container)
return part, [n['id'] + 1 for n in nodes]
def run_sharders(self, shard_ranges):
def run_sharders(self, shard_ranges, exclude_partitions=None):
"""Run the sharder on partitions for given shard ranges."""
if not isinstance(shard_ranges, (list, tuple, set)):
shard_ranges = (shard_ranges,)
partitions = ','.join(str(self.get_part_and_node_numbers(sr)[0])
for sr in shard_ranges)
exclude_partitions = exclude_partitions or []
shard_parts = []
for sr in shard_ranges:
sr_part = self.get_part_and_node_numbers(sr)[0]
if sr_part not in exclude_partitions:
shard_parts.append(str(sr_part))
partitions = ','.join(shard_parts)
self.sharders.once(additional_args='--partitions=%s' % partitions)
def run_sharder_sequentially(self, shard_range=None):
@ -3586,3 +3595,187 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
self.assert_container_listing(
[obj_names[0], new_obj_name] + obj_names[1:])
def test_manage_shard_ranges_unsharded_deleted_root(self):
    """
    Verify that a deleted root DB will still be sharded.

    Two primary replicas are sharded, the container is emptied and
    deleted, and then the remaining (still unsharded) replica is
    processed by the sharder: it should also reach the SHARDED state
    while staying deleted.
    """
    # choose a node that will not be sharded initially
    sharded_nodes = []
    unsharded_node = None
    for node in self.brain.nodes:
        if self.brain.node_numbers[node['index']] \
                in self.brain.handoff_numbers:
            unsharded_node = node
        else:
            sharded_nodes.append(node)
    # put some objects - not enough to trigger auto-sharding
    obj_names = self._make_object_names(MIN_SHARD_CONTAINER_THRESHOLD - 1)
    self.put_objects(obj_names)
    # run replicators first time to get sync points set and commit updates
    self.replicators.once()
    # setup sharding...
    self.assert_subprocess_success([
        'swift-manage-shard-ranges',
        self.get_db_file(self.brain.part, sharded_nodes[0]),
        'find_and_replace', '2', '--enable', '--minimum-shard-size', '1'])
    # Run container-replicator to replicate shard ranges
    self.container_replicators.once()
    self.assert_container_state(sharded_nodes[0], 'unsharded', 2)
    self.assert_container_state(sharded_nodes[1], 'unsharded', 2)
    self.assert_container_state(unsharded_node, 'unsharded', 2)
    # Run container-sharder to shard the 2 primary replicas that did
    # receive the object PUTs
    for num in self.brain.primary_numbers:
        self.sharders_once(
            number=num,
            additional_args='--partitions=%s' % self.brain.part)
    # delete the objects - the proxies will have cached container info with
    # out-of-date db_state=unsharded, so updates go to the root DBs
    self.delete_objects(obj_names)
    # deal with object DELETEs that were misplaced in the root DBs...
    for num in self.brain.primary_numbers:
        self.sharders_once(
            number=num,
            additional_args='--partitions=%s' % self.brain.part)
    self.assert_container_state(sharded_nodes[0], 'sharded', 2)
    self.assert_container_state(sharded_nodes[1], 'sharded', 2)
    shard_ranges = self.assert_container_state(
        unsharded_node, 'unsharded', 2)
    # get root stats updated - but avoid sharding the remaining root DB
    self.run_sharders(shard_ranges, exclude_partitions=[self.brain.part])
    self.assert_container_listing([])
    # delete the empty container
    client.delete_container(self.url, self.admin_token,
                            self.container_name)
    # sanity check - unsharded DB is deleted
    broker = self.get_broker(self.brain.part, unsharded_node,
                             self.account, self.container_name)
    self.assertEqual(UNSHARDED, broker.get_db_state())
    self.assertTrue(broker.is_deleted())
    self.assertEqual(0, broker.get_info()['object_count'])
    self.assertEqual(0, broker.get_shard_usage()['object_count'])
    # now shard the final DB
    for num in self.brain.handoff_numbers:
        self.sharders_once(
            number=num,
            additional_args='--partitions=%s' % self.brain.part)
    # all DBs should now be sharded and still deleted
    for node in self.brain.nodes:
        with annotate_failure(
                'node %s in %s'
                % (node['index'], [n['index'] for n in self.brain.nodes])):
            self.assert_container_state(node, 'sharded', 2,
                                        override_deleted=True)
            broker = self.get_broker(self.brain.part, node,
                                     self.account, self.container_name)
            self.assertEqual(SHARDED, broker.get_db_state())
            self.assertEqual(0, broker.get_info()['object_count'])
            self.assertEqual(0,
                             broker.get_shard_usage()['object_count'])
            self.assertTrue(broker.is_deleted())
def test_manage_shard_ranges_unsharded_deleted_root_gets_undeleted(self):
    """
    Verify that an apparently deleted root DB (no object rows) will
    still be sharded, and then becomes undeleted once object stats are
    discovered in its shards and reported back to the root.
    """
    # choose a node that will not be sharded initially
    sharded_nodes = []
    unsharded_node = None
    for node in self.brain.nodes:
        if self.brain.node_numbers[node['index']] \
                in self.brain.handoff_numbers:
            unsharded_node = node
        else:
            sharded_nodes.append(node)
    # put some objects, but only to 2 replicas - not enough to trigger
    # auto-sharding
    self.brain.stop_handoff_half()
    obj_names = self._make_object_names(MIN_SHARD_CONTAINER_THRESHOLD - 1)
    self.put_objects(obj_names)
    # run replicators first time to get sync points set and commit puts
    self.replicators.once()
    # setup sharding...
    self.assert_subprocess_success([
        'swift-manage-shard-ranges',
        self.get_db_file(self.brain.part, sharded_nodes[0]),
        'find_and_replace', '2', '--enable', '--minimum-shard-size', '1'])
    # Run container-replicator to replicate shard ranges - object rows will
    # not be sync'd now there are shard ranges
    for num in self.brain.primary_numbers:
        self.container_replicators.once(number=num)
    self.assert_container_state(sharded_nodes[0], 'unsharded', 2)
    self.assert_container_state(sharded_nodes[1], 'unsharded', 2)
    # revive the stopped node
    self.brain.start_handoff_half()
    self.assert_container_state(unsharded_node, 'unsharded', 0)
    # delete the empty replica
    direct_client.direct_delete_container(
        unsharded_node, self.brain.part, self.account,
        self.container_name)
    # Run container-sharder to shard the 2 primary replicas that did
    # receive the object PUTs
    for num in self.brain.primary_numbers:
        self.sharders_once(
            number=num,
            additional_args='--partitions=%s' % self.brain.part)
    self.assert_container_state(sharded_nodes[0], 'sharded', 2)
    self.assert_container_state(sharded_nodes[1], 'sharded', 2)
    # the sharder syncs shard ranges ...
    self.assert_container_state(unsharded_node, 'unsharded', 2,
                                override_deleted=True)
    # sanity check - unsharded DB is empty and deleted
    broker = self.get_broker(self.brain.part, unsharded_node,
                             self.account, self.container_name)
    self.assertEqual(UNSHARDED, broker.get_db_state())
    self.assertEqual(0, broker.get_info()['object_count'])
    # the shard ranges do have object count but are in CREATED state so
    # not reported in shard usage...
    self.assertEqual(0, broker.get_shard_usage()['object_count'])
    self.assertTrue(broker.is_deleted())
    # now shard the final DB
    for num in self.brain.handoff_numbers:
        self.sharders_once(
            number=num,
            additional_args='--partitions=%s' % self.brain.part)
    shard_ranges = self.assert_container_state(
        unsharded_node, 'sharded', 2, override_deleted=True)
    # and get roots updated and sync'd
    self.container_replicators.once()
    self.run_sharders(shard_ranges, exclude_partitions=[self.brain.part])
    # all DBs should now be sharded and NOT deleted
    for node in self.brain.nodes:
        with annotate_failure(
                'node %s in %s'
                % (node['index'], [n['index'] for n in self.brain.nodes])):
            broker = self.get_broker(self.brain.part, node,
                                     self.account, self.container_name)
            self.assertEqual(SHARDED, broker.get_db_state())
            self.assertEqual(3, broker.get_info()['object_count'])
            self.assertEqual(3,
                             broker.get_shard_usage()['object_count'])
            self.assertFalse(broker.is_deleted())

View File

@ -68,7 +68,7 @@ class BaseTestSharder(unittest.TestCase):
[dict(sr) for sr in actual])
def _make_broker(self, account='a', container='c', epoch=None,
device='sda', part=0, hash_=None):
device='sda', part=0, hash_=None, put_timestamp=None):
hash_ = hash_ or md5(
container.encode('utf-8'), usedforsecurity=False).hexdigest()
datadir = os.path.join(
@ -81,7 +81,7 @@ class BaseTestSharder(unittest.TestCase):
broker = ContainerBroker(
db_file, account=account, container=container,
logger=self.logger)
broker.initialize()
broker.initialize(put_timestamp=put_timestamp)
return broker
def _make_old_style_sharding_broker(self, account='a', container='c',
@ -4784,37 +4784,62 @@ class TestSharder(BaseTestSharder):
self.assertFalse(broker.logger.get_lines_for_level('error'))
broker.logger.clear()
def _check_process_broker_sharding_no_others(self, state):
def _check_process_broker_sharding_no_others(self, start_state, deleted):
# verify that when existing own_shard_range has given state and there
# are other shard ranges then the sharding process will begin
broker = self._make_broker(hash_='hash%s' % state)
# are other shard ranges then the sharding process will complete
broker = self._make_broker(hash_='hash%s%s' % (start_state, deleted))
node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2',
'index': 0}
own_sr = broker.get_own_shard_range()
self.assertTrue(own_sr.update_state(state))
epoch = Timestamp.now()
self.assertTrue(own_sr.update_state(start_state))
epoch = next(self.ts_iter)
own_sr.epoch = epoch
shard_ranges = self._make_shard_ranges((('', 'm'), ('m', '')))
broker.merge_shard_ranges([own_sr] + shard_ranges)
if deleted:
broker.delete_db(next(self.ts_iter).internal)
with self._mock_sharder() as sharder:
with mock.patch.object(
sharder, '_create_shard_containers', return_value=0):
with mock_timestamp_now() as now:
sharder, '_send_shard_ranges', return_value=True):
with mock_timestamp_now_with_iter(self.ts_iter):
sharder._audit_container = mock.MagicMock()
sharder._process_broker(broker, node, 99)
final_own_sr = broker.get_own_shard_range(no_default=True)
self.assertEqual(dict(own_sr, meta_timestamp=now),
dict(final_own_sr))
self.assertEqual(SHARDING, broker.get_db_state())
final_own_sr = broker.get_own_shard_range(no_default=True)
self.assertEqual(SHARDED, broker.get_db_state())
self.assertEqual(epoch.normal, parse_db_filename(broker.db_file)[1])
self.assertFalse(broker.logger.get_lines_for_level('warning'))
self.assertFalse(broker.logger.get_lines_for_level('error'))
# self.assertEqual(deleted, broker.is_deleted())
return own_sr, final_own_sr
def test_process_broker_sharding_with_own_shard_range_no_others(self):
self._check_process_broker_sharding_no_others(ShardRange.SHARDING)
self._check_process_broker_sharding_no_others(ShardRange.SHRINKING)
own_sr, final_own_sr = self._check_process_broker_sharding_no_others(
ShardRange.SHARDING, False)
exp_own_sr = dict(own_sr, state=ShardRange.SHARDED,
meta_timestamp=mock.ANY)
self.assertEqual(exp_own_sr, dict(final_own_sr))
# verify that deleted DBs will be sharded
own_sr, final_own_sr = self._check_process_broker_sharding_no_others(
ShardRange.SHARDING, True)
exp_own_sr = dict(own_sr, state=ShardRange.SHARDED,
meta_timestamp=mock.ANY)
self.assertEqual(exp_own_sr, dict(final_own_sr))
own_sr, final_own_sr = self._check_process_broker_sharding_no_others(
ShardRange.SHRINKING, False)
exp_own_sr = dict(own_sr, state=ShardRange.SHRUNK,
meta_timestamp=mock.ANY)
self.assertEqual(exp_own_sr, dict(final_own_sr))
# verify that deleted DBs will be shrunk
own_sr, final_own_sr = self._check_process_broker_sharding_no_others(
ShardRange.SHRINKING, True)
exp_own_sr = dict(own_sr, state=ShardRange.SHRUNK,
meta_timestamp=mock.ANY)
self.assertEqual(exp_own_sr, dict(final_own_sr))
def test_process_broker_not_sharding_others(self):
# verify that sharding process will not start when own shard range is
@ -4907,6 +4932,83 @@ class TestSharder(BaseTestSharder):
self._check_process_broker_sharding_others(ShardRange.SHRINKING)
self._check_process_broker_sharding_others(ShardRange.SHARDED)
def test_process_broker_leader_auto_shard(self):
    """
    Verify the conditions for acting as auto-shard leader: auto_shard
    must be enabled AND the node must have index 0; only then does
    _process_broker move the broker into the SHARDING state.
    """
    broker = self._make_broker(put_timestamp=next(self.ts_iter).internal)
    objects = [
        ['obj%3d' % i, self.ts_encoded(), i, 'text/plain',
         'etag%s' % i, 0] for i in range(10)]
    for obj in objects:
        broker.put_object(*obj)
    self.assertEqual(10, broker.get_info()['object_count'])
    node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2',
            'index': 0}

    def do_process(conf):
        # run _process_broker with the given sharder conf
        with self._mock_sharder(conf) as sharder:
            with mock_timestamp_now():
                # we're not testing rest of the process here so prevent any
                # attempt to progress shard range states
                sharder._create_shard_containers = lambda *args: 0
                sharder._process_broker(broker, node, 99)

    # auto shard disabled -> no sharding is initiated
    conf = {'shard_container_threshold': 10,
            'rows_per_shard': 5,
            'shrink_threshold': 1,
            'auto_shard': False}
    do_process(conf)
    self.assertEqual(UNSHARDED, broker.get_db_state())
    own_sr = broker.get_own_shard_range(no_default=True)
    self.assertIsNone(own_sr)
    # auto shard enabled, not node 0 -> still no sharding
    conf['auto_shard'] = True
    node['index'] = 1
    do_process(conf)
    self.assertEqual(UNSHARDED, broker.get_db_state())
    own_sr = broker.get_own_shard_range(no_default=True)
    self.assertIsNone(own_sr)
    # auto shard enabled, node 0 -> start sharding
    node['index'] = 0
    do_process(conf)
    self.assertEqual(SHARDING, broker.get_db_state())
    own_sr = broker.get_own_shard_range(no_default=True)
    self.assertIsNotNone(own_sr)
    self.assertEqual(ShardRange.SHARDING, own_sr.state)
    self.assertEqual(own_sr.epoch.normal,
                     parse_db_filename(broker.db_file)[1])
    self.assertEqual(2, len(broker.get_shard_ranges()))
def test_process_broker_leader_auto_shard_deleted_db(self):
    """
    Verify that no auto-shard leader actions are taken when the broker
    is deleted, even with auto_shard enabled and node index 0.
    """
    conf = {'shard_container_threshold': 10,
            'rows_per_shard': 5,
            'shrink_threshold': 1,
            'auto_shard': True}
    broker = self._make_broker(put_timestamp=next(self.ts_iter).internal)
    broker.delete_db(next(self.ts_iter).internal)
    self.assertTrue(broker.is_deleted())  # sanity check
    node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2',
            'index': 0}
    with self._mock_sharder(conf) as sharder:
        with mock_timestamp_now():
            with mock.patch.object(
                    sharder, '_find_and_enable_sharding_candidates'
            ) as mock_find_and_enable:
                sharder._process_broker(broker, node, 99)
    self.assertEqual(UNSHARDED, broker.get_db_state())
    own_sr = broker.get_own_shard_range(no_default=True)
    self.assertIsNone(own_sr)
    # this is the only concrete assertion that verifies the leader actions
    # are not taken; no shard ranges would actually be found for an empty
    # deleted db so there's no other way to differentiate from an undeleted
    # db being processed...
    mock_find_and_enable.assert_not_called()
def check_shard_ranges_sent(self, broker, expected_sent):
bodies = []
servers = []