Merge "Add debug/verification for uploads"
commit 033b8a89a5
@@ -43,7 +43,7 @@ class FilesystemDriver(storageutils.StorageDriver):
             return None
         return os.stat(path).st_size
 
-    def put_object(self, path, data):
+    def put_object(self, path, data, uuid=None):
         path = os.path.join(self.root, path)
         os.makedirs(os.path.dirname(path), exist_ok=True)
         with open(path, 'wb') as f:
@@ -99,26 +99,26 @@ class FilesystemDriver(storageutils.StorageDriver):
         else:
             os.unlink(path)
 
-    def move_object(self, src_path, dst_path):
+    def move_object(self, src_path, dst_path, uuid=None):
         src_path = os.path.join(self.root, src_path)
         dst_path = os.path.join(self.root, dst_path)
         os.makedirs(os.path.dirname(dst_path), exist_ok=True)
         os.rename(src_path, dst_path)
 
-    def cat_objects(self, path, chunks):
+    def cat_objects(self, path, chunks, uuid=None):
         path = os.path.join(self.root, path)
         os.makedirs(os.path.dirname(path), exist_ok=True)
         with open(path, 'wb') as outf:
-            for chunk_path in chunks:
-                chunk_path = os.path.join(self.root, chunk_path)
+            for chunk in chunks:
+                chunk_path = os.path.join(self.root, chunk['path'])
                 with open(chunk_path, 'rb') as inf:
                     while True:
                         d = inf.read(4096)
                         if not d:
                             break
                         outf.write(d)
-        for chunk_path in chunks:
-            chunk_path = os.path.join(self.root, chunk_path)
+        for chunk in chunks:
+            chunk_path = os.path.join(self.root, chunk['path'])
             os.unlink(chunk_path)
 
 
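
Note the driver-interface change running through this commit: chunk lists are now lists of dicts carrying verification metadata ('path', 'md5', 'size') rather than bare paths, and each method grows an optional uuid that is only used to tag log lines. A hedged sketch of the new record shape (values here are hypothetical; only the keys and the uuid argument come from the diff):

import hashlib

# Hypothetical chunk record in the new shape expected by
# move_object/cat_objects; keys taken from the diff above.
data = b'layer-bytes'
chunk = dict(path='ns/blobs/sha256:aaaa/1',
             md5=hashlib.md5(data).hexdigest(),
             size=len(data))
chunks = [chunk]  # passed to the driver along with the upload uuid
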
@@ -196,8 +196,8 @@ class RegistryAPI:
         namespace, repository = self.get_namespace(repository)
         method = cherrypy.request.method
         uuid = self.storage.start_upload(namespace)
-        self.log.info('Start upload %s %s %s uuid %s digest %s',
-                      method, namespace, repository, uuid, digest)
+        self.log.info('[u: %s] Start upload %s %s %s digest %s',
+                      uuid, method, namespace, repository, digest)
         res = cherrypy.response
         res.headers['Location'] = '/v2/%s/blobs/uploads/%s' % (
             orig_repository, uuid)
@@ -210,7 +210,8 @@ class RegistryAPI:
     def upload_chunk(self, repository, uuid):
         orig_repository = repository
         namespace, repository = self.get_namespace(repository)
-        self.log.info('Upload chunk %s %s %s', namespace, repository, uuid)
+        self.log.info('[u: %s] Upload chunk %s %s',
+                      uuid, namespace, repository)
         old_length, new_length = self.storage.upload_chunk(
             namespace, uuid, cherrypy.request.body)
         res = cherrypy.response
@@ -220,17 +221,22 @@ class RegistryAPI:
         res.headers['Range'] = '0-%s' % (new_length,)
         res.status = '204 No Content'
         self.log.info(
-            'Finish Upload chunk %s %s %s', repository, uuid, new_length)
+            '[u: %s] Finish Upload chunk %s %s', uuid, repository, new_length)
 
     @cherrypy.expose
     @cherrypy.config(**{'tools.check_auth.level': Authorization.WRITE})
     def finish_upload(self, repository, uuid, digest):
         orig_repository = repository
         namespace, repository = self.get_namespace(repository)
-        self.log.info('Finish upload %s %s %s', namespace, repository, uuid)
+        self.log.info('[u: %s] Upload final chunk %s %s',
+                      uuid, namespace, repository)
         old_length, new_length = self.storage.upload_chunk(
             namespace, uuid, cherrypy.request.body)
+        self.log.debug('[u: %s] Store upload %s %s',
+                       uuid, namespace, repository)
         self.storage.store_upload(namespace, uuid, digest)
+        self.log.info('[u: %s] Upload complete %s %s',
+                      uuid, namespace, repository)
         res = cherrypy.response
         res.headers['Location'] = '/v2/%s/blobs/%s' % (orig_repository, digest)
         res.headers['Docker-Content-Digest'] = digest
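
Every handler log line now leads with a `[u: <uuid>]` tag, so all messages for one upload can be grepped out of interleaved request logs. The commit writes the prefix by hand in each call; purely as an illustration of the pattern, the stdlib offers the same effect via logging.LoggerAdapter (this is not what the commit does, just a sketch):

import logging

class UploadLogAdapter(logging.LoggerAdapter):
    """Prefix every message with an upload uuid, mirroring '[u: %s]'."""
    def process(self, msg, kwargs):
        return '[u: %s] %s' % (self.extra['uuid'], msg), kwargs

logging.basicConfig(level=logging.INFO)
log = UploadLogAdapter(logging.getLogger('registry.api'),
                       {'uuid': 'hypothetical-uuid'})
log.info('Upload chunk %s %s', 'mynamespace', 'myrepo')
# INFO:registry.api:[u: hypothetical-uuid] Upload chunk mynamespace myrepo
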
@@ -409,18 +415,20 @@ def main():
                         help='Command: serve, prune',
                         default='serve')
     args = parser.parse_args()
-    logformat = '%(levelname)s %(name)s: %(message)s'
+    logformat = '%(asctime)s %(levelname)s %(name)s: %(message)s'
     if args.debug or os.environ.get('DEBUG') == '1':
         logging.basicConfig(level=logging.DEBUG, format=logformat)
+        logging.getLogger("openstack").setLevel(logging.DEBUG)
+        logging.getLogger("urllib3").setLevel(logging.DEBUG)
+        logging.getLogger("requests").setLevel(logging.DEBUG)
     else:
         logging.basicConfig(level=logging.INFO, format=logformat)
+        logging.getLogger("openstack").setLevel(logging.INFO)
+        logging.getLogger("urllib3").setLevel(logging.ERROR)
+        logging.getLogger("requests").setLevel(logging.ERROR)
     cherrypy.log.access_log.propagate = False
-    logging.getLogger("requests").setLevel(logging.DEBUG)
     logging.getLogger("keystoneauth").setLevel(logging.ERROR)
-    logging.getLogger("urllib3").setLevel(logging.DEBUG)
-    logging.getLogger("stevedore").setLevel(logging.INFO)
-    logging.getLogger("openstack").setLevel(logging.DEBUG)
-    # cherrypy.log.error_log.propagate = False
+    logging.getLogger("stevedore").setLevel(logging.ERROR)
 
     s = RegistryServer(args.config)
    if args.command == 'serve':
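
With `%(asctime)s` in the format string, every record carries a timestamp, which is what makes the registry's log lines comparable against storage-side events when chasing a corrupted upload. A minimal sketch of the resulting output:

import logging

logformat = '%(asctime)s %(levelname)s %(name)s: %(message)s'
logging.basicConfig(level=logging.INFO, format=logformat)
logging.getLogger('registry.api').info('[u: 3f1c] Start upload')
# e.g. 2019-11-12 08:15:00,123 INFO registry.api: [u: 3f1c] Start upload
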
@@ -19,6 +19,7 @@ import logging
 import os
 import queue
 import rehash
+import hashlib
 import threading
 import time
 from uuid import uuid4
@@ -167,7 +168,9 @@ class Storage:
 
     def _update_upload(self, namespace, uuid, upload):
         path = os.path.join(namespace, 'uploads', uuid, 'metadata')
-        self.backend.put_object(path, upload.dump())
+        self.log.debug("[u: %s] Update upload metadata chunks: %s",
+                       uuid, upload.chunks)
+        self.backend.put_object(path, upload.dump(), uuid)
 
     def upload_chunk(self, namespace, uuid, fp):
         """Add a chunk to an upload.
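
The debug line makes the upload's accumulated chunk list visible each time the metadata is rewritten. Illustratively, after two chunks the logged `upload.chunks` would be a list like the following (field names from the diff; values hypothetical):

# Hypothetical upload.chunks contents after two chunks have landed:
chunks = [{'size': 4096, 'md5': 'd3b07384d113edec49eaa6238ad5ff00'},
          {'size': 1024, 'md5': 'c157a79031e1c40f85931829bc5fc552'}]
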
@@ -188,9 +191,13 @@
         path = os.path.join(namespace, 'uploads', uuid, str(upload.count + 1))
         streamer = UploadStreamer()
         t = threading.Thread(target=self.backend.put_object,
-                             args=(path, streamer))
+                             args=(path, streamer, uuid))
         t.start()
         size = 0
+        # This calculates the md5 of just this chunk for internal
+        # integrity checking; it is not the overall hash of the layer
+        # (that's a running calculation in the upload record).
+        chunk_hasher = hashlib.md5()
         while True:
             try:
                 d = fp.read(4096)
@@ -200,11 +207,12 @@
             if not d:
                 break
             upload.hasher.update(d)
+            chunk_hasher.update(d)
             size += len(d)
             streamer.write(d)
         streamer.write(None)
         t.join()
-        upload.chunks.append(dict(size=size))
+        upload.chunks.append(dict(size=size, md5=chunk_hasher.hexdigest()))
         self._update_upload(namespace, uuid, upload)
         return upload.size - size, upload.size
 
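
The loop streams the request body to the backend in 4 KB reads while updating two digests in step: the upload-wide running hash (which survives across requests) and a fresh md5 covering only this chunk. A self-contained sketch of that read/update/write pattern, with in-memory buffers standing in for the request body and the backend streamer:

import hashlib
import io

def stream_chunk(fp, write):
    """Copy fp to write() in 4 KB reads; return (size, chunk_md5_hex)."""
    chunk_hasher = hashlib.md5()
    size = 0
    while True:
        d = fp.read(4096)
        if not d:
            break
        chunk_hasher.update(d)
        size += len(d)
        write(d)
    return size, chunk_hasher.hexdigest()

out = io.BytesIO()
size, md5 = stream_chunk(io.BytesIO(b'x' * 10000), out.write)
print(size, md5)  # the chunk's size and its md5 hexdigest
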
@@ -223,14 +231,15 @@
         # Move the chunks into the blob dir to get them out of the
         # uploads dir.
         chunks = []
-        for i in range(1, upload.count + 1):
-            src_path = os.path.join(namespace, 'uploads', uuid, str(i))
-            dst_path = os.path.join(namespace, 'blobs', digest, str(i))
-            chunks.append(dst_path)
-            self.backend.move_object(src_path, dst_path)
+        for i, chunk in enumerate(upload.chunks):
+            src_path = os.path.join(namespace, 'uploads', uuid, str(i + 1))
+            dst_path = os.path.join(namespace, 'blobs', digest, str(i + 1))
+            chunks.append(dict(path=dst_path,
+                               md5=chunk['md5'], size=chunk['size']))
+            self.backend.move_object(src_path, dst_path, uuid)
         # Concatenate the chunks into one blob.
         path = os.path.join(namespace, 'blobs', digest, 'data')
-        self.backend.cat_objects(path, chunks)
+        self.backend.cat_objects(path, chunks, uuid)
         path = os.path.join(namespace, 'uploads', uuid, 'metadata')
         self.backend.delete_object(path)
 
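
Because each chunk's md5 and size now travel with it from the uploads dir to the blobs dir, the backend can re-verify every piece right up to concatenation. As a hedged illustration of the invariant being carried along (the helper and names here are hypothetical, not the driver's API):

import hashlib

def verify_chunk(record, data):
    # record is one of the dicts built in store_upload() above.
    if len(data) != record['size']:
        raise Exception('size mismatch for %s' % record['path'])
    if hashlib.md5(data).hexdigest() != record['md5']:
        raise Exception('md5 mismatch for %s' % record['path'])

payload = b'chunk-bytes'
record = dict(path='ns/blobs/sha256:aaaa/1',
              md5=hashlib.md5(payload).hexdigest(), size=len(payload))
verify_chunk(record, payload)  # passes; any corruption would raise
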
@@ -94,7 +94,7 @@ class SwiftDriver(storageutils.StorageDriver):
             return None
         return ret.headers['Content-Length']
 
-    def put_object(self, path, data):
+    def put_object(self, path, data, uuid=None):
         name = None
         try:
             with tempfile.NamedTemporaryFile('wb', delete=False) as f:
@@ -109,6 +109,24 @@ class SwiftDriver(storageutils.StorageDriver):
                 self.container_name,
                 path,
                 filename=name))
+
+            # Get the md5sum and size of the object, and make sure it
+            # matches the upload.
+            ret = retry_function(lambda: self.conn.session.head(
+                self.get_url(path)))
+            try:
+                size = int(ret.headers.get('Content-Length', ''))
+            except ValueError:
+                size = None
+            md5 = ret.headers.get('Etag', '')
+            sdk_md5 = ret.headers.get('X-Object-Meta-X-Sdk-Md5', '')
+            self.log.debug("[u: %s] Upload object %s "
+                           "md5: %s sdkmd5: %s size: %s",
+                           uuid, path, md5, sdk_md5, size)
+            if md5 != sdk_md5:
+                raise Exception("Swift and SDK md5s did not match (u: %s)" %
+                                uuid)
+
         finally:
             if name:
                 os.unlink(name)
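
The new check HEADs the object just written and compares Swift's Etag (the md5 the cluster computed on receipt) with X-Object-Meta-X-Sdk-Md5 (the md5 openstacksdk computed client-side while sending); any disagreement means bytes changed in flight. A standalone sketch of the same comparison with a plain requests session (the URL and session handling are assumptions, not the driver's code):

import requests

def verify_swift_object(session, url, uuid=None):
    # HEAD the object; compare server-side md5 (Etag) with the
    # client-side md5 the SDK recorded as object metadata.
    ret = session.head(url)
    md5 = ret.headers.get('Etag', '')
    sdk_md5 = ret.headers.get('X-Object-Meta-X-Sdk-Md5', '')
    if md5 != sdk_md5:
        raise Exception("Swift and SDK md5s did not match (u: %s)" % uuid)
    return md5

# verify_swift_object(requests.Session(),
#                     'https://swift.example/v1/AUTH_acct/container/obj')
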
@@ -138,28 +156,87 @@ class SwiftDriver(storageutils.StorageDriver):
             lambda: self.conn.session.delete(
                 self.get_url(path)))
 
-    def move_object(self, src_path, dst_path):
+    def move_object(self, src_path, dst_path, uuid=None):
         dst = os.path.join(self.container_name, dst_path)
+
+        # Get the md5sum and size of the object, and make sure it
+        # matches on both sides of the copy.
+        ret = retry_function(lambda: self.conn.session.head(
+            self.get_url(src_path)))
+        try:
+            size = int(ret.headers.get('Content-Length', ''))
+        except ValueError:
+            size = None
+        md5 = ret.headers.get('Etag', '')
+        sdk_md5 = ret.headers.get('X-Object-Meta-X-Sdk-Md5', '')
+        old_md = dict(md5=md5, sdk_md5=sdk_md5, size=size)
+        self.log.debug("[u: %s] Move object %s %s %s",
+                       uuid, src_path, dst_path, old_md)
+        if md5 != sdk_md5:
+            raise Exception("Swift and SDK md5s did not match at start "
+                            "of copy (u: %s) %s" % (uuid, old_md))
+
+        # FIXME: The multipart-manifest argument below means that in
+        # the event this docker chunk is a large object, we intend to
+        # copy the manifest but not the underlying large object
+        # segments. That seems incorrect, and we should actually just
+        # recast the large object segments into docker chunks and
+        # discard this manifest. But first we should verify that's
+        # what's happening -- it's not clear we ever hit a segment
+        # limit in practice, so we may never have a large object
+        # chunk.
         retry_function(
             lambda: self.conn.session.request(
                 self.get_url(src_path) + "?multipart-manfest=get",
                 'COPY',
                 headers={'Destination': dst}
             ))
+
+        # Get the md5sum and size of the object, and make sure it
+        # matches on both sides of the copy.
+        ret = retry_function(lambda: self.conn.session.head(
+            self.get_url(dst_path)))
+        try:
+            size = int(ret.headers.get('Content-Length', ''))
+        except ValueError:
+            size = None
+        md5 = ret.headers.get('Etag', '')
+        sdk_md5 = ret.headers.get('X-Object-Meta-X-Sdk-Md5', '')
+        new_md = dict(md5=md5, sdk_md5=sdk_md5, size=size)
+        self.log.debug("[u: %s] Moved object %s %s %s",
+                       uuid, src_path, dst_path, new_md)
+        if md5 != sdk_md5:
+            raise Exception("Swift and SDK md5s did not match at end of copy "
+                            "(u: %s) %s" % (uuid, new_md))
+        if old_md != new_md:
+            raise Exception("Object metadata did not match after copy "
+                            "(u: %s) old: %s new: %s" % (uuid, old_md, new_md))
+
         retry_function(
             lambda: self.conn.session.delete(
                 self.get_url(src_path)))
 
-    def cat_objects(self, path, chunks):
+    def cat_objects(self, path, chunks, uuid=None):
         manifest = []
-        for chunk_path in chunks:
+        # TODO: Would it be better to move 1-chunk objects?
+        for chunk in chunks:
             ret = retry_function(
-                lambda: self.conn.session.head(self.get_url(chunk_path)))
-            if int(ret.headers['Content-Length']) == 0:
+                lambda: self.conn.session.head(self.get_url(chunk['path'])))
+            size = int(ret.headers['Content-Length'])
+            if size == 0:
                 continue
+            etag = ret.headers['Etag']
+            sdk_md5 = ret.headers['X-Object-Meta-X-Sdk-Md5']
+            if not (sdk_md5 == etag == chunk['md5']):
+                raise Exception("Object metadata did not match during cat "
+                                "(u: %s) orig: %s sdk: %s etag: %s" % (
+                                    uuid, chunk['md5'], sdk_md5, etag))
+            if not (size == chunk['size']):
+                raise Exception("Object metadata did not match during cat "
+                                "(u: %s) orig: %s size: %s" % (
+                                    uuid, chunk['size'], size))
             manifest.append({'path':
-                             os.path.join(self.container_name, chunk_path),
+                             os.path.join(self.container_name, chunk['path']),
                              'etag': ret.headers['Etag'],
                              'size_bytes': ret.headers['Content-Length']})
         retry_function(lambda:
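
The diff view is truncated here, mid-call. Presumably the elided code uploads `manifest` as a Swift Static Large Object; in Swift's documented SLO API that is a PUT of the JSON manifest with the multipart-manifest=put query parameter, roughly as follows (a sketch under that assumption, not the driver's actual code):

import json
import requests

def put_slo_manifest(session, url, manifest):
    # manifest: [{'path': ..., 'etag': ..., 'size_bytes': ...}, ...]
    return session.put(url + '?multipart-manifest=put',
                       data=json.dumps(manifest))

# put_slo_manifest(requests.Session(),
#                  'https://swift.example/v1/AUTH_acct/container/blob',
#                  manifest)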