Merge "additional functional tests for account acls"
This commit is contained in:
commit
a5cd0935e3
@ -19,10 +19,13 @@ import socket
|
||||
import sys
|
||||
from time import sleep
|
||||
from urlparse import urlparse
|
||||
import functools
|
||||
from nose import SkipTest
|
||||
|
||||
from test import get_config
|
||||
|
||||
from swiftclient import get_auth, http_connection
|
||||
from test.functional.swift_test_client import Connection
|
||||
|
||||
conf = get_config('func_test')
|
||||
web_front_end = conf.get('web_front_end', 'integral')
|
||||
@ -184,3 +187,45 @@ def check_response(conn):
|
||||
resp.read()
|
||||
raise InternalServerError()
|
||||
return resp
|
||||
|
||||
cluster_info = {}
|
||||
|
||||
|
||||
def get_cluster_info():
|
||||
conn = Connection(conf)
|
||||
conn.authenticate()
|
||||
global cluster_info
|
||||
cluster_info = conn.cluster_info()
|
||||
|
||||
|
||||
def reset_acl():
|
||||
def post(url, token, parsed, conn):
|
||||
conn.request('POST', parsed.path, '', {
|
||||
'X-Auth-Token': token,
|
||||
'X-Account-Access-Control': '{}'
|
||||
})
|
||||
return check_response(conn)
|
||||
resp = retry(post, use_account=1)
|
||||
resp.read()
|
||||
|
||||
|
||||
def requires_acls(f):
|
||||
@functools.wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
if skip:
|
||||
raise SkipTest
|
||||
if not cluster_info:
|
||||
get_cluster_info()
|
||||
# Determine whether this cluster has account ACLs; if not, skip test
|
||||
if not cluster_info.get('tempauth', {}).get('account_acls'):
|
||||
raise SkipTest
|
||||
if 'keystoneauth' in cluster_info:
|
||||
# remove when keystoneauth supports account acls
|
||||
raise SkipTest
|
||||
reset_acl()
|
||||
try:
|
||||
rv = f(*args, **kwargs)
|
||||
finally:
|
||||
reset_acl()
|
||||
return rv
|
||||
return wrapper
|
||||
|
@ -17,15 +17,17 @@
|
||||
|
||||
import unittest
|
||||
import json
|
||||
from uuid import uuid4
|
||||
from nose import SkipTest
|
||||
from string import letters
|
||||
|
||||
from swift.common.constraints import MAX_META_COUNT, MAX_META_NAME_LENGTH, \
|
||||
MAX_META_OVERALL_SIZE, MAX_META_VALUE_LENGTH
|
||||
from swift.common.middleware.acl import format_acl
|
||||
from test.functional.swift_test_client import Connection
|
||||
from test import get_config
|
||||
from swift_testing import check_response, retry, skip, web_front_end
|
||||
from swift_testing import (check_response, retry, skip, skip2, skip3,
|
||||
web_front_end, requires_acls)
|
||||
import swift_testing
|
||||
from test.functional.tests import load_constraint
|
||||
|
||||
|
||||
class TestAccount(unittest.TestCase):
|
||||
@ -70,21 +72,316 @@ class TestAccount(unittest.TestCase):
|
||||
self.assert_(resp.status in (200, 204), resp.status)
|
||||
self.assertEquals(resp.getheader('x-account-meta-test'), 'Value')
|
||||
|
||||
def test_tempauth_account_acls(self):
|
||||
if skip:
|
||||
def test_invalid_acls(self):
|
||||
def post(url, token, parsed, conn, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
# needs to be an acceptable header size
|
||||
num_keys = 8
|
||||
max_key_size = load_constraint('max_header_size') / num_keys
|
||||
acl = {'admin': [c * max_key_size for c in letters[:num_keys]]}
|
||||
headers = {'x-account-access-control': format_acl(
|
||||
version=2, acl_dict=acl)}
|
||||
resp = retry(post, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 400)
|
||||
|
||||
# and again a touch smaller
|
||||
acl = {'admin': [c * max_key_size for c in letters[:num_keys - 1]]}
|
||||
headers = {'x-account-access-control': format_acl(
|
||||
version=2, acl_dict=acl)}
|
||||
resp = retry(post, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
@requires_acls
|
||||
def test_invalid_acl_keys(self):
|
||||
def post(url, token, parsed, conn, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
# needs to be json
|
||||
resp = retry(post, headers={'X-Account-Access-Control': 'invalid'},
|
||||
use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 400)
|
||||
|
||||
acl_user = swift_testing.swift_test_user[1]
|
||||
acl = {'admin': [acl_user], 'invalid_key': 'invalid_value'}
|
||||
headers = {'x-account-access-control': format_acl(
|
||||
version=2, acl_dict=acl)}
|
||||
|
||||
resp = retry(post, headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 400)
|
||||
self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
|
||||
|
||||
@requires_acls
|
||||
def test_invalid_acl_values(self):
|
||||
def post(url, token, parsed, conn, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
acl = {'admin': 'invalid_value'}
|
||||
headers = {'x-account-access-control': format_acl(
|
||||
version=2, acl_dict=acl)}
|
||||
|
||||
resp = retry(post, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 400)
|
||||
self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
|
||||
|
||||
@requires_acls
|
||||
def test_read_only_acl(self):
|
||||
if skip3:
|
||||
raise SkipTest
|
||||
|
||||
# Determine whether this cluster has account ACLs; if not, skip test
|
||||
conn = Connection(get_config('func_test'))
|
||||
conn.authenticate()
|
||||
cluster_info = conn.cluster_info()
|
||||
if not cluster_info.get('tempauth', {}).get('account_acls'):
|
||||
def get(url, token, parsed, conn):
|
||||
conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def post(url, token, parsed, conn, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
# cannot read account
|
||||
resp = retry(get, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 403)
|
||||
|
||||
# grant read access
|
||||
acl_user = swift_testing.swift_test_user[2]
|
||||
acl = {'read-only': [acl_user]}
|
||||
headers = {'x-account-access-control': format_acl(
|
||||
version=2, acl_dict=acl)}
|
||||
resp = retry(post, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# read-only can read account headers
|
||||
resp = retry(get, use_account=3)
|
||||
resp.read()
|
||||
self.assert_(resp.status in (200, 204))
|
||||
# but not acls
|
||||
self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
|
||||
|
||||
# read-only can not write metadata
|
||||
headers = {'x-account-meta-test': 'value'}
|
||||
resp = retry(post, headers=headers, use_account=3)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 403)
|
||||
|
||||
# but they can read it
|
||||
headers = {'x-account-meta-test': 'value'}
|
||||
resp = retry(post, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
resp = retry(get, use_account=3)
|
||||
resp.read()
|
||||
self.assert_(resp.status in (200, 204))
|
||||
self.assertEqual(resp.getheader('X-Account-Meta-Test'), 'value')
|
||||
|
||||
@requires_acls
|
||||
def test_read_write_acl(self):
|
||||
if skip3:
|
||||
raise SkipTest
|
||||
if 'keystoneauth' in cluster_info:
|
||||
# Unfortunate hack -- tempauth (with account ACLs) is expected
|
||||
# to play nice with Keystone (without account ACLs), but Zuul
|
||||
# functest framework doesn't give us an easy way to get a
|
||||
# tempauth user.
|
||||
|
||||
def get(url, token, parsed, conn):
|
||||
conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def post(url, token, parsed, conn, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
# cannot read account
|
||||
resp = retry(get, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 403)
|
||||
|
||||
# grant read-write access
|
||||
acl_user = swift_testing.swift_test_user[2]
|
||||
acl = {'read-write': [acl_user]}
|
||||
headers = {'x-account-access-control': format_acl(
|
||||
version=2, acl_dict=acl)}
|
||||
resp = retry(post, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# read-write can read account headers
|
||||
resp = retry(get, use_account=3)
|
||||
resp.read()
|
||||
self.assert_(resp.status in (200, 204))
|
||||
# but not acls
|
||||
self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
|
||||
|
||||
# read-write can not write account metadata
|
||||
headers = {'x-account-meta-test': 'value'}
|
||||
resp = retry(post, headers=headers, use_account=3)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 403)
|
||||
|
||||
@requires_acls
|
||||
def test_admin_acl(self):
|
||||
if skip3:
|
||||
raise SkipTest
|
||||
|
||||
def get(url, token, parsed, conn):
|
||||
conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def post(url, token, parsed, conn, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
# cannot read account
|
||||
resp = retry(get, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 403)
|
||||
|
||||
# grant admin access
|
||||
acl_user = swift_testing.swift_test_user[2]
|
||||
acl = {'admin': [acl_user]}
|
||||
acl_json_str = format_acl(version=2, acl_dict=acl)
|
||||
headers = {'x-account-access-control': acl_json_str}
|
||||
resp = retry(post, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# admin can read account headers
|
||||
resp = retry(get, use_account=3)
|
||||
resp.read()
|
||||
self.assert_(resp.status in (200, 204))
|
||||
# including acls
|
||||
self.assertEqual(resp.getheader('X-Account-Access-Control'),
|
||||
acl_json_str)
|
||||
|
||||
# admin can write account metadata
|
||||
value = str(uuid4())
|
||||
headers = {'x-account-meta-test': value}
|
||||
resp = retry(post, headers=headers, use_account=3)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
resp = retry(get, use_account=3)
|
||||
resp.read()
|
||||
self.assert_(resp.status in (200, 204))
|
||||
self.assertEqual(resp.getheader('X-Account-Meta-Test'), value)
|
||||
|
||||
# admin can even revoke their own access
|
||||
headers = {'x-account-access-control': '{}'}
|
||||
resp = retry(post, headers=headers, use_account=3)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# and again, cannot read account
|
||||
resp = retry(get, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 403)
|
||||
|
||||
@requires_acls
|
||||
def test_protected_tempurl(self):
|
||||
if skip3:
|
||||
raise SkipTest
|
||||
|
||||
def get(url, token, parsed, conn):
|
||||
conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def post(url, token, parsed, conn, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
# add a account metadata, and temp-url-key to account
|
||||
value = str(uuid4())
|
||||
headers = {
|
||||
'x-account-meta-temp-url-key': 'secret',
|
||||
'x-account-meta-test': value,
|
||||
}
|
||||
resp = retry(post, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# grant read-only access to tester3
|
||||
acl_user = swift_testing.swift_test_user[2]
|
||||
acl = {'read-only': [acl_user]}
|
||||
acl_json_str = format_acl(version=2, acl_dict=acl)
|
||||
headers = {'x-account-access-control': acl_json_str}
|
||||
resp = retry(post, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# read-only tester3 can read account metadata
|
||||
resp = retry(get, use_account=3)
|
||||
resp.read()
|
||||
self.assert_(resp.status in (200, 204),
|
||||
'Expected status in (200, 204), got %s' % resp.status)
|
||||
self.assertEqual(resp.getheader('X-Account-Meta-Test'), value)
|
||||
# but not temp-url-key
|
||||
self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'), None)
|
||||
|
||||
# grant read-write access to tester3
|
||||
acl_user = swift_testing.swift_test_user[2]
|
||||
acl = {'read-write': [acl_user]}
|
||||
acl_json_str = format_acl(version=2, acl_dict=acl)
|
||||
headers = {'x-account-access-control': acl_json_str}
|
||||
resp = retry(post, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# read-write tester3 can read account metadata
|
||||
resp = retry(get, use_account=3)
|
||||
resp.read()
|
||||
self.assert_(resp.status in (200, 204),
|
||||
'Expected status in (200, 204), got %s' % resp.status)
|
||||
self.assertEqual(resp.getheader('X-Account-Meta-Test'), value)
|
||||
# but not temp-url-key
|
||||
self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'), None)
|
||||
|
||||
# grant admin access to tester3
|
||||
acl_user = swift_testing.swift_test_user[2]
|
||||
acl = {'admin': [acl_user]}
|
||||
acl_json_str = format_acl(version=2, acl_dict=acl)
|
||||
headers = {'x-account-access-control': acl_json_str}
|
||||
resp = retry(post, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# admin tester3 can read account metadata
|
||||
resp = retry(get, use_account=3)
|
||||
resp.read()
|
||||
self.assert_(resp.status in (200, 204),
|
||||
'Expected status in (200, 204), got %s' % resp.status)
|
||||
self.assertEqual(resp.getheader('X-Account-Meta-Test'), value)
|
||||
# including temp-url-key
|
||||
self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'),
|
||||
'secret')
|
||||
|
||||
# admin tester3 can even change temp-url-key
|
||||
secret = str(uuid4())
|
||||
headers = {
|
||||
'x-account-meta-temp-url-key': secret,
|
||||
}
|
||||
resp = retry(post, headers=headers, use_account=3)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
resp = retry(get, use_account=3)
|
||||
resp.read()
|
||||
self.assert_(resp.status in (200, 204),
|
||||
'Expected status in (200, 204), got %s' % resp.status)
|
||||
self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'),
|
||||
secret)
|
||||
|
||||
@requires_acls
|
||||
def test_account_acls(self):
|
||||
if skip2:
|
||||
raise SkipTest
|
||||
|
||||
def post(url, token, parsed, conn, headers):
|
||||
@ -206,6 +503,137 @@ class TestAccount(unittest.TestCase):
|
||||
use_account=1)
|
||||
resp.read()
|
||||
|
||||
@requires_acls
|
||||
def test_swift_account_acls(self):
|
||||
if skip:
|
||||
raise SkipTest
|
||||
|
||||
def post(url, token, parsed, conn, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
def head(url, token, parsed, conn):
|
||||
conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def get(url, token, parsed, conn):
|
||||
conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
try:
|
||||
# User1 can POST to their own account
|
||||
resp = retry(post, headers={'X-Account-Access-Control': '{}'})
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
|
||||
|
||||
# User1 can GET their own empty account
|
||||
resp = retry(get)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status // 100, 2)
|
||||
self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
|
||||
|
||||
# User1 can POST non-empty data
|
||||
acl_json = '{"admin":["bob"]}'
|
||||
resp = retry(post, headers={'X-Account-Access-Control': acl_json})
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# User1 can GET the non-empty data
|
||||
resp = retry(get)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status // 100, 2)
|
||||
self.assertEqual(resp.getheader('X-Account-Access-Control'),
|
||||
acl_json)
|
||||
|
||||
# POST non-JSON ACL should fail
|
||||
resp = retry(post, headers={'X-Account-Access-Control': 'yuck'})
|
||||
resp.read()
|
||||
# resp.status will be 400 if tempauth or some other ACL-aware
|
||||
# auth middleware rejects it, or 200 (but silently swallowed by
|
||||
# core Swift) if ACL-unaware auth middleware approves it.
|
||||
|
||||
# A subsequent GET should show the old, valid data, not the garbage
|
||||
resp = retry(get)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status // 100, 2)
|
||||
self.assertEqual(resp.getheader('X-Account-Access-Control'),
|
||||
acl_json)
|
||||
|
||||
finally:
|
||||
# Make sure to clean up even if tests fail -- User2 should not
|
||||
# have access to User1's account in other functional tests!
|
||||
resp = retry(post, headers={'X-Account-Access-Control': '{}'})
|
||||
resp.read()
|
||||
|
||||
def test_swift_prohibits_garbage_account_acls(self):
|
||||
if skip:
|
||||
raise SkipTest
|
||||
|
||||
def post(url, token, parsed, conn, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
def get(url, token, parsed, conn):
|
||||
conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
try:
|
||||
# User1 can POST to their own account
|
||||
resp = retry(post, headers={'X-Account-Access-Control': '{}'})
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
|
||||
|
||||
# User1 can GET their own empty account
|
||||
resp = retry(get)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status // 100, 2)
|
||||
self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
|
||||
|
||||
# User1 can POST non-empty data
|
||||
acl_json = '{"admin":["bob"]}'
|
||||
resp = retry(post, headers={'X-Account-Access-Control': acl_json})
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
# If this request is handled by ACL-aware auth middleware, then the
|
||||
# ACL will be persisted. If it is handled by ACL-unaware auth
|
||||
# middleware, then the header will be thrown out. But the request
|
||||
# should return successfully in any case.
|
||||
|
||||
# User1 can GET the non-empty data
|
||||
resp = retry(get)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status // 100, 2)
|
||||
# ACL will be set if some ACL-aware auth middleware (e.g. tempauth)
|
||||
# propagates it to sysmeta; if no ACL-aware auth middleware does,
|
||||
# then X-Account-Access-Control will still be empty.
|
||||
|
||||
# POST non-JSON ACL should fail
|
||||
resp = retry(post, headers={'X-Account-Access-Control': 'yuck'})
|
||||
resp.read()
|
||||
# resp.status will be 400 if tempauth or some other ACL-aware
|
||||
# auth middleware rejects it, or 200 (but silently swallowed by
|
||||
# core Swift) if ACL-unaware auth middleware approves it.
|
||||
|
||||
# A subsequent GET should either show the old, valid data (if
|
||||
# ACL-aware auth middleware is propagating it) or show nothing
|
||||
# (if no auth middleware in the pipeline is ACL-aware), but should
|
||||
# never return the garbage ACL.
|
||||
resp = retry(get)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status // 100, 2)
|
||||
self.assertNotEqual(resp.getheader('X-Account-Access-Control'),
|
||||
'yuck')
|
||||
|
||||
finally:
|
||||
# Make sure to clean up even if tests fail -- User2 should not
|
||||
# have access to User1's account in other functional tests!
|
||||
resp = retry(post, headers={'X-Account-Access-Control': '{}'})
|
||||
resp.read()
|
||||
|
||||
def test_unicode_metadata(self):
|
||||
if skip:
|
||||
raise SkipTest
|
||||
|
@ -24,7 +24,7 @@ from swift.common.constraints import MAX_META_COUNT, MAX_META_NAME_LENGTH, \
|
||||
MAX_META_OVERALL_SIZE, MAX_META_VALUE_LENGTH
|
||||
|
||||
from swift_testing import check_response, retry, skip, skip2, skip3, \
|
||||
swift_test_perm, web_front_end
|
||||
swift_test_perm, web_front_end, requires_acls, swift_test_user
|
||||
|
||||
|
||||
class TestContainer(unittest.TestCase):
|
||||
@ -650,6 +650,657 @@ class TestContainer(unittest.TestCase):
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 201)
|
||||
|
||||
@requires_acls
|
||||
def test_read_only_acl_listings(self):
|
||||
if skip3:
|
||||
raise SkipTest
|
||||
|
||||
def get(url, token, parsed, conn):
|
||||
conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def post_account(url, token, parsed, conn, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
def put(url, token, parsed, conn, name):
|
||||
conn.request('PUT', parsed.path + '/%s' % name, '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
# cannot list containers
|
||||
resp = retry(get, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 403)
|
||||
|
||||
# grant read-only access
|
||||
acl_user = swift_test_user[2]
|
||||
acl = {'read-only': [acl_user]}
|
||||
headers = {'x-account-access-control': json.dumps(acl)}
|
||||
resp = retry(post_account, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# read-only can list containers
|
||||
resp = retry(get, use_account=3)
|
||||
listing = resp.read()
|
||||
self.assertEquals(resp.status, 200)
|
||||
self.assert_(self.name in listing)
|
||||
|
||||
# read-only can not create containers
|
||||
new_container_name = str(uuid4())
|
||||
resp = retry(put, new_container_name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 403)
|
||||
|
||||
# but it can see newly created ones
|
||||
resp = retry(put, new_container_name, use_account=1)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 201)
|
||||
resp = retry(get, use_account=3)
|
||||
listing = resp.read()
|
||||
self.assertEquals(resp.status, 200)
|
||||
self.assert_(new_container_name in listing)
|
||||
|
||||
@requires_acls
|
||||
def test_read_only_acl_metadata(self):
|
||||
if skip3:
|
||||
raise SkipTest
|
||||
|
||||
def get(url, token, parsed, conn, name):
|
||||
conn.request('GET', parsed.path + '/%s' % name, '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def post_account(url, token, parsed, conn, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
def post(url, token, parsed, conn, name, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path + '/%s' % name, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
# add some metadata
|
||||
value = str(uuid4())
|
||||
headers = {'x-container-meta-test': value}
|
||||
resp = retry(post, self.name, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
resp = retry(get, self.name, use_account=1)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
|
||||
|
||||
# cannot see metadata
|
||||
resp = retry(get, self.name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 403)
|
||||
|
||||
# grant read-only access
|
||||
acl_user = swift_test_user[2]
|
||||
acl = {'read-only': [acl_user]}
|
||||
headers = {'x-account-access-control': json.dumps(acl)}
|
||||
resp = retry(post_account, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# read-only can NOT write container metadata
|
||||
new_value = str(uuid4())
|
||||
headers = {'x-container-meta-test': new_value}
|
||||
resp = retry(post, self.name, headers=headers, use_account=3)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 403)
|
||||
|
||||
# read-only can read container metadata
|
||||
resp = retry(get, self.name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
|
||||
|
||||
@requires_acls
|
||||
def test_read_write_acl_listings(self):
|
||||
if skip3:
|
||||
raise SkipTest
|
||||
|
||||
def get(url, token, parsed, conn):
|
||||
conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def post(url, token, parsed, conn, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
def put(url, token, parsed, conn, name):
|
||||
conn.request('PUT', parsed.path + '/%s' % name, '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def delete(url, token, parsed, conn, name):
|
||||
conn.request('DELETE', parsed.path + '/%s' % name, '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
# cannot list containers
|
||||
resp = retry(get, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 403)
|
||||
|
||||
# grant read-write access
|
||||
acl_user = swift_test_user[2]
|
||||
acl = {'read-write': [acl_user]}
|
||||
headers = {'x-account-access-control': json.dumps(acl)}
|
||||
resp = retry(post, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# can list containers
|
||||
resp = retry(get, use_account=3)
|
||||
listing = resp.read()
|
||||
self.assertEquals(resp.status, 200)
|
||||
self.assert_(self.name in listing)
|
||||
|
||||
# can create new containers
|
||||
new_container_name = str(uuid4())
|
||||
resp = retry(put, new_container_name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 201)
|
||||
resp = retry(get, use_account=3)
|
||||
listing = resp.read()
|
||||
self.assertEquals(resp.status, 200)
|
||||
self.assert_(new_container_name in listing)
|
||||
|
||||
# can also delete them
|
||||
resp = retry(delete, new_container_name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
resp = retry(get, use_account=3)
|
||||
listing = resp.read()
|
||||
self.assertEquals(resp.status, 200)
|
||||
self.assert_(new_container_name not in listing)
|
||||
|
||||
# even if they didn't create them
|
||||
empty_container_name = str(uuid4())
|
||||
resp = retry(put, empty_container_name, use_account=1)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 201)
|
||||
resp = retry(delete, empty_container_name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
|
||||
@requires_acls
|
||||
def test_read_write_acl_metadata(self):
|
||||
if skip3:
|
||||
raise SkipTest
|
||||
|
||||
def get(url, token, parsed, conn, name):
|
||||
conn.request('GET', parsed.path + '/%s' % name, '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def post_account(url, token, parsed, conn, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
def post(url, token, parsed, conn, name, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path + '/%s' % name, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
# add some metadata
|
||||
value = str(uuid4())
|
||||
headers = {'x-container-meta-test': value}
|
||||
resp = retry(post, self.name, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
resp = retry(get, self.name, use_account=1)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
|
||||
|
||||
# cannot see metadata
|
||||
resp = retry(get, self.name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 403)
|
||||
|
||||
# grant read-write access
|
||||
acl_user = swift_test_user[2]
|
||||
acl = {'read-write': [acl_user]}
|
||||
headers = {'x-account-access-control': json.dumps(acl)}
|
||||
resp = retry(post_account, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# read-write can read container metadata
|
||||
resp = retry(get, self.name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
|
||||
|
||||
# read-write can also write container metadata
|
||||
new_value = str(uuid4())
|
||||
headers = {'x-container-meta-test': new_value}
|
||||
resp = retry(post, self.name, headers=headers, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
resp = retry(get, self.name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value)
|
||||
|
||||
# and remove it
|
||||
headers = {'x-remove-container-meta-test': 'true'}
|
||||
resp = retry(post, self.name, headers=headers, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
resp = retry(get, self.name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Meta-Test'), None)
|
||||
|
||||
@requires_acls
|
||||
def test_admin_acl_listing(self):
|
||||
if skip3:
|
||||
raise SkipTest
|
||||
|
||||
def get(url, token, parsed, conn):
|
||||
conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def post(url, token, parsed, conn, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
def put(url, token, parsed, conn, name):
|
||||
conn.request('PUT', parsed.path + '/%s' % name, '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def delete(url, token, parsed, conn, name):
|
||||
conn.request('DELETE', parsed.path + '/%s' % name, '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
# cannot list containers
|
||||
resp = retry(get, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 403)
|
||||
|
||||
# grant admin access
|
||||
acl_user = swift_test_user[2]
|
||||
acl = {'admin': [acl_user]}
|
||||
headers = {'x-account-access-control': json.dumps(acl)}
|
||||
resp = retry(post, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# can list containers
|
||||
resp = retry(get, use_account=3)
|
||||
listing = resp.read()
|
||||
self.assertEquals(resp.status, 200)
|
||||
self.assert_(self.name in listing)
|
||||
|
||||
# can create new containers
|
||||
new_container_name = str(uuid4())
|
||||
resp = retry(put, new_container_name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 201)
|
||||
resp = retry(get, use_account=3)
|
||||
listing = resp.read()
|
||||
self.assertEquals(resp.status, 200)
|
||||
self.assert_(new_container_name in listing)
|
||||
|
||||
# can also delete them
|
||||
resp = retry(delete, new_container_name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
resp = retry(get, use_account=3)
|
||||
listing = resp.read()
|
||||
self.assertEquals(resp.status, 200)
|
||||
self.assert_(new_container_name not in listing)
|
||||
|
||||
# even if they didn't create them
|
||||
empty_container_name = str(uuid4())
|
||||
resp = retry(put, empty_container_name, use_account=1)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 201)
|
||||
resp = retry(delete, empty_container_name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
|
||||
@requires_acls
|
||||
def test_admin_acl_metadata(self):
|
||||
if skip3:
|
||||
raise SkipTest
|
||||
|
||||
def get(url, token, parsed, conn, name):
|
||||
conn.request('GET', parsed.path + '/%s' % name, '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def post_account(url, token, parsed, conn, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
def post(url, token, parsed, conn, name, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path + '/%s' % name, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
# add some metadata
|
||||
value = str(uuid4())
|
||||
headers = {'x-container-meta-test': value}
|
||||
resp = retry(post, self.name, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
resp = retry(get, self.name, use_account=1)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
|
||||
|
||||
# cannot see metadata
|
||||
resp = retry(get, self.name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 403)
|
||||
|
||||
# grant access
|
||||
acl_user = swift_test_user[2]
|
||||
acl = {'admin': [acl_user]}
|
||||
headers = {'x-account-access-control': json.dumps(acl)}
|
||||
resp = retry(post_account, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# can read container metadata
|
||||
resp = retry(get, self.name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
|
||||
|
||||
# can also write container metadata
|
||||
new_value = str(uuid4())
|
||||
headers = {'x-container-meta-test': new_value}
|
||||
resp = retry(post, self.name, headers=headers, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
resp = retry(get, self.name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value)
|
||||
|
||||
# and remove it
|
||||
headers = {'x-remove-container-meta-test': 'true'}
|
||||
resp = retry(post, self.name, headers=headers, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
resp = retry(get, self.name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Meta-Test'), None)
|
||||
|
||||
@requires_acls
|
||||
def test_protected_container_sync(self):
|
||||
if skip3:
|
||||
raise SkipTest
|
||||
|
||||
def get(url, token, parsed, conn, name):
|
||||
conn.request('GET', parsed.path + '/%s' % name, '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def post_account(url, token, parsed, conn, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
def post(url, token, parsed, conn, name, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path + '/%s' % name, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
# add some metadata
|
||||
value = str(uuid4())
|
||||
headers = {
|
||||
'x-container-sync-key': 'secret',
|
||||
'x-container-meta-test': value,
|
||||
}
|
||||
resp = retry(post, self.name, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
resp = retry(get, self.name, use_account=1)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret')
|
||||
self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
|
||||
|
||||
# grant read-only access
|
||||
acl_user = swift_test_user[2]
|
||||
acl = {'read-only': [acl_user]}
|
||||
headers = {'x-account-access-control': json.dumps(acl)}
|
||||
resp = retry(post_account, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# can read container metadata
|
||||
resp = retry(get, self.name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
|
||||
# but not sync-key
|
||||
self.assertEqual(resp.getheader('X-Container-Sync-Key'), None)
|
||||
|
||||
# and can not write
|
||||
headers = {'x-container-sync-key': str(uuid4())}
|
||||
resp = retry(post, self.name, headers=headers, use_account=3)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 403)
|
||||
|
||||
# grant read-write access
|
||||
acl_user = swift_test_user[2]
|
||||
acl = {'read-write': [acl_user]}
|
||||
headers = {'x-account-access-control': json.dumps(acl)}
|
||||
resp = retry(post_account, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# can read container metadata
|
||||
resp = retry(get, self.name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
|
||||
# but not sync-key
|
||||
self.assertEqual(resp.getheader('X-Container-Sync-Key'), None)
|
||||
|
||||
# sanity check sync-key w/ account1
|
||||
resp = retry(get, self.name, use_account=1)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret')
|
||||
|
||||
# and can write
|
||||
new_value = str(uuid4())
|
||||
headers = {
|
||||
'x-container-sync-key': str(uuid4()),
|
||||
'x-container-meta-test': new_value,
|
||||
}
|
||||
resp = retry(post, self.name, headers=headers, use_account=3)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
resp = retry(get, self.name, use_account=1) # validate w/ account1
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value)
|
||||
# but can not write sync-key
|
||||
self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret')
|
||||
|
||||
# grant admin access
|
||||
acl_user = swift_test_user[2]
|
||||
acl = {'admin': [acl_user]}
|
||||
headers = {'x-account-access-control': json.dumps(acl)}
|
||||
resp = retry(post_account, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# admin can read container metadata
|
||||
resp = retry(get, self.name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value)
|
||||
# and ALSO sync-key
|
||||
self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret')
|
||||
|
||||
# admin tester3 can even change sync-key
|
||||
new_secret = str(uuid4())
|
||||
headers = {'x-container-sync-key': new_secret}
|
||||
resp = retry(post, self.name, headers=headers, use_account=3)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
resp = retry(get, self.name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Sync-Key'), new_secret)
|
||||
|
||||
@requires_acls
|
||||
def test_protected_container_acl(self):
|
||||
if skip3:
|
||||
raise SkipTest
|
||||
|
||||
def get(url, token, parsed, conn, name):
|
||||
conn.request('GET', parsed.path + '/%s' % name, '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def post_account(url, token, parsed, conn, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
def post(url, token, parsed, conn, name, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path + '/%s' % name, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
# add some container acls
|
||||
value = str(uuid4())
|
||||
headers = {
|
||||
'x-container-read': 'jdoe',
|
||||
'x-container-write': 'jdoe',
|
||||
'x-container-meta-test': value,
|
||||
}
|
||||
resp = retry(post, self.name, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
resp = retry(get, self.name, use_account=1)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Read'), 'jdoe')
|
||||
self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe')
|
||||
self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
|
||||
|
||||
# grant read-only access
|
||||
acl_user = swift_test_user[2]
|
||||
acl = {'read-only': [acl_user]}
|
||||
headers = {'x-account-access-control': json.dumps(acl)}
|
||||
resp = retry(post_account, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# can read container metadata
|
||||
resp = retry(get, self.name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
|
||||
# but not container acl
|
||||
self.assertEqual(resp.getheader('X-Container-Read'), None)
|
||||
self.assertEqual(resp.getheader('X-Container-Write'), None)
|
||||
|
||||
# and can not write
|
||||
headers = {
|
||||
'x-container-read': 'frank',
|
||||
'x-container-write': 'frank',
|
||||
}
|
||||
resp = retry(post, self.name, headers=headers, use_account=3)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 403)
|
||||
|
||||
# grant read-write access
|
||||
acl_user = swift_test_user[2]
|
||||
acl = {'read-write': [acl_user]}
|
||||
headers = {'x-account-access-control': json.dumps(acl)}
|
||||
resp = retry(post_account, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# can read container metadata
|
||||
resp = retry(get, self.name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
|
||||
# but not container acl
|
||||
self.assertEqual(resp.getheader('X-Container-Read'), None)
|
||||
self.assertEqual(resp.getheader('X-Container-Write'), None)
|
||||
|
||||
# sanity check container acls with account1
|
||||
resp = retry(get, self.name, use_account=1)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Read'), 'jdoe')
|
||||
self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe')
|
||||
|
||||
# and can write
|
||||
new_value = str(uuid4())
|
||||
headers = {
|
||||
'x-container-read': 'frank',
|
||||
'x-container-write': 'frank',
|
||||
'x-container-meta-test': new_value,
|
||||
}
|
||||
resp = retry(post, self.name, headers=headers, use_account=3)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
resp = retry(get, self.name, use_account=1) # validate w/ account1
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value)
|
||||
# but can not write container acls
|
||||
self.assertEqual(resp.getheader('X-Container-Read'), 'jdoe')
|
||||
self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe')
|
||||
|
||||
# grant admin access
|
||||
acl_user = swift_test_user[2]
|
||||
acl = {'admin': [acl_user]}
|
||||
headers = {'x-account-access-control': json.dumps(acl)}
|
||||
resp = retry(post_account, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# admin can read container metadata
|
||||
resp = retry(get, self.name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value)
|
||||
# and ALSO container acls
|
||||
self.assertEqual(resp.getheader('X-Container-Read'), 'jdoe')
|
||||
self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe')
|
||||
|
||||
# admin tester3 can even change container acls
|
||||
new_value = str(uuid4())
|
||||
headers = {
|
||||
'x-container-read': '.r:*',
|
||||
}
|
||||
resp = retry(post, self.name, headers=headers, use_account=3)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
resp = retry(get, self.name, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
self.assertEqual(resp.getheader('X-Container-Read'), '.r:*')
|
||||
|
||||
def test_long_name_content_type(self):
|
||||
if skip:
|
||||
raise SkipTest
|
||||
|
@ -19,10 +19,11 @@ import unittest
|
||||
from nose import SkipTest
|
||||
from uuid import uuid4
|
||||
|
||||
from swift_testing import check_response, retry, skip, skip3, \
|
||||
swift_test_perm, web_front_end
|
||||
from swift.common.utils import json
|
||||
|
||||
from swift_testing import check_response, retry, skip, skip3, \
|
||||
swift_test_perm, web_front_end, requires_acls, swift_test_user
|
||||
|
||||
|
||||
class TestObject(unittest.TestCase):
|
||||
|
||||
@ -290,6 +291,249 @@ class TestObject(unittest.TestCase):
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
|
||||
@requires_acls
|
||||
def test_read_only(self):
|
||||
if skip3:
|
||||
raise SkipTest
|
||||
|
||||
def get_listing(url, token, parsed, conn):
|
||||
conn.request('GET', '%s/%s' % (parsed.path, self.container), '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def post_account(url, token, parsed, conn, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
def get(url, token, parsed, conn, name):
|
||||
conn.request('GET', '%s/%s/%s' % (
|
||||
parsed.path, self.container, name), '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def put(url, token, parsed, conn, name):
|
||||
conn.request('PUT', '%s/%s/%s' % (
|
||||
parsed.path, self.container, name), 'test',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def delete(url, token, parsed, conn, name):
|
||||
conn.request('PUT', '%s/%s/%s' % (
|
||||
parsed.path, self.container, name), '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
# cannot list objects
|
||||
resp = retry(get_listing, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 403)
|
||||
|
||||
# cannot get object
|
||||
resp = retry(get, self.obj, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 403)
|
||||
|
||||
# grant read-only access
|
||||
acl_user = swift_test_user[2]
|
||||
acl = {'read-only': [acl_user]}
|
||||
headers = {'x-account-access-control': json.dumps(acl)}
|
||||
resp = retry(post_account, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# can list objects
|
||||
resp = retry(get_listing, use_account=3)
|
||||
listing = resp.read()
|
||||
self.assertEquals(resp.status, 200)
|
||||
self.assert_(self.obj in listing)
|
||||
|
||||
# can get object
|
||||
resp = retry(get, self.obj, use_account=3)
|
||||
body = resp.read()
|
||||
self.assertEquals(resp.status, 200)
|
||||
self.assertEquals(body, 'test')
|
||||
|
||||
# can not put an object
|
||||
obj_name = str(uuid4())
|
||||
resp = retry(put, obj_name, use_account=3)
|
||||
body = resp.read()
|
||||
self.assertEquals(resp.status, 403)
|
||||
|
||||
# can not delete an object
|
||||
resp = retry(delete, self.obj, use_account=3)
|
||||
body = resp.read()
|
||||
self.assertEquals(resp.status, 403)
|
||||
|
||||
# sanity with account1
|
||||
resp = retry(get_listing, use_account=3)
|
||||
listing = resp.read()
|
||||
self.assertEquals(resp.status, 200)
|
||||
self.assert_(obj_name not in listing)
|
||||
self.assert_(self.obj in listing)
|
||||
|
||||
@requires_acls
|
||||
def test_read_write(self):
|
||||
if skip3:
|
||||
raise SkipTest
|
||||
|
||||
def get_listing(url, token, parsed, conn):
|
||||
conn.request('GET', '%s/%s' % (parsed.path, self.container), '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def post_account(url, token, parsed, conn, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
def get(url, token, parsed, conn, name):
|
||||
conn.request('GET', '%s/%s/%s' % (
|
||||
parsed.path, self.container, name), '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def put(url, token, parsed, conn, name):
|
||||
conn.request('PUT', '%s/%s/%s' % (
|
||||
parsed.path, self.container, name), 'test',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def delete(url, token, parsed, conn, name):
|
||||
conn.request('DELETE', '%s/%s/%s' % (
|
||||
parsed.path, self.container, name), '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
# cannot list objects
|
||||
resp = retry(get_listing, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 403)
|
||||
|
||||
# cannot get object
|
||||
resp = retry(get, self.obj, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 403)
|
||||
|
||||
# grant read-write access
|
||||
acl_user = swift_test_user[2]
|
||||
acl = {'read-write': [acl_user]}
|
||||
headers = {'x-account-access-control': json.dumps(acl)}
|
||||
resp = retry(post_account, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# can list objects
|
||||
resp = retry(get_listing, use_account=3)
|
||||
listing = resp.read()
|
||||
self.assertEquals(resp.status, 200)
|
||||
self.assert_(self.obj in listing)
|
||||
|
||||
# can get object
|
||||
resp = retry(get, self.obj, use_account=3)
|
||||
body = resp.read()
|
||||
self.assertEquals(resp.status, 200)
|
||||
self.assertEquals(body, 'test')
|
||||
|
||||
# can put an object
|
||||
obj_name = str(uuid4())
|
||||
resp = retry(put, obj_name, use_account=3)
|
||||
body = resp.read()
|
||||
self.assertEquals(resp.status, 201)
|
||||
|
||||
# can delete an object
|
||||
resp = retry(delete, self.obj, use_account=3)
|
||||
body = resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
|
||||
# sanity with account1
|
||||
resp = retry(get_listing, use_account=3)
|
||||
listing = resp.read()
|
||||
self.assertEquals(resp.status, 200)
|
||||
self.assert_(obj_name in listing)
|
||||
self.assert_(self.obj not in listing)
|
||||
|
||||
@requires_acls
|
||||
def test_admin(self):
|
||||
if skip3:
|
||||
raise SkipTest
|
||||
|
||||
def get_listing(url, token, parsed, conn):
|
||||
conn.request('GET', '%s/%s' % (parsed.path, self.container), '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def post_account(url, token, parsed, conn, headers):
|
||||
new_headers = dict({'X-Auth-Token': token}, **headers)
|
||||
conn.request('POST', parsed.path, '', new_headers)
|
||||
return check_response(conn)
|
||||
|
||||
def get(url, token, parsed, conn, name):
|
||||
conn.request('GET', '%s/%s/%s' % (
|
||||
parsed.path, self.container, name), '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def put(url, token, parsed, conn, name):
|
||||
conn.request('PUT', '%s/%s/%s' % (
|
||||
parsed.path, self.container, name), 'test',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def delete(url, token, parsed, conn, name):
|
||||
conn.request('DELETE', '%s/%s/%s' % (
|
||||
parsed.path, self.container, name), '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
# cannot list objects
|
||||
resp = retry(get_listing, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 403)
|
||||
|
||||
# cannot get object
|
||||
resp = retry(get, self.obj, use_account=3)
|
||||
resp.read()
|
||||
self.assertEquals(resp.status, 403)
|
||||
|
||||
# grant admin access
|
||||
acl_user = swift_test_user[2]
|
||||
acl = {'admin': [acl_user]}
|
||||
headers = {'x-account-access-control': json.dumps(acl)}
|
||||
resp = retry(post_account, headers=headers, use_account=1)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# can list objects
|
||||
resp = retry(get_listing, use_account=3)
|
||||
listing = resp.read()
|
||||
self.assertEquals(resp.status, 200)
|
||||
self.assert_(self.obj in listing)
|
||||
|
||||
# can get object
|
||||
resp = retry(get, self.obj, use_account=3)
|
||||
body = resp.read()
|
||||
self.assertEquals(resp.status, 200)
|
||||
self.assertEquals(body, 'test')
|
||||
|
||||
# can put an object
|
||||
obj_name = str(uuid4())
|
||||
resp = retry(put, obj_name, use_account=3)
|
||||
body = resp.read()
|
||||
self.assertEquals(resp.status, 201)
|
||||
|
||||
# can delete an object
|
||||
resp = retry(delete, self.obj, use_account=3)
|
||||
body = resp.read()
|
||||
self.assertEquals(resp.status, 204)
|
||||
|
||||
# sanity with account1
|
||||
resp = retry(get_listing, use_account=3)
|
||||
listing = resp.read()
|
||||
self.assertEquals(resp.status, 200)
|
||||
self.assert_(obj_name in listing)
|
||||
self.assert_(self.obj not in listing)
|
||||
|
||||
def test_manifest(self):
|
||||
if skip:
|
||||
raise SkipTest
|
||||
|
Loading…
x
Reference in New Issue
Block a user