Perform atomic upload updates v2

As the registry was previously written, if two concurrent uploads of
the same blob were in flight, one would fail to grab the lock and
return early. The uploading client would then immediately HEAD the
blob, and if it did so quickly enough it would get a short size or a
404. To avoid this, an upload needs to continue until all concurrent
uploads of the same blob are complete.

To make this happen, we track upload chunks separately per upload so
that concurrent uploads cannot overwrite each other's chunks once they
are moved to the blob directory; to facilitate this, the chunks are
moved into upload-specific locations under the blob directory. Once
that is done, we can atomically update the actual blob data from the
chunks. In the filesystem driver we concatenate the chunks into the
blob and then atomically rename the result into its final blob/data
location. This ensures that we only ever return valid HEAD info for a
blob, and that the client only requests it once it exists.
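
For illustration, a minimal standalone sketch of the pattern (the
helper name is hypothetical; the real driver change is in the diff
below): chunks are concatenated into a temporary file in the
destination directory, flushed and fsynced, and only then renamed over
the final path:

  import os
  import tempfile

  def cat_and_rename(dst_path, chunk_paths):
      # The temporary file must live in the same directory (and thus
      # on the same filesystem) as the destination for the rename to
      # be atomic.
      with tempfile.NamedTemporaryFile(dir=os.path.dirname(dst_path),
                                       delete=False) as outf:
          for chunk_path in chunk_paths:
              with open(chunk_path, 'rb') as inf:
                  while True:
                      d = inf.read(4096)
                      if not d:
                          break
                      outf.write(d)
          outf.flush()
          os.fsync(outf.fileno())
      # Readers never observe a partially written blob.
      os.rename(outf.name, dst_path)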

This should be safe because the objects are content-addressed by their
hashsum, which means their contents should be identical. If we move
one copy into place and then another, each atomically, they will
always have the same data and size.
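
A toy reproduction of that claim (not part of this change, and
assuming POSIX rename semantics): two racing writers install
byte-identical content, and whichever rename lands last simply
replaces one complete copy with another, so a reader always sees a
full-sized blob:

  import os
  import tempfile
  import threading

  def install(dst_path, data):
      # Each writer produces its own complete temporary copy first.
      with tempfile.NamedTemporaryFile(dir='.', delete=False) as f:
          f.write(data)
          f.flush()
          os.fsync(f.fileno())
      # rename() atomically swaps the copy in; with identical content
      # the order in which the renames land does not matter.
      os.rename(f.name, dst_path)

  data = b'x' * 1024  # same content-addressed bytes for both uploads
  threads = [threading.Thread(target=install, args=('blob-data', data))
             for _ in range(2)]
  for t in threads:
      t.start()
  for t in threads:
      t.join()
  assert os.path.getsize('blob-data') == len(data)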

These logs from an OpenDev test job seem to capture this in action:

  # First upload is completing and grabs the lock
  2022-02-25 21:28:14,514 INFO registry.api: [u: 935f8eddbb9a4dab8dd8cc45ce7f9384] Upload final chunk _local opendevorg/gerrit digest sha256:0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343
  2022-02-25 21:28:14,576 DEBUG registry.storage: [u: 935f8eddbb9a4dab8dd8cc45ce7f9384] Locked digest sha256:0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343

  # Second upload attempts to complete but ends early without the lock
  2022-02-25 21:28:15,517 INFO registry.api: [u: e817d8fd6c464f80bf405581e580cbab] Upload final chunk _local opendevorg/gerrit digest sha256:0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343
  2022-02-25 21:28:15,578 WARNING registry.storage: [u: e817d8fd6c464f80bf405581e580cbab] Failed to obtain lock(1) on digest sha256:0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343
  2022-02-25 21:28:15,588 INFO registry.api: [u: e817d8fd6c464f80bf405581e580cbab] Upload complete _local opendevorg/gerrit digest sha256:0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343
  2022-02-25 21:28:15,589 INFO cherrypy.access.140551593545056: ::ffff:172.17.0.1 - - [25/Feb/2022:21:28:15] "PUT /v2/opendevorg/gerrit/blobs/uploads/e817d8fd6c464f80bf405581e580cbab?digest=sha256%3A0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343 HTTP/1.1" 201 - "" "docker/20.10.12 go/go1.16.12 git-commit/459d0df kernel/5.4.0-100-generic os/linux arch/amd64 UpstreamClient(Docker-Client/20.10.12 \(linux\))"

  # Second upload completion triggers the HEAD request, which returns
  # either a 404 or a short read. This causes the second upload client
  # to error.
  2022-02-25 21:28:15,605 INFO registry.api: Head blob _local opendevorg/gerrit sha256:0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343 not found
  2022-02-25 21:28:15,607 INFO cherrypy.access.140551593545056: ::ffff:172.17.0.1 - - [25/Feb/2022:21:28:15] "HEAD /v2/opendevorg/gerrit/blobs/sha256:0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343 HTTP/1.1" 404 735 "" "docker/20.10.12 go/go1.16.12 git-commit/459d0df kernel/5.4.0-100-generic os/linux arch/amd64 UpstreamClient(Docker-Client/20.10.12 \(linux\))"

  # Now the first upload has completed and the HEAD request by the
  # first upload client is successful
  2022-02-25 21:28:18,898 INFO registry.api: [u: 935f8eddbb9a4dab8dd8cc45ce7f9384] Upload complete _local opendevorg/gerrit digest sha256:0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343
  2022-02-25 21:28:18,898 INFO cherrypy.access.140551593545056: ::ffff:172.17.0.1 - - [25/Feb/2022:21:28:18] "PUT /v2/opendevorg/gerrit/blobs/uploads/935f8eddbb9a4dab8dd8cc45ce7f9384?digest=sha256%3A0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343 HTTP/1.1" 201 - "" "docker/20.10.12 go/go1.16.12 git-commit/459d0df kernel/5.4.0-100-generic os/linux arch/amd64 UpstreamClient(Docker-Client/20.10.12 \(linux\))"
  2022-02-25 21:28:18,915 INFO registry.api: Head blob _local opendevorg/gerrit sha256:0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343 size 54917164
  2022-02-25 21:28:18,916 INFO cherrypy.access.140551593545056: ::ffff:172.17.0.1 - - [25/Feb/2022:21:28:18] "HEAD /v2/opendevorg/gerrit/blobs/sha256:0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343 HTTP/1.1" 200 54917164 "" "docker/20.10.12 go/go1.16.12 git-commit/459d0df kernel/5.4.0-100-generic os/linux arch/amd64 UpstreamClient(Docker-Client/20.10.12 \(linux\))"

Change-Id: Ibdf1ca554756af61247d705b2ea3cf85c39c2b83
Author: Clark Boylan
Date: 2022-02-28 09:10:05 -08:00
Parent: 19ccf5990e
Commit: 4213b96d3a
3 files changed, 15 insertions(+), 69 deletions(-)


@@ -14,6 +14,7 @@
 # along with this software. If not, see <http://www.gnu.org/licenses/>.
 
 import os
+import tempfile
 
 from . import storageutils
@@ -110,7 +111,14 @@ class FilesystemDriver(storageutils.StorageDriver):
     def cat_objects(self, path, chunks, uuid=None):
         path = os.path.join(self.root, path)
         os.makedirs(os.path.dirname(path), exist_ok=True)
-        with open(path, 'wb') as outf:
+        # We write to a temporary file in the same directory as the destination
+        # file to ensure that we can rename it atomically once fully written.
+        # This is important because there may be multiple concurrent writes to
+        # the same object and due to client behavior we cannot return until
+        # at least one write is completed. To facilitate this we ensure each
+        # write happens completely then make that safe with atomic renames.
+        with tempfile.NamedTemporaryFile(dir=os.path.dirname(path),
+                                         delete=False) as outf:
             for chunk in chunks:
                 chunk_path = os.path.join(self.root, chunk['path'])
                 with open(chunk_path, 'rb') as inf:
@@ -121,6 +129,7 @@ class FilesystemDriver(storageutils.StorageDriver):
                         outf.write(d)
             outf.flush()
             os.fsync(outf.fileno())
+        os.rename(outf.name, path)
         for chunk in chunks:
             chunk_path = os.path.join(self.root, chunk['path'])
             os.unlink(chunk_path)


@@ -216,69 +216,6 @@ class Storage:
             self._update_upload(namespace, uuid, upload)
         return upload.size - size, upload.size
 
-    def _delete_upload(self, upload, namespace, uuid):
-        """Delete the files for an upload
-
-        This is called when we have detected a race with another
-        upload for the same blob and have chosen to delete this upload
-        without finalizing.
-        """
-        for i, chunk in enumerate(upload.chunks):
-            src_path = os.path.join(namespace, 'uploads', uuid, str(i + 1))
-            self.backend.delete_object(src_path)
-        path = os.path.join(namespace, 'uploads', uuid, 'metadata')
-        self.backend.delete_object(path)
-
-    def _lock_upload(self, namespace, uuid, digest):
-        """Lock the upload
-
-        Place a metadata file in the blob directory so we can detect
-        whether we are racing another upload for the same blob.
-        """
-        # Check if the blob is locked
-        path = os.path.join(namespace, 'blobs', digest, 'metadata')
-        now = time.time()
-        current = self.backend.get_object(path)
-        waslocked = False
-        if current:
-            waslocked = True
-            current = json.loads(current.decode('utf8'))
-            locktime = int(current.get('time', 0))
-            if now - locktime < 300:
-                # The lock is in force, another simultaneous upload
-                # must be handling this; assume it will succeed and go
-                # ahead and clean up this upload.
-                self.log.warning("[u: %s] Failed to obtain lock(1) on "
-                                 "digest %s", uuid, digest)
-                return False
-
-        # Lock the blob
-        metadata = dict(upload=uuid,
-                        time=now)
-        self.backend.put_object(path, json.dumps(metadata).encode('utf8'))
-        current = self.backend.get_object(path)
-        current = json.loads(current.decode('utf8'))
-        locktime = int(current.get('time', 0))
-        if (current.get('upload') != uuid and
-                now - locktime < 300):
-            # We lost a race for the lock, another simultaneous upload
-            # must be handling this; assume it will succeed and go
-            # ahead and clean up this upload.
-            self.log.warning("[u: %s] Failed to obtain lock(2) on digest %s",
-                             uuid, digest)
-            return False
-
-        if waslocked:
-            self.log.warning("[u: %s] Breaking lock on digest %s",
-                             uuid, digest)
-        else:
-            self.log.debug("[u: %s] Locked digest %s",
-                           uuid, digest)
-        return True
-
     def store_upload(self, namespace, uuid, digest):
         """Complete an upload.
@@ -293,16 +230,13 @@ class Storage:
             raise Exception('Digest does not match %s %s' %
                             (digest, upload.digest))
 
-        if not self._lock_upload(namespace, uuid, digest):
-            self._delete_upload(upload, namespace, uuid)
-            return
-
         # Move the chunks into the blob dir to get them out of the
         # uploads dir.
         chunks = []
         for i, chunk in enumerate(upload.chunks):
             src_path = os.path.join(namespace, 'uploads', uuid, str(i + 1))
-            dst_path = os.path.join(namespace, 'blobs', digest, str(i + 1))
+            dst_path = os.path.join(namespace, 'blobs', digest,
+                                    'uploads', uuid, str(i + 1))
             chunks.append(dict(path=dst_path,
                                md5=chunk['md5'], size=chunk['size']))
             self.backend.move_object(src_path, dst_path, uuid)


@@ -219,6 +219,9 @@ class SwiftDriver(storageutils.StorageDriver):
     def cat_objects(self, path, chunks, uuid=None):
         manifest = []
         # TODO: Would it be better to move 1-chunk objects?
+        # TODO: We can leak the upload chunks here if a blob is uploaded
+        # concurrently by two different clients. We should update the prune
+        # system to clean them up.
         for chunk in chunks:
             ret = retry_function(
                 lambda: self.conn.session.head(self.get_url(chunk['path'])))