# Copyright (c) 2019 SwiftStack, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. import time import hashlib from collections import defaultdict from botocore.exceptions import ClientError import six from swift.common.header_key_dict import HeaderKeyDict from test.s3api import BaseS3TestCase def retry(f, timeout=10): timelimit = time.time() + timeout while True: try: f() except (ClientError, AssertionError): if time.time() > timelimit: raise continue else: break class TestObjectVersioning(BaseS3TestCase): maxDiff = None def setUp(self): self.client = self.get_s3_client(1) self.bucket_name = self.create_name('versioning') resp = self.client.create_bucket(Bucket=self.bucket_name) self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode']) def enable_versioning(): resp = self.client.put_bucket_versioning( Bucket=self.bucket_name, VersioningConfiguration={'Status': 'Enabled'}) self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode']) retry(enable_versioning) def tearDown(self): resp = self.client.put_bucket_versioning( Bucket=self.bucket_name, VersioningConfiguration={'Status': 'Suspended'}) self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode']) self.clear_bucket(self.client, self.bucket_name) super(TestObjectVersioning, self).tearDown() def test_setup(self): bucket_name = self.create_name('new-bucket') resp = self.client.create_bucket(Bucket=bucket_name) self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode']) expected_location = '/%s' % bucket_name self.assertEqual(expected_location, resp['Location']) headers = HeaderKeyDict(resp['ResponseMetadata']['HTTPHeaders']) self.assertEqual('0', headers['content-length']) self.assertEqual(expected_location, headers['location']) # get versioning resp = self.client.get_bucket_versioning(Bucket=bucket_name) self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode']) self.assertNotIn('Status', resp) # put versioning versioning_config = { 'Status': 'Enabled', } resp = self.client.put_bucket_versioning( Bucket=bucket_name, VersioningConfiguration=versioning_config) self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode']) # ... now it's enabled def check_status(): resp = self.client.get_bucket_versioning(Bucket=bucket_name) self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode']) try: self.assertEqual('Enabled', resp['Status']) except KeyError: self.fail('Status was not in %r' % resp) retry(check_status) # send over some bogus junk versioning_config['Status'] = 'Disabled' with self.assertRaises(ClientError) as ctx: self.client.put_bucket_versioning( Bucket=bucket_name, VersioningConfiguration=versioning_config) expected_err = 'An error occurred (MalformedXML) when calling the ' \ 'PutBucketVersioning operation: The XML you provided was ' \ 'not well-formed or did not validate against our published schema' self.assertEqual(expected_err, str(ctx.exception)) # disable it versioning_config['Status'] = 'Suspended' resp = self.client.put_bucket_versioning( Bucket=bucket_name, VersioningConfiguration=versioning_config) self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode']) # ... now it's disabled again def check_status(): resp = self.client.get_bucket_versioning(Bucket=bucket_name) self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode']) self.assertEqual('Suspended', resp['Status']) retry(check_status) def test_upload_fileobj_versioned(self): obj_data = self.create_name('some-data').encode('ascii') obj_etag = hashlib.md5(obj_data).hexdigest() obj_name = self.create_name('versioned-obj') self.client.upload_fileobj(six.BytesIO(obj_data), self.bucket_name, obj_name) # object is in the listing resp = self.client.list_objects_v2(Bucket=self.bucket_name) objs = resp.get('Contents', []) for obj in objs: obj.pop('LastModified') self.assertEqual([{ 'ETag': '"%s"' % obj_etag, 'Key': obj_name, 'Size': len(obj_data), 'StorageClass': 'STANDARD', }], objs) # object version listing resp = self.client.list_object_versions(Bucket=self.bucket_name) objs = resp.get('Versions', []) for obj in objs: obj.pop('LastModified') obj.pop('Owner') obj.pop('VersionId') self.assertEqual([{ 'ETag': '"%s"' % obj_etag, 'IsLatest': True, 'Key': obj_name, 'Size': len(obj_data), 'StorageClass': 'STANDARD', }], objs) # overwrite the object new_obj_data = self.create_name('some-new-data').encode('ascii') new_obj_etag = hashlib.md5(new_obj_data).hexdigest() self.client.upload_fileobj(six.BytesIO(new_obj_data), self.bucket_name, obj_name) # new object is in the listing resp = self.client.list_objects_v2(Bucket=self.bucket_name) objs = resp.get('Contents', []) for obj in objs: obj.pop('LastModified') self.assertEqual([{ 'ETag': '"%s"' % new_obj_etag, 'Key': obj_name, 'Size': len(new_obj_data), 'StorageClass': 'STANDARD', }], objs) # both object versions in the versions listing resp = self.client.list_object_versions(Bucket=self.bucket_name) objs = resp.get('Versions', []) for obj in objs: obj.pop('LastModified') obj.pop('Owner') obj.pop('VersionId') self.assertEqual([{ 'ETag': '"%s"' % new_obj_etag, 'IsLatest': True, 'Key': obj_name, 'Size': len(new_obj_data), 'StorageClass': 'STANDARD', }, { 'ETag': '"%s"' % obj_etag, 'IsLatest': False, 'Key': obj_name, 'Size': len(obj_data), 'StorageClass': 'STANDARD', }], objs) def test_delete_versioned_objects(self): etags = [] obj_name = self.create_name('versioned-obj') for i in range(3): obj_data = self.create_name('some-data-%s' % i).encode('ascii') etags.insert(0, hashlib.md5(obj_data).hexdigest()) self.client.upload_fileobj(six.BytesIO(obj_data), self.bucket_name, obj_name) # only one object appears in the listing resp = self.client.list_objects_v2(Bucket=self.bucket_name) objs = resp.get('Contents', []) for obj in objs: obj.pop('LastModified') self.assertEqual([{ 'ETag': '"%s"' % etags[0], 'Key': obj_name, 'Size': len(obj_data), 'StorageClass': 'STANDARD', }], objs) # but everything is layed out in the object versions listing resp = self.client.list_object_versions(Bucket=self.bucket_name) objs = resp.get('Versions', []) versions = [] for obj in objs: obj.pop('LastModified') obj.pop('Owner') versions.append(obj.pop('VersionId')) self.assertEqual([{ 'ETag': '"%s"' % etags[0], 'IsLatest': True, 'Key': obj_name, 'Size': len(obj_data), 'StorageClass': 'STANDARD', }, { 'ETag': '"%s"' % etags[1], 'IsLatest': False, 'Key': obj_name, 'Size': len(obj_data), 'StorageClass': 'STANDARD', }, { 'ETag': '"%s"' % etags[2], 'IsLatest': False, 'Key': obj_name, 'Size': len(obj_data), 'StorageClass': 'STANDARD', }], objs) # we can delete a specific version resp = self.client.delete_object(Bucket=self.bucket_name, Key=obj_name, VersionId=versions[1]) # and that just pulls it out of the versions listing resp = self.client.list_object_versions(Bucket=self.bucket_name) objs = resp.get('Versions', []) for obj in objs: obj.pop('LastModified') obj.pop('Owner') obj.pop('VersionId') self.assertEqual([{ 'ETag': '"%s"' % etags[0], 'IsLatest': True, 'Key': obj_name, 'Size': len(obj_data), 'StorageClass': 'STANDARD', }, { 'ETag': '"%s"' % etags[2], 'IsLatest': False, 'Key': obj_name, 'Size': len(obj_data), 'StorageClass': 'STANDARD', }], objs) # ... but the current listing is unaffected resp = self.client.list_objects_v2(Bucket=self.bucket_name) objs = resp.get('Contents', []) for obj in objs: obj.pop('LastModified') self.assertEqual([{ 'ETag': '"%s"' % etags[0], 'Key': obj_name, 'Size': len(obj_data), 'StorageClass': 'STANDARD', }], objs) # OTOH, if you delete specifically the latest version # we can delete a specific version resp = self.client.delete_object(Bucket=self.bucket_name, Key=obj_name, VersionId=versions[0]) # the versions listing has a new IsLatest resp = self.client.list_object_versions(Bucket=self.bucket_name) objs = resp.get('Versions', []) for obj in objs: obj.pop('LastModified') obj.pop('Owner') obj.pop('VersionId') self.assertEqual([{ 'ETag': '"%s"' % etags[2], 'IsLatest': True, 'Key': obj_name, 'Size': len(obj_data), 'StorageClass': 'STANDARD', }], objs) # and the stack pops resp = self.client.list_objects_v2(Bucket=self.bucket_name) objs = resp.get('Contents', []) for obj in objs: obj.pop('LastModified') self.assertEqual([{ 'ETag': '"%s"' % etags[2], 'Key': obj_name, 'Size': len(obj_data), 'StorageClass': 'STANDARD', }], objs) def test_delete_versioned_deletes(self): etags = [] obj_name = self.create_name('versioned-obj') for i in range(3): obj_data = self.create_name('some-data-%s' % i).encode('ascii') etags.insert(0, hashlib.md5(obj_data).hexdigest()) self.client.upload_fileobj(six.BytesIO(obj_data), self.bucket_name, obj_name) # and make a delete marker self.client.delete_object(Bucket=self.bucket_name, Key=obj_name) # current listing is empty resp = self.client.list_objects_v2(Bucket=self.bucket_name) objs = resp.get('Contents', []) self.assertEqual([], objs) # but everything is in layed out in the versions listing resp = self.client.list_object_versions(Bucket=self.bucket_name) objs = resp.get('Versions', []) versions = [] for obj in objs: obj.pop('LastModified') obj.pop('Owner') versions.append(obj.pop('VersionId')) self.assertEqual([{ 'ETag': '"%s"' % etag, 'IsLatest': False, 'Key': obj_name, 'Size': len(obj_data), 'StorageClass': 'STANDARD', } for etag in etags], objs) # ... plus the delete markers delete_markers = resp.get('DeleteMarkers', []) marker_versions = [] for marker in delete_markers: marker.pop('LastModified') marker.pop('Owner') marker_versions.append(marker.pop('VersionId')) self.assertEqual([{ 'Key': obj_name, 'IsLatest': is_latest, } for is_latest in (True, False, False)], delete_markers) # delete an old delete markers resp = self.client.delete_object(Bucket=self.bucket_name, Key=obj_name, VersionId=marker_versions[2]) # since IsLatest is still marker we'll raise NoSuchKey with self.assertRaises(ClientError) as caught: resp = self.client.get_object(Bucket=self.bucket_name, Key=obj_name) expected_err = 'An error occurred (NoSuchKey) when calling the ' \ 'GetObject operation: The specified key does not exist.' self.assertEqual(expected_err, str(caught.exception)) # now delete the delete marker (IsLatest) resp = self.client.delete_object(Bucket=self.bucket_name, Key=obj_name, VersionId=marker_versions[0]) # most recent version is now latest resp = self.client.get_object(Bucket=self.bucket_name, Key=obj_name) self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode']) self.assertEqual('"%s"' % etags[0], resp['ETag']) # now delete the IsLatest object version resp = self.client.delete_object(Bucket=self.bucket_name, Key=obj_name, VersionId=versions[0]) # and object is deleted again with self.assertRaises(ClientError) as caught: resp = self.client.get_object(Bucket=self.bucket_name, Key=obj_name) expected_err = 'An error occurred (NoSuchKey) when calling the ' \ 'GetObject operation: The specified key does not exist.' self.assertEqual(expected_err, str(caught.exception)) # delete marker IsLatest resp = self.client.list_object_versions(Bucket=self.bucket_name) delete_markers = resp.get('DeleteMarkers', []) for marker in delete_markers: marker.pop('LastModified') marker.pop('Owner') self.assertEqual([{ 'Key': obj_name, 'IsLatest': True, 'VersionId': marker_versions[1], }], delete_markers) def test_multipart_upload(self): obj_name = self.create_name('versioned-obj') obj_data = b'data' mu = self.client.create_multipart_upload( Bucket=self.bucket_name, Key=obj_name) part_md5 = self.client.upload_part( Bucket=self.bucket_name, Key=obj_name, UploadId=mu['UploadId'], PartNumber=1, Body=obj_data)['ETag'] complete_response = self.client.complete_multipart_upload( Bucket=self.bucket_name, Key=obj_name, UploadId=mu['UploadId'], MultipartUpload={'Parts': [ {'PartNumber': 1, 'ETag': part_md5}, ]}) obj_etag = complete_response['ETag'] delete_response = self.client.delete_object( Bucket=self.bucket_name, Key=obj_name) marker_version_id = delete_response['VersionId'] resp = self.client.list_object_versions(Bucket=self.bucket_name) objs = resp.get('Versions', []) versions = [] for obj in objs: obj.pop('LastModified') obj.pop('Owner') versions.append(obj.pop('VersionId')) self.assertEqual([{ 'ETag': obj_etag, 'IsLatest': False, 'Key': obj_name, 'Size': len(obj_data), 'StorageClass': 'STANDARD', }], objs) markers = resp.get('DeleteMarkers', []) for marker in markers: marker.pop('LastModified') marker.pop('Owner') self.assertEqual([{ 'IsLatest': True, 'Key': obj_name, 'VersionId': marker_version_id, }], markers) # Can still get the old version resp = self.client.get_object( Bucket=self.bucket_name, Key=obj_name, VersionId=versions[0]) self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode']) self.assertEqual(obj_etag, resp['ETag']) delete_response = self.client.delete_object( Bucket=self.bucket_name, Key=obj_name, VersionId=versions[0]) resp = self.client.list_object_versions(Bucket=self.bucket_name) self.assertEqual([], resp.get('Versions', [])) markers = resp.get('DeleteMarkers', []) for marker in markers: marker.pop('LastModified') marker.pop('Owner') self.assertEqual([{ 'IsLatest': True, 'Key': obj_name, 'VersionId': marker_version_id, }], markers) def test_get_versioned_object(self): etags = [] obj_name = self.create_name('versioned-obj') for i in range(3): obj_data = self.create_name('some-data-%s' % i).encode('ascii') # TODO: pull etag from response instead etags.insert(0, hashlib.md5(obj_data).hexdigest()) self.client.upload_fileobj( six.BytesIO(obj_data), self.bucket_name, obj_name) resp = self.client.list_object_versions(Bucket=self.bucket_name) objs = resp.get('Versions', []) versions = [] for obj in objs: obj.pop('LastModified') obj.pop('Owner') versions.append(obj.pop('VersionId')) self.assertEqual([{ 'ETag': '"%s"' % etags[0], 'IsLatest': True, 'Key': obj_name, 'Size': len(obj_data), 'StorageClass': 'STANDARD', }, { 'ETag': '"%s"' % etags[1], 'IsLatest': False, 'Key': obj_name, 'Size': len(obj_data), 'StorageClass': 'STANDARD', }, { 'ETag': '"%s"' % etags[2], 'IsLatest': False, 'Key': obj_name, 'Size': len(obj_data), 'StorageClass': 'STANDARD', }], objs) # un-versioned get_object returns IsLatest resp = self.client.get_object(Bucket=self.bucket_name, Key=obj_name) self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode']) self.assertEqual('"%s"' % etags[0], resp['ETag']) # but you can get any object by version for i, version in enumerate(versions): resp = self.client.get_object( Bucket=self.bucket_name, Key=obj_name, VersionId=version) self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode']) self.assertEqual('"%s"' % etags[i], resp['ETag']) # and head_object works about the same resp = self.client.head_object(Bucket=self.bucket_name, Key=obj_name) self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode']) self.assertEqual('"%s"' % etags[0], resp['ETag']) self.assertEqual(versions[0], resp['VersionId']) for version, etag in zip(versions, etags): resp = self.client.head_object( Bucket=self.bucket_name, Key=obj_name, VersionId=version) self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode']) self.assertEqual(version, resp['VersionId']) self.assertEqual('"%s"' % etag, resp['ETag']) def test_get_versioned_object_invalid_params(self): with self.assertRaises(ClientError) as ctx: self.client.list_object_versions(Bucket=self.bucket_name, KeyMarker='', VersionIdMarker='bogus') expected_err = 'An error occurred (InvalidArgument) when calling ' \ 'the ListObjectVersions operation: Invalid version id specified' self.assertEqual(expected_err, str(ctx.exception)) with self.assertRaises(ClientError) as ctx: self.client.list_object_versions( Bucket=self.bucket_name, VersionIdMarker='a' * 32) expected_err = 'An error occurred (InvalidArgument) when calling ' \ 'the ListObjectVersions operation: A version-id marker cannot ' \ 'be specified without a key marker.' self.assertEqual(expected_err, str(ctx.exception)) def test_get_versioned_object_key_marker(self): obj00_name = self.create_name('00-versioned-obj') obj01_name = self.create_name('01-versioned-obj') names = [obj00_name] * 3 + [obj01_name] * 3 latest = [True, False, False, True, False, False] etags = [] for i in range(3): obj_data = self.create_name('some-data-%s' % i).encode('ascii') etags.insert(0, '"%s"' % hashlib.md5(obj_data).hexdigest()) self.client.upload_fileobj( six.BytesIO(obj_data), self.bucket_name, obj01_name) for i in range(3): obj_data = self.create_name('some-data-%s' % i).encode('ascii') etags.insert(0, '"%s"' % hashlib.md5(obj_data).hexdigest()) self.client.upload_fileobj( six.BytesIO(obj_data), self.bucket_name, obj00_name) resp = self.client.list_object_versions(Bucket=self.bucket_name) versions = [] objs = [] for o in resp.get('Versions', []): versions.append(o['VersionId']) objs.append({ 'Key': o['Key'], 'VersionId': o['VersionId'], 'IsLatest': o['IsLatest'], 'ETag': o['ETag'], }) expected = [{ 'Key': name, 'VersionId': version, 'IsLatest': is_latest, 'ETag': etag, } for name, etag, version, is_latest in zip( names, etags, versions, latest)] self.assertEqual(expected, objs) # on s3 this makes expected[0]['IsLatest'] magicaly change to False? # resp = self.client.list_object_versions(Bucket=self.bucket_name, # KeyMarker='', # VersionIdMarker=versions[0]) # objs = [{ # 'Key': o['Key'], # 'VersionId': o['VersionId'], # 'IsLatest': o['IsLatest'], # 'ETag': o['ETag'], # } for o in resp.get('Versions', [])] # self.assertEqual(expected, objs) # KeyMarker skips past that key resp = self.client.list_object_versions(Bucket=self.bucket_name, KeyMarker=obj00_name) objs = [{ 'Key': o['Key'], 'VersionId': o['VersionId'], 'IsLatest': o['IsLatest'], 'ETag': o['ETag'], } for o in resp.get('Versions', [])] self.assertEqual(expected[3:], objs) # KeyMarker with VersionIdMarker skips past that version resp = self.client.list_object_versions(Bucket=self.bucket_name, KeyMarker=obj00_name, VersionIdMarker=versions[0]) objs = [{ 'Key': o['Key'], 'VersionId': o['VersionId'], 'IsLatest': o['IsLatest'], 'ETag': o['ETag'], } for o in resp.get('Versions', [])] self.assertEqual(expected[1:], objs) # KeyMarker with bogus version skips past that key resp = self.client.list_object_versions( Bucket=self.bucket_name, KeyMarker=obj00_name, VersionIdMarker=versions[4]) objs = [{ 'Key': o['Key'], 'VersionId': o['VersionId'], 'IsLatest': o['IsLatest'], 'ETag': o['ETag'], } for o in resp.get('Versions', [])] self.assertEqual(expected[3:], objs) def test_list_objects(self): etags = defaultdict(list) for i in range(3): obj_name = self.create_name('versioned-obj') for i in range(3): obj_data = self.create_name('some-data-%s' % i).encode('ascii') etags[obj_name].insert(0, hashlib.md5(obj_data).hexdigest()) self.client.upload_fileobj( six.BytesIO(obj_data), self.bucket_name, obj_name) # both unversioned list_objects responses are similar expected = [] for name, obj_etags in sorted(etags.items()): expected.append({ 'ETag': '"%s"' % obj_etags[0], 'Key': name, 'Size': len(obj_data), 'StorageClass': 'STANDARD', }) resp = self.client.list_objects(Bucket=self.bucket_name) objs = resp.get('Contents', []) for obj in objs: obj.pop('LastModified') # one difference seems to be the Owner key self.assertEqual({'DisplayName', 'ID'}, set(obj.pop('Owner').keys())) self.assertEqual(expected, objs) resp = self.client.list_objects_v2(Bucket=self.bucket_name) objs = resp.get('Contents', []) for obj in objs: obj.pop('LastModified') self.assertEqual(expected, objs) # versioned listings has something for everyone expected = [] for name, obj_etags in sorted(etags.items()): is_latest = True for etag in obj_etags: expected.append({ 'ETag': '"%s"' % etag, 'IsLatest': is_latest, 'Key': name, 'Size': len(obj_data), 'StorageClass': 'STANDARD', }) is_latest = False resp = self.client.list_object_versions(Bucket=self.bucket_name) objs = resp.get('Versions', []) versions = [] for obj in objs: obj.pop('LastModified') obj.pop('Owner') versions.append(obj.pop('VersionId')) self.assertEqual(expected, objs) def test_copy_object(self): etags = [] obj_name = self.create_name('versioned-obj') for i in range(3): obj_data = self.create_name('some-data-%s' % i).encode('ascii') etags.insert(0, hashlib.md5(obj_data).hexdigest()) self.client.upload_fileobj( six.BytesIO(obj_data), self.bucket_name, obj_name) resp = self.client.list_object_versions(Bucket=self.bucket_name) objs = resp.get('Versions', []) versions = [] for obj in objs: versions.append(obj.pop('VersionId')) # CopySource can just be Bucket/Key string first_target = self.create_name('target-obj1') copy_resp = self.client.copy_object( Bucket=self.bucket_name, Key=first_target, CopySource='%s/%s' % (self.bucket_name, obj_name)) self.assertEqual(versions[0], copy_resp['CopySourceVersionId']) # and you'll just get the most recent version resp = self.client.head_object(Bucket=self.bucket_name, Key=first_target) self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode']) self.assertEqual('"%s"' % etags[0], resp['ETag']) # or you can be more explicit explicit_target = self.create_name('target-%s' % versions[0]) copy_source = {'Bucket': self.bucket_name, 'Key': obj_name, 'VersionId': versions[0]} copy_resp = self.client.copy_object( Bucket=self.bucket_name, Key=explicit_target, CopySource=copy_source) self.assertEqual(versions[0], copy_resp['CopySourceVersionId']) # and you still get the same thing resp = self.client.head_object(Bucket=self.bucket_name, Key=explicit_target) self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode']) self.assertEqual('"%s"' % etags[0], resp['ETag']) # but you can also copy from a specific version version_target = self.create_name('target-%s' % versions[2]) copy_source['VersionId'] = versions[2] copy_resp = self.client.copy_object( Bucket=self.bucket_name, Key=version_target, CopySource=copy_source) self.assertEqual(versions[2], copy_resp['CopySourceVersionId']) resp = self.client.head_object(Bucket=self.bucket_name, Key=version_target) self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode']) self.assertEqual('"%s"' % etags[2], resp['ETag'])