From 9fb406e056f9e549378c59beb0b4b45e22ece879 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Tue, 2 Jan 2024 16:13:06 -0800 Subject: [PATCH] Add s3 support This adds an s3 backend driver. Change-Id: I2e005b2a4a86bd039d0b08878cd6f4c1156d465d --- requirements.txt | 1 + tools/conf.yaml | 14 ++- zuul_registry/main.py | 2 + zuul_registry/s3.py | 220 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 232 insertions(+), 5 deletions(-) create mode 100644 zuul_registry/s3.py diff --git a/requirements.txt b/requirements.txt index 302ccea..7d78a3f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,4 @@ git+https://github.com/jeblair/sha256 cython wheel pyjwt>=2.0.0,<3.0.0 +boto3 diff --git a/tools/conf.yaml b/tools/conf.yaml index b334253..adda053 100644 --- a/tools/conf.yaml +++ b/tools/conf.yaml @@ -3,10 +3,6 @@ registry: port: 9000 tls-cert: /path/to/cert.pem tls-key: /path/to/cert.key - #shadow: - # docker.io: - # hostname: local-docker-mirror.example.com - # port: 443 users: - name: testuser pass: testpass @@ -14,10 +10,18 @@ registry: - name: anonymous pass: '' access: read - xstorage: + # Choose one: + storage: driver: swift cloud: registry container: test_registry + storage: + driver: s3 + # endpoint: optional url + # region: optional string + bucket: bucketname + access-key: api access key + secret-key: api secret key storage: driver: filesystem root: /tmp/storage diff --git a/zuul_registry/main.py b/zuul_registry/main.py index 076b0db..50d91dd 100644 --- a/zuul_registry/main.py +++ b/zuul_registry/main.py @@ -29,12 +29,14 @@ import yaml from . import filesystem from . import storage from . import swift +from . 
import s3 import jwt DRIVERS = { 'filesystem': filesystem.Driver, 'swift': swift.Driver, + 's3': s3.Driver, } diff --git a/zuul_registry/s3.py b/zuul_registry/s3.py new file mode 100644 index 0000000..fdfa2eb --- /dev/null +++ b/zuul_registry/s3.py @@ -0,0 +1,220 @@ +# Copyright 2024 Acme Gating, LLC +# +# This module is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this software. If not, see . + +import logging +import tempfile + +import boto3 +import botocore + +from . import storageutils + +MULTIPART_MIN_SIZE = 5 * 1024 * 1024 # 5 MiB + +# NOTE: This currently lacks support for multipart files since in +# practice we don't see docker uploading objects larger than 5GB. 
# boto3/botocore are chatty at lower levels; only surface their errors.
logging.getLogger('boto3').setLevel(logging.ERROR)
logging.getLogger('botocore').setLevel(logging.ERROR)


class S3Driver(storageutils.StorageDriver):
    """Storage driver which keeps registry objects in a single S3 bucket.

    Configuration keys read from ``conf``:

    * ``bucket`` (required): name of the bucket.
    * ``access-key`` / ``secret-key`` (required): API credentials.
    * ``endpoint`` (optional): service URL; defaults to
      ``https://s3.amazonaws.com/``.
    * ``region`` (optional): region name passed to the client.
    """

    log = logging.getLogger('registry.s3')

    # Error codes which this driver treats as "object does not exist".
    _NOT_FOUND_CODES = ("404", "NoSuchKey")

    def __init__(self, conf):
        """Create the boto3 client described by the ``conf`` mapping."""
        endpoint = conf.get('endpoint',
                            'https://s3.amazonaws.com/')

        self.s3 = boto3.client(
            's3',
            endpoint_url=endpoint,
            aws_access_key_id=conf['access-key'],
            aws_secret_access_key=conf['secret-key'],
            region_name=conf.get('region'),
        )
        self.bucket = conf['bucket']

    @classmethod
    def _is_not_found(cls, error):
        """Return True if a botocore ClientError reports a missing object."""
        return error.response['Error']['Code'] in cls._NOT_FOUND_CODES

    def list_objects(self, path):
        """Return a list of ObjectInfo for the keys found under ``path``.

        Every returned entry has ``isdir`` set to False.
        """
        self.log.debug("List objects %s", path)
        paginator = self.s3.get_paginator('list_objects_v2')
        ret = []
        # NOTE(review): with a delimiter, S3 reports keys that sit
        # below a deeper "directory" under 'CommonPrefixes' rather
        # than 'Contents'; those are not returned here.  Confirm
        # whether callers expect directory entries.
        for page in paginator.paginate(
                Bucket=self.bucket,
                Delimiter='/',
                Prefix=path):
            # 'Contents' is omitted from pages which hold no keys.
            for obj in page.get('Contents', []):
                objpath = obj['Key']
                name = obj['Key'].split('/')[-1]
                ctime = obj['LastModified']
                isdir = False
                ret.append(storageutils.ObjectInfo(
                    objpath, name, ctime, isdir))
        return ret

    def get_object_size(self, path):
        """Return the size in bytes of ``path``, or None if it is missing.

        Any other client error is re-raised.
        """
        try:
            obj = self.s3.head_object(
                Bucket=self.bucket,
                Key=path)
        except botocore.exceptions.ClientError as e:
            if self._is_not_found(e):
                return None
            raise
        return int(obj['ContentLength'])

    def put_object(self, path, data, uuid=None):
        """Store ``data`` at ``path``.

        ``data`` is either a bytes object or an iterable of bytes
        chunks.  ``uuid`` is only used to tag the log message.
        """
        if isinstance(data, bytes):
            self.s3.put_object(
                Bucket=self.bucket,
                Key=path,
                Body=data,
            )
        else:
            # Spool the chunks to a temporary file so that the client
            # is handed a seekable file object as the body.
            with tempfile.TemporaryFile('w+b') as f:
                for chunk in data:
                    f.write(chunk)
                f.seek(0)
                self.s3.put_object(
                    Bucket=self.bucket,
                    Key=path,
                    Body=f,
                )

        obj = self.s3.head_object(
            Bucket=self.bucket,
            Key=path,
            ChecksumMode='ENABLED',
        )
        s3_digest = obj['ETag']
        size = obj['ContentLength']

        # The ETag and size reported by the server are only logged
        # here; they are not compared against the uploaded data.
        self.log.debug("[u: %s] Upload object %s "
                       "md5: %s size: %s",
                       uuid, path, s3_digest, size)

    def get_object(self, path):
        """Return the whole content of ``path``, or None if it is missing.

        Any other client error is re-raised.
        """
        try:
            obj = self.s3.get_object(
                Bucket=self.bucket,
                Key=path)
        except botocore.exceptions.ClientError as e:
            if self._is_not_found(e):
                return None
            raise
        return obj['Body'].read()

    def stream_object(self, path):
        """Return ``(size, body)`` for ``path`` without reading the body.

        ``size`` is None when the response carries no usable
        ContentLength.  Any client error other than "not found" is
        re-raised.
        """
        try:
            obj = self.s3.get_object(
                Bucket=self.bucket,
                Key=path)
        except botocore.exceptions.ClientError as e:
            if self._is_not_found(e):
                # NOTE(review): a missing object yields a bare None
                # while success yields a 2-tuple; confirm that callers
                # do not unconditionally unpack the result.
                return None
            raise
        try:
            size = int(obj.get('ContentLength', ''))
        except ValueError:
            size = None
        return size, obj['Body']

    def delete_object(self, path):
        """Delete the object stored at ``path``."""
        self.s3.delete_object(
            Bucket=self.bucket,
            Key=path)

    def move_object(self, src_path, dst_path, uuid=None):
        """Move ``src_path`` to ``dst_path`` (copy, verify, then delete).

        The source is only deleted once the copy reports the same ETag
        and size as the source; otherwise an Exception is raised and
        the source is left in place.  ``uuid`` is only used to tag log
        and error messages.
        """
        obj = self.s3.head_object(
            Bucket=self.bucket,
            Key=src_path)
        old_md = dict(etag=obj['ETag'], size=obj['ContentLength'])

        self.log.debug("[u: %s] Move object %s %s %s",
                       uuid, src_path, dst_path, old_md)

        self.s3.copy_object(
            Bucket=self.bucket,
            CopySource={'Bucket': self.bucket, 'Key': src_path},
            Key=dst_path)

        obj = self.s3.head_object(
            Bucket=self.bucket,
            Key=dst_path)
        new_md = dict(etag=obj['ETag'], size=obj['ContentLength'])

        self.log.debug("[u: %s] Moved object %s %s %s",
                       uuid, src_path, dst_path, new_md)
        if old_md != new_md:
            raise Exception("Object metadata did not match after copy "
                            "(u: %s) old: %s new: %s" % (uuid, old_md, new_md))

        self.s3.delete_object(
            Bucket=self.bucket,
            Key=src_path)

    def cat_objects(self, path, chunks, uuid=None):
        """Concatenate already-stored chunk objects into ``path``.

        ``chunks`` is a sequence of dicts with ``path`` and ``size``
        keys.  Zero-sized chunks are ignored.  A single remaining
        chunk is simply moved; several chunks are joined server-side
        with a multipart upload whose parts are copies of the chunks.

        Raises an Exception if a chunk other than the last is smaller
        than MULTIPART_MIN_SIZE, or if a stored chunk's size differs
        from the size recorded in ``chunks``.
        """
        chunks = [c for c in chunks if c['size']]

        if len(chunks) == 1:
            self.move_object(chunks[0]['path'], path, uuid)
            return

        # Validate every chunk before starting the upload.  Part
        # numbers are 1-based because S3 does not accept part 0.
        for part_number, chunk in enumerate(chunks, start=1):
            last = (part_number == len(chunks))
            if not last and chunk['size'] < MULTIPART_MIN_SIZE:
                raise Exception(
                    f"Chunk {part_number} of {len(chunks)} with size "
                    f"{chunk['size']} is less than minimum")
            # Check the stored chunk itself; the destination object
            # does not exist yet.
            obj = self.s3.head_object(
                Bucket=self.bucket,
                Key=chunk['path'])
            size = obj['ContentLength']
            if size != chunk['size']:
                raise Exception("Object metadata did not match during cat "
                                "(u: %s) orig: %s size: %s" % (
                                    uuid, chunk['size'], size))

        upload = self.s3.create_multipart_upload(
            Bucket=self.bucket,
            Key=path,
        )
        try:
            parts = []
            for part_number, chunk in enumerate(chunks, start=1):
                result = self.s3.upload_part_copy(
                    Bucket=self.bucket,
                    Key=path,
                    CopySource={'Bucket': self.bucket,
                                'Key': chunk['path']},
                    PartNumber=part_number,
                    UploadId=upload['UploadId'],
                )
                # Only pass on the fields which identify the part; the
                # copy result also carries fields (e.g. LastModified)
                # that are not part of a completed-part description.
                parts.append({
                    'ETag': result['CopyPartResult']['ETag'],
                    'PartNumber': part_number,
                })
            self.s3.complete_multipart_upload(
                Bucket=self.bucket,
                Key=path,
                MultipartUpload={
                    'Parts': parts,
                },
                UploadId=upload['UploadId'],
            )
        except Exception:
            # Do not leave an unfinished upload (and its stored parts)
            # behind; the original error is what the caller gets.
            try:
                self.s3.abort_multipart_upload(
                    Bucket=self.bucket,
                    Key=path,
                    UploadId=upload['UploadId'],
                )
            except Exception:
                self.log.exception(
                    "[u: %s] Unable to abort multipart upload of %s",
                    uuid, path)
            raise


Driver = S3Driver