f05119c16f
test_bucket.py is proving somewhat problematic. Change-Id: I5b337ef66a23fc989762801dd6a5ba1ed903f57b
238 lines
9.6 KiB
Python
238 lines
9.6 KiB
Python
# Copyright (c) 2016 SwiftStack, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import os
|
|
|
|
import requests
|
|
|
|
from swift.common.middleware.s3api.etree import fromstring
|
|
|
|
import test.functional as tf
|
|
|
|
from test.functional.s3api import S3ApiBase
|
|
from test.functional.s3api.utils import get_error_code, get_error_msg
|
|
|
|
|
|
def setUpModule():
|
|
tf.setup_package()
|
|
|
|
|
|
def tearDownModule():
|
|
tf.teardown_package()
|
|
|
|
|
|
class TestS3ApiPresignedUrls(S3ApiBase):
|
|
def test_bucket(self):
|
|
bucket = 'test-bucket'
|
|
req_objects = ('object', 'object2')
|
|
max_bucket_listing = tf.cluster_info['s3api'].get(
|
|
'max_bucket_listing', 1000)
|
|
|
|
# GET Bucket (Without Object)
|
|
status, _junk, _junk = self.conn.make_request('PUT', bucket)
|
|
self.assertEqual(status, 200)
|
|
|
|
url, headers = self.conn.generate_url_and_headers('GET', bucket)
|
|
resp = requests.get(url, headers=headers)
|
|
self.assertEqual(resp.status_code, 200,
|
|
'Got %d %s' % (resp.status_code, resp.content))
|
|
self.assertCommonResponseHeaders(resp.headers)
|
|
self.assertIsNotNone(resp.headers['content-type'])
|
|
self.assertEqual(resp.headers['content-length'],
|
|
str(len(resp.content)))
|
|
|
|
elem = fromstring(resp.content, 'ListBucketResult')
|
|
self.assertEqual(elem.find('Name').text, bucket)
|
|
self.assertIsNone(elem.find('Prefix').text)
|
|
self.assertIsNone(elem.find('Marker').text)
|
|
self.assertEqual(elem.find('MaxKeys').text,
|
|
str(max_bucket_listing))
|
|
self.assertEqual(elem.find('IsTruncated').text, 'false')
|
|
objects = elem.findall('./Contents')
|
|
self.assertEqual(list(objects), [])
|
|
|
|
# GET Bucket (With Object)
|
|
for obj in req_objects:
|
|
status, _junk, _junk = self.conn.make_request('PUT', bucket, obj)
|
|
self.assertEqual(
|
|
status, 200,
|
|
'Got %d response while creating %s' % (status, obj))
|
|
|
|
resp = requests.get(url, headers=headers)
|
|
self.assertEqual(resp.status_code, 200,
|
|
'Got %d %s' % (resp.status_code, resp.content))
|
|
self.assertCommonResponseHeaders(resp.headers)
|
|
self.assertIsNotNone(resp.headers['content-type'])
|
|
self.assertEqual(resp.headers['content-length'],
|
|
str(len(resp.content)))
|
|
|
|
elem = fromstring(resp.content, 'ListBucketResult')
|
|
self.assertEqual(elem.find('Name').text, bucket)
|
|
self.assertIsNone(elem.find('Prefix').text)
|
|
self.assertIsNone(elem.find('Marker').text)
|
|
self.assertEqual(elem.find('MaxKeys').text,
|
|
str(max_bucket_listing))
|
|
self.assertEqual(elem.find('IsTruncated').text, 'false')
|
|
resp_objects = elem.findall('./Contents')
|
|
self.assertEqual(len(list(resp_objects)), 2)
|
|
for o in resp_objects:
|
|
self.assertIn(o.find('Key').text, req_objects)
|
|
self.assertIsNotNone(o.find('LastModified').text)
|
|
self.assertRegexpMatches(
|
|
o.find('LastModified').text,
|
|
r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$')
|
|
self.assertIsNotNone(o.find('ETag').text)
|
|
self.assertEqual(o.find('Size').text, '0')
|
|
self.assertIsNotNone(o.find('StorageClass').text is not None)
|
|
self.assertEqual(o.find('Owner/ID').text, self.conn.user_id)
|
|
self.assertEqual(o.find('Owner/DisplayName').text,
|
|
self.conn.user_id)
|
|
# DELETE Bucket
|
|
for obj in req_objects:
|
|
self.conn.make_request('DELETE', bucket, obj)
|
|
url, headers = self.conn.generate_url_and_headers('DELETE', bucket)
|
|
resp = requests.delete(url, headers=headers)
|
|
self.assertEqual(resp.status_code, 204,
|
|
'Got %d %s' % (resp.status_code, resp.content))
|
|
|
|
def test_expiration_limits(self):
|
|
if os.environ.get('S3_USE_SIGV4'):
|
|
self._test_expiration_limits_v4()
|
|
else:
|
|
self._test_expiration_limits_v2()
|
|
|
|
def _test_expiration_limits_v2(self):
|
|
bucket = 'test-bucket'
|
|
|
|
# Expiration date is too far in the future
|
|
url, headers = self.conn.generate_url_and_headers(
|
|
'GET', bucket, expires_in=2 ** 32)
|
|
resp = requests.get(url, headers=headers)
|
|
self.assertEqual(resp.status_code, 403,
|
|
'Got %d %s' % (resp.status_code, resp.content))
|
|
self.assertEqual(get_error_code(resp.content),
|
|
'AccessDenied')
|
|
self.assertIn('Invalid date (should be seconds since epoch)',
|
|
get_error_msg(resp.content))
|
|
|
|
def _test_expiration_limits_v4(self):
|
|
bucket = 'test-bucket'
|
|
|
|
# Expiration is negative
|
|
url, headers = self.conn.generate_url_and_headers(
|
|
'GET', bucket, expires_in=-1)
|
|
resp = requests.get(url, headers=headers)
|
|
self.assertEqual(resp.status_code, 400,
|
|
'Got %d %s' % (resp.status_code, resp.content))
|
|
self.assertEqual(get_error_code(resp.content),
|
|
'AuthorizationQueryParametersError')
|
|
self.assertIn('X-Amz-Expires must be non-negative',
|
|
get_error_msg(resp.content))
|
|
|
|
# Expiration date is too far in the future
|
|
for exp in (7 * 24 * 60 * 60 + 1,
|
|
2 ** 63 - 1):
|
|
url, headers = self.conn.generate_url_and_headers(
|
|
'GET', bucket, expires_in=exp)
|
|
resp = requests.get(url, headers=headers)
|
|
self.assertEqual(resp.status_code, 400,
|
|
'Got %d %s' % (resp.status_code, resp.content))
|
|
self.assertEqual(get_error_code(resp.content),
|
|
'AuthorizationQueryParametersError')
|
|
self.assertIn('X-Amz-Expires must be less than 604800 seconds',
|
|
get_error_msg(resp.content))
|
|
|
|
# Expiration date is *way* too far in the future, or isn't a number
|
|
for exp in (2 ** 63, 'foo'):
|
|
url, headers = self.conn.generate_url_and_headers(
|
|
'GET', bucket, expires_in=2 ** 63)
|
|
resp = requests.get(url, headers=headers)
|
|
self.assertEqual(resp.status_code, 400,
|
|
'Got %d %s' % (resp.status_code, resp.content))
|
|
self.assertEqual(get_error_code(resp.content),
|
|
'AuthorizationQueryParametersError')
|
|
self.assertEqual('X-Amz-Expires should be a number',
|
|
get_error_msg(resp.content))
|
|
|
|
def test_object(self):
|
|
bucket = 'test-bucket'
|
|
obj = 'object'
|
|
|
|
status, _junk, _junk = self.conn.make_request('PUT', bucket)
|
|
self.assertEqual(status, 200)
|
|
|
|
# HEAD/missing object
|
|
head_url, headers = self.conn.generate_url_and_headers(
|
|
'HEAD', bucket, obj)
|
|
resp = requests.head(head_url, headers=headers)
|
|
self.assertEqual(resp.status_code, 404,
|
|
'Got %d %s' % (resp.status_code, resp.content))
|
|
|
|
# Wrong verb
|
|
resp = requests.get(head_url)
|
|
self.assertEqual(resp.status_code, 403,
|
|
'Got %d %s' % (resp.status_code, resp.content))
|
|
self.assertEqual(get_error_code(resp.content),
|
|
'SignatureDoesNotMatch')
|
|
|
|
# PUT empty object
|
|
put_url, headers = self.conn.generate_url_and_headers(
|
|
'PUT', bucket, obj)
|
|
resp = requests.put(put_url, data=b'', headers=headers)
|
|
self.assertEqual(resp.status_code, 200,
|
|
'Got %d %s' % (resp.status_code, resp.content))
|
|
# GET empty object
|
|
get_url, headers = self.conn.generate_url_and_headers(
|
|
'GET', bucket, obj)
|
|
resp = requests.get(get_url, headers=headers)
|
|
self.assertEqual(resp.status_code, 200,
|
|
'Got %d %s' % (resp.status_code, resp.content))
|
|
self.assertEqual(resp.content, b'')
|
|
|
|
# PUT over object
|
|
resp = requests.put(put_url, data=b'foobar', headers=headers)
|
|
self.assertEqual(resp.status_code, 200,
|
|
'Got %d %s' % (resp.status_code, resp.content))
|
|
|
|
# GET non-empty object
|
|
resp = requests.get(get_url, headers=headers)
|
|
self.assertEqual(resp.status_code, 200,
|
|
'Got %d %s' % (resp.status_code, resp.content))
|
|
self.assertEqual(resp.content, b'foobar')
|
|
|
|
# DELETE Object
|
|
delete_url, headers = self.conn.generate_url_and_headers(
|
|
'DELETE', bucket, obj)
|
|
resp = requests.delete(delete_url, headers=headers)
|
|
self.assertEqual(resp.status_code, 204,
|
|
'Got %d %s' % (resp.status_code, resp.content))
|
|
|
|
# Final cleanup
|
|
status, _junk, _junk = self.conn.make_request('DELETE', bucket)
|
|
self.assertEqual(status, 204)
|
|
|
|
|
|
class TestS3ApiPresignedUrlsSigV4(TestS3ApiPresignedUrls):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
os.environ['S3_USE_SIGV4'] = "True"
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
del os.environ['S3_USE_SIGV4']
|
|
|
|
def setUp(self):
|
|
super(TestS3ApiPresignedUrlsSigV4, self).setUp()
|