From f7eec937c8e71981cdbeebb4cfbd79d2c6ee190b Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Tue, 31 Mar 2020 15:45:09 -0700 Subject: [PATCH] Add debug/verification for uploads Very rarely, we see an object in swift doesn't match the sha256 we expect. Nor does the Etag (swift-calculated md5sum) match the md5sum that openstacksdk calculated on upload. Something is going wrong somewhere, but nothing is raising an exception. This change adds considerable debugging to attempt to localize the point at which the data in swift is not what we expect. It also raises exceptions in those cases to attempt to cause the upload to fail (which in a content-addressible storage system is better than having the wrong content at a given address). If we see uploads fail, we can track down the log entries to find out why. * Add more log lines to swift driver * Include upload UUID in more log lines * Calculate and keep track of each chunk's md5 during upload * Verify that the local and swift-calculated md5sums match after each chunk upload * Verify that all metadata for each chunk matches before and after the move * Verify that all metadata for each chunk matches the original calculation before finalizing the final large-object manifest. * Adjust logging verbosity for debug and non-debug modes Change-Id: If9f7cfe7d4be2f23de80e9d3659af9c060d3f22c --- zuul_registry/filesystem.py | 14 +++--- zuul_registry/main.py | 30 +++++++----- zuul_registry/storage.py | 27 +++++++---- zuul_registry/swift.py | 91 ++++++++++++++++++++++++++++++++++--- 4 files changed, 128 insertions(+), 34 deletions(-) diff --git a/zuul_registry/filesystem.py b/zuul_registry/filesystem.py index 087efed..6cdb102 100644 --- a/zuul_registry/filesystem.py +++ b/zuul_registry/filesystem.py @@ -43,7 +43,7 @@ class FilesystemDriver(storageutils.StorageDriver): return None return os.stat(path).st_size - def put_object(self, path, data): + def put_object(self, path, data, uuid=None): path = os.path.join(self.root, path) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, 'wb') as f: @@ -99,26 +99,26 @@ class FilesystemDriver(storageutils.StorageDriver): else: os.unlink(path) - def move_object(self, src_path, dst_path): + def move_object(self, src_path, dst_path, uuid=None): src_path = os.path.join(self.root, src_path) dst_path = os.path.join(self.root, dst_path) os.makedirs(os.path.dirname(dst_path), exist_ok=True) os.rename(src_path, dst_path) - def cat_objects(self, path, chunks): + def cat_objects(self, path, chunks, uuid=None): path = os.path.join(self.root, path) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, 'wb') as outf: - for chunk_path in chunks: - chunk_path = os.path.join(self.root, chunk_path) + for chunk in chunks: + chunk_path = os.path.join(self.root, chunk['path']) with open(chunk_path, 'rb') as inf: while True: d = inf.read(4096) if not d: break outf.write(d) - for chunk_path in chunks: - chunk_path = os.path.join(self.root, chunk_path) + for chunk in chunks: + chunk_path = os.path.join(self.root, chunk['path']) os.unlink(chunk_path) diff --git a/zuul_registry/main.py b/zuul_registry/main.py index d1b00dc..b51362b 100644 --- a/zuul_registry/main.py +++ b/zuul_registry/main.py @@ -196,8 +196,8 @@ class RegistryAPI: namespace, repository = self.get_namespace(repository) method = cherrypy.request.method uuid = self.storage.start_upload(namespace) - self.log.info('Start upload %s %s %s uuid %s digest %s', - method, namespace, repository, uuid, digest) + self.log.info('[u: %s] Start upload %s %s %s digest %s', + uuid, method, namespace, repository, digest) res = cherrypy.response res.headers['Location'] = '/v2/%s/blobs/uploads/%s' % ( orig_repository, uuid) @@ -210,7 +210,8 @@ class RegistryAPI: def upload_chunk(self, repository, uuid): orig_repository = repository namespace, repository = self.get_namespace(repository) - self.log.info('Upload chunk %s %s %s', namespace, repository, uuid) + self.log.info('[u: %s] Upload chunk %s %s', + uuid, namespace, repository) old_length, new_length = self.storage.upload_chunk( namespace, uuid, cherrypy.request.body) res = cherrypy.response @@ -220,17 +221,22 @@ class RegistryAPI: res.headers['Range'] = '0-%s' % (new_length,) res.status = '204 No Content' self.log.info( - 'Finish Upload chunk %s %s %s', repository, uuid, new_length) + '[u: %s] Finish Upload chunk %s %s', uuid, repository, new_length) @cherrypy.expose @cherrypy.config(**{'tools.check_auth.level': Authorization.WRITE}) def finish_upload(self, repository, uuid, digest): orig_repository = repository namespace, repository = self.get_namespace(repository) - self.log.info('Finish upload %s %s %s', namespace, repository, uuid) + self.log.info('[u: %s] Upload final chunk %s %s', + uuid, namespace, repository) old_length, new_length = self.storage.upload_chunk( namespace, uuid, cherrypy.request.body) + self.log.debug('[u: %s] Store upload %s %s', + uuid, namespace, repository) self.storage.store_upload(namespace, uuid, digest) + self.log.info('[u: %s] Upload complete %s %s', + uuid, namespace, repository) res = cherrypy.response res.headers['Location'] = '/v2/%s/blobs/%s' % (orig_repository, digest) res.headers['Docker-Content-Digest'] = digest @@ -409,18 +415,20 @@ def main(): help='Command: serve, prune', default='serve') args = parser.parse_args() - logformat = '%(levelname)s %(name)s: %(message)s' + logformat = '%(asctime)s %(levelname)s %(name)s: %(message)s' if args.debug or os.environ.get('DEBUG') == '1': logging.basicConfig(level=logging.DEBUG, format=logformat) + logging.getLogger("openstack").setLevel(logging.DEBUG) + logging.getLogger("urllib3").setLevel(logging.DEBUG) + logging.getLogger("requests").setLevel(logging.DEBUG) else: logging.basicConfig(level=logging.INFO, format=logformat) + logging.getLogger("openstack").setLevel(logging.INFO) + logging.getLogger("urllib3").setLevel(logging.ERROR) + logging.getLogger("requests").setLevel(logging.ERROR) cherrypy.log.access_log.propagate = False - logging.getLogger("requests").setLevel(logging.DEBUG) logging.getLogger("keystoneauth").setLevel(logging.ERROR) - logging.getLogger("urllib3").setLevel(logging.DEBUG) - logging.getLogger("stevedore").setLevel(logging.INFO) - logging.getLogger("openstack").setLevel(logging.DEBUG) - # cherrypy.log.error_log.propagate = False + logging.getLogger("stevedore").setLevel(logging.ERROR) s = RegistryServer(args.config) if args.command == 'serve': diff --git a/zuul_registry/storage.py b/zuul_registry/storage.py index 030b6c6..126d7ef 100644 --- a/zuul_registry/storage.py +++ b/zuul_registry/storage.py @@ -19,6 +19,7 @@ import logging import os import queue import rehash +import hashlib import threading import time from uuid import uuid4 @@ -167,7 +168,9 @@ class Storage: def _update_upload(self, namespace, uuid, upload): path = os.path.join(namespace, 'uploads', uuid, 'metadata') - self.backend.put_object(path, upload.dump()) + self.log.debug("[u: %s] Update upload metadata chunks: %s", + uuid, upload.chunks) + self.backend.put_object(path, upload.dump(), uuid) def upload_chunk(self, namespace, uuid, fp): """Add a chunk to an upload. @@ -188,9 +191,13 @@ class Storage: path = os.path.join(namespace, 'uploads', uuid, str(upload.count + 1)) streamer = UploadStreamer() t = threading.Thread(target=self.backend.put_object, - args=(path, streamer)) + args=(path, streamer, uuid)) t.start() size = 0 + # This calculates the md5 of just this chunk for internal + # integrity checking; it is not the overall hash of the layer + # (that's a running calculation in the upload record). + chunk_hasher = hashlib.md5() while True: try: d = fp.read(4096) @@ -200,11 +207,12 @@ class Storage: if not d: break upload.hasher.update(d) + chunk_hasher.update(d) size += len(d) streamer.write(d) streamer.write(None) t.join() - upload.chunks.append(dict(size=size)) + upload.chunks.append(dict(size=size, md5=chunk_hasher.hexdigest())) self._update_upload(namespace, uuid, upload) return upload.size - size, upload.size @@ -223,14 +231,15 @@ class Storage: # Move the chunks into the blob dir to get them out of the # uploads dir. chunks = [] - for i in range(1, upload.count + 1): - src_path = os.path.join(namespace, 'uploads', uuid, str(i)) - dst_path = os.path.join(namespace, 'blobs', digest, str(i)) - chunks.append(dst_path) - self.backend.move_object(src_path, dst_path) + for i, chunk in enumerate(upload.chunks): + src_path = os.path.join(namespace, 'uploads', uuid, str(i + 1)) + dst_path = os.path.join(namespace, 'blobs', digest, str(i + 1)) + chunks.append(dict(path=dst_path, + md5=chunk['md5'], size=chunk['size'])) + self.backend.move_object(src_path, dst_path, uuid) # Concatenate the chunks into one blob. path = os.path.join(namespace, 'blobs', digest, 'data') - self.backend.cat_objects(path, chunks) + self.backend.cat_objects(path, chunks, uuid) path = os.path.join(namespace, 'uploads', uuid, 'metadata') self.backend.delete_object(path) diff --git a/zuul_registry/swift.py b/zuul_registry/swift.py index 25027c1..c086fef 100644 --- a/zuul_registry/swift.py +++ b/zuul_registry/swift.py @@ -94,7 +94,7 @@ class SwiftDriver(storageutils.StorageDriver): return None return ret.headers['Content-Length'] - def put_object(self, path, data): + def put_object(self, path, data, uuid=None): name = None try: with tempfile.NamedTemporaryFile('wb', delete=False) as f: @@ -109,6 +109,24 @@ class SwiftDriver(storageutils.StorageDriver): self.container_name, path, filename=name)) + + # Get the md5sum and size of the object, and make sure it + # matches the upload. + ret = retry_function(lambda: self.conn.session.head( + self.get_url(path))) + try: + size = int(ret.headers.get('Content-Length', '')) + except ValueError: + size = None + md5 = ret.headers.get('Etag', '') + sdk_md5 = ret.headers.get('X-Object-Meta-X-Sdk-Md5', '') + self.log.debug("[u: %s] Upload object %s " + "md5: %s sdkmd5: %s size: %s", + uuid, path, md5, sdk_md5, size) + if md5 != sdk_md5: + raise Exception("Swift and SDK md5s did not match (u: %s)" % + uuid) + finally: if name: os.unlink(name) @@ -138,28 +156,87 @@ class SwiftDriver(storageutils.StorageDriver): lambda: self.conn.session.delete( self.get_url(path))) - def move_object(self, src_path, dst_path): + def move_object(self, src_path, dst_path, uuid=None): dst = os.path.join(self.container_name, dst_path) + + # Get the md5sum and size of the object, and make sure it + # matches on both sides of the copy. + ret = retry_function(lambda: self.conn.session.head( + self.get_url(src_path))) + try: + size = int(ret.headers.get('Content-Length', '')) + except ValueError: + size = None + md5 = ret.headers.get('Etag', '') + sdk_md5 = ret.headers.get('X-Object-Meta-X-Sdk-Md5', '') + old_md = dict(md5=md5, sdk_md5=sdk_md5, size=size) + self.log.debug("[u: %s] Move object %s %s %s", + uuid, src_path, dst_path, old_md) + if md5 != sdk_md5: + raise Exception("Swift and SDK md5s did not match at start " + "of copy (u: %s) %s" % (uuid, old_md)) + + # FIXME: The multipart-manifest argument below means that in + # the event this docker chunk is a large object, we intend to + # copy the manifest but not the underlying large object + # segments. That seems incorrect, and we should actually just + # recast the large object segments into docker chunks and + # discard this manifest. But first we should verify that's + # what's happening -- it's not clear we ever hit a segment + # limit in practice, so we may never have a large object + # chunk. retry_function( lambda: self.conn.session.request( self.get_url(src_path) + "?multipart-manfest=get", 'COPY', headers={'Destination': dst} )) + + # Get the md5sum and size of the object, and make sure it + # matches on both sides of the copy. + ret = retry_function(lambda: self.conn.session.head( + self.get_url(dst_path))) + try: + size = int(ret.headers.get('Content-Length', '')) + except ValueError: + size = None + md5 = ret.headers.get('Etag', '') + sdk_md5 = ret.headers.get('X-Object-Meta-X-Sdk-Md5', '') + new_md = dict(md5=md5, sdk_md5=sdk_md5, size=size) + self.log.debug("[u: %s] Moved object %s %s %s", + uuid, src_path, dst_path, new_md) + if md5 != sdk_md5: + raise Exception("Swift and SDK md5s did not match at end of copy " + "(u: %s) %s" % (uuid, new_md)) + if old_md != new_md: + raise Exception("Object metadata did not match after copy " + "(u: %s) old: %s new: %s" % (uuid, old_md, new_md)) + retry_function( lambda: self.conn.session.delete( self.get_url(src_path))) - def cat_objects(self, path, chunks): + def cat_objects(self, path, chunks, uuid=None): manifest = [] # TODO: Would it be better to move 1-chunk objects? - for chunk_path in chunks: + for chunk in chunks: ret = retry_function( - lambda: self.conn.session.head(self.get_url(chunk_path))) - if int(ret.headers['Content-Length']) == 0: + lambda: self.conn.session.head(self.get_url(chunk['path']))) + size = int(ret.headers['Content-Length']) + if size == 0: continue + etag = ret.headers['Etag'] + sdk_md5 = ret.headers['X-Object-Meta-X-Sdk-Md5'] + if not (sdk_md5 == etag == chunk['md5']): + raise Exception("Object metadata did not match during cat " + "(u: %s) orig: %s sdk: %s etag: %s" % ( + uuid, chunk['md5'], sdk_md5, etag)) + if not (size == chunk['size']): + raise Exception("Object metadata did not match during cat " + "(u: %s) orig: %s size: %s" % ( + uuid, chunk['size'], size)) manifest.append({'path': - os.path.join(self.container_name, chunk_path), + os.path.join(self.container_name, chunk['path']), 'etag': ret.headers['Etag'], 'size_bytes': ret.headers['Content-Length']}) retry_function(lambda: