Push fallocate() down into mkstemp(); use known size
Towards defining the DiskFile class, or something like it, as an API for the low level disk acesses, we push the fallocate() system call down into the DiskFile.mkstemp() method. This allows another implementation of DiskFile to decide to use or not use fallocate(). Change-Id: Ib4d2ee1f971e4e20e53ca4b41892c5e44ecc88d5 Signed-off-by: Peter Portante <peter.portante@redhat.com>
This commit is contained in:
parent
5d52d2d1cc
commit
d62a2a832e
@ -50,6 +50,10 @@ class DiskFileNotExist(SwiftException):
|
||||
pass
|
||||
|
||||
|
||||
class DiskFileNoSpace(SwiftException):
|
||||
pass
|
||||
|
||||
|
||||
class PathNotDir(OSError):
|
||||
pass
|
||||
|
||||
|
@ -38,7 +38,7 @@ from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.constraints import check_object_creation, check_mount, \
|
||||
check_float, check_utf8
|
||||
from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
|
||||
DiskFileNotExist, DiskFileCollision
|
||||
DiskFileNotExist, DiskFileCollision, DiskFileNoSpace
|
||||
from swift.obj.replicator import tpool_reraise, invalidate_hash, \
|
||||
quarantine_renamer, get_hashes
|
||||
from swift.common.http import is_success
|
||||
@ -284,12 +284,22 @@ class DiskFile(object):
|
||||
int(self.metadata['X-Delete-At']) <= time.time())
|
||||
|
||||
@contextmanager
|
||||
def mkstemp(self):
|
||||
"""Contextmanager to make a temporary file."""
|
||||
def mkstemp(self, size=None):
|
||||
"""
|
||||
Contextmanager to make a temporary file.
|
||||
|
||||
:param size: optional initial size of file to allocate on disk
|
||||
:raises DiskFileNoSpace: if a size is specified and fallocate fails
|
||||
"""
|
||||
if not os.path.exists(self.tmpdir):
|
||||
mkdirs(self.tmpdir)
|
||||
fd, self.tmppath = mkstemp(dir=self.tmpdir)
|
||||
try:
|
||||
if size is not None and size > 0:
|
||||
try:
|
||||
fallocate(fd, size)
|
||||
except OSError:
|
||||
raise DiskFileNoSpace()
|
||||
yield fd
|
||||
finally:
|
||||
try:
|
||||
@ -302,13 +312,14 @@ class DiskFile(object):
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def put(self, fd, metadata, extension='.data'):
|
||||
def put(self, fd, fsize, metadata, extension='.data'):
|
||||
"""
|
||||
Finalize writing the file on disk, and renames it from the temp file to
|
||||
the real location. This should be called after the data has been
|
||||
written to the temp file.
|
||||
|
||||
:param fd: file descriptor of the temp file
|
||||
:param fsize: final on-disk size of the created file
|
||||
:param metadata: dictionary of metadata to be written
|
||||
:param extension: extension to be used when making the file
|
||||
"""
|
||||
@ -322,11 +333,10 @@ class DiskFile(object):
|
||||
# redundant work the drop cache code will perform on the pages (now
|
||||
# that after fsync the pages will be all clean).
|
||||
tpool.execute(fsync, fd)
|
||||
if 'Content-Length' in metadata:
|
||||
# From the Department of the Redundancy Department, make sure we
|
||||
# call drop_cache() after fsync() to avoid redundant work (pages
|
||||
# all clean).
|
||||
self.drop_cache(fd, 0, int(metadata['Content-Length']))
|
||||
# From the Department of the Redundancy Department, make sure we
|
||||
# call drop_cache() after fsync() to avoid redundant work (pages
|
||||
# all clean).
|
||||
self.drop_cache(fd, 0, fsize)
|
||||
invalidate_hash(os.path.dirname(self.datadir))
|
||||
# After the rename completes, this object will be available for other
|
||||
# requests to reference.
|
||||
@ -343,7 +353,7 @@ class DiskFile(object):
|
||||
"""
|
||||
extension = '.ts' if tombstone else '.meta'
|
||||
with self.mkstemp() as fd:
|
||||
self.put(fd, metadata, extension=extension)
|
||||
self.put(fd, 0, metadata, extension=extension)
|
||||
|
||||
def unlinkold(self, timestamp):
|
||||
"""
|
||||
@ -660,68 +670,70 @@ class ObjectController(object):
|
||||
orig_timestamp = file.metadata.get('X-Timestamp')
|
||||
upload_expiration = time.time() + self.max_upload_time
|
||||
etag = md5()
|
||||
fsize = request.headers.get('content-length', None)
|
||||
if fsize is not None:
|
||||
fsize = int(fsize)
|
||||
upload_size = 0
|
||||
last_sync = 0
|
||||
elapsed_time = 0
|
||||
with file.mkstemp() as fd:
|
||||
try:
|
||||
fallocate(fd, int(request.headers.get('content-length', 0)))
|
||||
except OSError:
|
||||
return HTTPInsufficientStorage(drive=device, request=request)
|
||||
reader = request.environ['wsgi.input'].read
|
||||
for chunk in iter(lambda: reader(self.network_chunk_size), ''):
|
||||
start_time = time.time()
|
||||
upload_size += len(chunk)
|
||||
if time.time() > upload_expiration:
|
||||
self.logger.increment('PUT.timeouts')
|
||||
return HTTPRequestTimeout(request=request)
|
||||
etag.update(chunk)
|
||||
while chunk:
|
||||
written = os.write(fd, chunk)
|
||||
chunk = chunk[written:]
|
||||
# For large files sync every 512MB (by default) written
|
||||
if upload_size - last_sync >= self.bytes_per_sync:
|
||||
tpool.execute(fdatasync, fd)
|
||||
drop_buffer_cache(fd, last_sync, upload_size - last_sync)
|
||||
last_sync = upload_size
|
||||
sleep()
|
||||
elapsed_time += time.time() - start_time
|
||||
try:
|
||||
with file.mkstemp(size=fsize) as fd:
|
||||
reader = request.environ['wsgi.input'].read
|
||||
for chunk in iter(lambda: reader(self.network_chunk_size), ''):
|
||||
start_time = time.time()
|
||||
upload_size += len(chunk)
|
||||
if time.time() > upload_expiration:
|
||||
self.logger.increment('PUT.timeouts')
|
||||
return HTTPRequestTimeout(request=request)
|
||||
etag.update(chunk)
|
||||
while chunk:
|
||||
written = os.write(fd, chunk)
|
||||
chunk = chunk[written:]
|
||||
# For large files sync every 512MB (by default) written
|
||||
if upload_size - last_sync >= self.bytes_per_sync:
|
||||
tpool.execute(fdatasync, fd)
|
||||
drop_buffer_cache(fd, last_sync,
|
||||
upload_size - last_sync)
|
||||
last_sync = upload_size
|
||||
sleep()
|
||||
elapsed_time += time.time() - start_time
|
||||
|
||||
if upload_size:
|
||||
self.logger.transfer_rate(
|
||||
'PUT.' + device + '.timing', elapsed_time, upload_size)
|
||||
if upload_size:
|
||||
self.logger.transfer_rate(
|
||||
'PUT.' + device + '.timing', elapsed_time, upload_size)
|
||||
|
||||
if 'content-length' in request.headers and \
|
||||
int(request.headers['content-length']) != upload_size:
|
||||
return HTTPClientDisconnect(request=request)
|
||||
etag = etag.hexdigest()
|
||||
if 'etag' in request.headers and \
|
||||
request.headers['etag'].lower() != etag:
|
||||
return HTTPUnprocessableEntity(request=request)
|
||||
metadata = {
|
||||
'X-Timestamp': request.headers['x-timestamp'],
|
||||
'Content-Type': request.headers['content-type'],
|
||||
'ETag': etag,
|
||||
'Content-Length': str(upload_size),
|
||||
}
|
||||
metadata.update(val for val in request.headers.iteritems()
|
||||
if val[0].lower().startswith('x-object-meta-') and
|
||||
len(val[0]) > 14)
|
||||
for header_key in self.allowed_headers:
|
||||
if header_key in request.headers:
|
||||
header_caps = header_key.title()
|
||||
metadata[header_caps] = request.headers[header_key]
|
||||
old_delete_at = int(file.metadata.get('X-Delete-At') or 0)
|
||||
if old_delete_at != new_delete_at:
|
||||
if new_delete_at:
|
||||
self.delete_at_update(
|
||||
'PUT', new_delete_at, account, container, obj,
|
||||
request.headers, device)
|
||||
if old_delete_at:
|
||||
self.delete_at_update(
|
||||
'DELETE', old_delete_at, account, container, obj,
|
||||
request.headers, device)
|
||||
file.put(fd, metadata)
|
||||
if fsize is not None and fsize != upload_size:
|
||||
return HTTPClientDisconnect(request=request)
|
||||
etag = etag.hexdigest()
|
||||
if 'etag' in request.headers and \
|
||||
request.headers['etag'].lower() != etag:
|
||||
return HTTPUnprocessableEntity(request=request)
|
||||
metadata = {
|
||||
'X-Timestamp': request.headers['x-timestamp'],
|
||||
'Content-Type': request.headers['content-type'],
|
||||
'ETag': etag,
|
||||
'Content-Length': str(upload_size),
|
||||
}
|
||||
metadata.update(val for val in request.headers.iteritems()
|
||||
if val[0].lower().startswith('x-object-meta-')
|
||||
and len(val[0]) > 14)
|
||||
for header_key in self.allowed_headers:
|
||||
if header_key in request.headers:
|
||||
header_caps = header_key.title()
|
||||
metadata[header_caps] = request.headers[header_key]
|
||||
old_delete_at = int(file.metadata.get('X-Delete-At') or 0)
|
||||
if old_delete_at != new_delete_at:
|
||||
if new_delete_at:
|
||||
self.delete_at_update(
|
||||
'PUT', new_delete_at, account, container, obj,
|
||||
request.headers, device)
|
||||
if old_delete_at:
|
||||
self.delete_at_update(
|
||||
'DELETE', old_delete_at, account, container, obj,
|
||||
request.headers, device)
|
||||
file.put(fd, upload_size, metadata)
|
||||
except DiskFileNoSpace:
|
||||
return HTTPInsufficientStorage(drive=device, request=request)
|
||||
file.unlinkold(metadata['X-Timestamp'])
|
||||
if not orig_timestamp or \
|
||||
orig_timestamp < request.headers['x-timestamp']:
|
||||
|
@ -72,7 +72,7 @@ class TestAuditor(unittest.TestCase):
|
||||
'X-Timestamp': timestamp,
|
||||
'Content-Length': str(os.fstat(fd).st_size),
|
||||
}
|
||||
self.disk_file.put(fd, metadata)
|
||||
self.disk_file.put(fd, 1024, metadata)
|
||||
pre_quarantines = self.auditor.quarantines
|
||||
|
||||
self.auditor.object_audit(
|
||||
@ -100,7 +100,7 @@ class TestAuditor(unittest.TestCase):
|
||||
'X-Timestamp': timestamp,
|
||||
'Content-Length': str(os.fstat(fd).st_size),
|
||||
}
|
||||
self.disk_file.put(fd, metadata)
|
||||
self.disk_file.put(fd, 1024, metadata)
|
||||
pre_quarantines = self.auditor.quarantines
|
||||
# remake so it will have metadata
|
||||
self.disk_file = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o',
|
||||
@ -161,7 +161,7 @@ class TestAuditor(unittest.TestCase):
|
||||
'X-Timestamp': timestamp,
|
||||
'Content-Length': str(os.fstat(fd).st_size),
|
||||
}
|
||||
self.disk_file.put(fd, metadata)
|
||||
self.disk_file.put(fd, 1024, metadata)
|
||||
self.disk_file.close()
|
||||
self.auditor.audit_all_objects()
|
||||
self.assertEquals(self.auditor.quarantines, pre_quarantines)
|
||||
@ -181,7 +181,7 @@ class TestAuditor(unittest.TestCase):
|
||||
'X-Timestamp': timestamp,
|
||||
'Content-Length': str(os.fstat(fd).st_size),
|
||||
}
|
||||
self.disk_file.put(fd, metadata)
|
||||
self.disk_file.put(fd, 1024, metadata)
|
||||
self.disk_file.close()
|
||||
os.write(fd, 'extra_data')
|
||||
self.auditor.audit_all_objects()
|
||||
@ -202,7 +202,7 @@ class TestAuditor(unittest.TestCase):
|
||||
'X-Timestamp': timestamp,
|
||||
'Content-Length': str(os.fstat(fd).st_size),
|
||||
}
|
||||
self.disk_file.put(fd, metadata)
|
||||
self.disk_file.put(fd, 10, metadata)
|
||||
self.disk_file.close()
|
||||
self.auditor.audit_all_objects()
|
||||
self.disk_file = DiskFile(self.devices, 'sdb', '0', 'a', 'c',
|
||||
@ -218,7 +218,7 @@ class TestAuditor(unittest.TestCase):
|
||||
'X-Timestamp': timestamp,
|
||||
'Content-Length': str(os.fstat(fd).st_size),
|
||||
}
|
||||
self.disk_file.put(fd, metadata)
|
||||
self.disk_file.put(fd, 10, metadata)
|
||||
self.disk_file.close()
|
||||
os.write(fd, 'extra_data')
|
||||
self.auditor.audit_all_objects()
|
||||
@ -238,7 +238,7 @@ class TestAuditor(unittest.TestCase):
|
||||
'X-Timestamp': str(normalize_timestamp(time.time())),
|
||||
'Content-Length': str(os.fstat(fd).st_size),
|
||||
}
|
||||
self.disk_file.put(fd, metadata)
|
||||
self.disk_file.put(fd, 1024, metadata)
|
||||
etag = md5()
|
||||
etag.update('1' + '0' * 1023)
|
||||
etag = etag.hexdigest()
|
||||
@ -275,7 +275,7 @@ class TestAuditor(unittest.TestCase):
|
||||
'X-Timestamp': str(normalize_timestamp(time.time())),
|
||||
'Content-Length': 10,
|
||||
}
|
||||
self.disk_file.put(fd, metadata)
|
||||
self.disk_file.put(fd, 10, metadata)
|
||||
etag = md5()
|
||||
etag = etag.hexdigest()
|
||||
metadata['ETag'] = etag
|
||||
|
@ -224,7 +224,7 @@ class TestDiskFile(unittest.TestCase):
|
||||
'X-Timestamp': timestamp,
|
||||
'Content-Length': str(os.fstat(fd).st_size),
|
||||
}
|
||||
df.put(fd, metadata, extension=extension)
|
||||
df.put(fd, fsize, metadata, extension=extension)
|
||||
if invalid_type == 'ETag':
|
||||
etag = md5()
|
||||
etag.update('1' + '0' * (fsize - 1))
|
||||
|
Loading…
x
Reference in New Issue
Block a user