5a8cfd6e06
Previously we'd use two users, one admin and one unprivileged. Ceph's s3-tests, however, assume that both users should have access to create buckets. Further, there are different errors that may be returned depending on whether you are the *bucket* owner or not when using s3_acl. So now we've got: test:tester1 (admin) test:tester2 (also admin) test:tester3 (unprivileged) Change-Id: I0b67c53de3bcadc2c656d86131fca5f2c3114f14
158 lines
6.1 KiB
Python
# Copyright (c) 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest2
import os
import test.functional as tf
from swift.common.middleware.s3api.etree import fromstring
from test.functional.s3api import S3ApiBase
from test.functional.s3api.s3_test_client import Connection
from test.functional.s3api.utils import get_error_code
def setUpModule():
    """Run the functional-test package setup before any test in this module."""
    tf.setup_package()
def tearDownModule():
    """Run the functional-test package teardown after this module's tests."""
    tf.teardown_package()
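class TestS3Acl(S3ApiBase):
    """Functional tests for the S3 ``?acl`` sub-resource on buckets and objects.

    ``self.conn`` creates the bucket and is the owner reported in the ACL
    documents; ``self.conn3`` is the reduced-access user. Each error test
    covers three rejections: a request signed with a wrong secret, a request
    for a missing resource, and a request from the reduced-access user.
    """

    def setUp(self):
        """Create the test bucket and open the reduced-access connection.

        Raises tf.SkipTest when ``s3_access_key3`` / ``s3_secret_key3`` are
        not configured, because the AccessDenied cases cannot run without a
        second, less privileged identity.
        """
        super(TestS3Acl, self).setUp()
        self.bucket = 'bucket'
        self.obj = 'object'
        if 's3_access_key3' not in tf.config or \
                's3_secret_key3' not in tf.config:
            raise tf.SkipTest(
                'TestS3Acl requires s3_access_key3 and s3_secret_key3 '
                'configured for reduced-access user')
        self.conn.make_request('PUT', self.bucket)
        access_key3 = tf.config['s3_access_key3']
        secret_key3 = tf.config['s3_secret_key3']
        # The access key is passed twice; presumably the third argument is
        # the user id -- confirm against Connection's signature.
        self.conn3 = Connection(access_key3, secret_key3, access_key3)

    def _assert_acl_policy(self, status, headers, body):
        """Check a successful GET ?acl response owned by ``self.conn``."""
        self.assertEqual(status, 200)
        self.assertCommonResponseHeaders(headers)
        # TODO: Fix the response that last-modified must be in the response.
        # self.assertTrue(headers['last-modified'] is not None)
        self.assertEqual(headers['content-length'], str(len(body)))
        self.assertIsNotNone(headers['content-type'])
        elem = fromstring(body, 'AccessControlPolicy')
        owner = elem.find('Owner')
        self.assertEqual(owner.find('ID').text, self.conn.user_id)
        self.assertEqual(owner.find('DisplayName').text, self.conn.user_id)
        acl = elem.find('AccessControlList')
        # NOTE(review): only the presence of a Grant is asserted; which
        # grantee and permission the ACL carries is not pinned down here.
        self.assertIsNotNone(acl.find('Grant'))

    def _assert_acl_error(self, response, expected_status, expected_code):
        """Check both the HTTP status and the S3 error code of a rejection.

        Asserting the status as well as the body means a response that
        carries the right error XML under a success status is caught.
        """
        status, _headers, body = response
        self.assertEqual(get_error_code(body), expected_code)
        self.assertEqual(status, expected_status)

    def test_acl(self):
        """The owner can set a canned bucket ACL and read both ACLs back."""
        self.conn.make_request('PUT', self.bucket, self.obj)
        query = 'acl'

        # PUT Bucket ACL
        req_headers = {'x-amz-acl': 'public-read'}
        status, headers, body = \
            self.conn.make_request('PUT', self.bucket, headers=req_headers,
                                   query=query)
        self.assertEqual(status, 200)
        self.assertCommonResponseHeaders(headers)
        self.assertEqual(headers['content-length'], '0')

        # GET Bucket ACL
        status, headers, body = \
            self.conn.make_request('GET', self.bucket, query=query)
        self._assert_acl_policy(status, headers, body)

        # GET Object ACL
        status, headers, body = \
            self.conn.make_request('GET', self.bucket, self.obj, query=query)
        self._assert_acl_policy(status, headers, body)

    def test_put_bucket_acl_error(self):
        """PUT bucket ACL is rejected for a bad signature, a missing bucket
        and a caller without access to the bucket."""
        req_headers = {'x-amz-acl': 'public-read'}
        aws_error_conn = Connection(aws_secret_key='invalid')
        self._assert_acl_error(
            aws_error_conn.make_request('PUT', self.bucket,
                                        headers=req_headers, query='acl'),
            403, 'SignatureDoesNotMatch')

        self._assert_acl_error(
            self.conn.make_request('PUT', 'nothing',
                                   headers=req_headers, query='acl'),
            404, 'NoSuchBucket')

        # The reduced-access user must not be able to open the bucket up.
        self._assert_acl_error(
            self.conn3.make_request('PUT', self.bucket,
                                    headers=req_headers, query='acl'),
            403, 'AccessDenied')

    def test_get_bucket_acl_error(self):
        """GET bucket ACL is rejected for a bad signature, a missing bucket
        and a caller without access to the bucket."""
        aws_error_conn = Connection(aws_secret_key='invalid')
        self._assert_acl_error(
            aws_error_conn.make_request('GET', self.bucket, query='acl'),
            403, 'SignatureDoesNotMatch')

        self._assert_acl_error(
            self.conn.make_request('GET', 'nothing', query='acl'),
            404, 'NoSuchBucket')

        self._assert_acl_error(
            self.conn3.make_request('GET', self.bucket, query='acl'),
            403, 'AccessDenied')

    def test_get_object_acl_error(self):
        """GET object ACL is rejected for a bad signature, a missing key
        and a caller without access to the object."""
        self.conn.make_request('PUT', self.bucket, self.obj)

        aws_error_conn = Connection(aws_secret_key='invalid')
        self._assert_acl_error(
            aws_error_conn.make_request('GET', self.bucket, self.obj,
                                        query='acl'),
            403, 'SignatureDoesNotMatch')

        self._assert_acl_error(
            self.conn.make_request('GET', self.bucket, 'nothing',
                                   query='acl'),
            404, 'NoSuchKey')

        self._assert_acl_error(
            self.conn3.make_request('GET', self.bucket, self.obj,
                                    query='acl'),
            403, 'AccessDenied')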
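class TestS3AclSigV4(TestS3Acl):
    """Re-run every TestS3Acl case with ``S3_USE_SIGV4`` set in the environment.

    Presumably the test client reads this variable to sign requests with
    Signature Version 4 -- confirm against the client implementation.
    """

    @classmethod
    def setUpClass(cls):
        """Set ``S3_USE_SIGV4``, remembering any value already present."""
        cls._saved_sigv4 = os.environ.get('S3_USE_SIGV4')
        os.environ['S3_USE_SIGV4'] = "True"

    @classmethod
    def tearDownClass(cls):
        """Put ``S3_USE_SIGV4`` back to what it was before this class ran.

        Deleting it unconditionally would strip a value set by the caller
        and change the signing mode of every test that runs afterwards in
        the same process; it would also raise KeyError if already unset.
        """
        if cls._saved_sigv4 is None:
            os.environ.pop('S3_USE_SIGV4', None)
        else:
            os.environ['S3_USE_SIGV4'] = cls._saved_sigv4

    def setUp(self):
        """Reuse the parent setup unchanged."""
        super(TestS3AclSigV4, self).setUp()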
# Allow the module to be run directly as a script.
if __name__ == '__main__':
    unittest2.main()