Merge "Remove keep_data_fp argument from DiskFile constructor"

This commit is contained in:
Jenkins 2013-09-11 22:52:52 +00:00 committed by Gerrit Code Review
commit bfcf72a3f0
6 changed files with 157 additions and 87 deletions

View File

@ -38,6 +38,10 @@ class DiskFileError(SwiftException):
pass
class DiskFileNotOpenError(DiskFileError):
pass
class DiskFileCollision(DiskFileError):
pass

View File

@ -166,8 +166,8 @@ class AuditorWorker(object):
raise AuditException('Error when reading metadata: %s' % exc)
_junk, account, container, obj = name.split('/', 3)
df = diskfile.DiskFile(self.devices, device, partition,
account, container, obj, self.logger,
keep_data_fp=True)
account, container, obj, self.logger)
df.open()
try:
try:
obj_size = df.get_data_file_size()

View File

@ -38,7 +38,7 @@ from swift.common.utils import mkdirs, normalize_timestamp, \
fdatasync, drop_buffer_cache, ThreadPool, lock_path, write_pickle
from swift.common.exceptions import DiskFileError, DiskFileNotExist, \
DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
PathNotDir
PathNotDir, DiskFileNotOpenError
from swift.common.swob import multi_range_iterator
@ -342,7 +342,6 @@ class DiskWriter(object):
self.threadpool.force_run_in_thread(
self._finalize_put, metadata, target_path)
self.disk_file.metadata = metadata
class DiskFile(object):
@ -355,19 +354,17 @@ class DiskFile(object):
:param account: account name for the object
:param container: container name for the object
:param obj: object name for the object
:param keep_data_fp: if True, don't close the fp, otherwise close it
:param disk_chunk_size: size of chunks on file reads
:param bytes_per_sync: number of bytes between fdatasync calls
:param iter_hook: called when __iter__ returns a chunk
:param threadpool: thread pool in which to do blocking operations
:raises DiskFileCollision: on md5 collision
"""
def __init__(self, path, device, partition, account, container, obj,
logger, keep_data_fp=False, disk_chunk_size=65536,
bytes_per_sync=(512 * 1024 * 1024), iter_hook=None,
threadpool=None, obj_dir='objects', mount_check=False):
logger, disk_chunk_size=65536,
bytes_per_sync=(512 * 1024 * 1024),
iter_hook=None, threadpool=None, obj_dir='objects',
mount_check=False):
if mount_check and not check_mount(path, device):
raise DiskFileDeviceUnavailable()
self.disk_chunk_size = disk_chunk_size
@ -380,27 +377,50 @@ class DiskFile(object):
self.device_path = join(path, device)
self.tmpdir = join(path, device, 'tmp')
self.logger = logger
self._metadata = {}
self._metadata = None
self.data_file = None
self._data_file_size = None
self.fp = None
self.iter_etag = None
self.started_at_0 = False
self.read_to_eof = False
self.quarantined_dir = None
self.keep_cache = False
self.suppress_file_closing = False
self._verify_close = False
self.threadpool = threadpool or ThreadPool(nthreads=0)
# FIXME(clayg): this attribute is set after open and affects the
# behavior of the class (i.e. public interface)
self.keep_cache = False
def open(self, verify_close=False):
"""
Open the file and read the metadata.
This method must populate the _metadata attribute.
:param verify_close: force implicit close to verify_file, no effect on
explicit close.
:raises DiskFileCollision: on md5 collision
"""
data_file, meta_file, ts_file = self._get_ondisk_file()
if not data_file:
if ts_file:
self._construct_from_ts_file(ts_file)
else:
fp = self._construct_from_data_file(data_file, meta_file)
if keep_data_fp:
self.fp = fp
else:
fp.close()
self.fp = self._construct_from_data_file(data_file, meta_file)
self._verify_close = verify_close
self._metadata = self._metadata or {}
return self
def __enter__(self):
if self._metadata is None:
raise DiskFileNotOpenError()
return self
def __exit__(self, t, v, tb):
self.close(verify_file=self._verify_close)
def _get_ondisk_file(self):
"""
@ -508,6 +528,8 @@ class DiskFile(object):
def __iter__(self):
"""Returns an iterator over the data file."""
if self.fp is None:
raise DiskFileNotOpenError()
try:
dropped_cache = 0
read = 0
@ -609,6 +631,9 @@ class DiskFile(object):
finally:
self.fp.close()
self.fp = None
self._metadata = None
self._data_file_size = None
self._verify_close = False
def get_metadata(self):
"""
@ -616,6 +641,8 @@ class DiskFile(object):
:returns: object's metadata dictionary
"""
if self._metadata is None:
raise DiskFileNotOpenError()
return self._metadata
def is_deleted(self):
@ -715,13 +742,20 @@ class DiskFile(object):
:raises DiskFileError: on file size mismatch.
:raises DiskFileNotExist: on file not existing (including deleted)
"""
if self._data_file_size is None:
self._data_file_size = self._get_data_file_size()
return self._data_file_size
def _get_data_file_size(self):
# ensure file is opened
metadata = self.get_metadata()
try:
file_size = 0
if self.data_file:
file_size = self.threadpool.run_in_thread(
getsize, self.data_file)
if 'Content-Length' in self._metadata:
metadata_size = int(self._metadata['Content-Length'])
if 'Content-Length' in metadata:
metadata_size = int(metadata['Content-Length'])
if file_size != metadata_size:
raise DiskFileError(
'Content-Length of %s does not match file size '

View File

@ -298,14 +298,15 @@ class ObjectController(object):
obj)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
if disk_file.is_deleted() or disk_file.is_expired():
return HTTPNotFound(request=request)
try:
disk_file.get_data_file_size()
except (DiskFileError, DiskFileNotExist):
disk_file.quarantine()
return HTTPNotFound(request=request)
orig_metadata = disk_file.get_metadata()
with disk_file.open():
if disk_file.is_deleted() or disk_file.is_expired():
return HTTPNotFound(request=request)
try:
disk_file.get_data_file_size()
except (DiskFileError, DiskFileNotExist):
disk_file.quarantine()
return HTTPNotFound(request=request)
orig_metadata = disk_file.get_metadata()
orig_timestamp = orig_metadata.get('X-Timestamp', '0')
if orig_timestamp >= request.headers['x-timestamp']:
return HTTPConflict(request=request)
@ -355,7 +356,8 @@ class ObjectController(object):
obj)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
orig_metadata = disk_file.get_metadata()
with disk_file.open():
orig_metadata = disk_file.get_metadata()
old_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
orig_timestamp = orig_metadata.get('X-Timestamp')
if orig_timestamp and orig_timestamp >= request.headers['x-timestamp']:
@ -432,9 +434,10 @@ class ObjectController(object):
split_and_validate_path(request, 5, 5, True)
try:
disk_file = self._diskfile(device, partition, account, container,
obj, keep_data_fp=True, iter_hook=sleep)
obj, iter_hook=sleep)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
disk_file.open()
if disk_file.is_deleted() or disk_file.is_expired():
if request.headers.get('if-match') == '*':
return HTTPPreconditionFailed(request=request)
@ -510,15 +513,16 @@ class ObjectController(object):
obj)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
if disk_file.is_deleted() or disk_file.is_expired():
return HTTPNotFound(request=request)
try:
file_size = disk_file.get_data_file_size()
except (DiskFileError, DiskFileNotExist):
disk_file.quarantine()
return HTTPNotFound(request=request)
with disk_file.open():
if disk_file.is_deleted() or disk_file.is_expired():
return HTTPNotFound(request=request)
try:
file_size = disk_file.get_data_file_size()
except (DiskFileError, DiskFileNotExist):
disk_file.quarantine()
return HTTPNotFound(request=request)
metadata = disk_file.get_metadata()
response = Response(request=request, conditional_response=True)
metadata = disk_file.get_metadata()
response.headers['Content-Type'] = metadata.get(
'Content-Type', 'application/octet-stream')
for key, value in metadata.iteritems():
@ -549,7 +553,10 @@ class ObjectController(object):
obj)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
orig_metadata = disk_file.get_metadata()
with disk_file.open():
orig_metadata = disk_file.get_metadata()
is_deleted = disk_file.is_deleted()
is_expired = disk_file.is_expired()
if 'x-if-delete-at' in request.headers and \
int(request.headers['x-if-delete-at']) != \
int(orig_metadata.get('X-Delete-At') or 0):
@ -562,7 +569,7 @@ class ObjectController(object):
container, obj, request, device)
orig_timestamp = orig_metadata.get('X-Timestamp', 0)
req_timestamp = request.headers['X-Timestamp']
if disk_file.is_deleted() or disk_file.is_expired():
if is_deleted or is_expired:
response_class = HTTPNotFound
else:
if orig_timestamp < req_timestamp:

View File

@ -377,14 +377,15 @@ class TestDiskFile(unittest.TestCase):
setxattr(f.fileno(), diskfile.METADATA_KEY,
pickle.dumps(md, diskfile.PICKLE_PROTOCOL))
def _create_test_file(self, data, keep_data_fp=True, timestamp=None):
def _create_test_file(self, data, timestamp=None):
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
if timestamp is None:
timestamp = time()
self._create_ondisk_file(df, data, timestamp)
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger(), keep_data_fp=keep_data_fp)
FakeLogger())
df.open()
return df
def test_get_metadata(self):
@ -397,33 +398,37 @@ class TestDiskFile(unittest.TestCase):
orig_metadata = {'X-Object-Meta-Key1': 'Value1',
'Content-Type': 'text/garbage'}
df = self._get_disk_file(ts=41, extra_metadata=orig_metadata)
self.assertEquals('1024', df._metadata['Content-Length'])
with df.open():
self.assertEquals('1024', df._metadata['Content-Length'])
# write some new metadata (fast POST, don't send orig meta, ts 42)
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
df.put_metadata({'X-Timestamp': '42', 'X-Object-Meta-Key2': 'Value2'})
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
# non-fast-post updateable keys are preserved
self.assertEquals('text/garbage', df._metadata['Content-Type'])
# original fast-post updateable keys are removed
self.assert_('X-Object-Meta-Key1' not in df._metadata)
# new fast-post updateable keys are added
self.assertEquals('Value2', df._metadata['X-Object-Meta-Key2'])
with df.open():
# non-fast-post updateable keys are preserved
self.assertEquals('text/garbage', df._metadata['Content-Type'])
# original fast-post updateable keys are removed
self.assert_('X-Object-Meta-Key1' not in df._metadata)
# new fast-post updateable keys are added
self.assertEquals('Value2', df._metadata['X-Object-Meta-Key2'])
def test_disk_file_app_iter_corners(self):
df = self._create_test_file('1234567890')
self.assertEquals(''.join(df.app_iter_range(0, None)), '1234567890')
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger(), keep_data_fp=True)
self.assertEqual(''.join(df.app_iter_range(5, None)), '67890')
FakeLogger())
with df.open():
self.assertEqual(''.join(df.app_iter_range(5, None)), '67890')
def test_disk_file_app_iter_partial_closes(self):
df = self._create_test_file('1234567890')
it = df.app_iter_range(0, 5)
self.assertEqual(''.join(it), '12345')
self.assertEqual(df.fp, None)
with df.open():
it = df.app_iter_range(0, 5)
self.assertEqual(''.join(it), '12345')
self.assertEqual(df.fp, None)
def test_disk_file_app_iter_ranges(self):
df = self._create_test_file('012345678911234567892123456789')
@ -482,10 +487,11 @@ class TestDiskFile(unittest.TestCase):
self.assertEqual(''.join(it), '')
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger(), keep_data_fp=True)
it = df.app_iter_ranges(None, 'app/something',
'\r\n--someheader\r\n', 150)
self.assertEqual(''.join(it), '')
FakeLogger())
with df.open():
it = df.app_iter_ranges(None, 'app/something',
'\r\n--someheader\r\n', 150)
self.assertEqual(''.join(it), '')
def test_disk_file_mkstemp_creates_dir(self):
tmpdir = os.path.join(self.testdir, 'sda1', 'tmp')
@ -501,8 +507,9 @@ class TestDiskFile(unittest.TestCase):
hook_call_count[0] += 1
df = self._get_disk_file(fsize=65, csize=8, iter_hook=hook)
for _ in df:
pass
with df.open():
for _ in df:
pass
self.assertEquals(hook_call_count[0], 9)
@ -525,7 +532,8 @@ class TestDiskFile(unittest.TestCase):
# have to remake the datadir and file
self._create_ondisk_file(df, '', time()) # still empty
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger(), keep_data_fp=True)
FakeLogger())
df.open()
double_uuid_path = df.quarantine()
self.assert_(os.path.isdir(double_uuid_path))
self.assert_('-' in os.path.basename(double_uuid_path))
@ -573,10 +581,10 @@ class TestDiskFile(unittest.TestCase):
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
obj_name, FakeLogger(),
keep_data_fp=True, disk_chunk_size=csize,
disk_chunk_size=csize,
iter_hook=iter_hook, mount_check=mount_check)
df.open()
if invalid_type == 'Zero-Byte':
os.remove(df.data_file)
fp = open(df.data_file, 'w')
fp.close()
df.unit_test_len = fsize
@ -694,8 +702,9 @@ class TestDiskFile(unittest.TestCase):
df = self._get_disk_file(fsize=1024 * 2)
df._handle_close_quarantine = err
for chunk in df:
pass
with df.open():
for chunk in df:
pass
# close is called at the end of the iterator
self.assertEquals(df.fp, None)
self.assertEquals(len(df.logger.log_dict['error']), 1)
@ -726,10 +735,12 @@ class TestDiskFile(unittest.TestCase):
self._create_ondisk_file(df, 'A', ext='.data', timestamp=5)
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
self.assertTrue('X-Timestamp' in df._metadata)
self.assertEquals(df._metadata['X-Timestamp'], normalize_timestamp(10))
self.assertTrue('deleted' in df._metadata)
self.assertTrue(df._metadata['deleted'])
with df.open():
self.assertTrue('X-Timestamp' in df._metadata)
self.assertEquals(df._metadata['X-Timestamp'],
normalize_timestamp(10))
self.assertTrue('deleted' in df._metadata)
self.assertTrue(df._metadata['deleted'])
def test_ondisk_search_loop_meta_ts_data(self):
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
@ -742,9 +753,11 @@ class TestDiskFile(unittest.TestCase):
self._create_ondisk_file(df, 'A', ext='.data', timestamp=5)
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
self.assertTrue('X-Timestamp' in df._metadata)
self.assertEquals(df._metadata['X-Timestamp'], normalize_timestamp(8))
self.assertTrue('deleted' in df._metadata)
with df.open():
self.assertTrue('X-Timestamp' in df._metadata)
self.assertEquals(df._metadata['X-Timestamp'],
normalize_timestamp(8))
self.assertTrue('deleted' in df._metadata)
def test_ondisk_search_loop_meta_data_ts(self):
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
@ -757,9 +770,11 @@ class TestDiskFile(unittest.TestCase):
self._create_ondisk_file(df, '', ext='.ts', timestamp=5)
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
self.assertTrue('X-Timestamp' in df._metadata)
self.assertEquals(df._metadata['X-Timestamp'], normalize_timestamp(10))
self.assertTrue('deleted' not in df._metadata)
with df.open():
self.assertTrue('X-Timestamp' in df._metadata)
self.assertEquals(df._metadata['X-Timestamp'],
normalize_timestamp(10))
self.assertTrue('deleted' not in df._metadata)
def test_ondisk_search_loop_data_meta_ts(self):
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
@ -772,9 +787,11 @@ class TestDiskFile(unittest.TestCase):
self._create_ondisk_file(df, '', ext='.meta', timestamp=5)
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
self.assertTrue('X-Timestamp' in df._metadata)
self.assertEquals(df._metadata['X-Timestamp'], normalize_timestamp(10))
self.assertTrue('deleted' not in df._metadata)
with df.open():
self.assertTrue('X-Timestamp' in df._metadata)
self.assertEquals(df._metadata['X-Timestamp'],
normalize_timestamp(10))
self.assertTrue('deleted' not in df._metadata)
def test_ondisk_search_loop_wayward_files_ignored(self):
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
@ -788,9 +805,11 @@ class TestDiskFile(unittest.TestCase):
self._create_ondisk_file(df, '', ext='.meta', timestamp=5)
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
self.assertTrue('X-Timestamp' in df._metadata)
self.assertEquals(df._metadata['X-Timestamp'], normalize_timestamp(10))
self.assertTrue('deleted' not in df._metadata)
with df.open():
self.assertTrue('X-Timestamp' in df._metadata)
self.assertEquals(df._metadata['X-Timestamp'],
normalize_timestamp(10))
self.assertTrue('deleted' not in df._metadata)
def test_ondisk_search_loop_listdir_error(self):
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
@ -807,8 +826,9 @@ class TestDiskFile(unittest.TestCase):
self._create_ondisk_file(df, '', ext='.ts', timestamp=7)
self._create_ondisk_file(df, '', ext='.meta', timestamp=6)
self._create_ondisk_file(df, '', ext='.meta', timestamp=5)
self.assertRaises(OSError, diskfile.DiskFile, self.testdir, 'sda1',
'0', 'a', 'c', 'o', FakeLogger())
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
self.assertRaises(OSError, df.open)
def test_exception_in_handle_close_quarantine(self):
df = self._get_disk_file()

View File

@ -361,7 +361,8 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
objfile = diskfile.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
FakeLogger(), keep_data_fp=True)
FakeLogger())
objfile.open()
file_name = os.path.basename(objfile.data_file)
with open(objfile.data_file) as fp:
@ -718,7 +719,8 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
objfile = diskfile.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
FakeLogger(), keep_data_fp=True)
FakeLogger())
objfile.open()
file_name = os.path.basename(objfile.data_file)
with open(objfile.data_file) as fp:
@ -1016,7 +1018,8 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
objfile = diskfile.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
FakeLogger(), keep_data_fp=True)
FakeLogger())
objfile.open()
file_name = os.path.basename(objfile.data_file)
etag = md5()
etag.update('VERIF')
@ -1048,7 +1051,8 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
objfile = diskfile.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
FakeLogger(), keep_data_fp=True)
FakeLogger())
objfile.open()
file_name = os.path.basename(objfile.data_file)
with open(objfile.data_file) as fp:
metadata = diskfile.read_metadata(fp)
@ -1076,7 +1080,8 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
objfile = diskfile.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
FakeLogger(), keep_data_fp=True)
FakeLogger())
objfile.open()
file_name = os.path.basename(objfile.data_file)
etag = md5()
etag.update('VERIF')