Sharder: Fall back to local device in get_shard_broker

If the sharder is processing a node that has 0 weight, especially for
all the devices on the node, the `find_local_handoff_for_part` can
fail because there will be no local hand off devices available as it
uses the replica2part2dev_id to find a device.  However, a 0 weighted
device won't appear in the replica2part2dev table.

This patch extends `find_local_handoff_for_part`, if it fails to find
a node from the ring it'll fall back to a local device identified by
the `_local_device_ids` that is built up when the replicator or
sharder was identifing local devices. This uses the ring.devs, so does
include 0 weighted devices.  This allows the sharder to find a
location to write the shard_broker in a handoff location while
sharding.

Co-Authored-By: Tim Burke <tim.burke@gmail.com>
Change-Id: Ic38698e9ca0397770c7362229baef1101a72788f
This commit is contained in:
Matthew Oliver 2022-07-21 12:32:27 +10:00 committed by Alistair Coles
parent d7a931191b
commit c4e00eb89f
7 changed files with 193 additions and 64 deletions

View File

@ -196,7 +196,7 @@ class Replicator(Daemon):
self.cpool = GreenPool(size=concurrency)
swift_dir = conf.get('swift_dir', '/etc/swift')
self.ring = ring.Ring(swift_dir, ring_name=self.server_type)
self._local_device_ids = set()
self._local_device_ids = {}
self.per_diff = int(conf.get('per_diff', 1000))
self.max_diffs = int(conf.get('max_diffs') or 100)
self.interval = float(conf.get('interval') or
@ -795,7 +795,7 @@ class Replicator(Daemon):
'These modes are not intended for normal '
'operation; use these options with care.')
self._local_device_ids = set()
self._local_device_ids = {}
found_local = False
for node in self.ring.devs:
if node and is_local_device(ips, self.port,
@ -822,7 +822,7 @@ class Replicator(Daemon):
time.time() - self.reclaim_age)
datadir = os.path.join(self.root, node['device'], self.datadir)
if os.path.isdir(datadir):
self._local_device_ids.add(node['id'])
self._local_device_ids[node['id']] = node
part_filt = self._partition_dir_filter(
node['id'], partitions_to_replicate)
dirs.append((datadir, node['id'], part_filt))

View File

@ -14,10 +14,10 @@
# limitations under the License.
import os
import itertools
import json
from collections import defaultdict
from eventlet import Timeout
from random import choice
from swift.container.sync_store import ContainerSyncStore
from swift.container.backend import ContainerBroker, DATADIR, SHARDED
@ -27,7 +27,6 @@ from swift.container.reconciler import (
from swift.common import db_replicator
from swift.common.storage_policy import POLICIES
from swift.common.swob import HTTPOk, HTTPAccepted
from swift.common.exceptions import DeviceUnavailable
from swift.common.http import is_success
from swift.common.utils import Timestamp, majority_size, get_db_files
@ -144,18 +143,37 @@ class ContainerReplicator(db_replicator.Replicator):
def find_local_handoff_for_part(self, part):
"""
Look through devices in the ring for the first handoff device that was
identified during job creation as available on this node.
Find a device in the ring that is on this node on which to place a
partition. Preference is given to a device that is a primary location
for the partition. If no such device is found then a local device with
weight is chosen, and failing that any local device.
:param part: a partition
:returns: a node entry from the ring
"""
nodes = self.ring.get_part_nodes(part)
more_nodes = self.ring.get_more_nodes(part)
if not self._local_device_ids:
raise RuntimeError('Cannot find local handoff; no local devices')
for node in itertools.chain(nodes, more_nodes):
for node in self.ring.get_part_nodes(part):
if node['id'] in self._local_device_ids:
return node
return None
# don't attempt to minimize handoff depth: just choose any local
# device, but start by only picking a device with a weight, just in
# case some devices are being drained...
local_devs_with_weight = [
dev for dev in self._local_device_ids.values()
if dev.get('weight', 0)]
if local_devs_with_weight:
return choice(local_devs_with_weight)
# we have to return something, so choose any local device..
node = choice(list(self._local_device_ids.values()))
self.logger.warning(
"Could not find a non-zero weight device for handoff partition "
"%d, falling back device %s" %
(part, node['device']))
return node
def get_reconciler_broker(self, timestamp):
"""
@ -173,10 +191,6 @@ class ContainerReplicator(db_replicator.Replicator):
account = MISPLACED_OBJECTS_ACCOUNT
part = self.ring.get_part(account, container)
node = self.find_local_handoff_for_part(part)
if not node:
raise DeviceUnavailable(
'No mounted devices found suitable to Handoff reconciler '
'container %s in partition %s' % (container, part))
broker = ContainerBroker.create_broker(
os.path.join(self.root, node['device']), part, account, container,
logger=self.logger, put_timestamp=timestamp,
@ -198,8 +212,9 @@ class ContainerReplicator(db_replicator.Replicator):
try:
reconciler = self.get_reconciler_broker(container)
except DeviceUnavailable as e:
self.logger.warning('DeviceUnavailable: %s', e)
except Exception:
self.logger.exception('Failed to get reconciler broker for '
'container %s', container)
return False
self.logger.debug('Adding %d objects to the reconciler at %s',
len(item_list), reconciler.db_file)

View File

@ -31,7 +31,6 @@ from swift.common import internal_client
from swift.common.constraints import check_drive, AUTO_CREATE_ACCOUNT_PREFIX
from swift.common.direct_client import (direct_put_container,
DirectClientException)
from swift.common.exceptions import DeviceUnavailable
from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER
from swift.common.ring.utils import is_local_device
from swift.common.swob import str_to_wsgi
@ -1089,19 +1088,18 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
:param shard_range: a :class:`~swift.common.utils.ShardRange`
:param root_path: the path of the shard's root container
:param policy_index: the storage policy index
:returns: a tuple of ``(part, broker, node_id)`` where ``part`` is the
shard container's partition, ``broker`` is an instance of
:returns: a tuple of ``(part, broker, node_id, put_timestamp)`` where
``part`` is the shard container's partition,
``broker`` is an instance of
:class:`~swift.container.backend.ContainerBroker`,
``node_id`` is the id of the selected node.
``node_id`` is the id of the selected node,
``put_timestamp`` is the put_timestamp if the broker needed to
be initialized.
"""
part = self.ring.get_part(shard_range.account, shard_range.container)
node = self.find_local_handoff_for_part(part)
put_timestamp = Timestamp.now().internal
if not node:
raise DeviceUnavailable(
'No mounted devices found suitable for creating shard broker '
'for %s in partition %s' % (quote(shard_range.name), part))
put_timestamp = Timestamp.now().internal
shard_broker = ContainerBroker.create_broker(
os.path.join(self.root, node['device']), part, shard_range.account,
shard_range.container, epoch=shard_range.epoch,
@ -1830,18 +1828,12 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
quote(shard_range.name), shard_range)
self._increment_stat('cleaved', 'attempted')
policy_index = broker.storage_policy_index
try:
shard_part, shard_broker, node_id, put_timestamp = \
self._get_shard_broker(shard_range, broker.root_path,
policy_index)
except DeviceUnavailable as duex:
self.logger.warning(str(duex))
self._increment_stat('cleaved', 'failure', statsd=True)
return CLEAVE_FAILED
else:
return self._cleave_shard_broker(
broker, cleaving_context, shard_range, own_shard_range,
shard_broker, put_timestamp, shard_part, node_id)
shard_part, shard_broker, node_id, put_timestamp = \
self._get_shard_broker(shard_range, broker.root_path,
policy_index)
return self._cleave_shard_broker(
broker, cleaving_context, shard_range, own_shard_range,
shard_broker, put_timestamp, shard_part, node_id)
def _cleave(self, broker):
# Returns True if misplaced objects have been moved and the entire
@ -2184,7 +2176,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
self.logger.info('(Override partitions: %s)',
', '.join(str(p) for p in partitions_to_shard))
self._zero_stats()
self._local_device_ids = set()
self._local_device_ids = {}
dirs = []
self.ips = whataremyips(self.bind_ip)
for node in self.ring.devs:
@ -2195,7 +2187,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
if os.path.isdir(datadir):
# Populate self._local_device_ids so we can find devices for
# shard containers later
self._local_device_ids.add(node['id'])
self._local_device_ids[node['id']] = node
if node['device'] not in devices_to_shard:
continue
part_filt = self._partition_dir_filter(

View File

@ -279,6 +279,7 @@ class FakeRing(Ring):
'zone': x % 3,
'region': x % 2,
'id': x,
'weight': 1,
}
self.add_node(dev)

View File

@ -2213,10 +2213,10 @@ class TestReplicatorSync(unittest.TestCase):
for node in self._ring.devs:
daemon = self._run_once(node)
if node['device'] == 'sdc':
self.assertEqual(daemon._local_device_ids, set())
self.assertEqual(daemon._local_device_ids, {})
else:
self.assertEqual(daemon._local_device_ids,
set([node['id']]))
{node['id']: node})
def test_clean_up_after_deleted_brokers(self):
broker = self._get_broker('a', 'c', node_index=0)

View File

@ -2635,6 +2635,75 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
os.path.basename(rsync_calls[0][1]))
self.assertFalse(rsync_calls[1:])
@mock.patch('swift.common.ring.ring.Ring.get_part_nodes', return_value=[])
def test_find_local_handoff_for_part(self, mock_part_nodes):
with mock.patch(
'swift.common.db_replicator.ring.Ring',
return_value=self._ring):
daemon = replicator.ContainerReplicator({}, logger=self.logger)
# First let's assume we find a primary node
ring_node1, ring_node2, ring_node3 = daemon.ring.devs[-3:]
mock_part_nodes.return_value = [ring_node1, ring_node2]
daemon._local_device_ids = {ring_node1['id']: ring_node1,
ring_node3['id']: ring_node3}
node = daemon.find_local_handoff_for_part(0)
self.assertEqual(node['id'], ring_node1['id'])
# And if we can't find one from the primaries get *some* local device
mock_part_nodes.return_value = []
daemon._local_device_ids = {ring_node3['id']: ring_node3}
node = daemon.find_local_handoff_for_part(0)
self.assertEqual(node['id'], ring_node3['id'])
# if there are more then 1 local_dev_id it'll randomly pick one, but
# not a zero-weight device
ring_node3['weight'] = 0
selected_node_ids = set()
local_dev_ids = {dev['id']: dev for dev in daemon.ring.devs[-3:]}
daemon._local_device_ids = local_dev_ids
for _ in range(15):
node = daemon.find_local_handoff_for_part(0)
self.assertIn(node['id'], local_dev_ids)
selected_node_ids.add(node['id'])
if len(selected_node_ids) == 3:
break # unexpected
self.assertEqual(len(selected_node_ids), 2)
self.assertEqual([1, 1], [local_dev_ids[dev_id]['weight']
for dev_id in selected_node_ids])
warning_lines = self.logger.get_lines_for_level('warning')
self.assertFalse(warning_lines)
# ...unless all devices have zero-weight
ring_node3['weight'] = 0
ring_node2['weight'] = 0
selected_node_ids = set()
local_dev_ids = {dev['id']: dev for dev in daemon.ring.devs[-2:]}
daemon._local_device_ids = local_dev_ids
for _ in range(15):
self.logger.clear()
node = daemon.find_local_handoff_for_part(0)
self.assertIn(node['id'], local_dev_ids)
selected_node_ids.add(node['id'])
if len(selected_node_ids) == 2:
break # expected
self.assertEqual(len(selected_node_ids), 2)
self.assertEqual([0, 0], [local_dev_ids[dev_id]['weight']
for dev_id in selected_node_ids])
warning_lines = self.logger.get_lines_for_level('warning')
self.assertEqual(1, len(warning_lines), warning_lines)
self.assertIn(
'Could not find a non-zero weight device for handoff partition',
warning_lines[0])
# If there are also no local_dev_ids, then we'll get the RuntimeError
daemon._local_device_ids = {}
with self.assertRaises(RuntimeError) as dev_err:
daemon.find_local_handoff_for_part(0)
expected_error_string = 'Cannot find local handoff; no local devices'
self.assertEqual(str(dev_err.exception), expected_error_string)
if __name__ == '__main__':
unittest.main()

View File

@ -682,14 +682,14 @@ class TestSharder(BaseTestSharder):
with mock.patch('eventlet.sleep'), mock.patch.object(
sharder, '_process_broker'
) as mock_process_broker:
sharder._local_device_ids = {'stale_node_id'}
sharder._local_device_ids = {'stale_node_id': {}}
sharder._one_shard_cycle(Everything(), Everything())
lines = sharder.logger.get_lines_for_level('warning')
expected = 'Skipping %s as it is not mounted' % \
unmounted_dev['device']
self.assertIn(expected, lines[0])
self.assertEqual(device_ids, sharder._local_device_ids)
self.assertEqual(device_ids, set(sharder._local_device_ids.keys()))
self.assertEqual(2, mock_process_broker.call_count)
processed_paths = [call[0][0].path
for call in mock_process_broker.call_args_list]
@ -741,14 +741,14 @@ class TestSharder(BaseTestSharder):
with mock.patch('eventlet.sleep'), mock.patch.object(
sharder, '_process_broker', side_effect=mock_processing
) as mock_process_broker:
sharder._local_device_ids = {'stale_node_id'}
sharder._local_device_ids = {'stale_node_id': {}}
sharder._one_shard_cycle(Everything(), Everything())
lines = sharder.logger.get_lines_for_level('warning')
expected = 'Skipping %s as it is not mounted' % \
unmounted_dev['device']
self.assertIn(expected, lines[0])
self.assertEqual(device_ids, sharder._local_device_ids)
self.assertEqual(device_ids, set(sharder._local_device_ids.keys()))
self.assertEqual(3, mock_process_broker.call_count)
processed_paths = [call[0][0].path
for call in mock_process_broker.call_args_list]
@ -799,10 +799,10 @@ class TestSharder(BaseTestSharder):
with mock.patch('eventlet.sleep'), mock.patch.object(
sharder, '_process_broker'
) as mock_process_broker:
sharder._local_device_ids = {999}
sharder._local_device_ids = {999: {}}
sharder._one_shard_cycle(Everything(), Everything())
self.assertEqual(device_ids, sharder._local_device_ids)
self.assertEqual(device_ids, set(sharder._local_device_ids.keys()))
self.assertEqual(3, mock_process_broker.call_count)
processed_paths = [call[0][0].path
for call in mock_process_broker.call_args_list]
@ -826,7 +826,7 @@ class TestSharder(BaseTestSharder):
with mock.patch('eventlet.sleep'), mock.patch.object(
sharder, '_process_broker'
) as mock_process_broker:
sharder._local_device_ids = {999}
sharder._local_device_ids = {999: {}}
sharder._one_shard_cycle(Everything(), Everything())
expected_in_progress_stats = {
@ -866,7 +866,7 @@ class TestSharder(BaseTestSharder):
with mock.patch('eventlet.sleep'), \
mock.patch.object(sharder, '_process_broker') \
as mock_process_broker, mock_timestamp_now(ts_now):
sharder._local_device_ids = {999}
sharder._local_device_ids = {999: {}}
sharder._one_shard_cycle(Everything(), Everything())
expected_in_progress_stats = {
@ -896,7 +896,7 @@ class TestSharder(BaseTestSharder):
with mock.patch('eventlet.sleep'), \
mock.patch.object(sharder, '_process_broker') \
as mock_process_broker, mock_timestamp_now(ts_now):
sharder._local_device_ids = {999}
sharder._local_device_ids = {999: {}}
sharder._one_shard_cycle(Everything(), Everything())
self._assert_stats(
expected_in_progress_stats, sharder, 'sharding_in_progress')
@ -908,7 +908,7 @@ class TestSharder(BaseTestSharder):
with mock.patch('eventlet.sleep'), \
mock.patch.object(sharder, '_process_broker') \
as mock_process_broker, mock_timestamp_now(ts_now):
sharder._local_device_ids = {999}
sharder._local_device_ids = {999: {}}
sharder._one_shard_cycle(Everything(), Everything())
expected_in_progress_stats = {
@ -1005,7 +1005,8 @@ class TestSharder(BaseTestSharder):
'swift.common.db_replicator.ring.Ring',
return_value=fake_ring):
sharder = ContainerSharder(conf, logger=self.logger)
sharder._local_device_ids = {0, 1, 2}
sharder._local_device_ids = {dev['id']: dev
for dev in fake_ring.devs}
sharder._replicate_object = mock.MagicMock(
return_value=(True, [True] * sharder.ring.replica_count))
yield sharder
@ -5558,9 +5559,10 @@ class TestSharder(BaseTestSharder):
self.assertEqual([], self.logger.get_lines_for_level('warning'))
# advance time
with mock.patch('swift.container.sharder.time.time') as fake_time, \
self._mock_sharder() as sharder:
fake_time.return_value = 6048000 + float(delete_ts)
future_time = 6048000 + float(delete_ts)
with mock.patch(
'swift.container.sharder.time.time',
return_value=future_time), self._mock_sharder() as sharder:
sharder._audit_container(broker)
message = 'Reclaimable db stuck waiting for shrinking: %s (%s)' % (
broker.db_file, broker.path)
@ -5574,9 +5576,9 @@ class TestSharder(BaseTestSharder):
broker.merge_shard_ranges(shard_ranges)
# no more warning
with mock.patch('swift.container.sharder.time.time') as fake_time, \
self._mock_sharder() as sharder:
fake_time.return_value = 6048000 + float(delete_ts)
with mock.patch(
'swift.container.sharder.time.time',
return_value=future_time), self._mock_sharder() as sharder:
sharder._audit_container(broker)
self.assertEqual([], self.logger.get_lines_for_level('warning'))
@ -6049,7 +6051,7 @@ class TestSharder(BaseTestSharder):
with mock.patch.object(
sharder, '_process_broker') as mock_process_broker:
sharder.run_once()
self.assertEqual(dev_ids, set(sharder._local_device_ids))
self.assertEqual(dev_ids, set(sharder._local_device_ids.keys()))
self.assertEqual(set(container_data),
set((call[0][0].path, call[0][1]['id'], call[0][2])
for call in mock_process_broker.call_args_list))
@ -6061,7 +6063,7 @@ class TestSharder(BaseTestSharder):
with mock.patch.object(
sharder, '_process_broker') as mock_process_broker:
sharder.run_once(partitions='0')
self.assertEqual(dev_ids, set(sharder._local_device_ids))
self.assertEqual(dev_ids, set(sharder._local_device_ids.keys()))
self.assertEqual(set([container_data[0]]),
set((call[0][0].path, call[0][1]['id'], call[0][2])
for call in mock_process_broker.call_args_list))
@ -6073,7 +6075,7 @@ class TestSharder(BaseTestSharder):
with mock.patch.object(
sharder, '_process_broker') as mock_process_broker:
sharder.run_once(partitions='2,0')
self.assertEqual(dev_ids, set(sharder._local_device_ids))
self.assertEqual(dev_ids, set(sharder._local_device_ids.keys()))
self.assertEqual(set([container_data[0], container_data[2]]),
set((call[0][0].path, call[0][1]['id'], call[0][2])
for call in mock_process_broker.call_args_list))
@ -6085,7 +6087,7 @@ class TestSharder(BaseTestSharder):
with mock.patch.object(
sharder, '_process_broker') as mock_process_broker:
sharder.run_once(partitions='2,0', devices='sdc')
self.assertEqual(dev_ids, set(sharder._local_device_ids))
self.assertEqual(dev_ids, set(sharder._local_device_ids.keys()))
self.assertEqual(set([container_data[2]]),
set((call[0][0].path, call[0][1]['id'], call[0][2])
for call in mock_process_broker.call_args_list))
@ -6097,7 +6099,7 @@ class TestSharder(BaseTestSharder):
with mock.patch.object(
sharder, '_process_broker') as mock_process_broker:
sharder.run_once(devices='sdb,sdc')
self.assertEqual(dev_ids, set(sharder._local_device_ids))
self.assertEqual(dev_ids, set(sharder._local_device_ids.keys()))
self.assertEqual(set(container_data[1:]),
set((call[0][0].path, call[0][1]['id'], call[0][2])
for call in mock_process_broker.call_args_list))
@ -6403,6 +6405,56 @@ class TestSharder(BaseTestSharder):
self._assert_recon_stats(expected_shrinking_candidates_data,
sharder, 'shrinking_candidates')
@mock.patch('swift.common.ring.ring.Ring.get_part_nodes', return_value=[])
@mock.patch('swift.common.ring.ring.Ring.get_more_nodes', return_value=[])
def test_get_shard_broker_no_local_handoff_for_part(
self, mock_part_nodes, mock_more_nodes):
broker = self._make_broker()
broker.enable_sharding(Timestamp.now())
shard_bounds = (('', 'd'), ('d', 'x'), ('x', ''))
shard_ranges = self._make_shard_ranges(
shard_bounds, state=ShardRange.CREATED)
broker.merge_shard_ranges(shard_ranges)
self.assertTrue(broker.set_sharding_state())
# first, let's assume there local_handoff_for_part fails because the
# node we're on is at zero weight for all disks. So it wont appear in
# the replica2part2dev table, meaning we wont get a node back.
# in this case, we'll fall back to one of our own devices which we
# determine from the ring.devs not the replica2part2dev table.
with self._mock_sharder() as sharder:
local_dev_ids = {dev['id']: dev for dev in sharder.ring.devs[-1:]}
sharder._local_device_ids = local_dev_ids
part, shard_broker, node_id, _ = sharder._get_shard_broker(
shard_ranges[0], broker.root_path, 0)
self.assertIn(node_id, local_dev_ids)
# if there are more then 1 local_dev_id it'll randomly pick one
selected_node_ids = set()
for _ in range(10):
with self._mock_sharder() as sharder:
local_dev_ids = {dev['id']: dev
for dev in sharder.ring.devs[-2:]}
sharder._local_device_ids = local_dev_ids
part, shard_broker, node_id, _ = sharder._get_shard_broker(
shard_ranges[0], broker.root_path, 0)
self.assertIn(node_id, local_dev_ids)
selected_node_ids.add(node_id)
if len(selected_node_ids) == 2:
break
self.assertEqual(len(selected_node_ids), 2)
# If there are also no local_dev_ids, then we'll get the RuntimeError
with self._mock_sharder() as sharder:
sharder._local_device_ids = {}
with self.assertRaises(RuntimeError) as dev_err:
sharder._get_shard_broker(shard_ranges[0], broker.root_path, 0)
expected_error_string = 'Cannot find local handoff; no local devices'
self.assertEqual(str(dev_err.exception), expected_error_string)
class TestCleavingContext(BaseTestSharder):
def test_init(self):