Merge "Enable object server to return non-durable data"

This commit is contained in:
Jenkins 2016-09-16 22:11:36 +00:00 committed by Gerrit Code Review
commit 5126cc844a
11 changed files with 2418 additions and 282 deletions

View File

@ -461,13 +461,9 @@ A few key points on the .durable file:
 * The .durable file means "the matching .data file for this has sufficient
   fragment archives somewhere, committed, to reconstruct the object".
-* The Proxy Server will never have knowledge, either on GET or HEAD, of the
-  existence of a .data file on an object server if it does not have a matching
-  .durable file.
-* The object server will never return a .data that does not have a matching
-  .durable.
-* When a proxy does a GET, it will only receive fragment archives that have
-  enough present somewhere to be reconstructed.
+* When a proxy does a GET, it will require at least one object server to
+  respond with a fragment archive that has a matching `.durable` file before
+  reconstructing and returning the object to the client.
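As an illustration of the new behaviour, consider a hash directory holding one
durable EC archive and a newer archive whose `.durable` file was never written
(the timestamps here are hypothetical)::

    1477149963.00000#3.data      # fragment archive, fragment index 3
    1477149963.00000.durable     # marks timestamp 1477149963.00000 as durable
    1477149970.00000#3.data      # newer fragment archive, not (yet) durable

Previously the object server would never return the newer, non-durable .data
file; with the fragment preferences introduced by this change it now may.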
 Partial PUT Failures
 ====================
@ -500,17 +496,39 @@ The GET for EC is different enough from replication that subclassing the
 `BaseObjectController` to the `ECObjectController` enables an efficient way to
 implement the high level steps described earlier:

-#. The proxy server makes simultaneous requests to participating nodes.
-#. As soon as the proxy has the fragments it needs, it calls on PyECLib to
-   decode the data.
+#. The proxy server makes simultaneous requests to `ec_ndata` primary object
+   server nodes with the goal of finding a set of `ec_ndata` distinct EC
+   archives at the same timestamp, and an indication from at least one object
+   server that a `<timestamp>.durable` file exists for that timestamp. If this
+   goal is not achieved with the first `ec_ndata` requests then the proxy
+   server continues to issue requests to the remaining primary nodes and then
+   handoff nodes.
+#. As soon as the proxy server has found a usable set of `ec_ndata` EC
+   archives, it starts to call PyECLib to decode fragments as they are
+   returned by the object server nodes.
+#. The proxy server creates Etag and content length headers for the client
+   response since each EC archive's metadata is valid only for that archive.
 #. The proxy streams the decoded data it has back to the client.
-#. Repeat until the proxy is done sending data back to the client.

-The GET path will attempt to contact all nodes participating in the EC scheme,
-if not enough primaries respond then handoffs will be contacted just as with
-replication. Etag and content length headers are updated for the client
-response following reconstruction as the individual fragment archives metadata
-is valid only for that fragment archive.
+Note that the proxy does not require all object servers to have a `.durable`
+file for the EC archive that they return in response to a GET. The proxy
+will be satisfied if just one object server has a `.durable` file at the same
+timestamp as EC archives returned from other object servers. This means that
+the proxy can successfully GET an object that had missing `.durable` files
+when it was PUT (i.e. a partial PUT failure occurred).
+
+Note also that an object server may inform the proxy server that it has more
+than one EC archive for different timestamps and/or fragment indexes, which
+may cause the proxy server to issue multiple requests for distinct EC archives
+to that object server. (This situation can temporarily occur after a ring
+rebalance when a handoff node storing an archive has become a primary node and
+received its primary archive but has not yet moved the handoff archive to its
+primary node.)
+
+The proxy may receive EC archives having different timestamps, and may
+receive several EC archives having the same index. The proxy therefore
+ensures that it has sufficient EC archives with the same timestamp
+and distinct fragment indexes before considering a GET to be successful.
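The preferences that the proxy sends to object servers are carried in an
`X-Backend-Fragment-Preferences` header whose value is a JSON list of
per-timestamp dicts. A minimal sketch, using hypothetical timestamps and
fragment indexes::

    import json

    # prefer the newest timestamp, excluding frag indexes already in hand,
    # and accept anything at the older timestamp
    frag_prefs = [
        {'timestamp': '1477149970.00000', 'exclude': [1, 3]},
        {'timestamp': '1477149963.00000', 'exclude': []},
    ]
    headers = {'X-Backend-Fragment-Preferences': json.dumps(frag_prefs)}

An empty list is meaningful: it tells the object server that any fragment may
be returned even if no `.durable` file exists at its timestamp.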
 Object Server
 -------------

View File

@ -4089,3 +4089,12 @@ def o_tmpfile_supported():
     return all([linkat.available,
                 platform.system() == 'Linux',
                 LooseVersion(platform.release()) >= LooseVersion('3.16')])
+
+
+def safe_json_loads(value):
+    if value:
+        try:
+            return json.loads(value)
+        except (TypeError, ValueError):
+            pass
+    return None
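For context, the object server uses safe_json_loads later in this change to
parse the optional fragment-preferences header, so a missing or malformed
header simply yields None rather than an error. A minimal sketch, with a
hypothetical headers dict:

    from swift.common.utils import safe_json_loads

    headers = {'X-Backend-Fragment-Preferences':
               '[{"timestamp": "1477149963.00000", "exclude": []}]'}

    frag_prefs = safe_json_loads(headers.get('X-Backend-Fragment-Preferences'))
    # -> [{'timestamp': '1477149963.00000', 'exclude': []}]

    safe_json_loads(None)        # -> None (header absent)
    safe_json_loads('not json')  # -> None (malformed header)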

View File

@ -827,15 +827,20 @@ class BaseDiskFileManager(object):
         self._process_ondisk_files(exts, results, **kwargs)

         # set final choice of files
-        if exts.get('.ts'):
+        if 'data_info' in results:
+            if exts.get('.meta'):
+                # only report a meta file if a data file has been chosen
+                results['meta_info'] = exts['.meta'][0]
+                ctype_info = exts['.meta'].pop()
+                if (ctype_info['ctype_timestamp']
+                        > results['data_info']['timestamp']):
+                    results['ctype_info'] = ctype_info
+        elif exts.get('.ts'):
+            # only report a ts file if a data file has not been chosen
+            # (ts files will commonly already have been removed from exts if
+            # a data file was chosen, but that may not be the case if
+            # non-durable EC fragment(s) were chosen, hence the elif here)
             results['ts_info'] = exts['.ts'][0]
-        if 'data_info' in results and exts.get('.meta'):
-            # only report a meta file if a data file has been chosen
-            results['meta_info'] = exts['.meta'][0]
-            ctype_info = exts['.meta'].pop()
-            if (ctype_info['ctype_timestamp']
-                    > results['data_info']['timestamp']):
-                results['ctype_info'] = ctype_info

         # set ts_file, data_file, meta_file and ctype_file with path to
         # chosen file or None
@ -2635,6 +2640,41 @@ class ECDiskFile(BaseDiskFile):
         self._frag_index = None
         if frag_index is not None:
             self._frag_index = self.manager.validate_fragment_index(frag_index)
+        self._frag_prefs = self._validate_frag_prefs(kwargs.get('frag_prefs'))
+        self._durable_frag_set = None
+
+    def _validate_frag_prefs(self, frag_prefs):
+        """
+        Validate that frag_prefs is a list of dicts containing expected keys
+        'timestamp' and 'exclude'. Convert timestamp values to Timestamp
+        instances and validate that exclude values are valid fragment indexes.
+
+        :param frag_prefs: data to validate, should be a list of dicts.
+        :raise DiskFileError: if the frag_prefs data is invalid.
+        :return: a list of dicts with converted and validated values.
+        """
+        # We *do* want to preserve an empty frag_prefs list because it
+        # indicates that a durable file is not required.
+        if frag_prefs is None:
+            return None
+
+        try:
+            return [
+                {'timestamp': Timestamp(pref['timestamp']),
+                 'exclude': [self.manager.validate_fragment_index(fi)
+                             for fi in pref['exclude']]}
+                for pref in frag_prefs]
+        except ValueError as e:
+            raise DiskFileError(
+                'Bad timestamp in frag_prefs: %r: %s'
+                % (frag_prefs, e))
+        except DiskFileError as e:
+            raise DiskFileError(
+                'Bad fragment index in frag_prefs: %r: %s'
+                % (frag_prefs, e))
+        except (KeyError, TypeError) as e:
+            raise DiskFileError(
+                'Bad frag_prefs: %r: %s' % (frag_prefs, e))

     @property
     def durable_timestamp(self):
@ -2671,13 +2711,14 @@ class ECDiskFile(BaseDiskFile):
     def _get_ondisk_files(self, files):
         """
         The only difference between this method and the replication policy
-        DiskFile method is passing in the frag_index kwarg to our manager's
-        get_ondisk_files method.
+        DiskFile method is passing in the frag_index and frag_prefs kwargs to
+        our manager's get_ondisk_files method.

         :param files: list of file names
         """
         self._ondisk_info = self.manager.get_ondisk_files(
-            files, self._datadir, frag_index=self._frag_index)
+            files, self._datadir, frag_index=self._frag_index,
+            frag_prefs=self._frag_prefs)
         return self._ondisk_info

     def purge(self, timestamp, frag_index):
@ -2804,14 +2845,49 @@ class ECDiskFileManager(BaseDiskFileManager):
         rv['frag_index'] = None
         return rv

-    def _process_ondisk_files(self, exts, results, frag_index=None, **kwargs):
+    def _process_ondisk_files(self, exts, results, frag_index=None,
+                              frag_prefs=None, **kwargs):
         """
         Implement EC policy specific handling of .data and .durable files.

+        If a frag_prefs keyword arg is provided then its value may determine
+        which fragment index at which timestamp is used to construct the
+        diskfile. The value of frag_prefs should be a list. Each item in the
+        frag_prefs list should be a dict that describes per-timestamp
+        preferences using the following items:
+
+        * timestamp: An instance of :class:`~swift.common.utils.Timestamp`.
+        * exclude: A list of valid fragment indexes (i.e. whole numbers)
+          that should be EXCLUDED when choosing a fragment at the
+          timestamp. This list may be empty.
+
+        For example::
+
+            [
+              {'timestamp': <Timestamp instance>, 'exclude': [1, 3]},
+              {'timestamp': <Timestamp instance>, 'exclude': []}
+            ]
+
+        The order of per-timestamp dicts in the frag_prefs list is significant
+        and indicates descending preference for fragments from each timestamp,
+        i.e. a fragment that satisfies the first per-timestamp preference in
+        the frag_prefs will be preferred over a fragment that satisfies a
+        subsequent per-timestamp preference, and so on.
+
+        If a timestamp is not cited in any per-timestamp preference dict then
+        it is assumed that any fragment index at that timestamp may be used to
+        construct the diskfile.
+
+        When a frag_prefs arg is provided, including an empty list, there is
+        no requirement for there to be a durable file at the same timestamp as
+        a data file that is chosen to construct the disk file.
+
         :param exts: dict of lists of file info, keyed by extension
         :param results: a dict that may be updated with results
         :param frag_index: if set, search for a specific fragment index .data
                            file, otherwise accept the first valid .data file.
+        :param frag_prefs: if set, search for any fragment index .data file
+                           that satisfies the frag_prefs.
         """
         durable_info = None
         if exts.get('.durable'):
@ -2841,23 +2917,66 @@ class ECDiskFileManager(BaseDiskFileManager):
             if durable_info and durable_info['timestamp'] == timestamp:
                 durable_frag_set = frag_set

+        # Choose which frag set to use
+        chosen_frag_set = None
+        if frag_prefs is not None:
+            candidate_frag_sets = dict(frag_sets)
+            # For each per-timestamp frag preference dict, do we have any frag
+            # indexes at that timestamp that are not in the exclusion list for
+            # that timestamp? If so choose the highest of those frag_indexes.
+            for ts, exclude_indexes in [
+                    (ts_pref['timestamp'], ts_pref['exclude'])
+                    for ts_pref in frag_prefs
+                    if ts_pref['timestamp'] in candidate_frag_sets]:
+                available_indexes = [info['frag_index']
+                                     for info in candidate_frag_sets[ts]]
+                acceptable_indexes = list(set(available_indexes) -
+                                          set(exclude_indexes))
+                if acceptable_indexes:
+                    chosen_frag_set = candidate_frag_sets[ts]
+                    # override any frag_index passed in as method param with
+                    # the last (highest) acceptable_index
+                    frag_index = acceptable_indexes[-1]
+                    break
+                else:
+                    # this frag_set has no acceptable frag index so
+                    # remove it from the candidate frag_sets
+                    candidate_frag_sets.pop(ts)
+            else:
+                # No acceptable frag index was found at any timestamp mentioned
+                # in the frag_prefs. Choose the newest remaining candidate
+                # frag_set - the proxy can decide if it wants the returned
+                # fragment with that time.
+                if candidate_frag_sets:
+                    ts_newest = sorted(candidate_frag_sets.keys())[-1]
+                    chosen_frag_set = candidate_frag_sets[ts_newest]
+        else:
+            chosen_frag_set = durable_frag_set
+
         # Select a single chosen frag from the chosen frag_set, by either
         # matching against a specified frag_index or taking the highest index.
         chosen_frag = None
-        if durable_frag_set:
+        if chosen_frag_set:
             if frag_index is not None:
                 # search the frag set to find the exact frag_index
-                for info in durable_frag_set:
+                for info in chosen_frag_set:
                     if info['frag_index'] == frag_index:
                         chosen_frag = info
                         break
             else:
-                chosen_frag = durable_frag_set[-1]
+                chosen_frag = chosen_frag_set[-1]

         # If we successfully found a frag then set results
         if chosen_frag:
             results['data_info'] = chosen_frag
             results['durable_frag_set'] = durable_frag_set
+            results['chosen_frag_set'] = chosen_frag_set
+            if chosen_frag_set != durable_frag_set:
+                # hide meta files older than data file but newer than durable
+                # file so they don't get marked as obsolete (we already threw
+                # out .meta's that are older than a .durable)
+                exts['.meta'], _older = self._split_gt_timestamp(
+                    exts['.meta'], chosen_frag['timestamp'])

         results['frag_sets'] = frag_sets

         # Mark any isolated .durable as obsolete
@ -2867,7 +2986,7 @@ class ECDiskFileManager(BaseDiskFileManager):
         # Fragments *may* be ready for reclaim, unless they are durable
         for frag_set in frag_sets.values():
-            if frag_set == durable_frag_set:
+            if frag_set in (durable_frag_set, chosen_frag_set):
                 continue
             results.setdefault('possible_reclaim', []).extend(frag_set)
@ -2876,19 +2995,24 @@ class ECDiskFileManager(BaseDiskFileManager):
             results.setdefault('possible_reclaim', []).extend(
                 exts.get('.meta'))

-    def _verify_ondisk_files(self, results, frag_index=None, **kwargs):
+    def _verify_ondisk_files(self, results, frag_index=None,
+                             frag_prefs=None, **kwargs):
         """
         Verify that the final combination of on disk files complies with the
         erasure-coded diskfile contract.

         :param results: files that have been found and accepted
         :param frag_index: specifies a specific fragment index .data file
+        :param frag_prefs: if set, indicates that fragment preferences have
+            been specified and therefore that a selected fragment is not
+            required to be durable.
         :returns: True if the file combination is compliant, False otherwise
         """
         if super(ECDiskFileManager, self)._verify_ondisk_files(
                 results, **kwargs):
             have_data_file = results['data_file'] is not None
-            have_durable = results.get('durable_frag_set') is not None
+            have_durable = (results.get('durable_frag_set') is not None or
+                            (have_data_file and frag_prefs is not None))
             return have_data_file == have_durable
         return False
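To make the frag_prefs contract concrete, the sketch below shows a value that
passes validation and one that is rejected; the manager, policy, device and
object names are hypothetical stand-ins for an EC diskfile manager fixture:

    good_prefs = [{'timestamp': '1477149970.00000', 'exclude': [0, 1]},
                  {'timestamp': '1477149963.00000', 'exclude': []}]
    df = df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o',
                             policy=policy, frag_prefs=good_prefs)

    bad_prefs = [{'timestamp': 'not-a-timestamp', 'exclude': []}]
    # raises DiskFileError: Bad timestamp in frag_prefs: ...
    df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o',
                        policy=policy, frag_prefs=bad_prefs)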

View File

@ -33,7 +33,7 @@ from swift.common.utils import public, get_logger, \
     config_true_value, timing_stats, replication, \
     normalize_delete_at_timestamp, get_log_line, Timestamp, \
     get_expirer_container, parse_mime_headers, \
-    iter_multipart_mime_documents, extract_swift_bytes
+    iter_multipart_mime_documents, extract_swift_bytes, safe_json_loads
 from swift.common.bufferedhttp import http_connect
 from swift.common.constraints import check_object_creation, \
     valid_timestamp, check_utf8
@ -84,6 +84,15 @@ def drain(file_like, read_size, timeout):
                 break
+
+
+def _make_backend_fragments_header(fragments):
+    if fragments:
+        result = {}
+        for ts, frag_list in fragments.items():
+            result[ts.internal] = frag_list
+        return json.dumps(result)
+    return None


 class EventletPlungerString(str):
     """
     Eventlet won't send headers until it's accumulated at least
@ -853,10 +862,12 @@ class ObjectController(BaseStorageServer):
"""Handle HTTP GET requests for the Swift Object Server.""" """Handle HTTP GET requests for the Swift Object Server."""
device, partition, account, container, obj, policy = \ device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True) get_name_and_placement(request, 5, 5, True)
frag_prefs = safe_json_loads(
request.headers.get('X-Backend-Fragment-Preferences'))
try: try:
disk_file = self.get_diskfile( disk_file = self.get_diskfile(
device, partition, account, container, obj, device, partition, account, container, obj,
policy=policy) policy=policy, frag_prefs=frag_prefs)
except DiskFileDeviceUnavailable: except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request) return HTTPInsufficientStorage(drive=device, request=request)
try: try:
@ -889,6 +900,13 @@ class ObjectController(BaseStorageServer):
                 pass
             response.headers['X-Timestamp'] = file_x_ts.normal
             response.headers['X-Backend-Timestamp'] = file_x_ts.internal
+            response.headers['X-Backend-Data-Timestamp'] = \
+                disk_file.data_timestamp.internal
+            if disk_file.durable_timestamp:
+                response.headers['X-Backend-Durable-Timestamp'] = \
+                    disk_file.durable_timestamp.internal
+            response.headers['X-Backend-Fragments'] = \
+                _make_backend_fragments_header(disk_file.fragments)
             resp = request.get_response(response)
         except DiskFileXattrNotSupported:
             return HTTPInsufficientStorage(drive=device, request=request)
@ -906,10 +924,12 @@ class ObjectController(BaseStorageServer):
"""Handle HTTP HEAD requests for the Swift Object Server.""" """Handle HTTP HEAD requests for the Swift Object Server."""
device, partition, account, container, obj, policy = \ device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True) get_name_and_placement(request, 5, 5, True)
frag_prefs = safe_json_loads(
request.headers.get('X-Backend-Fragment-Preferences'))
try: try:
disk_file = self.get_diskfile( disk_file = self.get_diskfile(
device, partition, account, container, obj, device, partition, account, container, obj,
policy=policy) policy=policy, frag_prefs=frag_prefs)
except DiskFileDeviceUnavailable: except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request) return HTTPInsufficientStorage(drive=device, request=request)
try: try:
@ -938,6 +958,13 @@ class ObjectController(BaseStorageServer):
             # Needed for container sync feature
             response.headers['X-Timestamp'] = ts.normal
             response.headers['X-Backend-Timestamp'] = ts.internal
+            response.headers['X-Backend-Data-Timestamp'] = \
+                disk_file.data_timestamp.internal
+            if disk_file.durable_timestamp:
+                response.headers['X-Backend-Durable-Timestamp'] = \
+                    disk_file.durable_timestamp.internal
+            response.headers['X-Backend-Fragments'] = \
+                _make_backend_fragments_header(disk_file.fragments)
             response.content_length = int(metadata['Content-Length'])
             try:
                 response.content_encoding = metadata['Content-Encoding']
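Taken together, a backend GET or HEAD response for a non-durable fragment might
carry header values like the following (hypothetical timestamps, in the same
shape produced by _make_backend_fragments_header above):

    X-Backend-Timestamp: 1477149970.00000
    X-Backend-Data-Timestamp: 1477149970.00000
    X-Backend-Durable-Timestamp: 1477149963.00000
    X-Backend-Fragments: {"1477149963.00000": [0, 3], "1477149970.00000": [2]}

The absence of X-Backend-Durable-Timestamp tells the proxy that this object
server knows of no durable EC archive at all.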

View File

@ -729,7 +729,7 @@ def bytes_to_skip(record_size, range_start):
 class ResumingGetter(object):
     def __init__(self, app, req, server_type, node_iter, partition, path,
                  backend_headers, concurrency=1, client_chunk_size=None,
-                 newest=None):
+                 newest=None, header_provider=None):
         self.app = app
         self.node_iter = node_iter
         self.server_type = server_type
@ -742,6 +742,8 @@ class ResumingGetter(object):
         self.used_nodes = []
         self.used_source_etag = ''
         self.concurrency = concurrency
+        self.node = None
+        self.header_provider = header_provider

         # stuff from request
         self.req_method = req.method
@ -1093,7 +1095,7 @@ class ResumingGetter(object):
     @property
     def last_headers(self):
         if self.source_headers:
-            return self.source_headers[-1]
+            return HeaderKeyDict(self.source_headers[-1])
         else:
             return None
@ -1101,13 +1103,17 @@ class ResumingGetter(object):
         self.app.logger.thread_locals = logger_thread_locals
         if node in self.used_nodes:
             return False
+        req_headers = dict(self.backend_headers)
+        # a request may be specialised with specific backend headers
+        if self.header_provider:
+            req_headers.update(self.header_provider())
         start_node_timing = time.time()
         try:
             with ConnectionTimeout(self.app.conn_timeout):
                 conn = http_connect(
                     node['ip'], node['port'], node['device'],
                     self.partition, self.req_method, self.path,
-                    headers=self.backend_headers,
+                    headers=req_headers,
                     query_string=self.req_query_string)
             self.app.set_node_timing(node, time.time() - start_node_timing)
@ -1212,6 +1218,7 @@ class ResumingGetter(object):
                 self.used_source_etag = src_headers.get(
                     'x-object-sysmeta-ec-etag',
                     src_headers.get('etag', '')).strip('"')
+                self.node = node
                 return source, node
         return None, None
@ -1316,6 +1323,7 @@ class NodeIter(object):
         self.primary_nodes = self.app.sort_nodes(
             list(itertools.islice(node_iter, num_primary_nodes)))
         self.handoff_iter = node_iter
+        self._node_provider = None

     def __iter__(self):
         self._node_iter = self._node_gen()
@ -1344,6 +1352,16 @@ class NodeIter(object):
             # all the primaries were skipped, and handoffs didn't help
             self.app.logger.increment('handoff_all_count')

+    def set_node_provider(self, callback):
+        """
+        Install a callback function that will be used during a call to next()
+        to get an alternate node instead of returning the next node from the
+        iterator.
+
+        :param callback: A no argument function that should return a node dict
+                         or None.
+        """
+        self._node_provider = callback
+
     def _node_gen(self):
         for node in self.primary_nodes:
             if not self.app.error_limited(node):
@ -1364,6 +1382,11 @@ class NodeIter(object):
                 return

     def next(self):
+        if self._node_provider:
+            # give node provider the opportunity to inject a node
+            node = self._node_provider()
+            if node:
+                return node
         return next(self._node_iter)

     def __next__(self):

View File

@ -47,7 +47,7 @@ from swift.common.utils import (
     GreenAsyncPile, GreenthreadSafeIterator, Timestamp,
     normalize_delete_at_timestamp, public, get_expirer_container,
     document_iters_to_http_response_body, parse_content_range,
-    quorum_size, reiterate, close_if_possible)
+    quorum_size, reiterate, close_if_possible, safe_json_loads)
 from swift.common.bufferedhttp import http_connect
 from swift.common.constraints import check_metadata, check_object_creation
 from swift.common import constraints
@ -1835,9 +1835,262 @@ def trailing_metadata(policy, client_obj_hasher,
         })


+class ECGetResponseBucket(object):
+    """
+    A helper class to encapsulate the properties of buckets in which fragment
+    getters and alternate nodes are collected.
+    """
+    def __init__(self, policy, timestamp_str):
+        """
+        :param policy: an instance of ECStoragePolicy
+        :param timestamp_str: a string representation of a timestamp
+        """
+        self.policy = policy
+        self.timestamp_str = timestamp_str
+        self.gets = collections.defaultdict(list)
+        self.alt_nodes = collections.defaultdict(list)
+        self._durable = False
+        self.status = self.headers = None
+
+    def set_durable(self):
+        self._durable = True
+
+    def add_response(self, getter, parts_iter):
+        if not self.gets:
+            self.status = getter.last_status
+            # stash first set of backend headers, which will be used to
+            # populate a client response
+            # TODO: each bucket is for a single *data* timestamp, but sources
+            # in the same bucket may have different *metadata* timestamps if
+            # some backends have more recent .meta files than others.
+            # Currently we just use the last received metadata headers - this
+            # behavior is ok and is consistent with a replication policy GET
+            # which similarly does not attempt to find the backend with the
+            # most recent metadata. We could alternatively choose the *newest*
+            # metadata headers for self.headers by selecting the source with
+            # the latest X-Timestamp.
+            self.headers = getter.last_headers
+        elif (getter.last_headers.get('X-Object-Sysmeta-Ec-Etag') !=
+              self.headers.get('X-Object-Sysmeta-Ec-Etag')):
+            # Fragments at the same timestamp with different etags are never
+            # expected. If somehow it happens then ignore those fragments
+            # to avoid mixing fragments that will not reconstruct, otherwise
+            # an exception from pyeclib is almost certain. This strategy
+            # leaves a possibility that a set of consistent frags will be
+            # gathered.
+            raise ValueError("ETag mismatch")
+
+        frag_index = getter.last_headers.get('X-Object-Sysmeta-Ec-Frag-Index')
+        frag_index = int(frag_index) if frag_index is not None else None
+        self.gets[frag_index].append((getter, parts_iter))
+
+    def get_responses(self):
+        """
+        Return a list of all useful sources. Where there are multiple sources
+        associated with the same frag_index then only one is included.
+
+        :return: a list of sources, each source being a tuple of form
+                 (ResumingGetter, iter)
+        """
+        all_sources = []
+        for frag_index, sources in self.gets.items():
+            if frag_index is None:
+                # bad responses don't have a frag_index (and fake good
+                # responses from some unit tests)
+                all_sources.extend(sources)
+            else:
+                all_sources.extend(sources[:1])
+        return all_sources
+
+    def add_alternate_nodes(self, node, frag_indexes):
+        for frag_index in frag_indexes:
+            self.alt_nodes[frag_index].append(node)
+
+    @property
+    def shortfall(self):
+        # A non-durable bucket always has a shortfall of at least 1
+        result = self.policy.ec_ndata - len(self.get_responses())
+        return max(result, 0 if self._durable else 1)
+
+    @property
+    def shortfall_with_alts(self):
+        # The shortfall that we expect to have if we were to send requests
+        # for frags on the alt nodes.
+        alts = set(self.alt_nodes.keys()).difference(set(self.gets.keys()))
+        result = self.policy.ec_ndata - (len(self.get_responses()) +
+                                         len(alts))
+        return max(result, 0 if self._durable else 1)
+
+    def __str__(self):
+        # return a string summarising bucket state, useful for debugging.
+        return '<%s, %s, %s, %s(%s), %s>' \
+               % (self.timestamp_str, self.status, self._durable,
+                  self.shortfall, self.shortfall_with_alts, len(self.gets))
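A minimal sketch of the shortfall arithmetic above, assuming a hypothetical
10+4 policy (ec_ndata = 10):

    ec_ndata = 10

    def shortfall(num_distinct_frags, durable):
        return max(ec_ndata - num_distinct_frags, 0 if durable else 1)

    shortfall(10, durable=True)    # 0: enough fragments, decodable now
    shortfall(10, durable=False)   # 1: still need evidence of durability
    shortfall(8, durable=True)     # 2: need two more distinct fragments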
+class ECGetResponseCollection(object):
+    """
+    Manages all successful EC GET responses gathered by ResumingGetters.
+
+    A response comprises a tuple of (<getter instance>, <parts iterator>). All
+    responses having the same data timestamp are placed in an
+    ECGetResponseBucket for that timestamp. The buckets are stored in the
+    'buckets' dict which maps timestamp -> bucket.
+
+    This class encapsulates logic for selecting the best bucket from the
+    collection, and for choosing alternate nodes.
+    """
+    def __init__(self, policy):
+        """
+        :param policy: an instance of ECStoragePolicy
+        """
+        self.policy = policy
+        self.buckets = {}
+        self.node_iter_count = 0
+
+    def _get_bucket(self, timestamp_str):
+        """
+        :param timestamp_str: a string representation of a timestamp
+        :return: ECGetResponseBucket for given timestamp
+        """
+        return self.buckets.setdefault(
+            timestamp_str, ECGetResponseBucket(self.policy, timestamp_str))
+
+    def add_response(self, get, parts_iter):
+        """
+        Add a response to the collection.
+
+        :param get: An instance of
+            :class:`~swift.proxy.controllers.base.ResumingGetter`
+        :param parts_iter: An iterator over response body parts
+        :raises ValueError: if the response etag or status code values do not
+            match any values previously received for the same timestamp
+        """
+        headers = get.last_headers
+        # Add the response to the appropriate bucket keyed by data file
+        # timestamp. Fall back to using X-Backend-Timestamp as key for object
+        # servers that have not been upgraded.
+        t_data_file = headers.get('X-Backend-Data-Timestamp')
+        t_obj = headers.get('X-Backend-Timestamp', headers.get('X-Timestamp'))
+        self._get_bucket(t_data_file or t_obj).add_response(get, parts_iter)
+
+        # The node may also have alternate fragments indexes (possibly at
+        # different timestamps). For each list of alternate fragments indexes,
+        # find the bucket for their data file timestamp and add the node and
+        # list to that bucket's alternate nodes.
+        frag_sets = safe_json_loads(headers.get('X-Backend-Fragments')) or {}
+        for t_frag, frag_set in frag_sets.items():
+            self._get_bucket(t_frag).add_alternate_nodes(get.node, frag_set)
+
+        # If the response includes a durable timestamp then mark that bucket
+        # as durable. Note that this may be a different bucket than the one
+        # this response got added to, and that we may never go and get a
+        # durable frag from this node; it is sufficient that we have been
+        # told that a .durable exists, somewhere, at t_durable.
+        t_durable = headers.get('X-Backend-Durable-Timestamp')
+        if not t_durable and not t_data_file:
+            # obj server not upgraded so assume this response's frag is
+            # durable
+            t_durable = t_obj
+        if t_durable:
+            self._get_bucket(t_durable).set_durable()
+
+    def _sort_buckets(self):
+        def key_fn(bucket):
+            # Returns a tuple to use for sort ordering:
+            # buckets with no shortfall sort higher,
+            # otherwise buckets with lowest shortfall_with_alts sort higher,
+            # finally buckets with newer timestamps sort higher.
+            return (bucket.shortfall <= 0,
+                    (not (bucket.shortfall <= 0) and
+                     (-1 * bucket.shortfall_with_alts)),
+                    bucket.timestamp_str)
+
+        return sorted(self.buckets.values(), key=key_fn, reverse=True)
+
+    @property
+    def best_bucket(self):
+        """
+        Return the best bucket in the collection.
+
+        The "best" bucket is the newest timestamp with sufficient getters, or
+        the closest to having sufficient getters, unless it is bettered by a
+        bucket with potential alternate nodes.
+
+        :return: An instance of :class:`~ECGetResponseBucket` or None if there
+                 are no buckets in the collection.
+        """
+        sorted_buckets = self._sort_buckets()
+        if sorted_buckets:
+            return sorted_buckets[0]
+        return None
+
+    def _get_frag_prefs(self):
+        # Construct the current frag_prefs list, with best_bucket prefs first.
+        frag_prefs = []
+
+        for bucket in self._sort_buckets():
+            if bucket.timestamp_str:
+                exclusions = [fi for fi in bucket.gets if fi is not None]
+                prefs = {'timestamp': bucket.timestamp_str,
+                         'exclude': exclusions}
+                frag_prefs.append(prefs)
+
+        return frag_prefs
+
+    def get_extra_headers(self):
+        frag_prefs = self._get_frag_prefs()
+        return {'X-Backend-Fragment-Preferences': json.dumps(frag_prefs)}
+
+    def _get_alternate_nodes(self):
+        if self.node_iter_count <= self.policy.ec_ndata:
+            # It makes sense to wait before starting to use alternate nodes,
+            # because if we find sufficient frags on *distinct* nodes then we
+            # spread work across more nodes. There's no formal proof that
+            # waiting for ec_ndata GETs is the right answer, but it seems
+            # reasonable to try *at least* that many primary nodes before
+            # resorting to alternate nodes.
+            return None
+
+        bucket = self.best_bucket
+        if (bucket is None) or (bucket.shortfall <= 0):
+            return None
+
+        alt_frags = set(bucket.alt_nodes.keys())
+        got_frags = set(bucket.gets.keys())
+        wanted_frags = list(alt_frags.difference(got_frags))
+
+        # We may have the same frag_index on more than one node so shuffle to
+        # avoid using the same frag_index consecutively, since we may not get
+        # a response from the last node provided before being asked to
+        # provide another node.
+        random.shuffle(wanted_frags)
+
+        for frag_index in wanted_frags:
+            nodes = bucket.alt_nodes.get(frag_index)
+            if nodes:
+                return nodes
+        return None
+
+    def has_alternate_node(self):
+        return True if self._get_alternate_nodes() else False
+
+    def provide_alternate_node(self):
+        """
+        Callback function that is installed in a NodeIter. Called on every
+        call to NodeIter.next(), which means we can track the number of nodes
+        to which GET requests have been made and selectively inject an
+        alternate node, if we have one.
+
+        :return: A dict describing a node to which the next GET request
+                 should be made.
+        """
+        self.node_iter_count += 1
+        nodes = self._get_alternate_nodes()
+        if nodes:
+            return nodes.pop(0).copy()
 @ObjectControllerRouter.register(EC_POLICY)
 class ECObjectController(BaseObjectController):
-    def _fragment_GET_request(self, req, node_iter, partition, policy):
+    def _fragment_GET_request(self, req, node_iter, partition, policy,
+                              header_provider=None):
         """
         Makes a GET request for a fragment.
         """
@ -1848,7 +2101,7 @@ class ECObjectController(BaseObjectController):
             partition, req.swift_entity_path,
             backend_headers,
             client_chunk_size=policy.fragment_size,
-            newest=False)
+            newest=False, header_provider=header_provider)
         return (getter, getter.response_parts_iter(req))

     def _convert_range(self, req, policy):
@ -1914,93 +2167,130 @@ class ECObjectController(BaseObjectController):
             resp = self.GETorHEAD_base(
                 req, _('Object'), node_iter, partition,
                 req.swift_entity_path, concurrency)
-        else:  # GET request
-            orig_range = None
-            range_specs = []
-            if req.range:
-                orig_range = req.range
-                range_specs = self._convert_range(req, policy)
-
-            safe_iter = GreenthreadSafeIterator(node_iter)
-            # Sending the request concurrently to all nodes, and responding
-            # with the first response isn't something useful for EC as all
-            # nodes contain different fragments. Also EC has implemented it's
-            # own specific implementation of concurrent gets to ec_ndata nodes.
-            # So we don't need to worry about plumbing and sending a
-            # concurrency value to ResumingGetter.
-            with ContextPool(policy.ec_ndata) as pool:
-                pile = GreenAsyncPile(pool)
-                for _junk in range(policy.ec_ndata):
-                    pile.spawn(self._fragment_GET_request,
-                               req, safe_iter, partition,
-                               policy)
-
-                bad_gets = []
-                etag_buckets = collections.defaultdict(list)
-                best_etag = None
-                for get, parts_iter in pile:
-                    if is_success(get.last_status):
-                        etag = HeaderKeyDict(
-                            get.last_headers)['X-Object-Sysmeta-Ec-Etag']
-                        etag_buckets[etag].append((get, parts_iter))
-                        if etag != best_etag and (
-                                len(etag_buckets[etag]) >
-                                len(etag_buckets[best_etag])):
-                            best_etag = etag
-                    else:
-                        bad_gets.append((get, parts_iter))
-                    matching_response_count = max(
-                        len(etag_buckets[best_etag]), len(bad_gets))
-                    if (policy.ec_ndata - matching_response_count >
-                            pile._pending) and node_iter.nodes_left > 0:
-                        # we need more matching responses to reach ec_ndata
-                        # than we have pending gets, as long as we still have
-                        # nodes in node_iter we can spawn another
-                        pile.spawn(self._fragment_GET_request, req,
-                                   safe_iter, partition, policy)
-
-            req.range = orig_range
-            if len(etag_buckets[best_etag]) >= policy.ec_ndata:
-                # headers can come from any of the getters
-                resp_headers = HeaderKeyDict(
-                    etag_buckets[best_etag][0][0].source_headers[-1])
-                resp_headers.pop('Content-Range', None)
-                eccl = resp_headers.get('X-Object-Sysmeta-Ec-Content-Length')
-                obj_length = int(eccl) if eccl is not None else None
-
-                # This is only true if we didn't get a 206 response, but
-                # that's the only time this is used anyway.
-                fa_length = int(resp_headers['Content-Length'])
-
-                app_iter = ECAppIter(
-                    req.swift_entity_path,
-                    policy,
-                    [iterator for getter, iterator in etag_buckets[best_etag]],
-                    range_specs, fa_length, obj_length,
-                    self.app.logger)
-                resp = Response(
-                    request=req,
-                    headers=resp_headers,
-                    conditional_response=True,
-                    app_iter=app_iter)
-                try:
-                    app_iter.kickoff(req, resp)
-                except HTTPException as err_resp:
-                    # catch any HTTPException response here so that we can
-                    # process response headers uniformly in _fix_response
-                    resp = err_resp
-            else:
-                statuses = []
-                reasons = []
-                bodies = []
-                headers = []
-                for getter, body_parts_iter in bad_gets:
-                    statuses.extend(getter.statuses)
-                    reasons.extend(getter.reasons)
-                    bodies.extend(getter.bodies)
-                    headers.extend(getter.source_headers)
-                resp = self.best_response(
-                    req, statuses, reasons, bodies, 'Object',
-                    headers=headers)
+            self._fix_response(req, resp)
+            return resp
+
+        # GET request
+        orig_range = None
+        range_specs = []
+        if req.range:
+            orig_range = req.range
+            range_specs = self._convert_range(req, policy)
+
+        safe_iter = GreenthreadSafeIterator(node_iter)
+        # Sending the request concurrently to all nodes, and responding
+        # with the first response isn't something useful for EC as all
+        # nodes contain different fragments. Also EC has implemented its
+        # own specific implementation of concurrent gets to ec_ndata nodes.
+        # So we don't need to worry about plumbing and sending a
+        # concurrency value to ResumingGetter.
+        with ContextPool(policy.ec_ndata) as pool:
+            pile = GreenAsyncPile(pool)
+            buckets = ECGetResponseCollection(policy)
+            node_iter.set_node_provider(buckets.provide_alternate_node)
+            # include what may well be an empty X-Backend-Fragment-Preferences
+            # header from the buckets.get_extra_headers to let the object
+            # server know that it is ok to return non-durable fragments
+            for _junk in range(policy.ec_ndata):
+                pile.spawn(self._fragment_GET_request,
+                           req, safe_iter, partition,
+                           policy, buckets.get_extra_headers)
+
+            bad_bucket = ECGetResponseBucket(policy, None)
+            bad_bucket.set_durable()
+            best_bucket = None
+            extra_requests = 0
+            # max_extra_requests is an arbitrary hard limit for spawning extra
+            # getters in case some unforeseen scenario, or a misbehaving object
+            # server, causes us to otherwise make endless requests e.g. if an
+            # object server were to ignore frag_prefs and always respond with
+            # a frag that is already in a bucket.
+            max_extra_requests = 2 * policy.ec_nparity + policy.ec_ndata
+
+            for get, parts_iter in pile:
+                if get.last_status is None:
+                    # We may have spawned getters that find the node iterator
+                    # has been exhausted. Ignore them.
+                    # TODO: turns out that node_iter.nodes_left can bottom
+                    # out at >0 when number of devs in ring is < 2* replicas,
+                    # which definitely happens in tests and results in status
+                    # of None. We should fix that but keep this guard because
+                    # there is also a race between testing nodes_left/spawning
+                    # a getter and an existing getter calling next(node_iter).
+                    continue
+                try:
+                    if is_success(get.last_status):
+                        # 2xx responses are managed by a response collection
+                        buckets.add_response(get, parts_iter)
+                    else:
+                        # all other responses are lumped into a single bucket
+                        bad_bucket.add_response(get, parts_iter)
+                except ValueError as err:
+                    self.app.logger.error(
+                        _("Problem with fragment response: %s"), err)
+                shortfall = bad_bucket.shortfall
+                best_bucket = buckets.best_bucket
+                if best_bucket:
+                    shortfall = min(best_bucket.shortfall, shortfall)
+                if (extra_requests < max_extra_requests and
+                        shortfall > pile._pending and
+                        (node_iter.nodes_left > 0 or
+                         buckets.has_alternate_node())):
+                    # we need more matching responses to reach ec_ndata
+                    # than we have pending gets, as long as we still have
+                    # nodes in node_iter we can spawn another
+                    extra_requests += 1
+                    pile.spawn(self._fragment_GET_request, req,
+                               safe_iter, partition, policy,
+                               buckets.get_extra_headers)
+
+        req.range = orig_range
+        if best_bucket and best_bucket.shortfall <= 0:
+            # headers can come from any of the getters
+            resp_headers = best_bucket.headers
+            resp_headers.pop('Content-Range', None)
+            eccl = resp_headers.get('X-Object-Sysmeta-Ec-Content-Length')
+            obj_length = int(eccl) if eccl is not None else None
+
+            # This is only true if we didn't get a 206 response, but
+            # that's the only time this is used anyway.
+            fa_length = int(resp_headers['Content-Length'])
+
+            app_iter = ECAppIter(
+                req.swift_entity_path,
+                policy,
+                [parts_iter for
+                 _getter, parts_iter in best_bucket.get_responses()],
+                range_specs, fa_length, obj_length,
+                self.app.logger)
+            resp = Response(
+                request=req,
+                headers=resp_headers,
+                conditional_response=True,
+                app_iter=app_iter)
+            try:
+                app_iter.kickoff(req, resp)
+            except HTTPException as err_resp:
+                # catch any HTTPException response here so that we can
+                # process response headers uniformly in _fix_response
+                resp = err_resp
+        else:
+            # TODO: we can get here if all buckets are successful but none
+            # have ec_ndata getters, so bad_bucket may have no gets and we
+            # will return a 503 when a 404 may be more appropriate. We can
+            # also get here with less than ec_ndata 416's and may then return
+            # a 416 which is also questionable because a non-range get for
+            # the same object would return 404 or 503.
+            statuses = []
+            reasons = []
+            bodies = []
+            headers = []
+            for getter, _parts_iter in bad_bucket.get_responses():
+                statuses.extend(getter.statuses)
+                reasons.extend(getter.reasons)
+                bodies.extend(getter.bodies)
+                headers.extend(getter.source_headers)
+            resp = self.best_response(
+                req, statuses, reasons, bodies, 'Object',
+                headers=headers)
         self._fix_response(req, resp)
         return resp
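A short worked example of the bucket selection above, assuming a hypothetical
4+2 policy (ec_ndata = 4) and two buckets after the first round of requests:

    t1 (older): durable, frag indexes {0, 1, 2, 3}  -> shortfall 0
    t2 (newer): no .durable seen, indexes {0, 1, 2} -> shortfall 1

best_bucket sorts t1 first, so the proxy serves the older durable data; t2
would only win once it both reaches four distinct fragment indexes and some
object server reports a durable timestamp of t2.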

View File

@ -3664,6 +3664,36 @@ cluster_dfw1 = http://dfw1.host/v1/
         os.close(fd)
         shutil.rmtree(tempdir)

+    def test_safe_json_loads(self):
+        expectations = {
+            None: None,
+            '': None,
+            0: None,
+            1: None,
+            '"asdf"': 'asdf',
+            '[]': [],
+            '{}': {},
+            "{'foo': 'bar'}": None,
+            '{"foo": "bar"}': {'foo': 'bar'},
+        }
+
+        failures = []
+        for value, expected in expectations.items():
+            try:
+                result = utils.safe_json_loads(value)
+            except Exception as e:
+                # it's called safe, if it blows up the test blows up
+                self.fail('%r caused safe method to throw %r!' % (
+                    value, e))
+            try:
+                self.assertEqual(expected, result)
+            except AssertionError:
+                failures.append('%r => %r (expected %r)' % (
+                    value, result, expected))
+        if failures:
+            self.fail('Invalid results from pure function:\n%s' %
+                      '\n'.join(failures))
+

 class ResellerConfReader(unittest.TestCase):

View File

@ -591,14 +591,16 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
     def tearDown(self):
         rmtree(self.tmpdir, ignore_errors=1)

-    def _get_diskfile(self, policy, frag_index=None):
+    def _get_diskfile(self, policy, frag_index=None, **kwargs):
         df_mgr = self.df_router[policy]
         return df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o',
-                                   policy=policy, frag_index=frag_index)
+                                   policy=policy, frag_index=frag_index,
+                                   **kwargs)

     def _test_get_ondisk_files(self, scenarios, policy,
-                               frag_index=None):
-        class_under_test = self._get_diskfile(policy, frag_index=frag_index)
+                               frag_index=None, **kwargs):
+        class_under_test = self._get_diskfile(
+            policy, frag_index=frag_index, **kwargs)
         for test in scenarios:
             # test => [('filename.ext', '.ext'|False, ...), ...]
             expected = {
@ -610,7 +612,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
             files = list(list(zip(*test))[0])

             for _order in ('ordered', 'shuffled', 'shuffled'):
-                class_under_test = self._get_diskfile(policy, frag_index)
+                class_under_test = self._get_diskfile(
+                    policy, frag_index=frag_index, **kwargs)
                 try:
                     actual = class_under_test._get_ondisk_files(files)
                     self._assertDictContainsSubset(
@ -621,8 +624,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
                     self.fail('%s with files %s' % (str(e), files))
                 shuffle(files)

-    def _test_cleanup_ondisk_files_files(self, scenarios, policy,
-                                         reclaim_age=None):
+    def _test_cleanup_ondisk_files(self, scenarios, policy,
+                                   reclaim_age=None):
         # check that expected files are left in hashdir after cleanup
         for test in scenarios:
             class_under_test = self.df_router[policy]
@ -753,8 +756,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
             [('%s.meta' % older, False, False),
              ('%s.ts' % older, False, False)]]
-        self._test_cleanup_ondisk_files_files(scenarios, POLICIES.default,
-                                              reclaim_age=1000)
+        self._test_cleanup_ondisk_files(scenarios, POLICIES.default,
+                                        reclaim_age=1000)

     def test_construct_dev_path(self):
         res_path = self.df_mgr.construct_dev_path('abc')
@ -1165,7 +1168,7 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
         ]

         self._test_get_ondisk_files(scenarios, POLICIES[0], None)
-        self._test_cleanup_ondisk_files_files(scenarios, POLICIES[0])
+        self._test_cleanup_ondisk_files(scenarios, POLICIES[0])
         self._test_yield_hashes_cleanup(scenarios, POLICIES[0])

     def test_get_ondisk_files_with_stray_meta(self):
@ -1248,8 +1251,8 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
             [('%s.meta' % older, '.meta'),
              ('%s.data' % much_older, '.data')]]
-        self._test_cleanup_ondisk_files_files(scenarios, POLICIES.default,
-                                              reclaim_age=1000)
+        self._test_cleanup_ondisk_files(scenarios, POLICIES.default,
+                                        reclaim_age=1000)

     def test_yield_hashes(self):
         old_ts = '1383180000.12345'
@ -1437,23 +1440,6 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
              ('0000000007.00000#1.data', '.data'),
              ('0000000007.00000#0.data', False, True)],

-            # data with no durable is ignored
-            [('0000000007.00000#0.data', False, True)],
-
-            # data newer than tombstone with no durable is ignored
-            [('0000000007.00000#0.data', False, True),
-             ('0000000006.00000.ts', '.ts', True)],
-
-            # data newer than durable is ignored
-            [('0000000008.00000#1.data', False, True),
-             ('0000000007.00000.durable', '.durable'),
-             ('0000000007.00000#1.data', '.data'),
-             ('0000000007.00000#0.data', False, True)],
-
-            # data newer than durable ignored, even if its only data
-            [('0000000008.00000#1.data', False, True),
-             ('0000000007.00000.durable', False, False)],
-
             # data older than durable is ignored
             [('0000000007.00000.durable', '.durable'),
              ('0000000007.00000#1.data', '.data'),
@ -1489,16 +1475,79 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
              ('0000000006.00000.ts', '.ts'),
              ('0000000006.00000.durable', False),
              ('0000000006.00000#0.data', False)],
-
-            # missing durable invalidates data
-            [('0000000006.00000.meta', False, True),
-             ('0000000006.00000#0.data', False, True)]
         ]

-        self._test_get_ondisk_files(scenarios, POLICIES.default, None)
-        self._test_cleanup_ondisk_files_files(scenarios, POLICIES.default)
+        # these scenarios have the same outcome regardless of whether any
+        # fragment preferences are specified
+        self._test_get_ondisk_files(scenarios, POLICIES.default,
+                                    frag_index=None)
+        self._test_get_ondisk_files(scenarios, POLICIES.default,
+                                    frag_index=None, frag_prefs=[])
+        self._test_cleanup_ondisk_files(scenarios, POLICIES.default)
         self._test_yield_hashes_cleanup(scenarios, POLICIES.default)
+
+        # next scenarios have different outcomes dependent on whether a
+        # frag_prefs parameter is passed to the diskfile constructor or not
+        scenarios = [
+            # data with no durable is ignored
+            [('0000000007.00000#0.data', False, True)],
+
+            # data newer than tombstone with no durable is ignored
+            [('0000000007.00000#0.data', False, True),
+             ('0000000006.00000.ts', '.ts', True)],
+
+            # data newer than durable is ignored
+            [('0000000009.00000#2.data', False, True),
+             ('0000000009.00000#1.data', False, True),
+             ('0000000008.00000#3.data', False, True),
+             ('0000000007.00000.durable', '.durable'),
+             ('0000000007.00000#1.data', '.data'),
+             ('0000000007.00000#0.data', False, True)],
+
+            # data newer than durable ignored, even if it is the only data
+            [('0000000008.00000#1.data', False, True),
+             ('0000000007.00000.durable', False, False)],
+
+            # missing durable invalidates data, older meta deleted
+            [('0000000007.00000.meta', False, True),
+             ('0000000006.00000#0.data', False, True),
+             ('0000000005.00000.meta', False, False),
+             ('0000000004.00000#1.data', False, True)]]
+
+        self._test_get_ondisk_files(scenarios, POLICIES.default,
+                                    frag_index=None)
+        self._test_cleanup_ondisk_files(scenarios, POLICIES.default)
+
+        scenarios = [
+            # data with no durable is chosen
+            [('0000000007.00000#0.data', '.data', True)],
+
+            # data newer than tombstone with no durable is chosen
+            [('0000000007.00000#0.data', '.data', True),
+             ('0000000006.00000.ts', False, True)],
+
+            # data newer than durable is chosen, older data preserved
+            [('0000000009.00000#2.data', '.data', True),
+             ('0000000009.00000#1.data', False, True),
+             ('0000000008.00000#3.data', False, True),
+             ('0000000007.00000.durable', False, True),
+             ('0000000007.00000#1.data', False, True),
+             ('0000000007.00000#0.data', False, True)],
+
+            # data newer than durable chosen when it is the only data
+            [('0000000008.00000#1.data', '.data', True),
+             ('0000000007.00000.durable', False, False)],
+
+            # data plus meta chosen without durable, older meta deleted
+            [('0000000007.00000.meta', '.meta', True),
+             ('0000000006.00000#0.data', '.data', True),
+             ('0000000005.00000.meta', False, False),
+             ('0000000004.00000#1.data', False, True)]]
+
+        self._test_get_ondisk_files(scenarios, POLICIES.default,
+                                    frag_index=None, frag_prefs=[])
+        self._test_cleanup_ondisk_files(scenarios, POLICIES.default)

     def test_get_ondisk_files_with_ec_policy_and_frag_index(self):
         # Each scenario specifies a list of (filename, extension) tuples. If
         # extension is set then that filename should be returned by the method
@ -1512,20 +1561,20 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
             [('0000000007.00000#2.data', False, True),
              ('0000000007.00000#1.data', False, True),
              ('0000000007.00000#0.data', False, True),
-             ('0000000006.00000.durable', '.durable')],
+             ('0000000006.00000.durable', False)],

             # specific frag older than durable is ignored
             [('0000000007.00000#2.data', False),
              ('0000000007.00000#1.data', False),
              ('0000000007.00000#0.data', False),
-             ('0000000008.00000.durable', '.durable')],
+             ('0000000008.00000.durable', False)],

             # specific frag older than newest durable is ignored
             # even if it also has a durable
             [('0000000007.00000#2.data', False),
              ('0000000007.00000#1.data', False),
              ('0000000007.00000.durable', False),
-             ('0000000008.00000#0.data', False),
+             ('0000000008.00000#0.data', False, True),
              ('0000000008.00000.durable', '.durable')],

             # meta included when frag index is specified
@ -1559,12 +1608,23 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
             # frag_index, get_ondisk_files will tolerate .meta with
             # no .data
             [('0000000088.00000.meta', False, True),
-             ('0000000077.00000.durable', '.durable')]
+             ('0000000077.00000.durable', False, False)]
         ]
         self._test_get_ondisk_files(scenarios, POLICIES.default, frag_index=1)
-        # note: not calling self._test_cleanup_ondisk_files_files(scenarios, 0)
-        # here due to the anomalous scenario as commented above
+        self._test_cleanup_ondisk_files(scenarios, POLICIES.default)
+
+        # scenarios for empty frag_prefs, meaning durable not required
+        scenarios = [
+            # specific frag newer than durable is chosen
+            [('0000000007.00000#2.data', False, True),
+             ('0000000007.00000#1.data', '.data', True),
+             ('0000000007.00000#0.data', False, True),
+             ('0000000006.00000.durable', False, False)],
+        ]
+        self._test_get_ondisk_files(scenarios, POLICIES.default, frag_index=1,
+                                    frag_prefs=[])
+        self._test_cleanup_ondisk_files(scenarios, POLICIES.default)

     def test_cleanup_ondisk_files_reclaim_with_data_files(self):
         # Each scenario specifies a list of (filename, extension, [survives])
@ -1622,8 +1682,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
             [('%s.meta' % older, False, False),
              ('%s.durable' % much_older, False, False)]]
-        self._test_cleanup_ondisk_files_files(scenarios, POLICIES.default,
-                                              reclaim_age=1000)
+        self._test_cleanup_ondisk_files(scenarios, POLICIES.default,
+                                        reclaim_age=1000)

     def test_get_ondisk_files_with_stray_meta(self):
         # get_ondisk_files ignores a stray .meta file
@ -4574,6 +4634,233 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase):
df.open() df.open()
self.assertEqual(ts1, df.durable_timestamp) self.assertEqual(ts1, df.durable_timestamp)
def test_open_with_fragment_preferences(self):
policy = POLICIES.default
df_mgr = self.df_router[policy]
df = df_mgr.get_diskfile(self.existing_device, '0',
'a', 'c', 'o', policy=policy)
ts_1, ts_2, ts_3, ts_4 = (self.ts() for _ in range(4))
# create two durable frags, first with index 0
with df.create() as writer:
data = 'test data'
writer.write(data)
frag_0_metadata = {
'ETag': md5(data).hexdigest(),
'X-Timestamp': ts_1.internal,
'Content-Length': len(data),
'X-Object-Sysmeta-Ec-Frag-Index': 0,
}
writer.put(frag_0_metadata)
writer.commit(ts_1)
# second with index 3
with df.create() as writer:
data = 'test data'
writer.write(data)
frag_3_metadata = {
'ETag': md5(data).hexdigest(),
'X-Timestamp': ts_1.internal,
'Content-Length': len(data),
'X-Object-Sysmeta-Ec-Frag-Index': 3,
}
writer.put(frag_3_metadata)
writer.commit(ts_1)
# sanity check: should have 2 * .data plus a .durable
self.assertEqual(3, len(os.listdir(df._datadir)))
# add some .meta stuff
meta_1_metadata = {
'X-Object-Meta-Foo': 'Bar',
'X-Timestamp': ts_2.internal,
}
df = df_mgr.get_diskfile(self.existing_device, '0',
'a', 'c', 'o', policy=policy)
df.write_metadata(meta_1_metadata)
# sanity check: should have 2 * .data, .durable, .meta
self.assertEqual(4, len(os.listdir(df._datadir)))
# sanity: should get frag index 3
df = df_mgr.get_diskfile(self.existing_device, '0',
'a', 'c', 'o', policy=policy)
expected = dict(frag_3_metadata)
expected.update(meta_1_metadata)
self.assertEqual(expected, df.read_metadata())
# add a newer datafile for frag index 2
df = df_mgr.get_diskfile(self.existing_device, '0',
'a', 'c', 'o', policy=policy)
with df.create() as writer:
data = 'new test data'
writer.write(data)
frag_2_metadata = {
'ETag': md5(data).hexdigest(),
'X-Timestamp': ts_3.internal,
'Content-Length': len(data),
'X-Object-Sysmeta-Ec-Frag-Index': 2,
}
writer.put(frag_2_metadata)
# N.B. don't make it durable - skip call to commit()
# sanity check: should have 2 * .data, .durable, .meta, .data
self.assertEqual(5, len(os.listdir(df._datadir)))
# sanity check: with no frag preferences we get old metadata
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
policy=policy)
self.assertEqual(expected, df.read_metadata())
self.assertEqual(ts_2.internal, df.timestamp)
self.assertEqual(ts_1.internal, df.data_timestamp)
self.assertEqual(ts_1.internal, df.durable_timestamp)
self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments)
# with empty frag preferences we get metadata from newer non-durable
# data file
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
policy=policy, frag_prefs=[])
self.assertEqual(frag_2_metadata, df.read_metadata())
self.assertEqual(ts_3.internal, df.timestamp)
self.assertEqual(ts_3.internal, df.data_timestamp)
self.assertEqual(ts_1.internal, df.durable_timestamp)
self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments)
# check we didn't destroy any potentially valid data by opening the
# non-durable data file
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
policy=policy)
self.assertEqual(expected, df.read_metadata())
# now add some newer .meta stuff which should replace older .meta
meta_2_metadata = {
'X-Object-Meta-Foo': 'BarBarBarAnne',
'X-Timestamp': ts_4.internal,
}
df = df_mgr.get_diskfile(self.existing_device, '0',
'a', 'c', 'o', policy=policy)
df.write_metadata(meta_2_metadata)
# sanity check: should have 2 * .data, .durable, .data, .meta
self.assertEqual(5, len(os.listdir(df._datadir)))
# sanity check: with no frag preferences we get newer metadata applied
# to durable data file
expected = dict(frag_3_metadata)
expected.update(meta_2_metadata)
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
policy=policy)
self.assertEqual(expected, df.read_metadata())
self.assertEqual(ts_4.internal, df.timestamp)
self.assertEqual(ts_1.internal, df.data_timestamp)
self.assertEqual(ts_1.internal, df.durable_timestamp)
self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments)
# with empty frag preferences we still get metadata from newer .meta
# but applied to non-durable data file
expected = dict(frag_2_metadata)
expected.update(meta_2_metadata)
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
policy=policy, frag_prefs=[])
self.assertEqual(expected, df.read_metadata())
self.assertEqual(ts_4.internal, df.timestamp)
self.assertEqual(ts_3.internal, df.data_timestamp)
self.assertEqual(ts_1.internal, df.durable_timestamp)
self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments)
# check we didn't destroy any potentially valid data by opening the
# non-durable data file
expected = dict(frag_3_metadata)
expected.update(meta_2_metadata)
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
policy=policy)
self.assertEqual(expected, df.read_metadata())
self.assertEqual(ts_4.internal, df.timestamp)
self.assertEqual(ts_1.internal, df.data_timestamp)
self.assertEqual(ts_1.internal, df.durable_timestamp)
self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments)
# prefer frags at ts_1, exclude no indexes, expect highest frag index
prefs = [{'timestamp': ts_1.internal, 'exclude': []},
{'timestamp': ts_2.internal, 'exclude': []},
{'timestamp': ts_3.internal, 'exclude': []}]
expected = dict(frag_3_metadata)
expected.update(meta_2_metadata)
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
policy=policy, frag_prefs=prefs)
self.assertEqual(expected, df.read_metadata())
self.assertEqual(ts_4.internal, df.timestamp)
self.assertEqual(ts_1.internal, df.data_timestamp)
self.assertEqual(ts_1.internal, df.durable_timestamp)
self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments)
# prefer frags at ts_1, exclude frag index 3 so expect frag index 0
prefs = [{'timestamp': ts_1.internal, 'exclude': [3]},
{'timestamp': ts_2.internal, 'exclude': []},
{'timestamp': ts_3.internal, 'exclude': []}]
expected = dict(frag_0_metadata)
expected.update(meta_2_metadata)
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
policy=policy, frag_prefs=prefs)
self.assertEqual(expected, df.read_metadata())
self.assertEqual(ts_4.internal, df.timestamp)
self.assertEqual(ts_1.internal, df.data_timestamp)
self.assertEqual(ts_1.internal, df.durable_timestamp)
self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments)
# now make ts_3 the preferred timestamp, excluded indexes don't exist
prefs = [{'timestamp': ts_3.internal, 'exclude': [4, 5, 6]},
{'timestamp': ts_2.internal, 'exclude': []},
{'timestamp': ts_1.internal, 'exclude': []}]
expected = dict(frag_2_metadata)
expected.update(meta_2_metadata)
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
policy=policy, frag_prefs=prefs)
self.assertEqual(expected, df.read_metadata())
self.assertEqual(ts_4.internal, df.timestamp)
self.assertEqual(ts_3.internal, df.data_timestamp)
self.assertEqual(ts_1.internal, df.durable_timestamp)
self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments)
# now make ts_2 the preferred timestamp - there are no frags at ts_2,
# next preference is ts_3 but index 2 is excluded, then at ts_1 index 3
# is excluded so we get frag 0 at ts_1
prefs = [{'timestamp': ts_2.internal, 'exclude': [1]},
{'timestamp': ts_3.internal, 'exclude': [2]},
{'timestamp': ts_1.internal, 'exclude': [3]}]
expected = dict(frag_0_metadata)
expected.update(meta_2_metadata)
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
policy=policy, frag_prefs=prefs)
self.assertEqual(expected, df.read_metadata())
self.assertEqual(ts_4.internal, df.timestamp)
self.assertEqual(ts_1.internal, df.data_timestamp)
self.assertEqual(ts_1.internal, df.durable_timestamp)
self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments)
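# The expectations above are consistent with this simplified selection model;
# an illustrative sketch only, not the actual ECDiskFileManager code, and it
# ignores the durable/non-durable fallback that applies when no preference
# matches. 'fragments' maps a timestamp to the frag indexes on disk at that
# timestamp, in the same form as df.fragments.
def _choose_frag_sketch(fragments, frag_prefs):
    # walk the preferences in order; at each preferred timestamp pick the
    # highest frag index that is not excluded
    for pref in frag_prefs or []:
        candidates = [index for index in fragments.get(pref['timestamp'], [])
                      if index not in pref['exclude']]
        if candidates:
            return pref['timestamp'], max(candidates)
    return None
# e.g. with fragments {'t1': [0, 3], 't3': [2]} and the last preference list
# above (ts_2 excluding 1, ts_3 excluding 2, ts_1 excluding 3) this returns
# ('t1', 0), matching the assertion on frag_0_metadata.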
def test_open_with_bad_fragment_preferences(self):
policy = POLICIES.default
df_mgr = self.df_router[policy]
for bad in (
'ouch',
2,
[{'timestamp': '1234.5678', 'excludes': [1]}, {}],
[{'timestamp': 'not a timestamp', 'excludes': [1, 2]}],
[{'timestamp': '1234.5678', 'excludes': [1, -1]}],
[{'timestamp': '1234.5678', 'excludes': 1}],
[{'timestamp': '1234.5678'}],
[{'excludes': [1, 2]}]
):
try:
df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
policy=policy, frag_prefs=bad)
self.fail('Expected DiskFileError for bad frag_prefs: %r'
% bad)
except DiskFileError as e:
self.assertIn('frag_prefs', str(e))
@patch_policies(with_ec_default=True) @patch_policies(with_ec_default=True)
class TestSuffixHashes(unittest.TestCase): class TestSuffixHashes(unittest.TestCase):


@ -233,6 +233,8 @@ class TestObjectController(unittest.TestCase):
'Content-Encoding': 'gzip', 'Content-Encoding': 'gzip',
'X-Backend-Timestamp': post_timestamp, 'X-Backend-Timestamp': post_timestamp,
'X-Timestamp': post_timestamp, 'X-Timestamp': post_timestamp,
'X-Backend-Data-Timestamp': put_timestamp,
'X-Backend-Durable-Timestamp': put_timestamp,
'Last-Modified': strftime( 'Last-Modified': strftime(
'%a, %d %b %Y %H:%M:%S GMT', '%a, %d %b %Y %H:%M:%S GMT',
gmtime(math.ceil(float(post_timestamp)))), gmtime(math.ceil(float(post_timestamp)))),
@ -266,6 +268,8 @@ class TestObjectController(unittest.TestCase):
'X-Object-Sysmeta-Color': 'blue', 'X-Object-Sysmeta-Color': 'blue',
'X-Backend-Timestamp': post_timestamp, 'X-Backend-Timestamp': post_timestamp,
'X-Timestamp': post_timestamp, 'X-Timestamp': post_timestamp,
'X-Backend-Data-Timestamp': put_timestamp,
'X-Backend-Durable-Timestamp': put_timestamp,
'Last-Modified': strftime( 'Last-Modified': strftime(
'%a, %d %b %Y %H:%M:%S GMT', '%a, %d %b %Y %H:%M:%S GMT',
gmtime(math.ceil(float(post_timestamp)))), gmtime(math.ceil(float(post_timestamp)))),
@ -308,6 +312,8 @@ class TestObjectController(unittest.TestCase):
'X-Static-Large-Object': 'True', 'X-Static-Large-Object': 'True',
'X-Backend-Timestamp': put_timestamp, 'X-Backend-Timestamp': put_timestamp,
'X-Timestamp': put_timestamp, 'X-Timestamp': put_timestamp,
'X-Backend-Data-Timestamp': put_timestamp,
'X-Backend-Durable-Timestamp': put_timestamp,
'Last-Modified': strftime( 'Last-Modified': strftime(
'%a, %d %b %Y %H:%M:%S GMT', '%a, %d %b %Y %H:%M:%S GMT',
gmtime(math.ceil(float(put_timestamp)))), gmtime(math.ceil(float(put_timestamp)))),
@ -338,6 +344,8 @@ class TestObjectController(unittest.TestCase):
'X-Static-Large-Object': 'True', 'X-Static-Large-Object': 'True',
'X-Backend-Timestamp': post_timestamp, 'X-Backend-Timestamp': post_timestamp,
'X-Timestamp': post_timestamp, 'X-Timestamp': post_timestamp,
'X-Backend-Data-Timestamp': put_timestamp,
'X-Backend-Durable-Timestamp': put_timestamp,
'Last-Modified': strftime( 'Last-Modified': strftime(
'%a, %d %b %Y %H:%M:%S GMT', '%a, %d %b %Y %H:%M:%S GMT',
gmtime(math.ceil(float(post_timestamp)))), gmtime(math.ceil(float(post_timestamp)))),
@ -368,6 +376,8 @@ class TestObjectController(unittest.TestCase):
'X-Static-Large-Object': 'True', 'X-Static-Large-Object': 'True',
'X-Backend-Timestamp': post_timestamp, 'X-Backend-Timestamp': post_timestamp,
'X-Timestamp': post_timestamp, 'X-Timestamp': post_timestamp,
'X-Backend-Data-Timestamp': put_timestamp,
'X-Backend-Durable-Timestamp': put_timestamp,
'Last-Modified': strftime( 'Last-Modified': strftime(
'%a, %d %b %Y %H:%M:%S GMT', '%a, %d %b %Y %H:%M:%S GMT',
gmtime(math.ceil(float(post_timestamp)))), gmtime(math.ceil(float(post_timestamp)))),
@ -3185,6 +3195,238 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller) resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 412) self.assertEqual(resp.status_int, 412)
def _create_ondisk_fragments(self, policy):
# Create some on disk files...
ts_iter = make_timestamp_iter()
# PUT at ts_0
ts_0 = next(ts_iter)
headers = {'X-Timestamp': ts_0.internal,
'Content-Length': '5',
'Content-Type': 'application/octet-stream',
'X-Backend-Storage-Policy-Index': int(policy)}
if policy.policy_type == EC_POLICY:
headers['X-Object-Sysmeta-Ec-Frag-Index'] = '0'
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers=headers)
req.body = 'OLDER'
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
# POST at ts_1
ts_1 = next(ts_iter)
headers = {'X-Timestamp': ts_1.internal,
'X-Backend-Storage-Policy-Index': int(policy)}
headers['X-Object-Meta-Test'] = 'abc'
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers=headers)
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 202)
# PUT again at ts_2 but without a .durable file
ts_2 = next(ts_iter)
headers = {'X-Timestamp': ts_2.internal,
'Content-Length': '5',
'Content-Type': 'application/octet-stream',
'X-Backend-Storage-Policy-Index': int(policy)}
if policy.policy_type == EC_POLICY:
headers['X-Object-Sysmeta-Ec-Frag-Index'] = '2'
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers=headers)
req.body = 'NEWER'
# patch the commit method to do nothing so EC object gets
# no .durable file
with mock.patch('swift.obj.diskfile.ECDiskFileWriter.commit'):
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
return ts_0, ts_1, ts_2
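# For the EC policy, the helper above leaves the object hash dir containing
# (file naming per the on-disk scenarios earlier in this diff; the
# replication case differs):
#   <ts_0>#0.data      durable frag with body 'OLDER'
#   <ts_0>.durable
#   <ts_1>.meta        carries X-Object-Meta-Test: abc
#   <ts_2>#2.data      non-durable frag with body 'NEWER' (commit() mocked out)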
def test_GET_HEAD_with_fragment_preferences(self):
for policy in POLICIES:
ts_0, ts_1, ts_2 = self._create_ondisk_fragments(policy)
backend_frags = json.dumps({ts_0.internal: [0],
ts_2.internal: [2]})
def _assert_frag_0_at_ts_0(resp):
expect = {
'X-Timestamp': ts_1.normal,
'X-Backend-Timestamp': ts_1.internal,
'X-Backend-Data-Timestamp': ts_0.internal,
'X-Backend-Durable-Timestamp': ts_0.internal,
'X-Backend-Fragments': backend_frags,
'X-Object-Sysmeta-Ec-Frag-Index': '0',
'X-Object-Meta-Test': 'abc'}
self.assertDictContainsSubset(expect, resp.headers)
def _assert_repl_data_at_ts_2():
self.assertIn(resp.status_int, (200, 202))
expect = {
'X-Timestamp': ts_2.normal,
'X-Backend-Timestamp': ts_2.internal,
'X-Backend-Data-Timestamp': ts_2.internal,
'X-Backend-Durable-Timestamp': ts_2.internal}
self.assertDictContainsSubset(expect, resp.headers)
self.assertNotIn('X-Object-Meta-Test', resp.headers)
# Sanity check: Request with no preferences should default to the
# durable frag
headers = {'X-Backend-Storage-Policy-Index': int(policy)}
req = Request.blank('/sda1/p/a/c/o', headers=headers,
environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.object_controller)
if policy.policy_type == EC_POLICY:
_assert_frag_0_at_ts_0(resp)
self.assertEqual(resp.body, 'OLDER')
else:
_assert_repl_data_at_ts_2()
self.assertEqual(resp.body, 'NEWER')
req = Request.blank('/sda1/p/a/c/o', headers=headers,
environ={'REQUEST_METHOD': 'HEAD'})
resp = req.get_response(self.object_controller)
if policy.policy_type == EC_POLICY:
_assert_frag_0_at_ts_0(resp)
else:
_assert_repl_data_at_ts_2()
# Request with preferences can select the older frag
prefs = json.dumps(
[{'timestamp': ts_0.internal, 'exclude': [1, 3]}])
headers = {'X-Backend-Storage-Policy-Index': int(policy),
'X-Backend-Fragment-Preferences': prefs}
req = Request.blank('/sda1/p/a/c/o', headers=headers,
environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.object_controller)
if policy.policy_type == EC_POLICY:
_assert_frag_0_at_ts_0(resp)
self.assertEqual(resp.body, 'OLDER')
else:
_assert_repl_data_at_ts_2()
self.assertEqual(resp.body, 'NEWER')
req = Request.blank('/sda1/p/a/c/o', headers=headers,
environ={'REQUEST_METHOD': 'HEAD'})
resp = req.get_response(self.object_controller)
if policy.policy_type == EC_POLICY:
_assert_frag_0_at_ts_0(resp)
else:
_assert_repl_data_at_ts_2()
def _assert_frag_2_at_ts_2(resp):
self.assertIn(resp.status_int, (200, 202))
# do not expect meta file to be included since it is older
expect = {
'X-Timestamp': ts_2.normal,
'X-Backend-Timestamp': ts_2.internal,
'X-Backend-Data-Timestamp': ts_2.internal,
'X-Backend-Durable-Timestamp': ts_0.internal,
'X-Backend-Fragments': backend_frags,
'X-Object-Sysmeta-Ec-Frag-Index': '2'}
self.assertDictContainsSubset(expect, resp.headers)
self.assertNotIn('X-Object-Meta-Test', resp.headers)
# Request with preferences can select the newer non-durable frag
prefs = json.dumps(
[{'timestamp': ts_2.internal, 'exclude': [1, 3]}])
headers = {'X-Backend-Storage-Policy-Index': int(policy),
'X-Backend-Fragment-Preferences': prefs}
req = Request.blank('/sda1/p/a/c/o', headers=headers,
environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.object_controller)
if policy.policy_type == EC_POLICY:
_assert_frag_2_at_ts_2(resp)
else:
_assert_repl_data_at_ts_2()
self.assertEqual(resp.body, 'NEWER')
req = Request.blank('/sda1/p/a/c/o', headers=headers,
environ={'REQUEST_METHOD': 'HEAD'})
resp = req.get_response(self.object_controller)
if policy.policy_type == EC_POLICY:
_assert_frag_2_at_ts_2(resp)
else:
_assert_repl_data_at_ts_2()
# Request with preference for ts_0 that excludes index 0 will
# default to the newest frag
prefs = json.dumps(
[{'timestamp': ts_0.internal, 'exclude': [0]}])
headers = {'X-Backend-Storage-Policy-Index': int(policy),
'X-Backend-Fragment-Preferences': prefs}
req = Request.blank('/sda1/p/a/c/o', headers=headers,
environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.object_controller)
if policy.policy_type == EC_POLICY:
_assert_frag_2_at_ts_2(resp)
else:
_assert_repl_data_at_ts_2()
self.assertEqual(resp.body, 'NEWER')
req = Request.blank('/sda1/p/a/c/o', headers=headers,
environ={'REQUEST_METHOD': 'HEAD'})
resp = req.get_response(self.object_controller)
if policy.policy_type == EC_POLICY:
_assert_frag_2_at_ts_2(resp)
else:
_assert_repl_data_at_ts_2()
# Request with preferences that exclude all frags gets nothing
prefs = json.dumps(
[{'timestamp': ts_0.internal, 'exclude': [0]},
{'timestamp': ts_2.internal, 'exclude': [2]}])
headers = {'X-Backend-Storage-Policy-Index': int(policy),
'X-Backend-Fragment-Preferences': prefs}
req = Request.blank('/sda1/p/a/c/o', headers=headers,
environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.object_controller)
if policy.policy_type == EC_POLICY:
self.assertEqual(resp.status_int, 404)
else:
_assert_repl_data_at_ts_2()
self.assertEqual(resp.body, 'NEWER')
req = Request.blank('/sda1/p/a/c/o', headers=headers,
environ={'REQUEST_METHOD': 'HEAD'})
resp = req.get_response(self.object_controller)
if policy.policy_type == EC_POLICY:
self.assertEqual(resp.status_int, 404)
else:
_assert_repl_data_at_ts_2()
# Request with empty preferences will get non-durable
prefs = json.dumps([])
headers = {'X-Backend-Storage-Policy-Index': int(policy),
'X-Backend-Fragment-Preferences': prefs}
req = Request.blank('/sda1/p/a/c/o', headers=headers,
environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.object_controller)
if policy.policy_type == EC_POLICY:
_assert_frag_2_at_ts_2(resp)
else:
_assert_repl_data_at_ts_2()
self.assertEqual(resp.body, 'NEWER')
req = Request.blank('/sda1/p/a/c/o', headers=headers,
environ={'REQUEST_METHOD': 'HEAD'})
resp = req.get_response(self.object_controller)
if policy.policy_type == EC_POLICY:
_assert_frag_2_at_ts_2(resp)
else:
_assert_repl_data_at_ts_2()
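# Summary of the backend headers exercised by the assertions above:
#   X-Backend-Data-Timestamp       timestamp of the .data file being served
#   X-Backend-Durable-Timestamp    timestamp of the newest durable data on disk
#   X-Backend-Fragments            JSON map of timestamp -> frag indexes found,
#                                  e.g. {"<ts_0>": [0], "<ts_2>": [2]}
#   X-Backend-Fragment-Preferences request header: JSON list of
#                                  {"timestamp": ..., "exclude": [...]} dicts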
def test_GET_quarantine(self): def test_GET_quarantine(self):
# Test swift.obj.server.ObjectController.GET # Test swift.obj.server.ObjectController.GET
timestamp = normalize_timestamp(time()) timestamp = normalize_timestamp(time())

File diff suppressed because it is too large


@ -24,11 +24,12 @@ import sys
import traceback import traceback
import unittest import unittest
from contextlib import contextmanager from contextlib import contextmanager
from shutil import rmtree, copyfile from shutil import rmtree, copyfile, move
import gc import gc
import time import time
from textwrap import dedent from textwrap import dedent
from hashlib import md5 from hashlib import md5
import collections
from pyeclib.ec_iface import ECDriverError from pyeclib.ec_iface import ECDriverError
from tempfile import mkdtemp, NamedTemporaryFile from tempfile import mkdtemp, NamedTemporaryFile
import weakref import weakref
@ -55,7 +56,7 @@ from swift.common.utils import hash_path, storage_directory, \
from test.unit import ( from test.unit import (
connect_tcp, readuntil2crlfs, FakeLogger, fake_http_connect, FakeRing, connect_tcp, readuntil2crlfs, FakeLogger, fake_http_connect, FakeRing,
FakeMemcache, debug_logger, patch_policies, write_fake_ring, FakeMemcache, debug_logger, patch_policies, write_fake_ring,
mocked_http_conn, DEFAULT_TEST_EC_TYPE) mocked_http_conn, DEFAULT_TEST_EC_TYPE, make_timestamp_iter)
from swift.proxy import server as proxy_server from swift.proxy import server as proxy_server
from swift.proxy.controllers.obj import ReplicatedObjectController from swift.proxy.controllers.obj import ReplicatedObjectController
from swift.obj import server as object_server from swift.obj import server as object_server
@ -5595,6 +5596,255 @@ class TestECMismatchedFA(unittest.TestCase):
self.assertEqual(resp.status_int, 503) self.assertEqual(resp.status_int, 503)
class TestECGets(unittest.TestCase):
def tearDown(self):
prosrv = _test_servers[0]
# don't leak error limits and poison other tests
prosrv._error_limiting = {}
def _setup_nodes_and_do_GET(self, objs, node_state):
"""
A helper method that creates object fragments, stashes them in temp
dirs, and then moves selected fragments back into the hash_dirs on each
node according to a specified desired node state description.
:param objs: a dict that maps object references to dicts that describe
the object timestamp and content. Object frags will be
created for each item in this dict.
:param node_state: a dict that maps a node index to the desired state
for that node. Each desired state is a list of
dicts, with each dict describing object reference,
frag_index and file extensions to be moved to the
node's hash_dir.
"""
(prosrv, acc1srv, acc2srv, con1srv, con2srv, obj1srv,
obj2srv, obj3srv) = _test_servers
ec_policy = POLICIES[3]
container_name = uuid.uuid4().hex
obj_name = uuid.uuid4().hex
obj_path = os.path.join(os.sep, 'v1', 'a', container_name, obj_name)
# PUT container, make sure it worked
container_path = os.path.join(os.sep, 'v1', 'a', container_name)
ec_container = Request.blank(
container_path, environ={"REQUEST_METHOD": "PUT"},
headers={"X-Storage-Policy": "ec", "X-Auth-Token": "t"})
resp = ec_container.get_response(prosrv)
self.assertIn(resp.status_int, (201, 202))
partition, nodes = \
ec_policy.object_ring.get_nodes('a', container_name, obj_name)
# map nodes to hash dirs
node_hash_dirs = {}
node_tmp_dirs = collections.defaultdict(dict)
for node in nodes:
node_hash_dirs[node['index']] = os.path.join(
_testdir, node['device'], storage_directory(
diskfile.get_data_dir(ec_policy),
partition, hash_path('a', container_name, obj_name)))
def _put_object(ref, timestamp, body):
# PUT an object and then move its disk files to a temp dir
headers = {"X-Timestamp": timestamp.internal}
put_req1 = Request.blank(obj_path, method='PUT', headers=headers)
put_req1.body = body
resp = put_req1.get_response(prosrv)
self.assertEqual(resp.status_int, 201)
# GET the obj, should work fine
get_req = Request.blank(obj_path, method="GET")
resp = get_req.get_response(prosrv)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, body)
# move all hash dir files to per-node, per-obj tempdir
for node_index, hash_dir in node_hash_dirs.items():
node_tmp_dirs[node_index][ref] = mkdtemp()
for f in os.listdir(hash_dir):
move(os.path.join(hash_dir, f),
os.path.join(node_tmp_dirs[node_index][ref], f))
for obj_ref, obj_info in objs.items():
_put_object(obj_ref, **obj_info)
# sanity check - all hash_dirs are empty and GET returns a 404
for hash_dir in node_hash_dirs.values():
self.assertFalse(os.listdir(hash_dir))
get_req = Request.blank(obj_path, method="GET")
resp = get_req.get_response(prosrv)
self.assertEqual(resp.status_int, 404)
# node state is in form:
# {node_index: [{ref: object reference,
# frag_index: index,
# exts: ['.data' etc]}, ...],
# node_index: ...}
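# e.g., matching the first scenario in test_GET_with_missing_durables below:
# {0: [dict(ref='obj1', frag_index=0, exts=('.data', '.durable'))],
#  1: [dict(ref='obj1', frag_index=1, exts=('.data',))],
#  2: [dict(ref='obj1', frag_index=2, exts=('.data',))]}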
for node_index, state in node_state.items():
dest = node_hash_dirs[node_index]
for frag_info in state:
src = node_tmp_dirs[frag_info['frag_index']][frag_info['ref']]
src_files = [f for f in os.listdir(src)
if f.endswith(frag_info['exts'])]
self.assertEqual(len(frag_info['exts']), len(src_files),
'Bad test setup for node %s, obj %s'
% (node_index, frag_info['ref']))
for f in src_files:
move(os.path.join(src, f), os.path.join(dest, f))
# do an object GET
get_req = Request.blank(obj_path, method='GET')
return get_req.get_response(prosrv)
def test_GET_with_missing_durables(self):
# verify object GET behavior when durable files are missing
ts_iter = make_timestamp_iter()
objs = {'obj1': dict(timestamp=next(ts_iter), body='body')}
# durable missing from 2/3 nodes
node_state = {
0: [dict(ref='obj1', frag_index=0, exts=('.data', '.durable'))],
1: [dict(ref='obj1', frag_index=1, exts=('.data',))],
2: [dict(ref='obj1', frag_index=2, exts=('.data',))]
}
resp = self._setup_nodes_and_do_GET(objs, node_state)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, objs['obj1']['body'])
# all files missing on 1 node, durable missing from 1 of the other 2 nodes
node_state = {
0: [dict(ref='obj1', frag_index=0, exts=('.data', '.durable'))],
1: [],
2: [dict(ref='obj1', frag_index=2, exts=('.data',))]
}
resp = self._setup_nodes_and_do_GET(objs, node_state)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, objs['obj1']['body'])
# durable missing from all 3 nodes
node_state = {
0: [dict(ref='obj1', frag_index=0, exts=('.data',))],
1: [dict(ref='obj1', frag_index=1, exts=('.data',))],
2: [dict(ref='obj1', frag_index=2, exts=('.data',))]
}
resp = self._setup_nodes_and_do_GET(objs, node_state)
self.assertEqual(resp.status_int, 503)
def test_GET_with_multiple_frags_per_node(self):
# verify object GET behavior when multiple fragments are on same node
ts_iter = make_timestamp_iter()
objs = {'obj1': dict(timestamp=next(ts_iter), body='body')}
# scenario: only two frags, both on same node
node_state = {
0: [],
1: [dict(ref='obj1', frag_index=0, exts=('.data', '.durable')),
dict(ref='obj1', frag_index=1, exts=('.data',))],
2: []
}
resp = self._setup_nodes_and_do_GET(objs, node_state)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, objs['obj1']['body'])
# scenario: all 3 frags on same node
node_state = {
0: [],
1: [dict(ref='obj1', frag_index=0, exts=('.data', '.durable')),
dict(ref='obj1', frag_index=1, exts=('.data',)),
dict(ref='obj1', frag_index=2, exts=('.data',))],
2: []
}
resp = self._setup_nodes_and_do_GET(objs, node_state)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, objs['obj1']['body'])
def test_GET_with_multiple_timestamps_on_nodes(self):
ts_iter = make_timestamp_iter()
ts_1, ts_2, ts_3 = [next(ts_iter) for _ in range(3)]
objs = {'obj1': dict(timestamp=ts_1, body='body1'),
'obj2': dict(timestamp=ts_2, body='body2'),
'obj3': dict(timestamp=ts_3, body='body3')}
# newer non-durable frags do not prevent proxy getting the durable obj1
node_state = {
0: [dict(ref='obj3', frag_index=0, exts=('.data',)),
dict(ref='obj2', frag_index=0, exts=('.data',)),
dict(ref='obj1', frag_index=0, exts=('.data', '.durable'))],
1: [dict(ref='obj3', frag_index=1, exts=('.data',)),
dict(ref='obj2', frag_index=1, exts=('.data',)),
dict(ref='obj1', frag_index=1, exts=('.data', '.durable'))],
2: [dict(ref='obj3', frag_index=2, exts=('.data',)),
dict(ref='obj2', frag_index=2, exts=('.data',)),
dict(ref='obj1', frag_index=2, exts=('.data', '.durable'))],
}
resp = self._setup_nodes_and_do_GET(objs, node_state)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, objs['obj1']['body'])
# .durables at two timestamps: in this scenario proxy is guaranteed
# to see the durable at ts_2 with one of the first 2 responses, so will
# then prefer that when requesting from third obj server
node_state = {
0: [dict(ref='obj3', frag_index=0, exts=('.data',)),
dict(ref='obj2', frag_index=0, exts=('.data',)),
dict(ref='obj1', frag_index=0, exts=('.data', '.durable'))],
1: [dict(ref='obj3', frag_index=1, exts=('.data',)),
dict(ref='obj2', frag_index=1, exts=('.data', '.durable'))],
2: [dict(ref='obj3', frag_index=2, exts=('.data',)),
dict(ref='obj2', frag_index=2, exts=('.data', '.durable'))],
}
resp = self._setup_nodes_and_do_GET(objs, node_state)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, objs['obj2']['body'])
def test_GET_with_same_frag_index_on_multiple_nodes(self):
ts_iter = make_timestamp_iter()
# this is a trick to be able to get identical frags placed onto
# multiple nodes: since we cannot *copy* frags, we generate three sets
# of identical frags at same timestamp so we have enough to *move*
ts_1 = next(ts_iter)
objs = {'obj1a': dict(timestamp=ts_1, body='body'),
'obj1b': dict(timestamp=ts_1, body='body'),
'obj1c': dict(timestamp=ts_1, body='body')}
# arrange for duplicate frag indexes across nodes: because the object
# server prefers the highest available frag index, proxy will first get
# back two responses with frag index 1, and will then return to node 0
# for frag_index 0.
node_state = {
0: [dict(ref='obj1a', frag_index=0, exts=('.data',)),
dict(ref='obj1a', frag_index=1, exts=('.data',))],
1: [dict(ref='obj1b', frag_index=1, exts=('.data', '.durable'))],
2: [dict(ref='obj1c', frag_index=1, exts=('.data', '.durable'))]
}
resp = self._setup_nodes_and_do_GET(objs, node_state)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, objs['obj1a']['body'])
# if all we have across nodes are frags with the same index then expect a
# 404 (the third, 'extra', obj server GET will return 404 because it
# will be sent frag prefs that exclude frag_index 1)
node_state = {
0: [dict(ref='obj1a', frag_index=1, exts=('.data',))],
1: [dict(ref='obj1b', frag_index=1, exts=('.data', '.durable'))],
2: [dict(ref='obj1c', frag_index=1, exts=('.data',))]
}
resp = self._setup_nodes_and_do_GET(objs, node_state)
self.assertEqual(resp.status_int, 404)
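# Illustrative sketch only, not the ECObjectController implementation: one way
# a caller could build an X-Backend-Fragment-Preferences value from the
# (timestamp, frag index) pairs already received, so that a retried backend GET
# excludes frag indexes it already holds, as the 404 scenario above relies on.
# build_frag_prefs_sketch and its argument are hypothetical names; json and
# collections are repeated here only so the sketch is self-contained.
import json
from collections import defaultdict

def build_frag_prefs_sketch(received):
    # received: iterable of (timestamp_internal, frag_index) pairs
    excludes = defaultdict(set)
    for timestamp, frag_index in received:
        excludes[timestamp].add(frag_index)
    return json.dumps([{'timestamp': timestamp, 'exclude': sorted(indexes)}
                       for timestamp, indexes in sorted(excludes.items(),
                                                        reverse=True)])

# e.g. build_frag_prefs_sketch([('0000000001.00000', 1)]) produces a JSON list
# like [{"timestamp": "0000000001.00000", "exclude": [1]}]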
class TestObjectDisconnectCleanup(unittest.TestCase): class TestObjectDisconnectCleanup(unittest.TestCase):
# update this if you need to make more different devices in do_setup # update this if you need to make more different devices in do_setup