s3api: Make allowable clock skew configurable

While we're at it, make the default match AWS's 15 minute limit (instead
of our old 5 minute limit).

UpgradeImpact
=============
This (somewhat) weakens some security protections for requests over the
S3 API; operators may want to preserve the prior behavior by setting

    allowable_clock_skew = 300

in the [filter:s3api] section of their proxy-server.conf

Co-Authored-By: Alistair Coles <alistairncoles@gmail.com>
Change-Id: I0da777fcccf056e537b48af4d3277835b265d5c9
This commit is contained in:
Tim Burke 2021-01-08 14:08:08 -08:00 committed by Alistair Coles
parent 83233e7b36
commit 10d9a737d8
7 changed files with 110 additions and 4 deletions

View File

@ -613,6 +613,10 @@ use = egg:swift#s3api
# AWS S3 document says that each part must be at least 5 MB in a multipart
# upload, except the last part.
# min_segment_size = 5242880
#
# AWS allows clock skew up to 15 mins; note that older versions of swift/swift3
# allowed at most 5 mins.
# allowable_clock_skew = 900
# You can override the default log routing for this filter here:
# log_name = s3api

View File

@ -275,6 +275,8 @@ class S3ApiMiddleware(object):
conf.get('allow_multipart_uploads', True))
self.conf.min_segment_size = config_positive_int_value(
conf.get('min_segment_size', 5242880))
self.conf.allowable_clock_skew = config_positive_int_value(
conf.get('allowable_clock_skew', 15 * 60))
self.logger = get_logger(
conf, log_route=conf.get('log_name', 's3api'))

View File

@ -731,8 +731,8 @@ class S3Request(swob.Request):
# If the standard date is too far ahead or behind, it is an
# error
delta = 60 * 5
if abs(int(self.timestamp) - int(S3Timestamp.now())) > delta:
delta = abs(int(self.timestamp) - int(S3Timestamp.now()))
if delta > self.conf.allowable_clock_skew:
raise RequestTimeTooSkewed()
def _validate_headers(self):

View File

@ -160,6 +160,7 @@ class Config(dict):
'dns_compliant_bucket_names': True,
'allow_multipart_uploads': True,
'allow_no_owner': False,
'allowable_clock_skew': 900,
}
def __init__(self, base=None):

View File

@ -100,6 +100,69 @@ class TestS3ApiMiddleware(S3ApiTestCase):
self.swift.register('GET', '/something', swob.HTTPOk, {}, 'FAKE APP')
def test_init_config(self):
# verify config loading
# note: test confs do not have __file__ attribute so check_pipeline
# will be short-circuited
# check all defaults
expected = Config()
expected.update({
'auth_pipeline_check': True,
'check_bucket_owner': False,
'max_bucket_listing': 1000,
'max_multi_delete_objects': 1000,
'max_parts_listing': 1000,
'max_upload_part_num': 1000,
'min_segment_size': 5242880,
'multi_delete_concurrency': 2,
's3_acl': False,
})
s3api = S3ApiMiddleware(None, {})
self.assertEqual(expected, s3api.conf)
# check all non-defaults are loaded
conf = {
'slo_enabled': False,
'storage_domain': 'somewhere',
'location': 'us-west-1',
'force_swift_request_proxy_log': True,
'dns_compliant_bucket_names': False,
'allow_multipart_uploads': False,
'allow_no_owner': True,
'allowable_clock_skew': 300,
'auth_pipeline_check': False,
'check_bucket_owner': True,
'max_bucket_listing': 500,
'max_multi_delete_objects': 600,
'max_parts_listing': 70,
'max_upload_part_num': 800,
'min_segment_size': 1000000,
'multi_delete_concurrency': 1,
's3_acl': True,
}
s3api = S3ApiMiddleware(None, conf)
self.assertEqual(conf, s3api.conf)
def check_bad_positive_ints(**kwargs):
bad_conf = dict(conf, **kwargs)
self.assertRaises(ValueError, S3ApiMiddleware, None, bad_conf)
check_bad_positive_ints(allowable_clock_skew=-100)
check_bad_positive_ints(allowable_clock_skew=0)
check_bad_positive_ints(max_bucket_listing=-100)
check_bad_positive_ints(max_bucket_listing=0)
check_bad_positive_ints(max_multi_delete_objects=-100)
check_bad_positive_ints(max_multi_delete_objects=0)
check_bad_positive_ints(max_parts_listing=-100)
check_bad_positive_ints(max_parts_listing=0)
check_bad_positive_ints(max_upload_part_num=-100)
check_bad_positive_ints(max_upload_part_num=0)
check_bad_positive_ints(min_segment_size=-100)
check_bad_positive_ints(min_segment_size=0)
check_bad_positive_ints(multi_delete_concurrency=-100)
check_bad_positive_ints(multi_delete_concurrency=0)
def test_non_s3_request_passthrough(self):
req = Request.blank('/something')
status, headers, body = self.call_s3api(req)

View File

@ -413,7 +413,7 @@ class TestRequest(S3ApiTestCase):
headers.update(date_header)
req = Request.blank('/', environ=environ, headers=headers)
sigv4_req = SigV4Request(req.environ, Config(), app=None)
sigv4_req = SigV4Request(req.environ, self.s3api.conf, app=None)
if 'X-Amz-Date' in date_header:
timestamp = mktime(
@ -467,7 +467,32 @@ class TestRequest(S3ApiTestCase):
self.assertEqual('403 Forbidden', cm.exception.args[0])
self.assertIn(access_denied_message, cm.exception.body)
# near-future X-Amz-Date header
# near-past X-Amz-Date headers
date_header = {'X-Amz-Date': self.get_v4_amz_date_header(
datetime.utcnow() - timedelta(minutes=10)
)}
self._test_request_timestamp_sigv4(date_header)
date_header = {'X-Amz-Date': self.get_v4_amz_date_header(
datetime.utcnow() - timedelta(minutes=10)
)}
with self.assertRaises(RequestTimeTooSkewed) as cm, \
patch.object(self.s3api.conf, 'allowable_clock_skew', 300):
self._test_request_timestamp_sigv4(date_header)
# near-future X-Amz-Date headers
date_header = {'X-Amz-Date': self.get_v4_amz_date_header(
datetime.utcnow() + timedelta(minutes=10)
)}
self._test_request_timestamp_sigv4(date_header)
date_header = {'X-Amz-Date': self.get_v4_amz_date_header(
datetime.utcnow() + timedelta(minutes=10)
)}
with self.assertRaises(RequestTimeTooSkewed) as cm, \
patch.object(self.s3api.conf, 'allowable_clock_skew', 300):
self._test_request_timestamp_sigv4(date_header)
date_header = {'X-Amz-Date': self.get_v4_amz_date_header(
datetime.utcnow() + timedelta(days=1)
)}

View File

@ -140,10 +140,21 @@ class TestConfig(unittest.TestCase):
self.assertTrue(conf.dns_compliant_bucket_names)
self.assertTrue(conf.allow_multipart_uploads)
self.assertFalse(conf.allow_no_owner)
self.assertEqual(900, conf.allowable_clock_skew)
def test_defaults(self):
# deliberately brittle so new defaults will need to be added to test
conf = utils.Config()
self._assert_defaults(conf)
del conf.slo_enabled
del conf.storage_domain
del conf.location
del conf.force_swift_request_proxy_log
del conf.dns_compliant_bucket_names
del conf.allow_multipart_uploads
del conf.allow_no_owner
del conf.allowable_clock_skew
self.assertEqual({}, conf)
def test_update(self):
conf = utils.Config()