Merge "Push fallocate() down into mkstemp(); use known size"

Jenkins 2013-04-29 04:44:26 +00:00 committed by Gerrit Code Review
commit 58259df8df
4 changed files with 92 additions and 76 deletions
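
In short, the fallocate() call moves out of the object server's PUT handler and into DiskFile.mkstemp(), which now takes an optional size and raises the new DiskFileNoSpace exception when the allocation fails, and DiskFile.put() is handed the final file size explicitly instead of re-reading it from metadata['Content-Length']. The following is a condensed sketch of the resulting flow, not code from the commit: save_object, disk_file and reader are placeholder names, the rest mirrors the hunks below.

    import os

    from swift.common.exceptions import DiskFileNoSpace

    def save_object(disk_file, request, reader, metadata):
        # Illustration only -- the real handler below also handles upload
        # timeouts, etag verification, periodic fdatasync and delete-at
        # updates.
        fsize = request.headers.get('content-length', None)
        if fsize is not None:
            fsize = int(fsize)
        upload_size = 0
        try:
            # mkstemp() now pre-allocates the file when the size is known
            with disk_file.mkstemp(size=fsize) as fd:
                for chunk in iter(lambda: reader(65536), ''):
                    upload_size += len(chunk)
                    while chunk:
                        written = os.write(fd, chunk)
                        chunk = chunk[written:]
                # put() is told the size that actually landed on disk
                disk_file.put(fd, upload_size, metadata)
        except DiskFileNoSpace:
            # fallocate() failed inside mkstemp(); the handler below maps
            # this to a 507 HTTPInsufficientStorage response
            return None
        return upload_size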

@@ -50,6 +50,10 @@ class DiskFileNotExist(SwiftException):
     pass
 
 
+class DiskFileNoSpace(SwiftException):
+    pass
+
+
 class PathNotDir(OSError):
     pass

@@ -38,7 +38,7 @@ from swift.common.bufferedhttp import http_connect
 from swift.common.constraints import check_object_creation, check_mount, \
     check_float, check_utf8
 from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
-    DiskFileNotExist, DiskFileCollision
+    DiskFileNotExist, DiskFileCollision, DiskFileNoSpace
 from swift.obj.replicator import tpool_reraise, invalidate_hash, \
     quarantine_renamer, get_hashes
 from swift.common.http import is_success
@@ -284,12 +284,22 @@ class DiskFile(object):
                 int(self.metadata['X-Delete-At']) <= time.time())
 
     @contextmanager
-    def mkstemp(self):
-        """Contextmanager to make a temporary file."""
+    def mkstemp(self, size=None):
+        """
+        Contextmanager to make a temporary file.
+
+        :param size: optional initial size of file to allocate on disk
+        :raises DiskFileNoSpace: if a size is specified and fallocate fails
+        """
         if not os.path.exists(self.tmpdir):
             mkdirs(self.tmpdir)
         fd, self.tmppath = mkstemp(dir=self.tmpdir)
         try:
+            if size is not None and size > 0:
+                try:
+                    fallocate(fd, size)
+                except OSError:
+                    raise DiskFileNoSpace()
             yield fd
         finally:
             try:
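
The new docstring above spells out the contract: a caller that knows the size gets the space reserved before writing a single byte, and a full disk shows up as DiskFileNoSpace rather than a raw OSError; with size=None (the default) fallocate() is skipped entirely, so zero-byte objects, uploads with no Content-Length, and metadata-only writers such as put_metadata() further down keep the old behaviour. A minimal sketch of the two call shapes (df and content_length are placeholders):

    # size unknown (e.g. chunked transfer) or zero: behaves exactly as before
    with df.mkstemp() as fd:
        pass  # write whatever arrives

    # size known up front: reserved immediately, DiskFileNoSpace on failure
    with df.mkstemp(size=content_length) as fd:
        pass  # write content_length bytes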
@@ -302,13 +312,14 @@ class DiskFile(object):
         except OSError:
             pass
 
-    def put(self, fd, metadata, extension='.data'):
+    def put(self, fd, fsize, metadata, extension='.data'):
         """
         Finalize writing the file on disk, and renames it from the temp file to
         the real location. This should be called after the data has been
         written to the temp file.
 
         :param fd: file descriptor of the temp file
+        :param fsize: final on-disk size of the created file
         :param metadata: dictionary of metadata to be written
         :param extension: extension to be used when making the file
         """
@@ -322,11 +333,10 @@ class DiskFile(object):
         # redundant work the drop cache code will perform on the pages (now
         # that after fsync the pages will be all clean).
         tpool.execute(fsync, fd)
-        if 'Content-Length' in metadata:
-            # From the Department of the Redundancy Department, make sure we
-            # call drop_cache() after fsync() to avoid redundant work (pages
-            # all clean).
-            self.drop_cache(fd, 0, int(metadata['Content-Length']))
+        # From the Department of the Redundancy Department, make sure we
+        # call drop_cache() after fsync() to avoid redundant work (pages
+        # all clean).
+        self.drop_cache(fd, 0, fsize)
         invalidate_hash(os.path.dirname(self.datadir))
         # After the rename completes, this object will be available for other
         # requests to reference.
@@ -343,7 +353,7 @@ class DiskFile(object):
         """
         extension = '.ts' if tombstone else '.meta'
         with self.mkstemp() as fd:
-            self.put(fd, metadata, extension=extension)
+            self.put(fd, 0, metadata, extension=extension)
 
     def unlinkold(self, timestamp):
         """
@@ -660,68 +670,70 @@ class ObjectController(object):
         orig_timestamp = file.metadata.get('X-Timestamp')
         upload_expiration = time.time() + self.max_upload_time
         etag = md5()
+        fsize = request.headers.get('content-length', None)
+        if fsize is not None:
+            fsize = int(fsize)
         upload_size = 0
         last_sync = 0
         elapsed_time = 0
-        with file.mkstemp() as fd:
-            try:
-                fallocate(fd, int(request.headers.get('content-length', 0)))
-            except OSError:
-                return HTTPInsufficientStorage(drive=device, request=request)
-            reader = request.environ['wsgi.input'].read
-            for chunk in iter(lambda: reader(self.network_chunk_size), ''):
-                start_time = time.time()
-                upload_size += len(chunk)
-                if time.time() > upload_expiration:
-                    self.logger.increment('PUT.timeouts')
-                    return HTTPRequestTimeout(request=request)
-                etag.update(chunk)
-                while chunk:
-                    written = os.write(fd, chunk)
-                    chunk = chunk[written:]
-                # For large files sync every 512MB (by default) written
-                if upload_size - last_sync >= self.bytes_per_sync:
-                    tpool.execute(fdatasync, fd)
-                    drop_buffer_cache(fd, last_sync, upload_size - last_sync)
-                    last_sync = upload_size
-                sleep()
-                elapsed_time += time.time() - start_time
+        try:
+            with file.mkstemp(size=fsize) as fd:
+                reader = request.environ['wsgi.input'].read
+                for chunk in iter(lambda: reader(self.network_chunk_size), ''):
+                    start_time = time.time()
+                    upload_size += len(chunk)
+                    if time.time() > upload_expiration:
+                        self.logger.increment('PUT.timeouts')
+                        return HTTPRequestTimeout(request=request)
+                    etag.update(chunk)
+                    while chunk:
+                        written = os.write(fd, chunk)
+                        chunk = chunk[written:]
+                    # For large files sync every 512MB (by default) written
+                    if upload_size - last_sync >= self.bytes_per_sync:
+                        tpool.execute(fdatasync, fd)
+                        drop_buffer_cache(fd, last_sync,
+                                          upload_size - last_sync)
+                        last_sync = upload_size
+                    sleep()
+                    elapsed_time += time.time() - start_time
 
-            if upload_size:
-                self.logger.transfer_rate(
-                    'PUT.' + device + '.timing', elapsed_time, upload_size)
+                if upload_size:
+                    self.logger.transfer_rate(
+                        'PUT.' + device + '.timing', elapsed_time, upload_size)
 
-            if 'content-length' in request.headers and \
-                    int(request.headers['content-length']) != upload_size:
-                return HTTPClientDisconnect(request=request)
-            etag = etag.hexdigest()
-            if 'etag' in request.headers and \
-                    request.headers['etag'].lower() != etag:
-                return HTTPUnprocessableEntity(request=request)
-            metadata = {
-                'X-Timestamp': request.headers['x-timestamp'],
-                'Content-Type': request.headers['content-type'],
-                'ETag': etag,
-                'Content-Length': str(upload_size),
-            }
-            metadata.update(val for val in request.headers.iteritems()
-                            if val[0].lower().startswith('x-object-meta-') and
-                            len(val[0]) > 14)
-            for header_key in self.allowed_headers:
-                if header_key in request.headers:
-                    header_caps = header_key.title()
-                    metadata[header_caps] = request.headers[header_key]
-            old_delete_at = int(file.metadata.get('X-Delete-At') or 0)
-            if old_delete_at != new_delete_at:
-                if new_delete_at:
-                    self.delete_at_update(
-                        'PUT', new_delete_at, account, container, obj,
-                        request.headers, device)
-                if old_delete_at:
-                    self.delete_at_update(
-                        'DELETE', old_delete_at, account, container, obj,
-                        request.headers, device)
-            file.put(fd, metadata)
+                if fsize is not None and fsize != upload_size:
+                    return HTTPClientDisconnect(request=request)
+                etag = etag.hexdigest()
+                if 'etag' in request.headers and \
+                        request.headers['etag'].lower() != etag:
+                    return HTTPUnprocessableEntity(request=request)
+                metadata = {
+                    'X-Timestamp': request.headers['x-timestamp'],
+                    'Content-Type': request.headers['content-type'],
+                    'ETag': etag,
+                    'Content-Length': str(upload_size),
+                }
+                metadata.update(val for val in request.headers.iteritems()
+                                if val[0].lower().startswith('x-object-meta-')
+                                and len(val[0]) > 14)
+                for header_key in self.allowed_headers:
+                    if header_key in request.headers:
+                        header_caps = header_key.title()
+                        metadata[header_caps] = request.headers[header_key]
+                old_delete_at = int(file.metadata.get('X-Delete-At') or 0)
+                if old_delete_at != new_delete_at:
+                    if new_delete_at:
+                        self.delete_at_update(
+                            'PUT', new_delete_at, account, container, obj,
+                            request.headers, device)
+                    if old_delete_at:
+                        self.delete_at_update(
+                            'DELETE', old_delete_at, account, container, obj,
+                            request.headers, device)
+                file.put(fd, upload_size, metadata)
+        except DiskFileNoSpace:
+            return HTTPInsufficientStorage(drive=device, request=request)
         file.unlinkold(metadata['X-Timestamp'])
         if not orig_timestamp or \
             orig_timestamp < request.headers['x-timestamp']:
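
One way to exercise the new failure path (a hypothetical sketch, not a test from this commit): patch fallocate() where DiskFile picks it up -- assuming DiskFile and its fallocate import live in swift.obj.server, as the hunks above suggest -- and check that mkstemp(size=...) surfaces DiskFileNoSpace, which the PUT handler above turns into 507 Insufficient Storage.

    import errno
    import mock  # 2013-era dependency; unittest.mock in modern Python

    from swift.common.exceptions import DiskFileNoSpace
    from swift.obj.server import DiskFile

    def hits_no_space_path(devices, logger):
        # Placeholder object location, mirroring the calls in the tests below
        df = DiskFile(devices, 'sda', '0', 'a', 'c', 'o', logger)
        with mock.patch('swift.obj.server.fallocate',
                        side_effect=OSError(errno.ENOSPC, 'No space left')):
            try:
                with df.mkstemp(size=1024):
                    pass  # never reached: fallocate fails before the yield
            except DiskFileNoSpace:
                return True  # the object server maps this to a 507
        return False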

@@ -72,7 +72,7 @@ class TestAuditor(unittest.TestCase):
                 'X-Timestamp': timestamp,
                 'Content-Length': str(os.fstat(fd).st_size),
             }
-            self.disk_file.put(fd, metadata)
+            self.disk_file.put(fd, 1024, metadata)
             pre_quarantines = self.auditor.quarantines
 
             self.auditor.object_audit(
@@ -100,7 +100,7 @@ class TestAuditor(unittest.TestCase):
                 'X-Timestamp': timestamp,
                 'Content-Length': str(os.fstat(fd).st_size),
             }
-            self.disk_file.put(fd, metadata)
+            self.disk_file.put(fd, 1024, metadata)
             pre_quarantines = self.auditor.quarantines
         # remake so it will have metadata
         self.disk_file = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o',
@@ -161,7 +161,7 @@ class TestAuditor(unittest.TestCase):
                 'X-Timestamp': timestamp,
                 'Content-Length': str(os.fstat(fd).st_size),
             }
-            self.disk_file.put(fd, metadata)
+            self.disk_file.put(fd, 1024, metadata)
             self.disk_file.close()
         self.auditor.audit_all_objects()
         self.assertEquals(self.auditor.quarantines, pre_quarantines)
@@ -181,7 +181,7 @@ class TestAuditor(unittest.TestCase):
                 'X-Timestamp': timestamp,
                 'Content-Length': str(os.fstat(fd).st_size),
             }
-            self.disk_file.put(fd, metadata)
+            self.disk_file.put(fd, 1024, metadata)
             self.disk_file.close()
             os.write(fd, 'extra_data')
         self.auditor.audit_all_objects()
@@ -202,7 +202,7 @@ class TestAuditor(unittest.TestCase):
                 'X-Timestamp': timestamp,
                 'Content-Length': str(os.fstat(fd).st_size),
             }
-            self.disk_file.put(fd, metadata)
+            self.disk_file.put(fd, 10, metadata)
             self.disk_file.close()
         self.auditor.audit_all_objects()
         self.disk_file = DiskFile(self.devices, 'sdb', '0', 'a', 'c',
@@ -218,7 +218,7 @@ class TestAuditor(unittest.TestCase):
                 'X-Timestamp': timestamp,
                 'Content-Length': str(os.fstat(fd).st_size),
             }
-            self.disk_file.put(fd, metadata)
+            self.disk_file.put(fd, 10, metadata)
             self.disk_file.close()
             os.write(fd, 'extra_data')
         self.auditor.audit_all_objects()
@@ -238,7 +238,7 @@ class TestAuditor(unittest.TestCase):
                 'X-Timestamp': str(normalize_timestamp(time.time())),
                 'Content-Length': str(os.fstat(fd).st_size),
             }
-            self.disk_file.put(fd, metadata)
+            self.disk_file.put(fd, 1024, metadata)
             etag = md5()
             etag.update('1' + '0' * 1023)
             etag = etag.hexdigest()
@@ -275,7 +275,7 @@ class TestAuditor(unittest.TestCase):
                 'X-Timestamp': str(normalize_timestamp(time.time())),
                 'Content-Length': 10,
             }
-            self.disk_file.put(fd, metadata)
+            self.disk_file.put(fd, 10, metadata)
             etag = md5()
             etag = etag.hexdigest()
             metadata['ETag'] = etag

@@ -224,7 +224,7 @@ class TestDiskFile(unittest.TestCase):
                 'X-Timestamp': timestamp,
                 'Content-Length': str(os.fstat(fd).st_size),
             }
-            df.put(fd, metadata, extension=extension)
+            df.put(fd, fsize, metadata, extension=extension)
             if invalid_type == 'ETag':
                 etag = md5()
                 etag.update('1' + '0' * (fsize - 1))