py3: port tempauth

Note that the user DB is stored in-memory as native strings, so we do
some crazy-decoding to make comparisons for auth decisions. Seems to
keep the config handling mostly sane, though.

I maybe need to look harder at container ACLs?

Change-Id: Ia58698c9b30d2211eeee8ecb3bbdd1c26fa4034d
This commit is contained in:
Tim Burke 2018-09-17 15:22:55 -07:00
parent 16fe18ae3b
commit c90d34bd02
4 changed files with 40 additions and 26 deletions

View File

@ -182,7 +182,7 @@ import base64
from eventlet import Timeout
import six
from swift.common.swob import Response, Request
from swift.common.swob import Response, Request, wsgi_to_str
from swift.common.swob import HTTPBadRequest, HTTPForbidden, HTTPNotFound, \
HTTPUnauthorized
@ -239,6 +239,9 @@ class TempAuth(object):
account = base64.b64decode(account)
username += '=' * (len(username) % 4)
username = base64.b64decode(username)
if not six.PY2:
account = account.decode('utf8')
username = username.decode('utf8')
values = conf[conf_key].split()
if not values:
raise ValueError('%s has no key set' % conf_key)
@ -438,7 +441,7 @@ class TempAuth(object):
expires, groups = cached_auth_data
if expires < time():
groups = None
else:
elif six.PY2:
groups = groups.encode('utf8')
s3_auth_details = env.get('s3api.auth_details') or\
@ -498,7 +501,7 @@ class TempAuth(object):
or None if there are no errors.
"""
acl_header = 'x-account-access-control'
acl_data = req.headers.get(acl_header)
acl_data = wsgi_to_str(req.headers.get(acl_header))
result = parse_acl(version=2, data=acl_data)
if result is None:
return 'Syntax error in input (%r)' % acl_data
@ -509,19 +512,17 @@ class TempAuth(object):
# on ACLs, TempAuth is not such an auth system. At this point,
# it thinks it is authoritative.
if key not in tempauth_acl_keys:
return "Key %s not recognized" % json.dumps(
key).encode('ascii')
return "Key %s not recognized" % json.dumps(key)
for key in tempauth_acl_keys:
if key not in result:
continue
if not isinstance(result[key], list):
return "Value for key %s must be a list" % json.dumps(
key).encode('ascii')
return "Value for key %s must be a list" % json.dumps(key)
for grantee in result[key]:
if not isinstance(grantee, six.string_types):
return "Elements of %s list must be strings" % json.dumps(
key).encode('ascii')
key)
# Everything looks fine, no errors found
internal_hdr = get_sys_meta_prefix('account') + 'core-access-control'
@ -568,7 +569,7 @@ class TempAuth(object):
% account_user)
return None
if account in user_groups and \
if wsgi_to_str(account) in user_groups and \
(req.method not in ('DELETE', 'PUT') or container):
# The user is admin for the account and is not trying to do an
# account DELETE or PUT
@ -744,7 +745,7 @@ class TempAuth(object):
return HTTPUnauthorized(request=req,
headers={'Www-Authenticate': auth})
account2, user = user.split(':', 1)
if account != account2:
if wsgi_to_str(account) != account2:
self.logger.increment('token_denied')
auth = 'Swift realm="%s"' % account
return HTTPUnauthorized(request=req,
@ -800,7 +801,7 @@ class TempAuth(object):
cached_auth_data = memcache_client.get(memcache_token_key)
if cached_auth_data:
expires, old_groups = cached_auth_data
old_groups = [group.encode('utf8')
old_groups = [group.encode('utf8') if six.PY2 else group
for group in old_groups.split(',')]
new_groups = self._get_user_groups(account, account_user,
account_id)

View File

@ -590,7 +590,8 @@ def _get_info_from_memcache(app, env, account, container=None):
memcache = getattr(app, 'memcache', None) or env.get('swift.cache')
if memcache:
info = memcache.get(cache_key)
if info:
if info and six.PY2:
# Get back to native strings
for key in info:
if isinstance(info[key], six.text_type):
info[key] = info[key].encode("utf-8")
@ -598,6 +599,7 @@ def _get_info_from_memcache(app, env, account, container=None):
for subkey, value in info[key].items():
if isinstance(value, six.text_type):
info[key][subkey] = value.encode("utf-8")
if info:
env.setdefault('swift.infocache', {})[cache_key] = info
return info
return None

View File

@ -17,9 +17,10 @@
import json
import unittest
from contextlib import contextmanager
from base64 import b64encode
from base64 import b64encode as _b64encode
from time import time
import six
from six.moves.urllib.parse import quote, urlparse
from swift.common.middleware import tempauth as auth
from swift.common.middleware.acl import format_acl
@ -29,6 +30,12 @@ from swift.common.utils import split_path
NO_CONTENT_RESP = (('204 No Content', {}, ''),) # mock server response
def b64encode(str_or_bytes):
if not isinstance(str_or_bytes, bytes):
str_or_bytes = str_or_bytes.encode('utf8')
return _b64encode(str_or_bytes).decode('ascii')
class FakeMemcache(object):
def __init__(self):
@ -41,7 +48,7 @@ class FakeMemcache(object):
if isinstance(value, (tuple, list)):
decoded = []
for elem in value:
if type(elem) == str:
if isinstance(elem, bytes):
decoded.append(elem.decode('utf8'))
else:
decoded.append(elem)
@ -972,9 +979,11 @@ class TestAuth(unittest.TestCase):
def test_successful_token_unicode_user(self):
app = FakeApp(iter(NO_CONTENT_RESP * 2))
ath = auth.filter_factory(
{u'user_t\u00e9st_t\u00e9ster'.encode('utf8'):
u'p\u00e1ss .admin'.encode('utf8')})(app)
conf = {u'user_t\u00e9st_t\u00e9ster': u'p\u00e1ss .admin'}
if six.PY2:
conf = {k.encode('utf8'): v.encode('utf8')
for k, v in conf.items()}
ath = auth.filter_factory(conf)(app)
quoted_acct = quote(u'/v1/AUTH_t\u00e9st'.encode('utf8'))
memcache = FakeMemcache()
@ -1009,7 +1018,8 @@ class TestAuth(unittest.TestCase):
# ...but it also works if you send the account raw
req = self._make_request(
u'/v1/AUTH_t\u00e9st', headers={'X-Auth-Token': auth_token})
u'/v1/AUTH_t\u00e9st'.encode('utf8'),
headers={'X-Auth-Token': auth_token})
req.environ['swift.cache'] = memcache
resp = req.get_response(ath)
self.assertEqual(204, resp.status_int)
@ -1395,13 +1405,13 @@ class TestAccountAcls(unittest.TestCase):
resp = req.get_response(test_auth)
self.assertEqual(resp.status_int, 204)
errmsg = 'X-Account-Access-Control invalid: %s'
errmsg = b'X-Account-Access-Control invalid: %s'
# syntactically invalid acls get a 400
update = {'x-account-access-control': bad_acl}
req = self._make_request(target, headers=dict(good_headers, **update))
resp = req.get_response(test_auth)
self.assertEqual(resp.status_int, 400)
self.assertEqual(errmsg % "Syntax error", resp.body[:46])
self.assertEqual(errmsg % b"Syntax error", resp.body[:46])
# syntactically valid acls with bad keys also get a 400
update = {'x-account-access-control': wrong_acl}
@ -1409,7 +1419,7 @@ class TestAccountAcls(unittest.TestCase):
resp = req.get_response(test_auth)
self.assertEqual(resp.status_int, 400)
self.assertTrue(resp.body.startswith(
errmsg % 'Key "other-auth-system" not recognized'), resp.body)
errmsg % b'Key "other-auth-system" not recognized'), resp.body)
# and do something sane with crazy data
update = {'x-account-access-control': u'{"\u1234": []}'.encode('utf8')}
@ -1417,7 +1427,7 @@ class TestAccountAcls(unittest.TestCase):
resp = req.get_response(test_auth)
self.assertEqual(resp.status_int, 400)
self.assertTrue(resp.body.startswith(
errmsg % 'Key "\\u1234" not recognized'), resp.body)
errmsg % b'Key "\\u1234" not recognized'), resp.body)
# acls with good keys but bad values also get a 400
update = {'x-account-access-control': bad_value_acl}
@ -1425,7 +1435,7 @@ class TestAccountAcls(unittest.TestCase):
resp = req.get_response(test_auth)
self.assertEqual(resp.status_int, 400)
self.assertTrue(resp.body.startswith(
errmsg % 'Value for key "admin" must be a list'), resp.body)
errmsg % b'Value for key "admin" must be a list'), resp.body)
# acls with non-string-types in list also get a 400
update = {'x-account-access-control': bad_list_types}
@ -1433,7 +1443,7 @@ class TestAccountAcls(unittest.TestCase):
resp = req.get_response(test_auth)
self.assertEqual(resp.status_int, 400)
self.assertTrue(resp.body.startswith(
errmsg % 'Elements of "read-only" list must be strings'),
errmsg % b'Elements of "read-only" list must be strings'),
resp.body)
# acls with wrong json structure also get a 400
@ -1441,14 +1451,14 @@ class TestAccountAcls(unittest.TestCase):
req = self._make_request(target, headers=dict(good_headers, **update))
resp = req.get_response(test_auth)
self.assertEqual(resp.status_int, 400)
self.assertEqual(errmsg % "Syntax error", resp.body[:46])
self.assertEqual(errmsg % b"Syntax error", resp.body[:46])
# acls with wrong json structure also get a 400
update = {'x-account-access-control': not_dict_acl2}
req = self._make_request(target, headers=dict(good_headers, **update))
resp = req.get_response(test_auth)
self.assertEqual(resp.status_int, 400)
self.assertEqual(errmsg % "Syntax error", resp.body[:46])
self.assertEqual(errmsg % b"Syntax error", resp.body[:46])
def test_acls_propagate_to_sysmeta(self):
test_auth = auth.filter_factory({'user_admin_user': 'testing'})(

View File

@ -44,6 +44,7 @@ commands =
test/unit/common/middleware/test_list_endpoints.py \
test/unit/common/middleware/test_listing_formats.py \
test/unit/common/middleware/test_proxy_logging.py \
test/unit/common/middleware/test_tempauth.py \
test/unit/common/ring \
test/unit/common/test_base_storage_server.py \
test/unit/common/test_bufferedhttp.py \