diskfile: don't remove recently written non-durables

DiskFileManager will remove any stale files during
cleanup_ondisk_files(): these include tombstones and non-durable EC
data fragments whose timestamps are older than reclaim_age. It can
usually be safely assumed that a non-durable data fragment older than
reclaim_age is not going to become durable. However, if an agent PUTs
objects with specified older X-Timestamps (for example the reconciler
or container-sync) then there is a window of time during which the
object server has written an old non-durable data file but has not yet
committed it to make it durable.

Previously, if another process (for example the reconstructor) called
cleanup_ondisk_files during this window then the non-durable data file
would be removed. The subsequent attempt to commit the data file would
then result in a traceback due to there no longer being a data file to
rename, and of course the data file is lost.

This patch modifies cleanup_ondisk_files to not remove old, otherwise
stale, non-durable data files that were only written to disk in the
preceding 'commit_window' seconds. 'commit_window' is configurable for
the object server and defaults to 60.0 seconds.

Closes-Bug: #1936508
Related-Change: I0d519ebaaade35249fb7b17bd5f419ffdaa616c0
Change-Id: I5f3318a44af64b77a63713e6ff8d0fd3b6144f13
Author: Alistair Coles
Date:   2021-07-15 18:08:01 +01:00
parent dfd020d11c
commit bbaed18e9b
6 changed files with 260 additions and 9 deletions
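Taken together, the changes below amount to one new guard in
cleanup_ondisk_files(). As a minimal standalone sketch of that decision
logic, assuming a plain float timestamp and taking the file's on-disk
mtime as a parameter (the function name and signature here are
illustrative, not Swift's API):

import time


def may_reclaim(timestamp, mtime, durable, reclaim_age, commit_window):
    # timestamp is the file's Swift X-Timestamp; mtime is when the file
    # actually landed on disk
    now = time.time()
    if now - timestamp <= reclaim_age:
        return False  # not yet stale; never reclaimed
    if durable or commit_window <= 0:
        # durable files only need the timestamp test, and a zero
        # commit_window disables the mtime check entirely
        return True
    # a stale non-durable survives if it only just landed on disk,
    # leaving the object server time to commit it
    return now - mtime >= commit_window

In the reconciler/container-sync scenario described above, now - timestamp
can exceed reclaim_age while now - mtime is near zero, so the sketch
returns False and the fragment survives until it can be committed.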

doc/manpages/object-server.conf.5

@@ -149,6 +149,10 @@ The default is 65536.
 .IP \fBreclaim_age\fR
 Time elapsed in seconds before an object can be reclaimed. The default is
 604800 seconds.
+.IP \fBcommit_window\fR
+Time in seconds during which a newly written non-durable data file will not be
+reclaimed. The value should be greater than zero and much less than
+reclaim_age. The default is 60.0 seconds.
 .IP \fBnice_priority\fR
 Modify scheduling priority of server processes. Niceness values range from -20
 (most favorable to the process) to 19 (least favorable to the process).

doc/source/deployment_guide.rst

@@ -136,6 +136,18 @@ reclaim_age               604800      Time elapsed in seconds before the
                                       it will result in dark data. This setting
                                       should be consistent across all object
                                       services.
+commit_window             60          Non-durable data files may also
+                                      get reclaimed if they are older
+                                      than reclaim_age, but not if the
+                                      time they were written to disk
+                                      (i.e. mtime) is less than
+                                      commit_window seconds ago. A
+                                      commit_window greater than zero is
+                                      strongly recommended to avoid
+                                      unintended reclamation of data
+                                      files that were about to become
+                                      durable; commit_window should be
+                                      much less than reclaim_age.
 nice_priority             None        Scheduling priority of server processes.
                                       Niceness values range from -20 (most
                                       favorable to the process) to 19 (least

etc/object-server.conf-sample

@@ -87,6 +87,13 @@ bind_port = 6200
 # and not greater than the container services reclaim_age
 # reclaim_age = 604800
 #
+# Non-durable data files may also get reclaimed if they are older than
+# reclaim_age, but not if the time they were written to disk (i.e. mtime) is
+# less than commit_window seconds ago. A commit_window greater than zero is
+# strongly recommended to avoid unintended reclamation of data files that were
+# about to become durable; commit_window should be much less than reclaim_age.
+# commit_window = 60.0
+#
 # You can set scheduling priority of processes. Niceness values range from -20
 # (most favorable to the process) to 19 (least favorable to the process).
 # nice_priority =
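For context on how the sample option above is consumed: the object server
hands its conf dict to the DiskFile machinery, which parses commit_window
as a non-negative float. A short sketch mirroring the new test_init test
later in this commit (the conf values and logger are illustrative):

import logging

from swift.common.storage_policy import POLICIES
from swift.obj import diskfile

# values read from object-server.conf arrive as strings
conf = {'devices': '/srv/node', 'mount_check': 'false',
        'commit_window': '10.1'}
df_mgr = diskfile.DiskFileRouter(conf, logging.getLogger())[POLICIES.default]
assert df_mgr.commit_window == 10.1  # parsed to float
# a negative or non-numeric commit_window raises ValueError at startup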

swift/obj/diskfile.py

@@ -66,7 +66,7 @@ from swift.common.utils import mkdirs, Timestamp, \
     get_md5_socket, F_SETPIPE_SZ, decode_timestamps, encode_timestamps, \
     MD5_OF_EMPTY_STRING, link_fd_to_path, \
     O_TMPFILE, makedirs_count, replace_partition_in_path, remove_directory, \
-    md5, is_file_older
+    md5, is_file_older, non_negative_float
 from swift.common.splice import splice, tee
 from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
     DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
@@ -81,6 +81,7 @@ from swift.common.storage_policy import (
 
 PICKLE_PROTOCOL = 2
 DEFAULT_RECLAIM_AGE = timedelta(weeks=1).total_seconds()
+DEFAULT_COMMIT_WINDOW = 60.0
 HASH_FILE = 'hashes.pkl'
 HASH_INVALIDATIONS_FILE = 'hashes.invalid'
 METADATA_KEY = b'user.swift.metadata'
@@ -708,6 +709,8 @@ class BaseDiskFileManager(object):
         self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024
         self.mount_check = config_true_value(conf.get('mount_check', 'true'))
         self.reclaim_age = int(conf.get('reclaim_age', DEFAULT_RECLAIM_AGE))
+        self.commit_window = non_negative_float(conf.get(
+            'commit_window', DEFAULT_COMMIT_WINDOW))
         replication_concurrency_per_device = conf.get(
             'replication_concurrency_per_device')
         replication_one_per_device = conf.get('replication_one_per_device')
@@ -1102,8 +1105,14 @@
             remove_file(join(hsh_path, results['ts_info']['filename']))
             files.remove(results.pop('ts_info')['filename'])
         for file_info in results.get('possible_reclaim', []):
-            # stray files are not deleted until reclaim-age
-            if is_reclaimable(file_info['timestamp']):
+            # stray files are not deleted until reclaim-age; non-durable data
+            # files are not deleted unless they were written before
+            # commit_window
+            filepath = join(hsh_path, file_info['filename'])
+            if (is_reclaimable(file_info['timestamp']) and
+                    (file_info.get('durable', True) or
+                     self.commit_window <= 0 or
+                     is_file_older(filepath, self.commit_window))):
                 results.setdefault('obsolete', []).append(file_info)
         for file_info in results.get('obsolete', []):
             remove_file(join(hsh_path, file_info['filename']))
@@ -3625,7 +3634,8 @@ class ECDiskFileManager(BaseDiskFileManager):
             results.setdefault('obsolete', []).extend(exts['.durable'])
             exts.pop('.durable')
 
-        # Fragments *may* be ready for reclaim, unless they are durable
+        # Fragments *may* be ready for reclaim, unless they are most recent
+        # durable
         for frag_set in frag_sets.values():
             if frag_set in (durable_frag_set, chosen_frag_set):
                 continue
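The diff above leans on two small helpers imported from
swift.common.utils whose bodies are outside this commit. Below are
minimal sketches consistent with how they are used here; treat them as
approximations rather than the library source:

import os
import time


def non_negative_float(value):
    # approximation: accept ints, floats or numeric strings; reject
    # negative or non-numeric values with ValueError
    value = float(value)
    if value < 0:
        raise ValueError('Value must be a non-negative float: %r' % value)
    return value


def is_file_older(path, age):
    # approximation: True only if the file exists and its mtime is more
    # than `age` seconds in the past; a stat error counts as "not older"
    try:
        return time.time() - os.stat(path).st_mtime >= age
    except OSError:
        return False

Returning False on a stat error errs on the side of keeping a non-durable
file, which matches the conservative intent of commit_window.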

test/unit/obj/test_diskfile.py

@@ -1054,6 +1054,48 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
             policy=policy, frag_index=frag_index,
             **kwargs)
 
+    def test_init(self):
+        for policy in POLICIES:
+            df_router = diskfile.DiskFileRouter({}, self.logger)
+            df_mgr = df_router[policy]
+            self.assertEqual('/srv/node', df_mgr.devices)
+            self.assertEqual(604800, df_mgr.reclaim_age)
+            self.assertEqual(60.0, df_mgr.commit_window)
+            self.assertTrue(df_mgr.mount_check)
+
+        for policy in POLICIES:
+            conf = dict(devices=self.testdir,
+                        mount_check='false',
+                        reclaim_age=1000,
+                        commit_window=10.1)
+            df_router = diskfile.DiskFileRouter(conf, self.logger)
+            df_mgr = df_router[policy]
+            self.assertEqual(self.testdir, df_mgr.devices)
+            self.assertEqual(1000, df_mgr.reclaim_age)
+            self.assertEqual(10.1, df_mgr.commit_window)
+            self.assertFalse(df_mgr.mount_check)
+
+    def test_init_commit_window(self):
+        def assert_ok(value, expected):
+            for policy in POLICIES:
+                conf = {'commit_window': value}
+                df_mgr = diskfile.DiskFileRouter(conf, self.logger)[policy]
+                self.assertEqual(expected, df_mgr.commit_window)
+
+        assert_ok(10.1, 10.1)
+        assert_ok('10.1', 10.1)
+        assert_ok(0, 0.0)
+
+        def assert_invalid(value):
+            for policy in POLICIES:
+                conf = {'commit_window': value}
+                with self.assertRaises(ValueError):
+                    diskfile.DiskFileRouter(conf, self.logger)[policy]
+
+        assert_invalid(-1.1)
+        assert_invalid('-1.1')
+        assert_invalid('auto')
+
     def test_cleanup_uses_configured_reclaim_age(self):
         # verify that the reclaim_age used when cleaning up tombstones is
         # either the default or the configured value
@@ -1108,7 +1150,7 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
         shuffle(files)
 
     def _test_cleanup_ondisk_files(self, scenarios, policy,
-                                   reclaim_age=None):
+                                   reclaim_age=None, commit_window=None):
         # check that expected files are left in hashdir after cleanup
         for test in scenarios:
             class_under_test = self.df_router[policy]
@@ -1120,6 +1162,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
                 open(os.path.join(hashdir, fname), 'w')
             expected_after_cleanup = set([f[0] for f in test
                                           if (f[2] if len(f) > 2 else f[1])])
+            if commit_window is not None:
+                class_under_test.commit_window = commit_window
             if reclaim_age:
                 class_under_test.reclaim_age = reclaim_age
                 class_under_test.cleanup_ondisk_files(hashdir)
@@ -1268,7 +1312,7 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
                      ('%s.ts' % older, False, False)]]
         self._test_cleanup_ondisk_files(scenarios, POLICIES.default,
-                                        reclaim_age=1000)
+                                        reclaim_age=1000, commit_window=0)
 
     def test_construct_dev_path(self):
         res_path = self.df_mgr.construct_dev_path('abc')
@@ -2693,7 +2737,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
                      ('%s.durable' % much_older, False, False)]]
         self._test_cleanup_ondisk_files(scenarios, POLICIES.default,
-                                        reclaim_age=1000)
+                                        reclaim_age=1000, commit_window=0)
 
     def test_cleanup_ondisk_files_reclaim_with_data_files(self):
         # Each scenario specifies a list of (filename, extension, [survives])
@@ -2739,7 +2783,33 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
                      ('%s#4.data' % much_older, False, False)]]
         self._test_cleanup_ondisk_files(scenarios, POLICIES.default,
-                                        reclaim_age=1000)
+                                        reclaim_age=1000, commit_window=0)
+
+    def test_cleanup_ondisk_files_commit_window(self):
+        # verify that non-durable files are not reclaimed regardless of
+        # timestamp if written to disk within commit_window
+        much_older = Timestamp(time() - 2000).internal
+        older = Timestamp(time() - 1001).internal
+        newer = Timestamp(time() - 900).internal
+        scenarios = [
+            # recently written nondurables not cleaned up
+            [('%s#1.data' % older, True),
+             ('%s#2.data' % newer, True),
+             ('%s.meta' % much_older, False),
+             ('%s.ts' % much_older, False)]]
+        self._test_cleanup_ondisk_files(scenarios, POLICIES.default,
+                                        reclaim_age=1000, commit_window=60)
+
+        # ... but if commit_window is reduced then recently written files are
+        # cleaned up
+        scenarios = [
+            # older *timestamps* cleaned up
+            [('%s#1.data' % older, False),
+             ('%s#2.data' % newer, True),
+             ('%s.meta' % much_older, False),
+             ('%s.ts' % much_older, False)]]
+        self._test_cleanup_ondisk_files(scenarios, POLICIES.default,
+                                        reclaim_age=1000, commit_window=0)
 
     def test_get_ondisk_files_with_stray_meta(self):
         # get_ondisk_files ignores a stray .meta file
@@ -6820,6 +6890,7 @@ class TestSuffixHashes(unittest.TestCase):
                 self.assertRaises(output_files.__class__,
                                   df_mgr.cleanup_ondisk_files, path)
                 return
+            df_mgr.commit_window = 0
             files = df_mgr.cleanup_ondisk_files('/whatever')['files']
             self.assertEqual(files, output_files)
             if files:
@@ -7701,6 +7772,7 @@ class TestSuffixHashes(unittest.TestCase):
         # get_hashes does not trigger reclaim because the suffix has
         # MD5_OF_EMPTY_STRING in hashes.pkl
         df_mgr.reclaim_age = 500
+        df_mgr.commit_window = 0
         hashes = df_mgr.get_hashes('sda1', '0', [], policy)
         self.assertEqual([ts_meta.internal + '.meta'],
                          os.listdir(df._datadir))

test/unit/obj/test_reconstructor.py

@@ -269,7 +269,6 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
         dev_path = os.path.join(self.reconstructor.devices_dir,
                                 self.ec_local_dev['device'])
         self.ec_obj_path = os.path.join(dev_path, data_dir)
-
         # create a bunch of FA's to test
         t = 1421181937.70054  # time.time()
         with mock.patch('swift.obj.diskfile.time') as mock_time:
@@ -1252,6 +1251,153 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
         self.assertFalse(os.path.exists(datafile_recent))
         self.assertFalse(os.path.exists(datafile_older))
 
+    def test_sync_old_nondurable_before_committed_non_zero_commit_window(
+            self):
+        # verify that a *recently written* nondurable fragment survives being
+        # visited by the reconstructor, despite having timestamp older than
+        # reclaim_age
+        shutil.rmtree(self.ec_obj_path)
+        ips = utils.whataremyips(self.reconstructor.bind_ip)
+        local_devs = [dev for dev in self.ec_obj_ring.devs
+                      if dev and dev['replication_ip'] in ips and
+                      dev['replication_port'] ==
+                      self.reconstructor.port]
+        partition = local_devs[0]['id']
+
+        # recently written, recent timestamp non-durable
+        ts_recent = Timestamp(time.time())
+        df_mgr = self.reconstructor._df_router[self.policy]
+        reclaim_age = df_mgr.reclaim_age
+        df_recent = self._create_diskfile(
+            object_name='recent', part=partition, commit=False,
+            timestamp=ts_recent, frag_index=4)
+        datafile_recent = df_recent.manager.cleanup_ondisk_files(
+            df_recent._datadir, frag_prefs=[])['data_file']
+
+        # recently written but old timestamp non-durable
+        ts_old = Timestamp(time.time() - reclaim_age - 1)
+        df_older = self._create_diskfile(
+            object_name='older', part=partition, commit=False,
+            timestamp=ts_old, frag_index=4)
+        datafile_older = df_older.manager.cleanup_ondisk_files(
+            df_older._datadir, frag_prefs=[])['data_file']
+        self.assertTrue(os.path.exists(datafile_recent))
+        self.assertTrue(os.path.exists(datafile_older))
+
+        # for this test we don't actually need to ssync anything, so pretend
+        # all suffixes are in sync
+        self.reconstructor._get_suffixes_to_sync = (
+            lambda job, node: ([], node))
+        df_mgr.commit_window = 1000  # avoid non-durables being reclaimed
+        self.reconstructor.reconstruct()
+        # neither nondurable should be removed yet with a non-zero
+        # commit_window because their mtimes are too recent
+        self.assertTrue(os.path.exists(datafile_recent))
+        self.assertTrue(os.path.exists(datafile_older))
+        # and we can still make the nondurables durable
+        df_recent.writer().commit(ts_recent)
+        self.assertTrue(os.path.exists(datafile_recent.replace('#4', '#4#d')))
+        df_older.writer().commit(ts_old)
+        self.assertTrue(os.path.exists(datafile_older.replace('#4', '#4#d')))
+
+    def test_sync_old_nondurable_before_committed_zero_commit_window(self):
+        # verify that a *recently written* nondurable fragment won't survive
+        # being visited by the reconstructor if its timestamp is older than
+        # reclaim_age and commit_window is zero; this test illustrates the
+        # potential data loss bug that commit_window addresses
+        shutil.rmtree(self.ec_obj_path)
+        ips = utils.whataremyips(self.reconstructor.bind_ip)
+        local_devs = [dev for dev in self.ec_obj_ring.devs
+                      if dev and dev['replication_ip'] in ips and
+                      dev['replication_port'] ==
+                      self.reconstructor.port]
+        partition = local_devs[0]['id']
+
+        # recently written, recent timestamp non-durable
+        ts_recent = Timestamp(time.time())
+        df_mgr = self.reconstructor._df_router[self.policy]
+        reclaim_age = df_mgr.reclaim_age
+        df_recent = self._create_diskfile(
+            object_name='recent', part=partition, commit=False,
+            timestamp=ts_recent, frag_index=4)
+        datafile_recent = df_recent.manager.cleanup_ondisk_files(
+            df_recent._datadir, frag_prefs=[])['data_file']
+
+        # recently written but old timestamp non-durable
+        ts_old = Timestamp(time.time() - reclaim_age - 1)
+        df_older = self._create_diskfile(
+            object_name='older', part=partition, commit=False,
+            timestamp=ts_old, frag_index=4)
+        datafile_older = df_older.manager.cleanup_ondisk_files(
+            df_older._datadir, frag_prefs=[])['data_file']
+        self.assertTrue(os.path.exists(datafile_recent))
+        self.assertTrue(os.path.exists(datafile_older))
+
+        # for this test we don't actually need to ssync anything, so pretend
+        # all suffixes are in sync
+        self.reconstructor._get_suffixes_to_sync = (
+            lambda job, node: ([], node))
+        df_mgr.commit_window = 0
+        with mock.patch(
+                'swift.obj.diskfile.is_file_older') as mock_is_file_older:
+            self.reconstructor.reconstruct()
+        # older nondurable will be removed with commit_window = 0
+        self.assertTrue(os.path.exists(datafile_recent))
+        self.assertFalse(os.path.exists(datafile_older))
+        df_recent.writer().commit(ts_recent)
+        self.assertTrue(os.path.exists(datafile_recent.replace('#4', '#4#d')))
+        # ...and attempt to commit will fail :(
+        with self.assertRaises(DiskFileError):
+            df_older.writer().commit(ts_old)
+        # with zero commit_window the call to stat the file is not made
+        mock_is_file_older.assert_not_called()
+
+    def test_sync_old_nondurable_before_committed_past_commit_window(self):
+        # verify that a *not so recently written* nondurable fragment won't
+        # survive being visited by the reconstructor if its timestamp is
+        # older than reclaim_age
+        shutil.rmtree(self.ec_obj_path)
+        ips = utils.whataremyips(self.reconstructor.bind_ip)
+        local_devs = [dev for dev in self.ec_obj_ring.devs
+                      if dev and dev['replication_ip'] in ips and
+                      dev['replication_port'] ==
+                      self.reconstructor.port]
+        partition = local_devs[0]['id']
+
+        # recently written, recent timestamp non-durable
+        ts_recent = Timestamp(time.time())
+        df_mgr = self.reconstructor._df_router[self.policy]
+        reclaim_age = df_mgr.reclaim_age
+        df_recent = self._create_diskfile(
+            object_name='recent', part=partition, commit=False,
+            timestamp=ts_recent, frag_index=4)
+        datafile_recent = df_recent.manager.cleanup_ondisk_files(
+            df_recent._datadir, frag_prefs=[])['data_file']
+
+        # recently written but old timestamp non-durable
+        ts_old = Timestamp(time.time() - reclaim_age - 1)
+        df_older = self._create_diskfile(
+            object_name='older', part=partition, commit=False,
+            timestamp=ts_old, frag_index=4)
+        datafile_older = df_older.manager.cleanup_ondisk_files(
+            df_older._datadir, frag_prefs=[])['data_file']
+        # pretend the file was written more than commit_window seconds ago
+        now = time.time()
+        os.utime(datafile_older, (now - 60.1, now - 60.1))
+        self.assertTrue(os.path.exists(datafile_recent))
+        self.assertTrue(os.path.exists(datafile_older))
+
+        # for this test we don't actually need to ssync anything, so pretend
+        # all suffixes are in sync
+        self.reconstructor._get_suffixes_to_sync = (
+            lambda job, node: ([], node))
+        # leave commit_window at its default of 60 seconds
+        self.reconstructor.reconstruct()
+        # older nondurable will be removed
+        self.assertTrue(os.path.exists(datafile_recent))
+        self.assertFalse(os.path.exists(datafile_older))
+        df_recent.writer().commit(ts_recent)
+        self.assertTrue(os.path.exists(datafile_recent.replace('#4', '#4#d')))
+        # ...and attempt to commit will fail :(
+        with self.assertRaises(DiskFileError):
+            df_older.writer().commit(ts_old)
+
     def test_no_delete_failed_revert(self):
         # test will only process revert jobs
         self.reconstructor.handoffs_only = True