Read the configuration once for all func tests

Merge the swift_testing module into the functional test module itself,
so that we can read the configuration once for all unit tests, sharing
the same constraints.

Change-Id: I9fbbfdade9adca329cd79f7d4291ba009327c842
This commit is contained in:
Peter Portante 2014-03-31 23:22:49 -04:00
parent 5ea6c88128
commit 44b39417ba
6 changed files with 383 additions and 392 deletions

View File

@ -13,10 +13,124 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import socket
import locale
import functools
from time import sleep
from httplib import HTTPException
from urlparse import urlparse
from nose import SkipTest
from swift.common import constraints
from swiftclient import get_auth, http_connection
from test import get_config
from test.functional.swift_test_client import Connection
config = get_config('func_test')
for k in constraints.DEFAULT_CONSTRAINTS:
if k in config:
# prefer what's in test.conf
config[k] = int(config[k])
elif constraints.SWIFT_CONSTRAINTS_LOADED:
# swift.conf exists, so use what's defined there (or swift defaults)
# This normally happens when the test is running locally to the cluster
# as in a SAIO.
config[k] = constraints.EFFECTIVE_CONSTRAINTS[k]
else:
# .functests don't know what the constraints of the tested cluster are,
# so the tests can't reliably pass or fail. Therefore, skip those
# tests.
config[k] = '%s constraint is not defined' % k
web_front_end = config.get('web_front_end', 'integral')
normalized_urls = config.get('normalized_urls', False)
# If no config was read, we will fall back to old school env vars
swift_test_auth = os.environ.get('SWIFT_TEST_AUTH')
swift_test_user = [os.environ.get('SWIFT_TEST_USER'), None, None]
swift_test_key = [os.environ.get('SWIFT_TEST_KEY'), None, None]
swift_test_tenant = ['', '', '']
swift_test_perm = ['', '', '']
if config:
swift_test_auth_version = str(config.get('auth_version', '1'))
swift_test_auth = 'http'
if config.get('auth_ssl', 'no').lower() in ('yes', 'true', 'on', '1'):
swift_test_auth = 'https'
if 'auth_prefix' not in config:
config['auth_prefix'] = '/'
try:
suffix = '://%(auth_host)s:%(auth_port)s%(auth_prefix)s' % config
swift_test_auth += suffix
except KeyError:
pass # skip
if swift_test_auth_version == "1":
swift_test_auth += 'v1.0'
try:
if 'account' in config:
swift_test_user[0] = '%(account)s:%(username)s' % config
else:
swift_test_user[0] = '%(username)s' % config
swift_test_key[0] = config['password']
except KeyError:
# bad config, no account/username configured, tests cannot be run
pass
try:
swift_test_user[1] = '%s%s' % (
'%s:' % config['account2'] if 'account2' in config else '',
config['username2'])
swift_test_key[1] = config['password2']
except KeyError:
pass # old config, no second account tests can be run
try:
swift_test_user[2] = '%s%s' % (
'%s:' % config['account'] if 'account'
in config else '', config['username3'])
swift_test_key[2] = config['password3']
except KeyError:
pass # old config, no third account tests can be run
for _ in range(3):
swift_test_perm[_] = swift_test_user[_]
else:
swift_test_user[0] = config['username']
swift_test_tenant[0] = config['account']
swift_test_key[0] = config['password']
swift_test_user[1] = config['username2']
swift_test_tenant[1] = config['account2']
swift_test_key[1] = config['password2']
swift_test_user[2] = config['username3']
swift_test_tenant[2] = config['account']
swift_test_key[2] = config['password3']
for _ in range(3):
swift_test_perm[_] = swift_test_tenant[_] + ':' \
+ swift_test_user[_]
skip = not all([swift_test_auth, swift_test_user[0], swift_test_key[0]])
if skip:
print >>sys.stderr, 'SKIPPING FUNCTIONAL TESTS DUE TO NO CONFIG'
skip2 = not all([not skip, swift_test_user[1], swift_test_key[1]])
if not skip and skip2:
print >>sys.stderr, \
'SKIPPING SECOND ACCOUNT FUNCTIONAL TESTS DUE TO NO CONFIG FOR THEM'
skip3 = not all([not skip, swift_test_user[2], swift_test_key[2]])
if not skip and skip3:
print >>sys.stderr, \
'SKIPPING THIRD ACCOUNT FUNCTIONAL TESTS DUE TO NO CONFIG FOR THEM'
config = {}
orig_collate = ''
@ -30,3 +144,136 @@ def setup_package():
def teardown_package():
global orig_collate
locale.setlocale(locale.LC_COLLATE, orig_collate)
class AuthError(Exception):
pass
class InternalServerError(Exception):
pass
url = [None, None, None]
token = [None, None, None]
parsed = [None, None, None]
conn = [None, None, None]
def retry(func, *args, **kwargs):
"""
You can use the kwargs to override:
'retries' (default: 5)
'use_account' (default: 1) - which user's token to pass
'url_account' (default: matches 'use_account') - which user's storage URL
'resource' (default: url[url_account] - URL to connect to; retry()
will interpolate the variable :storage_url: if present
"""
global url, token, parsed, conn
retries = kwargs.get('retries', 5)
attempts, backoff = 0, 1
# use account #1 by default; turn user's 1-indexed account into 0-indexed
use_account = kwargs.pop('use_account', 1) - 1
# access our own account by default
url_account = kwargs.pop('url_account', use_account + 1) - 1
while attempts <= retries:
attempts += 1
try:
if not url[use_account] or not token[use_account]:
url[use_account], token[use_account] = \
get_auth(swift_test_auth, swift_test_user[use_account],
swift_test_key[use_account],
snet=False,
tenant_name=swift_test_tenant[use_account],
auth_version=swift_test_auth_version,
os_options={})
parsed[use_account] = conn[use_account] = None
if not parsed[use_account] or not conn[use_account]:
parsed[use_account], conn[use_account] = \
http_connection(url[use_account])
# default resource is the account url[url_account]
resource = kwargs.pop('resource', '%(storage_url)s')
template_vars = {'storage_url': url[url_account]}
parsed_result = urlparse(resource % template_vars)
return func(url[url_account], token[use_account],
parsed_result, conn[url_account],
*args, **kwargs)
except (socket.error, HTTPException):
if attempts > retries:
raise
parsed[use_account] = conn[use_account] = None
except AuthError:
url[use_account] = token[use_account] = None
continue
except InternalServerError:
pass
if attempts <= retries:
sleep(backoff)
backoff *= 2
raise Exception('No result after %s retries.' % retries)
def check_response(conn):
resp = conn.getresponse()
if resp.status == 401:
resp.read()
raise AuthError()
elif resp.status // 100 == 5:
resp.read()
raise InternalServerError()
return resp
def load_constraint(name):
global config
c = config[name]
if not isinstance(c, int):
raise SkipTest(c)
return c
cluster_info = {}
def get_cluster_info():
conn = Connection(config)
conn.authenticate()
global cluster_info
cluster_info = conn.cluster_info()
def reset_acl():
def post(url, token, parsed, conn):
conn.request('POST', parsed.path, '', {
'X-Auth-Token': token,
'X-Account-Access-Control': '{}'
})
return check_response(conn)
resp = retry(post, use_account=1)
resp.read()
def requires_acls(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
if skip:
raise SkipTest
if not cluster_info:
get_cluster_info()
# Determine whether this cluster has account ACLs; if not, skip test
if not cluster_info.get('tempauth', {}).get('account_acls'):
raise SkipTest
if 'keystoneauth' in cluster_info:
# remove when keystoneauth supports account acls
raise SkipTest
reset_acl()
try:
rv = f(*args, **kwargs)
finally:
reset_acl()
return rv
return wrapper

View File

@ -1,231 +0,0 @@
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from httplib import HTTPException
import os
import socket
import sys
from time import sleep
from urlparse import urlparse
import functools
from nose import SkipTest
from test import get_config
from swiftclient import get_auth, http_connection
from test.functional.swift_test_client import Connection
conf = get_config('func_test')
web_front_end = conf.get('web_front_end', 'integral')
normalized_urls = conf.get('normalized_urls', False)
# If no conf was read, we will fall back to old school env vars
swift_test_auth = os.environ.get('SWIFT_TEST_AUTH')
swift_test_user = [os.environ.get('SWIFT_TEST_USER'), None, None]
swift_test_key = [os.environ.get('SWIFT_TEST_KEY'), None, None]
swift_test_tenant = ['', '', '']
swift_test_perm = ['', '', '']
if conf:
swift_test_auth_version = str(conf.get('auth_version', '1'))
swift_test_auth = 'http'
if conf.get('auth_ssl', 'no').lower() in ('yes', 'true', 'on', '1'):
swift_test_auth = 'https'
if 'auth_prefix' not in conf:
conf['auth_prefix'] = '/'
try:
suffix = '://%(auth_host)s:%(auth_port)s%(auth_prefix)s' % conf
swift_test_auth += suffix
except KeyError:
pass # skip
if swift_test_auth_version == "1":
swift_test_auth += 'v1.0'
if 'account' in conf:
swift_test_user[0] = '%(account)s:%(username)s' % conf
else:
swift_test_user[0] = '%(username)s' % conf
swift_test_key[0] = conf['password']
try:
swift_test_user[1] = '%s%s' % (
'%s:' % conf['account2'] if 'account2' in conf else '',
conf['username2'])
swift_test_key[1] = conf['password2']
except KeyError as err:
pass # old conf, no second account tests can be run
try:
swift_test_user[2] = '%s%s' % ('%s:' % conf['account'] if 'account'
in conf else '', conf['username3'])
swift_test_key[2] = conf['password3']
except KeyError as err:
pass # old conf, no third account tests can be run
for _ in range(3):
swift_test_perm[_] = swift_test_user[_]
else:
swift_test_user[0] = conf['username']
swift_test_tenant[0] = conf['account']
swift_test_key[0] = conf['password']
swift_test_user[1] = conf['username2']
swift_test_tenant[1] = conf['account2']
swift_test_key[1] = conf['password2']
swift_test_user[2] = conf['username3']
swift_test_tenant[2] = conf['account']
swift_test_key[2] = conf['password3']
for _ in range(3):
swift_test_perm[_] = swift_test_tenant[_] + ':' \
+ swift_test_user[_]
skip = not all([swift_test_auth, swift_test_user[0], swift_test_key[0]])
if skip:
print >>sys.stderr, 'SKIPPING FUNCTIONAL TESTS DUE TO NO CONFIG'
skip2 = not all([not skip, swift_test_user[1], swift_test_key[1]])
if not skip and skip2:
print >>sys.stderr, \
'SKIPPING SECOND ACCOUNT FUNCTIONAL TESTS DUE TO NO CONFIG FOR THEM'
skip3 = not all([not skip, swift_test_user[2], swift_test_key[2]])
if not skip and skip3:
print >>sys.stderr, \
'SKIPPING THIRD ACCOUNT FUNCTIONAL TESTS DUE TO NO CONFIG FOR THEM'
class AuthError(Exception):
pass
class InternalServerError(Exception):
pass
url = [None, None, None]
token = [None, None, None]
parsed = [None, None, None]
conn = [None, None, None]
def retry(func, *args, **kwargs):
"""
You can use the kwargs to override:
'retries' (default: 5)
'use_account' (default: 1) - which user's token to pass
'url_account' (default: matches 'use_account') - which user's storage URL
'resource' (default: url[url_account] - URL to connect to; retry()
will interpolate the variable :storage_url: if present
"""
global url, token, parsed, conn
retries = kwargs.get('retries', 5)
attempts, backoff = 0, 1
# use account #1 by default; turn user's 1-indexed account into 0-indexed
use_account = kwargs.pop('use_account', 1) - 1
# access our own account by default
url_account = kwargs.pop('url_account', use_account + 1) - 1
while attempts <= retries:
attempts += 1
try:
if not url[use_account] or not token[use_account]:
url[use_account], token[use_account] = \
get_auth(swift_test_auth, swift_test_user[use_account],
swift_test_key[use_account],
snet=False,
tenant_name=swift_test_tenant[use_account],
auth_version=swift_test_auth_version,
os_options={})
parsed[use_account] = conn[use_account] = None
if not parsed[use_account] or not conn[use_account]:
parsed[use_account], conn[use_account] = \
http_connection(url[use_account])
# default resource is the account url[url_account]
resource = kwargs.pop('resource', '%(storage_url)s')
template_vars = {'storage_url': url[url_account]}
parsed_result = urlparse(resource % template_vars)
return func(url[url_account], token[use_account],
parsed_result, conn[url_account],
*args, **kwargs)
except (socket.error, HTTPException):
if attempts > retries:
raise
parsed[use_account] = conn[use_account] = None
except AuthError:
url[use_account] = token[use_account] = None
continue
except InternalServerError:
pass
if attempts <= retries:
sleep(backoff)
backoff *= 2
raise Exception('No result after %s retries.' % retries)
def check_response(conn):
resp = conn.getresponse()
if resp.status == 401:
resp.read()
raise AuthError()
elif resp.status // 100 == 5:
resp.read()
raise InternalServerError()
return resp
cluster_info = {}
def get_cluster_info():
conn = Connection(conf)
conn.authenticate()
global cluster_info
cluster_info = conn.cluster_info()
def reset_acl():
def post(url, token, parsed, conn):
conn.request('POST', parsed.path, '', {
'X-Auth-Token': token,
'X-Account-Access-Control': '{}'
})
return check_response(conn)
resp = retry(post, use_account=1)
resp.read()
def requires_acls(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
if skip:
raise SkipTest
if not cluster_info:
get_cluster_info()
# Determine whether this cluster has account ACLs; if not, skip test
if not cluster_info.get('tempauth', {}).get('account_acls'):
raise SkipTest
if 'keystoneauth' in cluster_info:
# remove when keystoneauth supports account acls
raise SkipTest
reset_acl()
try:
rv = f(*args, **kwargs)
finally:
reset_acl()
return rv
return wrapper

View File

@ -22,10 +22,10 @@ from nose import SkipTest
from string import letters
from swift.common.middleware.acl import format_acl
from swift_testing import (check_response, retry, skip, skip2, skip3,
web_front_end, requires_acls)
import swift_testing
from test.functional.tests import load_constraint
from test.functional import check_response, retry, requires_acls, \
load_constraint
import test.functional as tf
class TestAccount(unittest.TestCase):
@ -37,7 +37,7 @@ class TestAccount(unittest.TestCase):
self.max_meta_value_length = load_constraint('max_meta_value_length')
def test_metadata(self):
if skip:
if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, value):
@ -77,6 +77,9 @@ class TestAccount(unittest.TestCase):
self.assertEqual(resp.getheader('x-account-meta-test'), 'Value')
def test_invalid_acls(self):
if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, headers):
new_headers = dict({'X-Auth-Token': token}, **headers)
conn.request('POST', parsed.path, '', new_headers)
@ -113,7 +116,7 @@ class TestAccount(unittest.TestCase):
resp.read()
self.assertEqual(resp.status, 400)
acl_user = swift_testing.swift_test_user[1]
acl_user = tf.swift_test_user[1]
acl = {'admin': [acl_user], 'invalid_key': 'invalid_value'}
headers = {'x-account-access-control': format_acl(
version=2, acl_dict=acl)}
@ -141,7 +144,7 @@ class TestAccount(unittest.TestCase):
@requires_acls
def test_read_only_acl(self):
if skip3:
if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn):
@ -159,7 +162,7 @@ class TestAccount(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant read access
acl_user = swift_testing.swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'read-only': [acl_user]}
headers = {'x-account-access-control': format_acl(
version=2, acl_dict=acl)}
@ -192,7 +195,7 @@ class TestAccount(unittest.TestCase):
@requires_acls
def test_read_write_acl(self):
if skip3:
if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn):
@ -210,7 +213,7 @@ class TestAccount(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant read-write access
acl_user = swift_testing.swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'read-write': [acl_user]}
headers = {'x-account-access-control': format_acl(
version=2, acl_dict=acl)}
@ -233,7 +236,7 @@ class TestAccount(unittest.TestCase):
@requires_acls
def test_admin_acl(self):
if skip3:
if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn):
@ -251,7 +254,7 @@ class TestAccount(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant admin access
acl_user = swift_testing.swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'admin': [acl_user]}
acl_json_str = format_acl(version=2, acl_dict=acl)
headers = {'x-account-access-control': acl_json_str}
@ -291,7 +294,7 @@ class TestAccount(unittest.TestCase):
@requires_acls
def test_protected_tempurl(self):
if skip3:
if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn):
@ -314,7 +317,7 @@ class TestAccount(unittest.TestCase):
self.assertEqual(resp.status, 204)
# grant read-only access to tester3
acl_user = swift_testing.swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'read-only': [acl_user]}
acl_json_str = format_acl(version=2, acl_dict=acl)
headers = {'x-account-access-control': acl_json_str}
@ -332,7 +335,7 @@ class TestAccount(unittest.TestCase):
self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'), None)
# grant read-write access to tester3
acl_user = swift_testing.swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'read-write': [acl_user]}
acl_json_str = format_acl(version=2, acl_dict=acl)
headers = {'x-account-access-control': acl_json_str}
@ -350,7 +353,7 @@ class TestAccount(unittest.TestCase):
self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'), None)
# grant admin access to tester3
acl_user = swift_testing.swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'admin': [acl_user]}
acl_json_str = format_acl(version=2, acl_dict=acl)
headers = {'x-account-access-control': acl_json_str}
@ -385,7 +388,7 @@ class TestAccount(unittest.TestCase):
@requires_acls
def test_account_acls(self):
if skip2:
if tf.skip2:
raise SkipTest
def post(url, token, parsed, conn, headers):
@ -432,7 +435,7 @@ class TestAccount(unittest.TestCase):
# User1 is swift_owner of their own account, so they can POST an
# ACL -- let's do this and make User2 (test_user[1]) an admin
acl_user = swift_testing.swift_test_user[1]
acl_user = tf.swift_test_user[1]
acl = {'admin': [acl_user]}
headers = {'x-account-access-control': format_acl(
version=2, acl_dict=acl)}
@ -509,7 +512,7 @@ class TestAccount(unittest.TestCase):
@requires_acls
def test_swift_account_acls(self):
if skip:
if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, headers):
@ -572,7 +575,7 @@ class TestAccount(unittest.TestCase):
resp.read()
def test_swift_prohibits_garbage_account_acls(self):
if skip:
if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, headers):
@ -639,7 +642,7 @@ class TestAccount(unittest.TestCase):
resp.read()
def test_unicode_metadata(self):
if skip:
if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, name, value):
@ -652,7 +655,7 @@ class TestAccount(unittest.TestCase):
return check_response(conn)
uni_key = u'X-Account-Meta-uni\u0E12'
uni_value = u'uni\u0E12'
if (web_front_end == 'integral'):
if (tf.web_front_end == 'integral'):
resp = retry(post, uni_key, '1')
resp.read()
self.assertTrue(resp.status in (201, 204))
@ -668,7 +671,7 @@ class TestAccount(unittest.TestCase):
self.assert_(resp.status in (200, 204), resp.status)
self.assertEqual(resp.getheader('X-Account-Meta-uni'),
uni_value.encode('utf-8'))
if (web_front_end == 'integral'):
if (tf.web_front_end == 'integral'):
resp = retry(post, uni_key, uni_value)
resp.read()
self.assertEqual(resp.status, 204)
@ -679,7 +682,7 @@ class TestAccount(unittest.TestCase):
uni_value.encode('utf-8'))
def test_multi_metadata(self):
if skip:
if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, name, value):
@ -708,7 +711,7 @@ class TestAccount(unittest.TestCase):
self.assertEqual(resp.getheader('x-account-meta-two'), '2')
def test_bad_metadata(self):
if skip:
if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, extra_headers):

View File

@ -20,16 +20,15 @@ import unittest
from nose import SkipTest
from uuid import uuid4
from swift_testing import check_response, retry, skip, skip2, skip3, \
swift_test_perm, web_front_end, requires_acls, swift_test_user
from tests import load_constraint
from test.functional import check_response, retry, requires_acls, \
load_constraint
import test.functional as tf
class TestContainer(unittest.TestCase):
def setUp(self):
if skip:
if tf.skip:
raise SkipTest
self.name = uuid4().hex
@ -48,7 +47,7 @@ class TestContainer(unittest.TestCase):
self.max_meta_value_length = load_constraint('max_meta_value_length')
def tearDown(self):
if skip:
if tf.skip:
raise SkipTest
def get(url, token, parsed, conn):
@ -84,7 +83,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.status, 204)
def test_multi_metadata(self):
if skip:
if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, name, value):
@ -114,7 +113,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.getheader('x-container-meta-two'), '2')
def test_unicode_metadata(self):
if skip:
if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, name, value):
@ -129,7 +128,7 @@ class TestContainer(unittest.TestCase):
uni_key = u'X-Container-Meta-uni\u0E12'
uni_value = u'uni\u0E12'
if (web_front_end == 'integral'):
if (tf.web_front_end == 'integral'):
resp = retry(post, uni_key, '1')
resp.read()
self.assertEqual(resp.status, 204)
@ -145,7 +144,7 @@ class TestContainer(unittest.TestCase):
self.assert_(resp.status in (200, 204), resp.status)
self.assertEqual(resp.getheader('X-Container-Meta-uni'),
uni_value.encode('utf-8'))
if (web_front_end == 'integral'):
if (tf.web_front_end == 'integral'):
resp = retry(post, uni_key, uni_value)
resp.read()
self.assertEqual(resp.status, 204)
@ -156,7 +155,7 @@ class TestContainer(unittest.TestCase):
uni_value.encode('utf-8'))
def test_PUT_metadata(self):
if skip:
if tf.skip:
raise SkipTest
def put(url, token, parsed, conn, name, value):
@ -213,7 +212,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.status, 204)
def test_POST_metadata(self):
if skip:
if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, value):
@ -253,7 +252,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.getheader('x-container-meta-test'), 'Value')
def test_PUT_bad_metadata(self):
if skip:
if tf.skip:
raise SkipTest
def put(url, token, parsed, conn, name, extra_headers):
@ -358,7 +357,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.status, 404)
def test_POST_bad_metadata(self):
if skip:
if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, extra_headers):
@ -426,7 +425,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.status, 400)
def test_public_container(self):
if skip:
if tf.skip:
raise SkipTest
def get(url, token, parsed, conn):
@ -467,7 +466,7 @@ class TestContainer(unittest.TestCase):
self.assert_(str(err).startswith('No result after '), err)
def test_cross_account_container(self):
if skip or skip2:
if tf.skip or tf.skip2:
raise SkipTest
# Obtain the first account's string
first_account = ['unknown']
@ -495,8 +494,8 @@ class TestContainer(unittest.TestCase):
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token,
'X-Container-Read': swift_test_perm[1],
'X-Container-Write': swift_test_perm[1]})
'X-Container-Read': tf.swift_test_perm[1],
'X-Container-Write': tf.swift_test_perm[1]})
return check_response(conn)
resp = retry(post)
@ -523,7 +522,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.status, 403)
def test_cross_account_public_container(self):
if skip or skip2:
if tf.skip or tf.skip2:
raise SkipTest
# Obtain the first account's string
first_account = ['unknown']
@ -576,7 +575,7 @@ class TestContainer(unittest.TestCase):
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token,
'X-Container-Write': swift_test_perm[1]})
'X-Container-Write': tf.swift_test_perm[1]})
return check_response(conn)
resp = retry(post)
@ -592,7 +591,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.status, 201)
def test_nonadmin_user(self):
if skip or skip3:
if tf.skip or tf.skip3:
raise SkipTest
# Obtain the first account's string
first_account = ['unknown']
@ -620,7 +619,7 @@ class TestContainer(unittest.TestCase):
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token,
'X-Container-Read': swift_test_perm[2]})
'X-Container-Read': tf.swift_test_perm[2]})
return check_response(conn)
resp = retry(post)
@ -645,7 +644,7 @@ class TestContainer(unittest.TestCase):
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token,
'X-Container-Write': swift_test_perm[2]})
'X-Container-Write': tf.swift_test_perm[2]})
return check_response(conn)
resp = retry(post)
@ -662,7 +661,7 @@ class TestContainer(unittest.TestCase):
@requires_acls
def test_read_only_acl_listings(self):
if skip3:
if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn):
@ -685,7 +684,7 @@ class TestContainer(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant read-only access
acl_user = swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'read-only': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@ -715,7 +714,7 @@ class TestContainer(unittest.TestCase):
@requires_acls
def test_read_only_acl_metadata(self):
if skip3:
if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn, name):
@ -750,7 +749,7 @@ class TestContainer(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant read-only access
acl_user = swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'read-only': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@ -772,7 +771,7 @@ class TestContainer(unittest.TestCase):
@requires_acls
def test_read_write_acl_listings(self):
if skip3:
if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn):
@ -800,7 +799,7 @@ class TestContainer(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant read-write access
acl_user = swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'read-write': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post, headers=headers, use_account=1)
@ -843,7 +842,7 @@ class TestContainer(unittest.TestCase):
@requires_acls
def test_read_write_acl_metadata(self):
if skip3:
if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn, name):
@ -878,7 +877,7 @@ class TestContainer(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant read-write access
acl_user = swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'read-write': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@ -914,7 +913,7 @@ class TestContainer(unittest.TestCase):
@requires_acls
def test_admin_acl_listing(self):
if skip3:
if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn):
@ -942,7 +941,7 @@ class TestContainer(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant admin access
acl_user = swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'admin': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post, headers=headers, use_account=1)
@ -985,7 +984,7 @@ class TestContainer(unittest.TestCase):
@requires_acls
def test_admin_acl_metadata(self):
if skip3:
if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn, name):
@ -1020,7 +1019,7 @@ class TestContainer(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant access
acl_user = swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'admin': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@ -1056,7 +1055,7 @@ class TestContainer(unittest.TestCase):
@requires_acls
def test_protected_container_sync(self):
if skip3:
if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn, name):
@ -1090,7 +1089,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
# grant read-only access
acl_user = swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'read-only': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@ -1112,7 +1111,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.status, 403)
# grant read-write access
acl_user = swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'read-write': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@ -1150,7 +1149,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret')
# grant admin access
acl_user = swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'admin': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@ -1178,7 +1177,7 @@ class TestContainer(unittest.TestCase):
@requires_acls
def test_protected_container_acl(self):
if skip3:
if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn, name):
@ -1214,7 +1213,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
# grant read-only access
acl_user = swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'read-only': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@ -1240,7 +1239,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.status, 403)
# grant read-write access
acl_user = swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'read-write': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@ -1282,7 +1281,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe')
# grant admin access
acl_user = swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'admin': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@ -1312,7 +1311,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.getheader('X-Container-Read'), '.r:*')
def test_long_name_content_type(self):
if skip:
if tf.skip:
raise SkipTest
def put(url, token, parsed, conn):
@ -1328,7 +1327,7 @@ class TestContainer(unittest.TestCase):
'text/html; charset=UTF-8')
def test_null_name(self):
if skip:
if tf.skip:
raise SkipTest
def put(url, token, parsed, conn):
@ -1337,7 +1336,7 @@ class TestContainer(unittest.TestCase):
return check_response(conn)
resp = retry(put)
if (web_front_end == 'apache2'):
if (tf.web_front_end == 'apache2'):
self.assertEqual(resp.status, 404)
else:
self.assertEqual(resp.read(), 'Invalid UTF8 or contains NULL')

View File

@ -21,14 +21,14 @@ from uuid import uuid4
from swift.common.utils import json
from swift_testing import check_response, retry, skip, skip3, \
swift_test_perm, web_front_end, requires_acls, swift_test_user
from test.functional import check_response, retry, requires_acls
import test.functional as tf
class TestObject(unittest.TestCase):
def setUp(self):
if skip:
if tf.skip:
raise SkipTest
self.container = uuid4().hex
@ -51,7 +51,7 @@ class TestObject(unittest.TestCase):
self.assertEqual(resp.status, 201)
def tearDown(self):
if skip:
if tf.skip:
raise SkipTest
def delete(url, token, parsed, conn, obj):
@ -112,7 +112,7 @@ class TestObject(unittest.TestCase):
self.assertEquals(resp.status, 400)
def test_copy_object(self):
if skip:
if tf.skip:
raise SkipTest
source = '%s/%s' % (self.container, self.obj)
@ -186,7 +186,7 @@ class TestObject(unittest.TestCase):
self.assertEqual(resp.status, 204)
def test_public_object(self):
if skip:
if tf.skip:
raise SkipTest
def get(url, token, parsed, conn):
@ -225,7 +225,7 @@ class TestObject(unittest.TestCase):
self.assert_(str(err).startswith('No result after '))
def test_private_object(self):
if skip or skip3:
if tf.skip or tf.skip3:
raise SkipTest
# Ensure we can't access the object with the third account
@ -245,8 +245,8 @@ class TestObject(unittest.TestCase):
conn.request('PUT', '%s/%s' % (
parsed.path, shared_container), '',
{'X-Auth-Token': token,
'X-Container-Read': swift_test_perm[2],
'X-Container-Write': swift_test_perm[2]})
'X-Container-Read': tf.swift_test_perm[2],
'X-Container-Write': tf.swift_test_perm[2]})
return check_response(conn)
resp = retry(put)
resp.read()
@ -319,8 +319,8 @@ class TestObject(unittest.TestCase):
@requires_acls
def test_read_only(self):
if skip3:
raise SkipTest
if tf.skip3:
raise tf.SkipTest
def get_listing(url, token, parsed, conn):
conn.request('GET', '%s/%s' % (parsed.path, self.container), '',
@ -361,7 +361,7 @@ class TestObject(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant read-only access
acl_user = swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'read-only': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@ -400,7 +400,7 @@ class TestObject(unittest.TestCase):
@requires_acls
def test_read_write(self):
if skip3:
if tf.skip3:
raise SkipTest
def get_listing(url, token, parsed, conn):
@ -442,7 +442,7 @@ class TestObject(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant read-write access
acl_user = swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'read-write': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@ -481,7 +481,7 @@ class TestObject(unittest.TestCase):
@requires_acls
def test_admin(self):
if skip3:
if tf.skip3:
raise SkipTest
def get_listing(url, token, parsed, conn):
@ -523,7 +523,7 @@ class TestObject(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant admin access
acl_user = swift_test_user[2]
acl_user = tf.swift_test_user[2]
acl = {'admin': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@ -561,7 +561,7 @@ class TestObject(unittest.TestCase):
self.assert_(self.obj not in listing)
def test_manifest(self):
if skip:
if tf.skip:
raise SkipTest
# Data for the object segments
segments1 = ['one', 'two', 'three', 'four', 'five']
@ -672,7 +672,7 @@ class TestObject(unittest.TestCase):
self.assertEqual(resp.read(), ''.join(segments2))
self.assertEqual(resp.status, 200)
if not skip3:
if not tf.skip3:
# Ensure we can't access the manifest with the third account
def get(url, token, parsed, conn):
@ -687,7 +687,7 @@ class TestObject(unittest.TestCase):
def post(url, token, parsed, conn):
conn.request('POST', '%s/%s' % (parsed.path, self.container),
'', {'X-Auth-Token': token,
'X-Container-Read': swift_test_perm[2]})
'X-Container-Read': tf.swift_test_perm[2]})
return check_response(conn)
resp = retry(post)
resp.read()
@ -745,7 +745,7 @@ class TestObject(unittest.TestCase):
self.assertEqual(resp.read(), ''.join(segments3))
self.assertEqual(resp.status, 200)
if not skip3:
if not tf.skip3:
# Ensure we can't access the manifest with the third account
# (because the segments are in a protected container even if the
@ -763,7 +763,7 @@ class TestObject(unittest.TestCase):
def post(url, token, parsed, conn):
conn.request('POST', '%s/%s' % (parsed.path, acontainer),
'', {'X-Auth-Token': token,
'X-Container-Read': swift_test_perm[2]})
'X-Container-Read': tf.swift_test_perm[2]})
return check_response(conn)
resp = retry(post)
resp.read()
@ -831,7 +831,7 @@ class TestObject(unittest.TestCase):
self.assertEqual(resp.status, 204)
def test_delete_content_type(self):
if skip:
if tf.skip:
raise SkipTest
def put(url, token, parsed, conn):
@ -853,7 +853,7 @@ class TestObject(unittest.TestCase):
'text/html; charset=UTF-8')
def test_delete_if_delete_at_bad(self):
if skip:
if tf.skip:
raise SkipTest
def put(url, token, parsed, conn):
@ -875,7 +875,7 @@ class TestObject(unittest.TestCase):
self.assertEqual(resp.status, 400)
def test_null_name(self):
if skip:
if tf.skip:
raise SkipTest
def put(url, token, parsed, conn):
@ -884,14 +884,14 @@ class TestObject(unittest.TestCase):
self.container), 'test', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(put)
if (web_front_end == 'apache2'):
if (tf.web_front_end == 'apache2'):
self.assertEqual(resp.status, 404)
else:
self.assertEqual(resp.read(), 'Invalid UTF8 or contains NULL')
self.assertEqual(resp.status, 412)
def test_cors(self):
if skip:
if tf.skip:
raise SkipTest
def is_strict_mode(url, token, parsed, conn):

View File

@ -28,38 +28,10 @@ import urllib
import uuid
from nose import SkipTest
from test import get_config
from test.functional import config
from test.functional import normalized_urls, load_constraint
import test.functional as tf
from test.functional.swift_test_client import Account, Connection, File, \
ResponseError
from swift.common import constraints
config.update(get_config('func_test'))
for k in constraints.DEFAULT_CONSTRAINTS:
if k in config:
# prefer what's in test.conf
config[k] = int(config[k])
elif constraints.SWIFT_CONSTRAINTS_LOADED:
# swift.conf exists, so use what's defined there (or swift defaults)
# This normally happens when the test is running locally to the cluster
# as in a SAIO.
config[k] = constraints.EFFECTIVE_CONSTRAINTS[k]
else:
# .functests don't know what the constraints of the tested cluster are,
# so the tests can't reliably pass or fail. Therefore, skip those
# tests.
config[k] = '%s constraint is not defined' % k
web_front_end = config.get('web_front_end', 'integral')
normalized_urls = config.get('normalized_urls', False)
def load_constraint(name):
c = config[name]
if not isinstance(c, int):
raise SkipTest(c)
return c
def chunks(s, length=3):
@ -153,10 +125,10 @@ class Base2(object):
class TestAccountEnv(object):
@classmethod
def setUp(cls):
cls.conn = Connection(config)
cls.conn = Connection(tf.config)
cls.conn.authenticate()
cls.account = Account(cls.conn, config.get('account',
config['username']))
cls.account = Account(cls.conn, tf.config.get('account',
tf.config['username']))
cls.account.delete_containers()
cls.containers = []
@ -340,10 +312,10 @@ class TestAccountUTF8(Base2, TestAccount):
class TestAccountNoContainersEnv(object):
@classmethod
def setUp(cls):
cls.conn = Connection(config)
cls.conn = Connection(tf.config)
cls.conn.authenticate()
cls.account = Account(cls.conn, config.get('account',
config['username']))
cls.account = Account(cls.conn, tf.config.get('account',
tf.config['username']))
cls.account.delete_containers()
@ -369,10 +341,10 @@ class TestAccountNoContainersUTF8(Base2, TestAccountNoContainers):
class TestContainerEnv(object):
@classmethod
def setUp(cls):
cls.conn = Connection(config)
cls.conn = Connection(tf.config)
cls.conn.authenticate()
cls.account = Account(cls.conn, config.get('account',
config['username']))
cls.account = Account(cls.conn, tf.config.get('account',
tf.config['username']))
cls.account.delete_containers()
cls.container = cls.account.container(Utils.create_name())
@ -660,10 +632,10 @@ class TestContainerUTF8(Base2, TestContainer):
class TestContainerPathsEnv(object):
@classmethod
def setUp(cls):
cls.conn = Connection(config)
cls.conn = Connection(tf.config)
cls.conn.authenticate()
cls.account = Account(cls.conn, config.get('account',
config['username']))
cls.account = Account(cls.conn, tf.config.get('account',
tf.config['username']))
cls.account.delete_containers()
cls.file_size = 8
@ -839,10 +811,10 @@ class TestContainerPaths(Base):
class TestFileEnv(object):
@classmethod
def setUp(cls):
cls.conn = Connection(config)
cls.conn = Connection(tf.config)
cls.conn.authenticate()
cls.account = Account(cls.conn, config.get('account',
config['username']))
cls.account = Account(cls.conn, tf.config.get('account',
tf.config['username']))
cls.account.delete_containers()
cls.container = cls.account.container(Utils.create_name())
@ -1498,8 +1470,9 @@ class TestFile(Base):
self.assertEqual(etag, header_etag)
def testChunkedPut(self):
if (web_front_end == 'apache2'):
raise SkipTest()
if (tf.web_front_end == 'apache2'):
raise SkipTest("Chunked PUT can only be tested with apache2 web"
" front end")
data = File.random_data(10000)
etag = File.compute_md5sum(data)
@ -1523,10 +1496,10 @@ class TestFileUTF8(Base2, TestFile):
class TestDloEnv(object):
@classmethod
def setUp(cls):
cls.conn = Connection(config)
cls.conn = Connection(tf.config)
cls.conn.authenticate()
cls.account = Account(cls.conn, config.get('account',
config['username']))
cls.account = Account(cls.conn, tf.config.get('account',
tf.config['username']))
cls.account.delete_containers()
cls.container = cls.account.container(Utils.create_name())
@ -1696,10 +1669,10 @@ class TestDloUTF8(Base2, TestDlo):
class TestFileComparisonEnv(object):
@classmethod
def setUp(cls):
cls.conn = Connection(config)
cls.conn = Connection(tf.config)
cls.conn.authenticate()
cls.account = Account(cls.conn, config.get('account',
config['username']))
cls.account = Account(cls.conn, tf.config.get('account',
tf.config['username']))
cls.account.delete_containers()
cls.container = cls.account.container(Utils.create_name())
@ -1813,7 +1786,7 @@ class TestSloEnv(object):
@classmethod
def setUp(cls):
cls.conn = Connection(config)
cls.conn = Connection(tf.config)
cls.conn.authenticate()
if cls.slo_enabled is None:
@ -1822,8 +1795,8 @@ class TestSloEnv(object):
if not cls.slo_enabled:
return
cls.account = Account(cls.conn, config.get('account',
config['username']))
cls.account = Account(cls.conn, tf.config.get('account',
tf.config['username']))
cls.account.delete_containers()
cls.container = cls.account.container(Utils.create_name())
@ -2082,11 +2055,11 @@ class TestObjectVersioningEnv(object):
@classmethod
def setUp(cls):
cls.conn = Connection(config)
cls.conn = Connection(tf.config)
cls.conn.authenticate()
cls.account = Account(cls.conn, config.get('account',
config['username']))
cls.account = Account(cls.conn, tf.config.get('account',
tf.config['username']))
# avoid getting a prefix that stops halfway through an encoded
# character
@ -2161,7 +2134,7 @@ class TestTempurlEnv(object):
@classmethod
def setUp(cls):
cls.conn = Connection(config)
cls.conn = Connection(tf.config)
cls.conn.authenticate()
if cls.tempurl_enabled is None:
@ -2175,7 +2148,7 @@ class TestTempurlEnv(object):
cls.tempurl_key2 = Utils.create_name()
cls.account = Account(
cls.conn, config.get('account', config['username']))
cls.conn, tf.config.get('account', tf.config['username']))
cls.account.delete_containers()
cls.account.update_metadata({
'temp-url-key': cls.tempurl_key,
@ -2336,7 +2309,7 @@ class TestSloTempurlEnv(object):
@classmethod
def setUp(cls):
cls.conn = Connection(config)
cls.conn = Connection(tf.config)
cls.conn.authenticate()
if cls.enabled is None:
@ -2346,7 +2319,7 @@ class TestSloTempurlEnv(object):
cls.tempurl_key = Utils.create_name()
cls.account = Account(
cls.conn, config.get('account', config['username']))
cls.conn, tf.config.get('account', tf.config['username']))
cls.account.delete_containers()
cls.account.update_metadata({'temp-url-key': cls.tempurl_key})