Merge "Refactor diskfile"
commit e21918bee8
File diff suppressed because it is too large
swift/obj/replicator.py

@@ -37,8 +37,7 @@ from swift.common.bufferedhttp import http_connect
 from swift.common.daemon import Daemon
 from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
 from swift.obj import ssync_sender
-from swift.obj.diskfile import (DiskFileManager, get_hashes, get_data_dir,
-                                get_tmp_dir)
+from swift.obj.diskfile import DiskFileManager, get_data_dir, get_tmp_dir
 from swift.common.storage_policy import POLICIES, REPL_POLICY


@@ -332,7 +331,7 @@ class ObjectReplicator(Daemon):
         begin = time.time()
         try:
             hashed, local_hash = tpool_reraise(
-                get_hashes, job['path'],
+                self._diskfile_mgr._get_hashes, job['path'],
                 do_listdir=(self.replication_count % 10) == 0,
                 reclaim_age=self.reclaim_age)
             self.suffix_hash += hashed
@@ -377,7 +376,7 @@ class ObjectReplicator(Daemon):
                 if not suffixes:
                     continue
                 hashed, recalc_hash = tpool_reraise(
-                    get_hashes,
+                    self._diskfile_mgr._get_hashes,
                     job['path'], recalculate=suffixes,
                     reclaim_age=self.reclaim_age)
                 self.logger.update_stats('suffix.hashes', hashed)
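The two replicator hunks above are the heart of the refactor on the consumer side: suffix hashing moves from a module-level get_hashes function to a private method on the replicator's DiskFileManager, and the bound method is what now gets handed to the thread pool. A self-contained toy of that pattern (the bodies here are stand-ins, not Swift's real hashing logic):

class DiskFileManager(object):
    def __init__(self, reclaim_age=604800):
        self.reclaim_age = reclaim_age

    def _get_hashes(self, path, do_listdir=False, reclaim_age=None):
        # stand-in for suffix hashing: returns (count_hashed, suffix_hashes)
        return 1, {'abc': 'd41d8cd98f00b204e9800998ecf8427e'}


def tpool_reraise(func, *args, **kwargs):
    # stand-in for swift.common.utils.tpool_reraise, which runs func on a
    # threadpool and re-raises any exception in the calling greenthread
    return func(*args, **kwargs)


mgr = DiskFileManager()
hashed, local_hash = tpool_reraise(
    mgr._get_hashes, '/srv/node/sda/objects/0',
    do_listdir=True, reclaim_age=mgr.reclaim_age)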
test/unit/obj/test_auditor.py

@@ -22,12 +22,11 @@ import string
 from shutil import rmtree
 from hashlib import md5
 from tempfile import mkdtemp
-from test.unit import FakeLogger, patch_policies
+from test.unit import FakeLogger, patch_policies, make_timestamp_iter
 from swift.obj import auditor
 from swift.obj.diskfile import DiskFile, write_metadata, invalidate_hash, \
     get_data_dir, DiskFileManager, AuditLocation
-from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
-    storage_directory
+from swift.common.utils import mkdirs, normalize_timestamp, Timestamp
 from swift.common.storage_policy import StoragePolicy, POLICIES


@@ -432,28 +431,17 @@ class TestAuditor(unittest.TestCase):
         self.auditor.run_audit(**kwargs)
         self.assertTrue(os.path.isdir(quarantine_path))

-    def setup_bad_zero_byte(self, with_ts=False):
+    def setup_bad_zero_byte(self, timestamp=None):
+        if timestamp is None:
+            timestamp = Timestamp(time.time())
         self.auditor = auditor.ObjectAuditor(self.conf)
         self.auditor.log_time = 0
-        ts_file_path = ''
-        if with_ts:
-            name_hash = hash_path('a', 'c', 'o')
-            dir_path = os.path.join(
-                self.devices, 'sda',
-                storage_directory(get_data_dir(POLICIES[0]), '0', name_hash))
-            ts_file_path = os.path.join(dir_path, '99999.ts')
-            if not os.path.exists(dir_path):
-                mkdirs(dir_path)
-            fp = open(ts_file_path, 'w')
-            write_metadata(fp, {'X-Timestamp': '99999', 'name': '/a/c/o'})
-            fp.close()
-
         etag = md5()
         with self.disk_file.create() as writer:
             etag = etag.hexdigest()
             metadata = {
                 'ETag': etag,
-                'X-Timestamp': str(normalize_timestamp(time.time())),
+                'X-Timestamp': timestamp.internal,
                 'Content-Length': 10,
             }
             writer.put(metadata)
@@ -461,7 +449,6 @@ class TestAuditor(unittest.TestCase):
             etag = etag.hexdigest()
             metadata['ETag'] = etag
             write_metadata(writer._fd, metadata)
-        return ts_file_path

     def test_object_run_fast_track_all(self):
         self.setup_bad_zero_byte()
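Note the contract change in setup_bad_zero_byte: the with_ts flag, which wrote a hard-coded '99999.ts' tombstone and returned its path, is gone; callers now pass an explicit Timestamp for the data file and create any tombstone themselves, as the new tests below do. For reference, a Timestamp's internal form is the on-disk filename stem:

from swift.common.utils import Timestamp

ts = Timestamp(1234567890.12345)
ts.internal   # '1234567890.12345', used below as '%s.ts' % ts.internal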
@@ -512,12 +499,36 @@ class TestAuditor(unittest.TestCase):
         self.auditor = auditor.ObjectAuditor(self.conf)
         self.assertRaises(SystemExit, self.auditor.fork_child, self)

-    def test_with_tombstone(self):
-        ts_file_path = self.setup_bad_zero_byte(with_ts=True)
-        self.assertTrue(ts_file_path.endswith('ts'))
+    def test_with_only_tombstone(self):
+        # sanity check that auditor doesn't touch solitary tombstones
+        ts_iter = make_timestamp_iter()
+        self.setup_bad_zero_byte(timestamp=ts_iter.next())
+        self.disk_file.delete(ts_iter.next())
+        files = os.listdir(self.disk_file._datadir)
+        self.assertEqual(1, len(files))
+        self.assertTrue(files[0].endswith('ts'))
         kwargs = {'mode': 'once'}
         self.auditor.run_audit(**kwargs)
-        self.assertTrue(os.path.exists(ts_file_path))
+        files_after = os.listdir(self.disk_file._datadir)
+        self.assertEqual(files, files_after)
+
+    def test_with_tombstone_and_data(self):
+        # rsync replication could leave a tombstone and data file in object
+        # dir - verify they are both removed during audit
+        ts_iter = make_timestamp_iter()
+        ts_tomb = ts_iter.next()
+        ts_data = ts_iter.next()
+        self.setup_bad_zero_byte(timestamp=ts_data)
+        tomb_file_path = os.path.join(self.disk_file._datadir,
+                                      '%s.ts' % ts_tomb.internal)
+        with open(tomb_file_path, 'wb') as fd:
+            write_metadata(fd, {'X-Timestamp': ts_tomb.internal})
+        files = os.listdir(self.disk_file._datadir)
+        self.assertEqual(2, len(files))
+        self.assertTrue(os.path.basename(tomb_file_path) in files, files)
+        kwargs = {'mode': 'once'}
+        self.auditor.run_audit(**kwargs)
+        self.assertFalse(os.path.exists(self.disk_file._datadir))

     def test_sleeper(self):
         with mock.patch(
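make_timestamp_iter, newly imported in the hunk further up, yields strictly increasing Timestamps so a test can mint "older" and "newer" values on demand. The real helper lives in test/unit/__init__.py; a stand-in with the same shape (an assumption about its body, matching how the tests above consume it):

import itertools
import time

from swift.common.utils import Timestamp


def make_timestamp_iter():
    # each next() yields a Timestamp one second newer than the last
    return iter(Timestamp(t) for t in itertools.count(int(time.time())))


ts_iter = make_timestamp_iter()
ts_tomb = ts_iter.next()   # py2-style iteration, as in the tests above
ts_data = ts_iter.next()
assert ts_data > ts_tomb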
test/unit/obj/test_diskfile.py

@@ -574,8 +574,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
         with mock.patch('swift.obj.diskfile.time') as mock_time:
             # don't reclaim anything
             mock_time.time.return_value = 0.0
-            mock_func = 'swift.obj.diskfile.DiskFileManager.get_dev_path'
-            with mock.patch(mock_func) as mock_path:
+            mocked = 'swift.obj.diskfile.BaseDiskFileManager.get_dev_path'
+            with mock.patch(mocked) as mock_path:
                 mock_path.return_value = dev_path
                 for _ in class_under_test.yield_hashes(
                     'ignored', '0', policy, suffixes=['abc']):
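The retargeted mock above reflects the class hierarchy this refactor introduces: shared machinery such as get_dev_path now lives on a BaseDiskFileManager that the per-policy managers subclass (the TestECDiskFileManager hunks below imply the EC counterpart). Roughly, as an illustration rather than the real definitions:

class BaseDiskFileManager(object):
    # policy-agnostic logic lives here now, which is why the test patches
    # BaseDiskFileManager.get_dev_path rather than DiskFileManager's copy
    def get_dev_path(self, device, mount_check=None):
        return '/srv/node/%s' % device


class DiskFileManager(BaseDiskFileManager):
    """Manager for the replication policy."""


class ECDiskFileManager(BaseDiskFileManager):
    """Manager for the erasure-coding policy."""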
@@ -1019,6 +1019,39 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
             class_under_test.manager.get_ondisk_files, files,
             self.testdir)

+    def test_hash_cleanup_listdir_reclaim(self):
+        # Each scenario specifies a list of (filename, extension, [survives])
+        # tuples. If extension is set or 'survives' is True, the filename
+        # should still be in the dir after cleanup.
+        much_older = Timestamp(time() - 2000).internal
+        older = Timestamp(time() - 1001).internal
+        newer = Timestamp(time() - 900).internal
+        scenarios = [[('%s.ts' % older, False, False)],
+
+                     # fresh tombstone is preserved
+                     [('%s.ts' % newer, '.ts', True)],
+
+                     # .data files are not reclaimed, ever
+                     [('%s.data' % older, '.data', True)],
+                     [('%s.data' % newer, '.data', True)],
+
+                     # ... and we could have a mixture of fresh and stale .data
+                     [('%s.data' % newer, '.data', True),
+                      ('%s.data' % older, False, False)],
+
+                     # tombstone reclaimed despite newer data
+                     [('%s.data' % newer, '.data', True),
+                      ('%s.data' % older, False, False),
+                      ('%s.ts' % much_older, '.ts', False)],
+
+                     # tombstone reclaimed despite junk file
+                     [('junk', False, True),
+                      ('%s.ts' % much_older, '.ts', False)],
+                     ]
+
+        self._test_hash_cleanup_listdir_files(scenarios, POLICIES.default,
+                                              reclaim_age=1000)
+
     def test_yield_hashes(self):
         old_ts = '1383180000.12345'
         fresh_ts = Timestamp(time() - 10).internal
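The scenario tuples above encode the whole test in data. A guess at the helper's contract (the real _test_hash_cleanup_listdir_files is defined elsewhere in this file, so treat the rule below as an assumption): every named file is created in a hash dir, cleanup runs with the given reclaim_age, and a file is expected to survive when its optional third element is True, falling back to "extension is set" for two-element tuples:

def expected_survivors(scenario):
    # (filename, extension, [survives]) -> filenames left after cleanup
    return sorted(item[0] for item in scenario
                  if (item[2] if len(item) > 2 else item[1]))


# e.g. the 'tombstone reclaimed despite junk file' scenario above:
scenario = [('junk', False, True), ('0000001000.00000.ts', '.ts', False)]
assert expected_survivors(scenario) == ['junk']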
@@ -1304,16 +1337,12 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
                      [('%s#2.data' % newer, False, True),
                       ('%s#4.data' % older, False, False)],

-                     # TODO these remaining scenarios exhibit different
-                     # behavior than the legacy replication DiskFileManager
-                     # behavior...
-
                      # tombstone reclaimed despite newer non-durable data
                      [('%s#2.data' % newer, False, True),
                       ('%s#4.data' % older, False, False),
                       ('%s.ts' % much_older, '.ts', False)],

-                     # tombstone reclaimed despite newer non-durable data
+                     # tombstone reclaimed despite much older durable
                      [('%s.ts' % older, '.ts', False),
                       ('%s.durable' % much_older, False, False)],

@@ -4019,14 +4048,7 @@ class TestSuffixHashes(unittest.TestCase):
         for policy in self.iter_policies():
             file1, file2 = [self.ts().internal + '.meta' for i in range(2)]
             file_list = [file1, file2]
-            if policy.policy_type == EC_POLICY:
-                # EC policy does tolerate only .meta's in dir when cleaning up
-                expected = [file2]
-            else:
-                # the get_ondisk_files contract validation doesn't allow a
-                # directory with only .meta files
-                expected = AssertionError()
-            self.check_hash_cleanup_listdir(policy, file_list, expected)
+            self.check_hash_cleanup_listdir(policy, file_list, [file2])

     def test_hash_cleanup_listdir_ignore_orphaned_ts(self):
         for policy in self.iter_policies():
@@ -4060,13 +4082,7 @@ class TestSuffixHashes(unittest.TestCase):
             file1 = Timestamp(old_float).internal + '.ts'
             file2 = Timestamp(time() + 2).internal + '.meta'
             file_list = [file1, file2]
-            if policy.policy_type == EC_POLICY:
-                # EC will clean up old .ts despite a .meta
-                expected = [file2]
-            else:
-                # An orphaned .meta will not clean up a very old .ts
-                expected = [file2, file1]
-            self.check_hash_cleanup_listdir(policy, file_list, expected)
+            self.check_hash_cleanup_listdir(policy, file_list, [file2])

     def test_hash_cleanup_listdir_keep_single_old_data(self):
         for policy in self.iter_policies():
@@ -4131,13 +4147,7 @@ class TestSuffixHashes(unittest.TestCase):
             file1 = self._datafilename(Timestamp(1), policy)
             file2 = '0000000002.00000.ts'
             file_list = [file1, file2]
-            if policy.policy_type == EC_POLICY:
-                # the .ts gets reclaimed up despite failed .data delete
-                expected = []
-            else:
-                # the .ts isn't reclaimed because there were two files in dir
-                expected = [file2]
-            self.check_hash_cleanup_listdir(policy, file_list, expected)
+            self.check_hash_cleanup_listdir(policy, file_list, [])

     # invalidate_hash tests - behavior

@@ -4241,14 +4251,12 @@ class TestSuffixHashes(unittest.TestCase):
             old_time = time() - 1001
             timestamp = Timestamp(old_time)
             df.delete(timestamp.internal)
-            tombstone_hash = md5(timestamp.internal + '.ts').hexdigest()
-            hashes = df_mgr.get_hashes('sda1', '0', [], policy)
             expected = {
-                # repl is broken, it doesn't use self.reclaim_age
-                REPL_POLICY: tombstone_hash,
-                EC_POLICY: {},
+                REPL_POLICY: {suffix: EMPTY_ETAG},
+                EC_POLICY: {suffix: {}},
             }[policy.policy_type]
-            self.assertEqual(hashes, {suffix: expected})
+            hashes = df_mgr.get_hashes('sda1', '0', [], policy)
+            self.assertEqual(hashes, expected)

     def test_hash_suffix_one_datafile(self):
         for policy in self.iter_policies():
test/unit/obj/test_replicator.py

@@ -1282,7 +1282,7 @@ class TestObjectReplicator(unittest.TestCase):
             mount_check='false', timeout='300', stats_interval='1')
         replicator = object_replicator.ObjectReplicator(conf)
         was_connector = object_replicator.http_connect
-        was_get_hashes = object_replicator.get_hashes
+        was_get_hashes = object_replicator.DiskFileManager._get_hashes
         was_execute = tpool.execute
         self.get_hash_count = 0
         try:
@@ -1300,7 +1300,7 @@ class TestObjectReplicator(unittest.TestCase):

         self.i_failed = False
         object_replicator.http_connect = mock_http_connect(200)
-        object_replicator.get_hashes = fake_get_hashes
+        object_replicator.DiskFileManager._get_hashes = fake_get_hashes
         replicator.logger.exception = \
             lambda *args, **kwargs: fake_exc(self, *args, **kwargs)
         # Write some files into '1' and run replicate- they should be moved
@@ -1337,7 +1337,7 @@ class TestObjectReplicator(unittest.TestCase):
             self.assertFalse(self.i_failed)
         finally:
             object_replicator.http_connect = was_connector
-            object_replicator.get_hashes = was_get_hashes
+            object_replicator.DiskFileManager._get_hashes = was_get_hashes
             tpool.execute = was_execute

     def test_run(self):
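The three test_replicator hunks above move the monkeypatch from a module attribute (object_replicator.get_hashes) to a method on the DiskFileManager class. Patching the class attribute affects every instance, including the manager the replicator already built in its __init__, which is why the same save/patch/restore dance in try/finally keeps working. A self-contained illustration:

class DiskFileManager(object):
    def _get_hashes(self, path):
        return 'real'


mgr = DiskFileManager()                   # instance created before patching
was_get_hashes = DiskFileManager._get_hashes
try:
    DiskFileManager._get_hashes = lambda self, path: 'fake'
    assert mgr._get_hashes('/p') == 'fake'    # existing instances see it too
finally:
    DiskFileManager._get_hashes = was_get_hashes
assert mgr._get_hashes('/p') == 'real'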