Refactor diskfile
This patch mostly eliminates the duplicate code that was deliberately
left in place during EC review to avoid major churn of the diskfile
module prior to the kilo release. It focuses on obvious de-duplication
and on shuffling code between classes; it deliberately does not attempt
to hammer out every last piece of de-duplication where that would
introduce more complex changes - that can come later.

Code is moved from the module level and from the ECDiskFile* classes
into new BaseDiskFile* classes. Concrete classes for the replication
and EC policies retain their existing names, i.e.
DiskFile[Manager|Writer|Reader] and ECDiskFile[Manager|Writer|Reader]
respectively.

Knock-on changes:

- fix bug whereby get_hashes was ignoring self.reclaim_age and always
  using the default arg value.
- replication diskfile manager now deletes a tombstone that is older
  than reclaim_age even when there is a newer .meta file.
- replication diskfile manager will no longer raise an AssertionError
  if only a .meta file is found during hash_cleanup_listdir.
- fix stale test in test_auditor.py: the test_with_tombstone setup was
  convoluted (it probably dates back to when object puts did not clean
  up the object dir). Now that they do, you have to try harder to
  create a dir containing both a tombstone and a data file.

Change-Id: I963e0d0ae0d6569ad1de605034c529529cbb4f9a
parent 5fda35c0b4
commit bcd00d9461
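For orientation, a minimal sketch of the class layout the refactor aims at - illustrative only, not code from the patch; the real BaseDiskFileManager._get_hashes performs the actual suffix hashing and tombstone reclamation, and the conf handling shown here is simplified:

# illustrative sketch, not swift.obj.diskfile: module-level get_hashes()
# becomes a method on a shared base class; concrete managers keep their
# existing names; the manager's configured reclaim_age is used instead of
# a hard-coded default argument.

ONE_WEEK = 604800  # assumed default reclaim_age for this sketch


class BaseDiskFileManager(object):
    def __init__(self, conf):
        self.reclaim_age = int(conf.get('reclaim_age', ONE_WEEK))

    def _get_hashes(self, partition_path, recalculate=None,
                    do_listdir=False, reclaim_age=None):
        # fall back to the manager's own setting when the caller passes None
        reclaim_age = reclaim_age or self.reclaim_age
        # the real method walks the suffix dirs, reclaims expired tombstones
        # and returns (hashed_count, {suffix: hash}); elided in this sketch
        return 0, {}


class DiskFileManager(BaseDiskFileManager):
    """Replication policy manager - shared behaviour is inherited."""


class ECDiskFileManager(BaseDiskFileManager):
    """EC policy manager - overrides only the fragment-specific pieces."""

Callers change accordingly: the replicator hunks below swap the module-level get_hashes for self._diskfile_mgr._get_hashes while continuing to pass reclaim_age=self.reclaim_age.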
@@ -37,8 +37,7 @@ from swift.common.bufferedhttp import http_connect
 from swift.common.daemon import Daemon
 from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
 from swift.obj import ssync_sender
-from swift.obj.diskfile import (DiskFileManager, get_hashes, get_data_dir,
-                                get_tmp_dir)
+from swift.obj.diskfile import DiskFileManager, get_data_dir, get_tmp_dir
 from swift.common.storage_policy import POLICIES, REPL_POLICY
 
 
@@ -332,7 +331,7 @@ class ObjectReplicator(Daemon):
         begin = time.time()
         try:
             hashed, local_hash = tpool_reraise(
-                get_hashes, job['path'],
+                self._diskfile_mgr._get_hashes, job['path'],
                 do_listdir=(self.replication_count % 10) == 0,
                 reclaim_age=self.reclaim_age)
             self.suffix_hash += hashed
@@ -377,7 +376,7 @@ class ObjectReplicator(Daemon):
                     if not suffixes:
                         continue
                     hashed, recalc_hash = tpool_reraise(
-                        get_hashes,
+                        self._diskfile_mgr._get_hashes,
                        job['path'], recalculate=suffixes,
                        reclaim_age=self.reclaim_age)
                    self.logger.update_stats('suffix.hashes', hashed)
@@ -22,12 +22,11 @@ import string
 from shutil import rmtree
 from hashlib import md5
 from tempfile import mkdtemp
-from test.unit import FakeLogger, patch_policies
+from test.unit import FakeLogger, patch_policies, make_timestamp_iter
 from swift.obj import auditor
 from swift.obj.diskfile import DiskFile, write_metadata, invalidate_hash, \
     get_data_dir, DiskFileManager, AuditLocation
-from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
-    storage_directory
+from swift.common.utils import mkdirs, normalize_timestamp, Timestamp
 from swift.common.storage_policy import StoragePolicy, POLICIES
 
 
@@ -432,28 +431,17 @@ class TestAuditor(unittest.TestCase):
         self.auditor.run_audit(**kwargs)
         self.assertTrue(os.path.isdir(quarantine_path))
 
-    def setup_bad_zero_byte(self, with_ts=False):
+    def setup_bad_zero_byte(self, timestamp=None):
+        if timestamp is None:
+            timestamp = Timestamp(time.time())
         self.auditor = auditor.ObjectAuditor(self.conf)
         self.auditor.log_time = 0
-        ts_file_path = ''
-        if with_ts:
-            name_hash = hash_path('a', 'c', 'o')
-            dir_path = os.path.join(
-                self.devices, 'sda',
-                storage_directory(get_data_dir(POLICIES[0]), '0', name_hash))
-            ts_file_path = os.path.join(dir_path, '99999.ts')
-            if not os.path.exists(dir_path):
-                mkdirs(dir_path)
-            fp = open(ts_file_path, 'w')
-            write_metadata(fp, {'X-Timestamp': '99999', 'name': '/a/c/o'})
-            fp.close()
-
         etag = md5()
         with self.disk_file.create() as writer:
             etag = etag.hexdigest()
             metadata = {
                 'ETag': etag,
-                'X-Timestamp': str(normalize_timestamp(time.time())),
+                'X-Timestamp': timestamp.internal,
                 'Content-Length': 10,
             }
             writer.put(metadata)
@@ -461,7 +449,6 @@ class TestAuditor(unittest.TestCase):
             etag = etag.hexdigest()
             metadata['ETag'] = etag
             write_metadata(writer._fd, metadata)
-        return ts_file_path
 
     def test_object_run_fast_track_all(self):
         self.setup_bad_zero_byte()
@@ -512,12 +499,36 @@ class TestAuditor(unittest.TestCase):
         self.auditor = auditor.ObjectAuditor(self.conf)
         self.assertRaises(SystemExit, self.auditor.fork_child, self)
 
-    def test_with_tombstone(self):
-        ts_file_path = self.setup_bad_zero_byte(with_ts=True)
-        self.assertTrue(ts_file_path.endswith('ts'))
+    def test_with_only_tombstone(self):
+        # sanity check that auditor doesn't touch solitary tombstones
+        ts_iter = make_timestamp_iter()
+        self.setup_bad_zero_byte(timestamp=ts_iter.next())
+        self.disk_file.delete(ts_iter.next())
+        files = os.listdir(self.disk_file._datadir)
+        self.assertEqual(1, len(files))
+        self.assertTrue(files[0].endswith('ts'))
         kwargs = {'mode': 'once'}
         self.auditor.run_audit(**kwargs)
-        self.assertTrue(os.path.exists(ts_file_path))
+        files_after = os.listdir(self.disk_file._datadir)
+        self.assertEqual(files, files_after)
+
+    def test_with_tombstone_and_data(self):
+        # rsync replication could leave a tombstone and data file in object
+        # dir - verify they are both removed during audit
+        ts_iter = make_timestamp_iter()
+        ts_tomb = ts_iter.next()
+        ts_data = ts_iter.next()
+        self.setup_bad_zero_byte(timestamp=ts_data)
+        tomb_file_path = os.path.join(self.disk_file._datadir,
+                                      '%s.ts' % ts_tomb.internal)
+        with open(tomb_file_path, 'wb') as fd:
+            write_metadata(fd, {'X-Timestamp': ts_tomb.internal})
+        files = os.listdir(self.disk_file._datadir)
+        self.assertEqual(2, len(files))
+        self.assertTrue(os.path.basename(tomb_file_path) in files, files)
+        kwargs = {'mode': 'once'}
+        self.auditor.run_audit(**kwargs)
+        self.assertFalse(os.path.exists(self.disk_file._datadir))
 
     def test_sleeper(self):
         with mock.patch(
@@ -574,8 +574,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
             with mock.patch('swift.obj.diskfile.time') as mock_time:
                 # don't reclaim anything
                 mock_time.time.return_value = 0.0
-                mock_func = 'swift.obj.diskfile.DiskFileManager.get_dev_path'
-                with mock.patch(mock_func) as mock_path:
+                mocked = 'swift.obj.diskfile.BaseDiskFileManager.get_dev_path'
+                with mock.patch(mocked) as mock_path:
                     mock_path.return_value = dev_path
                     for _ in class_under_test.yield_hashes(
                             'ignored', '0', policy, suffixes=['abc']):
@@ -1019,6 +1019,39 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
                 class_under_test.manager.get_ondisk_files, files,
                 self.testdir)
 
+    def test_hash_cleanup_listdir_reclaim(self):
+        # Each scenario specifies a list of (filename, extension, [survives])
+        # tuples. If extension is set or 'survives' is True, the filename
+        # should still be in the dir after cleanup.
+        much_older = Timestamp(time() - 2000).internal
+        older = Timestamp(time() - 1001).internal
+        newer = Timestamp(time() - 900).internal
+        scenarios = [[('%s.ts' % older, False, False)],
+
+                     # fresh tombstone is preserved
+                     [('%s.ts' % newer, '.ts', True)],
+
+                     # .data files are not reclaimed, ever
+                     [('%s.data' % older, '.data', True)],
+                     [('%s.data' % newer, '.data', True)],
+
+                     # ... and we could have a mixture of fresh and stale .data
+                     [('%s.data' % newer, '.data', True),
+                      ('%s.data' % older, False, False)],
+
+                     # tombstone reclaimed despite newer data
+                     [('%s.data' % newer, '.data', True),
+                      ('%s.data' % older, False, False),
+                      ('%s.ts' % much_older, '.ts', False)],
+
+                     # tombstone reclaimed despite junk file
+                     [('junk', False, True),
+                      ('%s.ts' % much_older, '.ts', False)],
+                     ]
+
+        self._test_hash_cleanup_listdir_files(scenarios, POLICIES.default,
+                                              reclaim_age=1000)
+
     def test_yield_hashes(self):
         old_ts = '1383180000.12345'
         fresh_ts = Timestamp(time() - 10).internal
@@ -1304,16 +1337,12 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
                     [('%s#2.data' % newer, False, True),
                      ('%s#4.data' % older, False, False)],
 
-                    # TODO these remaining scenarios exhibit different
-                    # behavior than the legacy replication DiskFileManager
-                    # behavior...
-
                     # tombstone reclaimed despite newer non-durable data
                     [('%s#2.data' % newer, False, True),
                      ('%s#4.data' % older, False, False),
                      ('%s.ts' % much_older, '.ts', False)],
 
-                    # tombstone reclaimed despite newer non-durable data
+                    # tombstone reclaimed despite much older durable
                     [('%s.ts' % older, '.ts', False),
                      ('%s.durable' % much_older, False, False)],
 
@@ -4019,14 +4048,7 @@ class TestSuffixHashes(unittest.TestCase):
         for policy in self.iter_policies():
             file1, file2 = [self.ts().internal + '.meta' for i in range(2)]
             file_list = [file1, file2]
-            if policy.policy_type == EC_POLICY:
-                # EC policy does tolerate only .meta's in dir when cleaning up
-                expected = [file2]
-            else:
-                # the get_ondisk_files contract validation doesn't allow a
-                # directory with only .meta files
-                expected = AssertionError()
-            self.check_hash_cleanup_listdir(policy, file_list, expected)
+            self.check_hash_cleanup_listdir(policy, file_list, [file2])
 
     def test_hash_cleanup_listdir_ignore_orphaned_ts(self):
         for policy in self.iter_policies():
@@ -4060,13 +4082,7 @@ class TestSuffixHashes(unittest.TestCase):
             file1 = Timestamp(old_float).internal + '.ts'
             file2 = Timestamp(time() + 2).internal + '.meta'
             file_list = [file1, file2]
-            if policy.policy_type == EC_POLICY:
-                # EC will clean up old .ts despite a .meta
-                expected = [file2]
-            else:
-                # An orphaned .meta will not clean up a very old .ts
-                expected = [file2, file1]
-            self.check_hash_cleanup_listdir(policy, file_list, expected)
+            self.check_hash_cleanup_listdir(policy, file_list, [file2])
 
     def test_hash_cleanup_listdir_keep_single_old_data(self):
         for policy in self.iter_policies():
@@ -4131,13 +4147,7 @@ class TestSuffixHashes(unittest.TestCase):
            file1 = self._datafilename(Timestamp(1), policy)
            file2 = '0000000002.00000.ts'
            file_list = [file1, file2]
-            if policy.policy_type == EC_POLICY:
-                # the .ts gets reclaimed up despite failed .data delete
-                expected = []
-            else:
-                # the .ts isn't reclaimed because there were two files in dir
-                expected = [file2]
-            self.check_hash_cleanup_listdir(policy, file_list, expected)
+            self.check_hash_cleanup_listdir(policy, file_list, [])
 
     # invalidate_hash tests - behavior
 
@@ -4241,14 +4251,12 @@ class TestSuffixHashes(unittest.TestCase):
             old_time = time() - 1001
             timestamp = Timestamp(old_time)
             df.delete(timestamp.internal)
-            tombstone_hash = md5(timestamp.internal + '.ts').hexdigest()
-            hashes = df_mgr.get_hashes('sda1', '0', [], policy)
             expected = {
-                # repl is broken, it doesn't use self.reclaim_age
-                REPL_POLICY: tombstone_hash,
-                EC_POLICY: {},
+                REPL_POLICY: {suffix: EMPTY_ETAG},
+                EC_POLICY: {suffix: {}},
             }[policy.policy_type]
-            self.assertEqual(hashes, {suffix: expected})
+            hashes = df_mgr.get_hashes('sda1', '0', [], policy)
+            self.assertEqual(hashes, expected)
 
     def test_hash_suffix_one_datafile(self):
         for policy in self.iter_policies():
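The TestDiskFileManager and TestSuffixHashes scenarios above pin down the behavioural part of the change: a tombstone older than reclaim_age is now removed even when newer .meta or .data files sit beside it. A minimal sketch of just that rule, assuming plain '<timestamp>.ts' filenames (the real hash_cleanup_listdir also handles .data/.meta/.durable precedence and quarantining):

# sketch of the new reclaim rule only - not the real hash_cleanup_listdir
import os
import time


def reclaim_old_tombstones(object_dir, reclaim_age):
    # remove any tombstone older than reclaim_age, regardless of whether
    # newer .meta or .data files are present in the same object dir
    now = time.time()
    for name in os.listdir(object_dir):
        if not name.endswith('.ts'):
            continue
        tombstone_ts = float(name[:-len('.ts')])  # assumes '<timestamp>.ts'
        if now - tombstone_ts > reclaim_age:
            os.unlink(os.path.join(object_dir, name))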
@@ -1282,7 +1282,7 @@ class TestObjectReplicator(unittest.TestCase):
             mount_check='false', timeout='300', stats_interval='1')
         replicator = object_replicator.ObjectReplicator(conf)
         was_connector = object_replicator.http_connect
-        was_get_hashes = object_replicator.get_hashes
+        was_get_hashes = object_replicator.DiskFileManager._get_hashes
         was_execute = tpool.execute
         self.get_hash_count = 0
         try:
@@ -1300,7 +1300,7 @@ class TestObjectReplicator(unittest.TestCase):
 
             self.i_failed = False
             object_replicator.http_connect = mock_http_connect(200)
-            object_replicator.get_hashes = fake_get_hashes
+            object_replicator.DiskFileManager._get_hashes = fake_get_hashes
             replicator.logger.exception = \
                 lambda *args, **kwargs: fake_exc(self, *args, **kwargs)
             # Write some files into '1' and run replicate- they should be moved
@@ -1337,7 +1337,7 @@ class TestObjectReplicator(unittest.TestCase):
             self.assertFalse(self.i_failed)
         finally:
             object_replicator.http_connect = was_connector
-            object_replicator.get_hashes = was_get_hashes
+            object_replicator.DiskFileManager._get_hashes = was_get_hashes
             tpool.execute = was_execute
 
     def test_run(self):