reconstructor: purge meta files in pure handoffs
Previously, after reverting handoff files, the reconstructor would only purge
tombstones and data files for the reverted fragment index. Any meta files were
not purged because the partition might also be on a primary node for a
different fragment index.

For example, if, before the reconstructor visits, the object hash dir
contained:

    t1#1#d.data
    t1#2#d.data
    t2.meta

where frag index 1 is a handoff and gets reverted, then, after the
reconstructor has visited, the hash dir should still contain:

    t1#2#d.data
    t2.meta

If, before the reconstructor visits, the object hash dir contained:

    t1#1#d.data
    t2.meta

then, after the reconstructor has visited, the hash dir would still contain:

    t2.meta

The retention of meta files is undesirable when the partition is a "pure
handoff", i.e. the node is not a primary for the partition for any fragment
index. With this patch the meta files are purged after being reverted if the
reconstructor has no sync job for the partition (i.e. the partition is a
"pure handoff") and there are no more fragments to revert.

Change-Id: I107af3bc2d62768e063ef3176645d60ef22fa6d4
Co-Authored-By: Tim Burke <tim.burke@gmail.com>
parent: 092d409c4b    commit: ada9f0eeb0
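The condition that drives the new behaviour appears verbatim in the
reconstructor hunk further down. As a reading aid only (this helper is not
part of the patch and its name is made up), the rule can be summarised as:

    # Illustrative sketch, not part of the patch: the decision rule described
    # above, expressed as a standalone helper with a hypothetical name.
    def should_purge_meta(primary_frag_index, data_files, purgable_data_files):
        """Return True if reverted .meta files may be removed.

        primary_frag_index is None when the partition is a "pure handoff",
        i.e. this node has no sync job (is not a primary) for the partition.
        """
        # purge meta only on a pure handoff, and only when the .data file(s)
        # being reverted are the last ones left in the object's hash dir
        return (primary_frag_index is None
                and len(purgable_data_files) == len(data_files) <= 1)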
@@ -1488,21 +1488,22 @@ class BaseDiskFileManager(object):
             self, audit_location.path, dev_path,
             audit_location.partition, policy=audit_location.policy)
 
-    def get_diskfile_from_hash(self, device, partition, object_hash,
-                               policy, **kwargs):
+    def get_diskfile_and_filenames_from_hash(self, device, partition,
+                                             object_hash, policy, **kwargs):
         """
-        Returns a DiskFile instance for an object at the given
-        object_hash. Just in case someone thinks of refactoring, be
-        sure DiskFileDeleted is *not* raised, but the DiskFile
-        instance representing the tombstoned object is returned
-        instead.
+        Returns a tuple of (a DiskFile instance for an object at the given
+        object_hash, the basenames of the files in the object's hash dir).
+        Just in case someone thinks of refactoring, be sure DiskFileDeleted is
+        *not* raised, but the DiskFile instance representing the tombstoned
+        object is returned instead.
 
         :param device: name of target device
         :param partition: partition on the device in which the object lives
         :param object_hash: the hash of an object path
         :param policy: the StoragePolicy instance
         :raises DiskFileNotExist: if the object does not exist
-        :returns: an instance of BaseDiskFile
+        :returns: a tuple comprising (an instance of BaseDiskFile, a list of
+                  file basenames)
         """
         dev_path = self.get_dev_path(device)
         if not dev_path:
@@ -1541,9 +1542,27 @@ class BaseDiskFileManager(object):
                 metadata.get('name', ''), 3, 3, True)
         except ValueError:
             raise DiskFileNotExist()
-        return self.diskfile_cls(self, dev_path,
-                                 partition, account, container, obj,
-                                 policy=policy, **kwargs)
+        df = self.diskfile_cls(self, dev_path, partition, account, container,
+                               obj, policy=policy, **kwargs)
+        return df, filenames
+
+    def get_diskfile_from_hash(self, device, partition, object_hash, policy,
+                               **kwargs):
+        """
+        Returns a DiskFile instance for an object at the given object_hash.
+        Just in case someone thinks of refactoring, be sure DiskFileDeleted is
+        *not* raised, but the DiskFile instance representing the tombstoned
+        object is returned instead.
+
+        :param device: name of target device
+        :param partition: partition on the device in which the object lives
+        :param object_hash: the hash of an object path
+        :param policy: the StoragePolicy instance
+        :raises DiskFileNotExist: if the object does not exist
+        :returns: an instance of BaseDiskFile
+        """
+        return self.get_diskfile_and_filenames_from_hash(
+            device, partition, object_hash, policy, **kwargs)[0]
 
     def get_hashes(self, device, partition, suffixes, policy,
                    skip_rehash=False):
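For orientation, the hunk above splits the old API in two: the new method
returns both the DiskFile and the basenames found in its hash dir, and the old
name is kept as a thin wrapper. A usage sketch only (df_mgr stands for any
BaseDiskFileManager; the other arguments are placeholders):

    # Illustrative only: consuming the new tuple-returning API.
    def get_diskfile_and_data_files(df_mgr, device, partition, object_hash,
                                    policy):
        df, filenames = df_mgr.get_diskfile_and_filenames_from_hash(
            device, partition, object_hash, policy)
        # df is the same DiskFile the old API returned; filenames are the
        # basenames in the object's hash dir, e.g. ['t1#1#d.data', 't2.meta']
        return df, [f for f in filenames if f.endswith('.data')]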
@@ -3325,7 +3344,8 @@ class ECDiskFile(BaseDiskFile):
                 frag_prefs=self._frag_prefs, policy=policy)
         return self._ondisk_info
 
-    def purge(self, timestamp, frag_index, nondurable_purge_delay=0):
+    def purge(self, timestamp, frag_index, nondurable_purge_delay=0,
+              meta_timestamp=None):
         """
         Remove a tombstone file matching the specified timestamp or
         datafile matching the specified timestamp and fragment index
@@ -3344,12 +3364,20 @@ class ECDiskFile(BaseDiskFile):
             a whole number or None.
         :param nondurable_purge_delay: only remove a non-durable data file if
             it's been on disk longer than this many seconds.
+        :param meta_timestamp: if not None then remove any meta file with this
+            timestamp
         """
         purge_file = self.manager.make_on_disk_filename(
             timestamp, ext='.ts')
         purge_path = os.path.join(self._datadir, purge_file)
         remove_file(purge_path)
 
+        if meta_timestamp is not None:
+            purge_file = self.manager.make_on_disk_filename(
+                meta_timestamp, ext='.meta')
+            purge_path = os.path.join(self._datadir, purge_file)
+            remove_file(purge_path)
+
         if frag_index is not None:
             # data file may or may not be durable so try removing both filename
             # possibilities
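The new keyword is exercised by the ECDiskFile tests further down: purge()
keeps its old behaviour when meta_timestamp is omitted, and additionally
removes a .meta file at exactly that timestamp when one is given. A minimal
sketch (ts_data and ts_meta are assumed to be swift.common.utils.Timestamp
instances):

    # Illustrative only: reverting a fragment and, optionally, its meta file.
    def purge_reverted(df, ts_data, frag_index, ts_meta=None):
        # with ts_meta=None this is the pre-patch behaviour: the .meta file,
        # if any, is left in place
        df.purge(ts_data, frag_index, meta_timestamp=ts_meta)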
@@ -961,7 +961,7 @@ class ObjectReconstructor(Daemon):
             self.suffix_count += len(suffixes)
         return suffixes, node
 
-    def delete_reverted_objs(self, job, objects, frag_index):
+    def delete_reverted_objs(self, job, objects):
         """
         For EC we can potentially revert only some of a partition
         so we'll delete reverted objects here. Note that we delete
@@ -970,24 +970,37 @@ class ObjectReconstructor(Daemon):
         :param job: the job being processed
         :param objects: a dict of objects to be deleted, each entry maps
                         hash=>timestamp
-        :param frag_index: (int) the fragment index of data files to be deleted
         """
         df_mgr = self._df_router[job['policy']]
         suffixes_to_delete = set()
         for object_hash, timestamps in objects.items():
             try:
-                df = df_mgr.get_diskfile_from_hash(
+                df, filenames = df_mgr.get_diskfile_and_filenames_from_hash(
                     job['local_dev']['device'], job['partition'],
                     object_hash, job['policy'],
-                    frag_index=frag_index)
+                    frag_index=job['frag_index'])
                 # legacy durable data files look like modern nondurable data
                 # files; we therefore override nondurable_purge_delay when we
                 # know the data file is durable so that legacy durable data
                 # files get purged
                 nondurable_purge_delay = (0 if timestamps.get('durable')
                                           else df_mgr.commit_window)
-                df.purge(timestamps['ts_data'], frag_index,
-                         nondurable_purge_delay)
+                data_files = [
+                    f for f in filenames
+                    if f.endswith('.data')]
+                purgable_data_files = [
+                    f for f in data_files
+                    if f.startswith(timestamps['ts_data'].internal)]
+                if (job['primary_frag_index'] is None
+                        and len(purgable_data_files) == len(data_files) <= 1):
+                    # pure handoff node, and we're about to purge the last
+                    # .data file, so it's ok to remove any meta file that may
+                    # have been reverted
+                    meta_timestamp = timestamps.get('ts_meta')
+                else:
+                    meta_timestamp = None
+                df.purge(timestamps['ts_data'], job['frag_index'],
+                         nondurable_purge_delay, meta_timestamp)
             except DiskFileNotExist:
                 # may have passed reclaim age since being reverted, or may have
                 # raced with another reconstructor process trying the same
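A worked example of the new condition, using the hash dir layout from the
commit message (the timestamps are made-up placeholders in Swift's internal
timestamp format):

    # Pure handoff whose only .data file is being reverted: meta may go too.
    filenames = ['1638952131.12345#1#d.data', '1638952140.54321.meta']
    ts_data_internal = '1638952131.12345'
    data_files = [f for f in filenames if f.endswith('.data')]
    purgable_data_files = [f for f in data_files
                           if f.startswith(ts_data_internal)]
    primary_frag_index = None  # no sync job for this partition
    assert (primary_frag_index is None
            and len(purgable_data_files) == len(data_files) <= 1)
    # so ts_meta is passed through to purge() and the reverted .meta is removed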
@@ -995,7 +1008,7 @@ class ObjectReconstructor(Daemon):
             except DiskFileError:
                 self.logger.exception(
                     'Unable to purge DiskFile (%r %r %r)',
-                    object_hash, timestamps['ts_data'], frag_index)
+                    object_hash, timestamps['ts_data'], job['frag_index'])
             suffixes_to_delete.add(object_hash[-3:])
 
         for suffix in suffixes_to_delete:
@@ -1080,8 +1093,7 @@ class ObjectReconstructor(Daemon):
                     syncd_with += 1
                     reverted_objs.update(in_sync_objs)
             if syncd_with >= len(job['sync_to']):
-                self.delete_reverted_objs(
-                    job, reverted_objs, job['frag_index'])
+                self.delete_reverted_objs(job, reverted_objs)
             else:
                 self.handoffs_remaining += 1
         except PartitionLockTimeout:
@@ -1150,7 +1162,8 @@ class ObjectReconstructor(Daemon):
                 data_fi_to_suffixes[fi].append(suffix)
 
         # helper to ensure consistent structure of jobs
-        def build_job(job_type, frag_index, suffixes, sync_to):
+        def build_job(job_type, frag_index, suffixes, sync_to,
+                      primary_frag_index):
             return {
                 'job_type': job_type,
                 'frag_index': frag_index,
@@ -1163,28 +1176,33 @@ class ObjectReconstructor(Daemon):
                 'local_dev': local_dev,
                 # ssync likes to have it handy
                 'device': local_dev['device'],
+                # provide a hint to revert jobs that the node is a primary for
+                # one of the frag indexes
+                'primary_frag_index': primary_frag_index,
             }
 
         # aggregate jobs for all the fragment index in this part
        jobs = []
 
         # check the primary nodes - to see if the part belongs here
+        primary_frag_index = None
         part_nodes = policy.object_ring.get_part_nodes(partition)
        for node in part_nodes:
            if node['id'] == local_dev['id']:
                 # this partition belongs here, we'll need a sync job
-                frag_index = policy.get_backend_index(node['index'])
+                primary_frag_index = policy.get_backend_index(node['index'])
                 try:
-                    suffixes = data_fi_to_suffixes.pop(frag_index)
+                    suffixes = data_fi_to_suffixes.pop(primary_frag_index)
                 except KeyError:
                     # N.B. If this function ever returns an empty list of jobs
                     # the entire partition will be deleted.
                     suffixes = []
                 sync_job = build_job(
                     job_type=SYNC,
-                    frag_index=frag_index,
+                    frag_index=primary_frag_index,
                     suffixes=suffixes,
                     sync_to=_get_partners(node['index'], part_nodes),
+                    primary_frag_index=primary_frag_index
                 )
                 # ssync callback to rebuild missing fragment_archives
                 sync_job['sync_diskfile_builder'] = self.reconstruct_fa
@@ -1215,6 +1233,7 @@ class ObjectReconstructor(Daemon):
                     frag_index=fi,
                     suffixes=data_fi_to_suffixes[fi],
                     sync_to=nodes_sync_to,
+                    primary_frag_index=primary_frag_index
                 )
                 jobs.append(revert_job)
 
@@ -1241,7 +1260,8 @@ class ObjectReconstructor(Daemon):
                 job_type=REVERT,
                 frag_index=None,
                 suffixes=non_data_fragment_suffixes,
-                sync_to=random.sample(part_nodes, nsample)
+                sync_to=random.sample(part_nodes, nsample),
+                primary_frag_index=primary_frag_index
             ))
         # return a list of jobs for this part
         return jobs
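Every job built above now carries the primary_frag_index hint. For orientation
only, a revert job now looks roughly like this (keys mirror the expected job
dicts in the reconstructor tests below; the values here are placeholders):

    # Illustrative structure only; not a literal from the patch.
    REVERT = 'revert'  # stand-in for the reconstructor's REVERT constant
    revert_job = {
        'job_type': REVERT,
        'frag_index': 2,                 # fragment index being reverted
        'primary_frag_index': None,      # None => pure handoff partition
        'suffixes': ['3c1'],             # suffix dirs holding that fragment
        'sync_to': [],                   # primary node dicts to push to
        'partition': 2,
        'hashes': {},
        'policy': None,                  # a StoragePolicy in real jobs
        'local_dev': {'device': 'sda1'},
        'device': 'sda1',
    }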
@@ -1678,13 +1678,15 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
 
     def test_get_diskfile_from_hash(self):
         self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
-        with mock.patch(self._manager_mock('diskfile_cls')) as dfclass, \
+        mock_return = object()
+        with mock.patch(self._manager_mock('diskfile_cls'),
+                        return_value=mock_return) as dfclass, \
                 mock.patch(self._manager_mock(
                     'cleanup_ondisk_files')) as cleanup, \
                 mock.patch('swift.obj.diskfile.read_metadata') as readmeta:
             cleanup.return_value = {'files': ['1381679759.90941.data']}
             readmeta.return_value = {'name': '/a/c/o'}
-            self.df_mgr.get_diskfile_from_hash(
+            actual = self.df_mgr.get_diskfile_from_hash(
                 'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0])
             dfclass.assert_called_once_with(
                 self.df_mgr, '/srv/dev/', '9',
@@ -1694,6 +1696,30 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
             readmeta.assert_called_once_with(
                 '/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900/'
                 '1381679759.90941.data')
+            self.assertEqual(mock_return, actual)
+
+    def test_get_diskfile_and_filenames_from_hash(self):
+        self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
+        mock_return = object()
+        with mock.patch(self._manager_mock('diskfile_cls'),
+                        return_value=mock_return) as dfclass, \
+                mock.patch(self._manager_mock(
+                    'cleanup_ondisk_files')) as cleanup, \
+                mock.patch('swift.obj.diskfile.read_metadata') as readmeta:
+            cleanup.return_value = {'files': ['1381679759.90941.data']}
+            readmeta.return_value = {'name': '/a/c/o'}
+            actual, names = self.df_mgr.get_diskfile_and_filenames_from_hash(
+                'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0])
+            dfclass.assert_called_once_with(
+                self.df_mgr, '/srv/dev/', '9',
+                'a', 'c', 'o', policy=POLICIES[0])
+            cleanup.assert_called_once_with(
+                '/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900')
+            readmeta.assert_called_once_with(
+                '/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900/'
+                '1381679759.90941.data')
+            self.assertEqual(mock_return, actual)
+            self.assertEqual(['1381679759.90941.data'], names)
 
     def test_listdir_enoent(self):
         oserror = OSError()
@@ -1919,7 +1945,7 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
         expected = sorted(expected_items)
         actual = sorted(hash_items)
         # default list diff easiest to debug
-        self.assertEqual(actual, expected)
+        self.assertEqual(expected, actual)
 
     def test_yield_hashes_tombstones(self):
         ts_iter = (Timestamp(t) for t in itertools.count(int(time())))
@@ -2127,6 +2153,9 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
                 '9333a92d072897b136b3fc06595b4abc': [
                     ts1.internal + '.ts',
                     ts2.internal + '.meta'],
+                # dangling .meta is not yielded because it cannot be sync'd
+                '9222a92d072897b136b3fc06595b4abc': [
+                    ts3.internal + '.meta'],
             },
             '456': {
                 # only latest metadata timestamp
@@ -6116,15 +6145,61 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase):
         for frag_index in (1, 2):
             df = self._simple_get_diskfile(frag_index=frag_index)
             write_diskfile(df, ts)
+        ts_meta = self.ts()
+        df.write_metadata({
+            'X-Timestamp': ts_meta.internal,
+            'X-Object-Meta-Delete': 'me'
+        })
 
         # sanity
         self.assertEqual(sorted(os.listdir(df._datadir)), [
             ts.internal + '#1#d.data',
             ts.internal + '#2#d.data',
+            ts_meta.internal + '.meta',
         ])
         df.purge(ts, 2)
-        self.assertEqual(os.listdir(df._datadir), [
+        # by default .meta file is not purged
+        self.assertEqual(sorted(os.listdir(df._datadir)), [
             ts.internal + '#1#d.data',
+            ts_meta.internal + '.meta',
         ])
 
+    def test_purge_final_fragment_index_and_meta(self):
+        ts = self.ts()
+        df = self._simple_get_diskfile(frag_index=1)
+        write_diskfile(df, ts)
+        ts_meta = self.ts()
+        df.write_metadata({
+            'X-Timestamp': ts_meta.internal,
+            'X-Object-Meta-Delete': 'me',
+        })
+
+        # sanity
+        self.assertEqual(sorted(os.listdir(df._datadir)), [
+            ts.internal + '#1#d.data',
+            ts_meta.internal + '.meta',
+        ])
+        df.purge(ts, 1, meta_timestamp=ts_meta)
+        self.assertFalse(os.path.exists(df._datadir))
+
+    def test_purge_final_fragment_index_and_not_meta(self):
+        ts = self.ts()
+        df = self._simple_get_diskfile(frag_index=1)
+        write_diskfile(df, ts)
+        ts_meta = self.ts()
+        df.write_metadata({
+            'X-Timestamp': ts_meta.internal,
+            'X-Object-Meta-Delete': 'me',
+        })
+
+        # sanity
+        self.assertEqual(sorted(os.listdir(df._datadir)), [
+            ts.internal + '#1#d.data',
+            ts_meta.internal + '.meta',
+        ])
+        df.purge(ts, 1, meta_timestamp=ts)
+        self.assertEqual(sorted(os.listdir(df._datadir)), [
+            ts_meta.internal + '.meta',
+        ])
+
     def test_purge_last_fragment_index(self):
@@ -36,7 +36,7 @@ from six.moves.urllib.parse import unquote
 from swift.common import utils
 from swift.common.exceptions import DiskFileError, DiskFileQuarantined
 from swift.common.header_key_dict import HeaderKeyDict
-from swift.common.utils import dump_recon_cache, md5, Timestamp
+from swift.common.utils import dump_recon_cache, md5, Timestamp, mkdirs
 from swift.obj import diskfile, reconstructor as object_reconstructor
 from swift.common import ring
 from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy,
@@ -332,6 +332,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
             'suffixes': ['061'],
             'partition': 0,
             'frag_index': 2,
+            'primary_frag_index': 1,
             'device': 'sda1',
             'local_dev': {
                 'replication_port': 6200,
@@ -395,6 +396,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
             'suffixes': ['061', '3c1'],
             'partition': 0,
             'frag_index': 1,
+            'primary_frag_index': 1,
             'device': 'sda1',
             'local_dev': {
                 'replication_port': 6200,
@@ -440,6 +442,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
             'suffixes': ['061', '3c1'],
             'partition': 1,
             'frag_index': 1,
+            'primary_frag_index': 4,
             'device': 'sda1',
             'local_dev': {
                 'replication_port': 6200,
@@ -481,6 +484,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
             'suffixes': ['3c1'],
             'partition': 1,
             'frag_index': 0,
+            'primary_frag_index': 4,
             'device': 'sda1',
             'local_dev': {
                 'replication_port': 6200,
@@ -544,6 +548,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
             'suffixes': [],
             'partition': 1,
             'frag_index': 4,
+            'primary_frag_index': 4,
             'device': 'sda1',
             'local_dev': {
                 'replication_port': 6200,
@@ -589,6 +594,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
             'suffixes': ['061'],
             'partition': 2,
             'frag_index': 0,
+            'primary_frag_index': None,
             'device': 'sda1',
             'local_dev': {
                 'replication_port': 6200,
@@ -628,6 +634,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
             'suffixes': ['3c1'],
             'partition': 2,
             'frag_index': 2,
+            'primary_frag_index': None,
             'device': 'sda1',
             'local_dev': {
                 'replication_port': 6200,
@@ -1166,9 +1173,10 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
                 # may not be for the reverted frag index
                 self.assertTrue(files)
                 n_files += len(files)
+                self.assertEqual(context['job']['frag_index'],
+                                 context['node']['index'])
                 expected_calls.append(mock.call(context['job'],
-                                                context['available_map'],
-                                                context['node']['index']))
+                                                context['available_map']))
             else:
                 self.assertFalse(context.get('include_non_durable'))
 
@@ -4642,6 +4650,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
         job = {
             'job_type': object_reconstructor.REVERT,
             'frag_index': frag_index,
+            'primary_frag_index': None,
             'suffixes': [suffix],
             'sync_to': sync_to,
             'partition': partition,
@@ -4722,6 +4731,193 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
         self.assertEqual(
             [], self.reconstructor.logger.logger.get_lines_for_level('error'))
 
+    def _make_frag(self, df, fi, ts_data):
+        with df.create() as writer:
+            test_data = b'test data'
+            writer.write(test_data)
+            metadata = {
+                'X-Timestamp': ts_data.internal,
+                'Content-Length': len(test_data),
+                'Etag': md5(test_data, usedforsecurity=False).hexdigest(),
+                'X-Object-Sysmeta-Ec-Frag-Index': fi,
+            }
+            writer.put(metadata)
+            writer.commit(ts_data)
+
+    def _do_test_process_job_revert_cleanup_with_meta(self, frag_indexes,
+                                                      primary_frag_index):
+        sync_to = [[dict(random.choice([n for n in self.policy.object_ring.devs
+                                        if n != self.local_dev]),
+                         index=frag_index)] for frag_index in frag_indexes]
+        partition = 0
+
+        part_path = os.path.join(self.devices, self.local_dev['device'],
+                                 diskfile.get_data_dir(self.policy),
+                                 str(partition))
+        mkdirs(part_path)
+        df_mgr = self.reconstructor._df_router[self.policy]
+        df = df_mgr.get_diskfile(self.local_dev['device'], partition, 'a',
+                                 'c', 'data-obj', policy=self.policy)
+
+        ts_data = self.ts()
+        for frag_index in frag_indexes:
+            self._make_frag(df, frag_index, ts_data)
+        if primary_frag_index is not None:
+            self._make_frag(df, primary_frag_index, ts_data)
+        ts_meta = self.ts()
+        df.write_metadata({'X-Timestamp': ts_meta.internal,
+                           'X-Object-Meta-Test': 'testing'})
+
+        ohash = os.path.basename(df._datadir)
+        suffix = os.path.basename(os.path.dirname(df._datadir))
+
+        jobs = [{
+            'job_type': object_reconstructor.REVERT,
+            'frag_index': frag_index,
+            'primary_frag_index': primary_frag_index,
+            'suffixes': [suffix],
+            'sync_to': sync_to[i],
+            'partition': partition,
+            'path': part_path,
+            'hashes': {},
+            'policy': self.policy,
+            'local_dev': self.local_dev,
+            'device': self.local_dev['device'],
+        } for i, frag_index in enumerate(frag_indexes)]
+
+        ondisk_files_during_sync = []
+
+        def ssync_response_callback(*args):
+            ondisk_files_during_sync.append(os.listdir(df._datadir))
+            # success should not increment handoffs_remaining
+            return True, {ohash: {'ts_data': ts_data, 'ts_meta': ts_meta}}
+
+        ssync_calls = []
+        with mock_ssync_sender(ssync_calls,
+                               response_callback=ssync_response_callback):
+            for job in jobs:
+                self.reconstructor.process_job(job)
+
+        self.assertEqual(self.reconstructor.handoffs_remaining, 0)
+        self.assertEqual(len(jobs), len(ssync_calls))
+        self.assertEqual(len(jobs), len(ondisk_files_during_sync))
+        # verify that the meta file is intact at startof every job/ssync call:
+        # if it is removed at all, it should be removed in the *last* call
+        for fileset in ondisk_files_during_sync:
+            self.assertIn(ts_meta.internal + '.meta', fileset)
+        return df
+
+    def test_process_job_revert_does_cleanup_meta_pure_handoff(self):
+        # verify that danging meta files are cleaned up if the revert job is
+        # for a pure handoff partition
+        frag_index = random.randint(
+            0, self.policy.ec_n_unique_fragments - 1)
+        df = self._do_test_process_job_revert_cleanup_with_meta(
+            frag_indexes=[frag_index], primary_frag_index=None)
+        # hashpath has been removed
+        self.assertFalse(os.path.exists(df._datadir))
+
+        extra_index = frag_index
+        while extra_index == frag_index:
+            extra_index = random.randint(
+                0, self.policy.ec_n_unique_fragments - 1)
+        df = self._do_test_process_job_revert_cleanup_with_meta(
+            frag_indexes=[frag_index, extra_index], primary_frag_index=None)
+        # hashpath has been removed
+        self.assertFalse(os.path.exists(df._datadir))
+
+    def test_process_job_revert_does_not_cleanup_meta_also_primary(self):
+        # verify that danging meta files are not cleaned up if the revert job
+        # is for a handoff partition that is also a primary for another frag
+        # index
+        frag_index = random.randint(
+            0, self.policy.ec_n_unique_fragments - 1)
+        primary_frag_index = frag_index
+        while primary_frag_index == frag_index:
+            primary_frag_index = random.randint(
+                0, self.policy.ec_n_unique_fragments - 1)
+        df = self._do_test_process_job_revert_cleanup_with_meta(
+            frag_indexes=[frag_index], primary_frag_index=primary_frag_index)
+        # hashpath has not been removed
+        self.assertTrue(os.path.exists(df._datadir))
+        file_info = df._manager.cleanup_ondisk_files(df._datadir)
+        self.maxDiff = None
+        self.assertTrue('meta_file' in file_info)
+        self.assertTrue(os.path.exists(file_info['meta_file']))
+        self.assertTrue('data_info' in file_info)
+        self.assertEqual(primary_frag_index,
+                         file_info['data_info']['frag_index'])
+        self.assertTrue(os.path.exists(file_info['data_file']))
+        # only the primary frag and meta file remain
+        self.assertEqual(2, len(os.listdir(df._datadir)))
+
+    def test_process_job_revert_does_not_cleanup_meta_new_data(self):
+        # verify that danging meta files are not cleaned up if the revert job
+        # is for a pure handoff partition that has a newer data frag in
+        # addition to the frag that was sync'd
+        frag_index = 0
+        extra_frag_index = 1
+        sync_to = [dict(random.choice([n for n in self.policy.object_ring.devs
+                                       if n != self.local_dev]),
+                        index=frag_index)]
+        partition = 0
+
+        part_path = os.path.join(self.devices, self.local_dev['device'],
+                                 diskfile.get_data_dir(self.policy),
+                                 str(partition))
+        mkdirs(part_path)
+        df_mgr = self.reconstructor._df_router[self.policy]
+        df = df_mgr.get_diskfile(self.local_dev['device'], partition, 'a',
+                                 'c', 'data-obj', policy=self.policy)
+
+        ts_data0 = self.ts()  # original frag
+        ts_data1 = self.ts()  # new one written during ssync
+        self._make_frag(df, frag_index, ts_data0)
+        ts_meta = self.ts()
+        df.write_metadata({'X-Timestamp': ts_meta.internal,
+                           'X-Object-Meta-Test': 'testing'})
+
+        ohash = os.path.basename(df._datadir)
+        suffix = os.path.basename(os.path.dirname(df._datadir))
+
+        job = {
+            'job_type': object_reconstructor.REVERT,
+            'frag_index': frag_index,
+            'primary_frag_index': None,
+            'suffixes': [suffix],
+            'sync_to': sync_to,
+            'partition': partition,
+            'path': part_path,
+            'hashes': {},
+            'policy': self.policy,
+            'local_dev': self.local_dev,
+            'device': self.local_dev['device'],
+        }
+
+        def ssync_response_callback(*args):
+            # pretend that during the ssync call the original frag is replaced
+            # by a newer one
+            self._make_frag(df, extra_frag_index, ts_data1)
+            return True, {ohash: {'ts_data': ts_data0, 'ts_meta': ts_meta}}
+
+        ssync_calls = []
+        with mock_ssync_sender(ssync_calls,
+                               response_callback=ssync_response_callback):
+            self.reconstructor.process_job(job)
+
+        self.assertEqual(1, len(ssync_calls))
+        # hashpath has not been removed
+        self.assertTrue(os.path.exists(df._datadir))
+        file_info = df._manager.cleanup_ondisk_files(df._datadir)
+        self.maxDiff = None
+        self.assertIsNotNone(file_info['meta_file'])
+        self.assertTrue(os.path.exists(file_info['meta_file']))
+        self.assertTrue('data_info' in file_info)
+        self.assertTrue(os.path.exists(file_info['data_file']))
+        # only the newer frag and meta file remain
+        self.assertEqual(2, len(os.listdir(df._datadir)))
+        self.assertEqual(ts_data1, file_info['data_info']['timestamp'])
+
     def test_process_job_revert_cleanup_tombstone(self):
         partition = 0
         sync_to = [random.choice([
@@ -4744,6 +4940,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
         job = {
             'job_type': object_reconstructor.REVERT,
             'frag_index': None,
+            'primary_frag_index': None,
            'suffixes': [suffix],
            'sync_to': sync_to,
            'partition': partition,