swift/test/s3api/test_versioning.py
karen chan 6097660f0c s3api: Implement object versioning API
Translate AWS S3 Object Versioning API requests to native Swift Object
Versioning API, speficially:

 * bucket versioning status
 * bucket versioned objects listing params
 * object GETorHEAD & DELETE versionId
 * multi_delete versionId

Change-Id: I8296681b61996e073b3ba12ad46f99042dc15c37
Co-Authored-By: Tim Burke <tim.burke@gmail.com>
Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
2020-01-28 14:00:08 -08:00

759 lines
29 KiB
Python

# Copyright (c) 2019 SwiftStack, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import hashlib
from collections import defaultdict
from botocore.exceptions import ClientError
import six
from swift.common.header_key_dict import HeaderKeyDict
from test.s3api import BaseS3TestCase
def retry(f, timeout=10):
timelimit = time.time() + timeout
while True:
try:
f()
except (ClientError, AssertionError):
if time.time() > timelimit:
raise
continue
else:
break
class TestObjectVersioning(BaseS3TestCase):
maxDiff = None
def setUp(self):
self.client = self.get_s3_client(1)
self.bucket_name = self.create_name('versioning')
resp = self.client.create_bucket(Bucket=self.bucket_name)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
def enable_versioning():
resp = self.client.put_bucket_versioning(
Bucket=self.bucket_name,
VersioningConfiguration={'Status': 'Enabled'})
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
retry(enable_versioning)
def tearDown(self):
resp = self.client.put_bucket_versioning(
Bucket=self.bucket_name,
VersioningConfiguration={'Status': 'Suspended'})
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.clear_bucket(self.client, self.bucket_name)
super(TestObjectVersioning, self).tearDown()
def test_setup(self):
bucket_name = self.create_name('new-bucket')
resp = self.client.create_bucket(Bucket=bucket_name)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
expected_location = '/%s' % bucket_name
self.assertEqual(expected_location, resp['Location'])
headers = HeaderKeyDict(resp['ResponseMetadata']['HTTPHeaders'])
self.assertEqual('0', headers['content-length'])
self.assertEqual(expected_location, headers['location'])
# get versioning
resp = self.client.get_bucket_versioning(Bucket=bucket_name)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertNotIn('Status', resp)
# put versioning
versioning_config = {
'Status': 'Enabled',
}
resp = self.client.put_bucket_versioning(
Bucket=bucket_name,
VersioningConfiguration=versioning_config)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
# ... now it's enabled
def check_status():
resp = self.client.get_bucket_versioning(Bucket=bucket_name)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
try:
self.assertEqual('Enabled', resp['Status'])
except KeyError:
self.fail('Status was not in %r' % resp)
retry(check_status)
# send over some bogus junk
versioning_config['Status'] = 'Disabled'
with self.assertRaises(ClientError) as ctx:
self.client.put_bucket_versioning(
Bucket=bucket_name,
VersioningConfiguration=versioning_config)
expected_err = 'An error occurred (MalformedXML) when calling the ' \
'PutBucketVersioning operation: The XML you provided was ' \
'not well-formed or did not validate against our published schema'
self.assertEqual(expected_err, str(ctx.exception))
# disable it
versioning_config['Status'] = 'Suspended'
resp = self.client.put_bucket_versioning(
Bucket=bucket_name,
VersioningConfiguration=versioning_config)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
# ... now it's disabled again
def check_status():
resp = self.client.get_bucket_versioning(Bucket=bucket_name)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertEqual('Suspended', resp['Status'])
retry(check_status)
def test_upload_fileobj_versioned(self):
obj_data = self.create_name('some-data').encode('ascii')
obj_etag = hashlib.md5(obj_data).hexdigest()
obj_name = self.create_name('versioned-obj')
self.client.upload_fileobj(six.BytesIO(obj_data),
self.bucket_name, obj_name)
# object is in the listing
resp = self.client.list_objects_v2(Bucket=self.bucket_name)
objs = resp.get('Contents', [])
for obj in objs:
obj.pop('LastModified')
self.assertEqual([{
'ETag': '"%s"' % obj_etag,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
# object version listing
resp = self.client.list_object_versions(Bucket=self.bucket_name)
objs = resp.get('Versions', [])
for obj in objs:
obj.pop('LastModified')
obj.pop('Owner')
obj.pop('VersionId')
self.assertEqual([{
'ETag': '"%s"' % obj_etag,
'IsLatest': True,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
# overwrite the object
new_obj_data = self.create_name('some-new-data').encode('ascii')
new_obj_etag = hashlib.md5(new_obj_data).hexdigest()
self.client.upload_fileobj(six.BytesIO(new_obj_data),
self.bucket_name, obj_name)
# new object is in the listing
resp = self.client.list_objects_v2(Bucket=self.bucket_name)
objs = resp.get('Contents', [])
for obj in objs:
obj.pop('LastModified')
self.assertEqual([{
'ETag': '"%s"' % new_obj_etag,
'Key': obj_name,
'Size': len(new_obj_data),
'StorageClass': 'STANDARD',
}], objs)
# both object versions in the versions listing
resp = self.client.list_object_versions(Bucket=self.bucket_name)
objs = resp.get('Versions', [])
for obj in objs:
obj.pop('LastModified')
obj.pop('Owner')
obj.pop('VersionId')
self.assertEqual([{
'ETag': '"%s"' % new_obj_etag,
'IsLatest': True,
'Key': obj_name,
'Size': len(new_obj_data),
'StorageClass': 'STANDARD',
}, {
'ETag': '"%s"' % obj_etag,
'IsLatest': False,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
def test_delete_versioned_objects(self):
etags = []
obj_name = self.create_name('versioned-obj')
for i in range(3):
obj_data = self.create_name('some-data-%s' % i).encode('ascii')
etags.insert(0, hashlib.md5(obj_data).hexdigest())
self.client.upload_fileobj(six.BytesIO(obj_data),
self.bucket_name, obj_name)
# only one object appears in the listing
resp = self.client.list_objects_v2(Bucket=self.bucket_name)
objs = resp.get('Contents', [])
for obj in objs:
obj.pop('LastModified')
self.assertEqual([{
'ETag': '"%s"' % etags[0],
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
# but everything is layed out in the object versions listing
resp = self.client.list_object_versions(Bucket=self.bucket_name)
objs = resp.get('Versions', [])
versions = []
for obj in objs:
obj.pop('LastModified')
obj.pop('Owner')
versions.append(obj.pop('VersionId'))
self.assertEqual([{
'ETag': '"%s"' % etags[0],
'IsLatest': True,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}, {
'ETag': '"%s"' % etags[1],
'IsLatest': False,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}, {
'ETag': '"%s"' % etags[2],
'IsLatest': False,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
# we can delete a specific version
resp = self.client.delete_object(Bucket=self.bucket_name,
Key=obj_name,
VersionId=versions[1])
# and that just pulls it out of the versions listing
resp = self.client.list_object_versions(Bucket=self.bucket_name)
objs = resp.get('Versions', [])
for obj in objs:
obj.pop('LastModified')
obj.pop('Owner')
obj.pop('VersionId')
self.assertEqual([{
'ETag': '"%s"' % etags[0],
'IsLatest': True,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}, {
'ETag': '"%s"' % etags[2],
'IsLatest': False,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
# ... but the current listing is unaffected
resp = self.client.list_objects_v2(Bucket=self.bucket_name)
objs = resp.get('Contents', [])
for obj in objs:
obj.pop('LastModified')
self.assertEqual([{
'ETag': '"%s"' % etags[0],
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
# OTOH, if you delete specifically the latest version
# we can delete a specific version
resp = self.client.delete_object(Bucket=self.bucket_name,
Key=obj_name,
VersionId=versions[0])
# the versions listing has a new IsLatest
resp = self.client.list_object_versions(Bucket=self.bucket_name)
objs = resp.get('Versions', [])
for obj in objs:
obj.pop('LastModified')
obj.pop('Owner')
obj.pop('VersionId')
self.assertEqual([{
'ETag': '"%s"' % etags[2],
'IsLatest': True,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
# and the stack pops
resp = self.client.list_objects_v2(Bucket=self.bucket_name)
objs = resp.get('Contents', [])
for obj in objs:
obj.pop('LastModified')
self.assertEqual([{
'ETag': '"%s"' % etags[2],
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
def test_delete_versioned_deletes(self):
etags = []
obj_name = self.create_name('versioned-obj')
for i in range(3):
obj_data = self.create_name('some-data-%s' % i).encode('ascii')
etags.insert(0, hashlib.md5(obj_data).hexdigest())
self.client.upload_fileobj(six.BytesIO(obj_data),
self.bucket_name, obj_name)
# and make a delete marker
self.client.delete_object(Bucket=self.bucket_name, Key=obj_name)
# current listing is empty
resp = self.client.list_objects_v2(Bucket=self.bucket_name)
objs = resp.get('Contents', [])
self.assertEqual([], objs)
# but everything is in layed out in the versions listing
resp = self.client.list_object_versions(Bucket=self.bucket_name)
objs = resp.get('Versions', [])
versions = []
for obj in objs:
obj.pop('LastModified')
obj.pop('Owner')
versions.append(obj.pop('VersionId'))
self.assertEqual([{
'ETag': '"%s"' % etag,
'IsLatest': False,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
} for etag in etags], objs)
# ... plus the delete markers
delete_markers = resp.get('DeleteMarkers', [])
marker_versions = []
for marker in delete_markers:
marker.pop('LastModified')
marker.pop('Owner')
marker_versions.append(marker.pop('VersionId'))
self.assertEqual([{
'Key': obj_name,
'IsLatest': is_latest,
} for is_latest in (True, False, False)], delete_markers)
# delete an old delete markers
resp = self.client.delete_object(Bucket=self.bucket_name,
Key=obj_name,
VersionId=marker_versions[2])
# since IsLatest is still marker we'll raise NoSuchKey
with self.assertRaises(ClientError) as caught:
resp = self.client.get_object(Bucket=self.bucket_name,
Key=obj_name)
expected_err = 'An error occurred (NoSuchKey) when calling the ' \
'GetObject operation: The specified key does not exist.'
self.assertEqual(expected_err, str(caught.exception))
# now delete the delete marker (IsLatest)
resp = self.client.delete_object(Bucket=self.bucket_name,
Key=obj_name,
VersionId=marker_versions[0])
# most recent version is now latest
resp = self.client.get_object(Bucket=self.bucket_name,
Key=obj_name)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertEqual('"%s"' % etags[0], resp['ETag'])
# now delete the IsLatest object version
resp = self.client.delete_object(Bucket=self.bucket_name,
Key=obj_name,
VersionId=versions[0])
# and object is deleted again
with self.assertRaises(ClientError) as caught:
resp = self.client.get_object(Bucket=self.bucket_name,
Key=obj_name)
expected_err = 'An error occurred (NoSuchKey) when calling the ' \
'GetObject operation: The specified key does not exist.'
self.assertEqual(expected_err, str(caught.exception))
# delete marker IsLatest
resp = self.client.list_object_versions(Bucket=self.bucket_name)
delete_markers = resp.get('DeleteMarkers', [])
for marker in delete_markers:
marker.pop('LastModified')
marker.pop('Owner')
self.assertEqual([{
'Key': obj_name,
'IsLatest': True,
'VersionId': marker_versions[1],
}], delete_markers)
def test_multipart_upload(self):
obj_name = self.create_name('versioned-obj')
obj_data = b'data'
mu = self.client.create_multipart_upload(
Bucket=self.bucket_name,
Key=obj_name)
part_md5 = self.client.upload_part(
Bucket=self.bucket_name,
Key=obj_name,
UploadId=mu['UploadId'],
PartNumber=1,
Body=obj_data)['ETag']
complete_response = self.client.complete_multipart_upload(
Bucket=self.bucket_name,
Key=obj_name,
UploadId=mu['UploadId'],
MultipartUpload={'Parts': [
{'PartNumber': 1, 'ETag': part_md5},
]})
obj_etag = complete_response['ETag']
delete_response = self.client.delete_object(
Bucket=self.bucket_name,
Key=obj_name)
marker_version_id = delete_response['VersionId']
resp = self.client.list_object_versions(Bucket=self.bucket_name)
objs = resp.get('Versions', [])
versions = []
for obj in objs:
obj.pop('LastModified')
obj.pop('Owner')
versions.append(obj.pop('VersionId'))
self.assertEqual([{
'ETag': obj_etag,
'IsLatest': False,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
markers = resp.get('DeleteMarkers', [])
for marker in markers:
marker.pop('LastModified')
marker.pop('Owner')
self.assertEqual([{
'IsLatest': True,
'Key': obj_name,
'VersionId': marker_version_id,
}], markers)
# Can still get the old version
resp = self.client.get_object(
Bucket=self.bucket_name,
Key=obj_name,
VersionId=versions[0])
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertEqual(obj_etag, resp['ETag'])
delete_response = self.client.delete_object(
Bucket=self.bucket_name,
Key=obj_name,
VersionId=versions[0])
resp = self.client.list_object_versions(Bucket=self.bucket_name)
self.assertEqual([], resp.get('Versions', []))
markers = resp.get('DeleteMarkers', [])
for marker in markers:
marker.pop('LastModified')
marker.pop('Owner')
self.assertEqual([{
'IsLatest': True,
'Key': obj_name,
'VersionId': marker_version_id,
}], markers)
def test_get_versioned_object(self):
etags = []
obj_name = self.create_name('versioned-obj')
for i in range(3):
obj_data = self.create_name('some-data-%s' % i).encode('ascii')
# TODO: pull etag from response instead
etags.insert(0, hashlib.md5(obj_data).hexdigest())
self.client.upload_fileobj(
six.BytesIO(obj_data), self.bucket_name, obj_name)
resp = self.client.list_object_versions(Bucket=self.bucket_name)
objs = resp.get('Versions', [])
versions = []
for obj in objs:
obj.pop('LastModified')
obj.pop('Owner')
versions.append(obj.pop('VersionId'))
self.assertEqual([{
'ETag': '"%s"' % etags[0],
'IsLatest': True,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}, {
'ETag': '"%s"' % etags[1],
'IsLatest': False,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}, {
'ETag': '"%s"' % etags[2],
'IsLatest': False,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
# un-versioned get_object returns IsLatest
resp = self.client.get_object(Bucket=self.bucket_name, Key=obj_name)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertEqual('"%s"' % etags[0], resp['ETag'])
# but you can get any object by version
for i, version in enumerate(versions):
resp = self.client.get_object(
Bucket=self.bucket_name, Key=obj_name, VersionId=version)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertEqual('"%s"' % etags[i], resp['ETag'])
# and head_object works about the same
resp = self.client.head_object(Bucket=self.bucket_name, Key=obj_name)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertEqual('"%s"' % etags[0], resp['ETag'])
self.assertEqual(versions[0], resp['VersionId'])
for version, etag in zip(versions, etags):
resp = self.client.head_object(
Bucket=self.bucket_name, Key=obj_name, VersionId=version)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertEqual(version, resp['VersionId'])
self.assertEqual('"%s"' % etag, resp['ETag'])
def test_get_versioned_object_invalid_params(self):
with self.assertRaises(ClientError) as ctx:
self.client.list_object_versions(Bucket=self.bucket_name,
KeyMarker='',
VersionIdMarker='bogus')
expected_err = 'An error occurred (InvalidArgument) when calling ' \
'the ListObjectVersions operation: Invalid version id specified'
self.assertEqual(expected_err, str(ctx.exception))
with self.assertRaises(ClientError) as ctx:
self.client.list_object_versions(
Bucket=self.bucket_name,
VersionIdMarker='a' * 32)
expected_err = 'An error occurred (InvalidArgument) when calling ' \
'the ListObjectVersions operation: A version-id marker cannot ' \
'be specified without a key marker.'
self.assertEqual(expected_err, str(ctx.exception))
def test_get_versioned_object_key_marker(self):
obj00_name = self.create_name('00-versioned-obj')
obj01_name = self.create_name('01-versioned-obj')
names = [obj00_name] * 3 + [obj01_name] * 3
latest = [True, False, False, True, False, False]
etags = []
for i in range(3):
obj_data = self.create_name('some-data-%s' % i).encode('ascii')
etags.insert(0, '"%s"' % hashlib.md5(obj_data).hexdigest())
self.client.upload_fileobj(
six.BytesIO(obj_data), self.bucket_name, obj01_name)
for i in range(3):
obj_data = self.create_name('some-data-%s' % i).encode('ascii')
etags.insert(0, '"%s"' % hashlib.md5(obj_data).hexdigest())
self.client.upload_fileobj(
six.BytesIO(obj_data), self.bucket_name, obj00_name)
resp = self.client.list_object_versions(Bucket=self.bucket_name)
versions = []
objs = []
for o in resp.get('Versions', []):
versions.append(o['VersionId'])
objs.append({
'Key': o['Key'],
'VersionId': o['VersionId'],
'IsLatest': o['IsLatest'],
'ETag': o['ETag'],
})
expected = [{
'Key': name,
'VersionId': version,
'IsLatest': is_latest,
'ETag': etag,
} for name, etag, version, is_latest in zip(
names, etags, versions, latest)]
self.assertEqual(expected, objs)
# on s3 this makes expected[0]['IsLatest'] magicaly change to False?
# resp = self.client.list_object_versions(Bucket=self.bucket_name,
# KeyMarker='',
# VersionIdMarker=versions[0])
# objs = [{
# 'Key': o['Key'],
# 'VersionId': o['VersionId'],
# 'IsLatest': o['IsLatest'],
# 'ETag': o['ETag'],
# } for o in resp.get('Versions', [])]
# self.assertEqual(expected, objs)
# KeyMarker skips past that key
resp = self.client.list_object_versions(Bucket=self.bucket_name,
KeyMarker=obj00_name)
objs = [{
'Key': o['Key'],
'VersionId': o['VersionId'],
'IsLatest': o['IsLatest'],
'ETag': o['ETag'],
} for o in resp.get('Versions', [])]
self.assertEqual(expected[3:], objs)
# KeyMarker with VersionIdMarker skips past that version
resp = self.client.list_object_versions(Bucket=self.bucket_name,
KeyMarker=obj00_name,
VersionIdMarker=versions[0])
objs = [{
'Key': o['Key'],
'VersionId': o['VersionId'],
'IsLatest': o['IsLatest'],
'ETag': o['ETag'],
} for o in resp.get('Versions', [])]
self.assertEqual(expected[1:], objs)
# KeyMarker with bogus version skips past that key
resp = self.client.list_object_versions(
Bucket=self.bucket_name,
KeyMarker=obj00_name,
VersionIdMarker=versions[4])
objs = [{
'Key': o['Key'],
'VersionId': o['VersionId'],
'IsLatest': o['IsLatest'],
'ETag': o['ETag'],
} for o in resp.get('Versions', [])]
self.assertEqual(expected[3:], objs)
def test_list_objects(self):
etags = defaultdict(list)
for i in range(3):
obj_name = self.create_name('versioned-obj')
for i in range(3):
obj_data = self.create_name('some-data-%s' % i).encode('ascii')
etags[obj_name].insert(0, hashlib.md5(obj_data).hexdigest())
self.client.upload_fileobj(
six.BytesIO(obj_data), self.bucket_name, obj_name)
# both unversioned list_objects responses are similar
expected = []
for name, obj_etags in sorted(etags.items()):
expected.append({
'ETag': '"%s"' % obj_etags[0],
'Key': name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
})
resp = self.client.list_objects(Bucket=self.bucket_name)
objs = resp.get('Contents', [])
for obj in objs:
obj.pop('LastModified')
# one difference seems to be the Owner key
self.assertEqual({'DisplayName', 'ID'},
set(obj.pop('Owner').keys()))
self.assertEqual(expected, objs)
resp = self.client.list_objects_v2(Bucket=self.bucket_name)
objs = resp.get('Contents', [])
for obj in objs:
obj.pop('LastModified')
self.assertEqual(expected, objs)
# versioned listings has something for everyone
expected = []
for name, obj_etags in sorted(etags.items()):
is_latest = True
for etag in obj_etags:
expected.append({
'ETag': '"%s"' % etag,
'IsLatest': is_latest,
'Key': name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
})
is_latest = False
resp = self.client.list_object_versions(Bucket=self.bucket_name)
objs = resp.get('Versions', [])
versions = []
for obj in objs:
obj.pop('LastModified')
obj.pop('Owner')
versions.append(obj.pop('VersionId'))
self.assertEqual(expected, objs)
def test_copy_object(self):
etags = []
obj_name = self.create_name('versioned-obj')
for i in range(3):
obj_data = self.create_name('some-data-%s' % i).encode('ascii')
etags.insert(0, hashlib.md5(obj_data).hexdigest())
self.client.upload_fileobj(
six.BytesIO(obj_data), self.bucket_name, obj_name)
resp = self.client.list_object_versions(Bucket=self.bucket_name)
objs = resp.get('Versions', [])
versions = []
for obj in objs:
versions.append(obj.pop('VersionId'))
# CopySource can just be Bucket/Key string
first_target = self.create_name('target-obj1')
copy_resp = self.client.copy_object(
Bucket=self.bucket_name, Key=first_target,
CopySource='%s/%s' % (self.bucket_name, obj_name))
self.assertEqual(versions[0], copy_resp['CopySourceVersionId'])
# and you'll just get the most recent version
resp = self.client.head_object(Bucket=self.bucket_name,
Key=first_target)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertEqual('"%s"' % etags[0], resp['ETag'])
# or you can be more explicit
explicit_target = self.create_name('target-%s' % versions[0])
copy_source = {'Bucket': self.bucket_name, 'Key': obj_name,
'VersionId': versions[0]}
copy_resp = self.client.copy_object(
Bucket=self.bucket_name, Key=explicit_target,
CopySource=copy_source)
self.assertEqual(versions[0], copy_resp['CopySourceVersionId'])
# and you still get the same thing
resp = self.client.head_object(Bucket=self.bucket_name,
Key=explicit_target)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertEqual('"%s"' % etags[0], resp['ETag'])
# but you can also copy from a specific version
version_target = self.create_name('target-%s' % versions[2])
copy_source['VersionId'] = versions[2]
copy_resp = self.client.copy_object(
Bucket=self.bucket_name, Key=version_target,
CopySource=copy_source)
self.assertEqual(versions[2], copy_resp['CopySourceVersionId'])
resp = self.client.head_object(Bucket=self.bucket_name,
Key=version_target)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertEqual('"%s"' % etags[2], resp['ETag'])