Merge "Perform atomic upload updates v2"
commit ba3c104689

@@ -14,6 +14,7 @@
 # along with this software. If not, see <http://www.gnu.org/licenses/>.
 
 import os
+import tempfile
 
 from . import storageutils
 
@@ -110,7 +111,14 @@ class FilesystemDriver(storageutils.StorageDriver):
     def cat_objects(self, path, chunks, uuid=None):
         path = os.path.join(self.root, path)
         os.makedirs(os.path.dirname(path), exist_ok=True)
-        with open(path, 'wb') as outf:
+        # We write to a temporary file in the same directory as the destination
+        # file to ensure that we can rename it atomically once fully written.
+        # This is important because there may be multiple concurrent writes to
+        # the same object and due to client behavior we cannot return until
+        # at least one write is completed. To facilitate this we ensure each
+        # write happens completely, then make that safe with atomic renames.
+        with tempfile.NamedTemporaryFile(dir=os.path.dirname(path),
+                                         delete=False) as outf:
             for chunk in chunks:
                 chunk_path = os.path.join(self.root, chunk['path'])
                 with open(chunk_path, 'rb') as inf:
@@ -121,6 +129,7 @@ class FilesystemDriver(storageutils.StorageDriver):
                     outf.write(d)
             outf.flush()
             os.fsync(outf.fileno())
+        os.rename(outf.name, path)
         for chunk in chunks:
             chunk_path = os.path.join(self.root, chunk['path'])
             os.unlink(chunk_path)
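
The change above relies on the write-to-a-temporary-file-then-rename pattern: because the temporary file lives in the same directory (and therefore the same filesystem) as the destination, os.rename() atomically replaces the destination, so a concurrent reader only ever sees a complete object, never a partial write. A minimal standalone sketch of the pattern, for illustration only (the atomic_write helper and the example path are not part of this change):

import os
import tempfile


def atomic_write(path, data):
    # Stage the data in a temporary file in the destination directory so the
    # final rename stays on one filesystem and is therefore atomic.
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with tempfile.NamedTemporaryFile(dir=os.path.dirname(path),
                                     delete=False) as tmp:
        tmp.write(data)
        tmp.flush()
        os.fsync(tmp.fileno())
    # Readers see either the old object or the new one, never a partial file.
    os.rename(tmp.name, path)


atomic_write('/tmp/registry/blobs/example', b'complete object contents')
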
@@ -216,69 +216,6 @@ class Storage:
         self._update_upload(namespace, uuid, upload)
         return upload.size - size, upload.size
 
-    def _delete_upload(self, upload, namespace, uuid):
-        """Delete the files for an upload
-
-        This is called when we have detected a race with another
-        upload for the same blob and have chosen to delete this upload
-        without finalizing.
-        """
-
-        for i, chunk in enumerate(upload.chunks):
-            src_path = os.path.join(namespace, 'uploads', uuid, str(i + 1))
-            self.backend.delete_object(src_path)
-        path = os.path.join(namespace, 'uploads', uuid, 'metadata')
-        self.backend.delete_object(path)
-
-    def _lock_upload(self, namespace, uuid, digest):
-        """Lock the upload
-
-        Place a metadata file in the blob directory so we can detect
-        whether we are racing another upload for the same blob.
-        """
-
-        # Check if the blob is locked
-        path = os.path.join(namespace, 'blobs', digest, 'metadata')
-        now = time.time()
-        current = self.backend.get_object(path)
-        waslocked = False
-        if current:
-            waslocked = True
-            current = json.loads(current.decode('utf8'))
-            locktime = int(current.get('time', 0))
-            if now - locktime < 300:
-                # The lock is in force, another simultaneous upload
-                # must be handling this; assume it will succeed and go
-                # ahead and clean up this upload.
-                self.log.warning("[u: %s] Failed to obtain lock(1) on "
-                                 "digest %s", uuid, digest)
-                return False
-
-        # Lock the blob
-        metadata = dict(upload=uuid,
-                        time=now)
-        self.backend.put_object(path, json.dumps(metadata).encode('utf8'))
-        current = self.backend.get_object(path)
-        current = json.loads(current.decode('utf8'))
-        locktime = int(current.get('time', 0))
-        if (current.get('upload') != uuid and
-                now - locktime < 300):
-            # We lost a race for the lock, another simultaneous upload
-            # must be handling this; assume it will succeed and go
-            # ahead and clean up this upload.
-            self.log.warning("[u: %s] Failed to obtain lock(2) on digest %s",
-                             uuid, digest)
-            return False
-
-        if waslocked:
-            self.log.warning("[u: %s] Breaking lock on digest %s",
-                             uuid, digest)
-        else:
-            self.log.debug("[u: %s] Locked digest %s",
-                           uuid, digest)
-
-        return True
-
     def store_upload(self, namespace, uuid, digest):
 
         """Complete an upload.
@@ -293,16 +230,13 @@
             raise Exception('Digest does not match %s %s' %
                             (digest, upload.digest))
 
-        if not self._lock_upload(namespace, uuid, digest):
-            self._delete_upload(upload, namespace, uuid)
-            return
-
         # Move the chunks into the blob dir to get them out of the
         # uploads dir.
         chunks = []
         for i, chunk in enumerate(upload.chunks):
             src_path = os.path.join(namespace, 'uploads', uuid, str(i + 1))
-            dst_path = os.path.join(namespace, 'blobs', digest, str(i + 1))
+            dst_path = os.path.join(namespace, 'blobs', digest,
+                                    'uploads', uuid, str(i + 1))
             chunks.append(dict(path=dst_path,
                                md5=chunk['md5'], size=chunk['size']))
             self.backend.move_object(src_path, dst_path, uuid)
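
With the metadata lock gone, store_upload stages each upload's chunks under a per-upload subdirectory of the blob (blobs/<digest>/uploads/<uuid>/...), so two clients pushing the same digest no longer collide on chunk paths; the finished blob is still produced by cat_objects, which the atomic rename above makes safe to run more than once. A rough sketch of the resulting object layout (namespace, digest, and uuids are made up for illustration):

import os

namespace = '_local'                    # hypothetical namespace
digest = 'sha256:abcd'                  # hypothetical blob digest
for uuid in ('upload-1', 'upload-2'):   # two concurrent uploads of one blob
    for i in range(2):                  # two chunks each
        print(os.path.join(namespace, 'blobs', digest,
                           'uploads', uuid, str(i + 1)))
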
@@ -219,6 +219,9 @@ class SwiftDriver(storageutils.StorageDriver):
     def cat_objects(self, path, chunks, uuid=None):
         manifest = []
         # TODO: Would it be better to move 1-chunk objects?
+        # TODO: We can leak the upload chunks here if a blob is uploaded
+        # concurrently by two different clients. We should update the prune
+        # system to clean them up.
         for chunk in chunks:
             ret = retry_function(
                 lambda: self.conn.session.head(self.get_url(chunk['path'])))
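
A prune pass along the lines the TODO suggests would have to spot chunk objects staged under blobs/<digest>/uploads/<uuid>/ whose upload never became the blob's final content. A rough standalone sketch of that selection logic (the object names, the live-upload set, and the regular expression are assumptions; the real prune system is not shown in this diff):

import re

# Hypothetical listing of backend object names (not a real driver API call).
object_names = [
    '_local/blobs/sha256:abcd/data',
    '_local/blobs/sha256:abcd/uploads/upload-1/1',
    '_local/blobs/sha256:abcd/uploads/upload-2/1',
]

# Chunks staged by uploads; only 'upload-1' is assumed to back the final blob.
chunk_re = re.compile(r'^[^/]+/blobs/[^/]+/uploads/(?P<uuid>[^/]+)/\d+$')
live_uploads = {'upload-1'}

orphans = [name for name in object_names
           if (m := chunk_re.match(name)) and m.group('uuid') not in live_uploads]
print(orphans)  # ['_local/blobs/sha256:abcd/uploads/upload-2/1']
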