Referrers now support user:pass part of URLs; rest of tests for what has changed
This commit is contained in:
parent
d0367fdf19
commit
85b8d97086
@ -139,7 +139,10 @@ def referrer_allowed(referrer, referrer_acl):
|
||||
else:
|
||||
parts = referrer.split('//', 1)
|
||||
if len(parts) == 2:
|
||||
rhost = parts[1].split('/', 1)[0].split(':', 1)[0].lower()
|
||||
rhost = parts[1].split('/', 1)[0]
|
||||
if '@' in rhost:
|
||||
rhost = rhost.rsplit('@', 1)[1]
|
||||
rhost = rhost.split(':', 1)[0].lower()
|
||||
else:
|
||||
rhost = 'unknown'
|
||||
for mhost in referrer_acl:
|
||||
|
@ -21,10 +21,11 @@ from StringIO import StringIO
|
||||
from uuid import uuid4
|
||||
from logging import StreamHandler
|
||||
|
||||
import sqlite3
|
||||
from webob import Request
|
||||
|
||||
from swift.auth import server as auth_server
|
||||
from swift.common.db import DatabaseConnectionError
|
||||
from swift.common.db import DatabaseConnectionError, get_db_connection
|
||||
from swift.common.utils import get_logger
|
||||
|
||||
|
||||
@ -576,6 +577,58 @@ class TestAuthServer(unittest.TestCase):
|
||||
auth_server.Request = orig_Request
|
||||
logger.logger.handlers.remove(log_handler)
|
||||
|
||||
def test_upgrading_from_db1(self):
|
||||
swift_dir = '/tmp/swift_test_auth_%s' % uuid4().hex
|
||||
os.mkdir(swift_dir)
|
||||
try:
|
||||
# Create db1
|
||||
db_file = os.path.join(swift_dir, 'auth.db')
|
||||
conn = get_db_connection(db_file, okay_to_create=True)
|
||||
conn.execute('''CREATE TABLE IF NOT EXISTS account (
|
||||
account TEXT, url TEXT, cfaccount TEXT,
|
||||
user TEXT, password TEXT)''')
|
||||
conn.execute('''CREATE INDEX IF NOT EXISTS ix_account_account
|
||||
ON account (account)''')
|
||||
conn.execute('''CREATE TABLE IF NOT EXISTS token (
|
||||
cfaccount TEXT, token TEXT, created FLOAT)''')
|
||||
conn.execute('''CREATE INDEX IF NOT EXISTS ix_token_cfaccount
|
||||
ON token (cfaccount)''')
|
||||
conn.execute('''CREATE INDEX IF NOT EXISTS ix_token_created
|
||||
ON token (created)''')
|
||||
conn.execute('''INSERT INTO account
|
||||
(account, url, cfaccount, user, password)
|
||||
VALUES ('act', 'url', 'cfa', 'usr', 'pas')''')
|
||||
conn.execute('''INSERT INTO token (cfaccount, token, created)
|
||||
VALUES ('cfa', 'tok', '1')''')
|
||||
conn.commit()
|
||||
conn.close()
|
||||
# Upgrade to current db
|
||||
conf = {'swift_dir': swift_dir}
|
||||
controller = auth_server.AuthController(conf, FakeRing())
|
||||
# Check new items exist and are correct
|
||||
conn = get_db_connection(db_file)
|
||||
row = conn.execute('SELECT admin FROM account').fetchone()
|
||||
self.assertEquals(row[0], 't')
|
||||
row = conn.execute('SELECT user FROM token').fetchone()
|
||||
self.assert_(not row)
|
||||
finally:
|
||||
rmtree(swift_dir)
|
||||
|
||||
def test_create_user_twice(self):
|
||||
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
||||
self.controller.create_user('test', 'tester', 'testing')
|
||||
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
||||
self.assertEquals(
|
||||
self.controller.create_user('test', 'tester', 'testing'),
|
||||
'already exists')
|
||||
|
||||
def test_create_2users_1account(self):
|
||||
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
||||
url = self.controller.create_user('test', 'tester', 'testing')
|
||||
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
||||
url2 = self.controller.create_user('test', 'tester2', 'testing2')
|
||||
self.assertEquals(url, url2)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -13,95 +13,12 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from __future__ import with_statement
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
from contextlib import contextmanager
|
||||
|
||||
import eventlet
|
||||
from webob import Request
|
||||
|
||||
from swift.common.middleware import acl
|
||||
|
||||
# mocks
|
||||
logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
|
||||
|
||||
|
||||
class FakeMemcache(object):
|
||||
def __init__(self):
|
||||
self.store = {}
|
||||
|
||||
def get(self, key):
|
||||
return self.store.get(key)
|
||||
|
||||
def set(self, key, value, timeout=0):
|
||||
self.store[key] = value
|
||||
return True
|
||||
|
||||
def incr(self, key, timeout=0):
|
||||
self.store[key] = self.store.setdefault(key, 0) + 1
|
||||
return self.store[key]
|
||||
|
||||
@contextmanager
|
||||
def soft_lock(self, key, timeout=0, retries=5):
|
||||
yield True
|
||||
|
||||
def delete(self, key):
|
||||
try:
|
||||
del self.store[key]
|
||||
except:
|
||||
pass
|
||||
return True
|
||||
|
||||
|
||||
def mock_http_connect(response, headers=None, with_exc=False):
|
||||
class FakeConn(object):
|
||||
def __init__(self, status, headers, with_exc):
|
||||
self.status = status
|
||||
self.reason = 'Fake'
|
||||
self.host = '1.2.3.4'
|
||||
self.port = '1234'
|
||||
self.with_exc = with_exc
|
||||
self.headers = headers
|
||||
if self.headers is None:
|
||||
self.headers = {}
|
||||
def getresponse(self):
|
||||
if self.with_exc:
|
||||
raise Exception('test')
|
||||
return self
|
||||
def getheader(self, header):
|
||||
return self.headers[header]
|
||||
def read(self, amt=None):
|
||||
return ''
|
||||
def close(self):
|
||||
return
|
||||
return lambda *args, **kwargs: FakeConn(response, headers, with_exc)
|
||||
|
||||
|
||||
class Logger(object):
|
||||
def __init__(self):
|
||||
self.error_value = None
|
||||
self.exception_value = None
|
||||
def error(self, msg, *args, **kwargs):
|
||||
self.error_value = (msg, args, kwargs)
|
||||
def exception(self, msg, *args, **kwargs):
|
||||
_, exc, _ = sys.exc_info()
|
||||
self.exception_value = (msg,
|
||||
'%s %s' % (exc.__class__.__name__, str(exc)), args, kwargs)
|
||||
# tests
|
||||
|
||||
class FakeApp(object):
|
||||
def __call__(self, env, start_response):
|
||||
return "OK"
|
||||
|
||||
def start_response(*args):
|
||||
pass
|
||||
|
||||
class TestAuth(unittest.TestCase):
|
||||
# I brought these over from another refactor I've been trying, but they
|
||||
# need work.
|
||||
class TestACL(unittest.TestCase):
|
||||
|
||||
def test_clean_acl(self):
|
||||
value = acl.clean_acl('header', '.ref:any')
|
||||
@ -118,6 +35,9 @@ class TestAuth(unittest.TestCase):
|
||||
self.assertEquals(value, '.ref:any,.ref:-.ending.with')
|
||||
value = acl.clean_acl('header', '.ref:one,.ref:-two')
|
||||
self.assertEquals(value, '.ref:one,.ref:-two')
|
||||
value = acl.clean_acl('header',
|
||||
'.ref:one,.ref:-two,account,account:user')
|
||||
self.assertEquals(value, '.ref:one,.ref:-two,account,account:user')
|
||||
value = acl.clean_acl('header',
|
||||
' .ref : one , ,, .ref:two , .ref : - three ')
|
||||
self.assertEquals(value, '.ref:one,.ref:two,.ref:-three')
|
||||
@ -129,6 +49,7 @@ class TestAuth(unittest.TestCase):
|
||||
self.assertRaises(ValueError, acl.clean_acl, 'header', ' .ref : - ')
|
||||
self.assertRaises(ValueError, acl.clean_acl, 'header',
|
||||
'user , .ref : - ')
|
||||
self.assertRaises(ValueError, acl.clean_acl, 'write-header', '.ref:r')
|
||||
|
||||
def test_parse_acl(self):
|
||||
self.assertEquals(acl.parse_acl(None), ([], []))
|
||||
@ -145,6 +66,46 @@ class TestAuth(unittest.TestCase):
|
||||
(['ref3', 'ref5', '-ref6'],
|
||||
['acc1', 'acc2:usr2', 'acc3', 'acc4:usr4']))
|
||||
|
||||
def test_referrer_allowed(self):
|
||||
self.assert_(not acl.referrer_allowed('host', None))
|
||||
self.assert_(not acl.referrer_allowed('host', []))
|
||||
self.assert_(acl.referrer_allowed(None, ['any']))
|
||||
self.assert_(acl.referrer_allowed('', ['any']))
|
||||
self.assert_(not acl.referrer_allowed(None, ['specific.host']))
|
||||
self.assert_(not acl.referrer_allowed('', ['specific.host']))
|
||||
self.assert_(acl.referrer_allowed('http://www.example.com/index.html',
|
||||
['.example.com']))
|
||||
self.assert_(acl.referrer_allowed(
|
||||
'http://user@www.example.com/index.html', ['.example.com']))
|
||||
self.assert_(acl.referrer_allowed(
|
||||
'http://user:pass@www.example.com/index.html', ['.example.com']))
|
||||
self.assert_(acl.referrer_allowed(
|
||||
'http://www.example.com:8080/index.html', ['.example.com']))
|
||||
self.assert_(acl.referrer_allowed(
|
||||
'http://user@www.example.com:8080/index.html', ['.example.com']))
|
||||
self.assert_(acl.referrer_allowed(
|
||||
'http://user:pass@www.example.com:8080/index.html',
|
||||
['.example.com']))
|
||||
self.assert_(acl.referrer_allowed(
|
||||
'http://user:pass@www.example.com:8080', ['.example.com']))
|
||||
self.assert_(acl.referrer_allowed('http://www.example.com',
|
||||
['.example.com']))
|
||||
self.assert_(not acl.referrer_allowed('http://thief.example.com',
|
||||
['.example.com', '-thief.example.com']))
|
||||
self.assert_(not acl.referrer_allowed('http://thief.example.com',
|
||||
['any', '-thief.example.com']))
|
||||
self.assert_(acl.referrer_allowed('http://www.example.com',
|
||||
['.other.com', 'www.example.com']))
|
||||
self.assert_(acl.referrer_allowed('http://www.example.com',
|
||||
['-.example.com', 'www.example.com']))
|
||||
# This is considered a relative uri to the request uri, a mode not
|
||||
# currently supported.
|
||||
self.assert_(not acl.referrer_allowed('www.example.com',
|
||||
['.example.com']))
|
||||
self.assert_(not acl.referrer_allowed('../index.html',
|
||||
['.example.com']))
|
||||
self.assert_(acl.referrer_allowed('www.example.com', ['any']))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -94,14 +94,191 @@ class Logger(object):
|
||||
|
||||
class FakeApp(object):
|
||||
def __call__(self, env, start_response):
|
||||
return "OK"
|
||||
return ['204 No Content']
|
||||
|
||||
def start_response(*args):
|
||||
pass
|
||||
|
||||
class TestAuth(unittest.TestCase):
|
||||
# TODO: With the auth refactor, these tests have to be refactored as well.
|
||||
pass
|
||||
|
||||
def setUp(self):
|
||||
self.test_auth = auth.filter_factory({})(FakeApp())
|
||||
|
||||
def test_auth_fail(self):
|
||||
old_http_connect = auth.http_connect
|
||||
try:
|
||||
auth.http_connect = mock_http_connect(404)
|
||||
result = ''.join(self.test_auth({'REQUEST_METHOD': 'GET',
|
||||
'HTTP_X_AUTH_TOKEN': 't', 'swift.cache': FakeMemcache()},
|
||||
lambda x, y: None))
|
||||
self.assert_(result.startswith('401'), result)
|
||||
finally:
|
||||
auth.http_connect = old_http_connect
|
||||
|
||||
def test_auth_success(self):
|
||||
old_http_connect = auth.http_connect
|
||||
try:
|
||||
auth.http_connect = mock_http_connect(204,
|
||||
{'x-auth-ttl': '1234', 'x-auth-groups': 'act:usr,act,cfa'})
|
||||
result = ''.join(self.test_auth({'REQUEST_METHOD': 'GET',
|
||||
'HTTP_X_AUTH_TOKEN': 't', 'swift.cache': FakeMemcache()},
|
||||
lambda x, y: None))
|
||||
self.assert_(result.startswith('204'), result)
|
||||
finally:
|
||||
auth.http_connect = old_http_connect
|
||||
|
||||
def test_auth_memcache(self):
|
||||
old_http_connect = auth.http_connect
|
||||
try:
|
||||
fake_memcache = FakeMemcache()
|
||||
auth.http_connect = mock_http_connect(204,
|
||||
{'x-auth-ttl': '1234', 'x-auth-groups': 'act:usr,act,cfa'})
|
||||
result = ''.join(self.test_auth({'REQUEST_METHOD': 'GET',
|
||||
'HTTP_X_AUTH_TOKEN': 't', 'swift.cache': fake_memcache},
|
||||
lambda x, y: None))
|
||||
self.assert_(result.startswith('204'), result)
|
||||
auth.http_connect = mock_http_connect(404)
|
||||
# Should still be in memcache
|
||||
result = ''.join(self.test_auth({'REQUEST_METHOD': 'GET',
|
||||
'HTTP_X_AUTH_TOKEN': 't', 'swift.cache': fake_memcache},
|
||||
lambda x, y: None))
|
||||
self.assert_(result.startswith('204'), result)
|
||||
finally:
|
||||
auth.http_connect = old_http_connect
|
||||
|
||||
def test_auth_just_expired(self):
|
||||
old_http_connect = auth.http_connect
|
||||
try:
|
||||
fake_memcache = FakeMemcache()
|
||||
auth.http_connect = mock_http_connect(204,
|
||||
{'x-auth-ttl': '0', 'x-auth-groups': 'act:usr,act,cfa'})
|
||||
result = ''.join(self.test_auth({'REQUEST_METHOD': 'GET',
|
||||
'HTTP_X_AUTH_TOKEN': 't', 'swift.cache': fake_memcache},
|
||||
lambda x, y: None))
|
||||
self.assert_(result.startswith('204'), result)
|
||||
auth.http_connect = mock_http_connect(404)
|
||||
# Should still be in memcache, but expired
|
||||
result = ''.join(self.test_auth({'REQUEST_METHOD': 'GET',
|
||||
'HTTP_X_AUTH_TOKEN': 't', 'swift.cache': fake_memcache},
|
||||
lambda x, y: None))
|
||||
self.assert_(result.startswith('401'), result)
|
||||
finally:
|
||||
auth.http_connect = old_http_connect
|
||||
|
||||
def test_middleware_success(self):
|
||||
old_http_connect = auth.http_connect
|
||||
try:
|
||||
auth.http_connect = mock_http_connect(204,
|
||||
{'x-auth-ttl': '1234', 'x-auth-groups': 'act:usr,act,cfa'})
|
||||
req = Request.blank('/v/a/c/o', headers={'x-auth-token': 't'})
|
||||
req.environ['swift.cache'] = FakeMemcache()
|
||||
result = ''.join(self.test_auth(req.environ, start_response))
|
||||
self.assert_(result.startswith('204'), result)
|
||||
self.assertEquals(req.remote_user, 'act:usr,act,cfa')
|
||||
finally:
|
||||
auth.http_connect = old_http_connect
|
||||
|
||||
def test_middleware_no_header(self):
|
||||
old_http_connect = auth.http_connect
|
||||
try:
|
||||
auth.http_connect = mock_http_connect(204,
|
||||
{'x-auth-ttl': '1234', 'x-auth-groups': 'act:usr,act,cfa'})
|
||||
req = Request.blank('/v/a/c/o')
|
||||
req.environ['swift.cache'] = FakeMemcache()
|
||||
result = ''.join(self.test_auth(req.environ, start_response))
|
||||
self.assert_(result.startswith('204'), result)
|
||||
self.assert_(not req.remote_user, req.remote_user)
|
||||
finally:
|
||||
auth.http_connect = old_http_connect
|
||||
|
||||
def test_middleware_storage_token(self):
|
||||
old_http_connect = auth.http_connect
|
||||
try:
|
||||
auth.http_connect = mock_http_connect(204,
|
||||
{'x-auth-ttl': '1234', 'x-auth-groups': 'act:usr,act,cfa'})
|
||||
req = Request.blank('/v/a/c/o', headers={'x-storage-token': 't'})
|
||||
req.environ['swift.cache'] = FakeMemcache()
|
||||
result = ''.join(self.test_auth(req.environ, start_response))
|
||||
self.assert_(result.startswith('204'), result)
|
||||
self.assertEquals(req.remote_user, 'act:usr,act,cfa')
|
||||
finally:
|
||||
auth.http_connect = old_http_connect
|
||||
|
||||
def test_authorize_bad_path(self):
|
||||
req = Request.blank('/badpath')
|
||||
resp = str(self.test_auth.authorize(req))
|
||||
self.assert_(resp.startswith('401'), resp)
|
||||
req = Request.blank('/badpath')
|
||||
req.remote_user = 'act:usr,act,cfa'
|
||||
resp = str(self.test_auth.authorize(req))
|
||||
self.assert_(resp.startswith('403'), resp)
|
||||
|
||||
def test_authorize_account_access(self):
|
||||
req = Request.blank('/v1/cfa')
|
||||
req.remote_user = 'act:usr,act,cfa'
|
||||
self.assertEquals(self.test_auth.authorize(req), None)
|
||||
req = Request.blank('/v1/cfa')
|
||||
req.remote_user = 'act:usr,act'
|
||||
resp = str(self.test_auth.authorize(req))
|
||||
self.assert_(resp.startswith('403'), resp)
|
||||
|
||||
def test_authorize_acl_group_access(self):
|
||||
req = Request.blank('/v1/cfa')
|
||||
req.remote_user = 'act:usr,act'
|
||||
resp = str(self.test_auth.authorize(req))
|
||||
self.assert_(resp.startswith('403'), resp)
|
||||
req = Request.blank('/v1/cfa')
|
||||
req.remote_user = 'act:usr,act'
|
||||
req.acl = 'act'
|
||||
self.assertEquals(self.test_auth.authorize(req), None)
|
||||
req = Request.blank('/v1/cfa')
|
||||
req.remote_user = 'act:usr,act'
|
||||
req.acl = 'act:usr'
|
||||
self.assertEquals(self.test_auth.authorize(req), None)
|
||||
req = Request.blank('/v1/cfa')
|
||||
req.remote_user = 'act:usr,act'
|
||||
req.acl = 'act2'
|
||||
resp = str(self.test_auth.authorize(req))
|
||||
self.assert_(resp.startswith('403'), resp)
|
||||
req = Request.blank('/v1/cfa')
|
||||
req.remote_user = 'act:usr,act'
|
||||
req.acl = 'act:usr2'
|
||||
resp = str(self.test_auth.authorize(req))
|
||||
self.assert_(resp.startswith('403'), resp)
|
||||
|
||||
def test_authorize_acl_referrer_access(self):
|
||||
req = Request.blank('/v1/cfa')
|
||||
req.remote_user = 'act:usr,act'
|
||||
resp = str(self.test_auth.authorize(req))
|
||||
self.assert_(resp.startswith('403'), resp)
|
||||
req = Request.blank('/v1/cfa')
|
||||
req.remote_user = 'act:usr,act'
|
||||
req.acl = '.ref:any'
|
||||
self.assertEquals(self.test_auth.authorize(req), None)
|
||||
req = Request.blank('/v1/cfa')
|
||||
req.remote_user = 'act:usr,act'
|
||||
req.acl = '.ref:.example.com'
|
||||
resp = str(self.test_auth.authorize(req))
|
||||
self.assert_(resp.startswith('403'), resp)
|
||||
req = Request.blank('/v1/cfa')
|
||||
req.remote_user = 'act:usr,act'
|
||||
req.referrer = 'http://www.example.com/index.html'
|
||||
req.acl = '.ref:.example.com'
|
||||
self.assertEquals(self.test_auth.authorize(req), None)
|
||||
req = Request.blank('/v1/cfa')
|
||||
resp = str(self.test_auth.authorize(req))
|
||||
self.assert_(resp.startswith('401'), resp)
|
||||
req = Request.blank('/v1/cfa')
|
||||
req.acl = '.ref:any'
|
||||
self.assertEquals(self.test_auth.authorize(req), None)
|
||||
req = Request.blank('/v1/cfa')
|
||||
req.acl = '.ref:.example.com'
|
||||
resp = str(self.test_auth.authorize(req))
|
||||
self.assert_(resp.startswith('401'), resp)
|
||||
req = Request.blank('/v1/cfa')
|
||||
req.referrer = 'http://www.example.com/index.html'
|
||||
req.acl = '.ref:.example.com'
|
||||
self.assertEquals(self.test_auth.authorize(req), None)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
@ -34,6 +34,7 @@ from eventlet import sleep, spawn, TimeoutError, util, wsgi, listen
|
||||
from eventlet.timeout import Timeout
|
||||
import simplejson
|
||||
from webob import Request
|
||||
from webob.exc import HTTPUnauthorized
|
||||
|
||||
from test.unit import connect_tcp, readuntil2crlfs
|
||||
from swift.proxy import server as proxy_server
|
||||
@ -208,6 +209,35 @@ class TestProxyServer(unittest.TestCase):
|
||||
resp = app.handle_request(req)
|
||||
self.assertEquals(resp.status_int, 500)
|
||||
|
||||
def test_calls_authorize_allow(self):
|
||||
called = [False]
|
||||
def authorize(req):
|
||||
called[0] = True
|
||||
with save_globals():
|
||||
proxy_server.http_connect = fake_http_connect(200)
|
||||
app = proxy_server.Application(None, FakeMemcache(),
|
||||
account_ring=FakeRing(), container_ring=FakeRing(),
|
||||
object_ring=FakeRing())
|
||||
req = Request.blank('/v1/a')
|
||||
req.environ['swift.authorize'] = authorize
|
||||
app.update_request(req)
|
||||
resp = app.handle_request(req)
|
||||
self.assert_(called[0])
|
||||
|
||||
def test_calls_authorize_deny(self):
|
||||
called = [False]
|
||||
def authorize(req):
|
||||
called[0] = True
|
||||
return HTTPUnauthorized(request=req)
|
||||
app = proxy_server.Application(None, FakeMemcache(),
|
||||
account_ring=FakeRing(), container_ring=FakeRing(),
|
||||
object_ring=FakeRing())
|
||||
req = Request.blank('/v1/a')
|
||||
req.environ['swift.authorize'] = authorize
|
||||
app.update_request(req)
|
||||
resp = app.handle_request(req)
|
||||
self.assert_(called[0])
|
||||
|
||||
|
||||
class TestObjectController(unittest.TestCase):
|
||||
|
||||
@ -1510,6 +1540,73 @@ class TestObjectController(unittest.TestCase):
|
||||
finally:
|
||||
self.app.object_chunk_size = orig_object_chunk_size
|
||||
|
||||
def test_GET_calls_authorize(self):
|
||||
called = [False]
|
||||
def authorize(req):
|
||||
called[0] = True
|
||||
return HTTPUnauthorized(request=req)
|
||||
with save_globals():
|
||||
proxy_server.http_connect = \
|
||||
fake_http_connect(200, 200, 201, 201, 201)
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
'container', 'object')
|
||||
req = Request.blank('/a/c/o')
|
||||
req.environ['swift.authorize'] = authorize
|
||||
self.app.update_request(req)
|
||||
res = controller.GET(req)
|
||||
self.assert_(called[0])
|
||||
|
||||
def test_HEAD_calls_authorize(self):
|
||||
called = [False]
|
||||
def authorize(req):
|
||||
called[0] = True
|
||||
return HTTPUnauthorized(request=req)
|
||||
with save_globals():
|
||||
proxy_server.http_connect = \
|
||||
fake_http_connect(200, 200, 201, 201, 201)
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
'container', 'object')
|
||||
req = Request.blank('/a/c/o', {'REQUEST_METHOD': 'HEAD'})
|
||||
req.environ['swift.authorize'] = authorize
|
||||
self.app.update_request(req)
|
||||
res = controller.HEAD(req)
|
||||
self.assert_(called[0])
|
||||
|
||||
def test_POST_calls_authorize(self):
|
||||
called = [False]
|
||||
def authorize(req):
|
||||
called[0] = True
|
||||
return HTTPUnauthorized(request=req)
|
||||
with save_globals():
|
||||
proxy_server.http_connect = \
|
||||
fake_http_connect(200, 200, 201, 201, 201)
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
'container', 'object')
|
||||
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'Content-Length': '5'}, body='12345')
|
||||
req.environ['swift.authorize'] = authorize
|
||||
self.app.update_request(req)
|
||||
res = controller.POST(req)
|
||||
self.assert_(called[0])
|
||||
|
||||
def test_PUT_calls_authorize(self):
|
||||
called = [False]
|
||||
def authorize(req):
|
||||
called[0] = True
|
||||
return HTTPUnauthorized(request=req)
|
||||
with save_globals():
|
||||
proxy_server.http_connect = \
|
||||
fake_http_connect(200, 200, 201, 201, 201)
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
'container', 'object')
|
||||
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Content-Length': '5'}, body='12345')
|
||||
req.environ['swift.authorize'] = authorize
|
||||
self.app.update_request(req)
|
||||
res = controller.PUT(req)
|
||||
self.assert_(called[0])
|
||||
|
||||
|
||||
|
||||
class TestContainerController(unittest.TestCase):
|
||||
"Test swift.proxy_server.ContainerController"
|
||||
@ -1855,6 +1952,92 @@ class TestContainerController(unittest.TestCase):
|
||||
resp = getattr(controller, method)(req)
|
||||
self.assertEquals(resp.status_int, 400)
|
||||
|
||||
def test_POST_calls_clean_acl(self):
|
||||
called = [False]
|
||||
def clean_acl(header, value):
|
||||
called[0] = True
|
||||
raise ValueError('fake error')
|
||||
with save_globals():
|
||||
proxy_server.http_connect = fake_http_connect(200, 201, 201, 201)
|
||||
controller = proxy_server.ContainerController(self.app, 'account',
|
||||
'container')
|
||||
req = Request.blank('/a/c', environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Container-Read': '.ref:any'})
|
||||
req.environ['swift.clean_acl'] = clean_acl
|
||||
self.app.update_request(req)
|
||||
res = controller.POST(req)
|
||||
self.assert_(called[0])
|
||||
called[0] = False
|
||||
with save_globals():
|
||||
proxy_server.http_connect = fake_http_connect(200, 201, 201, 201)
|
||||
controller = proxy_server.ContainerController(self.app, 'account',
|
||||
'container')
|
||||
req = Request.blank('/a/c', environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Container-Write': '.ref:any'})
|
||||
req.environ['swift.clean_acl'] = clean_acl
|
||||
self.app.update_request(req)
|
||||
res = controller.POST(req)
|
||||
self.assert_(called[0])
|
||||
|
||||
def test_PUT_calls_clean_acl(self):
|
||||
called = [False]
|
||||
def clean_acl(header, value):
|
||||
called[0] = True
|
||||
raise ValueError('fake error')
|
||||
with save_globals():
|
||||
proxy_server.http_connect = fake_http_connect(200, 201, 201, 201)
|
||||
controller = proxy_server.ContainerController(self.app, 'account',
|
||||
'container')
|
||||
req = Request.blank('/a/c', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Container-Read': '.ref:any'})
|
||||
req.environ['swift.clean_acl'] = clean_acl
|
||||
self.app.update_request(req)
|
||||
res = controller.PUT(req)
|
||||
self.assert_(called[0])
|
||||
called[0] = False
|
||||
with save_globals():
|
||||
proxy_server.http_connect = fake_http_connect(200, 201, 201, 201)
|
||||
controller = proxy_server.ContainerController(self.app, 'account',
|
||||
'container')
|
||||
req = Request.blank('/a/c', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Container-Write': '.ref:any'})
|
||||
req.environ['swift.clean_acl'] = clean_acl
|
||||
self.app.update_request(req)
|
||||
res = controller.PUT(req)
|
||||
self.assert_(called[0])
|
||||
|
||||
def test_GET_calls_authorize(self):
|
||||
called = [False]
|
||||
def authorize(req):
|
||||
called[0] = True
|
||||
return HTTPUnauthorized(request=req)
|
||||
with save_globals():
|
||||
proxy_server.http_connect = \
|
||||
fake_http_connect(200, 201, 201, 201)
|
||||
controller = proxy_server.ContainerController(self.app, 'account',
|
||||
'container')
|
||||
req = Request.blank('/a/c')
|
||||
req.environ['swift.authorize'] = authorize
|
||||
self.app.update_request(req)
|
||||
res = controller.GET(req)
|
||||
self.assert_(called[0])
|
||||
|
||||
def test_HEAD_calls_authorize(self):
|
||||
called = [False]
|
||||
def authorize(req):
|
||||
called[0] = True
|
||||
return HTTPUnauthorized(request=req)
|
||||
with save_globals():
|
||||
proxy_server.http_connect = \
|
||||
fake_http_connect(200, 201, 201, 201)
|
||||
controller = proxy_server.ContainerController(self.app, 'account',
|
||||
'container')
|
||||
req = Request.blank('/a/c', {'REQUEST_METHOD': 'HEAD'})
|
||||
req.environ['swift.authorize'] = authorize
|
||||
self.app.update_request(req)
|
||||
res = controller.HEAD(req)
|
||||
self.assert_(called[0])
|
||||
|
||||
|
||||
class TestAccountController(unittest.TestCase):
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user