diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index a5835f7e79..594bdba981 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -698,10 +698,17 @@ use = egg:swift#slo # Time limit on GET requests (seconds) # max_get_time = 86400 # -# When deleting with ?multipart-manifest=delete, multiple deletes may be -# executed in parallel. Avoid setting this too high, as it gives clients a -# force multiplier which may be used in DoS attacks. The suggested range is -# between 2 and 10. +# When creating an SLO, multiple segment validations may be executed in +# parallel. Further, multiple deletes may be executed in parallel when deleting +# with ?multipart-manifest=delete. Use this setting to limit how many +# subrequests may be executed concurrently. Avoid setting it too high, as it +# gives clients a force multiplier which may be used in DoS attacks. The +# suggested range is between 2 and 10. +# concurrency = 2 +# +# This may be used to separately tune validation and delete concurrency values. +# Default is to use the concurrency value from above; all of the same caveats +# apply regarding recommended ranges. # delete_concurrency = 2 # Note: Put after auth and staticweb in the pipeline. diff --git a/swift/common/middleware/slo.py b/swift/common/middleware/slo.py index 747d4980bd..02e6e0354b 100644 --- a/swift/common/middleware/slo.py +++ b/swift/common/middleware/slo.py @@ -202,6 +202,7 @@ metadata which can be used for stats purposes. from six.moves import range +from collections import defaultdict from datetime import datetime import json import mimetypes @@ -217,7 +218,7 @@ from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \ from swift.common.utils import get_logger, config_true_value, \ get_valid_utf8_str, override_bytes_from_content_type, split_path, \ register_swift_info, RateLimitedIterator, quote, close_if_possible, \ - closing_if_possible, LRUCache + closing_if_possible, LRUCache, StreamingPile from swift.common.request_helpers import SegmentedIterable from swift.common.constraints import check_utf8, MAX_BUFFERED_SLO_SEGMENTS from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED, is_success @@ -798,7 +799,10 @@ class StaticLargeObject(object): 'rate_limit_after_segment', '10')) self.rate_limit_segments_per_sec = int(self.conf.get( 'rate_limit_segments_per_sec', '1')) - delete_concurrency = int(self.conf.get('delete_concurrency', '2')) + self.concurrency = min(1000, max(0, int(self.conf.get( + 'concurrency', '2')))) + delete_concurrency = int(self.conf.get( + 'delete_concurrency', self.concurrency)) self.bulk_deleter = Bulk( app, {}, delete_concurrency=delete_concurrency, logger=self.logger) @@ -851,93 +855,103 @@ class StaticLargeObject(object): if not out_content_type: out_content_type = 'text/plain' data_for_storage = [] - slo_etag = md5() - last_obj_path = None + path2indices = defaultdict(list) for index, seg_dict in enumerate(parsed_data): - obj_name = seg_dict['path'] - if isinstance(obj_name, six.text_type): - obj_name = obj_name.encode('utf-8') - obj_path = '/'.join(['', vrs, account, obj_name.lstrip('/')]) + path2indices[seg_dict['path']].append(index) - if obj_path != last_obj_path: - last_obj_path = obj_path - sub_req = make_subrequest( - req.environ, path=obj_path + '?', # kill the query string - method='HEAD', - headers={'x-auth-token': req.headers.get('x-auth-token')}, - agent='%(orig)s SLO MultipartPUT', swift_source='SLO') - head_seg_resp = sub_req.get_response(self) + def do_head(obj_name): + obj_path = '/'.join(['', vrs, account, + get_valid_utf8_str(obj_name).lstrip('/')]) - if head_seg_resp.is_success: - segment_length = head_seg_resp.content_length - if seg_dict.get('range'): - # Since we now know the length, we can normalize the - # range. We know that there is exactly one range - # requested since we checked that earlier in - # parse_and_validate_input(). - ranges = seg_dict['range'].ranges_for_length( - head_seg_resp.content_length) + sub_req = make_subrequest( + req.environ, path=obj_path + '?', # kill the query string + method='HEAD', + headers={'x-auth-token': req.headers.get('x-auth-token')}, + agent='%(orig)s SLO MultipartPUT', swift_source='SLO') + return obj_name, sub_req.get_response(self) - if not ranges: - problem_segments.append([quote(obj_name), - 'Unsatisfiable Range']) - elif ranges == [(0, head_seg_resp.content_length)]: - # Just one range, and it exactly matches the object. - # Why'd we do this again? - del seg_dict['range'] - segment_length = head_seg_resp.content_length - else: - rng = ranges[0] - seg_dict['range'] = '%d-%d' % (rng[0], rng[1] - 1) - segment_length = rng[1] - rng[0] - - if segment_length < 1: - problem_segments.append( - [quote(obj_name), - 'Too small; each segment must be at least 1 byte.']) - total_size += segment_length - if seg_dict['size_bytes'] is not None and \ - seg_dict['size_bytes'] != head_seg_resp.content_length: - problem_segments.append([quote(obj_name), 'Size Mismatch']) - if seg_dict['etag'] is None or \ - seg_dict['etag'] == head_seg_resp.etag: - if seg_dict.get('range'): - slo_etag.update('%s:%s;' % (head_seg_resp.etag, - seg_dict['range'])) - else: - slo_etag.update(head_seg_resp.etag) - else: - problem_segments.append([quote(obj_name), 'Etag Mismatch']) - if head_seg_resp.last_modified: - last_modified = head_seg_resp.last_modified - else: - # shouldn't happen - last_modified = datetime.now() - - last_modified_formatted = \ - last_modified.strftime('%Y-%m-%dT%H:%M:%S.%f') - seg_data = {'name': '/' + seg_dict['path'].lstrip('/'), - 'bytes': head_seg_resp.content_length, - 'hash': head_seg_resp.etag, - 'content_type': head_seg_resp.content_type, - 'last_modified': last_modified_formatted} - if seg_dict.get('range'): - seg_data['range'] = seg_dict['range'] - - if config_true_value( - head_seg_resp.headers.get('X-Static-Large-Object')): - seg_data['sub_slo'] = True - data_for_storage.append(seg_data) - - else: + def validate_seg_dict(seg_dict, head_seg_resp): + if not head_seg_resp.is_success: problem_segments.append([quote(obj_name), head_seg_resp.status]) + return 0, None + + segment_length = head_seg_resp.content_length + if seg_dict.get('range'): + # Since we now know the length, we can normalize the + # range. We know that there is exactly one range + # requested since we checked that earlier in + # parse_and_validate_input(). + ranges = seg_dict['range'].ranges_for_length( + head_seg_resp.content_length) + + if not ranges: + problem_segments.append([quote(obj_name), + 'Unsatisfiable Range']) + elif ranges == [(0, head_seg_resp.content_length)]: + # Just one range, and it exactly matches the object. + # Why'd we do this again? + del seg_dict['range'] + segment_length = head_seg_resp.content_length + else: + rng = ranges[0] + seg_dict['range'] = '%d-%d' % (rng[0], rng[1] - 1) + segment_length = rng[1] - rng[0] + + if segment_length < 1: + problem_segments.append( + [quote(obj_name), + 'Too small; each segment must be at least 1 byte.']) + if seg_dict['size_bytes'] is not None and \ + seg_dict['size_bytes'] != head_seg_resp.content_length: + problem_segments.append([quote(obj_name), 'Size Mismatch']) + if seg_dict['etag'] is not None and \ + seg_dict['etag'] != head_seg_resp.etag: + problem_segments.append([quote(obj_name), 'Etag Mismatch']) + if head_seg_resp.last_modified: + last_modified = head_seg_resp.last_modified + else: + # shouldn't happen + last_modified = datetime.now() + + last_modified_formatted = \ + last_modified.strftime('%Y-%m-%dT%H:%M:%S.%f') + seg_data = {'name': '/' + seg_dict['path'].lstrip('/'), + 'bytes': head_seg_resp.content_length, + 'hash': head_seg_resp.etag, + 'content_type': head_seg_resp.content_type, + 'last_modified': last_modified_formatted} + if seg_dict.get('range'): + seg_data['range'] = seg_dict['range'] + if config_true_value( + head_seg_resp.headers.get('X-Static-Large-Object')): + seg_data['sub_slo'] = True + return segment_length, seg_data + + data_for_storage = [None] * len(parsed_data) + with StreamingPile(self.concurrency) as pile: + for obj_name, resp in pile.asyncstarmap(do_head, ( + (path, ) for path in path2indices)): + for i in path2indices[obj_name]: + segment_length, seg_data = validate_seg_dict( + parsed_data[i], resp) + data_for_storage[i] = seg_data + total_size += segment_length + if problem_segments: resp_body = get_response_body( out_content_type, {}, problem_segments) raise HTTPBadRequest(resp_body, content_type=out_content_type) env = req.environ + slo_etag = md5() + for seg_data in data_for_storage: + if seg_data.get('range'): + slo_etag.update('%s:%s;' % (seg_data['hash'], + seg_data['range'])) + else: + slo_etag.update(seg_data['hash']) + if not env.get('CONTENT_TYPE'): guessed_type, _junk = mimetypes.guess_type(req.path_info) env['CONTENT_TYPE'] = guessed_type or 'application/octet-stream' diff --git a/test/unit/common/middleware/test_slo.py b/test/unit/common/middleware/test_slo.py index aa97de9f97..8e61d143a8 100644 --- a/test/unit/common/middleware/test_slo.py +++ b/test/unit/common/middleware/test_slo.py @@ -418,7 +418,7 @@ class TestSloPutManifest(SloTestCase): def my_fake_start_response(*args, **kwargs): gen_etag = '"' + md5hex('etagoftheobjectsegment') + '"' - self.assertTrue(('Etag', gen_etag) in args[1]) + self.assertIn(('Etag', gen_etag), args[1]) self.slo(req.environ, my_fake_start_response) self.assertIn('X-Static-Large-Object', req.headers) @@ -552,17 +552,13 @@ class TestSloPutManifest(SloTestCase): self.assertEqual(self.app.call_count, 5) errors = json.loads(body)['Errors'] - self.assertEqual(len(errors), 5) - self.assertEqual(errors[0][0], '/checktest/a_1') - self.assertEqual(errors[0][1], 'Size Mismatch') - self.assertEqual(errors[1][0], '/checktest/badreq') - self.assertEqual(errors[1][1], '400 Bad Request') - self.assertEqual(errors[2][0], '/checktest/b_2') - self.assertEqual(errors[2][1], 'Etag Mismatch') - self.assertEqual(errors[3][0], '/checktest/slob') - self.assertEqual(errors[3][1], 'Size Mismatch') - self.assertEqual(errors[4][0], '/checktest/slob') - self.assertEqual(errors[4][1], 'Etag Mismatch') + self.assertEqual([ + [u'/checktest/a_1', u'Size Mismatch'], + [u'/checktest/b_2', u'Etag Mismatch'], + [u'/checktest/badreq', u'400 Bad Request'], + [u'/checktest/slob', u'Etag Mismatch'], + [u'/checktest/slob', u'Size Mismatch'], + ], sorted(errors)) def test_handle_multipart_put_skip_size_check(self): good_data = json.dumps( @@ -675,21 +671,25 @@ class TestSloPutManifest(SloTestCase): 'size_bytes': 2, 'range': '-1'}, {'path': '/checktest/b_2', 'etag': None, 'size_bytes': 2, 'range': '0-0'}, + {'path': '/checktest/a_1', 'etag': None, + 'size_bytes': None}, {'path': '/cont/object', 'etag': None, 'size_bytes': None, 'range': '10-40'}]) req = Request.blank( '/v1/AUTH_test/checktest/man_3?multipart-manifest=put', environ={'REQUEST_METHOD': 'PUT'}, body=good_data) status, headers, body = self.call_slo(req) - expected_etag = '"%s"' % md5hex('ab:1-1;b:0-0;etagoftheobjectsegment:' + expected_etag = '"%s"' % md5hex('ab:1-1;b:0-0;aetagoftheobjectsegment:' '10-40;') self.assertEqual(expected_etag, dict(headers)['Etag']) self.assertEqual([ - ('HEAD', '/v1/AUTH_test/checktest/a_1'), + ('HEAD', '/v1/AUTH_test/checktest/a_1'), # Only once! ('HEAD', '/v1/AUTH_test/checktest/b_2'), # Only once! ('HEAD', '/v1/AUTH_test/cont/object'), + ], sorted(self.app.calls[:-1])) + self.assertEqual( ('PUT', '/v1/AUTH_test/checktest/man_3?multipart-manifest=put'), - ], self.app.calls) + self.app.calls[-1]) # Check that we still populated the manifest properly from our HEADs req = Request.blank( @@ -699,9 +699,10 @@ class TestSloPutManifest(SloTestCase): environ={'REQUEST_METHOD': 'GET'}) status, headers, body = self.call_app(req) manifest_data = json.loads(body) + self.assertEqual(len(manifest_data), 5) + self.assertEqual('a', manifest_data[0]['hash']) self.assertNotIn('range', manifest_data[0]) - self.assertNotIn('segment_bytes', manifest_data[0]) self.assertEqual('b', manifest_data[1]['hash']) self.assertEqual('1-1', manifest_data[1]['range']) @@ -709,8 +710,11 @@ class TestSloPutManifest(SloTestCase): self.assertEqual('b', manifest_data[2]['hash']) self.assertEqual('0-0', manifest_data[2]['range']) - self.assertEqual('etagoftheobjectsegment', manifest_data[3]['hash']) - self.assertEqual('10-40', manifest_data[3]['range']) + self.assertEqual('a', manifest_data[3]['hash']) + self.assertNotIn('range', manifest_data[3]) + + self.assertEqual('etagoftheobjectsegment', manifest_data[4]['hash']) + self.assertEqual('10-40', manifest_data[4]['range']) class TestSloDeleteManifest(SloTestCase):