Merge "Fix the GET's response code when there is a missing segment in LO"

This commit is contained in:
Jenkins 2015-01-05 16:23:02 +00:00 committed by Gerrit Code Review
commit 1f1cdceabe
5 changed files with 308 additions and 98 deletions

View File

@ -17,10 +17,10 @@ import os
from ConfigParser import ConfigParser, NoSectionError, NoOptionError
from hashlib import md5
from swift.common import constraints
from swift.common.exceptions import ListingIterError
from swift.common.exceptions import ListingIterError, SegmentError
from swift.common.http import is_success
from swift.common.swob import Request, Response, \
HTTPRequestedRangeNotSatisfiable, HTTPBadRequest
HTTPRequestedRangeNotSatisfiable, HTTPBadRequest, HTTPConflict
from swift.common.utils import get_logger, json, \
RateLimitedIterator, read_conf_dir, quote
from swift.common.request_helpers import SegmentedIterable
@ -131,9 +131,10 @@ class GetContext(WSGIContext):
constraints.CONTAINER_LISTING_LIMIT
first_byte = last_byte = None
content_length = None
actual_content_length = None
content_length_for_swob_range = None
if req.range and len(req.range.ranges) == 1:
content_length = sum(o['bytes'] for o in segments)
content_length_for_swob_range = sum(o['bytes'] for o in segments)
# This is a hack to handle suffix byte ranges (e.g. "bytes=-5"),
# which we can't honor unless we have a complete listing.
@ -144,28 +145,32 @@ class GetContext(WSGIContext):
#
# Alternately, we may not have all the segments, but this range
# falls entirely within the first page's segments, so we know
# whether or not it's satisfiable.
if have_complete_listing or range_end < content_length:
byteranges = req.range.ranges_for_length(content_length)
# that it is satisfiable.
if (have_complete_listing
or range_end < content_length_for_swob_range):
byteranges = req.range.ranges_for_length(
content_length_for_swob_range)
if not byteranges:
return HTTPRequestedRangeNotSatisfiable(request=req)
first_byte, last_byte = byteranges[0]
# For some reason, swob.Range.ranges_for_length adds 1 to the
# last byte's position.
last_byte -= 1
actual_content_length = last_byte - first_byte + 1
else:
# The range may or may not be satisfiable, but we can't tell
# based on just one page of listing, and we're not going to go
# get more pages because that would use up too many resources,
# so we ignore the Range header and return the whole object.
content_length = None
actual_content_length = None
content_length_for_swob_range = None
req.range = None
response_headers = [
(h, v) for h, v in response_headers
if h.lower() not in ("content-length", "content-range")]
if content_length is not None:
if content_length_for_swob_range is not None:
# Here, we have to give swob a big-enough content length so that
# it can compute the actual content length based on the Range
# header. This value will not be visible to the client; swob will
@ -175,10 +180,12 @@ class GetContext(WSGIContext):
# segments, this may be less than the sum of all the segments'
# sizes. However, it'll still be greater than the last byte in the
# Range header, so it's good enough for swob.
response_headers.append(('Content-Length', str(content_length)))
elif have_complete_listing:
response_headers.append(('Content-Length',
str(sum(o['bytes'] for o in segments))))
str(content_length_for_swob_range)))
elif have_complete_listing:
actual_content_length = sum(o['bytes'] for o in segments)
response_headers.append(('Content-Length',
str(actual_content_length)))
if have_complete_listing:
response_headers = [(h, v) for h, v in response_headers
@ -188,21 +195,30 @@ class GetContext(WSGIContext):
etag.update(seg_dict['hash'].strip('"'))
response_headers.append(('Etag', '"%s"' % etag.hexdigest()))
listing_iter = RateLimitedIterator(
self._segment_listing_iterator(
req, version, account, container, obj_prefix, segments,
first_byte=first_byte, last_byte=last_byte),
self.dlo.rate_limit_segments_per_sec,
limit_after=self.dlo.rate_limit_after_segment)
app_iter = None
if req.method == 'GET':
listing_iter = RateLimitedIterator(
self._segment_listing_iterator(
req, version, account, container, obj_prefix, segments,
first_byte=first_byte, last_byte=last_byte),
self.dlo.rate_limit_segments_per_sec,
limit_after=self.dlo.rate_limit_after_segment)
app_iter = SegmentedIterable(
req, self.dlo.app, listing_iter, ua_suffix="DLO MultipartGET",
swift_source="DLO", name=req.path, logger=self.logger,
max_get_time=self.dlo.max_get_time,
response_body_length=actual_content_length)
try:
app_iter.validate_first_segment()
except (SegmentError, ListingIterError):
return HTTPConflict(request=req)
resp = Response(request=req, headers=response_headers,
conditional_response=True,
app_iter=SegmentedIterable(
req, self.dlo.app, listing_iter,
ua_suffix="DLO MultipartGET",
swift_source="DLO",
name=req.path, logger=self.logger,
max_get_time=self.dlo.max_get_time))
resp.app_iter.response = resp
app_iter=app_iter)
return resp
def handle_request(self, req, start_response):

View File

@ -139,11 +139,12 @@ from datetime import datetime
import mimetypes
import re
from hashlib import md5
from swift.common.exceptions import ListingIterError
from swift.common.exceptions import ListingIterError, SegmentError
from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
HTTPMethodNotAllowed, HTTPRequestEntityTooLarge, HTTPLengthRequired, \
HTTPOk, HTTPPreconditionFailed, HTTPException, HTTPNotFound, \
HTTPUnauthorized, HTTPRequestedRangeNotSatisfiable, Response
HTTPUnauthorized, HTTPConflict, HTTPRequestedRangeNotSatisfiable,\
Response
from swift.common.utils import json, get_logger, config_true_value, \
get_valid_utf8_str, override_bytes_from_content_type, split_path, \
register_swift_info, RateLimitedIterator, quote
@ -464,15 +465,27 @@ class SloGetContext(WSGIContext):
start_byte, end_byte)
for seg_dict, start_byte, end_byte in ratelimited_listing_iter)
segmented_iter = SegmentedIterable(
req, self.slo.app, segment_listing_iter,
name=req.path, logger=self.slo.logger,
ua_suffix="SLO MultipartGET",
swift_source="SLO",
max_get_time=self.slo.max_get_time)
try:
segmented_iter.validate_first_segment()
except (ListingIterError, SegmentError):
# Copy from the SLO explanation in top of this file.
# If any of the segments from the manifest are not found or
# their Etag/Content Length no longer match the connection
# will drop. In this case a 409 Conflict will be logged in
# the proxy logs and the user will receive incomplete results.
return HTTPConflict(request=req)
response = Response(request=req, content_length=content_length,
headers=response_headers,
conditional_response=True,
app_iter=SegmentedIterable(
req, self.slo.app, segment_listing_iter,
name=req.path, logger=self.slo.logger,
ua_suffix="SLO MultipartGET",
swift_source="SLO",
max_get_time=self.slo.max_get_time))
app_iter=segmented_iter)
if req.range:
response.headers.pop('Etag')
return response

View File

@ -21,13 +21,14 @@ from swob in here without creating circular imports.
"""
import hashlib
import sys
import itertools
import time
from contextlib import contextmanager
from urllib import unquote
from swift import gettext_ as _
from swift.common.constraints import FORMAT2CONTENT_TYPE
from swift.common.exceptions import ListingIterError, SegmentError
from swift.common.http import is_success, HTTP_SERVICE_UNAVAILABLE
from swift.common.http import is_success
from swift.common.swob import HTTPBadRequest, HTTPNotAcceptable
from swift.common.utils import split_path, validate_device_partition
from swift.common.wsgi import make_subrequest
@ -276,12 +277,13 @@ class SegmentedIterable(object):
(just for logging)
:param ua_suffix: string to append to user-agent.
:param name: name of manifest (used in logging only)
:param response: optional response object for the response being sent
to the client.
:param response_body_length: optional response body length for
the response being sent to the client.
"""
def __init__(self, req, app, listing_iter, max_get_time,
logger, ua_suffix, swift_source,
name='<not specified>', response=None):
name='<not specified>', response_body_length=None):
self.req = req
self.app = app
self.listing_iter = listing_iter
@ -290,26 +292,14 @@ class SegmentedIterable(object):
self.ua_suffix = " " + ua_suffix
self.swift_source = swift_source
self.name = name
self.response = response
self.response_body_length = response_body_length
self.peeked_chunk = None
self.app_iter = self._internal_iter()
self.validated_first_segment = False
def app_iter_range(self, *a, **kw):
"""
swob.Response will only respond with a 206 status in certain cases; one
of those is if the body iterator responds to .app_iter_range().
However, this object (or really, its listing iter) is smart enough to
handle the range stuff internally, so we just no-op this out for swob.
"""
return self
def __iter__(self):
def _internal_iter(self):
start_time = time.time()
have_yielded_data = False
if self.response and self.response.content_length:
bytes_left = int(self.response.content_length)
else:
bytes_left = None
bytes_left = self.response_body_length
try:
for seg_path, seg_etag, seg_size, first_byte, last_byte \
@ -366,7 +356,6 @@ class SegmentedIterable(object):
seg_hash = hashlib.md5()
for chunk in seg_resp.app_iter:
seg_hash.update(chunk)
have_yielded_data = True
if bytes_left is None:
yield chunk
elif bytes_left >= len(chunk):
@ -393,29 +382,44 @@ class SegmentedIterable(object):
if bytes_left:
raise SegmentError(
'Not enough bytes for %s; closing connection' %
self.name)
except ListingIterError as err:
# I have to save this error because yielding the ' ' below clears
# the exception from the current stack frame.
excinfo = sys.exc_info()
self.logger.exception('ERROR: While processing manifest %s, %s',
self.name, err)
# Normally, exceptions before any data has been yielded will
# cause Eventlet to send a 5xx response. In this particular
# case of ListingIterError we don't want that and we'd rather
# just send the normal 2xx response and then hang up early
# since 5xx codes are often used to judge Service Level
# Agreements and this ListingIterError indicates the user has
# created an invalid condition.
if not have_yielded_data:
yield ' '
raise excinfo
except SegmentError as err:
self.logger.exception(err)
# This doesn't actually change the response status (we're too
# late for that), but this does make it to the logs.
if self.response:
self.response.status = HTTP_SERVICE_UNAVAILABLE
'Not enough bytes for %s; closing connection' % self.name)
except (ListingIterError, SegmentError):
self.logger.exception(_('ERROR: An error occurred '
'while retrieving segments'))
raise
def app_iter_range(self, *a, **kw):
"""
swob.Response will only respond with a 206 status in certain cases; one
of those is if the body iterator responds to .app_iter_range().
However, this object (or really, its listing iter) is smart enough to
handle the range stuff internally, so we just no-op this out for swob.
"""
return self
def validate_first_segment(self):
"""
Start fetching object data to ensure that the first segment (if any) is
valid. This is to catch cases like "first segment is missing" or
"first segment's etag doesn't match manifest".
Note: this does not validate that you have any segments. A
zero-segment large object is not erroneous; it is just empty.
"""
if self.validated_first_segment:
return
self.validated_first_segment = True
try:
self.peeked_chunk = self.app_iter.next()
except StopIteration:
pass
def __iter__(self):
if self.peeked_chunk is not None:
pc = self.peeked_chunk
self.peeked_chunk = None
return itertools.chain([pc], self.app_iter)
else:
return self.app_iter

View File

@ -73,7 +73,7 @@ class DloTestCase(unittest.TestCase):
# don't slow down tests with rate limiting
'rate_limit_after_segment': '1000000',
})(self.app)
self.dlo.logger = self.app.logger
self.app.register(
'GET', '/v1/AUTH_test/c/seg_01',
swob.HTTPOk, {'Content-Length': '5', 'Etag': md5hex("aaaaa")},
@ -562,12 +562,11 @@ class TestDloGetManifest(DloTestCase):
req = swob.Request.blank('/v1/AUTH_test/mancon/manifest',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body, exc = self.call_dlo(req, expect_exception=True)
headers = swob.HeaderKeyDict(headers)
self.assertTrue(isinstance(exc, exceptions.SegmentError))
self.assertEqual(status, "200 OK")
self.assertEqual(body, '') # error right away -> no body bytes sent
status, headers, body = self.call_dlo(req)
self.assertEqual(status, "409 Conflict")
err_log = self.dlo.logger.log_dict['exception'][0][0][0]
self.assertTrue(err_log.startswith('ERROR: An error occurred '
'while retrieving segments'))
def test_error_fetching_second_segment(self):
self.app.register(
@ -582,6 +581,9 @@ class TestDloGetManifest(DloTestCase):
self.assertTrue(isinstance(exc, exceptions.SegmentError))
self.assertEqual(status, "200 OK")
self.assertEqual(''.join(body), "aaaaa") # first segment made it out
err_log = self.dlo.logger.log_dict['exception'][0][0][0]
self.assertTrue(err_log.startswith('ERROR: An error occurred '
'while retrieving segments'))
def test_error_listing_container_first_listing_request(self):
self.app.register(
@ -626,7 +628,7 @@ class TestDloGetManifest(DloTestCase):
self.assertEqual(''.join(body), "aaaaabbWRONGbb") # stop after error
def test_etag_comparison_ignores_quotes(self):
# a little future-proofing here in case we ever fix this
# a little future-proofing here in case we ever fix this in swob
self.app.register(
'HEAD', '/v1/AUTH_test/mani/festo',
swob.HTTPOk, {'Content-Length': '0', 'Etag': 'blah',

View File

@ -55,6 +55,7 @@ class SloTestCase(unittest.TestCase):
self.app = FakeSwift()
self.slo = slo.filter_factory({})(self.app)
self.slo.min_segment_size = 1
self.slo.logger = self.app.logger
def call_app(self, req, app=None, expect_exception=False):
if app is None:
@ -1286,6 +1287,119 @@ class TestSloGetManifest(SloTestCase):
# make sure we didn't keep asking for segments
self.assertEqual(self.app.call_count, 20)
def test_sub_slo_recursion(self):
# man1 points to man2 and obj1, man2 points to man3 and obj2...
for i in xrange(11):
self.app.register('GET', '/v1/AUTH_test/gettest/obj%d' % i,
swob.HTTPOk, {'Content-Type': 'text/plain',
'Content-Length': '6',
'Etag': md5hex('body%02d' % i)},
'body%02d' % i)
manifest_json = json.dumps([{'name': '/gettest/obj%d' % i,
'hash': md5hex('body%2d' % i),
'content_type': 'text/plain',
'bytes': '6'}])
self.app.register(
'GET', '/v1/AUTH_test/gettest/man%d' % i,
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true',
'Etag': 'man%d' % i},
manifest_json)
self.app.register(
'HEAD', '/v1/AUTH_test/gettest/obj%d' % i,
swob.HTTPOk, {'Content-Length': '6',
'Etag': md5hex('body%2d' % i)},
None)
for i in xrange(9, 0, -1):
manifest_data = [
{'name': '/gettest/man%d' % (i + 1),
'hash': 'man%d' % (i + 1),
'sub_slo': True,
'bytes': len(manifest_json),
'content_type':
'application/json;swift_bytes=%d' % ((10 - i) * 6)},
{'name': '/gettest/obj%d' % i,
'hash': md5hex('body%02d' % i),
'bytes': '6',
'content_type': 'text/plain'}]
manifest_json = json.dumps(manifest_data)
self.app.register(
'GET', '/v1/AUTH_test/gettest/man%d' % i,
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true',
'Etag': 'man%d' % i},
manifest_json)
req = Request.blank(
'/v1/AUTH_test/gettest/man1',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_slo(req)
self.assertEqual(status, '200 OK')
self.assertEqual(body, ('body10body09body08body07body06' +
'body05body04body03body02body01'))
self.assertEqual(self.app.call_count, 20)
def test_sub_slo_recursion_limit(self):
# man1 points to man2 and obj1, man2 points to man3 and obj2...
for i in xrange(12):
self.app.register('GET', '/v1/AUTH_test/gettest/obj%d' % i,
swob.HTTPOk,
{'Content-Type': 'text/plain',
'Content-Length': '6',
'Etag': md5hex('body%02d' % i)}, 'body%02d' % i)
manifest_json = json.dumps([{'name': '/gettest/obj%d' % i,
'hash': md5hex('body%2d' % i),
'content_type': 'text/plain',
'bytes': '6'}])
self.app.register(
'GET', '/v1/AUTH_test/gettest/man%d' % i,
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true',
'Etag': 'man%d' % i},
manifest_json)
self.app.register(
'HEAD', '/v1/AUTH_test/gettest/obj%d' % i,
swob.HTTPOk, {'Content-Length': '6',
'Etag': md5hex('body%2d' % i)},
None)
for i in xrange(11, 0, -1):
manifest_data = [
{'name': '/gettest/man%d' % (i + 1),
'hash': 'man%d' % (i + 1),
'sub_slo': True,
'bytes': len(manifest_json),
'content_type':
'application/json;swift_bytes=%d' % ((12 - i) * 6)},
{'name': '/gettest/obj%d' % i,
'hash': md5hex('body%02d' % i),
'bytes': '6',
'content_type': 'text/plain'}]
manifest_json = json.dumps(manifest_data)
self.app.register('GET', '/v1/AUTH_test/gettest/man%d' % i,
swob.HTTPOk,
{'Content-Type': 'application/json',
'X-Static-Large-Object': 'true',
'Etag': 'man%d' % i},
manifest_json)
req = Request.blank(
'/v1/AUTH_test/gettest/man1',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_slo(req)
self.assertEqual(status, '409 Conflict')
self.assertEqual(self.app.call_count, 10)
err_log = self.slo.logger.log_dict['exception'][0][0][0]
self.assertTrue(err_log.startswith('ERROR: An error occurred '
'while retrieving segments'))
def test_get_with_if_modified_since(self):
# It's important not to pass the If-[Un]Modified-Since header to the
# proxy for segment or submanifest GET requests, as it may result in
@ -1356,11 +1470,12 @@ class TestSloGetManifest(SloTestCase):
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-manifest-a',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body, exc = self.call_slo(req, expect_exception=True)
status, headers, body = self.call_slo(req)
self.assertTrue(isinstance(exc, ListingIterError))
self.assertEqual('200 OK', status)
self.assertEqual(body, ' ')
self.assertEqual('409 Conflict', status)
err_log = self.slo.logger.log_dict['exception'][0][0][0]
self.assertTrue(err_log.startswith('ERROR: An error occurred '
'while retrieving segments'))
def test_invalid_json_submanifest(self):
self.app.register(
@ -1421,6 +1536,42 @@ class TestSloGetManifest(SloTestCase):
self.assertEqual('200 OK', status)
self.assertEqual(body, 'aaaaa')
def test_first_segment_mismatched_etag(self):
self.app.register('GET', '/v1/AUTH_test/gettest/manifest-badetag',
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'},
json.dumps([{'name': '/gettest/a_5',
'hash': 'wrong!',
'content_type': 'text/plain',
'bytes': '5'}]))
req = Request.blank('/v1/AUTH_test/gettest/manifest-badetag',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_slo(req)
self.assertEqual('409 Conflict', status)
err_log = self.slo.logger.log_dict['exception'][0][0][0]
self.assertTrue(err_log.startswith('ERROR: An error occurred '
'while retrieving segments'))
def test_first_segment_mismatched_size(self):
self.app.register('GET', '/v1/AUTH_test/gettest/manifest-badsize',
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'},
json.dumps([{'name': '/gettest/a_5',
'hash': md5hex('a' * 5),
'content_type': 'text/plain',
'bytes': '999999'}]))
req = Request.blank('/v1/AUTH_test/gettest/manifest-badsize',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_slo(req)
self.assertEqual('409 Conflict', status)
err_log = self.slo.logger.log_dict['exception'][0][0][0]
self.assertTrue(err_log.startswith('ERROR: An error occurred '
'while retrieving segments'))
def test_download_takes_too_long(self):
the_time = [time.time()]
@ -1454,6 +1605,27 @@ class TestSloGetManifest(SloTestCase):
('GET', '/v1/AUTH_test/gettest/b_10?multipart-manifest=get'),
('GET', '/v1/AUTH_test/gettest/c_15?multipart-manifest=get')])
def test_first_segment_not_exists(self):
self.app.register('GET', '/v1/AUTH_test/gettest/not_exists_obj',
swob.HTTPNotFound, {}, None)
self.app.register('GET', '/v1/AUTH_test/gettest/manifest-not-exists',
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'},
json.dumps([{'name': '/gettest/not_exists_obj',
'hash': md5hex('not_exists_obj'),
'content_type': 'text/plain',
'bytes': '%d' % len('not_exists_obj')
}]))
req = Request.blank('/v1/AUTH_test/gettest/manifest-not-exists',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_slo(req)
self.assertEqual('409 Conflict', status)
err_log = self.slo.logger.log_dict['exception'][0][0][0]
self.assertTrue(err_log.startswith('ERROR: An error occurred '
'while retrieving segments'))
class TestSloBulkLogger(unittest.TestCase):
def test_reused_logger(self):
@ -1474,18 +1646,21 @@ class TestSloCopyHook(SloTestCase):
'X-Static-Large-Object': 'true'},
json.dumps([{'name': '/c/o', 'hash': md5hex("obj"),
'bytes': '3'}]))
self.app.register(
'COPY', '/v1/AUTH_test/c/o', swob.HTTPCreated, {})
copy_hook = [None]
# slip this guy in there to pull out the hook
def extract_copy_hook(env, sr):
copy_hook[0] = env['swift.copy_hook']
if env['REQUEST_METHOD'] == 'COPY':
copy_hook[0] = env['swift.copy_hook']
return self.app(env, sr)
self.slo = slo.filter_factory({})(extract_copy_hook)
req = Request.blank('/v1/AUTH_test/c/o',
environ={'REQUEST_METHOD': 'GET'})
environ={'REQUEST_METHOD': 'COPY'})
self.slo(req.environ, fake_start_response)
self.copy_hook = copy_hook[0]
@ -1514,12 +1689,12 @@ class TestSloCopyHook(SloTestCase):
source_resp = Response(request=source_req, status=200,
headers={"X-Static-Large-Object": "true"},
app_iter=[json.dumps([{'name': '/c/o',
'hash': 'obj-etag',
'hash': md5hex("obj"),
'bytes': '3'}])])
modified_resp = self.copy_hook(source_req, source_resp, sink_req)
self.assertTrue(modified_resp is not source_resp)
self.assertEqual(modified_resp.etag, md5("obj-etag").hexdigest())
self.assertEqual(modified_resp.etag, md5hex(md5hex("obj")))
class TestSwiftInfo(unittest.TestCase):