977 lines
47 KiB
Python
977 lines
47 KiB
Python
# Copyright (c) 2010 OpenStack, LLC.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from __future__ import with_statement
|
|
import unittest
|
|
import os
|
|
from shutil import rmtree
|
|
from StringIO import StringIO
|
|
from uuid import uuid4
|
|
from logging import StreamHandler
|
|
|
|
import sqlite3
|
|
from webob import Request
|
|
|
|
from swift.auth import server as auth_server
|
|
from swift.common.db import DatabaseConnectionError, get_db_connection
|
|
from swift.common.utils import get_logger
|
|
|
|
|
|
class TestException(Exception):
|
|
pass
|
|
|
|
|
|
def fake_http_connect(*code_iter, **kwargs):
|
|
class FakeConn(object):
|
|
def __init__(self, status):
|
|
self.status = status
|
|
self.reason = 'Fake'
|
|
self.host = '1.2.3.4'
|
|
self.port = '1234'
|
|
def getresponse(self):
|
|
if 'slow' in kwargs:
|
|
sleep(0.2)
|
|
if 'raise_exc' in kwargs:
|
|
raise kwargs['raise_exc']
|
|
return self
|
|
def getheaders(self):
|
|
return {'x-account-bytes-used': '20'}
|
|
def read(self, amt=None):
|
|
return ''
|
|
def getheader(self, name):
|
|
return self.getheaders().get(name.lower())
|
|
code_iter = iter(code_iter)
|
|
def connect(*args, **kwargs):
|
|
connect.last_args = args
|
|
connect.last_kwargs = kwargs
|
|
return FakeConn(code_iter.next())
|
|
return connect
|
|
|
|
|
|
class TestAuthServer(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.ohttp_connect = auth_server.http_connect
|
|
self.testdir = os.path.join(os.path.dirname(__file__),
|
|
'auth_server')
|
|
rmtree(self.testdir, ignore_errors=1)
|
|
os.mkdir(self.testdir)
|
|
self.conf = {'swift_dir': self.testdir, 'log_name': 'auth',
|
|
'super_admin_key': 'testkey'}
|
|
self.controller = auth_server.AuthController(self.conf)
|
|
|
|
def tearDown(self):
|
|
auth_server.http_connect = self.ohttp_connect
|
|
rmtree(self.testdir, ignore_errors=1)
|
|
|
|
def test_get_conn(self):
|
|
with self.controller.get_conn() as conn:
|
|
pass
|
|
exc = False
|
|
try:
|
|
with self.controller.get_conn() as conn:
|
|
raise TestException('test')
|
|
except TestException:
|
|
exc = True
|
|
self.assert_(exc)
|
|
# We allow reentrant calls for the auth-server
|
|
with self.controller.get_conn() as conn1:
|
|
exc = False
|
|
try:
|
|
with self.controller.get_conn() as conn2:
|
|
self.assert_(conn1 is not conn2)
|
|
except DatabaseConnectionError:
|
|
exc = True
|
|
self.assert_(not exc)
|
|
self.controller.conn = None
|
|
with self.controller.get_conn() as conn:
|
|
self.assert_(conn is not None)
|
|
|
|
def test_validate_token_non_existant_token(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
cfaccount = self.controller.create_user(
|
|
'test', 'tester', 'testing',).split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'tester',
|
|
'X-Storage-Pass': 'testing'}))
|
|
token = res.headers['x-storage-token']
|
|
self.assertEquals(self.controller.validate_token(token + 'bad'), False)
|
|
|
|
def test_validate_token_good(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
cfaccount = self.controller.create_user(
|
|
'test', 'tester', 'testing',).split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'tester',
|
|
'X-Storage-Pass': 'testing'}))
|
|
token = res.headers['x-storage-token']
|
|
ttl, _, _, _ = self.controller.validate_token(token)
|
|
self.assert_(ttl > 0, repr(ttl))
|
|
|
|
def test_validate_token_expired(self):
|
|
orig_time = auth_server.time
|
|
try:
|
|
auth_server.time = lambda: 1
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
cfaccount = self.controller.create_user('test', 'tester',
|
|
'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'tester',
|
|
'X-Storage-Pass': 'testing'}))
|
|
token = res.headers['x-storage-token']
|
|
ttl, _, _, _ = self.controller.validate_token(token)
|
|
self.assert_(ttl > 0, repr(ttl))
|
|
auth_server.time = lambda: 1 + self.controller.token_life
|
|
self.assertEquals(self.controller.validate_token(token), False)
|
|
finally:
|
|
auth_server.time = orig_time
|
|
|
|
def test_create_user_no_new_account(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
result = self.controller.create_user('', 'tester', 'testing')
|
|
self.assertFalse(result)
|
|
|
|
def test_create_user_no_new_user(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
result = self.controller.create_user('test', '', 'testing')
|
|
self.assertFalse(result)
|
|
|
|
def test_create_user_no_new_password(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
result = self.controller.create_user('test', 'tester', '')
|
|
self.assertFalse(result)
|
|
|
|
def test_create_user_good(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
url = self.controller.create_user('test', 'tester', 'testing')
|
|
self.assert_(url)
|
|
self.assertEquals('/'.join(url.split('/')[:-1]),
|
|
self.controller.default_cluster_url.rstrip('/'), repr(url))
|
|
|
|
def test_recreate_accounts_none(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
rv = self.controller.recreate_accounts()
|
|
self.assertEquals(rv.split()[0], '0', repr(rv))
|
|
self.assertEquals(rv.split()[-1], '[]', repr(rv))
|
|
|
|
def test_recreate_accounts_one(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
self.controller.create_user('test', 'tester', 'testing')
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
rv = self.controller.recreate_accounts()
|
|
self.assertEquals(rv.split()[0], '1', repr(rv))
|
|
self.assertEquals(rv.split()[-1], '[]', repr(rv))
|
|
|
|
def test_recreate_accounts_several(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
self.controller.create_user('test1', 'tester', 'testing')
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
self.controller.create_user('test2', 'tester', 'testing')
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
self.controller.create_user('test3', 'tester', 'testing')
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
self.controller.create_user('test4', 'tester', 'testing')
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201, 201)
|
|
rv = self.controller.recreate_accounts()
|
|
self.assertEquals(rv.split()[0], '4', repr(rv))
|
|
self.assertEquals(rv.split()[-1], '[]', repr(rv))
|
|
|
|
def test_recreate_accounts_one_fail(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
url = self.controller.create_user('test', 'tester', 'testing')
|
|
cfaccount = url.split('/')[-1]
|
|
auth_server.http_connect = fake_http_connect(500)
|
|
rv = self.controller.recreate_accounts()
|
|
self.assertEquals(rv.split()[0], '1', repr(rv))
|
|
self.assertEquals(rv.split()[-1], '[%s]' % repr(cfaccount),
|
|
repr(rv))
|
|
|
|
def test_recreate_accounts_several_fail(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
url = self.controller.create_user('test1', 'tester', 'testing')
|
|
cfaccounts = [url.split('/')[-1]]
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
url = self.controller.create_user('test2', 'tester', 'testing')
|
|
cfaccounts.append(url.split('/')[-1])
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
url = self.controller.create_user('test3', 'tester', 'testing')
|
|
cfaccounts.append(url.split('/')[-1])
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
url = self.controller.create_user('test4', 'tester', 'testing')
|
|
cfaccounts.append(url.split('/')[-1])
|
|
auth_server.http_connect = fake_http_connect(500, 500, 500, 500)
|
|
rv = self.controller.recreate_accounts()
|
|
self.assertEquals(rv.split()[0], '4', repr(rv))
|
|
failed = rv.split('[', 1)[-1][:-1].split(', ')
|
|
self.assertEquals(set(failed), set(repr(a) for a in cfaccounts))
|
|
|
|
def test_recreate_accounts_several_fail_some(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
url = self.controller.create_user('test1', 'tester', 'testing')
|
|
cfaccounts = [url.split('/')[-1]]
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
url = self.controller.create_user('test2', 'tester', 'testing')
|
|
cfaccounts.append(url.split('/')[-1])
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
url = self.controller.create_user('test3', 'tester', 'testing')
|
|
cfaccounts.append(url.split('/')[-1])
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
url = self.controller.create_user('test4', 'tester', 'testing')
|
|
cfaccounts.append(url.split('/')[-1])
|
|
auth_server.http_connect = fake_http_connect(500, 201, 500, 201)
|
|
rv = self.controller.recreate_accounts()
|
|
self.assertEquals(rv.split()[0], '4', repr(rv))
|
|
failed = rv.split('[', 1)[-1][:-1].split(', ')
|
|
self.assertEquals(
|
|
len(set(repr(a) for a in cfaccounts) - set(failed)), 2)
|
|
|
|
def test_auth_bad_path(self):
|
|
self.assertRaises(ValueError, self.controller.handle_auth,
|
|
Request.blank('', environ={'REQUEST_METHOD': 'GET'}))
|
|
res = self.controller.handle_auth(Request.blank('/bad',
|
|
environ={'REQUEST_METHOD': 'GET'}))
|
|
self.assertEquals(res.status_int, 400)
|
|
|
|
def test_auth_SOSO_missing_headers(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
cfaccount = self.controller.create_user(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-Pass': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'tester'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
|
|
def test_auth_SOSO_bad_account(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
cfaccount = self.controller.create_user(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/testbad/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'tester',
|
|
'X-Storage-Pass': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/v1//auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'tester',
|
|
'X-Storage-Pass': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
|
|
def test_auth_SOSO_bad_user(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
cfaccount = self.controller.create_user(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'testerbad',
|
|
'X-Storage-Pass': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': '',
|
|
'X-Storage-Pass': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
|
|
def test_auth_SOSO_bad_password(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
cfaccount = self.controller.create_user(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'tester',
|
|
'X-Storage-Pass': 'testingbad'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'tester',
|
|
'X-Storage-Pass': ''}))
|
|
self.assertEquals(res.status_int, 401)
|
|
|
|
def test_auth_SOSO_good(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
cfaccount = self.controller.create_user(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'tester',
|
|
'X-Storage-Pass': 'testing'}))
|
|
token = res.headers['x-storage-token']
|
|
ttl, _, _, _ = self.controller.validate_token(token)
|
|
self.assert_(ttl > 0, repr(ttl))
|
|
|
|
def test_auth_SOSO_good_Mosso_headers(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
cfaccount = self.controller.create_user(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'test:tester',
|
|
'X-Auth-Key': 'testing'}))
|
|
token = res.headers['x-storage-token']
|
|
ttl, _, _, _ = self.controller.validate_token(token)
|
|
self.assert_(ttl > 0, repr(ttl))
|
|
|
|
def test_auth_SOSO_bad_Mosso_headers(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
cfaccount = self.controller.create_user(
|
|
'test', 'tester', 'testing',).split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'test2:tester',
|
|
'X-Auth-Key': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': ':tester',
|
|
'X-Auth-Key': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'test:',
|
|
'X-Auth-Key': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
|
|
def test_auth_Mosso_missing_headers(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
cfaccount = self.controller.create_user(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-Key': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'test:tester'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
|
|
def test_auth_Mosso_bad_header_format(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
cfaccount = self.controller.create_user(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'badformat',
|
|
'X-Auth-Key': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': '',
|
|
'X-Auth-Key': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
|
|
def test_auth_Mosso_bad_account(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
cfaccount = self.controller.create_user(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'testbad:tester',
|
|
'X-Auth-Key': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': ':tester',
|
|
'X-Auth-Key': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
|
|
def test_auth_Mosso_bad_user(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
cfaccount = self.controller.create_user(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'test:testerbad',
|
|
'X-Auth-Key': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'test:',
|
|
'X-Auth-Key': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
|
|
def test_auth_Mosso_bad_password(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
cfaccount = self.controller.create_user(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'test:tester',
|
|
'X-Auth-Key': 'testingbad'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'test:tester',
|
|
'X-Auth-Key': ''}))
|
|
self.assertEquals(res.status_int, 401)
|
|
|
|
def test_auth_Mosso_good(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
cfaccount = self.controller.create_user(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'test:tester',
|
|
'X-Auth-Key': 'testing'}))
|
|
token = res.headers['x-storage-token']
|
|
ttl, _, _, _ = self.controller.validate_token(token)
|
|
self.assert_(ttl > 0, repr(ttl))
|
|
|
|
def test_auth_Mosso_good_SOSO_header_names(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
cfaccount = self.controller.create_user(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'test:tester',
|
|
'X-Storage-Pass': 'testing'}))
|
|
token = res.headers['x-storage-token']
|
|
ttl, _, _, _ = self.controller.validate_token(token)
|
|
self.assert_(ttl > 0, repr(ttl))
|
|
|
|
def test_basic_logging(self):
|
|
log = StringIO()
|
|
log_handler = StreamHandler(log)
|
|
logger = get_logger(self.conf, 'auth')
|
|
logger.logger.addHandler(log_handler)
|
|
try:
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
url = self.controller.create_user('test', 'tester', 'testing')
|
|
self.assertEquals(log.getvalue().rsplit(' ', 1)[0],
|
|
"auth SUCCESS create_user('test', 'tester', _, False, False) "
|
|
"= %s" % repr(url))
|
|
log.truncate(0)
|
|
def start_response(*args):
|
|
pass
|
|
self.controller.handleREST({'REQUEST_METHOD': 'GET',
|
|
'SCRIPT_NAME': '',
|
|
'PATH_INFO': '/v1/test/auth',
|
|
'QUERY_STRING': 'test=True',
|
|
'SERVER_NAME': '127.0.0.1',
|
|
'SERVER_PORT': '8080',
|
|
'SERVER_PROTOCOL': 'HTTP/1.0',
|
|
'CONTENT_LENGTH': '0',
|
|
'wsgi.version': (1, 0),
|
|
'wsgi.url_scheme': 'http',
|
|
'wsgi.input': StringIO(),
|
|
'wsgi.errors': StringIO(),
|
|
'wsgi.multithread': False,
|
|
'wsgi.multiprocess': False,
|
|
'wsgi.run_once': False,
|
|
'HTTP_X_FORWARDED_FOR': 'testhost',
|
|
'HTTP_X_STORAGE_USER': 'tester',
|
|
'HTTP_X_STORAGE_PASS': 'testing'},
|
|
start_response)
|
|
logsegs = log.getvalue().split(' [', 1)
|
|
logsegs[1:] = logsegs[1].split('] ', 1)
|
|
logsegs[1] = '[01/Jan/2001:01:02:03 +0000]'
|
|
logsegs[2:] = logsegs[2].split(' ')
|
|
logsegs[-1] = '0.1234'
|
|
self.assertEquals(' '.join(logsegs), 'auth testhost - - '
|
|
'[01/Jan/2001:01:02:03 +0000] "GET /v1/test/auth?test=True '
|
|
'HTTP/1.0" 204 - "-" "-" - - - - - - - - - "-" "None" "-" '
|
|
'0.1234')
|
|
self.controller.log_headers = True
|
|
log.truncate(0)
|
|
self.controller.handleREST({'REQUEST_METHOD': 'GET',
|
|
'SCRIPT_NAME': '',
|
|
'PATH_INFO': '/v1/test/auth',
|
|
'SERVER_NAME': '127.0.0.1',
|
|
'SERVER_PORT': '8080',
|
|
'SERVER_PROTOCOL': 'HTTP/1.0',
|
|
'CONTENT_LENGTH': '0',
|
|
'wsgi.version': (1, 0),
|
|
'wsgi.url_scheme': 'http',
|
|
'wsgi.input': StringIO(),
|
|
'wsgi.errors': StringIO(),
|
|
'wsgi.multithread': False,
|
|
'wsgi.multiprocess': False,
|
|
'wsgi.run_once': False,
|
|
'HTTP_X_STORAGE_USER': 'tester',
|
|
'HTTP_X_STORAGE_PASS': 'testing'},
|
|
start_response)
|
|
logsegs = log.getvalue().split(' [', 1)
|
|
logsegs[1:] = logsegs[1].split('] ', 1)
|
|
logsegs[1] = '[01/Jan/2001:01:02:03 +0000]'
|
|
logsegs[2:] = logsegs[2].split(' ')
|
|
logsegs[-1] = '0.1234'
|
|
self.assertEquals(' '.join(logsegs), 'auth None - - [01/Jan/2001:'
|
|
'01:02:03 +0000] "GET /v1/test/auth HTTP/1.0" 204 - "-" "-" - '
|
|
'- - - - - - - - "-" "None" "Content-Length: 0\n'
|
|
'X-Storage-User: tester\nX-Storage-Pass: testing" 0.1234')
|
|
finally:
|
|
logger.logger.handlers.remove(log_handler)
|
|
|
|
def test_unhandled_exceptions(self):
|
|
def request_causing_exception(*args, **kwargs):
|
|
pass
|
|
def start_response(*args):
|
|
pass
|
|
orig_Request = auth_server.Request
|
|
log = StringIO()
|
|
log_handler = StreamHandler(log)
|
|
logger = get_logger(self.conf, 'auth')
|
|
logger.logger.addHandler(log_handler)
|
|
try:
|
|
auth_server.Request = request_causing_exception
|
|
self.controller.handleREST({'REQUEST_METHOD': 'GET',
|
|
'SCRIPT_NAME': '',
|
|
'PATH_INFO': '/v1/test/auth',
|
|
'SERVER_NAME': '127.0.0.1',
|
|
'SERVER_PORT': '8080',
|
|
'SERVER_PROTOCOL': 'HTTP/1.0',
|
|
'CONTENT_LENGTH': '0',
|
|
'wsgi.version': (1, 0),
|
|
'wsgi.url_scheme': 'http',
|
|
'wsgi.input': StringIO(),
|
|
'wsgi.errors': StringIO(),
|
|
'wsgi.multithread': False,
|
|
'wsgi.multiprocess': False,
|
|
'wsgi.run_once': False,
|
|
'HTTP_X_STORAGE_USER': 'tester',
|
|
'HTTP_X_STORAGE_PASS': 'testing'},
|
|
start_response)
|
|
self.assert_(log.getvalue().startswith(
|
|
'auth ERROR Unhandled exception in ReST request'),
|
|
log.getvalue())
|
|
log.truncate(0)
|
|
finally:
|
|
auth_server.Request = orig_Request
|
|
logger.logger.handlers.remove(log_handler)
|
|
|
|
def test_upgrading_from_db1(self):
|
|
swift_dir = '/tmp/swift_test_auth_%s' % uuid4().hex
|
|
os.mkdir(swift_dir)
|
|
try:
|
|
# Create db1
|
|
db_file = os.path.join(swift_dir, 'auth.db')
|
|
conn = get_db_connection(db_file, okay_to_create=True)
|
|
conn.execute('''CREATE TABLE IF NOT EXISTS account (
|
|
account TEXT, url TEXT, cfaccount TEXT,
|
|
user TEXT, password TEXT)''')
|
|
conn.execute('''CREATE INDEX IF NOT EXISTS ix_account_account
|
|
ON account (account)''')
|
|
conn.execute('''CREATE TABLE IF NOT EXISTS token (
|
|
cfaccount TEXT, token TEXT, created FLOAT)''')
|
|
conn.execute('''CREATE INDEX IF NOT EXISTS ix_token_cfaccount
|
|
ON token (cfaccount)''')
|
|
conn.execute('''CREATE INDEX IF NOT EXISTS ix_token_created
|
|
ON token (created)''')
|
|
conn.execute('''INSERT INTO account
|
|
(account, url, cfaccount, user, password)
|
|
VALUES ('act', 'url', 'cfa', 'usr', 'pas')''')
|
|
conn.execute('''INSERT INTO token (cfaccount, token, created)
|
|
VALUES ('cfa', 'tok', '1')''')
|
|
conn.commit()
|
|
conn.close()
|
|
# Upgrade to current db
|
|
conf = {'swift_dir': swift_dir, 'super_admin_key': 'testkey'}
|
|
exc = None
|
|
try:
|
|
auth_server.AuthController(conf)
|
|
except Exception, err:
|
|
exc = err
|
|
self.assert_(str(err).strip().startswith('THERE ARE ACCOUNTS IN '
|
|
'YOUR auth.db THAT DO NOT BEGIN WITH YOUR NEW RESELLER'), err)
|
|
# Check new items exist and are correct
|
|
conn = get_db_connection(db_file)
|
|
row = conn.execute('SELECT admin FROM account').fetchone()
|
|
self.assertEquals(row[0], 't')
|
|
row = conn.execute('SELECT user FROM token').fetchone()
|
|
self.assert_(not row)
|
|
finally:
|
|
rmtree(swift_dir)
|
|
|
|
def test_upgrading_from_db2(self):
|
|
swift_dir = '/tmp/swift_test_auth_%s' % uuid4().hex
|
|
os.mkdir(swift_dir)
|
|
try:
|
|
# Create db1
|
|
db_file = os.path.join(swift_dir, 'auth.db')
|
|
conn = get_db_connection(db_file, okay_to_create=True)
|
|
conn.execute('''CREATE TABLE IF NOT EXISTS account (
|
|
account TEXT, url TEXT, cfaccount TEXT,
|
|
user TEXT, password TEXT, admin TEXT)''')
|
|
conn.execute('''CREATE INDEX IF NOT EXISTS ix_account_account
|
|
ON account (account)''')
|
|
conn.execute('''CREATE TABLE IF NOT EXISTS token (
|
|
token TEXT, created FLOAT,
|
|
account TEXT, user TEXT, cfaccount TEXT)''')
|
|
conn.execute('''CREATE INDEX IF NOT EXISTS ix_token_token
|
|
ON token (token)''')
|
|
conn.execute('''CREATE INDEX IF NOT EXISTS ix_token_created
|
|
ON token (created)''')
|
|
conn.execute('''CREATE INDEX IF NOT EXISTS ix_token_account
|
|
ON token (account)''')
|
|
conn.execute('''INSERT INTO account
|
|
(account, url, cfaccount, user, password, admin)
|
|
VALUES ('act', 'url', 'cfa', 'us1', 'pas', '')''')
|
|
conn.execute('''INSERT INTO account
|
|
(account, url, cfaccount, user, password, admin)
|
|
VALUES ('act', 'url', 'cfa', 'us2', 'pas', 't')''')
|
|
conn.execute('''INSERT INTO token
|
|
(token, created, account, user, cfaccount)
|
|
VALUES ('tok', '1', 'act', 'us1', 'cfa')''')
|
|
conn.commit()
|
|
conn.close()
|
|
# Upgrade to current db
|
|
conf = {'swift_dir': swift_dir, 'super_admin_key': 'testkey'}
|
|
exc = None
|
|
try:
|
|
auth_server.AuthController(conf)
|
|
except Exception, err:
|
|
exc = err
|
|
self.assert_(str(err).strip().startswith('THERE ARE ACCOUNTS IN '
|
|
'YOUR auth.db THAT DO NOT BEGIN WITH YOUR NEW RESELLER'), err)
|
|
# Check new items exist and are correct
|
|
conn = get_db_connection(db_file)
|
|
row = conn.execute('''SELECT admin, reseller_admin
|
|
FROM account WHERE user = 'us1' ''').fetchone()
|
|
self.assert_(not row[0], row[0])
|
|
self.assert_(not row[1], row[1])
|
|
row = conn.execute('''SELECT admin, reseller_admin
|
|
FROM account WHERE user = 'us2' ''').fetchone()
|
|
self.assertEquals(row[0], 't')
|
|
self.assert_(not row[1], row[1])
|
|
row = conn.execute('SELECT user FROM token').fetchone()
|
|
self.assert_(row)
|
|
finally:
|
|
rmtree(swift_dir)
|
|
|
|
def test_create_user_twice(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
self.controller.create_user('test', 'tester', 'testing')
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
self.assertEquals(
|
|
self.controller.create_user('test', 'tester', 'testing'),
|
|
'already exists')
|
|
req = Request.blank('/account/test/tester',
|
|
headers={'X-Auth-User-Key': 'testing'})
|
|
resp = self.controller.handle_add_user(req)
|
|
self.assertEquals(resp.status_int, 409)
|
|
|
|
|
|
def test_create_2users_1account(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
url = self.controller.create_user('test', 'tester', 'testing')
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
url2 = self.controller.create_user('test', 'tester2', 'testing2')
|
|
self.assertEquals(url, url2)
|
|
|
|
def test_no_super_admin_key(self):
|
|
conf = {'swift_dir': self.testdir, 'log_name': 'auth'}
|
|
self.assertRaises(ValueError, auth_server.AuthController, conf)
|
|
conf['super_admin_key'] = 'testkey'
|
|
controller = auth_server.AuthController(conf)
|
|
self.assertEquals(controller.super_admin_key, conf['super_admin_key'])
|
|
|
|
def test_add_storage_account(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
stgact = self.controller.add_storage_account()
|
|
self.assert_(stgact.startswith(self.controller.reseller_prefix),
|
|
stgact)
|
|
# Make sure token given is the expected single use token
|
|
token = auth_server.http_connect.last_args[-1]['X-Auth-Token']
|
|
self.assert_(self.controller.validate_token(token))
|
|
self.assert_(not self.controller.validate_token(token))
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
stgact = self.controller.add_storage_account('bob')
|
|
self.assertEquals(stgact, 'bob')
|
|
# Make sure token given is the expected single use token
|
|
token = auth_server.http_connect.last_args[-1]['X-Auth-Token']
|
|
self.assert_(self.controller.validate_token(token))
|
|
self.assert_(not self.controller.validate_token(token))
|
|
|
|
def test_regular_user(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
self.controller.create_user('act', 'usr', 'pas').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1.0',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'pas'}))
|
|
_, _, _, stgact = \
|
|
self.controller.validate_token(res.headers['x-auth-token'])
|
|
self.assertEquals(stgact, '')
|
|
|
|
def test_account_admin(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
stgact = self.controller.create_user(
|
|
'act', 'usr', 'pas', admin=True).split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1.0',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'pas'}))
|
|
_, _, _, vstgact = \
|
|
self.controller.validate_token(res.headers['x-auth-token'])
|
|
self.assertEquals(stgact, vstgact)
|
|
|
|
def test_reseller_admin(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
self.controller.create_user(
|
|
'act', 'usr', 'pas', reseller_admin=True).split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1.0',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'pas'}))
|
|
_, _, _, stgact = \
|
|
self.controller.validate_token(res.headers['x-auth-token'])
|
|
self.assertEquals(stgact, '.reseller_admin')
|
|
|
|
def test_is_account_admin(self):
|
|
req = Request.blank('/', headers={'X-Auth-Admin-User': '.super_admin',
|
|
'X-Auth-Admin-Key': 'testkey'})
|
|
self.assert_(self.controller.is_account_admin(req, 'any'))
|
|
req = Request.blank('/', headers={'X-Auth-Admin-User': '.super_admin',
|
|
'X-Auth-Admin-Key': 'testkey2'})
|
|
self.assert_(not self.controller.is_account_admin(req, 'any'))
|
|
req = Request.blank('/', headers={'X-Auth-Admin-User': '.super_admi',
|
|
'X-Auth-Admin-Key': 'testkey'})
|
|
self.assert_(not self.controller.is_account_admin(req, 'any'))
|
|
|
|
auth_server.http_connect = fake_http_connect(201, 201)
|
|
self.controller.create_user(
|
|
'act1', 'resadmin', 'pas', reseller_admin=True).split('/')[-1]
|
|
self.controller.create_user('act1', 'usr', 'pas').split('/')[-1]
|
|
self.controller.create_user(
|
|
'act2', 'actadmin', 'pas', admin=True).split('/')[-1]
|
|
|
|
req = Request.blank('/', headers={'X-Auth-Admin-User': 'act1:resadmin',
|
|
'X-Auth-Admin-Key': 'pas'})
|
|
self.assert_(self.controller.is_account_admin(req, 'any'))
|
|
self.assert_(self.controller.is_account_admin(req, 'act1'))
|
|
self.assert_(self.controller.is_account_admin(req, 'act2'))
|
|
|
|
req = Request.blank('/', headers={'X-Auth-Admin-User': 'act1:usr',
|
|
'X-Auth-Admin-Key': 'pas'})
|
|
self.assert_(not self.controller.is_account_admin(req, 'any'))
|
|
self.assert_(not self.controller.is_account_admin(req, 'act1'))
|
|
self.assert_(not self.controller.is_account_admin(req, 'act2'))
|
|
|
|
req = Request.blank('/', headers={'X-Auth-Admin-User': 'act2:actadmin',
|
|
'X-Auth-Admin-Key': 'pas'})
|
|
self.assert_(not self.controller.is_account_admin(req, 'any'))
|
|
self.assert_(not self.controller.is_account_admin(req, 'act1'))
|
|
self.assert_(self.controller.is_account_admin(req, 'act2'))
|
|
|
|
def test_handle_add_user_create_reseller_admin(self):
|
|
auth_server.http_connect = fake_http_connect(201)
|
|
self.controller.create_user('act', 'usr', 'pas')
|
|
self.controller.create_user('act', 'actadmin', 'pas', admin=True)
|
|
self.controller.create_user('act', 'resadmin', 'pas',
|
|
reseller_admin=True)
|
|
|
|
req = Request.blank('/account/act/resadmin2',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Reseller-Admin': 'true'})
|
|
resp = self.controller.handle_add_user(req)
|
|
self.assert_(resp.status_int // 100 == 4, resp.status_int)
|
|
|
|
req = Request.blank('/account/act/resadmin2',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Reseller-Admin': 'true',
|
|
'X-Auth-Admin-User': 'act:usr',
|
|
'X-Auth-Admin-Key': 'pas'})
|
|
resp = self.controller.handle_add_user(req)
|
|
self.assert_(resp.status_int // 100 == 4, resp.status_int)
|
|
|
|
req = Request.blank('/account/act/resadmin2',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Reseller-Admin': 'true',
|
|
'X-Auth-Admin-User': 'act:actadmin',
|
|
'X-Auth-Admin-Key': 'pas'})
|
|
resp = self.controller.handle_add_user(req)
|
|
self.assert_(resp.status_int // 100 == 4, resp.status_int)
|
|
|
|
req = Request.blank('/account/act/resadmin2',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Reseller-Admin': 'true',
|
|
'X-Auth-Admin-User': 'act:resadmin',
|
|
'X-Auth-Admin-Key': 'pas'})
|
|
resp = self.controller.handle_add_user(req)
|
|
self.assert_(resp.status_int // 100 == 4, resp.status_int)
|
|
|
|
req = Request.blank('/account/act/resadmin2',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Reseller-Admin': 'true',
|
|
'X-Auth-Admin-User': '.super_admin',
|
|
'X-Auth-Admin-Key': 'testkey'})
|
|
resp = self.controller.handle_add_user(req)
|
|
self.assert_(resp.status_int // 100 == 2, resp.status_int)
|
|
|
|
def test_handle_add_user_create_account_admin(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201)
|
|
self.controller.create_user('act', 'usr', 'pas')
|
|
self.controller.create_user('act', 'actadmin', 'pas', admin=True)
|
|
self.controller.create_user('act2', 'actadmin', 'pas', admin=True)
|
|
self.controller.create_user('act2', 'resadmin', 'pas',
|
|
reseller_admin=True)
|
|
|
|
req = Request.blank('/account/act/actadmin2',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Admin': 'true'})
|
|
resp = self.controller.handle_add_user(req)
|
|
self.assert_(resp.status_int // 100 == 4, resp.status_int)
|
|
|
|
req = Request.blank('/account/act/actadmin2',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Admin': 'true',
|
|
'X-Auth-Admin-User': 'act:usr',
|
|
'X-Auth-Admin-Key': 'pas'})
|
|
resp = self.controller.handle_add_user(req)
|
|
self.assert_(resp.status_int // 100 == 4, resp.status_int)
|
|
|
|
req = Request.blank('/account/act/actadmin2',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Admin': 'true',
|
|
'X-Auth-Admin-User': 'act2:actadmin',
|
|
'X-Auth-Admin-Key': 'pas'})
|
|
resp = self.controller.handle_add_user(req)
|
|
self.assert_(resp.status_int // 100 == 4, resp.status_int)
|
|
|
|
req = Request.blank('/account/act/actadmin2',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Admin': 'true',
|
|
'X-Auth-Admin-User': 'act:actadmin',
|
|
'X-Auth-Admin-Key': 'pas'})
|
|
resp = self.controller.handle_add_user(req)
|
|
self.assert_(resp.status_int // 100 == 2, resp.status_int)
|
|
|
|
req = Request.blank('/account/act/actadmin3',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Admin': 'true',
|
|
'X-Auth-Admin-User': 'act2:resadmin',
|
|
'X-Auth-Admin-Key': 'pas'})
|
|
resp = self.controller.handle_add_user(req)
|
|
self.assert_(resp.status_int // 100 == 2, resp.status_int)
|
|
|
|
req = Request.blank('/account/act/actadmin4',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Admin': 'true',
|
|
'X-Auth-Admin-User': '.super_admin',
|
|
'X-Auth-Admin-Key': 'testkey'})
|
|
resp = self.controller.handle_add_user(req)
|
|
self.assert_(resp.status_int // 100 == 2, resp.status_int)
|
|
|
|
def test_handle_add_user_create_normal_user(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201)
|
|
self.controller.create_user('act', 'usr', 'pas')
|
|
self.controller.create_user('act', 'actadmin', 'pas', admin=True)
|
|
self.controller.create_user('act2', 'actadmin', 'pas', admin=True)
|
|
self.controller.create_user('act2', 'resadmin', 'pas',
|
|
reseller_admin=True)
|
|
|
|
req = Request.blank('/account/act/usr2',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Admin': 'true'})
|
|
resp = self.controller.handle_add_user(req)
|
|
self.assert_(resp.status_int // 100 == 4, resp.status_int)
|
|
|
|
req = Request.blank('/account/act/usr2',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Admin': 'true',
|
|
'X-Auth-Admin-User': 'act:usr',
|
|
'X-Auth-Admin-Key': 'pas'})
|
|
resp = self.controller.handle_add_user(req)
|
|
self.assert_(resp.status_int // 100 == 4, resp.status_int)
|
|
|
|
req = Request.blank('/account/act/usr2',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Admin': 'true',
|
|
'X-Auth-Admin-User': 'act2:actadmin',
|
|
'X-Auth-Admin-Key': 'pas'})
|
|
resp = self.controller.handle_add_user(req)
|
|
self.assert_(resp.status_int // 100 == 4, resp.status_int)
|
|
|
|
req = Request.blank('/account/act/usr2',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Admin': 'true',
|
|
'X-Auth-Admin-User': 'act:actadmin',
|
|
'X-Auth-Admin-Key': 'pas'})
|
|
resp = self.controller.handle_add_user(req)
|
|
self.assert_(resp.status_int // 100 == 2, resp.status_int)
|
|
|
|
req = Request.blank('/account/act/usr3',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Admin': 'true',
|
|
'X-Auth-Admin-User': 'act2:resadmin',
|
|
'X-Auth-Admin-Key': 'pas'})
|
|
resp = self.controller.handle_add_user(req)
|
|
self.assert_(resp.status_int // 100 == 2, resp.status_int)
|
|
|
|
req = Request.blank('/account/act/usr4',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Admin': 'true',
|
|
'X-Auth-Admin-User': '.super_admin',
|
|
'X-Auth-Admin-Key': 'testkey'})
|
|
resp = self.controller.handle_add_user(req)
|
|
self.assert_(resp.status_int // 100 == 2, resp.status_int)
|
|
|
|
def test_handle_account_recreate_permissions(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201)
|
|
self.controller.create_user('act', 'usr', 'pas')
|
|
self.controller.create_user('act', 'actadmin', 'pas', admin=True)
|
|
self.controller.create_user('act', 'resadmin', 'pas',
|
|
reseller_admin=True)
|
|
|
|
req = Request.blank('/recreate_accounts',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Admin': 'true'})
|
|
resp = self.controller.handle_account_recreate(req)
|
|
self.assert_(resp.status_int // 100 == 4, resp.status_int)
|
|
|
|
req = Request.blank('/recreate_accounts',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Admin': 'true',
|
|
'X-Auth-Admin-User': 'act:usr',
|
|
'X-Auth-Admin-Key': 'pas'})
|
|
resp = self.controller.handle_account_recreate(req)
|
|
self.assert_(resp.status_int // 100 == 4, resp.status_int)
|
|
|
|
req = Request.blank('/recreate_accounts',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Admin': 'true',
|
|
'X-Auth-Admin-User': 'act:actadmin',
|
|
'X-Auth-Admin-Key': 'pas'})
|
|
resp = self.controller.handle_account_recreate(req)
|
|
self.assert_(resp.status_int // 100 == 4, resp.status_int)
|
|
|
|
req = Request.blank('/recreate_accounts',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Admin': 'true',
|
|
'X-Auth-Admin-User': 'act:resadmin',
|
|
'X-Auth-Admin-Key': 'pas'})
|
|
resp = self.controller.handle_account_recreate(req)
|
|
self.assert_(resp.status_int // 100 == 4, resp.status_int)
|
|
|
|
req = Request.blank('/recreate_accounts',
|
|
headers={'X-Auth-User-Key': 'pas',
|
|
'X-Auth-User-Admin': 'true',
|
|
'X-Auth-Admin-User': '.super_admin',
|
|
'X-Auth-Admin-Key': 'testkey'})
|
|
resp = self.controller.handle_account_recreate(req)
|
|
self.assert_(resp.status_int // 100 == 2, resp.status_int)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|