Merge "SLO: Concurrently HEAD segments"
This commit is contained in:
@ -698,10 +698,17 @@ use = egg:swift#slo
# Time limit on GET requests (seconds)
# max_get_time = 86400
# When deleting with ?multipart-manifest=delete, multiple deletes may be
# executed in parallel. Avoid setting this too high, as it gives clients a
# force multiplier which may be used in DoS attacks. The suggested range is
# between 2 and 10.
# When creating an SLO, multiple segment validations may be executed in
# parallel. Further, multiple deletes may be executed in parallel when deleting
# with ?multipart-manifest=delete. Use this setting to limit how many
# subrequests may be executed concurrently. Avoid setting it too high, as it
# gives clients a force multiplier which may be used in DoS attacks. The
# suggested range is between 2 and 10.
# concurrency = 2
# This may be used to separately tune validation and delete concurrency values.
# Default is to use the concurrency value from above; all of the same caveats
# apply regarding recommended ranges.
# delete_concurrency = 2
# Note: Put after auth and staticweb in the pipeline.
@ -202,6 +202,7 @@ metadata which can be used for stats purposes.
from six.moves import range
from collections import defaultdict
from datetime import datetime
import json
import mimetypes
@ -217,7 +218,7 @@ from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
from swift.common.utils import get_logger, config_true_value, \
get_valid_utf8_str, override_bytes_from_content_type, split_path, \
register_swift_info, RateLimitedIterator, quote, close_if_possible, \
closing_if_possible, LRUCache
closing_if_possible, LRUCache, StreamingPile
from swift.common.request_helpers import SegmentedIterable
from swift.common.constraints import check_utf8, MAX_BUFFERED_SLO_SEGMENTS
from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED, is_success
@ -798,7 +799,10 @@ class StaticLargeObject(object):
'rate_limit_after_segment', '10'))
self.rate_limit_segments_per_sec = int(self.conf.get(
'rate_limit_segments_per_sec', '1'))
delete_concurrency = int(self.conf.get('delete_concurrency', '2'))
self.concurrency = min(1000, max(0, int(self.conf.get(
'concurrency', '2'))))
delete_concurrency = int(self.conf.get(
'delete_concurrency', self.concurrency))
self.bulk_deleter = Bulk(
app, {}, delete_concurrency=delete_concurrency, logger=self.logger)
@ -851,93 +855,103 @@ class StaticLargeObject(object):
if not out_content_type:
out_content_type = 'text/plain'
data_for_storage = []
slo_etag = md5()
last_obj_path = None
path2indices = defaultdict(list)
for index, seg_dict in enumerate(parsed_data):
obj_name = seg_dict['path']
if isinstance(obj_name, six.text_type):
obj_name = obj_name.encode('utf-8')
obj_path = '/'.join(['', vrs, account, obj_name.lstrip('/')])
if obj_path != last_obj_path:
last_obj_path = obj_path
sub_req = make_subrequest(
req.environ, path=obj_path + '?', # kill the query string
headers={'x-auth-token': req.headers.get('x-auth-token')},
agent='%(orig)s SLO MultipartPUT', swift_source='SLO')
head_seg_resp = sub_req.get_response(self)
def do_head(obj_name):
obj_path = '/'.join(['', vrs, account,
if head_seg_resp.is_success:
segment_length = head_seg_resp.content_length
if seg_dict.get('range'):
# Since we now know the length, we can normalize the
# range. We know that there is exactly one range
# requested since we checked that earlier in
# parse_and_validate_input().
ranges = seg_dict['range'].ranges_for_length(
sub_req = make_subrequest(
req.environ, path=obj_path + '?', # kill the query string
headers={'x-auth-token': req.headers.get('x-auth-token')},
agent='%(orig)s SLO MultipartPUT', swift_source='SLO')
return obj_name, sub_req.get_response(self)
if not ranges:
'Unsatisfiable Range'])
elif ranges == [(0, head_seg_resp.content_length)]:
# Just one range, and it exactly matches the object.
# Why'd we do this again?
del seg_dict['range']
segment_length = head_seg_resp.content_length
rng = ranges[0]
seg_dict['range'] = '%d-%d' % (rng[0], rng[1] - 1)
segment_length = rng[1] - rng[0]
if segment_length < 1:
'Too small; each segment must be at least 1 byte.'])
total_size += segment_length
if seg_dict['size_bytes'] is not None and \
seg_dict['size_bytes'] != head_seg_resp.content_length:
problem_segments.append([quote(obj_name), 'Size Mismatch'])
if seg_dict['etag'] is None or \
seg_dict['etag'] == head_seg_resp.etag:
if seg_dict.get('range'):
slo_etag.update('%s:%s;' % (head_seg_resp.etag,
problem_segments.append([quote(obj_name), 'Etag Mismatch'])
if head_seg_resp.last_modified:
last_modified = head_seg_resp.last_modified
# shouldn't happen
last_modified =
last_modified_formatted = \
seg_data = {'name': '/' + seg_dict['path'].lstrip('/'),
'bytes': head_seg_resp.content_length,
'hash': head_seg_resp.etag,
'content_type': head_seg_resp.content_type,
'last_modified': last_modified_formatted}
if seg_dict.get('range'):
seg_data['range'] = seg_dict['range']
if config_true_value(
seg_data['sub_slo'] = True
def validate_seg_dict(seg_dict, head_seg_resp):
if not head_seg_resp.is_success:
return 0, None
segment_length = head_seg_resp.content_length
if seg_dict.get('range'):
# Since we now know the length, we can normalize the
# range. We know that there is exactly one range
# requested since we checked that earlier in
# parse_and_validate_input().
ranges = seg_dict['range'].ranges_for_length(
if not ranges:
'Unsatisfiable Range'])
elif ranges == [(0, head_seg_resp.content_length)]:
# Just one range, and it exactly matches the object.
# Why'd we do this again?
del seg_dict['range']
segment_length = head_seg_resp.content_length
rng = ranges[0]
seg_dict['range'] = '%d-%d' % (rng[0], rng[1] - 1)
segment_length = rng[1] - rng[0]
if segment_length < 1:
'Too small; each segment must be at least 1 byte.'])
if seg_dict['size_bytes'] is not None and \
seg_dict['size_bytes'] != head_seg_resp.content_length:
problem_segments.append([quote(obj_name), 'Size Mismatch'])
if seg_dict['etag'] is not None and \
seg_dict['etag'] != head_seg_resp.etag:
problem_segments.append([quote(obj_name), 'Etag Mismatch'])
if head_seg_resp.last_modified:
last_modified = head_seg_resp.last_modified
# shouldn't happen
last_modified =
last_modified_formatted = \
seg_data = {'name': '/' + seg_dict['path'].lstrip('/'),
'bytes': head_seg_resp.content_length,
'hash': head_seg_resp.etag,
'content_type': head_seg_resp.content_type,
'last_modified': last_modified_formatted}
if seg_dict.get('range'):
seg_data['range'] = seg_dict['range']
if config_true_value(
seg_data['sub_slo'] = True
return segment_length, seg_data
data_for_storage = [None] * len(parsed_data)
with StreamingPile(self.concurrency) as pile:
for obj_name, resp in pile.asyncstarmap(do_head, (
(path, ) for path in path2indices)):
for i in path2indices[obj_name]:
segment_length, seg_data = validate_seg_dict(
parsed_data[i], resp)
data_for_storage[i] = seg_data
total_size += segment_length
if problem_segments:
resp_body = get_response_body(
out_content_type, {}, problem_segments)
raise HTTPBadRequest(resp_body, content_type=out_content_type)
env = req.environ
slo_etag = md5()
for seg_data in data_for_storage:
if seg_data.get('range'):
slo_etag.update('%s:%s;' % (seg_data['hash'],
if not env.get('CONTENT_TYPE'):
guessed_type, _junk = mimetypes.guess_type(req.path_info)
env['CONTENT_TYPE'] = guessed_type or 'application/octet-stream'
@ -418,7 +418,7 @@ class TestSloPutManifest(SloTestCase):
def my_fake_start_response(*args, **kwargs):
gen_etag = '"' + md5hex('etagoftheobjectsegment') + '"'
self.assertTrue(('Etag', gen_etag) in args[1])
self.assertIn(('Etag', gen_etag), args[1])
self.slo(req.environ, my_fake_start_response)
self.assertIn('X-Static-Large-Object', req.headers)
@ -552,17 +552,13 @@ class TestSloPutManifest(SloTestCase):
self.assertEqual(, 5)
errors = json.loads(body)['Errors']
self.assertEqual(len(errors), 5)
self.assertEqual(errors[0][0], '/checktest/a_1')
self.assertEqual(errors[0][1], 'Size Mismatch')
self.assertEqual(errors[1][0], '/checktest/badreq')
self.assertEqual(errors[1][1], '400 Bad Request')
self.assertEqual(errors[2][0], '/checktest/b_2')
self.assertEqual(errors[2][1], 'Etag Mismatch')
self.assertEqual(errors[3][0], '/checktest/slob')
self.assertEqual(errors[3][1], 'Size Mismatch')
self.assertEqual(errors[4][0], '/checktest/slob')
self.assertEqual(errors[4][1], 'Etag Mismatch')
[u'/checktest/a_1', u'Size Mismatch'],
[u'/checktest/b_2', u'Etag Mismatch'],
[u'/checktest/badreq', u'400 Bad Request'],
[u'/checktest/slob', u'Etag Mismatch'],
[u'/checktest/slob', u'Size Mismatch'],
], sorted(errors))
def test_handle_multipart_put_skip_size_check(self):
good_data = json.dumps(
@ -675,21 +671,25 @@ class TestSloPutManifest(SloTestCase):
'size_bytes': 2, 'range': '-1'},
{'path': '/checktest/b_2', 'etag': None,
'size_bytes': 2, 'range': '0-0'},
{'path': '/checktest/a_1', 'etag': None,
'size_bytes': None},
{'path': '/cont/object', 'etag': None,
'size_bytes': None, 'range': '10-40'}])
req = Request.blank(
environ={'REQUEST_METHOD': 'PUT'}, body=good_data)
status, headers, body = self.call_slo(req)
expected_etag = '"%s"' % md5hex('ab:1-1;b:0-0;etagoftheobjectsegment:'
expected_etag = '"%s"' % md5hex('ab:1-1;b:0-0;aetagoftheobjectsegment:'
self.assertEqual(expected_etag, dict(headers)['Etag'])
('HEAD', '/v1/AUTH_test/checktest/a_1'),
('HEAD', '/v1/AUTH_test/checktest/a_1'), # Only once!
('HEAD', '/v1/AUTH_test/checktest/b_2'), # Only once!
('HEAD', '/v1/AUTH_test/cont/object'),
], sorted([:-1]))
('PUT', '/v1/AUTH_test/checktest/man_3?multipart-manifest=put'),
# Check that we still populated the manifest properly from our HEADs
req = Request.blank(
@ -699,9 +699,10 @@ class TestSloPutManifest(SloTestCase):
environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_app(req)
manifest_data = json.loads(body)
self.assertEqual(len(manifest_data), 5)
self.assertEqual('a', manifest_data[0]['hash'])
self.assertNotIn('range', manifest_data[0])
self.assertNotIn('segment_bytes', manifest_data[0])
self.assertEqual('b', manifest_data[1]['hash'])
self.assertEqual('1-1', manifest_data[1]['range'])
@ -709,8 +710,11 @@ class TestSloPutManifest(SloTestCase):
self.assertEqual('b', manifest_data[2]['hash'])
self.assertEqual('0-0', manifest_data[2]['range'])
self.assertEqual('etagoftheobjectsegment', manifest_data[3]['hash'])
self.assertEqual('10-40', manifest_data[3]['range'])
self.assertEqual('a', manifest_data[3]['hash'])
self.assertNotIn('range', manifest_data[3])
self.assertEqual('etagoftheobjectsegment', manifest_data[4]['hash'])
self.assertEqual('10-40', manifest_data[4]['range'])
class TestSloDeleteManifest(SloTestCase):
Reference in New Issue
Block a user