diff --git a/zuul_registry/filesystem.py b/zuul_registry/filesystem.py
index cedc76f..b68b9b2 100644
--- a/zuul_registry/filesystem.py
+++ b/zuul_registry/filesystem.py
@@ -14,6 +14,7 @@
 # along with this software.  If not, see <http://www.gnu.org/licenses/>.
 
 import os
+import tempfile
 
 from . import storageutils
 
@@ -110,7 +111,14 @@ class FilesystemDriver(storageutils.StorageDriver):
     def cat_objects(self, path, chunks, uuid=None):
         path = os.path.join(self.root, path)
         os.makedirs(os.path.dirname(path), exist_ok=True)
-        with open(path, 'wb') as outf:
+        # We write to a temporary file in the same directory as the destination
+        # file to ensure that we can rename it atomically once fully written.
+        # This is important because there may be multiple concurrent writes to
+        # the same object and due to client behavior we cannot return until
+        # at least one write is completed. To facilitate this we ensure each
+        # write happens completely, then make that safe with atomic renames.
+        with tempfile.NamedTemporaryFile(dir=os.path.dirname(path),
+                                         delete=False) as outf:
             for chunk in chunks:
                 chunk_path = os.path.join(self.root, chunk['path'])
                 with open(chunk_path, 'rb') as inf:
@@ -121,6 +129,7 @@ class FilesystemDriver(storageutils.StorageDriver):
                         outf.write(d)
             outf.flush()
             os.fsync(outf.fileno())
+        os.rename(outf.name, path)
         for chunk in chunks:
             chunk_path = os.path.join(self.root, chunk['path'])
             os.unlink(chunk_path)
diff --git a/zuul_registry/storage.py b/zuul_registry/storage.py
index ba21f5f..f944b35 100644
--- a/zuul_registry/storage.py
+++ b/zuul_registry/storage.py
@@ -216,69 +216,6 @@ class Storage:
         self._update_upload(namespace, uuid, upload)
         return upload.size - size, upload.size
 
-    def _delete_upload(self, upload, namespace, uuid):
-        """Delete the files for an upload
-
-        This is called when we have detected a race with another
-        upload for the same blob and have chosen to delete this upload
-        without finalizing.
-        """
-
-        for i, chunk in enumerate(upload.chunks):
-            src_path = os.path.join(namespace, 'uploads', uuid, str(i + 1))
-            self.backend.delete_object(src_path)
-        path = os.path.join(namespace, 'uploads', uuid, 'metadata')
-        self.backend.delete_object(path)
-
-    def _lock_upload(self, namespace, uuid, digest):
-        """Lock the upload
-
-        Place a metadata file in the blob directory so we can detect
-        whether we are racing another upload for the same blob.
-        """
-
-        # Check if the blob is locked
-        path = os.path.join(namespace, 'blobs', digest, 'metadata')
-        now = time.time()
-        current = self.backend.get_object(path)
-        waslocked = False
-        if current:
-            waslocked = True
-            current = json.loads(current.decode('utf8'))
-            locktime = int(current.get('time', 0))
-            if now - locktime < 300:
-                # The lock is in force, another simultaneous upload
-                # must be handling this; assume it will succeed and go
-                # ahead and clean up this upload.
-                self.log.warning("[u: %s] Failed to obtain lock(1) on "
-                                 "digest %s", uuid, digest)
-                return False
-
-        # Lock the blob
-        metadata = dict(upload=uuid,
-                        time=now)
-        self.backend.put_object(path, json.dumps(metadata).encode('utf8'))
-        current = self.backend.get_object(path)
-        current = json.loads(current.decode('utf8'))
-        locktime = int(current.get('time', 0))
-        if (current.get('upload') != uuid and
-            now - locktime < 300):
-            # We lost a race for the lock, another simultaneous upload
-            # must be handling this; assume it will succeed and go
-            # ahead and clean up this upload.
- self.log.warning("[u: %s] Failed to obtain lock(2) on digest %s", - uuid, digest) - return False - - if waslocked: - self.log.warning("[u: %s] Breaking lock on digest %s", - uuid, digest) - else: - self.log.debug("[u: %s] Locked digest %s", - uuid, digest) - - return True - def store_upload(self, namespace, uuid, digest): """Complete an upload. @@ -293,16 +230,13 @@ class Storage: raise Exception('Digest does not match %s %s' % (digest, upload.digest)) - if not self._lock_upload(namespace, uuid, digest): - self._delete_upload(upload, namespace, uuid) - return - # Move the chunks into the blob dir to get them out of the # uploads dir. chunks = [] for i, chunk in enumerate(upload.chunks): src_path = os.path.join(namespace, 'uploads', uuid, str(i + 1)) - dst_path = os.path.join(namespace, 'blobs', digest, str(i + 1)) + dst_path = os.path.join(namespace, 'blobs', digest, + 'uploads', uuid, str(i + 1)) chunks.append(dict(path=dst_path, md5=chunk['md5'], size=chunk['size'])) self.backend.move_object(src_path, dst_path, uuid) diff --git a/zuul_registry/swift.py b/zuul_registry/swift.py index 1dd3bd3..4c2a3f5 100644 --- a/zuul_registry/swift.py +++ b/zuul_registry/swift.py @@ -219,6 +219,9 @@ class SwiftDriver(storageutils.StorageDriver): def cat_objects(self, path, chunks, uuid=None): manifest = [] # TODO: Would it be better to move 1-chunk objects? + # TODO: We can leak the upload chunks here if a blob is uploaded + # concurrently by two different clients. We should update the prune + # system to clean them up. for chunk in chunks: ret = retry_function( lambda: self.conn.session.head(self.get_url(chunk['path'])))