Sync with OpenStack Swift v1.13.1

Signed-off-by: Prashanth Pai <ppai@redhat.com>
This commit is contained in:
Prashanth Pai 2014-05-02 15:34:34 +05:30
parent b6d1671b61
commit d23fd1b56c
16 changed files with 2793 additions and 593 deletions

View File

@ -1,10 +0,0 @@
#!/bin/bash
SRC_DIR=$(dirname $0)
cd ${SRC_DIR}/test/functional
nosetests --exe $@
func1=$?
cd -
exit $func1

View File

@ -45,6 +45,6 @@ class PkgInfo(object):
###
### Change the Package version here
###
_pkginfo = PkgInfo('1.13.0', '0', 'gluster_swift', False)
_pkginfo = PkgInfo('1.13.1', '0', 'gluster_swift', False)
__version__ = _pkginfo.pretty_version
__canonical_version__ = _pkginfo.canonical_version

View File

@ -144,6 +144,7 @@ class Connection(object):
auth_scheme = 'https://' if self.auth_ssl else 'http://'
auth_netloc = "%s:%d" % (self.auth_host, self.auth_port)
auth_url = auth_scheme + auth_netloc + auth_path
(storage_url, storage_token) = get_auth(
auth_url, auth_user, self.password, snet=False,
tenant_name=self.account, auth_version=self.auth_version,
@ -166,17 +167,29 @@ class Connection(object):
self.storage_host = x[2].split(':')[0]
if ':' in x[2]:
self.storage_port = int(x[2].split(':')[1])
# Make sure storage_url and the storage_token are
# strings and not unicode, since
# Make sure storage_url is a string and not unicode, since
# keystoneclient (called by swiftclient) returns them in
# unicode and this would cause troubles when doing
# no_safe_quote query.
self.storage_url = str('/%s/%s' % (x[3], x[4]))
self.storage_token = str(storage_token)
self.storage_token = storage_token
self.http_connect()
return self.storage_url, self.storage_token
def cluster_info(self):
"""
Retrieve the data in /info, or {} on 404
"""
status = self.make_request('GET', '/info',
cfg={'absolute_path': True})
if status == 404:
return {}
if not 200 <= status <= 299:
raise ResponseError(self.response, 'GET', '/info')
return json.loads(self.response.read())
def http_connect(self):
self.connection = self.conn_class(self.storage_host,
port=self.storage_port)
@ -207,8 +220,8 @@ class Connection(object):
def make_request(self, method, path=[], data='', hdrs={}, parms={},
cfg={}):
if not cfg.get('verbatim_path'):
# Set verbatim_path=True to make a request to exactly the given
if not cfg.get('absolute_path'):
# Set absolute_path=True to make a request to exactly the given
# path, not storage path + given path. Useful for
# non-account/container/object requests.
path = self.make_path(path, cfg=cfg)
@ -305,7 +318,7 @@ class Connection(object):
return self.response.status
class Base:
class Base(object):
def __str__(self):
return self.name
@ -339,6 +352,16 @@ class Account(Base):
self.conn = conn
self.name = str(name)
def update_metadata(self, metadata={}, cfg={}):
headers = dict(("X-Account-Meta-%s" % k, v)
for k, v in metadata.items())
self.conn.make_request('POST', self.path, hdrs=headers, cfg=cfg)
if not 200 <= self.conn.response.status <= 299:
raise ResponseError(self.conn.response, 'POST',
self.conn.make_path(self.path))
return True
def container(self, container_name):
return Container(self.conn, self.name, container_name)

View File

@ -19,10 +19,13 @@ import socket
import sys
from time import sleep
from urlparse import urlparse
import functools
from nose import SkipTest
from test import get_config
from swiftclient import get_auth, http_connection
from test.functional.swift_test_client import Connection
conf = get_config('func_test')
web_front_end = conf.get('web_front_end', 'integral')
@ -184,3 +187,45 @@ def check_response(conn):
resp.read()
raise InternalServerError()
return resp
cluster_info = {}
def get_cluster_info():
conn = Connection(conf)
conn.authenticate()
global cluster_info
cluster_info = conn.cluster_info()
def reset_acl():
def post(url, token, parsed, conn):
conn.request('POST', parsed.path, '', {
'X-Auth-Token': token,
'X-Account-Access-Control': '{}'
})
return check_response(conn)
resp = retry(post, use_account=1)
resp.read()
def requires_acls(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
if skip:
raise SkipTest
if not cluster_info:
get_cluster_info()
# Determine whether this cluster has account ACLs; if not, skip test
if not cluster_info.get('tempauth', {}).get('account_acls'):
raise SkipTest
if 'keystoneauth' in cluster_info:
# remove when keystoneauth supports account acls
raise SkipTest
reset_acl()
try:
rv = f(*args, **kwargs)
finally:
reset_acl()
return rv
return wrapper

View File

@ -17,15 +17,17 @@
import unittest
import json
from uuid import uuid4
from nose import SkipTest
from string import letters
from swift.common.constraints import MAX_META_COUNT, MAX_META_NAME_LENGTH, \
MAX_META_OVERALL_SIZE, MAX_META_VALUE_LENGTH
from swift.common.middleware.acl import format_acl
from test.functional.swift_test_client import Connection
from test import get_config
from swift_testing import check_response, retry, skip, web_front_end
from swift_testing import (check_response, retry, skip, skip2, skip3,
web_front_end, requires_acls)
import swift_testing
from test.functional.tests import load_constraint
class TestAccount(unittest.TestCase):
@ -49,49 +51,338 @@ class TestAccount(unittest.TestCase):
resp = retry(post, '')
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-account-meta-test'), None)
self.assertEqual(resp.getheader('x-account-meta-test'), None)
resp = retry(get)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-account-meta-test'), None)
self.assertEqual(resp.getheader('x-account-meta-test'), None)
resp = retry(post, 'Value')
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-account-meta-test'), 'Value')
self.assertEqual(resp.getheader('x-account-meta-test'), 'Value')
resp = retry(get)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-account-meta-test'), 'Value')
self.assertEqual(resp.getheader('x-account-meta-test'), 'Value')
def test_tempauth_account_acls(self):
if skip:
def test_invalid_acls(self):
def post(url, token, parsed, conn, headers):
new_headers = dict({'X-Auth-Token': token}, **headers)
conn.request('POST', parsed.path, '', new_headers)
return check_response(conn)
# needs to be an acceptable header size
num_keys = 8
max_key_size = load_constraint('max_header_size') / num_keys
acl = {'admin': [c * max_key_size for c in letters[:num_keys]]}
headers = {'x-account-access-control': format_acl(
version=2, acl_dict=acl)}
resp = retry(post, headers=headers, use_account=1)
resp.read()
self.assertEqual(resp.status, 400)
# and again a touch smaller
acl = {'admin': [c * max_key_size for c in letters[:num_keys - 1]]}
headers = {'x-account-access-control': format_acl(
version=2, acl_dict=acl)}
resp = retry(post, headers=headers, use_account=1)
resp.read()
self.assertEqual(resp.status, 204)
@requires_acls
def test_invalid_acl_keys(self):
def post(url, token, parsed, conn, headers):
new_headers = dict({'X-Auth-Token': token}, **headers)
conn.request('POST', parsed.path, '', new_headers)
return check_response(conn)
# needs to be json
resp = retry(post, headers={'X-Account-Access-Control': 'invalid'},
use_account=1)
resp.read()
self.assertEqual(resp.status, 400)
acl_user = swift_testing.swift_test_user[1]
acl = {'admin': [acl_user], 'invalid_key': 'invalid_value'}
headers = {'x-account-access-control': format_acl(
version=2, acl_dict=acl)}
resp = retry(post, headers, use_account=1)
resp.read()
self.assertEqual(resp.status, 400)
self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
@requires_acls
def test_invalid_acl_values(self):
def post(url, token, parsed, conn, headers):
new_headers = dict({'X-Auth-Token': token}, **headers)
conn.request('POST', parsed.path, '', new_headers)
return check_response(conn)
acl = {'admin': 'invalid_value'}
headers = {'x-account-access-control': format_acl(
version=2, acl_dict=acl)}
resp = retry(post, headers=headers, use_account=1)
resp.read()
self.assertEqual(resp.status, 400)
self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
@requires_acls
def test_read_only_acl(self):
if skip3:
raise SkipTest
# Determine whether this cluster has account ACLs; if not, skip test
conn = Connection(get_config('func_test'))
conn.authenticate()
status = conn.make_request(
'GET', '/info', cfg={'verbatim_path': True})
if status // 100 != 2:
# Can't tell if account ACLs are enabled; skip tests proactively.
def get(url, token, parsed, conn):
conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
return check_response(conn)
def post(url, token, parsed, conn, headers):
new_headers = dict({'X-Auth-Token': token}, **headers)
conn.request('POST', parsed.path, '', new_headers)
return check_response(conn)
# cannot read account
resp = retry(get, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# grant read access
acl_user = swift_testing.swift_test_user[2]
acl = {'read-only': [acl_user]}
headers = {'x-account-access-control': format_acl(
version=2, acl_dict=acl)}
resp = retry(post, headers=headers, use_account=1)
resp.read()
self.assertEqual(resp.status, 204)
# read-only can read account headers
resp = retry(get, use_account=3)
resp.read()
self.assert_(resp.status in (200, 204))
# but not acls
self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
# read-only can not write metadata
headers = {'x-account-meta-test': 'value'}
resp = retry(post, headers=headers, use_account=3)
resp.read()
self.assertEqual(resp.status, 403)
# but they can read it
headers = {'x-account-meta-test': 'value'}
resp = retry(post, headers=headers, use_account=1)
resp.read()
self.assertEqual(resp.status, 204)
resp = retry(get, use_account=3)
resp.read()
self.assert_(resp.status in (200, 204))
self.assertEqual(resp.getheader('X-Account-Meta-Test'), 'value')
@requires_acls
def test_read_write_acl(self):
if skip3:
raise SkipTest
def get(url, token, parsed, conn):
conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
return check_response(conn)
def post(url, token, parsed, conn, headers):
new_headers = dict({'X-Auth-Token': token}, **headers)
conn.request('POST', parsed.path, '', new_headers)
return check_response(conn)
# cannot read account
resp = retry(get, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# grant read-write access
acl_user = swift_testing.swift_test_user[2]
acl = {'read-write': [acl_user]}
headers = {'x-account-access-control': format_acl(
version=2, acl_dict=acl)}
resp = retry(post, headers=headers, use_account=1)
resp.read()
self.assertEqual(resp.status, 204)
# read-write can read account headers
resp = retry(get, use_account=3)
resp.read()
self.assert_(resp.status in (200, 204))
# but not acls
self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
# read-write can not write account metadata
headers = {'x-account-meta-test': 'value'}
resp = retry(post, headers=headers, use_account=3)
resp.read()
self.assertEqual(resp.status, 403)
@requires_acls
def test_admin_acl(self):
if skip3:
raise SkipTest
def get(url, token, parsed, conn):
conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
return check_response(conn)
def post(url, token, parsed, conn, headers):
new_headers = dict({'X-Auth-Token': token}, **headers)
conn.request('POST', parsed.path, '', new_headers)
return check_response(conn)
# cannot read account
resp = retry(get, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# grant admin access
acl_user = swift_testing.swift_test_user[2]
acl = {'admin': [acl_user]}
acl_json_str = format_acl(version=2, acl_dict=acl)
headers = {'x-account-access-control': acl_json_str}
resp = retry(post, headers=headers, use_account=1)
resp.read()
self.assertEqual(resp.status, 204)
# admin can read account headers
resp = retry(get, use_account=3)
resp.read()
self.assert_(resp.status in (200, 204))
# including acls
self.assertEqual(resp.getheader('X-Account-Access-Control'),
acl_json_str)
# admin can write account metadata
value = str(uuid4())
headers = {'x-account-meta-test': value}
resp = retry(post, headers=headers, use_account=3)
resp.read()
self.assertEqual(resp.status, 204)
resp = retry(get, use_account=3)
resp.read()
self.assert_(resp.status in (200, 204))
self.assertEqual(resp.getheader('X-Account-Meta-Test'), value)
# admin can even revoke their own access
headers = {'x-account-access-control': '{}'}
resp = retry(post, headers=headers, use_account=3)
resp.read()
self.assertEqual(resp.status, 204)
# and again, cannot read account
resp = retry(get, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
@requires_acls
def test_protected_tempurl(self):
if skip3:
raise SkipTest
def get(url, token, parsed, conn):
conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
return check_response(conn)
def post(url, token, parsed, conn, headers):
new_headers = dict({'X-Auth-Token': token}, **headers)
conn.request('POST', parsed.path, '', new_headers)
return check_response(conn)
# add a account metadata, and temp-url-key to account
value = str(uuid4())
headers = {
'x-account-meta-temp-url-key': 'secret',
'x-account-meta-test': value,
}
resp = retry(post, headers=headers, use_account=1)
resp.read()
self.assertEqual(resp.status, 204)
# grant read-only access to tester3
acl_user = swift_testing.swift_test_user[2]
acl = {'read-only': [acl_user]}
acl_json_str = format_acl(version=2, acl_dict=acl)
headers = {'x-account-access-control': acl_json_str}
resp = retry(post, headers=headers, use_account=1)
resp.read()
self.assertEqual(resp.status, 204)
# read-only tester3 can read account metadata
resp = retry(get, use_account=3)
resp.read()
self.assert_(resp.status in (200, 204),
'Expected status in (200, 204), got %s' % resp.status)
self.assertEqual(resp.getheader('X-Account-Meta-Test'), value)
# but not temp-url-key
self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'), None)
# grant read-write access to tester3
acl_user = swift_testing.swift_test_user[2]
acl = {'read-write': [acl_user]}
acl_json_str = format_acl(version=2, acl_dict=acl)
headers = {'x-account-access-control': acl_json_str}
resp = retry(post, headers=headers, use_account=1)
resp.read()
self.assertEqual(resp.status, 204)
# read-write tester3 can read account metadata
resp = retry(get, use_account=3)
resp.read()
self.assert_(resp.status in (200, 204),
'Expected status in (200, 204), got %s' % resp.status)
self.assertEqual(resp.getheader('X-Account-Meta-Test'), value)
# but not temp-url-key
self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'), None)
# grant admin access to tester3
acl_user = swift_testing.swift_test_user[2]
acl = {'admin': [acl_user]}
acl_json_str = format_acl(version=2, acl_dict=acl)
headers = {'x-account-access-control': acl_json_str}
resp = retry(post, headers=headers, use_account=1)
resp.read()
self.assertEqual(resp.status, 204)
# admin tester3 can read account metadata
resp = retry(get, use_account=3)
resp.read()
self.assert_(resp.status in (200, 204),
'Expected status in (200, 204), got %s' % resp.status)
self.assertEqual(resp.getheader('X-Account-Meta-Test'), value)
# including temp-url-key
self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'),
'secret')
# admin tester3 can even change temp-url-key
secret = str(uuid4())
headers = {
'x-account-meta-temp-url-key': secret,
}
resp = retry(post, headers=headers, use_account=3)
resp.read()
self.assertEqual(resp.status, 204)
resp = retry(get, use_account=3)
resp.read()
self.assert_(resp.status in (200, 204),
'Expected status in (200, 204), got %s' % resp.status)
self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'),
secret)
@requires_acls
def test_account_acls(self):
if skip2:
raise SkipTest
else:
cluster_info = json.loads(conn.response.read())
if not cluster_info.get('tempauth', {}).get('account_acls'):
raise SkipTest
if 'keystoneauth' in cluster_info:
# Unfortunate hack -- tempauth (with account ACLs) is expected
# to play nice with Keystone (without account ACLs), but Zuul
# functest framework doesn't give us an easy way to get a
# tempauth user.
raise SkipTest
def post(url, token, parsed, conn, headers):
new_headers = dict({'X-Auth-Token': token}, **headers)
@ -212,6 +503,137 @@ class TestAccount(unittest.TestCase):
use_account=1)
resp.read()
@requires_acls
def test_swift_account_acls(self):
if skip:
raise SkipTest
def post(url, token, parsed, conn, headers):
new_headers = dict({'X-Auth-Token': token}, **headers)
conn.request('POST', parsed.path, '', new_headers)
return check_response(conn)
def head(url, token, parsed, conn):
conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
return check_response(conn)
def get(url, token, parsed, conn):
conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
return check_response(conn)
try:
# User1 can POST to their own account
resp = retry(post, headers={'X-Account-Access-Control': '{}'})
resp.read()
self.assertEqual(resp.status, 204)
self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
# User1 can GET their own empty account
resp = retry(get)
resp.read()
self.assertEqual(resp.status // 100, 2)
self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
# User1 can POST non-empty data
acl_json = '{"admin":["bob"]}'
resp = retry(post, headers={'X-Account-Access-Control': acl_json})
resp.read()
self.assertEqual(resp.status, 204)
# User1 can GET the non-empty data
resp = retry(get)
resp.read()
self.assertEqual(resp.status // 100, 2)
self.assertEqual(resp.getheader('X-Account-Access-Control'),
acl_json)
# POST non-JSON ACL should fail
resp = retry(post, headers={'X-Account-Access-Control': 'yuck'})
resp.read()
# resp.status will be 400 if tempauth or some other ACL-aware
# auth middleware rejects it, or 200 (but silently swallowed by
# core Swift) if ACL-unaware auth middleware approves it.
# A subsequent GET should show the old, valid data, not the garbage
resp = retry(get)
resp.read()
self.assertEqual(resp.status // 100, 2)
self.assertEqual(resp.getheader('X-Account-Access-Control'),
acl_json)
finally:
# Make sure to clean up even if tests fail -- User2 should not
# have access to User1's account in other functional tests!
resp = retry(post, headers={'X-Account-Access-Control': '{}'})
resp.read()
def test_swift_prohibits_garbage_account_acls(self):
if skip:
raise SkipTest
def post(url, token, parsed, conn, headers):
new_headers = dict({'X-Auth-Token': token}, **headers)
conn.request('POST', parsed.path, '', new_headers)
return check_response(conn)
def get(url, token, parsed, conn):
conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
return check_response(conn)
try:
# User1 can POST to their own account
resp = retry(post, headers={'X-Account-Access-Control': '{}'})
resp.read()
self.assertEqual(resp.status, 204)
self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
# User1 can GET their own empty account
resp = retry(get)
resp.read()
self.assertEqual(resp.status // 100, 2)
self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
# User1 can POST non-empty data
acl_json = '{"admin":["bob"]}'
resp = retry(post, headers={'X-Account-Access-Control': acl_json})
resp.read()
self.assertEqual(resp.status, 204)
# If this request is handled by ACL-aware auth middleware, then the
# ACL will be persisted. If it is handled by ACL-unaware auth
# middleware, then the header will be thrown out. But the request
# should return successfully in any case.
# User1 can GET the non-empty data
resp = retry(get)
resp.read()
self.assertEqual(resp.status // 100, 2)
# ACL will be set if some ACL-aware auth middleware (e.g. tempauth)
# propagates it to sysmeta; if no ACL-aware auth middleware does,
# then X-Account-Access-Control will still be empty.
# POST non-JSON ACL should fail
resp = retry(post, headers={'X-Account-Access-Control': 'yuck'})
resp.read()
# resp.status will be 400 if tempauth or some other ACL-aware
# auth middleware rejects it, or 200 (but silently swallowed by
# core Swift) if ACL-unaware auth middleware approves it.
# A subsequent GET should either show the old, valid data (if
# ACL-aware auth middleware is propagating it) or show nothing
# (if no auth middleware in the pipeline is ACL-aware), but should
# never return the garbage ACL.
resp = retry(get)
resp.read()
self.assertEqual(resp.status // 100, 2)
self.assertNotEqual(resp.getheader('X-Account-Access-Control'),
'yuck')
finally:
# Make sure to clean up even if tests fail -- User2 should not
# have access to User1's account in other functional tests!
resp = retry(post, headers={'X-Account-Access-Control': '{}'})
resp.read()
def test_unicode_metadata(self):
if skip:
raise SkipTest
@ -233,24 +655,24 @@ class TestAccount(unittest.TestCase):
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader(uni_key.encode('utf-8')), '1')
self.assertEqual(resp.getheader(uni_key.encode('utf-8')), '1')
resp = retry(post, 'X-Account-Meta-uni', uni_value)
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('X-Account-Meta-uni'),
uni_value.encode('utf-8'))
self.assertEqual(resp.getheader('X-Account-Meta-uni'),
uni_value.encode('utf-8'))
if (web_front_end == 'integral'):
resp = retry(post, uni_key, uni_value)
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader(uni_key.encode('utf-8')),
uni_value.encode('utf-8'))
self.assertEqual(resp.getheader(uni_key.encode('utf-8')),
uni_value.encode('utf-8'))
def test_multi_metadata(self):
if skip:
@ -267,19 +689,19 @@ class TestAccount(unittest.TestCase):
resp = retry(post, 'X-Account-Meta-One', '1')
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-account-meta-one'), '1')
self.assertEqual(resp.getheader('x-account-meta-one'), '1')
resp = retry(post, 'X-Account-Meta-Two', '2')
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-account-meta-one'), '1')
self.assertEquals(resp.getheader('x-account-meta-two'), '2')
self.assertEqual(resp.getheader('x-account-meta-one'), '1')
self.assertEqual(resp.getheader('x-account-meta-two'), '2')
def test_bad_metadata(self):
if skip:
@ -294,35 +716,35 @@ class TestAccount(unittest.TestCase):
resp = retry(post,
{'X-Account-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
resp = retry(
post,
{'X-Account-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
resp.read()
self.assertEquals(resp.status, 400)
self.assertEqual(resp.status, 400)
resp = retry(post,
{'X-Account-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
resp = retry(
post,
{'X-Account-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
resp.read()
self.assertEquals(resp.status, 400)
self.assertEqual(resp.status, 400)
headers = {}
for x in xrange(MAX_META_COUNT):
headers['X-Account-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
headers = {}
for x in xrange(MAX_META_COUNT + 1):
headers['X-Account-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
self.assertEquals(resp.status, 400)
self.assertEqual(resp.status, 400)
headers = {}
header_value = 'k' * MAX_META_VALUE_LENGTH
@ -337,12 +759,12 @@ class TestAccount(unittest.TestCase):
'v' * (MAX_META_OVERALL_SIZE - size - 1)
resp = retry(post, headers)
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
headers['X-Account-Meta-k'] = \
'v' * (MAX_META_OVERALL_SIZE - size)
resp = retry(post, headers)
resp.read()
self.assertEquals(resp.status, 400)
self.assertEqual(resp.status, 400)
if __name__ == '__main__':

File diff suppressed because it is too large Load Diff

View File

@ -19,8 +19,10 @@ import unittest
from nose import SkipTest
from uuid import uuid4
from swift.common.utils import json
from swift_testing import check_response, retry, skip, skip3, \
swift_test_perm, web_front_end
swift_test_perm, web_front_end, requires_acls, swift_test_user
class TestObject(unittest.TestCase):
@ -36,7 +38,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
self.assertEqual(resp.status, 201)
self.obj = uuid4().hex
def put(url, token, parsed, conn):
@ -46,7 +48,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
self.assertEqual(resp.status, 201)
def tearDown(self):
if skip:
@ -66,13 +68,13 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(list)
object_listing = resp.read()
self.assertEquals(resp.status, 200)
self.assertEqual(resp.status, 200)
# iterate over object listing and delete all objects
for obj in object_listing.splitlines():
resp = retry(delete, obj)
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
# delete the container
def delete(url, token, parsed, conn):
@ -81,7 +83,33 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
def test_if_none_match(self):
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/%s' % (
parsed.path, self.container, 'if_none_match_test'), '',
{'X-Auth-Token': token,
'Content-Length': '0',
'If-None-Match': '*'})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 412)
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/%s' % (
parsed.path, self.container, 'if_none_match_test'), '',
{'X-Auth-Token': token,
'Content-Length': '0',
'If-None-Match': 'somethingelse'})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 400)
def test_copy_object(self):
if skip:
@ -98,8 +126,8 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(get_source)
source_contents = resp.read()
self.assertEquals(resp.status, 200)
self.assertEquals(source_contents, 'test')
self.assertEqual(resp.status, 200)
self.assertEqual(source_contents, 'test')
# copy source to dest with X-Copy-From
def put(url, token, parsed, conn):
@ -110,7 +138,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
self.assertEqual(resp.status, 201)
# contents of dest should be the same as source
def get_dest(url, token, parsed, conn):
@ -120,8 +148,8 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(get_dest)
dest_contents = resp.read()
self.assertEquals(resp.status, 200)
self.assertEquals(dest_contents, source_contents)
self.assertEqual(resp.status, 200)
self.assertEqual(dest_contents, source_contents)
# delete the copy
def delete(url, token, parsed, conn):
@ -130,11 +158,11 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
# verify dest does not exist
resp = retry(get_dest)
resp.read()
self.assertEquals(resp.status, 404)
self.assertEqual(resp.status, 404)
# copy source to dest with COPY
def copy(url, token, parsed, conn):
@ -144,18 +172,18 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(copy)
resp.read()
self.assertEquals(resp.status, 201)
self.assertEqual(resp.status, 201)
# contents of dest should be the same as source
resp = retry(get_dest)
dest_contents = resp.read()
self.assertEquals(resp.status, 200)
self.assertEquals(dest_contents, source_contents)
self.assertEqual(resp.status, 200)
self.assertEqual(dest_contents, source_contents)
# delete the copy
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
def test_public_object(self):
if skip:
@ -178,10 +206,10 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
resp = retry(get)
resp.read()
self.assertEquals(resp.status, 200)
self.assertEqual(resp.status, 200)
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.container, '',
@ -189,7 +217,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
try:
resp = retry(get)
raise Exception('Should not have been able to GET')
@ -208,7 +236,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(get, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
self.assertEqual(resp.status, 403)
# create a shared container writable by account3
shared_container = uuid4().hex
@ -222,7 +250,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
self.assertEqual(resp.status, 201)
# verify third account can not copy from private container
def copy(url, token, parsed, conn):
@ -234,7 +262,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(copy, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
self.assertEqual(resp.status, 403)
# verify third account can write "obj1" to shared container
def put(url, token, parsed, conn):
@ -244,7 +272,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put, use_account=3)
resp.read()
self.assertEquals(resp.status, 201)
self.assertEqual(resp.status, 201)
# verify third account can copy "obj1" to shared container
def copy2(url, token, parsed, conn):
@ -255,7 +283,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(copy2, use_account=3)
resp.read()
self.assertEquals(resp.status, 201)
self.assertEqual(resp.status, 201)
# verify third account STILL can not copy from private container
def copy3(url, token, parsed, conn):
@ -267,7 +295,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(copy3, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
self.assertEqual(resp.status, 403)
# clean up "obj1"
def delete(url, token, parsed, conn):
@ -277,7 +305,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
# clean up shared_container
def delete(url, token, parsed, conn):
@ -287,8 +315,251 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
self.assertEqual(resp.status, 204)
@requires_acls
def test_read_only(self):
if skip3:
raise SkipTest
def get_listing(url, token, parsed, conn):
conn.request('GET', '%s/%s' % (parsed.path, self.container), '',
{'X-Auth-Token': token})
return check_response(conn)
def post_account(url, token, parsed, conn, headers):
new_headers = dict({'X-Auth-Token': token}, **headers)
conn.request('POST', parsed.path, '', new_headers)
return check_response(conn)
def get(url, token, parsed, conn, name):
conn.request('GET', '%s/%s/%s' % (
parsed.path, self.container, name), '',
{'X-Auth-Token': token})
return check_response(conn)
def put(url, token, parsed, conn, name):
conn.request('PUT', '%s/%s/%s' % (
parsed.path, self.container, name), 'test',
{'X-Auth-Token': token})
return check_response(conn)
def delete(url, token, parsed, conn, name):
conn.request('PUT', '%s/%s/%s' % (
parsed.path, self.container, name), '',
{'X-Auth-Token': token})
return check_response(conn)
# cannot list objects
resp = retry(get_listing, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# cannot get object
resp = retry(get, self.obj, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# grant read-only access
acl_user = swift_test_user[2]
acl = {'read-only': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
resp.read()
self.assertEqual(resp.status, 204)
# can list objects
resp = retry(get_listing, use_account=3)
listing = resp.read()
self.assertEquals(resp.status, 200)
self.assert_(self.obj in listing)
# can get object
resp = retry(get, self.obj, use_account=3)
body = resp.read()
self.assertEquals(resp.status, 200)
self.assertEquals(body, 'test')
# can not put an object
obj_name = str(uuid4())
resp = retry(put, obj_name, use_account=3)
body = resp.read()
self.assertEquals(resp.status, 403)
# can not delete an object
resp = retry(delete, self.obj, use_account=3)
body = resp.read()
self.assertEquals(resp.status, 403)
# sanity with account1
resp = retry(get_listing, use_account=3)
listing = resp.read()
self.assertEquals(resp.status, 200)
self.assert_(obj_name not in listing)
self.assert_(self.obj in listing)
@requires_acls
def test_read_write(self):
if skip3:
raise SkipTest
def get_listing(url, token, parsed, conn):
conn.request('GET', '%s/%s' % (parsed.path, self.container), '',
{'X-Auth-Token': token})
return check_response(conn)
def post_account(url, token, parsed, conn, headers):
new_headers = dict({'X-Auth-Token': token}, **headers)
conn.request('POST', parsed.path, '', new_headers)
return check_response(conn)
def get(url, token, parsed, conn, name):
conn.request('GET', '%s/%s/%s' % (
parsed.path, self.container, name), '',
{'X-Auth-Token': token})
return check_response(conn)
def put(url, token, parsed, conn, name):
conn.request('PUT', '%s/%s/%s' % (
parsed.path, self.container, name), 'test',
{'X-Auth-Token': token})
return check_response(conn)
def delete(url, token, parsed, conn, name):
conn.request('DELETE', '%s/%s/%s' % (
parsed.path, self.container, name), '',
{'X-Auth-Token': token})
return check_response(conn)
# cannot list objects
resp = retry(get_listing, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# cannot get object
resp = retry(get, self.obj, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# grant read-write access
acl_user = swift_test_user[2]
acl = {'read-write': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
resp.read()
self.assertEqual(resp.status, 204)
# can list objects
resp = retry(get_listing, use_account=3)
listing = resp.read()
self.assertEquals(resp.status, 200)
self.assert_(self.obj in listing)
# can get object
resp = retry(get, self.obj, use_account=3)
body = resp.read()
self.assertEquals(resp.status, 200)
self.assertEquals(body, 'test')
# can put an object
obj_name = str(uuid4())
resp = retry(put, obj_name, use_account=3)
body = resp.read()
self.assertEquals(resp.status, 201)
# can delete an object
resp = retry(delete, self.obj, use_account=3)
body = resp.read()
self.assertEquals(resp.status, 204)
# sanity with account1
resp = retry(get_listing, use_account=3)
listing = resp.read()
self.assertEquals(resp.status, 200)
self.assert_(obj_name in listing)
self.assert_(self.obj not in listing)
@requires_acls
def test_admin(self):
if skip3:
raise SkipTest
def get_listing(url, token, parsed, conn):
conn.request('GET', '%s/%s' % (parsed.path, self.container), '',
{'X-Auth-Token': token})
return check_response(conn)
def post_account(url, token, parsed, conn, headers):
new_headers = dict({'X-Auth-Token': token}, **headers)
conn.request('POST', parsed.path, '', new_headers)
return check_response(conn)
def get(url, token, parsed, conn, name):
conn.request('GET', '%s/%s/%s' % (
parsed.path, self.container, name), '',
{'X-Auth-Token': token})
return check_response(conn)
def put(url, token, parsed, conn, name):
conn.request('PUT', '%s/%s/%s' % (
parsed.path, self.container, name), 'test',
{'X-Auth-Token': token})
return check_response(conn)
def delete(url, token, parsed, conn, name):
conn.request('DELETE', '%s/%s/%s' % (
parsed.path, self.container, name), '',
{'X-Auth-Token': token})
return check_response(conn)
# cannot list objects
resp = retry(get_listing, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# cannot get object
resp = retry(get, self.obj, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# grant admin access
acl_user = swift_test_user[2]
acl = {'admin': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
resp.read()
self.assertEqual(resp.status, 204)
# can list objects
resp = retry(get_listing, use_account=3)
listing = resp.read()
self.assertEquals(resp.status, 200)
self.assert_(self.obj in listing)
# can get object
resp = retry(get, self.obj, use_account=3)
body = resp.read()
self.assertEquals(resp.status, 200)
self.assertEquals(body, 'test')
# can put an object
obj_name = str(uuid4())
resp = retry(put, obj_name, use_account=3)
body = resp.read()
self.assertEquals(resp.status, 201)
# can delete an object
resp = retry(delete, self.obj, use_account=3)
body = resp.read()
self.assertEquals(resp.status, 204)
# sanity with account1
resp = retry(get_listing, use_account=3)
listing = resp.read()
self.assertEquals(resp.status, 200)
self.assert_(obj_name in listing)
self.assert_(self.obj not in listing)
def test_manifest(self):
if skip:
raise SkipTest
@ -306,7 +577,7 @@ class TestObject(unittest.TestCase):
for objnum in xrange(len(segments1)):
resp = retry(put, objnum)
resp.read()
self.assertEquals(resp.status, 201)
self.assertEqual(resp.status, 201)
# Upload the manifest
def put(url, token, parsed, conn):
@ -318,7 +589,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
self.assertEqual(resp.status, 201)
# Get the manifest (should get all the segments as the body)
def get(url, token, parsed, conn):
@ -326,9 +597,9 @@ class TestObject(unittest.TestCase):
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments1))
self.assertEquals(resp.status, 200)
self.assertEquals(resp.getheader('content-type'), 'text/jibberish')
self.assertEqual(resp.read(), ''.join(segments1))
self.assertEqual(resp.status, 200)
self.assertEqual(resp.getheader('content-type'), 'text/jibberish')
# Get with a range at the start of the second segment
def get(url, token, parsed, conn):
@ -337,8 +608,8 @@ class TestObject(unittest.TestCase):
'X-Auth-Token': token, 'Range': 'bytes=3-'})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments1[1:]))
self.assertEquals(resp.status, 206)
self.assertEqual(resp.read(), ''.join(segments1[1:]))
self.assertEqual(resp.status, 206)
# Get with a range in the middle of the second segment
def get(url, token, parsed, conn):
@ -347,8 +618,8 @@ class TestObject(unittest.TestCase):
'X-Auth-Token': token, 'Range': 'bytes=5-'})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments1)[5:])
self.assertEquals(resp.status, 206)
self.assertEqual(resp.read(), ''.join(segments1)[5:])
self.assertEqual(resp.status, 206)
# Get with a full start and stop range
def get(url, token, parsed, conn):
@ -357,8 +628,8 @@ class TestObject(unittest.TestCase):
'X-Auth-Token': token, 'Range': 'bytes=5-10'})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments1)[5:11])
self.assertEquals(resp.status, 206)
self.assertEqual(resp.read(), ''.join(segments1)[5:11])
self.assertEqual(resp.status, 206)
# Upload the second set of segments
def put(url, token, parsed, conn, objnum):
@ -369,7 +640,7 @@ class TestObject(unittest.TestCase):
for objnum in xrange(len(segments2)):
resp = retry(put, objnum)
resp.read()
self.assertEquals(resp.status, 201)
self.assertEqual(resp.status, 201)
# Get the manifest (should still be the first segments of course)
def get(url, token, parsed, conn):
@ -377,8 +648,8 @@ class TestObject(unittest.TestCase):
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments1))
self.assertEquals(resp.status, 200)
self.assertEqual(resp.read(), ''.join(segments1))
self.assertEqual(resp.status, 200)
# Update the manifest
def put(url, token, parsed, conn):
@ -390,7 +661,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
self.assertEqual(resp.status, 201)
# Get the manifest (should be the second set of segments now)
def get(url, token, parsed, conn):
@ -398,8 +669,8 @@ class TestObject(unittest.TestCase):
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments2))
self.assertEquals(resp.status, 200)
self.assertEqual(resp.read(), ''.join(segments2))
self.assertEqual(resp.status, 200)
if not skip3:
@ -410,7 +681,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(get, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
self.assertEqual(resp.status, 403)
# Grant access to the third account
def post(url, token, parsed, conn):
@ -420,7 +691,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
# The third account should be able to get the manifest now
def get(url, token, parsed, conn):
@ -428,8 +699,8 @@ class TestObject(unittest.TestCase):
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get, use_account=3)
self.assertEquals(resp.read(), ''.join(segments2))
self.assertEquals(resp.status, 200)
self.assertEqual(resp.read(), ''.join(segments2))
self.assertEqual(resp.status, 200)
# Create another container for the third set of segments
acontainer = uuid4().hex
@ -440,7 +711,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
self.assertEqual(resp.status, 201)
# Upload the third set of segments in the other container
def put(url, token, parsed, conn, objnum):
@ -451,7 +722,7 @@ class TestObject(unittest.TestCase):
for objnum in xrange(len(segments3)):
resp = retry(put, objnum)
resp.read()
self.assertEquals(resp.status, 201)
self.assertEqual(resp.status, 201)
# Update the manifest
def put(url, token, parsed, conn):
@ -463,7 +734,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
self.assertEqual(resp.status, 201)
# Get the manifest to ensure it's the third set of segments
def get(url, token, parsed, conn):
@ -471,8 +742,8 @@ class TestObject(unittest.TestCase):
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments3))
self.assertEquals(resp.status, 200)
self.assertEqual(resp.read(), ''.join(segments3))
self.assertEqual(resp.status, 200)
if not skip3:
@ -486,7 +757,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(get, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
self.assertEqual(resp.status, 403)
# Grant access to the third account
def post(url, token, parsed, conn):
@ -496,7 +767,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
# The third account should be able to get the manifest now
def get(url, token, parsed, conn):
@ -504,8 +775,8 @@ class TestObject(unittest.TestCase):
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get, use_account=3)
self.assertEquals(resp.read(), ''.join(segments3))
self.assertEquals(resp.status, 200)
self.assertEqual(resp.read(), ''.join(segments3))
self.assertEqual(resp.status, 200)
# Delete the manifest
def delete(url, token, parsed, conn, objnum):
@ -515,7 +786,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete, objnum)
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
# Delete the third set of segments
def delete(url, token, parsed, conn, objnum):
@ -526,7 +797,7 @@ class TestObject(unittest.TestCase):
for objnum in xrange(len(segments3)):
resp = retry(delete, objnum)
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
# Delete the second set of segments
def delete(url, token, parsed, conn, objnum):
@ -537,7 +808,7 @@ class TestObject(unittest.TestCase):
for objnum in xrange(len(segments2)):
resp = retry(delete, objnum)
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
# Delete the first set of segments
def delete(url, token, parsed, conn, objnum):
@ -548,7 +819,7 @@ class TestObject(unittest.TestCase):
for objnum in xrange(len(segments1)):
resp = retry(delete, objnum)
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
# Delete the extra container
def delete(url, token, parsed, conn):
@ -557,7 +828,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
self.assertEqual(resp.status, 204)
def test_delete_content_type(self):
if skip:
@ -569,7 +840,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
self.assertEqual(resp.status, 201)
def delete(url, token, parsed, conn):
conn.request('DELETE', '%s/%s/hi' % (parsed.path, self.container),
@ -577,9 +848,9 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
self.assertEquals(resp.getheader('Content-Type'),
'text/html; charset=UTF-8')
self.assertEqual(resp.status, 204)
self.assertEqual(resp.getheader('Content-Type'),
'text/html; charset=UTF-8')
def test_delete_if_delete_at_bad(self):
if skip:
@ -592,7 +863,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
self.assertEqual(resp.status, 201)
def delete(url, token, parsed, conn):
conn.request('DELETE', '%s/%s/hi' % (parsed.path, self.container),
@ -601,7 +872,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 400)
self.assertEqual(resp.status, 400)
def test_null_name(self):
if skip:
@ -614,10 +885,121 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
if (web_front_end == 'apache2'):
self.assertEquals(resp.status, 404)
self.assertEqual(resp.status, 404)
else:
self.assertEquals(resp.read(), 'Invalid UTF8 or contains NULL')
self.assertEquals(resp.status, 412)
self.assertEqual(resp.read(), 'Invalid UTF8 or contains NULL')
self.assertEqual(resp.status, 412)
def test_cors(self):
if skip:
raise SkipTest
def is_strict_mode(url, token, parsed, conn):
conn.request('GET', '/info')
resp = conn.getresponse()
if resp.status // 100 == 2:
info = json.loads(resp.read())
return info.get('swift', {}).get('strict_cors_mode', False)
return False
def put_cors_cont(url, token, parsed, conn, orig):
conn.request(
'PUT', '%s/%s' % (parsed.path, self.container),
'', {'X-Auth-Token': token,
'X-Container-Meta-Access-Control-Allow-Origin': orig})
return check_response(conn)
def put_obj(url, token, parsed, conn, obj):
conn.request(
'PUT', '%s/%s/%s' % (parsed.path, self.container, obj),
'test', {'X-Auth-Token': token})
return check_response(conn)
def check_cors(url, token, parsed, conn,
method, obj, headers):
if method != 'OPTIONS':
headers['X-Auth-Token'] = token
conn.request(
method, '%s/%s/%s' % (parsed.path, self.container, obj),
'', headers)
return conn.getresponse()
strict_cors = retry(is_strict_mode)
resp = retry(put_cors_cont, '*')
resp.read()
self.assertEquals(resp.status // 100, 2)
resp = retry(put_obj, 'cat')
resp.read()
self.assertEquals(resp.status // 100, 2)
resp = retry(check_cors,
'OPTIONS', 'cat', {'Origin': 'http://m.com'})
self.assertEquals(resp.status, 401)
resp = retry(check_cors,
'OPTIONS', 'cat',
{'Origin': 'http://m.com',
'Access-Control-Request-Method': 'GET'})
self.assertEquals(resp.status, 200)
resp.read()
headers = dict((k.lower(), v) for k, v in resp.getheaders())
self.assertEquals(headers.get('access-control-allow-origin'),
'*')
resp = retry(check_cors,
'GET', 'cat', {'Origin': 'http://m.com'})
self.assertEquals(resp.status, 200)
headers = dict((k.lower(), v) for k, v in resp.getheaders())
self.assertEquals(headers.get('access-control-allow-origin'),
'*')
resp = retry(check_cors,
'GET', 'cat', {'Origin': 'http://m.com',
'X-Web-Mode': 'True'})
self.assertEquals(resp.status, 200)
headers = dict((k.lower(), v) for k, v in resp.getheaders())
self.assertEquals(headers.get('access-control-allow-origin'),
'*')
####################
resp = retry(put_cors_cont, 'http://secret.com')
resp.read()
self.assertEquals(resp.status // 100, 2)
resp = retry(check_cors,
'OPTIONS', 'cat',
{'Origin': 'http://m.com',
'Access-Control-Request-Method': 'GET'})
resp.read()
self.assertEquals(resp.status, 401)
if strict_cors:
resp = retry(check_cors,
'GET', 'cat', {'Origin': 'http://m.com'})
resp.read()
self.assertEquals(resp.status, 200)
headers = dict((k.lower(), v) for k, v in resp.getheaders())
self.assertTrue('access-control-allow-origin' not in headers)
resp = retry(check_cors,
'GET', 'cat', {'Origin': 'http://secret.com'})
resp.read()
self.assertEquals(resp.status, 200)
headers = dict((k.lower(), v) for k, v in resp.getheaders())
self.assertEquals(headers.get('access-control-allow-origin'),
'http://secret.com')
else:
resp = retry(check_cors,
'GET', 'cat', {'Origin': 'http://m.com'})
resp.read()
self.assertEquals(resp.status, 200)
headers = dict((k.lower(), v) for k, v in resp.getheaders())
self.assertEquals(headers.get('access-control-allow-origin'),
'http://m.com')
if __name__ == '__main__':

View File

@ -19,14 +19,16 @@
from datetime import datetime
import os
import hashlib
import hmac
import json
import locale
import random
import StringIO
import time
import threading
import uuid
import unittest
import urllib
import uuid
from nose import SkipTest
from ConfigParser import ConfigParser
@ -36,7 +38,7 @@ from test.functional.swift_test_client import Account, Connection, File, \
from swift.common.constraints import MAX_FILE_SIZE, MAX_META_NAME_LENGTH, \
MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE, \
MAX_OBJECT_NAME_LENGTH, CONTAINER_LISTING_LIMIT, ACCOUNT_LISTING_LIMIT, \
MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH
MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, MAX_HEADER_SIZE
from gluster.swift.common.constraints import \
set_object_name_component_length, get_object_name_component_length
@ -50,7 +52,8 @@ default_constraints = dict((
('container_listing_limit', CONTAINER_LISTING_LIMIT),
('account_listing_limit', ACCOUNT_LISTING_LIMIT),
('max_account_name_length', MAX_ACCOUNT_NAME_LENGTH),
('max_container_name_length', MAX_CONTAINER_NAME_LENGTH)))
('max_container_name_length', MAX_CONTAINER_NAME_LENGTH),
('max_header_size', MAX_HEADER_SIZE)))
constraints_conf = ConfigParser()
conf_exists = constraints_conf.read('/etc/swift/swift.conf')
# Constraints are set first from the test config, then from
@ -285,7 +288,7 @@ class TestAccount(Base):
if try_count < 5:
time.sleep(1)
self.assertEquals(info['container_count'], len(self.env.containers))
self.assertEqual(info['container_count'], len(self.env.containers))
self.assert_status(204)
def testContainerSerializedInfo(self):
@ -309,11 +312,11 @@ class TestAccount(Base):
headers = dict(self.env.conn.response.getheaders())
if format_type == 'json':
self.assertEquals(headers['content-type'],
'application/json; charset=utf-8')
self.assertEqual(headers['content-type'],
'application/json; charset=utf-8')
elif format_type == 'xml':
self.assertEquals(headers['content-type'],
'application/xml; charset=utf-8')
self.assertEqual(headers['content-type'],
'application/xml; charset=utf-8')
def testListingLimit(self):
limit = load_constraint('account_listing_limit')
@ -337,7 +340,7 @@ class TestAccount(Base):
if isinstance(b[0], dict):
b = [x['name'] for x in b]
self.assertEquals(a, b)
self.assertEqual(a, b)
def testInvalidAuthToken(self):
hdrs = {'X-Auth-Token': 'bogus_auth_token'}
@ -347,12 +350,12 @@ class TestAccount(Base):
def testLastContainerMarker(self):
for format_type in [None, 'json', 'xml']:
containers = self.env.account.containers({'format': format_type})
self.assertEquals(len(containers), len(self.env.containers))
self.assertEqual(len(containers), len(self.env.containers))
self.assert_status(200)
containers = self.env.account.containers(
parms={'format': format_type, 'marker': containers[-1]})
self.assertEquals(len(containers), 0)
self.assertEqual(len(containers), 0)
if format_type is None:
self.assert_status(204)
else:
@ -380,8 +383,8 @@ class TestAccount(Base):
parms={'format': format_type})
if isinstance(containers[0], dict):
containers = [x['name'] for x in containers]
self.assertEquals(sorted(containers, cmp=locale.strcoll),
containers)
self.assertEqual(sorted(containers, cmp=locale.strcoll),
containers)
class TestAccountUTF8(Base2, TestAccount):
@ -518,13 +521,13 @@ class TestContainer(Base):
for format_type in [None, 'json', 'xml']:
for prefix in prefixs:
files = cont.files(parms={'prefix': prefix})
self.assertEquals(files, sorted(prefix_files[prefix]))
self.assertEqual(files, sorted(prefix_files[prefix]))
for format_type in [None, 'json', 'xml']:
for prefix in prefixs:
files = cont.files(parms={'limit': limit_count,
'prefix': prefix})
self.assertEquals(len(files), limit_count)
self.assertEqual(len(files), limit_count)
for file_item in files:
self.assert_(file_item.startswith(prefix))
@ -548,7 +551,7 @@ class TestContainer(Base):
container = self.env.account.container(valid_utf8)
self.assert_(container.create(cfg={'no_path_quote': True}))
self.assert_(container.name in self.env.account.containers())
self.assertEquals(container.files(), [])
self.assertEqual(container.files(), [])
self.assert_(container.delete())
container = self.env.account.container(invalid_utf8)
@ -614,12 +617,12 @@ class TestContainer(Base):
def testLastFileMarker(self):
for format_type in [None, 'json', 'xml']:
files = self.env.container.files({'format': format_type})
self.assertEquals(len(files), len(self.env.files))
self.assertEqual(len(files), len(self.env.files))
self.assert_status(200)
files = self.env.container.files(
parms={'format': format_type, 'marker': files[-1]})
self.assertEquals(len(files), 0)
self.assertEqual(len(files), 0)
if format_type is None:
self.assert_status(204)
@ -665,14 +668,14 @@ class TestContainer(Base):
files = self.env.container.files(parms={'format': format_type})
if isinstance(files[0], dict):
files = [x['name'] for x in files]
self.assertEquals(sorted(files, cmp=locale.strcoll), files)
self.assertEqual(sorted(files, cmp=locale.strcoll), files)
def testContainerInfo(self):
info = self.env.container.info()
self.assert_status(204)
self.assertEquals(info['object_count'], self.env.file_count)
self.assertEquals(info['bytes_used'],
self.env.file_count * self.env.file_size)
self.assertEqual(info['object_count'], self.env.file_count)
self.assertEqual(info['bytes_used'],
self.env.file_count * self.env.file_size)
def testContainerInfoOnContainerThatDoesNotExist(self):
container = self.env.account.container(Utils.create_name())
@ -683,7 +686,7 @@ class TestContainer(Base):
for format_type in [None, 'json', 'xml']:
files = self.env.container.files(parms={'format': format_type,
'limit': 2})
self.assertEquals(len(files), 2)
self.assertEqual(len(files), 2)
def testTooLongName(self):
cont = self.env.account.container('x' * 257)
@ -838,7 +841,7 @@ class TestContainerPaths(Base):
if isinstance(files[0], dict):
files = [str(x['name']) for x in files]
self.assertEquals(files, self.env.stored_files)
self.assertEqual(files, self.env.stored_files)
for format_type in ('json', 'xml'):
for file_item in self.env.container.files(parms={'format':
@ -846,13 +849,13 @@ class TestContainerPaths(Base):
self.assert_(int(file_item['bytes']) >= 0)
self.assert_('last_modified' in file_item)
if file_item['name'].endswith('/'):
self.assertEquals(file_item['content_type'],
'application/directory')
self.assertEqual(file_item['content_type'],
'application/directory')
def testStructure(self):
def assert_listing(path, file_list):
files = self.env.container.files(parms={'path': path})
self.assertEquals(sorted(file_list, cmp=locale.strcoll), files)
self.assertEqual(sorted(file_list, cmp=locale.strcoll), files)
if not normalized_urls:
assert_listing('/', ['/dir1/', '/dir2/', '/file1', '/file A'])
assert_listing('/dir1',
@ -1176,7 +1179,7 @@ class TestFile(Base):
for i in container.files(parms={'format': 'json'}):
file_types_read[i['name'].split('.')[1]] = i['content_type']
self.assertEquals(file_types, file_types_read)
self.assertEqual(file_types, file_types_read)
def testRangedGets(self):
file_length = 10000
@ -1201,7 +1204,7 @@ class TestFile(Base):
self.assertRaises(ResponseError, file_item.read, hdrs=hdrs)
self.assert_status(416)
else:
self.assertEquals(file_item.read(hdrs=hdrs), data[-i:])
self.assertEqual(file_item.read(hdrs=hdrs), data[-i:])
range_string = 'bytes=%d-' % (i)
hdrs = {'Range': range_string}
@ -1350,9 +1353,9 @@ class TestFile(Base):
info = file_item.info()
self.assert_status(200)
self.assertEquals(info['content_length'], self.env.file_size)
self.assertEquals(info['etag'], md5)
self.assertEquals(info['content_type'], content_type)
self.assertEqual(info['content_length'], self.env.file_size)
self.assertEqual(info['etag'], md5)
self.assertEqual(info['content_type'], content_type)
self.assert_('last_modified' in info)
def testDeleteOfFileThatDoesNotExist(self):
@ -1395,7 +1398,7 @@ class TestFile(Base):
file_item = self.env.container.file(file_item.name)
self.assert_(file_item.initialize())
self.assert_status(200)
self.assertEquals(file_item.metadata, metadata)
self.assertEqual(file_item.metadata, metadata)
def testGetContentType(self):
file_name = Utils.create_name()
@ -1408,7 +1411,7 @@ class TestFile(Base):
file_item = self.env.container.file(file_name)
file_item.read()
self.assertEquals(content_type, file_item.content_type)
self.assertEqual(content_type, file_item.content_type)
def testGetOnFileThatDoesNotExist(self):
# in container that exists
@ -1449,7 +1452,7 @@ class TestFile(Base):
file_item = self.env.container.file(file_item.name)
self.assert_(file_item.initialize())
self.assert_status(200)
self.assertEquals(file_item.metadata, metadata)
self.assertEqual(file_item.metadata, metadata)
def testSerialization(self):
container = self.env.account.container(Utils.create_name())
@ -1478,9 +1481,9 @@ class TestFile(Base):
if f['name'] != file_item['name']:
continue
self.assertEquals(file_item['content_type'],
f['content_type'])
self.assertEquals(int(file_item['bytes']), f['bytes'])
self.assertEqual(file_item['content_type'],
f['content_type'])
self.assertEqual(int(file_item['bytes']), f['bytes'])
d = datetime.strptime(
file_item['last_modified'].split('.')[0],
@ -1488,7 +1491,7 @@ class TestFile(Base):
lm = time.mktime(d.timetuple())
if 'last_modified' in f:
self.assertEquals(f['last_modified'], lm)
self.assertEqual(f['last_modified'], lm)
else:
f['last_modified'] = lm
@ -1500,11 +1503,11 @@ class TestFile(Base):
headers = dict(self.env.conn.response.getheaders())
if format_type == 'json':
self.assertEquals(headers['content-type'],
'application/json; charset=utf-8')
self.assertEqual(headers['content-type'],
'application/json; charset=utf-8')
elif format_type == 'xml':
self.assertEquals(headers['content-type'],
'application/xml; charset=utf-8')
self.assertEqual(headers['content-type'],
'application/xml; charset=utf-8')
lm_diff = max([f['last_modified'] for f in files]) -\
min([f['last_modified'] for f in files])
@ -1547,7 +1550,7 @@ class TestFile(Base):
self.assert_('etag' in headers.keys())
header_etag = headers['etag'].strip('"')
self.assertEquals(etag, header_etag)
self.assertEqual(etag, header_etag)
def testChunkedPut(self):
if (web_front_end == 'apache2'):
@ -1565,7 +1568,7 @@ class TestFile(Base):
self.assert_(data == file_item.read())
info = file_item.info()
self.assertEquals(etag, info['etag'])
self.assertEqual(etag, info['etag'])
class TestFileUTF8(Base2, TestFile):
@ -1677,12 +1680,30 @@ class TestDlo(Base):
file_contents,
"aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeeeffffffffff")
def test_copy_manifest(self):
# Copying the manifest should result in another manifest
try:
man1_item = self.env.container.file('man1')
man1_item.copy(self.env.container.name, "copied-man1",
parms={'multipart-manifest': 'get'})
copied = self.env.container.file("copied-man1")
copied_contents = copied.read(parms={'multipart-manifest': 'get'})
self.assertEqual(copied_contents, "man1-contents")
copied_contents = copied.read()
self.assertEqual(
copied_contents,
"aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeee")
finally:
# try not to leave this around for other tests to stumble over
self.env.container.file("copied-man1").delete()
class TestDloUTF8(Base2, TestDlo):
set_up = False
class TestFileComparisonEnv:
class TestFileComparisonEnv(object):
@classmethod
def setUp(cls):
cls.conn = Connection(config)
@ -1806,19 +1827,8 @@ class TestSloEnv(object):
cls.conn.authenticate()
if cls.slo_enabled is None:
status = cls.conn.make_request('GET', '/info',
cfg={'verbatim_path': True})
if not (200 <= status <= 299):
# Can't tell if SLO is enabled or not since we're running
# against an old cluster, so let's skip the tests instead of
# possibly having spurious failures.
cls.slo_enabled = False
else:
# Don't bother looking for ValueError here. If something is
# responding to a GET /info request with invalid JSON, then
# the cluster is broken and a test failure will let us know.
cluster_info = json.loads(cls.conn.response.read())
cls.slo_enabled = 'slo' in cluster_info
cluster_info = cls.conn.cluster_info()
cls.slo_enabled = 'slo' in cluster_info
if not cls.slo_enabled:
return
@ -2034,5 +2044,347 @@ class TestSloUTF8(Base2, TestSlo):
set_up = False
class TestObjectVersioningEnv(object):
versioning_enabled = None # tri-state: None initially, then True/False
@classmethod
def setUp(cls):
cls.conn = Connection(config)
cls.conn.authenticate()
cls.account = Account(cls.conn, config.get('account',
config['username']))
# avoid getting a prefix that stops halfway through an encoded
# character
prefix = Utils.create_name().decode("utf-8")[:10].encode("utf-8")
cls.versions_container = cls.account.container(prefix + "-versions")
if not cls.versions_container.create():
raise ResponseError(cls.conn.response)
cls.container = cls.account.container(prefix + "-objs")
if not cls.container.create(
hdrs={'X-Versions-Location': cls.versions_container.name}):
raise ResponseError(cls.conn.response)
container_info = cls.container.info()
# if versioning is off, then X-Versions-Location won't persist
cls.versioning_enabled = 'versions' in container_info
class TestObjectVersioning(Base):
env = TestObjectVersioningEnv
set_up = False
def setUp(self):
super(TestObjectVersioning, self).setUp()
if self.env.versioning_enabled is False:
raise SkipTest("Object versioning not enabled")
elif self.env.versioning_enabled is not True:
# just some sanity checking
raise Exception(
"Expected versioning_enabled to be True/False, got %r" %
(self.env.versioning_enabled,))
def test_overwriting(self):
container = self.env.container
versions_container = self.env.versions_container
obj_name = Utils.create_name()
versioned_obj = container.file(obj_name)
versioned_obj.write("aaaaa")
self.assertEqual(0, versions_container.info()['object_count'])
versioned_obj.write("bbbbb")
# the old version got saved off
self.assertEqual(1, versions_container.info()['object_count'])
versioned_obj_name = versions_container.files()[0]
self.assertEqual(
"aaaaa", versions_container.file(versioned_obj_name).read())
# if we overwrite it again, there are two versions
versioned_obj.write("ccccc")
self.assertEqual(2, versions_container.info()['object_count'])
# as we delete things, the old contents return
self.assertEqual("ccccc", versioned_obj.read())
versioned_obj.delete()
self.assertEqual("bbbbb", versioned_obj.read())
versioned_obj.delete()
self.assertEqual("aaaaa", versioned_obj.read())
versioned_obj.delete()
self.assertRaises(ResponseError, versioned_obj.read)
class TestObjectVersioningUTF8(Base2, TestObjectVersioning):
set_up = False
class TestTempurlEnv(object):
tempurl_enabled = None # tri-state: None initially, then True/False
@classmethod
def setUp(cls):
cls.conn = Connection(config)
cls.conn.authenticate()
if cls.tempurl_enabled is None:
cluster_info = cls.conn.cluster_info()
cls.tempurl_enabled = 'tempurl' in cluster_info
if not cls.tempurl_enabled:
return
cls.tempurl_methods = cluster_info['tempurl']['methods']
cls.tempurl_key = Utils.create_name()
cls.tempurl_key2 = Utils.create_name()
cls.account = Account(
cls.conn, config.get('account', config['username']))
cls.account.delete_containers()
cls.account.update_metadata({
'temp-url-key': cls.tempurl_key,
'temp-url-key-2': cls.tempurl_key2
})
cls.container = cls.account.container(Utils.create_name())
if not cls.container.create():
raise ResponseError(cls.conn.response)
cls.obj = cls.container.file(Utils.create_name())
cls.obj.write("obj contents")
cls.other_obj = cls.container.file(Utils.create_name())
cls.other_obj.write("other obj contents")
class TestTempurl(Base):
env = TestTempurlEnv
set_up = False
def setUp(self):
super(TestTempurl, self).setUp()
if self.env.tempurl_enabled is False:
raise SkipTest("TempURL not enabled")
elif self.env.tempurl_enabled is not True:
# just some sanity checking
raise Exception(
"Expected tempurl_enabled to be True/False, got %r" %
(self.env.tempurl_enabled,))
expires = int(time.time()) + 86400
sig = self.tempurl_sig(
'GET', expires, self.env.conn.make_path(self.env.obj.path),
self.env.tempurl_key)
self.obj_tempurl_parms = {'temp_url_sig': sig,
'temp_url_expires': str(expires)}
def tempurl_sig(self, method, expires, path, key):
return hmac.new(
key,
'%s\n%s\n%s' % (method, expires, urllib.unquote(path)),
hashlib.sha1).hexdigest()
def test_GET(self):
contents = self.env.obj.read(
parms=self.obj_tempurl_parms,
cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
# GET tempurls also allow HEAD requests
self.assert_(self.env.obj.info(parms=self.obj_tempurl_parms,
cfg={'no_auth_token': True}))
def test_GET_with_key_2(self):
expires = int(time.time()) + 86400
sig = self.tempurl_sig(
'GET', expires, self.env.conn.make_path(self.env.obj.path),
self.env.tempurl_key2)
parms = {'temp_url_sig': sig,
'temp_url_expires': str(expires)}
contents = self.env.obj.read(parms=parms, cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
def test_PUT(self):
new_obj = self.env.container.file(Utils.create_name())
expires = int(time.time()) + 86400
sig = self.tempurl_sig(
'PUT', expires, self.env.conn.make_path(new_obj.path),
self.env.tempurl_key)
put_parms = {'temp_url_sig': sig,
'temp_url_expires': str(expires)}
new_obj.write('new obj contents',
parms=put_parms, cfg={'no_auth_token': True})
self.assertEqual(new_obj.read(), "new obj contents")
# PUT tempurls also allow HEAD requests
self.assert_(new_obj.info(parms=put_parms,
cfg={'no_auth_token': True}))
def test_HEAD(self):
expires = int(time.time()) + 86400
sig = self.tempurl_sig(
'HEAD', expires, self.env.conn.make_path(self.env.obj.path),
self.env.tempurl_key)
head_parms = {'temp_url_sig': sig,
'temp_url_expires': str(expires)}
self.assert_(self.env.obj.info(parms=head_parms,
cfg={'no_auth_token': True}))
# HEAD tempurls don't allow PUT or GET requests, despite the fact that
# PUT and GET tempurls both allow HEAD requests
self.assertRaises(ResponseError, self.env.other_obj.read,
cfg={'no_auth_token': True},
parms=self.obj_tempurl_parms)
self.assert_status([401])
self.assertRaises(ResponseError, self.env.other_obj.write,
'new contents',
cfg={'no_auth_token': True},
parms=self.obj_tempurl_parms)
self.assert_status([401])
def test_different_object(self):
contents = self.env.obj.read(
parms=self.obj_tempurl_parms,
cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
self.assertRaises(ResponseError, self.env.other_obj.read,
cfg={'no_auth_token': True},
parms=self.obj_tempurl_parms)
self.assert_status([401])
def test_changing_sig(self):
contents = self.env.obj.read(
parms=self.obj_tempurl_parms,
cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
parms = self.obj_tempurl_parms.copy()
if parms['temp_url_sig'][0] == 'a':
parms['temp_url_sig'] = 'b' + parms['temp_url_sig'][1:]
else:
parms['temp_url_sig'] = 'a' + parms['temp_url_sig'][1:]
self.assertRaises(ResponseError, self.env.obj.read,
cfg={'no_auth_token': True},
parms=parms)
self.assert_status([401])
def test_changing_expires(self):
contents = self.env.obj.read(
parms=self.obj_tempurl_parms,
cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
parms = self.obj_tempurl_parms.copy()
if parms['temp_url_expires'][-1] == '0':
parms['temp_url_expires'] = parms['temp_url_expires'][:-1] + '1'
else:
parms['temp_url_expires'] = parms['temp_url_expires'][:-1] + '0'
self.assertRaises(ResponseError, self.env.obj.read,
cfg={'no_auth_token': True},
parms=parms)
self.assert_status([401])
class TestTempurlUTF8(Base2, TestTempurl):
set_up = False
class TestSloTempurlEnv(object):
enabled = None # tri-state: None initially, then True/False
@classmethod
def setUp(cls):
cls.conn = Connection(config)
cls.conn.authenticate()
if cls.enabled is None:
cluster_info = cls.conn.cluster_info()
cls.enabled = 'tempurl' in cluster_info and 'slo' in cluster_info
cls.tempurl_key = Utils.create_name()
cls.account = Account(
cls.conn, config.get('account', config['username']))
cls.account.delete_containers()
cls.account.update_metadata({'temp-url-key': cls.tempurl_key})
cls.manifest_container = cls.account.container(Utils.create_name())
cls.segments_container = cls.account.container(Utils.create_name())
if not cls.manifest_container.create():
raise ResponseError(cls.conn.response)
if not cls.segments_container.create():
raise ResponseError(cls.conn.response)
seg1 = cls.segments_container.file(Utils.create_name())
seg1.write('1' * 1024 * 1024)
seg2 = cls.segments_container.file(Utils.create_name())
seg2.write('2' * 1024 * 1024)
cls.manifest_data = [{'size_bytes': 1024 * 1024,
'etag': seg1.md5,
'path': '/%s/%s' % (cls.segments_container.name,
seg1.name)},
{'size_bytes': 1024 * 1024,
'etag': seg2.md5,
'path': '/%s/%s' % (cls.segments_container.name,
seg2.name)}]
cls.manifest = cls.manifest_container.file(Utils.create_name())
cls.manifest.write(
json.dumps(cls.manifest_data),
parms={'multipart-manifest': 'put'})
class TestSloTempurl(Base):
env = TestSloTempurlEnv
set_up = False
def setUp(self):
super(TestSloTempurl, self).setUp()
if self.env.enabled is False:
raise SkipTest("TempURL and SLO not both enabled")
elif self.env.enabled is not True:
# just some sanity checking
raise Exception(
"Expected enabled to be True/False, got %r" %
(self.env.enabled,))
def tempurl_sig(self, method, expires, path, key):
return hmac.new(
key,
'%s\n%s\n%s' % (method, expires, urllib.unquote(path)),
hashlib.sha1).hexdigest()
def test_GET(self):
expires = int(time.time()) + 86400
sig = self.tempurl_sig(
'GET', expires, self.env.conn.make_path(self.env.manifest.path),
self.env.tempurl_key)
parms = {'temp_url_sig': sig, 'temp_url_expires': str(expires)}
contents = self.env.manifest.read(
parms=parms,
cfg={'no_auth_token': True})
self.assertEqual(len(contents), 2 * 1024 * 1024)
# GET tempurls also allow HEAD requests
self.assert_(self.env.manifest.info(
parms=parms, cfg={'no_auth_token': True}))
class TestSloTempurlUTF8(Base2, TestSloTempurl):
set_up = False
if __name__ == '__main__':
unittest.main()

View File

@ -33,6 +33,7 @@ from hashlib import md5
from eventlet import sleep, Timeout
import logging.handlers
from httplib import HTTPException
from numbers import Number
class FakeRing(object):
@ -248,6 +249,7 @@ class FakeLogger(logging.Logger):
if 'facility' in kwargs:
self.facility = kwargs['facility']
self.statsd_client = None
self.thread_locals = None
def _clear(self):
self.log_dict = defaultdict(list)
@ -465,8 +467,11 @@ def fake_http_connect(*code_iter, **kwargs):
self.body = body
self.headers = headers or {}
self.timestamp = timestamp
if kwargs.get('slow') and isinstance(kwargs['slow'], list):
kwargs['slow'][0] -= 1
if 'slow' in kwargs and isinstance(kwargs['slow'], list):
try:
self._next_sleep = kwargs['slow'].pop(0)
except IndexError:
self._next_sleep = None
def getresponse(self):
if kwargs.get('raise_exc'):
@ -482,6 +487,8 @@ def fake_http_connect(*code_iter, **kwargs):
return FakeConn(507)
if self.expect_status == -4:
return FakeConn(201)
if self.expect_status == 412:
return FakeConn(412)
return FakeConn(100)
def getheaders(self):
@ -510,31 +517,39 @@ def fake_http_connect(*code_iter, **kwargs):
headers['x-container-timestamp'] = '1'
except StopIteration:
pass
if self.am_slow():
am_slow, value = self.get_slow()
if am_slow:
headers['content-length'] = '4'
headers.update(self.headers)
return headers.items()
def am_slow(self):
if kwargs.get('slow') and isinstance(kwargs['slow'], list):
return kwargs['slow'][0] >= 0
return bool(kwargs.get('slow'))
def get_slow(self):
if 'slow' in kwargs and isinstance(kwargs['slow'], list):
if self._next_sleep is not None:
return True, self._next_sleep
else:
return False, 0.01
if kwargs.get('slow') and isinstance(kwargs['slow'], Number):
return True, kwargs['slow']
return bool(kwargs.get('slow')), 0.1
def read(self, amt=None):
if self.am_slow():
am_slow, value = self.get_slow()
if am_slow:
if self.sent < 4:
self.sent += 1
sleep(0.1)
sleep(value)
return ' '
rv = self.body[:amt]
self.body = self.body[amt:]
return rv
def send(self, amt=None):
if self.am_slow():
am_slow, value = self.get_slow()
if am_slow:
if self.received < 4:
self.received += 1
sleep(0.1)
sleep(value)
def getheader(self, name, default=None):
return dict(self.getheaders()).get(name.lower(), default)
@ -584,4 +599,6 @@ def fake_http_connect(*code_iter, **kwargs):
return FakeConn(status, etag, body=body, timestamp=timestamp,
expect_status=expect_status, headers=headers)
connect.code_iter = code_iter
return connect

View File

@ -57,10 +57,10 @@ class TestConstraints(unittest.TestCase):
cnt.set_object_name_component_length()
self.assertEqual(len, cnt.get_object_name_component_length())
with patch('swift.common.constraints.constraints_conf_int',
mock_constraints_conf_int):
cnt.set_object_name_component_length()
self.assertEqual(cnt.get_object_name_component_length(), 1000)
with patch('swift.common.constraints.constraints_conf_int',
mock_constraints_conf_int):
cnt.set_object_name_component_length()
self.assertEqual(cnt.get_object_name_component_length(), 1000)
def test_validate_obj_name_component(self):
max_obj_len = cnt.get_object_name_component_length()

View File

@ -46,7 +46,7 @@ class TestObjectExpirer(TestCase):
self.old_loadapp = internal_client.loadapp
self.old_sleep = internal_client.sleep
internal_client.loadapp = lambda x: None
internal_client.loadapp = lambda *a, **kw: None
internal_client.sleep = not_sleep
def teardown(self):
@ -618,7 +618,7 @@ class TestObjectExpirer(TestCase):
start_response('204 No Content', [('Content-Length', '0')])
return []
internal_client.loadapp = lambda x: fake_app
internal_client.loadapp = lambda *a, **kw: fake_app
x = expirer.ObjectExpirer({})
ts = '1234'
@ -635,7 +635,7 @@ class TestObjectExpirer(TestCase):
start_response('204 No Content', [('Content-Length', '0')])
return []
internal_client.loadapp = lambda x: fake_app
internal_client.loadapp = lambda *a, **kw: fake_app
x = expirer.ObjectExpirer({})
ts = '1234'
@ -649,7 +649,7 @@ class TestObjectExpirer(TestCase):
start_response('404 Not Found', [('Content-Length', '0')])
return []
internal_client.loadapp = lambda x: fake_app
internal_client.loadapp = lambda *a, **kw: fake_app
x = expirer.ObjectExpirer({})
x.delete_actual_object('/path/to/object', '1234')
@ -661,7 +661,7 @@ class TestObjectExpirer(TestCase):
[('Content-Length', '0')])
return []
internal_client.loadapp = lambda x: fake_app
internal_client.loadapp = lambda *a, **kw: fake_app
x = expirer.ObjectExpirer({})
x.delete_actual_object('/path/to/object', '1234')
@ -674,7 +674,7 @@ class TestObjectExpirer(TestCase):
[('Content-Length', '0')])
return []
internal_client.loadapp = lambda x: fake_app
internal_client.loadapp = lambda *a, **kw: fake_app
x = expirer.ObjectExpirer({})
exc = None

View File

@ -22,7 +22,7 @@ import mock
import swift
from swift.proxy import server as proxy_server
from swift.common.swob import HTTPException
from test.unit import FakeRing, FakeMemcache, fake_http_connect
from test.unit import FakeRing, FakeMemcache, fake_http_connect, debug_logger
@contextmanager
@ -90,26 +90,96 @@ class TestObjControllerWriteAffinity(unittest.TestCase):
class TestObjController(unittest.TestCase):
def setUp(self):
logger = debug_logger('proxy-server')
logger.thread_locals = ('txn1', '127.0.0.2')
self.app = proxy_server.Application(
None, FakeMemcache(), account_ring=FakeRing(),
container_ring=FakeRing(), object_ring=FakeRing(),
logger=logger)
self.controller = proxy_server.ObjectController(self.app,
'a', 'c', 'o')
self.controller.container_info = mock.MagicMock(return_value={
'partition': 1,
'nodes': [
{'ip': '127.0.0.1', 'port': '1', 'device': 'sda'},
{'ip': '127.0.0.1', 'port': '2', 'device': 'sda'},
{'ip': '127.0.0.1', 'port': '3', 'device': 'sda'},
],
'write_acl': None,
'read_acl': None,
'sync_key': None,
'versions': None})
def test_PUT_simple(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
req.headers['content-length'] = '0'
with set_http_connect(201, 201, 201):
resp = self.controller.PUT(req)
self.assertEquals(resp.status_int, 201)
def test_PUT_if_none_match(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
req.headers['if-none-match'] = '*'
req.headers['content-length'] = '0'
with set_http_connect(201, 201, 201):
resp = self.controller.PUT(req)
self.assertEquals(resp.status_int, 201)
def test_PUT_if_none_match_denied(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
req.headers['if-none-match'] = '*'
req.headers['content-length'] = '0'
with set_http_connect(201, (412, 412), 201):
resp = self.controller.PUT(req)
self.assertEquals(resp.status_int, 412)
def test_PUT_if_none_match_not_star(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
req.headers['if-none-match'] = 'somethingelse'
req.headers['content-length'] = '0'
with set_http_connect(201, 201, 201):
resp = self.controller.PUT(req)
self.assertEquals(resp.status_int, 400)
def test_GET_simple(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
with set_http_connect(200):
resp = self.controller.GET(req)
self.assertEquals(resp.status_int, 200)
def test_DELETE_simple(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
with set_http_connect(204, 204, 204):
resp = self.controller.DELETE(req)
self.assertEquals(resp.status_int, 204)
def test_POST_simple(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
with set_http_connect(200, 200, 200, 201, 201, 201):
resp = self.controller.POST(req)
self.assertEquals(resp.status_int, 202)
def test_COPY_simple(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
with set_http_connect(200, 200, 200, 201, 201, 201):
resp = self.controller.POST(req)
self.assertEquals(resp.status_int, 202)
def test_HEAD_simple(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
with set_http_connect(200, 200, 200, 201, 201, 201):
resp = self.controller.POST(req)
self.assertEquals(resp.status_int, 202)
def test_PUT_log_info(self):
# mock out enough to get to the area of the code we want to test
with mock.patch('swift.proxy.controllers.obj.check_object_creation',
mock.MagicMock(return_value=None)):
app = mock.MagicMock()
app.container_ring.get_nodes.return_value = (1, [2])
app.object_ring.get_nodes.return_value = (1, [2])
controller = proxy_server.ObjectController(app, 'a', 'c', 'o')
controller.container_info = mock.MagicMock(return_value={
'partition': 1,
'nodes': [{}],
'write_acl': None,
'sync_key': None,
'versions': None})
# and now test that we add the header to log_info
req = swift.common.swob.Request.blank('/v1/a/c/o')
req.headers['x-copy-from'] = 'somewhere'
try:
controller.PUT(req)
self.controller.PUT(req)
except HTTPException:
pass
self.assertEquals(
@ -119,7 +189,7 @@ class TestObjController(unittest.TestCase):
req.method = 'POST'
req.headers['x-copy-from'] = 'elsewhere'
try:
controller.PUT(req)
self.controller.PUT(req)
except HTTPException:
pass
self.assertEquals(req.environ.get('swift.log_info'), None)

File diff suppressed because it is too large Load Diff

View File

@ -15,10 +15,10 @@ setenv = VIRTUAL_ENV={envdir}
NOSE_OPENSTACK_SHOW_ELAPSED=1
NOSE_OPENSTACK_STDOUT=1
deps =
https://launchpad.net/gluster-swift/icehouse/1.13.0/+download/swift-1.13.0.tar.gz
https://launchpad.net/swift/icehouse/1.13.1/+download/swift-1.13.1.tar.gz
--download-cache={homedir}/.pipcache
-r{toxinidir}/tools/test-requires
-r{toxinidir}/tools/requirements.txt
-r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
changedir = {toxinidir}/test/unit
commands = nosetests -v --exe --with-xunit --with-coverage --cover-package gluster --cover-erase --cover-xml --cover-html --cover-branches --with-html-output {posargs}
@ -41,7 +41,7 @@ commands = bash tools/swkrbath_functional_tests.sh
[testenv:pep8]
deps =
--download-cache={homedir}/.pipcache
-r{toxinidir}/tools/test-requires
-r{toxinidir}/test-requirements.txt
changedir = {toxinidir}
commands =
flake8