reconstructor: include partially reverted handoffs in handoffs_remaining

For a reconstructor revert job, if the handoff partition has been sync'd
to sufficient other nodes, the partition is considered done and
handoffs_remaining is not incremented. With the new max_objects_per_revert
option [1], an ssync job may appear to be complete even though not all
objects have yet been reverted; in that case handoffs_remaining should
still be incremented.

[1] Related-Change: If81760c80a4692212e3774e73af5ce37c02e8aff
Change-Id: I59572f75b9b0ba331369eb7358932943b7935ff0
This commit is contained in:
Alistair Coles 2021-12-03 14:35:09 +00:00
parent 8ee631ccee
commit 1b3879e0da
4 changed files with 78 additions and 27 deletions

View File

@ -1088,19 +1088,22 @@ class ObjectReconstructor(Daemon):
with df_mgr.partition_lock(job['device'], job['policy'],
job['partition'], name='replication',
timeout=0.2):
limited_by_max_objects = False
for node in job['sync_to']:
node['backend_index'] = job['policy'].get_backend_index(
node['index'])
success, in_sync_objs = ssync_sender(
sender = ssync_sender(
self, node, job, job['suffixes'],
include_non_durable=True,
max_objects=self.max_objects_per_revert)()
max_objects=self.max_objects_per_revert)
success, in_sync_objs = sender()
limited_by_max_objects |= sender.limited_by_max_objects
if success:
syncd_with += 1
reverted_objs.update(in_sync_objs)
if syncd_with >= len(job['sync_to']):
self.delete_reverted_objs(job, reverted_objs)
else:
if syncd_with < len(job['sync_to']) or limited_by_max_objects:
self.handoffs_remaining += 1
except PartitionLockTimeout:
self.logger.info("Unable to lock handoff partition %d for revert "

View File

@ -155,6 +155,7 @@ class Sender(object):
self.remote_check_objs = remote_check_objs
self.include_non_durable = include_non_durable
self.max_objects = max_objects
self.limited_by_max_objects = False
def __call__(self):
"""
@ -285,6 +286,7 @@ class Sender(object):
Full documentation of this can be found at
:py:meth:`.Receiver.missing_check`.
"""
self.limited_by_max_objects = False
available_map = {}
send_map = {}
# First, send our list.
@ -307,6 +309,7 @@ class Sender(object):
self.remote_check_objs, hash_gen)
nlines = 0
nbytes = 0
object_hash = None
for object_hash, timestamps in hash_gen:
available_map[object_hash] = timestamps
with exceptions.MessageTimeout(
@ -321,16 +324,17 @@ class Sender(object):
nlines += 1
nbytes += len(msg)
if 0 < self.max_objects <= nlines:
for _ in hash_gen:
# only log truncation if there were more hashes to come...
self.daemon.logger.info(
'ssync missing_check truncated after %d objects: '
'device: %s, part: %s, policy: %s, last object hash: '
'%s', nlines, self.job['device'],
self.job['partition'], int(self.job['policy']),
object_hash)
break
break
for _ in hash_gen:
# only log truncation if there were more hashes to come...
self.limited_by_max_objects = True
self.daemon.logger.info(
'ssync missing_check truncated after %d objects: '
'device: %s, part: %s, policy: %s, last object hash: '
'%s', nlines, self.job['device'],
self.job['partition'], int(self.job['policy']),
object_hash)
break
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'missing_check end'):
msg = b':MISSING_CHECK: END\r\n'

View File

@ -52,21 +52,33 @@ from test.unit import (patch_policies, mocked_http_conn, FabricatedRing,
from test.unit.obj.common import write_diskfile
@contextmanager
def mock_ssync_sender(ssync_calls=None, response_callback=None, **kwargs):
def fake_ssync(daemon, node, job, suffixes, **kwargs):
class FakeSsyncSender(object):
def __init__(self, daemon, node, job, suffixes, ssync_calls=None,
response_callback=None, **kwargs):
if ssync_calls is not None:
call_args = {'node': node, 'job': job, 'suffixes': suffixes}
call_args.update(kwargs)
ssync_calls.append(call_args)
self.response_callback = response_callback
self.node = node
self.job = job
self.suffixes = suffixes
self.limited_by_max_objects = False
def fake_call():
if response_callback:
response = response_callback(node, job, suffixes)
else:
response = True, {}
return response
return fake_call
def __call__(self):
if self.response_callback:
response = self.response_callback(
self.node, self.job, self.suffixes)
else:
response = True, {}
return response
@contextmanager
def mock_ssync_sender(ssync_calls=None, response_callback=None, **kwargs):
def fake_ssync(daemon, node, job, suffixes, **kwargs):
return FakeSsyncSender(daemon, node, job, suffixes, ssync_calls,
response_callback, **kwargs)
with mock.patch('swift.obj.reconstructor.ssync_sender', fake_ssync):
yield fake_ssync
@ -1126,13 +1138,16 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
frag_index=self.job.get('frag_index'),
frag_prefs=frag_prefs)
self.available_map = {}
self.limited_by_max_objects = False
nlines = 0
for hash_, timestamps in hash_gen:
self.available_map[hash_] = timestamps
nlines += 1
if 0 < max_objects <= nlines:
break
for _ in hash_gen:
self.limited_by_max_objects = True
break
context['available_map'] = self.available_map
ssync_calls.append(context)
self.success = True
@ -1459,7 +1474,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
dev['replication_port'] ==
self.reconstructor.port]
partition = (local_devs[0]['id'] + 1) % 3
# 2 durable objects
# three durable objects
df_0 = self._create_diskfile(
object_name='zero', part=partition)
datafile_0 = df_0.manager.cleanup_ondisk_files(
@ -1480,7 +1495,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
actual_datafiles = [df for df in datafiles if os.path.exists(df)]
self.assertEqual(datafiles, actual_datafiles)
# only two object will be sync'd and purged...
# only two objects will be sync'd and purged...
ssync_calls = []
conf = dict(self.conf, max_objects_per_revert=2, handoffs_only=True)
self.reconstructor = object_reconstructor.ObjectReconstructor(
@ -1493,18 +1508,25 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
self.assertEqual(2, context.get('max_objects'))
actual_datafiles = [df for df in datafiles if os.path.exists(df)]
self.assertEqual(1, len(actual_datafiles), actual_datafiles)
# handoff still reported as remaining
self.assertEqual(1, self.reconstructor.handoffs_remaining)
# ...until next reconstructor run which will sync and purge the last
# object
# object; max_objects_per_revert == actual number of objects
ssync_calls = []
conf = dict(self.conf, max_objects_per_revert=1, handoffs_only=True)
self.reconstructor = object_reconstructor.ObjectReconstructor(
conf, logger=self.logger)
with mock.patch('swift.obj.reconstructor.ssync_sender',
self._make_fake_ssync(ssync_calls)):
self.reconstructor.reconstruct()
for context in ssync_calls:
self.assertEqual(REVERT, context['job']['job_type'])
self.assertEqual(2, context.get('max_objects'))
self.assertEqual(1, context.get('max_objects'))
actual_datafiles = [df for df in datafiles if os.path.exists(df)]
self.assertEqual([], actual_datafiles)
# handoff is no longer remaining
self.assertEqual(0, self.reconstructor.handoffs_remaining)
def test_no_delete_failed_revert(self):
# test will only process revert jobs

View File

@ -794,11 +794,13 @@ class TestSender(BaseTest):
connection = FakeConnection()
response = FakeResponse()
self.sender.daemon.node_timeout = 0.01
self.assertFalse(self.sender.limited_by_max_objects)
with mock.patch.object(connection, 'send',
side_effect=lambda *args: eventlet.sleep(1)):
with self.assertRaises(exceptions.MessageTimeout) as cm:
self.sender.missing_check(connection, response)
self.assertIn('0.01 seconds: missing_check start', str(cm.exception))
self.assertFalse(self.sender.limited_by_max_objects)
def test_missing_check_timeout_send_line(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@ -815,6 +817,7 @@ class TestSender(BaseTest):
max_objects=0)
self.sender.daemon.node_timeout = 0.01
self.sender.df_mgr.yield_hashes = yield_hashes
self.assertFalse(self.sender.limited_by_max_objects)
sleeps = [0, 0, 1]
with mock.patch.object(
connection, 'send',
@ -823,6 +826,7 @@ class TestSender(BaseTest):
self.sender.missing_check(connection, response)
self.assertIn('0.01 seconds: missing_check send line: '
'1 lines (57 bytes) sent', str(cm.exception))
self.assertFalse(self.sender.limited_by_max_objects)
def test_missing_check_has_empty_suffixes(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@ -846,6 +850,7 @@ class TestSender(BaseTest):
':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.assertFalse(self.sender.limited_by_max_objects)
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(
@ -854,6 +859,7 @@ class TestSender(BaseTest):
b'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(send_map, {})
self.assertEqual(available_map, {})
self.assertFalse(self.sender.limited_by_max_objects)
def test_missing_check_has_suffixes(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@ -893,6 +899,7 @@ class TestSender(BaseTest):
':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.assertFalse(self.sender.limited_by_max_objects)
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(
@ -916,6 +923,7 @@ class TestSender(BaseTest):
ts_ctype=Timestamp(1380144474.44448)))]
self.assertEqual(available_map, dict(candidates))
self.assertEqual([], self.daemon_logger.get_lines_for_level('info'))
self.assertFalse(self.sender.limited_by_max_objects)
def test_missing_check_max_objects_less_than_actual_objects(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@ -957,6 +965,7 @@ class TestSender(BaseTest):
':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.assertFalse(self.sender.limited_by_max_objects)
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(
@ -978,6 +987,7 @@ class TestSender(BaseTest):
'part: 9, policy: 0, last object hash: '
'9d41d8cd98f00b204e9800998ecf0def'],
self.daemon_logger.get_lines_for_level('info'))
self.assertTrue(self.sender.limited_by_max_objects)
def test_missing_check_max_objects_exactly_actual_objects(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@ -1012,6 +1022,7 @@ class TestSender(BaseTest):
':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.assertFalse(self.sender.limited_by_max_objects)
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(
@ -1030,6 +1041,7 @@ class TestSender(BaseTest):
self.assertEqual(available_map, dict(candidates))
# nothing logged re: truncation
self.assertEqual([], self.daemon_logger.get_lines_for_level('info'))
self.assertFalse(self.sender.limited_by_max_objects)
def test_missing_check_far_end_disconnect(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@ -1052,6 +1064,7 @@ class TestSender(BaseTest):
}
self.sender.suffixes = ['abc']
self.sender.df_mgr.yield_hashes = yield_hashes
self.assertFalse(self.sender.limited_by_max_objects)
response = FakeResponse(chunk_body='\r\n')
exc = None
try:
@ -1064,6 +1077,7 @@ class TestSender(BaseTest):
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
b'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertFalse(self.sender.limited_by_max_objects)
def test_missing_check_far_end_disconnect2(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@ -1086,6 +1100,7 @@ class TestSender(BaseTest):
}
self.sender.suffixes = ['abc']
self.sender.df_mgr.yield_hashes = yield_hashes
self.assertFalse(self.sender.limited_by_max_objects)
response = FakeResponse(
chunk_body=':MISSING_CHECK: START\r\n')
exc = None
@ -1099,6 +1114,7 @@ class TestSender(BaseTest):
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
b'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertFalse(self.sender.limited_by_max_objects)
def test_missing_check_far_end_unexpected(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@ -1121,6 +1137,7 @@ class TestSender(BaseTest):
}
self.sender.suffixes = ['abc']
self.sender.df_mgr.yield_hashes = yield_hashes
self.assertFalse(self.sender.limited_by_max_objects)
response = FakeResponse(chunk_body='OH HAI\r\n')
exc = None
try:
@ -1133,6 +1150,7 @@ class TestSender(BaseTest):
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
b'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertFalse(self.sender.limited_by_max_objects)
def test_missing_check_send_map(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@ -1160,6 +1178,7 @@ class TestSender(BaseTest):
'0123abc dm\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.assertFalse(self.sender.limited_by_max_objects)
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(
@ -1171,6 +1190,7 @@ class TestSender(BaseTest):
self.assertEqual(available_map,
dict([('9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})]))
self.assertFalse(self.sender.limited_by_max_objects)
def test_missing_check_extra_line_parts(self):
# check that sender tolerates extra parts in missing check
@ -1200,12 +1220,14 @@ class TestSender(BaseTest):
'0123abc d extra response parts\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.assertFalse(self.sender.limited_by_max_objects)
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(send_map, {'0123abc': {'data': True}})
self.assertEqual(available_map,
dict([('9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})]))
self.assertFalse(self.sender.limited_by_max_objects)
def test_updates_timeout(self):
connection = FakeConnection()