4213b96d3a
The way the registry was previously written, if two concurrent uploads of the
same blob were happening, one would fail to grab the lock and then return
early. The uploading client would then immediately HEAD the blob and, if it
did so quickly enough, would get a short size or a 404.

To avoid this we need each upload to continue until all concurrent uploads are
complete. To make this happen we treat upload chunks separately per upload, so
that separate uploads cannot overwrite the chunks once they are moved to the
blob directory. We move the chunks into upload-specific locations under the
blob directory to facilitate this. Once that is done we can atomically update
the actual blob data from the chunks. In the filesystem driver we concatenate
the chunks into the blob and then atomically rename the result into its final
blob/data location (see the sketch after the log excerpt below).

This ensures that we only ever return valid HEAD info for a blob, and that the
client only requests it once it exists. This should be safe because the
objects are hashsum-addressed, which means their contents should be identical.
If we end up moving one copy into place and then another atomically, they will
always have the same data and size.

These logs from an OpenDev test job seem to capture this in action:

  # First upload is completing and grabs the lock
  2022-02-25 21:28:14,514 INFO registry.api: [u: 935f8eddbb9a4dab8dd8cc45ce7f9384] Upload final chunk _local opendevorg/gerrit digest sha256:0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343
  2022-02-25 21:28:14,576 DEBUG registry.storage: [u: 935f8eddbb9a4dab8dd8cc45ce7f9384] Locked digest sha256:0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343

  # Second upload attempts to complete but ends early without the lock
  2022-02-25 21:28:15,517 INFO registry.api: [u: e817d8fd6c464f80bf405581e580cbab] Upload final chunk _local opendevorg/gerrit digest sha256:0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343
  2022-02-25 21:28:15,578 WARNING registry.storage: [u: e817d8fd6c464f80bf405581e580cbab] Failed to obtain lock(1) on digest sha256:0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343
  2022-02-25 21:28:15,588 INFO registry.api: [u: e817d8fd6c464f80bf405581e580cbab] Upload complete _local opendevorg/gerrit digest sha256:0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343
  2022-02-25 21:28:15,589 INFO cherrypy.access.140551593545056: ::ffff:172.17.0.1 - - [25/Feb/2022:21:28:15] "PUT /v2/opendevorg/gerrit/blobs/uploads/e817d8fd6c464f80bf405581e580cbab?digest=sha256%3A0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343 HTTP/1.1" 201 - "" "docker/20.10.12 go/go1.16.12 git-commit/459d0df kernel/5.4.0-100-generic os/linux arch/amd64 UpstreamClient(Docker-Client/20.10.12 \(linux\))"

  # Second upload completion triggers the HEAD request that gets either
  # a 404 or a short read. This causes the second upload client to error.
  2022-02-25 21:28:15,605 INFO registry.api: Head blob _local opendevorg/gerrit sha256:0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343 not found
  2022-02-25 21:28:15,607 INFO cherrypy.access.140551593545056: ::ffff:172.17.0.1 - - [25/Feb/2022:21:28:15] "HEAD /v2/opendevorg/gerrit/blobs/sha256:0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343 HTTP/1.1" 404 735 "" "docker/20.10.12 go/go1.16.12 git-commit/459d0df kernel/5.4.0-100-generic os/linux arch/amd64 UpstreamClient(Docker-Client/20.10.12 \(linux\))"

  # Now the first upload has completed and the HEAD request by the first
  # upload client is successful
  2022-02-25 21:28:18,898 INFO registry.api: [u: 935f8eddbb9a4dab8dd8cc45ce7f9384] Upload complete _local opendevorg/gerrit digest sha256:0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343
  2022-02-25 21:28:18,898 INFO cherrypy.access.140551593545056: ::ffff:172.17.0.1 - - [25/Feb/2022:21:28:18] "PUT /v2/opendevorg/gerrit/blobs/uploads/935f8eddbb9a4dab8dd8cc45ce7f9384?digest=sha256%3A0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343 HTTP/1.1" 201 - "" "docker/20.10.12 go/go1.16.12 git-commit/459d0df kernel/5.4.0-100-generic os/linux arch/amd64 UpstreamClient(Docker-Client/20.10.12 \(linux\))"
  2022-02-25 21:28:18,915 INFO registry.api: Head blob _local opendevorg/gerrit sha256:0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343 size 54917164
  2022-02-25 21:28:18,916 INFO cherrypy.access.140551593545056: ::ffff:172.17.0.1 - - [25/Feb/2022:21:28:18] "HEAD /v2/opendevorg/gerrit/blobs/sha256:0c6b8ff8c37e92eb1ca65ed8917e818927d5bf318b6f18896049b5d9afc28343 HTTP/1.1" 200 54917164 "" "docker/20.10.12 go/go1.16.12 git-commit/459d0df kernel/5.4.0-100-generic os/linux arch/amd64 UpstreamClient(Docker-Client/20.10.12 \(linux\))"

Change-Id: Ibdf1ca554756af61247d705b2ea3cf85c39c2b83
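A rough sketch of the filesystem-driver behaviour described above (concatenate
this upload's private chunks, then atomically publish the result as the blob's
data file). The function name, arguments, and path layout here are
illustrative assumptions, not the actual zuul-registry code:

  # Illustrative sketch only; not the actual filesystem driver implementation.
  import os
  import tempfile


  def finalize_blob(blob_dir, upload_id, chunk_paths):
      """Concatenate one upload's chunks, then atomically publish
      the result as <blob_dir>/data."""
      final_path = os.path.join(blob_dir, 'data')

      # Write the concatenation to a temporary file in the same directory
      # so the final rename stays on one filesystem and remains atomic.
      fd, tmp_path = tempfile.mkstemp(dir=blob_dir, prefix=upload_id + '.')
      try:
          with os.fdopen(fd, 'wb') as out:
              for chunk_path in chunk_paths:
                  with open(chunk_path, 'rb') as chunk:
                      while True:
                          data = chunk.read(64 * 1024)
                          if not data:
                              break
                          out.write(data)
          # rename() is atomic on POSIX: concurrent uploads of the same
          # digest may race here, but because the blob is content-addressed
          # the last rename simply replaces identical data with identical
          # data, so HEAD never observes a short or missing blob.
          os.rename(tmp_path, final_path)
      except Exception:
          os.unlink(tmp_path)
          raise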
252 lines | 9.7 KiB | Python
# Copyright 2019 Red Hat, Inc.
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.

import logging
import openstack
import os
import keystoneauth1
import tempfile
import time
import json

import dateutil.parser

from . import storageutils

POST_ATTEMPTS = 3
SWIFT_CHUNK_SIZE = 64 * 1024


def retry_function(func):
    for attempt in range(1, POST_ATTEMPTS + 1):
        try:
            return func()
        except keystoneauth1.exceptions.http.NotFound:
            raise
        except Exception:
            if attempt >= POST_ATTEMPTS:
                raise
            else:
                logging.exception("Error on attempt %d" % attempt)
                time.sleep(attempt * 10)


class SwiftDriver(storageutils.StorageDriver):
    log = logging.getLogger('registry.swift')

    def __init__(self, conf):
        self.cloud_name = conf['cloud']
        self.container_name = conf['container']
        self.conn = openstack.connect(cloud=self.cloud_name)
        container = retry_function(
            lambda: self.conn.get_container(self.container_name))
        if not container:
            self.log.info("Creating container %s", self.container_name)
            retry_function(
                lambda: self.conn.create_container(
                    name=self.container_name, public=False))
        endpoint = self.conn.object_store.get_endpoint()
        self.url = os.path.join(endpoint, self.container_name)

    def get_url(self, path):
        return os.path.join(self.url, path)

    def list_objects(self, path):
        self.log.debug("List objects %s", path)
        url = self.get_url('') + '?prefix=%s&delimiter=/&format=json' % (path,)
        ret = retry_function(
            lambda: self.conn.session.get(url).content.decode('utf8'))
        data = json.loads(ret)
        ret = []
        for obj in data:
            if 'subdir' in obj:
                objpath = obj['subdir']
                name = obj['subdir'].split('/')[-2]
                ctime = time.time()
                isdir = True
            else:
                objpath = obj['name']
                name = obj['name'].split('/')[-1]
                ctime = dateutil.parser.parse(
                    obj['last_modified'] + 'Z').timestamp()
                isdir = False
            ret.append(storageutils.ObjectInfo(
                objpath, name, ctime, isdir))
        return ret

    def get_object_size(self, path):
        try:
            ret = retry_function(
                lambda: self.conn.session.head(self.get_url(path)))
        except keystoneauth1.exceptions.http.NotFound:
            return None
        return int(ret.headers['Content-Length'])

    def put_object(self, path, data, uuid=None):
        name = None
        try:
            with tempfile.NamedTemporaryFile('wb', delete=False) as f:
                name = f.name
                if isinstance(data, bytes):
                    f.write(data)
                else:
                    for chunk in data:
                        f.write(chunk)
            retry_function(
                lambda: self.conn.object_store.upload_object(
                    self.container_name,
                    path,
                    filename=name))

            # Get the md5sum and size of the object, and make sure it
            # matches the upload.
            ret = retry_function(lambda: self.conn.session.head(
                self.get_url(path)))
            try:
                size = int(ret.headers.get('Content-Length', ''))
            except ValueError:
                size = None
            md5 = ret.headers.get('Etag', '')
            sdk_md5 = ret.headers.get('X-Object-Meta-X-Sdk-Md5', '')
            self.log.debug("[u: %s] Upload object %s "
                           "md5: %s sdkmd5: %s size: %s",
                           uuid, path, md5, sdk_md5, size)
            if md5 != sdk_md5:
                raise Exception("Swift and SDK md5s did not match (u: %s)" %
                                uuid)

        finally:
            if name:
                os.unlink(name)

    def get_object(self, path):
        try:
            ret = retry_function(
                lambda: self.conn.session.get(self.get_url(path)))
        except keystoneauth1.exceptions.http.NotFound:
            return None
        return ret.content

    def stream_object(self, path):
        try:
            ret = retry_function(
                lambda: self.conn.session.get(self.get_url(path), stream=True))
        except keystoneauth1.exceptions.http.NotFound:
            return None, None
        try:
            size = int(ret.headers.get('Content-Length', ''))
        except ValueError:
            size = None
        return size, ret.iter_content(chunk_size=SWIFT_CHUNK_SIZE)

    def delete_object(self, path):
        retry_function(
            lambda: self.conn.session.delete(
                self.get_url(path)))

    def move_object(self, src_path, dst_path, uuid=None):
        dst = os.path.join(self.container_name, dst_path)

        # Get the md5sum and size of the object, and make sure it
        # matches on both sides of the copy.
        ret = retry_function(lambda: self.conn.session.head(
            self.get_url(src_path)))
        try:
            size = int(ret.headers.get('Content-Length', ''))
        except ValueError:
            size = None
        md5 = ret.headers.get('Etag', '')
        sdk_md5 = ret.headers.get('X-Object-Meta-X-Sdk-Md5', '')
        old_md = dict(md5=md5, sdk_md5=sdk_md5, size=size)
        self.log.debug("[u: %s] Move object %s %s %s",
                       uuid, src_path, dst_path, old_md)
        if md5 != sdk_md5:
            raise Exception("Swift and SDK md5s did not match at start "
                            "of copy (u: %s) %s" % (uuid, old_md))

        # FIXME: The multipart-manifest argument below means that in
        # the event this docker chunk is a large object, we intend to
        # copy the manifest but not the underlying large object
        # segments. That seems incorrect, and we should actually just
        # recast the large object segments into docker chunks and
        # discard this manifest. But first we should verify that's
        # what's happening -- it's not clear we ever hit a segment
        # limit in practice, so we may never have a large object
        # chunk.
        retry_function(
            lambda: self.conn.session.request(
                self.get_url(src_path) + "?multipart-manifest=get",
                'COPY',
                headers={'Destination': dst}
            ))

        # Get the md5sum and size of the object, and make sure it
        # matches on both sides of the copy.
        ret = retry_function(lambda: self.conn.session.head(
            self.get_url(dst_path)))
        try:
            size = int(ret.headers.get('Content-Length', ''))
        except ValueError:
            size = None
        md5 = ret.headers.get('Etag', '')
        sdk_md5 = ret.headers.get('X-Object-Meta-X-Sdk-Md5', '')
        new_md = dict(md5=md5, sdk_md5=sdk_md5, size=size)
        self.log.debug("[u: %s] Moved object %s %s %s",
                       uuid, src_path, dst_path, new_md)
        if md5 != sdk_md5:
            raise Exception("Swift and SDK md5s did not match at end of copy "
                            "(u: %s) %s" % (uuid, new_md))
        if old_md != new_md:
            raise Exception("Object metadata did not match after copy "
                            "(u: %s) old: %s new: %s" % (uuid, old_md, new_md))

        retry_function(
            lambda: self.conn.session.delete(
                self.get_url(src_path)))

    def cat_objects(self, path, chunks, uuid=None):
        manifest = []
        # TODO: Would it be better to move 1-chunk objects?
        # TODO: We can leak the upload chunks here if a blob is uploaded
        # concurrently by two different clients. We should update the prune
        # system to clean them up.
        for chunk in chunks:
            ret = retry_function(
                lambda: self.conn.session.head(self.get_url(chunk['path'])))
            size = int(ret.headers['Content-Length'])
            if size == 0:
                continue
            etag = ret.headers['Etag']
            sdk_md5 = ret.headers['X-Object-Meta-X-Sdk-Md5']
            if not (sdk_md5 == etag == chunk['md5']):
                raise Exception("Object metadata did not match during cat "
                                "(u: %s) orig: %s sdk: %s etag: %s" % (
                                    uuid, chunk['md5'], sdk_md5, etag))
            if not (size == chunk['size']):
                raise Exception("Object metadata did not match during cat "
                                "(u: %s) orig: %s size: %s" % (
                                    uuid, chunk['size'], size))
            manifest.append({'path':
                             os.path.join(self.container_name, chunk['path']),
                             'etag': ret.headers['Etag'],
                             'size_bytes': ret.headers['Content-Length']})
        retry_function(lambda:
                       self.conn.session.put(
                           self.get_url(path) + "?multipart-manifest=put",
                           data=json.dumps(manifest)))


Driver = SwiftDriver