Merge "s3api: use native strings in s3api.auth_details access_key"

This commit is contained in:
Zuul 2020-01-29 19:10:21 +00:00 committed by Gerrit Code Review
commit 898cb928c7
2 changed files with 37 additions and 9 deletions

View File

@ -253,15 +253,16 @@ class SigV4Mixin(object):
self.params.get('X-Amz-Algorithm'))
try:
cred_param = self._parse_credential(
self.params['X-Amz-Credential'])
sig = self.params['X-Amz-Signature']
swob.wsgi_to_str(self.params['X-Amz-Credential']))
sig = swob.wsgi_to_str(self.params['X-Amz-Signature'])
if not sig:
raise AccessDenied()
except KeyError:
raise AccessDenied()
try:
signed_headers = self.params['X-Amz-SignedHeaders']
signed_headers = swob.wsgi_to_str(
self.params['X-Amz-SignedHeaders'])
except KeyError:
# TODO: make sure if is it malformed request?
raise AuthorizationHeaderMalformed()
@ -298,7 +299,7 @@ class SigV4Mixin(object):
:raises: AuthorizationHeaderMalformed
"""
auth_str = self.headers['Authorization']
auth_str = swob.wsgi_to_str(self.headers['Authorization'])
cred_param = self._parse_credential(auth_str.partition(
"Credential=")[2].split(',')[0])
sig = auth_str.partition("Signature=")[2].split(',')[0]
@ -371,7 +372,7 @@ class SigV4Mixin(object):
headers_to_sign = [
(key, value) for key, value in sorted(headers_lower_dict.items())
if key in self._signed_headers]
if swob.wsgi_to_str(key) in self._signed_headers]
if len(headers_to_sign) != len(self._signed_headers):
# NOTE: if we are missing the header suggested via
@ -646,9 +647,9 @@ class S3Request(swob.Request):
:raises: AccessDenied
"""
try:
access = self.params['AWSAccessKeyId']
expires = self.params['Expires']
sig = self.params['Signature']
access = swob.wsgi_to_str(self.params['AWSAccessKeyId'])
expires = swob.wsgi_to_str(self.params['Expires'])
sig = swob.wsgi_to_str(self.params['Signature'])
except KeyError:
raise AccessDenied()
@ -664,7 +665,7 @@ class S3Request(swob.Request):
:returns: a tuple of access_key and signature
:raises: AccessDenied
"""
auth_str = self.headers['Authorization']
auth_str = swob.wsgi_to_str(self.headers['Authorization'])
if not auth_str.startswith('AWS ') or ':' not in auth_str:
raise AccessDenied()
# This means signature format V2

View File

@ -490,6 +490,33 @@ class TestS3ApiMiddleware(S3ApiTestCase):
b'/bucket/object?partNumber=1&uploadId=123456789abcdef']),
'check_signature': mock_cs})
def test_non_ascii_user(self):
self.swift.register('HEAD', '/v1/AUTH_test/bucket+segments/'
'object/123456789abcdef',
swob.HTTPOk, {}, None)
self.swift.register('PUT', '/v1/AUTH_test/bucket+segments/'
'object/123456789abcdef/1',
swob.HTTPCreated, {}, None)
req = Request.blank('/bucket/object?uploadId=123456789abcdef'
'&partNumber=1',
environ={'REQUEST_METHOD': 'PUT'})
# NB: WSGI string for a snowman
req.headers['Authorization'] = 'AWS test:\xe2\x98\x83:sig'
date_header = self.get_date_header()
req.headers['Date'] = date_header
with mock.patch('swift.common.middleware.s3api.s3request.'
'S3Request.check_signature') as mock_cs:
status, headers, body = self.call_s3api(req)
_, _, headers = self.swift.calls_with_headers[-1]
self.assertEqual(req.environ['s3api.auth_details'], {
'access_key': (u'test:\N{SNOWMAN}'.encode('utf-8') if six.PY2
else u'test:\N{SNOWMAN}'),
'signature': 'sig',
'string_to_sign': b'\n'.join([
b'PUT', b'', b'', date_header.encode('ascii'),
b'/bucket/object?partNumber=1&uploadId=123456789abcdef']),
'check_signature': mock_cs})
def test_invalid_uri(self):
req = Request.blank('/bucket/invalid\xffname',
environ={'REQUEST_METHOD': 'GET'},