Fix _check_node() in the container sharder
Previously, _check_node() did not catch the ValueError raised by check_drive() when a drive was unmounted, so the error bubbled up uncaught and stopped the shard cycle. The practical effect was that a single unmounted drive on a node prevented sharding from happening at all. This patch updates _check_node() to use check_drive() properly, and changes _check_node()'s return value to more closely match what check_drive() actually returns, which should help prevent similar errors from being introduced in the future.

Closes-Bug: #1806500
Change-Id: I3da9b5b120a5980e77ef5c4dc8fa1697e462ce0d
commit c26d67efcf
parent 333ae3086f
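For context on the failure mode: check_drive() no longer returns False for a bad drive; it raises ValueError, which is why the old "if not check_drive(...)" test in _check_node() silently became dead code. The sketch below is illustrative only and not part of this commit: check_drive here is a simplified stand-in with the real function's contract (full device path on success, ValueError on failure), and usable_device_path is a hypothetical helper that mirrors the fixed _check_node() pattern.

    import logging
    import os

    logging.basicConfig()
    logger = logging.getLogger('sharder-sketch')

    def check_drive(root, device, mount_check):
        # Simplified stand-in for swift.common.constraints.check_drive:
        # returns the full device path on success and raises ValueError
        # (rather than returning False) when the drive is unusable.
        # mount_check is accepted for signature parity; the real function
        # verifies the mount point when it is true.
        path = os.path.join(root, device)
        if not os.path.isdir(path):
            raise ValueError('%s is not mounted' % device)
        return path

    def usable_device_path(root, node, mount_check=True):
        # The fixed pattern: catch ValueError rather than testing
        # truthiness, and return the path (or False) as the new
        # _check_node() does.
        try:
            return check_drive(root, node['device'], mount_check)
        except ValueError:
            # Previously this exception escaped and killed the shard cycle.
            logger.warning('Skipping %(device)s as it is not mounted' % node)
            return False

Calling usable_device_path('/srv/node', {'device': 'sdb1'}) yields '/srv/node/sdb1' when the directory exists, and False (plus a warning) when it does not, matching the new _check_node() return convention.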
swift/container/sharder.py
@@ -493,18 +493,22 @@ class ContainerSharder(ContainerReplicator):
         self._report_stats()
 
     def _check_node(self, node):
+        """
+        :return: The path to the device, if the node is mounted.
+            Returns False if the node is unmounted.
+        """
         if not node:
             return False
         if not is_local_device(self.ips, self.port,
                                node['replication_ip'],
                                node['replication_port']):
             return False
-        if not check_drive(self.root, node['device'],
-                           self.mount_check):
+        try:
+            return check_drive(self.root, node['device'], self.mount_check)
+        except ValueError:
             self.logger.warning(
                 'Skipping %(device)s as it is not mounted' % node)
             return False
-        return True
 
     def _fetch_shard_ranges(self, broker, newest=False, params=None,
                             include_deleted=False):
@@ -1485,9 +1489,10 @@ class ContainerSharder(ContainerReplicator):
         dirs = []
         self.ips = whataremyips(bind_ip=self.bind_ip)
         for node in self.ring.devs:
-            if not self._check_node(node):
+            device_path = self._check_node(node)
+            if not device_path:
                 continue
-            datadir = os.path.join(self.root, node['device'], self.datadir)
+            datadir = os.path.join(device_path, self.datadir)
             if os.path.isdir(datadir):
                 # Populate self._local_device_ids so we can find devices for
                 # shard containers later
test/unit/__init__.py
@@ -245,6 +245,10 @@ class FakeRing(Ring):
         self._device_char_iter = itertools.cycle(
             ['sd%s' % chr(ord('a') + x) for x in range(26)])
 
+    def add_node(self, dev):
+        # round trip through json to ensure unicode like real rings
+        self._devs.append(json.loads(json.dumps(dev)))
+
     def set_replicas(self, replicas):
         self.replicas = replicas
         self._devs = []
@@ -252,8 +256,7 @@ class FakeRing(Ring):
         for x in range(self.replicas):
             ip = '10.0.0.%s' % x
             port = self._base_port + x
-            # round trip through json to ensure unicode like real rings
-            self._devs.append(json.loads(json.dumps({
+            dev = {
                 'ip': ip,
                 'replication_ip': ip,
                 'port': port,
@@ -262,7 +265,8 @@ class FakeRing(Ring):
                 'zone': x % 3,
                 'region': x % 2,
                 'id': x,
-            })))
+            }
+            self.add_node(dev)
 
     @property
     def replica_count(self):
test/unit/container/test_sharder.py
@@ -293,7 +293,8 @@ class TestSharder(BaseTestSharder):
         conf = {'recon_cache_path': self.tempdir,
                 'devices': self.tempdir}
         with self._mock_sharder(conf) as sharder:
-            sharder._check_node = lambda *args: True
+            sharder._check_node = lambda node: os.path.join(
+                sharder.conf['devices'], node['device'])
             sharder.logger.clear()
             brokers = []
            for container in ('c1', 'c2'):
@@ -468,12 +469,29 @@ class TestSharder(BaseTestSharder):
         conf = {'recon_cache_path': self.tempdir,
                 'devices': self.tempdir,
                 'shard_container_threshold': 9}
-        with self._mock_sharder(conf) as sharder:
-            sharder._check_node = lambda *args: True
+
+        def fake_ismount(path):
+            # unmounted_dev is defined from .get_more_nodes() below
+            unmounted_path = os.path.join(conf['devices'],
+                                          unmounted_dev['device'])
+            if path == unmounted_path:
+                return False
+            else:
+                return True
+
+        with self._mock_sharder(conf) as sharder, \
+                mock.patch('swift.common.utils.ismount', fake_ismount), \
+                mock.patch('swift.container.sharder.is_local_device',
+                           return_value=True):
             sharder.reported = time.time()
             sharder.logger = debug_logger()
             brokers = []
-            device_ids = set(range(3))
+            device_ids = set(d['id'] for d in sharder.ring.devs)
+
+            sharder.ring.max_more_nodes = 1
+            unmounted_dev = next(sharder.ring.get_more_nodes(1))
+            unmounted_dev['device'] = 'xxxx'
+            sharder.ring.add_node(unmounted_dev)
             for device_id in device_ids:
                 brokers.append(self._make_broker(
                     container='c%s' % device_id, hash_='c%shash' % device_id,
@@ -494,6 +512,10 @@ class TestSharder(BaseTestSharder):
                 sharder._local_device_ids = {'stale_node_id'}
                 sharder._one_shard_cycle(Everything(), Everything())
 
+            lines = sharder.logger.get_lines_for_level('warning')
+            expected = 'Skipping %s as it is not mounted' % \
+                unmounted_dev['device']
+            self.assertIn(expected, lines[0])
             self.assertEqual(device_ids, sharder._local_device_ids)
             self.assertEqual(2, mock_process_broker.call_count)
             processed_paths = [call[0][0].path
@ -542,12 +564,17 @@ class TestSharder(BaseTestSharder):
|
||||
"for %s" % broker.path)
|
||||
|
||||
# check exceptions are handled
|
||||
sharder.logger.clear()
|
||||
with mock.patch('eventlet.sleep'), mock.patch.object(
|
||||
sharder, '_process_broker', side_effect=mock_processing
|
||||
) as mock_process_broker:
|
||||
sharder._local_device_ids = {'stale_node_id'}
|
||||
sharder._one_shard_cycle(Everything(), Everything())
|
||||
|
||||
lines = sharder.logger.get_lines_for_level('warning')
|
||||
expected = 'Skipping %s as it is not mounted' % \
|
||||
unmounted_dev['device']
|
||||
self.assertIn(expected, lines[0])
|
||||
self.assertEqual(device_ids, sharder._local_device_ids)
|
||||
self.assertEqual(3, mock_process_broker.call_count)
|
||||
processed_paths = [call[0][0].path
|
||||
@@ -697,6 +724,35 @@ class TestSharder(BaseTestSharder):
                 expected_dict.pop('meta_timestamp'))
             self.assertEqual(expected_dict, actual_dict)
 
+    def test_check_node(self):
+        node = {
+            'replication_ip': '127.0.0.1',
+            'replication_port': 5000,
+            'device': 'd100',
+        }
+        with self._mock_sharder() as sharder:
+            sharder.mount_check = True
+            sharder.ips = ['127.0.0.1']
+            sharder.port = 5000
+
+            # normal behavior
+            with mock.patch(
+                    'swift.common.utils.ismount',
+                    lambda *args: True):
+                r = sharder._check_node(node)
+            expected = os.path.join(sharder.conf['devices'], node['device'])
+            self.assertEqual(r, expected)
+
+            # test with an unmounted drive
+            with mock.patch(
+                    'swift.common.utils.ismount',
+                    lambda *args: False):
+                r = sharder._check_node(node)
+            self.assertEqual(r, False)
+            lines = sharder.logger.get_lines_for_level('warning')
+            expected = 'Skipping %s as it is not mounted' % node['device']
+            self.assertIn(expected, lines[0])
+
     def test_fetch_shard_ranges_unexpected_response(self):
         broker = self._make_broker()
         exc = internal_client.UnexpectedResponse(
@@ -4368,7 +4424,8 @@ class TestSharder(BaseTestSharder):
 
         with self._mock_sharder() as sharder:
             sharder.ring = ring
-            sharder._check_node = lambda *args: True
+            sharder._check_node = lambda node: os.path.join(
+                sharder.conf['devices'], node['device'])
             with mock.patch.object(
                     sharder, '_process_broker') as mock_process_broker:
                 sharder.run_once()
@@ -4379,7 +4436,8 @@ class TestSharder(BaseTestSharder):
 
         with self._mock_sharder() as sharder:
             sharder.ring = ring
-            sharder._check_node = lambda *args: True
+            sharder._check_node = lambda node: os.path.join(
+                sharder.conf['devices'], node['device'])
             with mock.patch.object(
                     sharder, '_process_broker') as mock_process_broker:
                 sharder.run_once(partitions='0')
@@ -4390,7 +4448,8 @@ class TestSharder(BaseTestSharder):
 
         with self._mock_sharder() as sharder:
             sharder.ring = ring
-            sharder._check_node = lambda *args: True
+            sharder._check_node = lambda node: os.path.join(
+                sharder.conf['devices'], node['device'])
             with mock.patch.object(
                     sharder, '_process_broker') as mock_process_broker:
                 sharder.run_once(partitions='2,0')
@@ -4401,7 +4460,8 @@ class TestSharder(BaseTestSharder):
 
         with self._mock_sharder() as sharder:
             sharder.ring = ring
-            sharder._check_node = lambda *args: True
+            sharder._check_node = lambda node: os.path.join(
+                sharder.conf['devices'], node['device'])
             with mock.patch.object(
                     sharder, '_process_broker') as mock_process_broker:
                 sharder.run_once(partitions='2,0', devices='sdc')
@@ -4412,7 +4472,8 @@ class TestSharder(BaseTestSharder):
 
         with self._mock_sharder() as sharder:
             sharder.ring = ring
-            sharder._check_node = lambda *args: True
+            sharder._check_node = lambda node: os.path.join(
+                sharder.conf['devices'], node['device'])
             with mock.patch.object(
                     sharder, '_process_broker') as mock_process_broker:
                 sharder.run_once(devices='sdb,sdc')