Merge "Start to decouple Object GET path"
commit a7f3fe93ab
@@ -854,10 +854,10 @@ class ByteCountEnforcer(object):
return chunk

class ResumingGetter(object):
class GetOrHeadHandler(object):
def __init__(self, app, req, server_type, node_iter, partition, path,
backend_headers, concurrency=1, client_chunk_size=None,
newest=None, header_provider=None):
newest=None):
self.app = app
self.node_iter = node_iter
self.server_type = server_type

@@ -871,7 +871,6 @@ class ResumingGetter(object):
self.used_source_etag = ''
self.concurrency = concurrency
self.node = None
self.header_provider = header_provider
self.latest_404_timestamp = Timestamp(0)

# stuff from request

@@ -1011,13 +1010,6 @@ class ResumingGetter(object):
return True
return is_success(src.status) or is_redirection(src.status)

def response_parts_iter(self, req):
source, node = self._get_source_and_node()
it = None
if source:
it = self._get_response_parts_iter(req, node, source)
return it

def _get_response_parts_iter(self, req, node, source):
# Someday we can replace this [mess] with python 3's "nonlocal"
source = [source]

@@ -1261,10 +1253,6 @@ class ResumingGetter(object):
if node in self.used_nodes:
return False
req_headers = dict(self.backend_headers)
# a request may be specialised with specific backend headers
if self.header_provider:
req_headers.update(self.header_provider())

ip, port = get_ip_port(node, req_headers)
start_node_timing = time.time()
try:

@@ -1300,9 +1288,8 @@ class ResumingGetter(object):
close_swift_conn(possible_source)
else:
if self.used_source_etag and \
self.used_source_etag != normalize_etag(src_headers.get(
'x-object-sysmeta-ec-etag',
src_headers.get('etag', ''))):
self.used_source_etag != normalize_etag(
src_headers.get('etag', '')):
self.statuses.append(HTTP_NOT_FOUND)
self.reasons.append('')
self.bodies.append('')

@@ -1402,18 +1389,14 @@ class ResumingGetter(object):

# Save off the source etag so that, if we lose the connection
# and have to resume from a different node, we can be sure that
# we have the same object (replication) or a fragment archive
# from the same object (EC). Otherwise, if the cluster has two
# versions of the same object, we might end up switching between
# old and new mid-stream and giving garbage to the client.
self.used_source_etag = normalize_etag(src_headers.get(
'x-object-sysmeta-ec-etag', src_headers.get('etag', '')))
# we have the same object (replication). Otherwise, if the cluster
# has two versions of the same object, we might end up switching
# between old and new mid-stream and giving garbage to the client.
self.used_source_etag = normalize_etag(src_headers.get('etag', ''))
self.node = node
return source, node
return None, None
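# [Illustrative sketch, not part of the change] The comment block above explains
# why the first good source's etag is remembered: if the connection drops and
# the GET resumes on a different node, a candidate whose etag differs is
# recorded as a 404 (see the hunk at -1300 above), so the client never receives
# a mix of two object versions. A self-contained approximation of that guard;
# normalize_etag here is a simplified stand-in for swift.common.swob's helper.
def normalize_etag(tag):
    # strip the surrounding quotes, if any
    if tag and tag.startswith('"') and tag.endswith('"'):
        return tag[1:-1]
    return tag

used_source_etag = normalize_etag('"d41d8cd98f00b204e9800998ecf8427e"')

def acceptable_resume_source(response_headers):
    # a resumed node must serve the same object version we started streaming
    return normalize_etag(response_headers.get('etag', '')) == used_source_etag

assert acceptable_resume_source({'etag': '"d41d8cd98f00b204e9800998ecf8427e"'})
assert not acceptable_resume_source({'etag': '"0123456789abcdef0123456789abcdef"'})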

class GetOrHeadHandler(ResumingGetter):
def _make_app_iter(self, req, node, source):
"""
Returns an iterator over the contents of the source (via its read

@@ -24,6 +24,7 @@
# These shenanigans are to ensure all related objects can be garbage
# collected. We've seen objects hang around forever otherwise.

import six
from six.moves.urllib.parse import quote, unquote
from six.moves import zip

@@ -35,13 +36,14 @@ import time
import math
import random
from hashlib import md5
from swift import gettext_ as _
import sys

from greenlet import GreenletExit
from eventlet import GreenPile, sleep
from eventlet.queue import Queue
from eventlet.timeout import Timeout

from swift import gettext_ as _
from swift.common.utils import (
clean_content_type, config_true_value, ContextPool, csv_append,
GreenAsyncPile, GreenthreadSafeIterator, Timestamp, WatchdogTimeout,

@@ -54,18 +56,19 @@ from swift.common import constraints
from swift.common.exceptions import ChunkReadTimeout, \
ChunkWriteTimeout, ConnectionTimeout, ResponseTimeout, \
InsufficientStorage, FooterNotSupported, MultiphasePUTNotSupported, \
PutterConnectError, ChunkReadError
PutterConnectError, ChunkReadError, RangeAlreadyComplete, ShortReadError
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.http import (
is_informational, is_success, is_client_error, is_server_error,
is_redirection, HTTP_CONTINUE, HTTP_INTERNAL_SERVER_ERROR,
HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE,
HTTP_PRECONDITION_FAILED, HTTP_CONFLICT, HTTP_UNPROCESSABLE_ENTITY,
HTTP_REQUESTED_RANGE_NOT_SATISFIABLE)
HTTP_REQUESTED_RANGE_NOT_SATISFIABLE, HTTP_NOT_FOUND)
from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY,
ECDriverError, PolicyError)
from swift.proxy.controllers.base import Controller, delay_denial, \
cors_validation, ResumingGetter, update_headers
cors_validation, update_headers, bytes_to_skip, close_swift_conn, \
ByteCountEnforcer, source_key
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
HTTPServerError, HTTPServiceUnavailable, HTTPClientDisconnect, \

@@ -73,7 +76,8 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
HTTPRequestedRangeNotSatisfiable, Range, HTTPInternalServerError, \
normalize_etag
from swift.common.request_helpers import update_etag_is_at_header, \
resolve_etag_is_at_header, validate_internal_obj, get_ip_port
resolve_etag_is_at_header, validate_internal_obj, get_ip_port, \
http_response_to_document_iters


def check_content_type(req):

@@ -2034,7 +2038,7 @@ class ECGetResponseBucket(object):
associated with the same frag_index then only one is included.

:return: a list of sources, each source being a tuple of form
(ResumingGetter, iter)
(ECFragGetter, iter)
"""
all_sources = []
for frag_index, sources in self.gets.items():

@@ -2072,7 +2076,7 @@ class ECGetResponseBucket(object):

class ECGetResponseCollection(object):
"""
Manages all successful EC GET responses gathered by ResumingGetters.
Manages all successful EC GET responses gathered by ECFragGetters.

A response comprises a tuple of (<getter instance>, <parts iterator>). All
responses having the same data timestamp are placed in an

@@ -2103,7 +2107,7 @@ class ECGetResponseCollection(object):
Add a response to the collection.

:param get: An instance of
:class:`~swift.proxy.controllers.base.ResumingGetter`
:class:`~swift.proxy.controllers.obj.ECFragGetter`
:param parts_iter: An iterator over response body parts
:raises ValueError: if the response etag or status code values do not
match any values previously received for the same timestamp

@@ -2233,6 +2237,564 @@ class ECGetResponseCollection(object):
return nodes.pop(0).copy()


class ECFragGetter(object):
def __init__(self, app, req, server_type, node_iter, partition, path,
backend_headers, concurrency=1, client_chunk_size=None,
newest=None, header_provider=None):
self.app = app
self.node_iter = node_iter
self.server_type = server_type
self.partition = partition
self.path = path
self.backend_headers = backend_headers
self.client_chunk_size = client_chunk_size
self.skip_bytes = 0
self.bytes_used_from_backend = 0
self.used_nodes = []
self.used_source_etag = ''
self.concurrency = concurrency
self.node = None
self.header_provider = header_provider
self.latest_404_timestamp = Timestamp(0)

# stuff from request
self.req_method = req.method
self.req_path = req.path
self.req_query_string = req.query_string
if newest is None:
self.newest = config_true_value(req.headers.get('x-newest', 'f'))
else:
self.newest = newest

# populated when finding source
self.statuses = []
self.reasons = []
self.bodies = []
self.source_headers = []
self.sources = []

# populated from response headers
self.start_byte = self.end_byte = self.length = None

def fast_forward(self, num_bytes):
"""
Will skip num_bytes into the current ranges.

:params num_bytes: the number of bytes that have already been read on
this request. This will change the Range header
so that the next req will start where it left off.

:raises HTTPRequestedRangeNotSatisfiable: if begin + num_bytes
> end of range + 1
:raises RangeAlreadyComplete: if begin + num_bytes == end of range + 1
"""
try:
req_range = Range(self.backend_headers.get('Range'))
except ValueError:
req_range = None

if req_range:
begin, end = req_range.ranges[0]
if begin is None:
# this is a -50 range req (last 50 bytes of file)
end -= num_bytes
if end == 0:
# we sent out exactly the first range's worth of bytes, so
# we're done with it
raise RangeAlreadyComplete()

if end < 0:
raise HTTPRequestedRangeNotSatisfiable()

else:
begin += num_bytes
if end is not None and begin == end + 1:
# we sent out exactly the first range's worth of bytes, so
# we're done with it
raise RangeAlreadyComplete()

if end is not None and begin > end:
raise HTTPRequestedRangeNotSatisfiable()

req_range.ranges = [(begin, end)] + req_range.ranges[1:]
self.backend_headers['Range'] = str(req_range)
else:
self.backend_headers['Range'] = 'bytes=%d-' % num_bytes

# Reset so if we need to do this more than once, we don't double-up
self.bytes_used_from_backend = 0
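# [Illustrative sketch, not part of the change] What fast_forward() above does
# to the backend Range header once num_bytes have already been streamed to the
# client; the header values are invented for the example. The real method works
# on swob.Range objects and multiple byteranges, this standalone helper only
# mirrors the arithmetic for a single fully-specified range.
def advanced_range(range_header, num_bytes):
    # 'bytes=0-99' after 30 bytes sent -> 'bytes=30-99'
    begin, end = range_header[len('bytes='):].split('-')
    return 'bytes=%d-%s' % (int(begin) + num_bytes, end)

assert advanced_range('bytes=0-99', 30) == 'bytes=30-99'
# with no Range header at all, fast_forward() simply sets 'bytes=<num_bytes>-'
assert 'bytes=%d-' % 1024 == 'bytes=1024-'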

def pop_range(self):
"""
Remove the first byterange from our Range header.

This is used after a byterange has been completely sent to the
client; this way, should we need to resume the download from another
object server, we do not re-fetch byteranges that the client already
has.

If we have no Range header, this is a no-op.
"""
if 'Range' in self.backend_headers:
try:
req_range = Range(self.backend_headers['Range'])
except ValueError:
# there's a Range header, but it's garbage, so get rid of it
self.backend_headers.pop('Range')
return
begin, end = req_range.ranges.pop(0)
if len(req_range.ranges) > 0:
self.backend_headers['Range'] = str(req_range)
else:
self.backend_headers.pop('Range')

def learn_size_from_content_range(self, start, end, length):
"""
If client_chunk_size is set, makes sure we yield things starting on
chunk boundaries based on the Content-Range header in the response.

Sets our Range header's first byterange to the value learned from
the Content-Range header in the response; if we were given a
fully-specified range (e.g. "bytes=123-456"), this is a no-op.

If we were given a half-specified range (e.g. "bytes=123-" or
"bytes=-456"), then this changes the Range header to a
semantically-equivalent one *and* it lets us resume on a proper
boundary instead of just in the middle of a piece somewhere.
"""
if length == 0:
return

if self.client_chunk_size:
self.skip_bytes = bytes_to_skip(self.client_chunk_size, start)

if 'Range' in self.backend_headers:
try:
req_range = Range(self.backend_headers['Range'])
new_ranges = [(start, end)] + req_range.ranges[1:]
except ValueError:
new_ranges = [(start, end)]
else:
new_ranges = [(start, end)]

self.backend_headers['Range'] = (
"bytes=" + (",".join("%s-%s" % (s if s is not None else '',
e if e is not None else '')
for s, e in new_ranges)))
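# [Illustrative sketch, not part of the change] learn_size_from_content_range()
# above relies on bytes_to_skip() (imported from proxy.controllers.base) so
# that, when a ranged backend response does not start on a client_chunk_size
# boundary, the leading partial chunk is discarded and every chunk yielded to
# the caller starts on a boundary. The formula below is a plausible equivalent,
# shown only to make the alignment behaviour concrete.
def bytes_to_skip(record_size, range_start):
    # distance from range_start forward to the next multiple of record_size
    return (record_size - (range_start % record_size)) % record_size

assert bytes_to_skip(1024, 0) == 0      # already aligned, nothing to discard
assert bytes_to_skip(1024, 100) == 924  # discard 924 bytes, resume at offset 1024
assert bytes_to_skip(1024, 2048) == 0   # an exact boundary again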

def is_good_source(self, src):
"""
Indicates whether or not the request made to the backend found
what it was looking for.

:param src: the response from the backend
:returns: True if found, False if not
"""
if self.server_type == 'Object' and src.status == 416:
return True
return is_success(src.status) or is_redirection(src.status)

def response_parts_iter(self, req):
source, node = self._get_source_and_node()
it = None
if source:
it = self._get_response_parts_iter(req, node, source)
return it

def _get_response_parts_iter(self, req, node, source):
# Someday we can replace this [mess] with python 3's "nonlocal"
source = [source]
node = [node]

try:
client_chunk_size = self.client_chunk_size
node_timeout = self.app.node_timeout
if self.server_type == 'Object':
node_timeout = self.app.recoverable_node_timeout

# This is safe; it sets up a generator but does not call next()
# on it, so no IO is performed.
parts_iter = [
http_response_to_document_iters(
source[0], read_chunk_size=self.app.object_chunk_size)]

def get_next_doc_part():
while True:
try:
# This call to next() performs IO when we have a
# multipart/byteranges response; it reads the MIME
# boundary and part headers.
#
# If we don't have a multipart/byteranges response,
# but just a 200 or a single-range 206, then this
# performs no IO, and either just returns source or
# raises StopIteration.
with WatchdogTimeout(self.app.watchdog, node_timeout,
ChunkReadTimeout):
# if StopIteration is raised, it escapes and is
# handled elsewhere
start_byte, end_byte, length, headers, part = next(
parts_iter[0])
return (start_byte, end_byte, length, headers, part)
except ChunkReadTimeout:
new_source, new_node = self._get_source_and_node()
if new_source:
self.app.error_occurred(
node[0], _('Trying to read object during '
'GET (retrying)'))
# Close-out the connection as best as possible.
if getattr(source[0], 'swift_conn', None):
close_swift_conn(source[0])
source[0] = new_source
node[0] = new_node
# This is safe; it sets up a generator but does
# not call next() on it, so no IO is performed.
parts_iter[0] = http_response_to_document_iters(
new_source,
read_chunk_size=self.app.object_chunk_size)
else:
raise StopIteration()

def iter_bytes_from_response_part(part_file, nbytes):
nchunks = 0
buf = b''
part_file = ByteCountEnforcer(part_file, nbytes)
while True:
try:
with WatchdogTimeout(self.app.watchdog, node_timeout,
ChunkReadTimeout):
chunk = part_file.read(self.app.object_chunk_size)
nchunks += 1
# NB: this append must be *inside* the context
# manager for test.unit.SlowBody to do its thing
buf += chunk
if nbytes is not None:
nbytes -= len(chunk)
except (ChunkReadTimeout, ShortReadError):
exc_type, exc_value, exc_traceback = sys.exc_info()
if self.newest or self.server_type != 'Object':
raise
try:
self.fast_forward(self.bytes_used_from_backend)
except (HTTPException, ValueError):
six.reraise(exc_type, exc_value, exc_traceback)
except RangeAlreadyComplete:
break
buf = b''
new_source, new_node = self._get_source_and_node()
if new_source:
self.app.error_occurred(
node[0], _('Trying to read object during '
'GET (retrying)'))
# Close-out the connection as best as possible.
if getattr(source[0], 'swift_conn', None):
close_swift_conn(source[0])
source[0] = new_source
node[0] = new_node
# This is safe; it just sets up a generator but
# does not call next() on it, so no IO is
# performed.
parts_iter[0] = http_response_to_document_iters(
new_source,
read_chunk_size=self.app.object_chunk_size)

try:
_junk, _junk, _junk, _junk, part_file = \
get_next_doc_part()
except StopIteration:
# Tried to find a new node from which to
# finish the GET, but failed. There's
# nothing more we can do here.
six.reraise(exc_type, exc_value, exc_traceback)
part_file = ByteCountEnforcer(part_file, nbytes)
else:
six.reraise(exc_type, exc_value, exc_traceback)
else:
if buf and self.skip_bytes:
if self.skip_bytes < len(buf):
buf = buf[self.skip_bytes:]
self.bytes_used_from_backend += self.skip_bytes
self.skip_bytes = 0
else:
self.skip_bytes -= len(buf)
self.bytes_used_from_backend += len(buf)
buf = b''

if not chunk:
if buf:
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkWriteTimeout):
self.bytes_used_from_backend += len(buf)
yield buf
buf = b''
break

if client_chunk_size is not None:
while len(buf) >= client_chunk_size:
client_chunk = buf[:client_chunk_size]
buf = buf[client_chunk_size:]
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkWriteTimeout):
self.bytes_used_from_backend += \
len(client_chunk)
yield client_chunk
else:
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkWriteTimeout):
self.bytes_used_from_backend += len(buf)
yield buf
buf = b''

# This is for fairness; if the network is outpacing
# the CPU, we'll always be able to read and write
# data without encountering an EWOULDBLOCK, and so
# eventlet will not switch greenthreads on its own.
# We do it manually so that clients don't starve.
#
# The number 5 here was chosen by making stuff up.
# It's not every single chunk, but it's not too big
# either, so it seemed like it would probably be an
# okay choice.
#
# Note that we may trampoline to other greenthreads
# more often than once every 5 chunks, depending on
# how blocking our network IO is; the explicit sleep
# here simply provides a lower bound on the rate of
# trampolining.
if nchunks % 5 == 0:
sleep()

part_iter = None
try:
while True:
start_byte, end_byte, length, headers, part = \
get_next_doc_part()
# note: learn_size_from_content_range() sets
# self.skip_bytes
self.learn_size_from_content_range(
start_byte, end_byte, length)
self.bytes_used_from_backend = 0
# not length; that refers to the whole object, so is the
# wrong value to use for GET-range responses
byte_count = ((end_byte - start_byte + 1) - self.skip_bytes
if (end_byte is not None
and start_byte is not None)
else None)
part_iter = iter_bytes_from_response_part(part, byte_count)
yield {'start_byte': start_byte, 'end_byte': end_byte,
'entity_length': length, 'headers': headers,
'part_iter': part_iter}
self.pop_range()
except StopIteration:
req.environ['swift.non_client_disconnect'] = True
finally:
if part_iter:
part_iter.close()

except ChunkReadTimeout:
self.app.exception_occurred(node[0], _('Object'),
_('Trying to read during GET'))
raise
except ChunkWriteTimeout:
self.app.logger.warning(
_('Client did not read from proxy within %ss') %
self.app.client_timeout)
self.app.logger.increment('client_timeouts')
except GeneratorExit:
warn = True
req_range = self.backend_headers['Range']
if req_range:
req_range = Range(req_range)
if len(req_range.ranges) == 1:
begin, end = req_range.ranges[0]
if end is not None and begin is not None:
if end - begin + 1 == self.bytes_used_from_backend:
warn = False
if not req.environ.get('swift.non_client_disconnect') and warn:
self.app.logger.warning('Client disconnected on read of %r',
self.path)
raise
except Exception:
self.app.logger.exception(_('Trying to send to client'))
raise
finally:
# Close-out the connection as best as possible.
if getattr(source[0], 'swift_conn', None):
close_swift_conn(source[0])
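# [Illustrative sketch, not part of the change] Shape of what the generator
# above yields: one dict per document part (a single part for a plain 200 or
# single-range 206, several for a multipart/byteranges 206), with 'part_iter'
# streaming that part's bytes. A caller is assumed to drain each part before
# moving on to the next:
def drain_parts(parts_iter):
    body = b''
    for part in parts_iter:
        # part['start_byte'], part['end_byte'] and part['entity_length']
        # describe the byterange; part['headers'] carries the part's headers
        for chunk in part['part_iter']:
            body += chunk
    return body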

@property
def last_status(self):
if self.statuses:
return self.statuses[-1]
else:
return None

@property
def last_headers(self):
if self.source_headers:
return HeaderKeyDict(self.source_headers[-1])
else:
return None

def _make_node_request(self, node, node_timeout, logger_thread_locals):
self.app.logger.thread_locals = logger_thread_locals
if node in self.used_nodes:
return False
req_headers = dict(self.backend_headers)
# a request may be specialised with specific backend headers
if self.header_provider:
req_headers.update(self.header_provider())
ip, port = get_ip_port(node, req_headers)
start_node_timing = time.time()
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(
ip, port, node['device'],
self.partition, self.req_method, self.path,
headers=req_headers,
query_string=self.req_query_string)
self.app.set_node_timing(node, time.time() - start_node_timing)

with Timeout(node_timeout):
possible_source = conn.getresponse()
# See NOTE: swift_conn at top of file about this.
possible_source.swift_conn = conn
except (Exception, Timeout):
self.app.exception_occurred(
node, self.server_type,
_('Trying to %(method)s %(path)s') %
{'method': self.req_method, 'path': self.req_path})
return False

src_headers = dict(
(k.lower(), v) for k, v in
possible_source.getheaders())
if self.is_good_source(possible_source):
# 404 if we know we don't have a synced copy
if not float(possible_source.getheader('X-PUT-Timestamp', 1)):
self.statuses.append(HTTP_NOT_FOUND)
self.reasons.append('')
self.bodies.append('')
self.source_headers.append([])
close_swift_conn(possible_source)
else:
if self.used_source_etag and \
self.used_source_etag != normalize_etag(src_headers.get(
'x-object-sysmeta-ec-etag',
src_headers.get('etag', ''))):
self.statuses.append(HTTP_NOT_FOUND)
self.reasons.append('')
self.bodies.append('')
self.source_headers.append([])
return False

# a possible source should only be added as a valid source
# if its timestamp is newer than previously found tombstones
ps_timestamp = Timestamp(
src_headers.get('x-backend-data-timestamp') or
src_headers.get('x-backend-timestamp') or
src_headers.get('x-put-timestamp') or
src_headers.get('x-timestamp') or 0)
if ps_timestamp >= self.latest_404_timestamp:
self.statuses.append(possible_source.status)
self.reasons.append(possible_source.reason)
self.bodies.append(None)
self.source_headers.append(possible_source.getheaders())
self.sources.append((possible_source, node))
if not self.newest: # one good source is enough
return True
else:
if 'handoff_index' in node and \
(is_server_error(possible_source.status) or
possible_source.status == HTTP_NOT_FOUND) and \
not Timestamp(src_headers.get('x-backend-timestamp', 0)):
# throw out 5XX and 404s from handoff nodes unless the data is
# really on disk and had been DELETEd
return False
self.statuses.append(possible_source.status)
self.reasons.append(possible_source.reason)
self.bodies.append(possible_source.read())
self.source_headers.append(possible_source.getheaders())

# if 404, record the timestamp. If a good source shows up, its
# timestamp will be compared to the latest 404.
# For now checking only on objects, but future work could include
# the same check for account and containers. See lp 1560574.
if self.server_type == 'Object' and \
possible_source.status == HTTP_NOT_FOUND:
hdrs = HeaderKeyDict(possible_source.getheaders())
ts = Timestamp(hdrs.get('X-Backend-Timestamp', 0))
if ts > self.latest_404_timestamp:
self.latest_404_timestamp = ts
if possible_source.status == HTTP_INSUFFICIENT_STORAGE:
self.app.error_limit(node, _('ERROR Insufficient Storage'))
elif is_server_error(possible_source.status):
self.app.error_occurred(
node, _('ERROR %(status)d %(body)s '
'From %(type)s Server') %
{'status': possible_source.status,
'body': self.bodies[-1][:1024],
'type': self.server_type})
return False

def _get_source_and_node(self):
self.statuses = []
self.reasons = []
self.bodies = []
self.source_headers = []
self.sources = []

nodes = GreenthreadSafeIterator(self.node_iter)

node_timeout = self.app.node_timeout
if self.server_type == 'Object' and not self.newest:
node_timeout = self.app.recoverable_node_timeout

pile = GreenAsyncPile(self.concurrency)

for node in nodes:
pile.spawn(self._make_node_request, node, node_timeout,
self.app.logger.thread_locals)
_timeout = self.app.concurrency_timeout \
if pile.inflight < self.concurrency else None
if pile.waitfirst(_timeout):
break
else:
# ran out of nodes, see if any stragglers will finish
any(pile)

# this helps weed out any sucess status that were found before a 404
# and added to the list in the case of x-newest.
if self.sources:
self.sources = [s for s in self.sources
if source_key(s[0]) >= self.latest_404_timestamp]

if self.sources:
self.sources.sort(key=lambda s: source_key(s[0]))
source, node = self.sources.pop()
for src, _junk in self.sources:
close_swift_conn(src)
self.used_nodes.append(node)
src_headers = dict(
(k.lower(), v) for k, v in
source.getheaders())

# Save off the source etag so that, if we lose the connection
# and have to resume from a different node, we can be sure that
# we have the same object (replication) or a fragment archive
# from the same object (EC). Otherwise, if the cluster has two
# versions of the same object, we might end up switching between
# old and new mid-stream and giving garbage to the client.
self.used_source_etag = normalize_etag(src_headers.get(
'x-object-sysmeta-ec-etag', src_headers.get('etag', '')))
self.node = node
return source, node
return None, None
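# [Illustrative sketch, not part of the change] The selection step above:
# collected sources are filtered against the newest 404 tombstone timestamp
# seen, sorted by source_key() (their data timestamp), and the newest one wins
# while the remainder get closed. A toy version using plain numbers as
# timestamps and strings in place of connections:
latest_404_timestamp = 5
sources = [('conn-a', 3), ('conn-b', 9), ('conn-c', 7)]  # (connection, timestamp)

candidates = [s for s in sources if s[1] >= latest_404_timestamp]
candidates.sort(key=lambda s: s[1])
best_conn, best_ts = candidates.pop()           # newest usable source
to_close = [conn for conn, _ts in candidates]   # these would be closed

assert (best_conn, best_ts) == ('conn-b', 9)
assert to_close == ['conn-c']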


@ObjectControllerRouter.register(EC_POLICY)
class ECObjectController(BaseObjectController):
def _fragment_GET_request(self, req, node_iter, partition, policy,

@@ -2243,11 +2805,11 @@ class ECObjectController(BaseObjectController):
backend_headers = self.generate_request_headers(
req, additional=req.headers)

getter = ResumingGetter(self.app, req, 'Object', node_iter,
partition, req.swift_entity_path,
backend_headers,
client_chunk_size=policy.fragment_size,
newest=False, header_provider=header_provider)
getter = ECFragGetter(self.app, req, 'Object', node_iter,
partition, req.swift_entity_path,
backend_headers,
client_chunk_size=policy.fragment_size,
newest=False, header_provider=header_provider)
return (getter, getter.response_parts_iter(req))
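# [Illustrative sketch, not part of the change] newest=False and a
# header_provider callable are passed through to the getter above;
# _make_node_request() (earlier in this commit) merges header_provider() into
# the backend headers on every node request, so the EC controller can vary
# per-request backend headers without rebuilding the getter. A minimal stand-in
# mirroring that merge; the header name and value are invented for the example.
def make_request_headers(backend_headers, header_provider=None):
    req_headers = dict(backend_headers)
    if header_provider:
        req_headers.update(header_provider())
    return req_headers

def provider():
    return {'X-Backend-Example': 'per-request value'}

headers = make_request_headers({'X-Backend-Storage-Policy-Index': '1'}, provider)
assert headers['X-Backend-Example'] == 'per-request value'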

def _convert_range(self, req, policy):

@@ -2330,7 +2892,7 @@ class ECObjectController(BaseObjectController):
# nodes contain different fragments. Also EC has implemented it's
# own specific implementation of concurrent gets to ec_ndata nodes.
# So we don't need to worry about plumbing and sending a
# concurrency value to ResumingGetter.
# concurrency value to ECFragGetter.
with ContextPool(policy.ec_ndata) as pool:
pile = GreenAsyncPile(pool)
buckets = ECGetResponseCollection(policy)

@@ -7297,7 +7297,7 @@ class BaseTestECObjectController(BaseTestObjectController):
return super(WrappedTimeout, self).__exit__(typ, value, tb)

timeouts = {}
with mock.patch('swift.proxy.controllers.base.WatchdogTimeout',
with mock.patch('swift.proxy.controllers.obj.WatchdogTimeout',
WrappedTimeout):
with mock.patch.object(_test_servers[0], 'client_timeout', new=5):
# get object