Merge "reconstructor: restrict max objects per revert job"
commit 8dc679ea0c
@@ -466,6 +466,63 @@ handoffs_only false The handoffs_only mode op
                                           temporary use and should be disabled
                                           as soon as the emergency situation
                                           has been resolved.
rebuild_handoff_node_count  2              The default strategy for unmounted
                                           drives will stage rebuilt data on a
                                           handoff node until updated rings are
                                           deployed. Because fragments are
                                           rebuilt on offset handoffs based on
                                           fragment index and the proxy limits
                                           how deep it will search for EC frags
                                           we restrict how many nodes we'll
                                           try. Setting to 0 will disable
                                           rebuilds to handoffs and only
                                           rebuild fragments for unmounted
                                           devices to mounted primaries after
                                           a ring change. Setting to -1 means
                                           "no limit".
max_objects_per_revert      0              By default the reconstructor
                                           attempts to revert all objects from
                                           handoff partitions in a single
                                           batch using a single SSYNC request.
                                           In exceptional circumstances
                                           max_objects_per_revert can be used
                                           to temporarily limit the number of
                                           objects reverted by each
                                           reconstructor revert type job. If
                                           more than max_objects_per_revert
                                           are available in a sender's handoff
                                           partition, the remaining objects
                                           will remain in the handoff
                                           partition and will not be reverted
                                           until the next time the
                                           reconstructor visits that handoff
                                           partition i.e. with this option
                                           set, a single cycle of the
                                           reconstructor may not completely
                                           revert all handoff partitions. The
                                           option has no effect on
                                           reconstructor sync type jobs
                                           between primary partitions. A value
                                           of 0 (the default) means there is
                                           no limit.
node_timeout                DEFAULT or 10  Request timeout to external
                                           services. The value used is the
                                           value set in this section, or the
                                           value set
@@ -358,6 +358,7 @@ use = egg:swift#recon
# lockup_timeout = 1800
# ring_check_interval = 15.0
# recon_cache_path = /var/cache/swift
#
# The handoffs_only mode option is for special case emergency situations during
# rebalance such as disk full in the cluster. This option SHOULD NOT BE
# CHANGED, except for extreme situations. When handoffs_only mode is enabled
@@ -380,6 +381,19 @@ use = egg:swift#recon
# Setting to -1 means "no limit".
# rebuild_handoff_node_count = 2
#
# By default the reconstructor attempts to revert all objects from handoff
# partitions in a single batch using a single SSYNC request. In exceptional
# circumstances max_objects_per_revert can be used to temporarily limit the
# number of objects reverted by each reconstructor revert type job. If more
# than max_objects_per_revert are available in a sender's handoff partition,
# the remaining objects will remain in the handoff partition and will not be
# reverted until the next time the reconstructor visits that handoff partition
# i.e. with this option set, a single cycle of the reconstructor may not
# completely revert all handoff partitions. The option has no effect on
# reconstructor sync type jobs between primary partitions. A value of 0 (the
# default) means there is no limit.
# max_objects_per_revert = 0
#
# You can set scheduling priority of processes. Niceness values range from -20
# (most favorable to the process) to 19 (least favorable to the process).
# nice_priority =
@@ -240,7 +240,8 @@ class ObjectReconstructor(Daemon):
            conf.get('reclaim_age', DEFAULT_RECLAIM_AGE)))
        self.request_node_count = config_request_node_count_value(
            conf.get('request_node_count', '2 * replicas'))

        self.max_objects_per_revert = non_negative_int(
            conf.get('max_objects_per_revert', 0))
        # When upgrading from liberasurecode<=1.5.0, you may want to continue
        # writing legacy CRCs until all nodes are upgraded and capable of
        # reading fragments with zlib CRCs.
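Judging by its name, non_negative_int validates that the configured value is an integer greater than or equal to zero, with 0 meaning "no limit". A minimal sketch of that kind of validation, written as an illustrative stand-in rather than Swift's actual helper:

    def non_negative_int(value):
        # Illustrative stand-in: accept any value int() can parse, as long
        # as it is >= 0; 0 is meaningful here because it means "no limit".
        value = int(value)
        if value < 0:
            raise ValueError('value must be a non-negative integer')
        return value

    # the default of 0 keeps the historical behaviour: revert everything
    # in a single batch
    assert non_negative_int(0) == 0
    assert non_negative_int('100') == 100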
@@ -1057,9 +1058,13 @@ class ObjectReconstructor(Daemon):
            if not suffixes:
                continue

            # ssync any out-of-sync suffixes with the remote node
            # ssync any out-of-sync suffixes with the remote node; do not limit
            # max_objects - we need to check them all because, unlike a revert
            # job, we don't purge any objects so start with the same set each
            # cycle
            success, _ = ssync_sender(
                self, node, job, suffixes, include_non_durable=False)()
                self, node, job, suffixes, include_non_durable=False,
                max_objects=0)()
            # update stats for this attempt
            self.suffix_sync += len(suffixes)
            self.logger.update_stats('suffix.syncs', len(suffixes))
@@ -1087,7 +1092,8 @@ class ObjectReconstructor(Daemon):
                        node['index'])
                    success, in_sync_objs = ssync_sender(
                        self, node, job, job['suffixes'],
                        include_non_durable=True)()
                        include_non_durable=True,
                        max_objects=self.max_objects_per_revert)()
                    if success:
                        syncd_with += 1
                        reverted_objs.update(in_sync_objs)
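Taken together, the two hunks above give sync jobs an explicit max_objects=0 (no limit) while revert jobs pass the configured cap. A hypothetical helper that makes the decision at those call sites explicit (the job-type strings here are placeholders, not Swift's constants):

    def max_objects_for_job(job_type, max_objects_per_revert):
        # Hypothetical illustration of the call sites above: sync jobs must
        # always examine every suffix hash because nothing is purged, so they
        # pass 0 ("no limit"); revert jobs may be capped so a large handoff
        # partition is drained over several reconstructor cycles instead of
        # one giant SSYNC request.
        if job_type == 'revert':      # placeholder for the revert job type
            return max_objects_per_revert
        return 0                      # sync job: never truncate

    assert max_objects_for_job('sync', 100) == 0
    assert max_objects_for_job('revert', 100) == 100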
@@ -144,7 +144,7 @@ class Sender(object):
    """

    def __init__(self, daemon, node, job, suffixes, remote_check_objs=None,
                 include_non_durable=False):
                 include_non_durable=False, max_objects=0):
        self.daemon = daemon
        self.df_mgr = self.daemon._df_router[job['policy']]
        self.node = node
@@ -154,6 +154,7 @@ class Sender(object):
        # make sure those objects exist or not in remote.
        self.remote_check_objs = remote_check_objs
        self.include_non_durable = include_non_durable
        self.max_objects = max_objects

    def __call__(self):
        """
@@ -319,6 +320,17 @@ class Sender(object):
                sleep()  # Gives a chance for other greenthreads to run
                nlines += 1
                nbytes += len(msg)
                if 0 < self.max_objects <= nlines:
                    for _ in hash_gen:
                        # only log truncation if there were more hashes to come...
                        self.daemon.logger.info(
                            'ssync missing_check truncated after %d objects: '
                            'device: %s, part: %s, policy: %s, last object hash: '
                            '%s', nlines, self.job['device'],
                            self.job['partition'], int(self.job['policy']),
                            object_hash)
                        break
                    break
        with exceptions.MessageTimeout(
                self.daemon.node_timeout, 'missing_check end'):
            msg = b':MISSING_CHECK: END\r\n'
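The `0 < self.max_objects <= nlines` test is the whole limiting mechanism: a limit of 0 can never satisfy the left-hand inequality, so it disables truncation entirely. A self-contained sketch of the same pattern (illustrative, not the Sender code):

    def take_limited(hashes, max_objects=0):
        # Same pattern as above: 0 means unlimited, a positive value stops
        # the loop once that many hashes have been sent; whatever is left
        # behind is picked up on the reconstructor's next pass over the
        # handoff partition.
        sent = []
        for object_hash in hashes:
            sent.append(object_hash)
            if 0 < max_objects <= len(sent):
                break
        return sent

    assert take_limited(['h1', 'h2', 'h3']) == ['h1', 'h2', 'h3']  # unlimited
    assert take_limited(['h1', 'h2', 'h3'], max_objects=2) == ['h1', 'h2']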
@@ -1108,12 +1108,14 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
        """
        class _fake_ssync(object):
            def __init__(self, daemon, node, job, suffixes,
                         include_non_durable=False, **kwargs):
                         include_non_durable=False, max_objects=0,
                         **kwargs):
                # capture context and generate an available_map of objs
                context = {}
                context['node'] = node
                context['job'] = job
                context['suffixes'] = suffixes
                context['max_objects'] = max_objects
                self.suffixes = suffixes
                self.daemon = daemon
                self.job = job
@@ -1124,8 +1126,13 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
                    frag_index=self.job.get('frag_index'),
                    frag_prefs=frag_prefs)
                self.available_map = {}
                nlines = 0
                for hash_, timestamps in hash_gen:
                    self.available_map[hash_] = timestamps
                    nlines += 1
                    if 0 < max_objects <= nlines:
                        break

                context['available_map'] = self.available_map
                ssync_calls.append(context)
                self.success = True
@@ -1179,6 +1186,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
                    context['available_map']))
            else:
                self.assertFalse(context.get('include_non_durable'))
                self.assertEqual(0, context.get('max_objects'))

        mock_delete.assert_has_calls(expected_calls, any_order=True)
@@ -1207,10 +1215,32 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
                    filename.endswith(data_file_tail), filename)
            else:
                self.assertFalse(context.get('include_non_durable'))
                self.assertEqual(0, context.get('max_objects'))

        # sanity check that some files were deleted
        self.assertGreater(n_files, n_files_after)

    def test_max_objects_per_revert_only_for_revert_jobs(self):
        # verify max_objects_per_revert option is only passed to revert jobs
        ssync_calls = []
        conf = dict(self.conf, max_objects_per_revert=2)
        with mock.patch('swift.obj.reconstructor.ssync_sender',
                        self._make_fake_ssync(ssync_calls)), \
                mocked_http_conn(*[200] * 6, body=pickle.dumps({})):
            reconstructor = object_reconstructor.ObjectReconstructor(
                conf, logger=self.logger)
            reconstructor.reconstruct()
        reverts = syncs = 0
        for context in ssync_calls:
            if context['job']['job_type'] == REVERT:
                self.assertEqual(2, context.get('max_objects'))
                reverts += 1
            else:
                self.assertEqual(0, context.get('max_objects'))
                syncs += 1
        self.assertGreater(reverts, 0)
        self.assertGreater(syncs, 0)

    def test_delete_reverted_nondurable(self):
        # verify reconstructor only deletes reverted nondurable fragments
        # older than commit_window
@@ -1419,6 +1449,63 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
            with self.assertRaises(DiskFileError):
                df_older.writer().commit(ts_old)

    def test_delete_reverted_max_objects_per_revert(self):
        # verify reconstructor only deletes objects that were actually
        # reverted when ssync is limited by max_objects_per_revert
        shutil.rmtree(self.ec_obj_path)
        ips = utils.whataremyips(self.reconstructor.bind_ip)
        local_devs = [dev for dev in self.ec_obj_ring.devs
                      if dev and dev['replication_ip'] in ips and
                      dev['replication_port'] ==
                      self.reconstructor.port]
        partition = (local_devs[0]['id'] + 1) % 3
        # 3 durable objects
        df_0 = self._create_diskfile(
            object_name='zero', part=partition)
        datafile_0 = df_0.manager.cleanup_ondisk_files(
            df_0._datadir, frag_prefs=[])['data_file']
        self.assertTrue(os.path.exists(datafile_0))
        df_1 = self._create_diskfile(
            object_name='one', part=partition)
        datafile_1 = df_1.manager.cleanup_ondisk_files(
            df_1._datadir, frag_prefs=[])['data_file']
        self.assertTrue(os.path.exists(datafile_1))
        df_2 = self._create_diskfile(
            object_name='two', part=partition)
        datafile_2 = df_2.manager.cleanup_ondisk_files(
            df_2._datadir, frag_prefs=[])['data_file']
        self.assertTrue(os.path.exists(datafile_2))

        datafiles = [datafile_0, datafile_1, datafile_2]
        actual_datafiles = [df for df in datafiles if os.path.exists(df)]
        self.assertEqual(datafiles, actual_datafiles)

        # only two objects will be sync'd and purged...
        ssync_calls = []
        conf = dict(self.conf, max_objects_per_revert=2, handoffs_only=True)
        self.reconstructor = object_reconstructor.ObjectReconstructor(
            conf, logger=self.logger)
        with mock.patch('swift.obj.reconstructor.ssync_sender',
                        self._make_fake_ssync(ssync_calls)):
            self.reconstructor.reconstruct()
        for context in ssync_calls:
            self.assertEqual(REVERT, context['job']['job_type'])
            self.assertEqual(2, context.get('max_objects'))
        actual_datafiles = [df for df in datafiles if os.path.exists(df)]
        self.assertEqual(1, len(actual_datafiles), actual_datafiles)

        # ...until the next reconstructor run, which will sync and purge the
        # last object
        ssync_calls = []
        with mock.patch('swift.obj.reconstructor.ssync_sender',
                        self._make_fake_ssync(ssync_calls)):
            self.reconstructor.reconstruct()
        for context in ssync_calls:
            self.assertEqual(REVERT, context['job']['job_type'])
            self.assertEqual(2, context.get('max_objects'))
        actual_datafiles = [df for df in datafiles if os.path.exists(df)]
        self.assertEqual([], actual_datafiles)

    def test_no_delete_failed_revert(self):
        # test will only process revert jobs
        self.reconstructor.handoffs_only = True
@@ -102,10 +102,10 @@ class TestSender(BaseTest):
        self.daemon_logger = debug_logger('test-ssync-sender')
        self.daemon = ObjectReplicator(self.daemon_conf,
                                       self.daemon_logger)
        job = {'policy': POLICIES.legacy,
               'device': 'test-dev',
               'partition': '99'}  # sufficient for Sender.__init__
        self.sender = ssync_sender.Sender(self.daemon, None, job, None)
        self.job = {'policy': POLICIES.legacy,
                    'device': 'test-dev',
                    'partition': '99'}  # sufficient for Sender.__init__
        self.sender = ssync_sender.Sender(self.daemon, None, self.job, None)

    def test_call_catches_MessageTimeout(self):
@@ -810,6 +810,9 @@ class TestSender(BaseTest):
                    {'ts_data': Timestamp(1380144471.00000)})
        connection = FakeConnection()
        response = FakeResponse()
        # max_objects unlimited
        self.sender = ssync_sender.Sender(self.daemon, None, self.job, None,
                                          max_objects=0)
        self.sender.daemon.node_timeout = 0.01
        self.sender.df_mgr.yield_hashes = yield_hashes
        sleeps = [0, 0, 1]
@@ -874,6 +877,10 @@ class TestSender(BaseTest):
                    'No match for %r %r %r %r' % (device, partition,
                                                  policy, suffixes))

        # note: max_objects > number that would yield
        self.sender = ssync_sender.Sender(self.daemon, None, self.job, None,
                                          max_objects=4)

        connection = FakeConnection()
        self.sender.job = {
            'device': 'dev',
@@ -908,6 +915,121 @@ class TestSender(BaseTest):
                          ts_meta=Timestamp(1380144475.44444),
                          ts_ctype=Timestamp(1380144474.44448)))]
        self.assertEqual(available_map, dict(candidates))
        self.assertEqual([], self.daemon_logger.get_lines_for_level('info'))

    def test_missing_check_max_objects_less_than_actual_objects(self):
        def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
            # verify missing_check stops after 2 objects even though more
            # objects would yield
            if (device == 'dev' and partition == '9' and
                    policy == POLICIES.legacy and
                    suffixes == ['abc', 'def']):
                yield (
                    '9d41d8cd98f00b204e9800998ecf0abc',
                    {'ts_data': Timestamp(1380144470.00000)})
                yield (
                    '9d41d8cd98f00b204e9800998ecf0def',
                    {'ts_data': Timestamp(1380144472.22222),
                     'ts_meta': Timestamp(1380144473.22222)})
                yield (
                    '9d41d8cd98f00b204e9800998ecf1def',
                    {'ts_data': Timestamp(1380144474.44444),
                     'ts_ctype': Timestamp(1380144474.44448),
                     'ts_meta': Timestamp(1380144475.44444)})
            else:
                raise Exception(
                    'No match for %r %r %r %r' % (device, partition,
                                                  policy, suffixes))

        # max_objects < number that would yield
        self.sender = ssync_sender.Sender(self.daemon, None, self.job, None,
                                          max_objects=2)

        connection = FakeConnection()
        self.sender.job = {
            'device': 'dev',
            'partition': '9',
            'policy': POLICIES.legacy,
        }
        self.sender.suffixes = ['abc', 'def']
        response = FakeResponse(
            chunk_body=(
                ':MISSING_CHECK: START\r\n'
                ':MISSING_CHECK: END\r\n'))
        self.sender.df_mgr.yield_hashes = yield_hashes
        available_map, send_map = self.sender.missing_check(connection,
                                                            response)
        self.assertEqual(
            b''.join(connection.sent),
            b'17\r\n:MISSING_CHECK: START\r\n\r\n'
            b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
            b'3b\r\n9d41d8cd98f00b204e9800998ecf0def 1380144472.22222 '
            b'm:186a0\r\n\r\n'
            b'15\r\n:MISSING_CHECK: END\r\n\r\n')
        self.assertEqual(send_map, {})
        candidates = [('9d41d8cd98f00b204e9800998ecf0abc',
                       dict(ts_data=Timestamp(1380144470.00000))),
                      ('9d41d8cd98f00b204e9800998ecf0def',
                       dict(ts_data=Timestamp(1380144472.22222),
                            ts_meta=Timestamp(1380144473.22222)))]
        self.assertEqual(available_map, dict(candidates))
        self.assertEqual(
            ['ssync missing_check truncated after 2 objects: device: dev, '
             'part: 9, policy: 0, last object hash: '
             '9d41d8cd98f00b204e9800998ecf0def'],
            self.daemon_logger.get_lines_for_level('info'))

    def test_missing_check_max_objects_exactly_actual_objects(self):
        def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
            if (device == 'dev' and partition == '9' and
                    policy == POLICIES.legacy and
                    suffixes == ['abc', 'def']):
                yield (
                    '9d41d8cd98f00b204e9800998ecf0abc',
                    {'ts_data': Timestamp(1380144470.00000)})
                yield (
                    '9d41d8cd98f00b204e9800998ecf0def',
                    {'ts_data': Timestamp(1380144472.22222),
                     'ts_meta': Timestamp(1380144473.22222)})
            else:
                raise Exception(
                    'No match for %r %r %r %r' % (device, partition,
                                                  policy, suffixes))

        # max_objects == number that would yield
        self.sender = ssync_sender.Sender(self.daemon, None, self.job, None,
                                          max_objects=2)

        connection = FakeConnection()
        self.sender.job = {
            'device': 'dev',
            'partition': '9',
            'policy': POLICIES.legacy,
        }
        self.sender.suffixes = ['abc', 'def']
        response = FakeResponse(
            chunk_body=(
                ':MISSING_CHECK: START\r\n'
                ':MISSING_CHECK: END\r\n'))
        self.sender.df_mgr.yield_hashes = yield_hashes
        available_map, send_map = self.sender.missing_check(connection,
                                                            response)
        self.assertEqual(
            b''.join(connection.sent),
            b'17\r\n:MISSING_CHECK: START\r\n\r\n'
            b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
            b'3b\r\n9d41d8cd98f00b204e9800998ecf0def 1380144472.22222 '
            b'm:186a0\r\n\r\n'
            b'15\r\n:MISSING_CHECK: END\r\n\r\n')
        self.assertEqual(send_map, {})
        candidates = [('9d41d8cd98f00b204e9800998ecf0abc',
                       dict(ts_data=Timestamp(1380144470.00000))),
                      ('9d41d8cd98f00b204e9800998ecf0def',
                       dict(ts_data=Timestamp(1380144472.22222),
                            ts_meta=Timestamp(1380144473.22222)))]
        self.assertEqual(available_map, dict(candidates))
        # nothing logged re: truncation
        self.assertEqual([], self.daemon_logger.get_lines_for_level('info'))

    def test_missing_check_far_end_disconnect(self):
        def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
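The expected byte strings in the assertions above are HTTP chunked transfer encoding: each missing_check line is framed by its length in hex followed by CRLF. A small hypothetical helper (not part of the test) showing how those expected chunks are derived:

    def chunk(line):
        # Frame a missing_check line the way HTTP chunked encoding does:
        # hex length, CRLF, payload, CRLF - the framing asserted above.
        body = line.encode('ascii') if isinstance(line, str) else line
        return b'%x\r\n%s\r\n' % (len(body), body)

    # ':MISSING_CHECK: START\r\n' is 23 (0x17) bytes, hence the b'17\r\n' prefix
    assert chunk(':MISSING_CHECK: START\r\n') == \
        b'17\r\n:MISSING_CHECK: START\r\n\r\n'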