slo: refactor GET/HEAD response handling

This patch reorganizes the SLO read response handling.  The main goal
was to push the response header replacement for both GET/HEAD SLO and
multipart-manifest=get paths all into a common return path.  A new
RespAttrs primitive is used to carry around some metadata details from
requests made in SLO.  The authors hope these changes make the code more
easily readable and easier to modify.

Drive-By: add new "friendly_close" function in common.utils so we can
drain empty/error responses more confidently (and use it in swob and
request_helpers).

Drive-By: the tests added in the Related-Change discovered a 500 on
If-[Un]Modified-Since conditional GET requests - it probably wasn't
important, but this refactor fixed it by accident as a side effect.

Closes-Bug: #2040178
Co-Authored-By: Alistair Coles <alistairncoles@gmail.com>
Co-Authored-By: Ashwin Nair <nairashwin952013@gmail.com>
Related-Change-Id: I54094f3d2098f56b755ec19cc9315d06a6ca8b15
Change-Id: Idc84e70539fc7480b6ecb86e2f0da904baf2c727
This commit is contained in:
Clay Gerrard 2023-10-11 09:33:46 -05:00
parent d93ce63333
commit 4a37a2976b
9 changed files with 1036 additions and 434 deletions

View File

@ -334,6 +334,7 @@ import time
import six
from swift.cli.container_deleter import make_delete_jobs
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.exceptions import ListingIterError, SegmentError
from swift.common.middleware.listing_formats import \
MAX_CONTAINER_LISTING_CONTENT_LENGTH
@ -345,15 +346,16 @@ from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
RESPONSE_REASONS, str_to_wsgi, bytes_to_wsgi, wsgi_to_str, wsgi_quote
from swift.common.utils import get_logger, config_true_value, \
get_valid_utf8_str, override_bytes_from_content_type, split_path, \
RateLimitedIterator, quote, close_if_possible, closing_if_possible, \
LRUCache, StreamingPile, strict_b64decode, Timestamp, drain_and_close, \
RateLimitedIterator, quote, closing_if_possible, \
LRUCache, StreamingPile, strict_b64decode, Timestamp, friendly_close, \
get_expirer_container, md5
from swift.common.registry import register_swift_info
from swift.common.request_helpers import SegmentedIterable, \
get_sys_meta_prefix, update_etag_is_at_header, resolve_etag_is_at_header, \
get_container_update_override_key, update_ignore_range_header
get_container_update_override_key, update_ignore_range_header, \
get_param
from swift.common.constraints import check_utf8, AUTO_CREATE_ACCOUNT_PREFIX
from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED, is_success
from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED
from swift.common.wsgi import WSGIContext, make_subrequest, make_env, \
make_pre_authed_request
from swift.common.middleware.bulk import get_response_body, \
@ -530,6 +532,129 @@ def parse_and_validate_input(req_body, req_path):
return parsed_data
def _annotate_segments(segments, logger=None):
"""
Decode any inlined data and update sub_slo segments bytes from content-type
when available; then annotate segment dicts in segments list with
'segment_length'.
N.B. raw_data segments don't have a bytes key and range-segments need to
calculate their length from their range key but afterwards all segments
dicts will have 'segment_length' representing the length of the segment.
"""
for seg_dict in segments:
if 'data' in seg_dict:
seg_dict['raw_data'] = base64.b64decode(seg_dict.pop('data'))
segment_length = len(seg_dict['raw_data'])
else:
if config_true_value(seg_dict.get('sub_slo')):
override_bytes_from_content_type(
seg_dict, logger=logger)
seg_range = seg_dict.get('range')
if seg_range is not None:
# The range is of the form N-M, where N and M are both
# positive decimal integers. We know this because this
# middleware is the only thing that creates the SLO
# manifests stored in the cluster.
range_start, range_end = [
int(x) for x in seg_range.split('-')]
segment_length = (range_end - range_start) + 1
else:
segment_length = int(seg_dict['bytes'])
seg_dict['segment_length'] = segment_length
class RespAttrs(object):
    """
    Properties of a GET or HEAD response that matter when deciding how to
    handle a potential SLO response.

    Instances are normally built with :meth:`from_headers` rather than by
    calling the constructor directly.

    :param is_slo: True if the response appears to be an SLO manifest, False
                   otherwise.
    :param timestamp: a value accepted by
                      :class:`~swift.common.utils.Timestamp`; a falsy value
                      is treated as 0.
    :param manifest_etag: the Etag of the manifest object, or None if
                          ``is_slo`` is False.
    :param slo_etag: the Etag of the SLO.
    :param slo_size: the size of the SLO.
    """

    def __init__(self, is_slo, timestamp, manifest_etag, slo_etag, slo_size):
        self.is_slo = bool(is_slo)
        self.timestamp = Timestamp(timestamp or 0)
        # "manifest_etag" is unambiguous, but "json_md5" is more explicit
        # still, so that's the name the attribute gets
        self.json_md5 = manifest_etag or ''
        self.slo_etag = slo_etag or ''
        # the size comes from sysmeta, but it may still be empty or
        # missing - see test_get_invalid_sysmeta_passthrough
        try:
            size = int(slo_size)
        except (ValueError, TypeError):
            size = -1
        self.slo_size = size
        self.is_legacy = not self._has_size_and_etag()

    def _has_size_and_etag(self):
        # falsy unless we hold a positive size *and* a non-empty etag
        if not self.slo_size > 0:
            return False
        return self.slo_etag

    @classmethod
    def from_headers(cls, response_headers):
        """
        Build a RespAttrs from whatever relevant headers a response carries.

        :param response_headers: list of (header, value) tuples from an
                                 object response
        :returns: an instance of RespAttrs representing the response headers
        """
        looks_like_slo = False
        backend_timestamp = None
        object_etag = None
        sysmeta_etag = None
        sysmeta_size = None
        for name, value in response_headers:
            name = name.lower()
            if name == 'x-static-large-object':
                looks_like_slo = config_true_value(value)
            elif name == 'x-backend-timestamp':
                backend_timestamp = value
            elif name == 'etag':
                object_etag = value
            elif name == SYSMETA_SLO_ETAG:
                sysmeta_etag = value
            elif name == SYSMETA_SLO_SIZE:
                sysmeta_size = value
        # the object's etag is only the *manifest* etag when it's an SLO
        manifest_etag = None
        if looks_like_slo:
            manifest_etag = object_etag
        return cls(looks_like_slo, backend_timestamp, manifest_etag,
                   sysmeta_etag, sysmeta_size)

    @staticmethod
    def _etag_component(seg_dict):
        # the string one segment contributes to the calculated SLO etag
        if 'raw_data' in seg_dict:
            return md5(seg_dict['raw_data'],
                       usedforsecurity=False).hexdigest()
        if seg_dict.get('range'):
            return '%s:%s;' % (seg_dict['hash'], seg_dict['range'])
        return seg_dict['hash']

    def update_from_segments(self, segments):
        """
        Fill in the SLO size and etag from the parsed manifest segments.

        Always called if SLO has fetched the manifest response body; for
        legacy manifests this calculates the size/etag values that the
        sys-meta headers did not provide.

        :param segments: list of segment dicts that each already have a
                         'segment_length' key
        """
        # size/etag only need setting once; it doesn't matter whether the
        # values came from sysmeta headers or from segments
        if self._has_size_and_etag():
            return

        total_bytes = 0
        etag_hasher = md5(usedforsecurity=False)
        for seg_dict in segments:
            total_bytes += seg_dict['segment_length']
            etag_hasher.update(
                self._etag_component(seg_dict).encode('ascii'))

        self.slo_size = total_bytes
        self.slo_etag = etag_hasher.hexdigest()
class SloGetContext(WSGIContext):
max_slo_recursion_depth = 10
@ -537,6 +662,8 @@ class SloGetContext(WSGIContext):
def __init__(self, slo):
self.slo = slo
super(SloGetContext, self).__init__(slo.app)
# we'll know more after we look at the response metadata
self.segment_listing_needed = False
def _fetch_sub_slo_segments(self, req, version, acc, con, obj):
"""
@ -571,9 +698,8 @@ class SloGetContext(WSGIContext):
body if len(body) <= 60 else body[:57] + '...'))
try:
with closing_if_possible(sub_resp.app_iter):
return json.loads(b''.join(sub_resp.app_iter))
except ValueError as err:
return self._parse_segments(sub_resp.app_iter)
except HTTPException as err:
raise ListingIterError(
'while fetching %s, JSON-decoding of submanifest %s '
'failed with %s' % (req.path, sub_req.path, err))
@ -584,32 +710,8 @@ class SloGetContext(WSGIContext):
conobj=seg_dict['name'].lstrip('/')
)
def _segment_length(self, seg_dict):
"""
Returns the number of bytes that will be fetched from the specified
segment on a plain GET request for this SLO manifest.
"""
if 'raw_data' in seg_dict:
return len(seg_dict['raw_data'])
seg_range = seg_dict.get('range')
if seg_range is not None:
# The range is of the form N-M, where N and M are both positive
# decimal integers. We know this because this middleware is the
# only thing that creates the SLO manifests stored in the
# cluster.
range_start, range_end = [int(x) for x in seg_range.split('-')]
return (range_end - range_start) + 1
else:
return int(seg_dict['bytes'])
def _segment_listing_iterator(self, req, version, account, segments,
byteranges):
for seg_dict in segments:
if config_true_value(seg_dict.get('sub_slo')):
override_bytes_from_content_type(seg_dict,
logger=self.slo.logger)
# We handle the range stuff here so that we can be smart about
# skipping unused submanifests. For example, if our first segment is a
# submanifest referencing 50 MiB total, but start_byte falls in
@ -617,9 +719,6 @@ class SloGetContext(WSGIContext):
#
# If we were to make SegmentedIterable handle all the range
# calculations, we would be unable to make this optimization.
total_length = sum(self._segment_length(seg) for seg in segments)
if not byteranges:
byteranges = [(0, total_length - 1)]
# Cache segments from sub-SLOs in case more than one byterange
# includes data from a particular sub-SLO. We only cache a few sets
@ -646,12 +745,26 @@ class SloGetContext(WSGIContext):
first_byte, last_byte,
cached_fetch_sub_slo_segments,
recursion_depth=1):
"""
Iterable that generates a filtered and annotated stream of segment
dicts describing the sub-segment ranges that would be used by the
SegmentedIterable to construct the bytes for a ranged response.
:param req: original request object
:param version: version
:param account: account
:param segments: segments dictionary
:param first_byte: offset into the large object for the first byte
that is returned to the client
:param last_byte: offset into the large object for the last byte
that is returned to the client
:param cached_fetch_sub_slo_segments: LRU cache used for fetching
sub-segments
:param recursion_depth: max number of recursive sub_slo calls
"""
last_sub_path = None
for seg_dict in segments:
if 'data' in seg_dict:
seg_dict['raw_data'] = strict_b64decode(seg_dict.pop('data'))
seg_length = self._segment_length(seg_dict)
seg_length = seg_dict['segment_length']
if first_byte >= seg_length:
# don't need any bytes from this segment
first_byte -= seg_length
@ -718,50 +831,194 @@ class SloGetContext(WSGIContext):
first_byte -= seg_length
last_byte -= seg_length
def _need_to_refetch_manifest(self, req):
"""
Just because a response shows that an object is a SLO manifest does not
mean that response's body contains the entire SLO manifest. If it
doesn't, we need to make a second request to actually get the whole
thing.
def _is_body_complete(self):
content_range = ''
for header, value in self._response_headers:
if header.lower() == 'content-range':
content_range = value
break
# e.g. Content-Range: bytes 0-14289/14290
match = re.match(r'bytes (\d+)-(\d+)/(\d+)$', content_range)
if not match:
# Malformed or missing, so we don't know what we got.
return False
first_byte, last_byte, length = [int(x) for x in match.groups()]
# If and only if we actually got back the full manifest body, then
# we can avoid re-fetching the object.
return first_byte == 0 and last_byte == length - 1
Note: this assumes that X-Static-Large-Object has already been found.
def _is_manifest_and_need_to_refetch(self, req, resp_attrs,
is_manifest_get):
"""
Check if the segments will be needed to service the request and update
the segment_listing_needed attribute.
:return: boolean indicating if we need to refetch, only if the segments
ARE needed we MAY need to refetch them!
"""
if not resp_attrs.is_slo:
# Not a static large object manifest, maybe an error, regardless
# no refetch needed
return False
if is_manifest_get:
# Any manifest json object response will do
return False
if req.method == 'HEAD':
# We've already looked for SYSMETA_SLO_ETAG/SIZE in the response
# and didn't find them. We have to fetch the whole manifest and
# recompute.
# There may be some cases in the future where a HEAD resp on even a
# modern manifest should refetch, e.g. lp bug #2029174
self.segment_listing_needed = resp_attrs.is_legacy
# it will always be the case that a HEAD must re-fetch iff
# segment_listing_needed
return self.segment_listing_needed
last_resp_status_int = self._get_status_int()
# These are based on etag (or last-modified), but the SLO's etag is
# almost certainly not the manifest object's etag. Still, it's highly
# likely that the submitted If-None-Match won't match the manifest
# object's etag, so we can avoid re-fetching the manifest if we got a
# successful response.
if last_resp_status_int in (412, 304):
# a conditional response from a modern manifest would have an
# accurate SLO etag, AND comparison with the etag-is-at header, but
# for legacy manifests responses (who always need to calculate the
# correct etag, even for if-[un]modified-since errors) we can't say
# what the etag is or if it matches unless we calculate it from
# segments - so we always need them
self.segment_listing_needed = resp_attrs.is_legacy
# if we need them; we can't get them from the error
return self.segment_listing_needed
# This is GET request for an SLO object, if we're going to return a
# successful response we're going to need the segments, but this
# resp_iter may not contain the entire SLO manifest.
self.segment_listing_needed = True
# modern swift object-servers should ignore Range headers on manifests,
# but during upgrade if we get a range response we'll probably have to
# refetch
if last_resp_status_int == 416:
# if the range wasn't satisfiable we need to refetch
return True
elif last_resp_status_int == 206:
# a partial response might included the whole content-range?!
return not self._is_body_complete()
else:
# a good number of error responses would have returned earlier for
# lacking is_slo sys-meta, at this point we've filtered all the
# other response codes, so this is a prefectly normal 200 response,
# no need to refetch
return False
response_status = int(self._response_status[:3])
def _refetch_manifest(self, req, resp_iter, orig_resp_attrs):
req.environ['swift.non_client_disconnect'] = True
friendly_close(resp_iter)
del req.environ['swift.non_client_disconnect']
# These are based on etag, and the SLO's etag is almost certainly not
# the manifest object's etag. Still, it's highly likely that the
# submitted If-None-Match won't match the manifest object's etag, so
# we can avoid re-fetching the manifest if we got a successful
# response.
if ((req.if_match or req.if_none_match) and
not is_success(response_status)):
return True
get_req = make_subrequest(
req.environ, method='GET',
headers={'x-auth-token': req.headers.get('x-auth-token')},
agent='%(orig)s SLO MultipartGET', swift_source='SLO')
resp_iter = self._app_call(get_req.environ)
new_resp_attrs = RespAttrs.from_headers(self._response_headers)
if new_resp_attrs.timestamp < orig_resp_attrs.timestamp and \
not new_resp_attrs.is_slo:
# Our *orig_resp_attrs* saw *newer* data that indicated it was an
# SLO, but on refetch it's an older object or error; 503 seems
# reasonable?
friendly_close(resp_iter)
raise HTTPServiceUnavailable(request=req)
# else, the caller will know how to return this response
return new_resp_attrs, resp_iter
if req.range and response_status in (206, 416):
content_range = ''
for header, value in self._response_headers:
if header.lower() == 'content-range':
content_range = value
break
# e.g. Content-Range: bytes 0-14289/14290
match = re.match(r'bytes (\d+)-(\d+)/(\d+)$', content_range)
if not match:
# Malformed or missing, so we don't know what we got.
return True
first_byte, last_byte, length = [int(x) for x in match.groups()]
# If and only if we actually got back the full manifest body, then
# we can avoid re-fetching the object.
got_everything = (first_byte == 0 and last_byte == length - 1)
return not got_everything
def _parse_segments(self, resp_iter):
"""
Read the manifest body and parse segments.
return False
:returns: segments
:raises: HTTPServerError
"""
segments = self._get_manifest_read(resp_iter)
_annotate_segments(segments, logger=self.slo.logger)
return segments
def _return_manifest_response(self, req, start_response, resp_iter,
is_format_raw):
if is_format_raw:
json_data = self.convert_segment_listing(resp_iter)
# we've created a new response body
resp_iter = [json_data]
replace_headers = {
# Note that we have to return the large object's content-type
# (not application/json) so it's like what the client sent on
# PUT. Otherwise, server-side copy won't work.
'Content-Length': len(json_data),
'Etag': md5(json_data, usedforsecurity=False).hexdigest(),
}
else:
# we're going to return the manifest resp_iter as-is
replace_headers = {
'Content-Type': 'application/json; charset=utf-8',
}
return self._return_response(req, start_response, resp_iter,
replace_headers)
def _return_slo_response(self, req, start_response, resp_iter, resp_attrs):
if self.segment_listing_needed:
# consume existing resp_iter; we'll create a new one
segments = self._parse_segments(resp_iter)
resp_attrs.update_from_segments(segments)
if req.method == 'HEAD':
resp_iter = []
else:
resp_iter = self._build_resp_iter(req, segments, resp_attrs)
headers = {
'Etag': '"%s"' % resp_attrs.slo_etag,
'X-Manifest-Etag': resp_attrs.json_md5,
# This isn't correct for range requests, but swob will fix it?
'Content-Length': str(resp_attrs.slo_size),
# ignore bogus content-range, make swob figure it out
'Content-Range': None
}
return self._return_response(req, start_response, resp_iter,
replace_headers=headers)
def _return_response(self, req, start_response, resp_iter,
                     replace_headers):
    """
    Common return path: build a conditional swob Response from the last
    backend response, with some headers replaced, and invoke it.

    :param req: the client request
    :param start_response: WSGI start_response callable
    :param resp_iter: the response body iterable
    :param replace_headers: dict of header values applied on top of the
                            backend response headers
    :returns: the result of calling the Response as a WSGI app
    """
    has_no_body = (req.method == 'HEAD' or
                   self._get_status_int() in (412, 304))
    if has_no_body:
        # HEAD and unmet-condition responses don't have bodies, so the
        # iterable we were handed should be drained
        friendly_close(resp_iter)
        resp_iter = b''

    merged_headers = HeaderKeyDict(self._response_headers,
                                   **replace_headers)
    etag_for_conditions = resolve_etag_is_at_header(req, merged_headers)
    response = Response(
        status=self._response_status,
        headers=merged_headers,
        app_iter=resp_iter,
        request=req,
        conditional_response=True,
        conditional_etag=etag_for_conditions)
    return response(req.environ, start_response)
def _return_non_slo_response(self, req, start_response, resp_iter):
    """
    Pass through a response that turned out not to be an SLO manifest.

    :param req: the client request
    :param start_response: WSGI start_response callable
    :param resp_iter: the response body iterable
    :returns: the result of calling the Response as a WSGI app
    """
    # This "pass-through" response may have come from a manifest refetch
    # made w/o range/conditional headers that turned out to be a real
    # object, and now we want out. If the original client request included
    # Range or Conditional headers we can trust swob to do the right
    # conversion back into a 206/416/304/412 - as long as the response we
    # have is a normal successful response and we respect any forwarding
    # middleware's etag-is-at header that we stripped off for the refetch!
    is_plain_success = self._get_status_int() == 200
    etag_for_conditions = resolve_etag_is_at_header(
        req, self._response_headers)
    response = Response(
        status=self._response_status,
        headers=self._response_headers,
        app_iter=resp_iter,
        request=req,
        conditional_response=is_plain_success,
        conditional_etag=etag_for_conditions,
    )
    return response(req.environ, start_response)
def handle_slo_get_or_head(self, req, start_response):
"""
@ -774,137 +1031,61 @@ class SloGetContext(WSGIContext):
large object manifest.
:param start_response: WSGI start_response callable
"""
if req.params.get('multipart-manifest') != 'get':
is_manifest_get = get_param(req, 'multipart-manifest') == 'get'
is_format_raw = is_manifest_get and get_param(req, 'format') == 'raw'
if not is_manifest_get:
# If this object is an SLO manifest, we may have saved off the
# large object etag during the original PUT. Send an
# X-Backend-Etag-Is-At header so that, if the SLO etag *was*
# saved, we can trust the object-server to respond appropriately
# to If-Match/If-None-Match requests.
# X-Backend-Etag-Is-At header so that, if the SLO etag *was* saved,
# we can trust the object-server to respond appropriately to
# If-Match/If-None-Match requests.
update_etag_is_at_header(req, SYSMETA_SLO_ETAG)
# Tell the object server that if it's a manifest,
# we want the whole thing
update_ignore_range_header(req, 'X-Static-Large-Object')
# process original request
resp_iter = self._app_call(req.environ)
resp_attrs = RespAttrs.from_headers(self._response_headers)
# the next two calls hide a couple side-effects, sorry:
#
# 1) regardless of the return value the "need_to_refetch" check *may*
# also set self.segment_listing_needed = True (it's commented to
# help you wrap your head around that one, good luck)
# 2) if we refetch, we overwrite the current resp_iter and resp_attrs
# variables, partly because we *might* get back a NOT
# resp_attrs.is_slo response (even if we had one to start), but
# hopefully they're just the manifest resp we needed to refetch!
if self._is_manifest_and_need_to_refetch(req, resp_attrs,
is_manifest_get):
resp_attrs, resp_iter = self._refetch_manifest(
req, resp_iter, resp_attrs)
# make sure this response is for a static large object manifest
slo_marker = slo_etag = slo_size = slo_timestamp = None
for header, value in self._response_headers:
header = header.lower()
if header == SYSMETA_SLO_ETAG:
slo_etag = value
elif header == SYSMETA_SLO_SIZE:
slo_size = value
elif (header == 'x-static-large-object' and
config_true_value(value)):
slo_marker = value
elif header == 'x-backend-timestamp':
slo_timestamp = value
if not resp_attrs.is_slo:
# even if the original resp_attrs may have been SLO we may have
# refetched, this also handles the server error case
return self._return_non_slo_response(
req, start_response, resp_iter)
if slo_marker and slo_etag and slo_size and slo_timestamp:
break
if is_manifest_get:
# manifest pass through doesn't require resp_attrs
return self._return_manifest_response(req, start_response,
resp_iter, is_format_raw)
if not slo_marker:
# Not a static large object manifest. Just pass it through.
start_response(self._response_status,
self._response_headers,
self._response_exc_info)
return resp_iter
# this a GET/HEAD response for the SLO object (not the manifest)
return self._return_slo_response(req, start_response, resp_iter,
resp_attrs)
# Handle pass-through request for the manifest itself
if req.params.get('multipart-manifest') == 'get':
if req.params.get('format') == 'raw':
resp_iter = self.convert_segment_listing(
self._response_headers, resp_iter)
else:
new_headers = []
for header, value in self._response_headers:
if header.lower() == 'content-type':
new_headers.append(('Content-Type',
'application/json; charset=utf-8'))
else:
new_headers.append((header, value))
self._response_headers = new_headers
start_response(self._response_status,
self._response_headers,
self._response_exc_info)
return resp_iter
is_conditional = self._response_status.startswith(('304', '412')) and (
req.if_match or req.if_none_match)
if slo_etag and slo_size and (
req.method == 'HEAD' or is_conditional):
# Since we have length and etag, we can respond immediately
resp = Response(
status=self._response_status,
headers=self._response_headers,
app_iter=resp_iter,
request=req,
conditional_etag=resolve_etag_is_at_header(
req, self._response_headers),
conditional_response=True)
resp.headers.update({
'Etag': '"%s"' % slo_etag,
'X-Manifest-Etag': self._response_header_value('etag'),
'Content-Length': slo_size,
})
return resp(req.environ, start_response)
if self._need_to_refetch_manifest(req):
req.environ['swift.non_client_disconnect'] = True
close_if_possible(resp_iter)
del req.environ['swift.non_client_disconnect']
get_req = make_subrequest(
req.environ, method='GET',
headers={'x-auth-token': req.headers.get('x-auth-token')},
agent='%(orig)s SLO MultipartGET', swift_source='SLO')
resp_iter = self._app_call(get_req.environ)
slo_marker = config_true_value(self._response_header_value(
'x-static-large-object'))
if not slo_marker: # will also catch non-2xx responses
got_timestamp = self._response_header_value(
'x-backend-timestamp') or '0'
if Timestamp(got_timestamp) >= Timestamp(slo_timestamp):
# We've got a newer response available, so serve that.
# Note that if there's data, it's going to be a 200 now,
# not a 206, and we're not going to drop bytes in the
# proxy on the client's behalf. Fortunately, the RFC is
# pretty forgiving for a server; there's no guarantee that
# a Range header will be respected.
resp = Response(
status=self._response_status,
headers=self._response_headers,
app_iter=resp_iter,
request=req,
conditional_etag=resolve_etag_is_at_header(
req, self._response_headers),
conditional_response=is_success(
int(self._response_status[:3])))
return resp(req.environ, start_response)
else:
# We saw newer data that indicated it's an SLO, but
# couldn't fetch the whole thing; 503 seems reasonable?
close_if_possible(resp_iter)
raise HTTPServiceUnavailable(request=req)
# NB: we might have gotten an out-of-date manifest -- that's OK;
# we'll just try to serve the old data
# Any Content-Range from a manifest is almost certainly wrong for the
# full large object.
resp_headers = [(h, v) for h, v in self._response_headers
if not h.lower() == 'content-range']
response = self.get_or_head_response(
req, resp_headers, resp_iter)
return response(req.environ, start_response)
def convert_segment_listing(self, resp_headers, resp_iter):
def convert_segment_listing(self, resp_iter):
"""
Converts the manifest data to match with the format
that was put in through ?multipart-manifest=put
:param resp_headers: response headers
:param resp_iter: a response iterable
:raises HTTPServerError:
:returns: the json-serialized raw format (as bytes)
"""
segments = self._get_manifest_read(resp_iter)
@ -921,108 +1102,36 @@ class SloGetContext(WSGIContext):
json_data = json.dumps(segments, sort_keys=True) # convert to string
if six.PY3:
json_data = json_data.encode('utf-8')
new_headers = []
for header, value in resp_headers:
if header.lower() == 'content-length':
new_headers.append(('Content-Length', len(json_data)))
elif header.lower() == 'etag':
new_headers.append(
('Etag', md5(json_data, usedforsecurity=False)
.hexdigest()))
else:
new_headers.append((header, value))
self._response_headers = new_headers
return [json_data]
return json_data
def _get_manifest_read(self, resp_iter):
with closing_if_possible(resp_iter):
resp_body = b''.join(resp_iter)
try:
segments = json.loads(resp_body)
except ValueError:
raise HTTPServerError('Unable to load SLO manifest')
except ValueError as e:
msg = 'Unable to load SLO manifest'
self.slo.logger.error('%s: %s', msg, e)
raise HTTPServerError(msg)
return segments
def get_or_head_response(self, req, resp_headers, resp_iter):
segments = self._get_manifest_read(resp_iter)
slo_etag = None
content_length = None
response_headers = []
for header, value in resp_headers:
lheader = header.lower()
if lheader == 'etag':
response_headers.append(('X-Manifest-Etag', value))
elif lheader != 'content-length':
response_headers.append((header, value))
def _build_resp_iter(self, req, segments, resp_attrs):
"""
Build a response iterable for a GET request.
if lheader == SYSMETA_SLO_ETAG:
slo_etag = value
elif lheader == SYSMETA_SLO_SIZE:
# it's from sysmeta, so we don't worry about non-integer
# values here
content_length = int(value)
:param req: the request object
:param resp_attrs: the slo attributes
# Prep to calculate content_length & etag if necessary
if slo_etag is None:
calculated_etag = md5(usedforsecurity=False)
if content_length is None:
calculated_content_length = 0
for seg_dict in segments:
# Decode any inlined data; it's important that we do this *before*
# calculating the segment length and etag
if 'data' in seg_dict:
seg_dict['raw_data'] = base64.b64decode(seg_dict.pop('data'))
if slo_etag is None:
if 'raw_data' in seg_dict:
r = md5(seg_dict['raw_data'],
usedforsecurity=False).hexdigest()
elif seg_dict.get('range'):
r = '%s:%s;' % (seg_dict['hash'], seg_dict['range'])
else:
r = seg_dict['hash']
calculated_etag.update(r.encode('ascii'))
if content_length is None:
if config_true_value(seg_dict.get('sub_slo')):
override_bytes_from_content_type(
seg_dict, logger=self.slo.logger)
calculated_content_length += self._segment_length(seg_dict)
if slo_etag is None:
slo_etag = calculated_etag.hexdigest()
if content_length is None:
content_length = calculated_content_length
response_headers.append(('Content-Length', str(content_length)))
response_headers.append(('Etag', '"%s"' % slo_etag))
if req.method == 'HEAD':
return self._manifest_head_response(req, response_headers)
else:
return self._manifest_get_response(
req, content_length, response_headers, segments)
def _manifest_head_response(self, req, response_headers):
conditional_etag = resolve_etag_is_at_header(req, response_headers)
return HTTPOk(request=req, headers=response_headers, body=b'',
conditional_etag=conditional_etag,
conditional_response=True)
def _manifest_get_response(self, req, content_length, response_headers,
segments):
:returns: a segmented iterable
"""
if req.range:
byteranges = [
# For some reason, swob.Range.ranges_for_length adds 1 to the
# last byte's position.
(start, end - 1) for start, end
in req.range.ranges_for_length(content_length)]
in req.range.ranges_for_length(resp_attrs.slo_size)]
else:
byteranges = []
byteranges = [(0, resp_attrs.slo_size - 1)]
ver, account, _junk = req.split_path(3, 3, rest_with_last=True)
account = wsgi_to_str(account)
@ -1067,15 +1176,8 @@ class SloGetContext(WSGIContext):
# their Etag/Content Length no longer match the connection
# will drop. In this case a 409 Conflict will be logged in
# the proxy logs and the user will receive incomplete results.
return HTTPConflict(request=req)
conditional_etag = resolve_etag_is_at_header(req, response_headers)
response = Response(request=req, content_length=content_length,
headers=response_headers,
conditional_response=True,
conditional_etag=conditional_etag,
app_iter=segmented_iter)
return response
raise HTTPConflict(request=req)
return segmented_iter
class StaticLargeObject(object):
@ -1524,7 +1626,7 @@ class StaticLargeObject(object):
'/%s/%s/%s' % (vrs, account, str_to_wsgi(obj_name.lstrip('/')))
)
# Just request the last byte of non-SLO objects so we don't waste
# a bunch of resources in drain_and_close() below
# a bunch of resources in friendly_close() below
manifest_req = Request.blank('', new_env, range='bytes=-1')
update_ignore_range_header(manifest_req, 'X-Static-Large-Object')
resp = manifest_req.get_response(self.app)
@ -1543,7 +1645,7 @@ class StaticLargeObject(object):
raise HTTPServerError('Unable to load SLO manifest')
else:
# Drain and close GET request (prevents socket leaks)
drain_and_close(resp)
friendly_close(resp)
raise HTTPBadRequest('Not an SLO manifest')
elif resp.status_int == HTTP_NOT_FOUND:
raise HTTPNotFound('SLO manifest not found')
@ -1624,7 +1726,7 @@ class StaticLargeObject(object):
resp.status, resp.body)
return HTTPServiceUnavailable()
# consume the response (should be short)
drain_and_close(resp)
friendly_close(resp)
# Finally, delete the manifest
return self.app

View File

@ -36,7 +36,8 @@ from swift.common.swob import HTTPBadRequest, \
HTTPServiceUnavailable, Range, is_chunked, multi_range_iterator, \
HTTPPreconditionFailed, wsgi_to_bytes, wsgi_unquote, wsgi_to_str
from swift.common.utils import split_path, validate_device_partition, \
close_if_possible, maybe_multipart_byteranges_to_document_iters, \
close_if_possible, friendly_close, \
maybe_multipart_byteranges_to_document_iters, \
multipart_byteranges_to_document_iters, parse_content_type, \
parse_content_range, csv_append, list_from_csv, Spliterator, quote, \
RESERVED, config_true_value, md5, CloseableChain, select_ip_port
@ -742,7 +743,10 @@ class SegmentedIterable(object):
for x in mri:
yield x
finally:
self.close()
# Spliterator and multi_range_iterator can't possibly know we've
# consumed the whole of the app_iter, but we want to read/close the
# final segment response
friendly_close(self.app_iter)
def validate_first_segment(self):
"""

View File

@ -55,7 +55,7 @@ from six.moves import urllib
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.utils import UTC, reiterate, split_path, Timestamp, pairs, \
close_if_possible, closing_if_possible, config_true_value, drain_and_close
close_if_possible, closing_if_possible, config_true_value, friendly_close
from swift.common.exceptions import InvalidTimestamp
@ -1395,12 +1395,15 @@ class Response(object):
if empty_resp is not None:
self.status = empty_resp
self.content_length = 0
# the existing successful response and its app_iter have been
# determined to not meet the conditions of the request, the
# response app_iter should be closed but not drained.
close_if_possible(app_iter)
return [b'']
if self.request and self.request.method == 'HEAD':
# We explicitly do NOT want to set self.content_length to 0 here
drain_and_close(app_iter) # be friendly to our app_iter
friendly_close(app_iter) # be friendly to our app_iter
return [b'']
if self.conditional_response and self.request and \

View File

@ -170,6 +170,11 @@ LOG_LINE_DEFAULT_FORMAT = '{remote_addr} - - [{time.d}/{time.b}/{time.Y}' \
'{trans_time:.4f} "{additional_info}" {pid} ' \
'{policy_index}'
DEFAULT_LOCK_TIMEOUT = 10
# this is coupled with object-server.conf's network_chunk_size; if someone is
# running that unreasonably small they may find this number inefficient, but in
# the more likely case they've increased the value to optimize high throughput
# transfers this will still cut off the transfer after the first chunk.
DEFAULT_DRAIN_LIMIT = 65536
class InvalidHashPathConfigError(ValueError):
@ -4018,7 +4023,7 @@ def closing_if_possible(maybe_closable):
close_if_possible(maybe_closable)
def drain_and_close(response_or_app_iter):
def drain_and_close(response_or_app_iter, read_limit=None):
"""
Drain and close a swob or WSGI response.
@ -4028,9 +4033,26 @@ def drain_and_close(response_or_app_iter):
app_iter = getattr(response_or_app_iter, 'app_iter', response_or_app_iter)
if app_iter is None: # for example, if we used the Response.body property
return
for _chunk in app_iter:
pass
close_if_possible(app_iter)
bytes_read = 0
with closing_if_possible(app_iter):
for chunk in app_iter:
bytes_read += len(chunk)
if read_limit is not None and bytes_read >= read_limit:
break
def friendly_close(resp):
    """
    Close a swob or WSGI response, draining at most a bounded prefix of it.

    This simply delegates to :func:`drain_and_close` with ``read_limit`` set
    to ``DEFAULT_DRAIN_LIMIT``.  Reading a HEAD or HTTPException response is
    basically free - the bytes are probably already in our network buffers -
    whereas draining a larger un-used response could burn a lot of
    CPU/network.  Capping the read lets a small/empty body be thrown away
    cleanly (avoiding a 499 being logged in the proxy) without committing to
    read an arbitrarily large one.

    :param resp: the swob response or WSGI app_iter to drain and close
    :returns: whatever :func:`drain_and_close` returns
    """
    return drain_and_close(
        resp,
        read_limit=DEFAULT_DRAIN_LIMIT,
    )
_rfc_token = r'[^()<>@,;:\"/\[\]?={}\x00-\x20\x7f]+'

View File

@ -20,6 +20,7 @@ import json
import time
import unittest
import mock
from mock import patch
import six
@ -2254,9 +2255,6 @@ class TestSloHeadOldManifest(SloGETorHEADTestCase):
expected_app_calls = [('HEAD', '/v1/AUTH_test/headtest/man')]
if not self.modern_manifest_headers:
expected_app_calls.append(('GET', '/v1/AUTH_test/headtest/man'))
# XXX slo isn't draining the orig resp_iter after refetch
self.expected_unread_requests[
('HEAD', '/v1/AUTH_test/headtest/man')] = 1
self.assertEqual(self.app.calls, expected_app_calls)
def test_get_manifest_passthrough(self):
@ -2301,9 +2299,6 @@ class TestSloHeadOldManifest(SloGETorHEADTestCase):
if not self.modern_manifest_headers:
expected_app_calls.append(('GET', '/v1/AUTH_test/headtest/man'))
self.assertEqual(self.app.calls, expected_app_calls)
# XXX swob isn't draining the backend resp_iter on conditional error
self.expected_unread_requests[
('HEAD', '/v1/AUTH_test/headtest/man')] = 1
def test_if_match_etag_not_matching(self):
req = Request.blank(
@ -2323,9 +2318,6 @@ class TestSloHeadOldManifest(SloGETorHEADTestCase):
if not self.modern_manifest_headers:
expected_app_calls.append(('GET', '/v1/AUTH_test/headtest/man'))
self.assertEqual(self.app.calls, expected_app_calls)
# XXX swob isn't draining the backend resp_iter on conditional error
self.expected_unread_requests[
('HEAD', '/v1/AUTH_test/headtest/man')] = 1
def test_if_none_match_etag_matching_with_override(self):
req = Request.blank(
@ -2349,9 +2341,6 @@ class TestSloHeadOldManifest(SloGETorHEADTestCase):
if not self.modern_manifest_headers:
expected_app_calls.append(('GET', '/v1/AUTH_test/headtest/man'))
self.assertEqual(self.app.calls, expected_app_calls)
# XXX swob isn't draining the backend resp_iter on conditional error
self.expected_unread_requests[
('HEAD', '/v1/AUTH_test/headtest/man')] = 1
def test_if_match_etag_not_matching_with_override(self):
req = Request.blank(
@ -2375,9 +2364,6 @@ class TestSloHeadOldManifest(SloGETorHEADTestCase):
if not self.modern_manifest_headers:
expected_app_calls.append(('GET', '/v1/AUTH_test/headtest/man'))
self.assertEqual(self.app.calls, expected_app_calls)
# XXX swob isn't draining the backend resp_iter on conditional error
self.expected_unread_requests[
('HEAD', '/v1/AUTH_test/headtest/man')] = 1
def test_head_manifest_is_efficient(self):
req = Request.blank(
@ -2402,9 +2388,6 @@ class TestSloHeadOldManifest(SloGETorHEADTestCase):
# might do it anyway.
expected_calls.append(
('GET', '/v1/AUTH_test/gettest/manifest-abcd'))
# XXX slo isn't closing the orig resp_iter on refetch
self.expected_unread_requests[
('HEAD', '/v1/AUTH_test/gettest/manifest-abcd')] = 1
self.assertEqual(self.app.calls, expected_calls)
@ -2826,10 +2809,6 @@ class TestSloGetManifests(SloGETorHEADTestCase):
self.assertIsNone(self.app.swift_sources[0])
self.assertEqual(self.app.swift_sources[1:],
['SLO'] * (len(self.app.swift_sources) - 1))
# XXX SegmntedIterable stops reading after the last byte of the last
# segment in app_iter_ranges
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/d_20?multipart-manifest=get')] = 1
def test_multiple_ranges_including_suffix_get_manifest(self):
req = Request.blank(
@ -2890,10 +2869,6 @@ class TestSloGetManifests(SloGETorHEADTestCase):
None, # b_10
'bytes=0-2,14-', # c_15
None]) # d_20
# XXX SegmntedIterable stops reading after the last byte of the last
# segment in app_iter_ranges
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/d_20?multipart-manifest=get')] = 1
class TestSloGetOldManifests(TestSloGetManifests):
@ -3052,9 +3027,6 @@ class TestOldSwiftWithRanges(SloGETorHEADTestCase):
self.assertEqual('bytes=100000-199999', self.app.headers[0]['Range'])
self.assertNotIn('Range', self.app.headers[1])
self.assertEqual('bytes=100000-199999', self.app.headers[2]['Range'])
# XXX slo isn't draining the orig resp_iter after refetch
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/big_manifest')] = 1
def test_old_swift_range_get_beyond_manifest_refetch_fails(self):
# new swift would have ignored the range and got the whole
@ -3078,10 +3050,6 @@ class TestOldSwiftWithRanges(SloGETorHEADTestCase):
# retry the first one
('GET', '/v1/AUTH_test/gettest/big_manifest'),
])
# XXX slo isn't draining the orig resp_iter after refetch, and then
# when it sees 404 it closes that one too before returning an error.
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/big_manifest')] = 2
def test_old_swift_range_get_beyond_manifest_refetch_finds_old(self):
# new swift would have ignored the range and got the whole
@ -3105,11 +3073,6 @@ class TestOldSwiftWithRanges(SloGETorHEADTestCase):
# retry the first one
('GET', '/v1/AUTH_test/gettest/big_manifest'),
])
# XXX slo isn't draining the orig resp_iter after refetch, and then
# when it sees older data it closes that one too before returning an
# error.
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/big_manifest')] = 2
def test_old_swift_range_get_beyond_manifest_refetch_small_non_slo(self):
# new swift would have ignored the range and got the whole
@ -3132,11 +3095,10 @@ class TestOldSwiftWithRanges(SloGETorHEADTestCase):
# retry the first one
('GET', '/v1/AUTH_test/gettest/big_manifest'),
])
# XXX slo isn't draining the orig resp_iter after refetch, and then
# when it sees older data it closes that one too before returning an
# error.
# swob is converting the successful non-slo response to conditional
# error and closing our unconditionally refetched resp_iter
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/big_manifest')] = 2
('GET', '/v1/AUTH_test/gettest/big_manifest')] = 1
def test_old_swift_range_get_beyond_manifest_refetch_big_non_slo(self):
# new swift would have ignored the range and got the whole
@ -3162,9 +3124,6 @@ class TestOldSwiftWithRanges(SloGETorHEADTestCase):
# retry the first one
('GET', '/v1/AUTH_test/gettest/big_manifest'),
])
# XXX slo isn't draining the orig resp_iter after refetch
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/big_manifest')] = 1
def test_old_swift_range_get_beyond_manifest_refetch_tombstone(self):
# new swift would have ignored the range and got the whole
@ -3188,9 +3147,6 @@ class TestOldSwiftWithRanges(SloGETorHEADTestCase):
# retry the first one
('GET', '/v1/AUTH_test/gettest/big_manifest'),
])
# XXX slo isn't draining the orig resp_iter after refetch
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/big_manifest')] = 1
def test_old_swift_range_get_bogus_content_range(self):
# Just a little paranoia; Swift currently sends back valid
@ -3232,9 +3188,6 @@ class TestOldSwiftWithRanges(SloGETorHEADTestCase):
('GET', '/v1/AUTH_test/gettest/b_10?multipart-manifest=get'),
('GET', '/v1/AUTH_test/gettest/c_15?multipart-manifest=get'),
('GET', '/v1/AUTH_test/gettest/d_20?multipart-manifest=get')])
# XXX slo isn't draining the orig resp_iter after refetch
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/manifest-abcd')] = 1
def test_old_swift_range_get_includes_whole_range_manifest(self):
# If the first range GET results in retrieval of the entire manifest
@ -3387,10 +3340,6 @@ class TestSloRangeRequests(SloGETorHEADTestCase):
headers={'Range': 'bytes=100-200'})
status, headers, body = self.call_slo(req)
self.assertEqual(status, '416 Requested Range Not Satisfiable')
# XXX it seems we validate the first segment before handing the
# resp_iter to swob; who decides it can't serve the given range.
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/a_5?multipart-manifest=get')] = 1
def test_get_segment_with_non_ascii_path(self):
segment_body = u"a møøse once bit my sister".encode("utf-8")
@ -4081,14 +4030,20 @@ class TestSloErrors(SloGETorHEADTestCase):
self.assertEqual('200 OK', status)
self.assertEqual(body, b'aaaaa')
if six.PY2:
error = "No JSON object could be decoded"
else:
error = "Expecting value: line 1 column 2 (char 1)"
self.assertEqual(self.slo.logger.get_lines_for_level('error'), [
lines = self.slo.logger.get_lines_for_level('error')
self.assertEqual(lines, [
mock.ANY,
'while fetching /v1/AUTH_test/gettest/manifest-abcd, '
'JSON-decoding of submanifest /v1/AUTH_test/gettest/manifest-bc '
'failed with %s' % error
'failed with 500 Internal Error'
])
self.assertIn(lines[0], [
# py2
'Unable to load SLO manifest: '
'No JSON object could be decoded',
# py3
'Unable to load SLO manifest: '
'Expecting value: line 1 column 2 (char 1)',
])
def test_mismatched_etag(self):
@ -4254,9 +4209,6 @@ class TestSloErrors(SloGETorHEADTestCase):
if not self.modern_manifest_headers:
expected_calls.append(
('GET', '/v1/AUTH_test/gettest/manifest-badetag'))
# XXX slo isn't closing the orig resp_iter on refetch
self.expected_unread_requests[
('HEAD', '/v1/AUTH_test/gettest/manifest-badetag')] = 1
self.assertEqual(self.app.calls, expected_calls)
def test_first_segment_mismatched_size(self):
@ -4301,9 +4253,6 @@ class TestSloErrors(SloGETorHEADTestCase):
if not self.modern_manifest_headers:
expected_calls.append(
('GET', '/v1/AUTH_test/gettest/manifest-badsize'))
# XXX slo isn't closing the orig resp_iter on refetch
self.expected_unread_requests[
('HEAD', '/v1/AUTH_test/gettest/manifest-badsize')] = 1
self.assertEqual(self.app.calls, expected_calls)
@patch('swift.common.request_helpers.time')
@ -4746,16 +4695,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
for headers in self.app.headers[1:]:
self.assertNotIn('If-Match', headers)
self.assertNotIn('X-Backend-Etag-Is-At', headers)
if self.modern_manifest_headers:
# XXX swob doesn't drain the resp_iter on conditional error
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/manifest-abcd')] = 1
else:
# XXX it seems we validate the first segment before handing the
# resp_iter to swob; who decides it can't serve the given range.
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/a_5'
'?multipart-manifest=get')] = 1
def test_if_none_match_mismatches(self):
req = Request.blank(
@ -4800,9 +4739,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
expected_app_calls.append(
('GET', '/v1/AUTH_test/gettest/manifest-abcd')
)
# XXX slo isn't draining the orig resp_iter after refetch
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/manifest-abcd')] = 1
expected_app_calls.extend([
('GET', '/v1/AUTH_test/gettest/manifest-bc'),
('GET', '/v1/AUTH_test/gettest/a_5?multipart-manifest=get'),
@ -4849,9 +4785,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
for headers in self.app.headers[1:]:
self.assertNotIn('If-None-Match', headers)
self.assertNotIn('X-Backend-Etag-Is-At', headers)
# XXX swob doesn't drain the resp_iter on conditional error
self.expected_unread_requests[
('GET', '/v1/AUTH_test/c/manifest-alt')] = 1
self.assertEqual(self.app.calls, expected_app_calls)
def test_if_none_match_matches_no_alternate_etag(self):
@ -4891,10 +4824,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
self.expected_unread_requests[
('GET', '/v1/AUTH_test/c/alt_00'
'?multipart-manifest=get')] = 1
else:
# XXX swob doesn't drain the resp_iter on conditional error
self.expected_unread_requests[
('GET', '/v1/AUTH_test/c/manifest-alt')] = 1
self.assertEqual(self.app.calls, expected_app_calls)
def test_if_none_match_mismatches_alternate_etag(self):
@ -4956,10 +4885,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
self.assertEqual(self.app.calls, expected_app_calls)
self.assertEqual(self.app.headers[0].get('X-Backend-Etag-Is-At'),
'x-object-sysmeta-slo-etag')
if not self.modern_manifest_headers:
# XXX slo isn't closing the orig resp_iter on refetch
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/manifest-abcd')] = 1
def test_if_match_mismatches(self):
req = Request.blank(
@ -4997,9 +4922,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
for headers in self.app.headers[1:]:
self.assertNotIn('If-Match', headers)
self.assertNotIn('X-Backend-Etag-Is-At', headers)
# XXX swob doesn't drain the resp_iter on conditional error
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/manifest-abcd')] = 1
def test_if_match_mismatches_manifest_json_md5(self):
req = Request.blank(
@ -5028,10 +4950,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
# reading the remaining segments
self.expected_unread_requests[('GET', '/v1/AUTH_test/gettest/a_5'
'?multipart-manifest=get')] = 1
else:
# XXX swob doesn't drain the resp_iter on conditional error
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/manifest-abcd')] = 1
self.assertEqual(self.app.calls, expected_app_calls)
def test_if_match_matches_alternate_etag(self):
@ -5078,9 +4996,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
expected_app_calls = [('GET', '/v1/AUTH_test/c/manifest-alt')]
self.assertEqual(self.app.headers[0].get('X-Backend-Etag-Is-At'),
'X-Object-Sysmeta-Alt-Etag,x-object-sysmeta-slo-etag')
# XXX swob doesn't drain the resp_iter on conditional error
self.expected_unread_requests[
('GET', '/v1/AUTH_test/c/manifest-alt')] = 1
if not self.modern_manifest_headers:
expected_app_calls.extend([
# Needed to re-fetch because if-match can't find slo-etag
@ -5328,9 +5243,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
self.assertEqual(self.app.calls, expected_calls)
def test_if_match_matches_alternate_etag_non_slo_after_refetch(self):
# XXX 2/250 tests requiring this header sounds like a bug
self.app._responses[('GET', '/v1/AUTH_test/c/manifest-alt')][1][
'X-Backend-Timestamp'] = '1234'
self.app.register_next_response(
'GET', '/v1/AUTH_test/c/manifest-alt',
swob.HTTPOk, {'Content-Length': '25',
@ -5352,10 +5264,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
self.assertIn('X-Object-Sysmeta-Alt-Etag',
self.app.headers[0]['X-Backend-Etag-Is-At'])
# XXX swob doesn't drain the resp_iter on conditional error
self.expected_unread_requests[
('GET', '/v1/AUTH_test/c/manifest-alt')] = 1
if self.modern_manifest_headers:
# and since the response includes modern sysmeta, slo trusts the
# 412 w/o refetch
@ -5384,16 +5292,10 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
# wrapping middleware
self.assertEqual('alt-object-etag',
headers['X-Object-Sysmeta-Alt-Etag'])
# XXX swob doesn't drain the resp_iter on conditional error
self.expected_unread_requests[
('GET', '/v1/AUTH_test/c/manifest-alt')] = 1
self.assertEqual(self.app.calls, expected_app_calls)
def test_if_match_mismatches_alternate_etag_non_slo_after_refetch(self):
# XXX 2/~250 tests requiring this header sounds like a bug
self.app._responses[('GET', '/v1/AUTH_test/c/manifest-alt')][1][
'X-Backend-Timestamp'] = '1234'
self.app.register_next_response(
'GET', '/v1/AUTH_test/c/manifest-alt',
swob.HTTPOk, {'Content-Length': '25',
@ -5415,10 +5317,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
self.assertIn('X-Object-Sysmeta-Alt-Etag',
self.app.headers[0]['X-Backend-Etag-Is-At'])
# XXX swob doesn't drain the resp_iter on conditional error
self.expected_unread_requests[
('GET', '/v1/AUTH_test/c/manifest-alt')] = 1
if self.modern_manifest_headers:
# and since the response includes modern sysmeta, slo trusts the
# 412 w/o refetch
@ -5450,7 +5348,7 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
# swob is converting the successful non-slo response to conditional
# error and closing our unconditionally refetched resp_iter
self.expected_unread_requests[
('GET', '/v1/AUTH_test/c/manifest-alt')] += 1
('GET', '/v1/AUTH_test/c/manifest-alt')] = 1
self.assertEqual(self.app.calls, expected_app_calls)
@ -5476,9 +5374,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
expected_app_calls.append(
('GET', '/v1/AUTH_test/gettest/manifest-abcd'),
)
# XXX the orig resp_iter doesn't get closed
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/manifest-abcd')] = 1
# and then fetch the segments
expected_app_calls.extend([
('GET', '/v1/AUTH_test/gettest/manifest-bc'),
@ -5515,9 +5410,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
self.assertEqual(self.app.calls, expected_app_calls)
self.assertEqual(self.app.headers[0].get('X-Backend-Etag-Is-At'),
'x-object-sysmeta-slo-etag')
# XXX slo isn't closing the orig resp_iter on refetch
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/manifest-abcd')] = 1
def test_range_resume_download(self):
req = Request.blank(
@ -5592,9 +5484,32 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
'If-Modified-Since': 'Mon, 23 Oct 2023 10:05:32 GMT',
})
status, headers, body = self.call_slo(req)
# XXX see lp bug #2040178
self.assertEqual(status, '500 Internal Error')
self.assertEqual(b'Unable to load SLO manifest', body)
# nope, that was the last time it was changed
self.assertEqual(status, '304 Not Modified')
self.assertEqual(headers['X-Static-Large-Object'], 'true')
self.assertEqual(headers['Etag'],
'"%s"' % self.manifest_last_modified_slo_etag)
self.assertEqual(headers['X-Manifest-Etag'],
self.manifest_last_modified_json_md5)
self.assertEqual(headers['Content-Length'], '0')
self.assertEqual('Mon, 23 Oct 2023 10:05:32 GMT',
headers['Last-Modified'])
self.assertEqual(b'', body)
expected_calls = [
('GET', '/v1/AUTH_test/c/manifest-last-modified'),
]
if not self.modern_manifest_headers:
# N.B. legacy manifests must refetch for accurate Etag, and then we
# validate first segment before letting swob return the error
expected_calls.extend([
('GET', '/v1/AUTH_test/c/manifest-last-modified'),
('GET', '/v1/AUTH_test/gettest/a_5?multipart-manifest=get'),
])
# we don't drain the segment's resp_iter if validation fails
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/a_5'
'?multipart-manifest=get')] = 1
self.assertEqual(self.app.calls, expected_calls)
def test_if_modified_since_now(self):
now = datetime.now()
@ -5605,9 +5520,32 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
'If-Modified-Since': last_modified,
})
status, headers, body = self.call_slo(req)
# XXX see lp bug #2040178
self.assertEqual(status, '500 Internal Error')
self.assertEqual(b'Unable to load SLO manifest', body)
# nope, that was the last time it was changed
self.assertEqual(status, '304 Not Modified')
self.assertEqual(headers['X-Static-Large-Object'], 'true')
self.assertEqual(headers['Etag'],
'"%s"' % self.manifest_last_modified_slo_etag)
self.assertEqual(headers['X-Manifest-Etag'],
self.manifest_last_modified_json_md5)
self.assertEqual(headers['Content-Length'], '0')
self.assertEqual('Mon, 23 Oct 2023 10:05:32 GMT',
headers['Last-Modified'])
self.assertEqual(b'', body)
expected_calls = [
('GET', '/v1/AUTH_test/c/manifest-last-modified'),
]
if not self.modern_manifest_headers:
# N.B. legacy manifests must refetch for accurate Etag, and then we
# validate first segment before letting swob return the error
expected_calls.extend([
('GET', '/v1/AUTH_test/c/manifest-last-modified'),
('GET', '/v1/AUTH_test/gettest/a_5?multipart-manifest=get'),
])
# we don't drain the segment's resp_iter if validation fails
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/a_5'
'?multipart-manifest=get')] = 1
self.assertEqual(self.app.calls, expected_calls)
def test_if_unmodified_since_ancient_date(self):
req = swob.Request.blank(
@ -5616,9 +5554,32 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
'If-Unmodified-Since': 'Fri, 01 Feb 2012 20:38:36 GMT',
})
status, headers, body = self.call_slo(req)
# XXX see lp bug #2040178
self.assertEqual(status, '500 Internal Error')
self.assertEqual(b'Unable to load SLO manifest', body)
# oh it's *definitely* been modified since then!
self.assertEqual(status, '412 Precondition Failed')
self.assertEqual(headers['X-Static-Large-Object'], 'true')
self.assertEqual(headers['Etag'],
'"%s"' % self.manifest_last_modified_slo_etag)
self.assertEqual(headers['X-Manifest-Etag'],
self.manifest_last_modified_json_md5)
self.assertEqual(headers['Content-Length'], '0')
self.assertEqual('Mon, 23 Oct 2023 10:05:32 GMT',
headers['Last-Modified'])
self.assertEqual(b'', body)
expected_calls = [
('GET', '/v1/AUTH_test/c/manifest-last-modified'),
]
if not self.modern_manifest_headers:
# N.B. legacy manifests must refetch for accurate Etag, and then we
# validate first segment before letting swob return the error
expected_calls.extend([
('GET', '/v1/AUTH_test/c/manifest-last-modified'),
('GET', '/v1/AUTH_test/gettest/a_5?multipart-manifest=get'),
])
# we don't drain the segment's resp_iter if validation fails
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/a_5'
'?multipart-manifest=get')] = 1
self.assertEqual(self.app.calls, expected_calls)
def test_if_unmodified_since_last_modified(self):
req = swob.Request.blank(
@ -5929,5 +5890,352 @@ class TestNonSloPassthrough(SloGETorHEADTestCase):
])
class TestRespAttrs(unittest.TestCase):
def test_init_calculates_is_legacy(self):
    # Build RespAttrs with every value supplied (timestamp as a float) and
    # check each one is exposed on the matching attribute.
    init_args = (True, 123456789.12345, 'manifest-etag', 'slo-etag', 999)
    resp_attrs = slo.RespAttrs(*init_args)
    self.assertTrue(resp_attrs.is_slo)
    self.assertEqual(123456789.12345, resp_attrs.timestamp)
    self.assertIsInstance(resp_attrs.timestamp, Timestamp)
    self.assertEqual('manifest-etag', resp_attrs.json_md5)
    self.assertEqual('slo-etag', resp_attrs.slo_etag)
    self.assertEqual(999, resp_attrs.slo_size)
    # both etag and size were provided, so this is not a legacy manifest
    self.assertTrue(resp_attrs._has_size_and_etag())
    self.assertFalse(resp_attrs.is_legacy)
def test_init_converts_timestamps_from_strings(self):
    # Same as the float case, but the timestamp is handed over as a string
    # and must still come back as a Timestamp instance.
    init_args = (True, '123456789.12345', 'manifest-etag', 'slo-etag', 999)
    resp_attrs = slo.RespAttrs(*init_args)
    self.assertTrue(resp_attrs.is_slo)
    self.assertEqual(123456789.12345, resp_attrs.timestamp)
    self.assertIsInstance(resp_attrs.timestamp, Timestamp)
    self.assertEqual('manifest-etag', resp_attrs.json_md5)
    self.assertEqual('slo-etag', resp_attrs.slo_etag)
    self.assertEqual(999, resp_attrs.slo_size)
    # both etag and size were provided, so this is not a legacy manifest
    self.assertTrue(resp_attrs._has_size_and_etag())
    self.assertFalse(resp_attrs.is_legacy)
def test_default_types(self):
    # Passing None for everything should yield correctly-typed
    # default/place-holder values rather than None.
    resp_attrs = slo.RespAttrs(*([None] * 5))
    self.assertIs(resp_attrs.is_slo, False)  # the bool, not None!
    self.assertEqual(0, resp_attrs.timestamp)
    self.assertIsInstance(resp_attrs.timestamp, Timestamp)
    self.assertEqual('', resp_attrs.json_md5)
    self.assertEqual('', resp_attrs.slo_etag)
    self.assertEqual(-1, resp_attrs.slo_size)
    # neither etag nor size was provided
    self.assertFalse(resp_attrs._has_size_and_etag())
    self.assertTrue(resp_attrs.is_legacy)
def test_init_with_no_sysmeta(self):
    # An SLO with a (normal-format) timestamp but no etag/size sysmeta
    # gets place-holder values and is reported as legacy.
    ts_now = Timestamp.now()
    resp_attrs = slo.RespAttrs(True, ts_now.normal, None, None, None)
    self.assertTrue(resp_attrs.is_slo)
    self.assertEqual(ts_now, resp_attrs.timestamp)
    self.assertIsInstance(resp_attrs.timestamp, Timestamp)
    self.assertEqual('', resp_attrs.json_md5)
    self.assertEqual('', resp_attrs.slo_etag)
    self.assertEqual(-1, resp_attrs.slo_size)
    # neither etag nor size was provided
    self.assertFalse(resp_attrs._has_size_and_etag())
    self.assertTrue(resp_attrs.is_legacy)
def test_init_with_no_sysmeta_offset(self):
    # As above, but the timestamp carries an offset and is passed in its
    # internal string form; it must round-trip to an equal Timestamp.
    ts_now = Timestamp.now(offset=123)
    resp_attrs = slo.RespAttrs(True, ts_now.internal, None, None, None)
    self.assertTrue(resp_attrs.is_slo)
    self.assertEqual(ts_now, resp_attrs.timestamp)
    self.assertIsInstance(resp_attrs.timestamp, Timestamp)
    self.assertEqual('', resp_attrs.json_md5)
    self.assertEqual('', resp_attrs.slo_etag)
    self.assertEqual(-1, resp_attrs.slo_size)
    # neither etag nor size was provided
    self.assertFalse(resp_attrs._has_size_and_etag())
    self.assertTrue(resp_attrs.is_legacy)
def test_from_empty_headers(self):
    # No headers at all: everything falls back to its default value.
    no_headers = []
    resp_attrs = slo.RespAttrs.from_headers(no_headers)
    self.assertFalse(resp_attrs.is_slo)
    self.assertEqual(0, resp_attrs.timestamp)
    self.assertEqual('', resp_attrs.json_md5)
    self.assertEqual('', resp_attrs.slo_etag)
    self.assertEqual(-1, resp_attrs.slo_size)
    self.assertTrue(resp_attrs.is_legacy)
def test_from_only_timestamp(self):
    # Only the backend timestamp is meaningful here; the unrelated header
    # must not influence any attribute.
    ts_now = Timestamp.now(offset=1)
    headers = [
        ('X-Backend-Timestamp', ts_now.internal),
        ('X-Irrelevant', 'ignored'),
    ]
    resp_attrs = slo.RespAttrs.from_headers(headers)
    self.assertFalse(resp_attrs.is_slo)
    self.assertEqual(ts_now, resp_attrs.timestamp)
    self.assertEqual('', resp_attrs.json_md5)
    self.assertEqual('', resp_attrs.slo_etag)
    self.assertEqual(-1, resp_attrs.slo_size)
    self.assertTrue(resp_attrs.is_legacy)
def test_legacy_slo_sysmeta(self):
    # An SLO manifest response without slo-etag/slo-size sysmeta: the Etag
    # becomes json_md5 and the manifest is flagged as legacy.
    headers = [
        ('X-Backend-Timestamp', '123456789.12345'),
        ('Etag', 'manifest-etag'),
        ('X-Static-lARGE-Object', 'yes'),
    ]
    resp_attrs = slo.RespAttrs.from_headers(headers)
    self.assertTrue(resp_attrs.is_slo)
    self.assertEqual(123456789.12345, resp_attrs.timestamp)
    self.assertEqual('manifest-etag', resp_attrs.json_md5)
    self.assertEqual('', resp_attrs.slo_etag)
    self.assertEqual(-1, resp_attrs.slo_size)
    self.assertTrue(resp_attrs.is_legacy)
def test_partial_modern_sysmeta(self):
    # Three variations, each dropping one of the manifest headers.

    # 1) slo etag is missing: size is parsed but the manifest is legacy
    headers_without_slo_etag = [
        ('X-Backend-Timestamp', '123456789.12345'),
        ('Etag', 'manifest-etag'),
        ('X-Static-lARGE-Object', 'yes'),
        ('x-object-sysmeta-slo-size', '1234'),
    ]
    resp_attrs = slo.RespAttrs.from_headers(headers_without_slo_etag)
    self.assertTrue(resp_attrs.is_slo)
    self.assertEqual(123456789.12345, resp_attrs.timestamp)
    self.assertEqual('manifest-etag', resp_attrs.json_md5)
    self.assertEqual('', resp_attrs.slo_etag)
    self.assertEqual(1234, resp_attrs.slo_size)
    self.assertTrue(resp_attrs.is_legacy)

    # 2) slo size is missing: etag is kept but the manifest is legacy
    headers_without_slo_size = [
        ('X-Backend-Timestamp', '123456789.12345'),
        ('Etag', 'manifest-etag'),
        ('X-Static-lARGE-Object', 'yes'),
        ('x-object-sysmeta-slo-etag', 'slo-etag'),
    ]
    resp_attrs = slo.RespAttrs.from_headers(headers_without_slo_size)
    self.assertTrue(resp_attrs.is_slo)
    self.assertEqual(123456789.12345, resp_attrs.timestamp)
    self.assertEqual('manifest-etag', resp_attrs.json_md5)
    self.assertEqual('slo-etag', resp_attrs.slo_etag)
    self.assertEqual(-1, resp_attrs.slo_size)
    self.assertTrue(resp_attrs.is_legacy)

    # 3) manifest etag is missing
    headers_without_etag = [
        ('X-Backend-Timestamp', '123456789.12345'),
        ('X-Static-lARGE-Object', 'yes'),
        ('x-object-sysmeta-slo-size', '1234'),
        ('x-object-sysmeta-slo-etag', 'slo-etag'),
    ]
    resp_attrs = slo.RespAttrs.from_headers(headers_without_etag)
    self.assertTrue(resp_attrs.is_slo)
    self.assertEqual(123456789.12345, resp_attrs.timestamp)
    self.assertEqual('', resp_attrs.json_md5)
    self.assertEqual('slo-etag', resp_attrs.slo_etag)
    self.assertEqual(1234, resp_attrs.slo_size)
    # missing Etag might be some kind of bug, but it has all sysmeta
    self.assertFalse(resp_attrs.is_legacy)
def test_invalid_sysmeta(self):
    # a non-numeric slo-size is treated like a missing one
    headers_bad_size = [
        ('X-Backend-Timestamp', '123456789.12345'),
        ('X-Static-lARGE-Object', 'yes'),
        ('x-object-sysmeta-slo-size', 'huge!'),
    ]
    resp_attrs = slo.RespAttrs.from_headers(headers_bad_size)
    self.assertTrue(resp_attrs.is_slo)
    self.assertEqual(123456789.12345, resp_attrs.timestamp)
    self.assertEqual('', resp_attrs.json_md5)
    self.assertEqual('', resp_attrs.slo_etag)
    self.assertEqual(-1, resp_attrs.slo_size)
    self.assertTrue(resp_attrs.is_legacy)

    # an empty slo-size alongside an oddly-cased etag header: the
    # assertions expect all place-holder values here too
    headers_empty_size = [
        ('X-Backend-Timestamp', '123456789.12345'),
        ('X-Static-lARGE-Object', 'yes'),
        ('e-TAG', 'wrong!'),
        ('x-object-sysmeta-slo-size', ''),
    ]
    resp_attrs = slo.RespAttrs.from_headers(headers_empty_size)
    self.assertTrue(resp_attrs.is_slo)
    self.assertEqual(123456789.12345, resp_attrs.timestamp)
    self.assertEqual('', resp_attrs.json_md5)
    self.assertEqual('', resp_attrs.slo_etag)
    self.assertEqual(-1, resp_attrs.slo_size)
    self.assertTrue(resp_attrs.is_legacy)
def test_from_valid_sysmeta(self):
    # A fully "modern" manifest response: etag, slo-etag and slo-size are
    # all present, so nothing is defaulted and it is not legacy.
    headers = [
        ('X-Backend-Timestamp', '123456789.12345'),
        ('Etag', 'manifest-etag'),
        ('X-Static-lARGE-Object', 'yes'),
        ('x-object-sysmeta-slo-etag', 'slo-tag'),
        ('x-object-sysmeta-slo-size', '1234'),
    ]
    resp_attrs = slo.RespAttrs.from_headers(headers)
    self.assertTrue(resp_attrs.is_slo)
    self.assertEqual(123456789.12345, resp_attrs.timestamp)
    self.assertEqual('manifest-etag', resp_attrs.json_md5)
    self.assertEqual('slo-tag', resp_attrs.slo_etag)
    self.assertEqual(1234, resp_attrs.slo_size)
    self.assertFalse(resp_attrs.is_legacy)
def test_from_regular_object(self):
    # A plain (non-SLO) object response with a timestamp and an Etag.
    ts_now = Timestamp.now()
    headers = [
        ('X-Backend-Timestamp', ts_now.normal),
        ('Etag', 'object-etag'),
    ]
    resp_attrs = slo.RespAttrs.from_headers(headers)
    self.assertFalse(resp_attrs.is_slo)
    self.assertEqual(ts_now, resp_attrs.timestamp)
    # N.B. the Etag is only recorded as json_md5 for slo objects
    self.assertEqual('', resp_attrs.json_md5)
    self.assertEqual('', resp_attrs.slo_etag)
    self.assertEqual(-1, resp_attrs.slo_size)
    self.assertTrue(resp_attrs.is_legacy)
def test_non_slo_with_sysmeta(self):
    # an explicit 'false' static-large-object header means not an SLO
    headers_explicit_false = [
        ('X-Backend-Timestamp', '123456789.12345'),
        ('X-Static-lARGE-Object', 'false'),
    ]
    resp_attrs = slo.RespAttrs.from_headers(headers_explicit_false)
    self.assertFalse(resp_attrs.is_slo)
    self.assertEqual(123456789.12345, resp_attrs.timestamp)
    self.assertEqual('', resp_attrs.json_md5)
    self.assertEqual('', resp_attrs.slo_etag)
    self.assertEqual(-1, resp_attrs.slo_size)
    self.assertTrue(resp_attrs.is_legacy)

    # slo sysmeta present, but no static-large-object header at all
    headers_sysmeta_only = [
        ('X-Backend-Timestamp', '123456789.12345'),
        ('Etag', 'segment-etag'),
        ('x-object-sysmeta-slo-etag', 'tag'),
        ('x-object-sysmeta-slo-size', '1234'),
    ]
    resp_attrs = slo.RespAttrs.from_headers(headers_sysmeta_only)
    # this is NOT an SLO
    self.assertFalse(resp_attrs.is_slo)
    self.assertEqual('', resp_attrs.json_md5)
    # ... but these are still populated from the sysmeta values
    self.assertEqual('tag', resp_attrs.slo_etag)
    self.assertEqual(1234, resp_attrs.slo_size)
    self.assertEqual(123456789.12345, resp_attrs.timestamp)
    # I hope someday a non-slo with slo sysmeta *will* be just a legacy,
    # see lp bug #2035158
    self.assertFalse(resp_attrs.is_legacy)
def _legacy_from_headers(self):
    # Helper: build (and sanity-check) a RespAttrs for a legacy SLO
    # manifest, i.e. one without slo-etag/slo-size sysmeta, and return it
    # for the update_from_segments tests to mutate.
    legacy_headers = [
        ('X-Backend-Timestamp', '123456789.12345'),
        ('Etag', 'manifest-etag'),
        ('X-Static-lARGE-Object', 'yes'),
    ]
    legacy_attrs = slo.RespAttrs.from_headers(legacy_headers)
    self.assertTrue(legacy_attrs.is_slo)
    self.assertEqual(123456789.12345, legacy_attrs.timestamp)
    self.assertEqual('manifest-etag', legacy_attrs.json_md5)
    self.assertEqual('', legacy_attrs.slo_etag)
    self.assertEqual(-1, legacy_attrs.slo_size)
    self.assertTrue(legacy_attrs.is_legacy)
    return legacy_attrs
def test_update_from_segments(self):
    # Two plain segments: the slo etag is the md5 of the concatenated
    # segment hashes and the slo size is the sum of the segment bytes.
    resp_attrs = self._legacy_from_headers()
    manifest_segments = [
        {'hash': 'abc', 'bytes': 2},
        {'hash': 'def', 'bytes': 3},
    ]
    slo._annotate_segments(manifest_segments)
    resp_attrs.update_from_segments(manifest_segments)
    expected_hasher = md5('abcdef'.encode('ascii'),
                          usedforsecurity=False)
    self.assertTrue(resp_attrs.is_slo)
    self.assertEqual(123456789.12345, resp_attrs.timestamp)
    self.assertEqual(expected_hasher.hexdigest(), resp_attrs.slo_etag)
    self.assertEqual(5, resp_attrs.slo_size)
    # N.B. it's still a legacy manifest
    self.assertTrue(resp_attrs.is_legacy)
def test_update_from_segments_with_raw_data(self):
    """
    A legacy manifest with an inline ``data`` segment gets its slo etag
    and size computed from the decoded raw data.
    """
    attrs = self._legacy_from_headers()
    raw_data = b'something'
    segments = [
        {'hash': 'abc', 'bytes': 2},
        {'data': base64.b64encode(raw_data)},
    ]
    slo._annotate_segments(segments)
    attrs.update_from_segments(segments)
    # every other md5 call in these tests passes usedforsecurity=False;
    # do the same here so the checksum can be computed on hosts that
    # restrict md5 to non-security use (e.g. FIPS mode)
    raw_data_checksum = md5(raw_data, usedforsecurity=False).hexdigest()
    # the inline segment contributes the md5 of its raw data to the etag
    exp_etag = md5(('abc' + raw_data_checksum).encode('ascii'),
                   usedforsecurity=False)
    self.assertTrue(attrs.is_slo)
    self.assertEqual(123456789.12345, attrs.timestamp)
    self.assertEqual(exp_etag.hexdigest(), attrs.slo_etag)
    # 2 bytes + len(b'something')
    self.assertEqual(11, attrs.slo_size)
    # N.B. it's still a legacy manifest
    self.assertTrue(attrs.is_legacy)
def test_update_from_segments_with_range(self):
    """
    A ranged segment adds its range to the etag and only the ranged
    bytes to the size.
    """
    attrs = self._legacy_from_headers()
    manifest = [
        {'hash': 'abc', 'bytes': 2},
        {'hash': 'def', 'range': '1-2'},
    ]
    slo._annotate_segments(manifest)
    attrs.update_from_segments(manifest)

    # the ranged segment is hashed as "<hash>:<range>;"
    expected_etag = md5(b'abcdef:1-2;', usedforsecurity=False).hexdigest()
    self.assertTrue(attrs.is_slo)
    self.assertEqual(123456789.12345, attrs.timestamp)
    self.assertEqual(expected_etag, attrs.slo_etag)
    # 2 bytes + the 2 bytes covered by range 1-2
    self.assertEqual(4, attrs.slo_size)
    # N.B. it's still a legacy manifest
    self.assertTrue(attrs.is_legacy)
def test_update_from_segments_with_sub_slo(self):
    """
    A sub-SLO segment has no ``bytes`` key; its size comes from the
    swift_bytes parameter on its content type.
    """
    attrs = self._legacy_from_headers()
    sub_slo_content_type = 'application/octet-stream;swift_bytes=%d' % 5
    manifest = [
        {'hash': 'abc', 'bytes': 2},
        {'hash': '123', 'sub_slo': True,
         'content_type': sub_slo_content_type},
    ]
    slo._annotate_segments(manifest)
    attrs.update_from_segments(manifest)

    expected_etag = md5(b'abc123', usedforsecurity=False).hexdigest()
    self.assertTrue(attrs.is_slo)
    self.assertEqual(123456789.12345, attrs.timestamp)
    self.assertEqual(expected_etag, attrs.slo_etag)
    # 2 bytes + the 5 swift_bytes of the sub-SLO
    self.assertEqual(7, attrs.slo_size)
    # N.B. it's still a legacy manifest
    self.assertTrue(attrs.is_legacy)
def test_update_from_segments_with_sub_slo_range(self):
    """
    A ranged sub-SLO segment contributes only the ranged bytes to the
    size and its range to the etag.
    """
    attrs = self._legacy_from_headers()
    sub_slo_content_type = 'application/octet-stream;swift_bytes=%d' % 5
    ranged_sub_slo = {
        'hash': '123',
        'sub_slo': True,
        'content_type': sub_slo_content_type,
        'range': '2-4',
    }
    manifest = [{'hash': 'abc', 'bytes': 2}, ranged_sub_slo]
    slo._annotate_segments(manifest)
    attrs.update_from_segments(manifest)

    expected_etag = md5(b'abc123:2-4;', usedforsecurity=False).hexdigest()
    self.assertTrue(attrs.is_slo)
    self.assertEqual(123456789.12345, attrs.timestamp)
    self.assertEqual(expected_etag, attrs.slo_etag)
    # 2 bytes + the 3 bytes covered by range 2-4
    self.assertEqual(5, attrs.slo_size)
    # N.B. it's still a legacy manifest
    self.assertTrue(attrs.is_legacy)
def test_update_from_segments_not_legacy(self):
    """
    When the headers already carry SLO sysmeta, update_from_segments
    leaves the slo etag and size as parsed from the headers.
    """
    modern_headers = [
        ('X-Backend-Timestamp', '123456789.12345'),
        ('X-Static-lARGE-Object', 'yes'),
        ('x-object-sysmeta-slo-etag', 'tag'),
        ('x-object-sysmeta-slo-size', '1234'),
    ]
    attrs = slo.RespAttrs.from_headers(modern_headers)
    # deliberately not a segment list; the asserts below show the sysmeta
    # values are unchanged by this call
    bogus_segments = 'not even json; does not matter'
    attrs.update_from_segments(bogus_segments)
    self.assertTrue(attrs.is_slo)
    self.assertEqual(123456789.12345, attrs.timestamp)
    self.assertEqual('tag', attrs.slo_etag)
    self.assertEqual(1234, attrs.slo_size)
    # N.B. with slo sysmeta in the headers this is NOT a legacy manifest
    self.assertFalse(attrs.is_legacy)
# Run the test cases in this module when it is executed as a script.
if '__main__' == __name__:
    unittest.main()

View File

@ -759,7 +759,3 @@ class TestSegmentedIterable(unittest.TestCase):
b'--bound--',
])
self.assertEqual(expected, body)
# XXX Spliterator stops SegmentedIterable from asking to exhaust the
# segment response after it gets the last byte in app_iter_ranges
self.expected_unread_requests[
('GET', '/a/c/seg2?multipart-manifest=get')] = 1

View File

@ -22,6 +22,7 @@ import time
from io import BytesIO
import mock
import six
from six.moves.urllib.parse import quote
@ -1263,6 +1264,72 @@ class TestResponse(unittest.TestCase):
'read': 1,
})
def test_swob_drains_small_HEAD_resp_iter(self):
    """
    A HEAD response whose app_iter holds a small body is read to the end
    and closed by swob, and the client-facing body is empty.
    """
    tracking = dict(closed=0, read=0)

    def on_closed(*args):
        tracking['closed'] += 1

    def on_read(*args):
        tracking['read'] += 1

    def app(environ, start_response):
        start_response('200 OK', [])
        chunks = [b'hello', b'world']
        return LeakTrackingIter(chunks, on_closed, on_read, None)

    def ignore_start_response(*_):
        return None

    req = swob.Request.blank('/', method='HEAD')
    status, headers, app_iter = req.call_application(app)
    resp = swob.Response(
        status=status, headers=dict(headers), app_iter=app_iter)
    # sanity, swob drains small HEAD responses
    output_iter = resp(req.environ, ignore_start_response)
    with utils.closing_if_possible(output_iter):
        # regardless what the app returns swob's HEAD response is empty
        received = b''.join(output_iter)
    self.assertEqual(received, b'')
    # the wrapped iter was both fully read and closed, exactly once
    self.assertEqual(tracking, {
        'closed': 1,
        'read': 1,
    })
def test_swob_closes_large_HEAD_resp_iter(self):
    """
    With the drain limit patched down to the body size, swob closes the
    HEAD response's app_iter without it being counted as fully read.
    """
    tracking = dict(closed=0, read=0)

    def on_closed(*args):
        tracking['closed'] += 1

    def on_read(*args):
        tracking['read'] += 1

    def app(environ, start_response):
        start_response('200 OK', [])
        chunks = [b'hello', b'world']
        return LeakTrackingIter(chunks, on_closed, on_read, None)

    def ignore_start_response(*_):
        return None

    req = swob.Request.blank('/', method='HEAD')
    status, headers, app_iter = req.call_application(app)
    resp = swob.Response(
        status=status, headers=dict(headers), app_iter=app_iter)
    # N.B. if we call next a third time (i.e. len(helloworld) < read_limit)
    # then leak tracker will notice StopIteration and count it drained.
    with mock.patch.object(utils, 'DEFAULT_DRAIN_LIMIT', 10):
        output_iter = resp(req.environ, ignore_start_response)
    with utils.closing_if_possible(output_iter):
        # regardless what the app returns swob's HEAD response is empty
        received = b''.join(output_iter)
    self.assertEqual(received, b'')
    # closed once, but never counted as read to exhaustion
    self.assertEqual(tracking, {
        'closed': 1,
        'read': 0,
    })
def test_call_preserves_closeability(self):
def test_app(environ, start_response):
start_response('200 OK', [])

View File

@ -545,13 +545,108 @@ class TestUtils(unittest.TestCase):
yield 'y'
drained[0] = True
utils.drain_and_close(gen())
g = gen()
utils.drain_and_close(g)
self.assertTrue(drained[0])
self.assertIsNone(g.gi_frame)
utils.drain_and_close(Response(status=200, body=b'Some body'))
drained = [False]
utils.drain_and_close(Response(status=200, app_iter=gen()))
self.assertTrue(drained[0])
def test_drain_and_close_with_limit(self):
    """
    Exercise drain_and_close with read_limit values above, at and below
    the 12 characters the generator yields, plus the default of no limit.
    """
    finished = []

    def chunks():
        # yields 5 + 4 + 3 = 12 characters, then records that it ran to
        # completion
        for size in (5, 4, 3):
            yield 'a' * size
        finished.append(True)

    def drain(**kwargs):
        # reset the completion marker and drain a fresh generator
        del finished[:]
        it = chunks()
        utils.drain_and_close(it, **kwargs)
        return it

    it = drain(read_limit=13)
    self.assertTrue(finished)
    self.assertIsNone(it.gi_frame)

    it = drain(read_limit=12)
    # this would need *one more* call to next
    self.assertFalse(finished)
    self.assertIsNone(it.gi_frame)

    # not even close to the whole thing
    it = drain(read_limit=3)
    self.assertFalse(finished)
    self.assertIsNone(it.gi_frame)

    # default is to drain; no limit!
    it = drain()
    self.assertIsNone(it.gi_frame)
    self.assertTrue(finished)
def test_friendly_close_small_body(self):
    """
    friendly_close reads a one-chunk body all the way to the end.
    """
    finished = []

    def small_body_iter():
        yield 'a small body'
        # only reached if the consumer asks for another chunk
        finished.append(True)

    utils.friendly_close(small_body_iter())
    self.assertTrue(finished)
def test_friendly_close_large_body(self):
    """
    friendly_close stops after the first 64KiB chunk of a ten-chunk body.
    """
    chunk_size = 64 * 2 ** 10
    finished = []
    seen_chunks = []

    def large_body_iter():
        # ten chunks of 64KiB each, one letter per chunk ('a' .. 'j')
        for letter in 'abcdefghij':
            chunk = letter * chunk_size
            seen_chunks.append(chunk)
            yield chunk
        finished.append(True)

    utils.friendly_close(large_body_iter())
    self.assertFalse(finished)
    # only the very first chunk was ever pulled from the iterator
    self.assertEqual(['a' * 65536], seen_chunks)
def test_friendly_close_exploding_body(self):
    """
    An exception raised by the body while friendly_close is reading it
    propagates to the caller, and the body's close() is still called.
    """

    class ExplodingBody(object):
        """Iterator that yields one 63KiB chunk and then raises."""

        def __init__(self):
            self.yielded_chunks = []
            self.close_calls = []
            self._chunks = self._exploding_iter()

        def _exploding_iter(self):
            first_chunk = 'a' * (63 * 2 ** 10)
            self.yielded_chunks.append(first_chunk)
            yield first_chunk
            raise Exception('kaboom!')

        def __iter__(self):
            return self

        def __next__(self):
            return next(self._chunks)

        next = __next__  # py2

        def close(self):
            self.close_calls.append(True)

    exploding = ExplodingBody()
    with self.assertRaises(Exception) as caught:
        utils.friendly_close(exploding)
    self.assertEqual('kaboom!', str(caught.exception))
    # the single chunk was read before the explosion ...
    self.assertEqual(['a' * 64512], exploding.yielded_chunks)
    # ... and close was called exactly once despite the error
    self.assertEqual([True], exploding.close_calls)
def test_backwards(self):
# Test swift.common.utils.backward

View File

@ -1623,10 +1623,12 @@ class TestWSGIContext(unittest.TestCase):
r = Request.blank('/')
it = wc._app_call(r.environ)
self.assertEqual(wc._response_status, '200 Ok')
self.assertEqual(wc._get_status_int(), 200)
self.assertEqual(b''.join(it), b'Ok\n')
r = Request.blank('/')
it = wc._app_call(r.environ)
self.assertEqual(wc._response_status, '404 Not Found')
self.assertEqual(wc._get_status_int(), 404)
self.assertEqual(b''.join(it), b'Ok\n')
def test_app_iter_is_closable(self):
@ -1645,6 +1647,7 @@ class TestWSGIContext(unittest.TestCase):
r = Request.blank('/')
iterable = wc._app_call(r.environ)
self.assertEqual(wc._response_status, '200 OK')
self.assertEqual(wc._get_status_int(), 200)
iterator = iter(iterable)
self.assertEqual(b'aaaaa', next(iterator))
@ -1665,6 +1668,7 @@ class TestWSGIContext(unittest.TestCase):
it = wc._app_call(r.environ)
wc.update_content_length(35)
self.assertEqual(wc._response_status, '200 Ok')
self.assertEqual(wc._get_status_int(), 200)
self.assertEqual(b''.join(it), b'Ok\n')
self.assertEqual(wc._response_headers, [('Content-Length', '35')])
@ -1680,6 +1684,7 @@ class TestWSGIContext(unittest.TestCase):
it = wc._app_call(r.environ)
wc._response_headers.append(('X-Trans-Id', 'txn'))
self.assertEqual(wc._response_status, '200 Ok')
self.assertEqual(wc._get_status_int(), 200)
self.assertEqual(b''.join(it), b'Ok\n')
self.assertEqual(wc._response_headers, [
('Content-Length', '3'),