diff --git a/swift/common/middleware/dlo.py b/swift/common/middleware/dlo.py index 9175b18949..d2761acb67 100644 --- a/swift/common/middleware/dlo.py +++ b/swift/common/middleware/dlo.py @@ -17,10 +17,10 @@ import os from ConfigParser import ConfigParser, NoSectionError, NoOptionError from hashlib import md5 from swift.common import constraints -from swift.common.exceptions import ListingIterError +from swift.common.exceptions import ListingIterError, SegmentError from swift.common.http import is_success from swift.common.swob import Request, Response, \ - HTTPRequestedRangeNotSatisfiable, HTTPBadRequest + HTTPRequestedRangeNotSatisfiable, HTTPBadRequest, HTTPConflict from swift.common.utils import get_logger, json, \ RateLimitedIterator, read_conf_dir, quote from swift.common.request_helpers import SegmentedIterable @@ -131,9 +131,10 @@ class GetContext(WSGIContext): constraints.CONTAINER_LISTING_LIMIT first_byte = last_byte = None - content_length = None + actual_content_length = None + content_length_for_swob_range = None if req.range and len(req.range.ranges) == 1: - content_length = sum(o['bytes'] for o in segments) + content_length_for_swob_range = sum(o['bytes'] for o in segments) # This is a hack to handle suffix byte ranges (e.g. "bytes=-5"), # which we can't honor unless we have a complete listing. @@ -144,28 +145,32 @@ class GetContext(WSGIContext): # # Alternately, we may not have all the segments, but this range # falls entirely within the first page's segments, so we know - # whether or not it's satisfiable. - if have_complete_listing or range_end < content_length: - byteranges = req.range.ranges_for_length(content_length) + # that it is satisfiable. + if (have_complete_listing + or range_end < content_length_for_swob_range): + byteranges = req.range.ranges_for_length( + content_length_for_swob_range) if not byteranges: return HTTPRequestedRangeNotSatisfiable(request=req) first_byte, last_byte = byteranges[0] # For some reason, swob.Range.ranges_for_length adds 1 to the # last byte's position. last_byte -= 1 + actual_content_length = last_byte - first_byte + 1 else: # The range may or may not be satisfiable, but we can't tell # based on just one page of listing, and we're not going to go # get more pages because that would use up too many resources, # so we ignore the Range header and return the whole object. - content_length = None + actual_content_length = None + content_length_for_swob_range = None req.range = None response_headers = [ (h, v) for h, v in response_headers if h.lower() not in ("content-length", "content-range")] - if content_length is not None: + if content_length_for_swob_range is not None: # Here, we have to give swob a big-enough content length so that # it can compute the actual content length based on the Range # header. This value will not be visible to the client; swob will @@ -175,10 +180,12 @@ class GetContext(WSGIContext): # segments, this may be less than the sum of all the segments' # sizes. However, it'll still be greater than the last byte in the # Range header, so it's good enough for swob. - response_headers.append(('Content-Length', str(content_length))) - elif have_complete_listing: response_headers.append(('Content-Length', - str(sum(o['bytes'] for o in segments)))) + str(content_length_for_swob_range))) + elif have_complete_listing: + actual_content_length = sum(o['bytes'] for o in segments) + response_headers.append(('Content-Length', + str(actual_content_length))) if have_complete_listing: response_headers = [(h, v) for h, v in response_headers @@ -188,21 +195,30 @@ class GetContext(WSGIContext): etag.update(seg_dict['hash'].strip('"')) response_headers.append(('Etag', '"%s"' % etag.hexdigest())) - listing_iter = RateLimitedIterator( - self._segment_listing_iterator( - req, version, account, container, obj_prefix, segments, - first_byte=first_byte, last_byte=last_byte), - self.dlo.rate_limit_segments_per_sec, - limit_after=self.dlo.rate_limit_after_segment) + app_iter = None + if req.method == 'GET': + listing_iter = RateLimitedIterator( + self._segment_listing_iterator( + req, version, account, container, obj_prefix, segments, + first_byte=first_byte, last_byte=last_byte), + self.dlo.rate_limit_segments_per_sec, + limit_after=self.dlo.rate_limit_after_segment) + + app_iter = SegmentedIterable( + req, self.dlo.app, listing_iter, ua_suffix="DLO MultipartGET", + swift_source="DLO", name=req.path, logger=self.logger, + max_get_time=self.dlo.max_get_time, + response_body_length=actual_content_length) + + try: + app_iter.validate_first_segment() + except (SegmentError, ListingIterError): + return HTTPConflict(request=req) + resp = Response(request=req, headers=response_headers, conditional_response=True, - app_iter=SegmentedIterable( - req, self.dlo.app, listing_iter, - ua_suffix="DLO MultipartGET", - swift_source="DLO", - name=req.path, logger=self.logger, - max_get_time=self.dlo.max_get_time)) - resp.app_iter.response = resp + app_iter=app_iter) + return resp def handle_request(self, req, start_response): diff --git a/swift/common/middleware/slo.py b/swift/common/middleware/slo.py index bedf93e944..24e2faad32 100644 --- a/swift/common/middleware/slo.py +++ b/swift/common/middleware/slo.py @@ -139,11 +139,12 @@ from datetime import datetime import mimetypes import re from hashlib import md5 -from swift.common.exceptions import ListingIterError +from swift.common.exceptions import ListingIterError, SegmentError from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \ HTTPMethodNotAllowed, HTTPRequestEntityTooLarge, HTTPLengthRequired, \ HTTPOk, HTTPPreconditionFailed, HTTPException, HTTPNotFound, \ - HTTPUnauthorized, HTTPRequestedRangeNotSatisfiable, Response + HTTPUnauthorized, HTTPConflict, HTTPRequestedRangeNotSatisfiable,\ + Response from swift.common.utils import json, get_logger, config_true_value, \ get_valid_utf8_str, override_bytes_from_content_type, split_path, \ register_swift_info, RateLimitedIterator, quote @@ -464,15 +465,27 @@ class SloGetContext(WSGIContext): start_byte, end_byte) for seg_dict, start_byte, end_byte in ratelimited_listing_iter) + segmented_iter = SegmentedIterable( + req, self.slo.app, segment_listing_iter, + name=req.path, logger=self.slo.logger, + ua_suffix="SLO MultipartGET", + swift_source="SLO", + max_get_time=self.slo.max_get_time) + + try: + segmented_iter.validate_first_segment() + except (ListingIterError, SegmentError): + # Copy from the SLO explanation in top of this file. + # If any of the segments from the manifest are not found or + # their Etag/Content Length no longer match the connection + # will drop. In this case a 409 Conflict will be logged in + # the proxy logs and the user will receive incomplete results. + return HTTPConflict(request=req) + response = Response(request=req, content_length=content_length, headers=response_headers, conditional_response=True, - app_iter=SegmentedIterable( - req, self.slo.app, segment_listing_iter, - name=req.path, logger=self.slo.logger, - ua_suffix="SLO MultipartGET", - swift_source="SLO", - max_get_time=self.slo.max_get_time)) + app_iter=segmented_iter) if req.range: response.headers.pop('Etag') return response diff --git a/swift/common/request_helpers.py b/swift/common/request_helpers.py index 5ee246e55a..08e0ab5dc4 100644 --- a/swift/common/request_helpers.py +++ b/swift/common/request_helpers.py @@ -21,13 +21,14 @@ from swob in here without creating circular imports. """ import hashlib -import sys +import itertools import time from contextlib import contextmanager from urllib import unquote +from swift import gettext_ as _ from swift.common.constraints import FORMAT2CONTENT_TYPE from swift.common.exceptions import ListingIterError, SegmentError -from swift.common.http import is_success, HTTP_SERVICE_UNAVAILABLE +from swift.common.http import is_success from swift.common.swob import HTTPBadRequest, HTTPNotAcceptable from swift.common.utils import split_path, validate_device_partition from swift.common.wsgi import make_subrequest @@ -276,12 +277,13 @@ class SegmentedIterable(object): (just for logging) :param ua_suffix: string to append to user-agent. :param name: name of manifest (used in logging only) - :param response: optional response object for the response being sent - to the client. + :param response_body_length: optional response body length for + the response being sent to the client. """ + def __init__(self, req, app, listing_iter, max_get_time, logger, ua_suffix, swift_source, - name='', response=None): + name='', response_body_length=None): self.req = req self.app = app self.listing_iter = listing_iter @@ -290,26 +292,14 @@ class SegmentedIterable(object): self.ua_suffix = " " + ua_suffix self.swift_source = swift_source self.name = name - self.response = response + self.response_body_length = response_body_length + self.peeked_chunk = None + self.app_iter = self._internal_iter() + self.validated_first_segment = False - def app_iter_range(self, *a, **kw): - """ - swob.Response will only respond with a 206 status in certain cases; one - of those is if the body iterator responds to .app_iter_range(). - - However, this object (or really, its listing iter) is smart enough to - handle the range stuff internally, so we just no-op this out for swob. - """ - return self - - def __iter__(self): + def _internal_iter(self): start_time = time.time() - have_yielded_data = False - - if self.response and self.response.content_length: - bytes_left = int(self.response.content_length) - else: - bytes_left = None + bytes_left = self.response_body_length try: for seg_path, seg_etag, seg_size, first_byte, last_byte \ @@ -366,7 +356,6 @@ class SegmentedIterable(object): seg_hash = hashlib.md5() for chunk in seg_resp.app_iter: seg_hash.update(chunk) - have_yielded_data = True if bytes_left is None: yield chunk elif bytes_left >= len(chunk): @@ -393,29 +382,44 @@ class SegmentedIterable(object): if bytes_left: raise SegmentError( - 'Not enough bytes for %s; closing connection' % - self.name) - - except ListingIterError as err: - # I have to save this error because yielding the ' ' below clears - # the exception from the current stack frame. - excinfo = sys.exc_info() - self.logger.exception('ERROR: While processing manifest %s, %s', - self.name, err) - # Normally, exceptions before any data has been yielded will - # cause Eventlet to send a 5xx response. In this particular - # case of ListingIterError we don't want that and we'd rather - # just send the normal 2xx response and then hang up early - # since 5xx codes are often used to judge Service Level - # Agreements and this ListingIterError indicates the user has - # created an invalid condition. - if not have_yielded_data: - yield ' ' - raise excinfo - except SegmentError as err: - self.logger.exception(err) - # This doesn't actually change the response status (we're too - # late for that), but this does make it to the logs. - if self.response: - self.response.status = HTTP_SERVICE_UNAVAILABLE + 'Not enough bytes for %s; closing connection' % self.name) + except (ListingIterError, SegmentError): + self.logger.exception(_('ERROR: An error occurred ' + 'while retrieving segments')) raise + + def app_iter_range(self, *a, **kw): + """ + swob.Response will only respond with a 206 status in certain cases; one + of those is if the body iterator responds to .app_iter_range(). + + However, this object (or really, its listing iter) is smart enough to + handle the range stuff internally, so we just no-op this out for swob. + """ + return self + + def validate_first_segment(self): + """ + Start fetching object data to ensure that the first segment (if any) is + valid. This is to catch cases like "first segment is missing" or + "first segment's etag doesn't match manifest". + + Note: this does not validate that you have any segments. A + zero-segment large object is not erroneous; it is just empty. + """ + if self.validated_first_segment: + return + self.validated_first_segment = True + + try: + self.peeked_chunk = self.app_iter.next() + except StopIteration: + pass + + def __iter__(self): + if self.peeked_chunk is not None: + pc = self.peeked_chunk + self.peeked_chunk = None + return itertools.chain([pc], self.app_iter) + else: + return self.app_iter diff --git a/test/unit/common/middleware/test_dlo.py b/test/unit/common/middleware/test_dlo.py index 492a8092fc..a292bc92b8 100644 --- a/test/unit/common/middleware/test_dlo.py +++ b/test/unit/common/middleware/test_dlo.py @@ -73,7 +73,7 @@ class DloTestCase(unittest.TestCase): # don't slow down tests with rate limiting 'rate_limit_after_segment': '1000000', })(self.app) - + self.dlo.logger = self.app.logger self.app.register( 'GET', '/v1/AUTH_test/c/seg_01', swob.HTTPOk, {'Content-Length': '5', 'Etag': md5hex("aaaaa")}, @@ -562,12 +562,11 @@ class TestDloGetManifest(DloTestCase): req = swob.Request.blank('/v1/AUTH_test/mancon/manifest', environ={'REQUEST_METHOD': 'GET'}) - status, headers, body, exc = self.call_dlo(req, expect_exception=True) - headers = swob.HeaderKeyDict(headers) - self.assertTrue(isinstance(exc, exceptions.SegmentError)) - - self.assertEqual(status, "200 OK") - self.assertEqual(body, '') # error right away -> no body bytes sent + status, headers, body = self.call_dlo(req) + self.assertEqual(status, "409 Conflict") + err_log = self.dlo.logger.log_dict['exception'][0][0][0] + self.assertTrue(err_log.startswith('ERROR: An error occurred ' + 'while retrieving segments')) def test_error_fetching_second_segment(self): self.app.register( @@ -582,6 +581,9 @@ class TestDloGetManifest(DloTestCase): self.assertTrue(isinstance(exc, exceptions.SegmentError)) self.assertEqual(status, "200 OK") self.assertEqual(''.join(body), "aaaaa") # first segment made it out + err_log = self.dlo.logger.log_dict['exception'][0][0][0] + self.assertTrue(err_log.startswith('ERROR: An error occurred ' + 'while retrieving segments')) def test_error_listing_container_first_listing_request(self): self.app.register( @@ -626,7 +628,7 @@ class TestDloGetManifest(DloTestCase): self.assertEqual(''.join(body), "aaaaabbWRONGbb") # stop after error def test_etag_comparison_ignores_quotes(self): - # a little future-proofing here in case we ever fix this + # a little future-proofing here in case we ever fix this in swob self.app.register( 'HEAD', '/v1/AUTH_test/mani/festo', swob.HTTPOk, {'Content-Length': '0', 'Etag': 'blah', diff --git a/test/unit/common/middleware/test_slo.py b/test/unit/common/middleware/test_slo.py index e3d978031d..2dd076ad72 100644 --- a/test/unit/common/middleware/test_slo.py +++ b/test/unit/common/middleware/test_slo.py @@ -55,6 +55,7 @@ class SloTestCase(unittest.TestCase): self.app = FakeSwift() self.slo = slo.filter_factory({})(self.app) self.slo.min_segment_size = 1 + self.slo.logger = self.app.logger def call_app(self, req, app=None, expect_exception=False): if app is None: @@ -1286,6 +1287,119 @@ class TestSloGetManifest(SloTestCase): # make sure we didn't keep asking for segments self.assertEqual(self.app.call_count, 20) + def test_sub_slo_recursion(self): + # man1 points to man2 and obj1, man2 points to man3 and obj2... + for i in xrange(11): + self.app.register('GET', '/v1/AUTH_test/gettest/obj%d' % i, + swob.HTTPOk, {'Content-Type': 'text/plain', + 'Content-Length': '6', + 'Etag': md5hex('body%02d' % i)}, + 'body%02d' % i) + + manifest_json = json.dumps([{'name': '/gettest/obj%d' % i, + 'hash': md5hex('body%2d' % i), + 'content_type': 'text/plain', + 'bytes': '6'}]) + self.app.register( + 'GET', '/v1/AUTH_test/gettest/man%d' % i, + swob.HTTPOk, {'Content-Type': 'application/json', + 'X-Static-Large-Object': 'true', + 'Etag': 'man%d' % i}, + manifest_json) + self.app.register( + 'HEAD', '/v1/AUTH_test/gettest/obj%d' % i, + swob.HTTPOk, {'Content-Length': '6', + 'Etag': md5hex('body%2d' % i)}, + None) + + for i in xrange(9, 0, -1): + manifest_data = [ + {'name': '/gettest/man%d' % (i + 1), + 'hash': 'man%d' % (i + 1), + 'sub_slo': True, + 'bytes': len(manifest_json), + 'content_type': + 'application/json;swift_bytes=%d' % ((10 - i) * 6)}, + {'name': '/gettest/obj%d' % i, + 'hash': md5hex('body%02d' % i), + 'bytes': '6', + 'content_type': 'text/plain'}] + + manifest_json = json.dumps(manifest_data) + self.app.register( + 'GET', '/v1/AUTH_test/gettest/man%d' % i, + swob.HTTPOk, {'Content-Type': 'application/json', + 'X-Static-Large-Object': 'true', + 'Etag': 'man%d' % i}, + manifest_json) + + req = Request.blank( + '/v1/AUTH_test/gettest/man1', + environ={'REQUEST_METHOD': 'GET'}) + status, headers, body = self.call_slo(req) + + self.assertEqual(status, '200 OK') + self.assertEqual(body, ('body10body09body08body07body06' + + 'body05body04body03body02body01')) + + self.assertEqual(self.app.call_count, 20) + + def test_sub_slo_recursion_limit(self): + # man1 points to man2 and obj1, man2 points to man3 and obj2... + for i in xrange(12): + self.app.register('GET', '/v1/AUTH_test/gettest/obj%d' % i, + swob.HTTPOk, + {'Content-Type': 'text/plain', + 'Content-Length': '6', + 'Etag': md5hex('body%02d' % i)}, 'body%02d' % i) + + manifest_json = json.dumps([{'name': '/gettest/obj%d' % i, + 'hash': md5hex('body%2d' % i), + 'content_type': 'text/plain', + 'bytes': '6'}]) + self.app.register( + 'GET', '/v1/AUTH_test/gettest/man%d' % i, + swob.HTTPOk, {'Content-Type': 'application/json', + 'X-Static-Large-Object': 'true', + 'Etag': 'man%d' % i}, + manifest_json) + self.app.register( + 'HEAD', '/v1/AUTH_test/gettest/obj%d' % i, + swob.HTTPOk, {'Content-Length': '6', + 'Etag': md5hex('body%2d' % i)}, + None) + + for i in xrange(11, 0, -1): + manifest_data = [ + {'name': '/gettest/man%d' % (i + 1), + 'hash': 'man%d' % (i + 1), + 'sub_slo': True, + 'bytes': len(manifest_json), + 'content_type': + 'application/json;swift_bytes=%d' % ((12 - i) * 6)}, + {'name': '/gettest/obj%d' % i, + 'hash': md5hex('body%02d' % i), + 'bytes': '6', + 'content_type': 'text/plain'}] + manifest_json = json.dumps(manifest_data) + self.app.register('GET', '/v1/AUTH_test/gettest/man%d' % i, + swob.HTTPOk, + {'Content-Type': 'application/json', + 'X-Static-Large-Object': 'true', + 'Etag': 'man%d' % i}, + manifest_json) + + req = Request.blank( + '/v1/AUTH_test/gettest/man1', + environ={'REQUEST_METHOD': 'GET'}) + status, headers, body = self.call_slo(req) + + self.assertEqual(status, '409 Conflict') + self.assertEqual(self.app.call_count, 10) + err_log = self.slo.logger.log_dict['exception'][0][0][0] + self.assertTrue(err_log.startswith('ERROR: An error occurred ' + 'while retrieving segments')) + def test_get_with_if_modified_since(self): # It's important not to pass the If-[Un]Modified-Since header to the # proxy for segment or submanifest GET requests, as it may result in @@ -1356,11 +1470,12 @@ class TestSloGetManifest(SloTestCase): req = Request.blank( '/v1/AUTH_test/gettest/manifest-manifest-a', environ={'REQUEST_METHOD': 'GET'}) - status, headers, body, exc = self.call_slo(req, expect_exception=True) + status, headers, body = self.call_slo(req) - self.assertTrue(isinstance(exc, ListingIterError)) - self.assertEqual('200 OK', status) - self.assertEqual(body, ' ') + self.assertEqual('409 Conflict', status) + err_log = self.slo.logger.log_dict['exception'][0][0][0] + self.assertTrue(err_log.startswith('ERROR: An error occurred ' + 'while retrieving segments')) def test_invalid_json_submanifest(self): self.app.register( @@ -1421,6 +1536,42 @@ class TestSloGetManifest(SloTestCase): self.assertEqual('200 OK', status) self.assertEqual(body, 'aaaaa') + def test_first_segment_mismatched_etag(self): + self.app.register('GET', '/v1/AUTH_test/gettest/manifest-badetag', + swob.HTTPOk, {'Content-Type': 'application/json', + 'X-Static-Large-Object': 'true'}, + json.dumps([{'name': '/gettest/a_5', + 'hash': 'wrong!', + 'content_type': 'text/plain', + 'bytes': '5'}])) + + req = Request.blank('/v1/AUTH_test/gettest/manifest-badetag', + environ={'REQUEST_METHOD': 'GET'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('409 Conflict', status) + err_log = self.slo.logger.log_dict['exception'][0][0][0] + self.assertTrue(err_log.startswith('ERROR: An error occurred ' + 'while retrieving segments')) + + def test_first_segment_mismatched_size(self): + self.app.register('GET', '/v1/AUTH_test/gettest/manifest-badsize', + swob.HTTPOk, {'Content-Type': 'application/json', + 'X-Static-Large-Object': 'true'}, + json.dumps([{'name': '/gettest/a_5', + 'hash': md5hex('a' * 5), + 'content_type': 'text/plain', + 'bytes': '999999'}])) + + req = Request.blank('/v1/AUTH_test/gettest/manifest-badsize', + environ={'REQUEST_METHOD': 'GET'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('409 Conflict', status) + err_log = self.slo.logger.log_dict['exception'][0][0][0] + self.assertTrue(err_log.startswith('ERROR: An error occurred ' + 'while retrieving segments')) + def test_download_takes_too_long(self): the_time = [time.time()] @@ -1454,6 +1605,27 @@ class TestSloGetManifest(SloTestCase): ('GET', '/v1/AUTH_test/gettest/b_10?multipart-manifest=get'), ('GET', '/v1/AUTH_test/gettest/c_15?multipart-manifest=get')]) + def test_first_segment_not_exists(self): + self.app.register('GET', '/v1/AUTH_test/gettest/not_exists_obj', + swob.HTTPNotFound, {}, None) + self.app.register('GET', '/v1/AUTH_test/gettest/manifest-not-exists', + swob.HTTPOk, {'Content-Type': 'application/json', + 'X-Static-Large-Object': 'true'}, + json.dumps([{'name': '/gettest/not_exists_obj', + 'hash': md5hex('not_exists_obj'), + 'content_type': 'text/plain', + 'bytes': '%d' % len('not_exists_obj') + }])) + + req = Request.blank('/v1/AUTH_test/gettest/manifest-not-exists', + environ={'REQUEST_METHOD': 'GET'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('409 Conflict', status) + err_log = self.slo.logger.log_dict['exception'][0][0][0] + self.assertTrue(err_log.startswith('ERROR: An error occurred ' + 'while retrieving segments')) + class TestSloBulkLogger(unittest.TestCase): def test_reused_logger(self): @@ -1474,18 +1646,21 @@ class TestSloCopyHook(SloTestCase): 'X-Static-Large-Object': 'true'}, json.dumps([{'name': '/c/o', 'hash': md5hex("obj"), 'bytes': '3'}])) + self.app.register( + 'COPY', '/v1/AUTH_test/c/o', swob.HTTPCreated, {}) copy_hook = [None] # slip this guy in there to pull out the hook def extract_copy_hook(env, sr): - copy_hook[0] = env['swift.copy_hook'] + if env['REQUEST_METHOD'] == 'COPY': + copy_hook[0] = env['swift.copy_hook'] return self.app(env, sr) self.slo = slo.filter_factory({})(extract_copy_hook) req = Request.blank('/v1/AUTH_test/c/o', - environ={'REQUEST_METHOD': 'GET'}) + environ={'REQUEST_METHOD': 'COPY'}) self.slo(req.environ, fake_start_response) self.copy_hook = copy_hook[0] @@ -1514,12 +1689,12 @@ class TestSloCopyHook(SloTestCase): source_resp = Response(request=source_req, status=200, headers={"X-Static-Large-Object": "true"}, app_iter=[json.dumps([{'name': '/c/o', - 'hash': 'obj-etag', + 'hash': md5hex("obj"), 'bytes': '3'}])]) modified_resp = self.copy_hook(source_req, source_resp, sink_req) self.assertTrue(modified_resp is not source_resp) - self.assertEqual(modified_resp.etag, md5("obj-etag").hexdigest()) + self.assertEqual(modified_resp.etag, md5hex(md5hex("obj"))) class TestSwiftInfo(unittest.TestCase):