diff --git a/doc/source/overview_erasure_code.rst b/doc/source/overview_erasure_code.rst index 68c3d89a5d..cdb476fcf0 100644 --- a/doc/source/overview_erasure_code.rst +++ b/doc/source/overview_erasure_code.rst @@ -461,13 +461,9 @@ A few key points on the .durable file: * The .durable file means \"the matching .data file for this has sufficient fragment archives somewhere, committed, to reconstruct the object\". -* The Proxy Server will never have knowledge, either on GET or HEAD, of the - existence of a .data file on an object server if it does not have a matching - .durable file. -* The object server will never return a .data that does not have a matching - .durable. -* When a proxy does a GET, it will only receive fragment archives that have - enough present somewhere to be reconstructed. +* When a proxy does a GET, it will require at least one object server to + respond with a fragment archive that has a matching `.durable` file before + reconstructing and returning the object to the client. Partial PUT Failures ==================== @@ -500,17 +496,39 @@ The GET for EC is different enough from replication that subclassing the `BaseObjectController` to the `ECObjectController` enables an efficient way to implement the high level steps described earlier: -#. The proxy server makes simultaneous requests to participating nodes. -#. As soon as the proxy has the fragments it needs, it calls on PyECLib to - decode the data. +#. The proxy server makes simultaneous requests to `ec_ndata` primary object + server nodes with goal of finding a set of `ec_ndata` distinct EC archives + at the same timestamp, and an indication from at least one object server + that a `.durable` file exists for that timestamp. If this goal is + not achieved with the first `ec_ndata` requests then the proxy server + continues to issue requests to the remaining primary nodes and then handoff + nodes. +#. As soon as the proxy server has found a usable set of `ec_ndata` EC + archives, it starts to call PyECLib to decode fragments as they are returned + by the object server nodes. +#. The proxy server creates Etag and content length headers for the client + response since each EC archive's metadata is valid only for that archive. #. The proxy streams the decoded data it has back to the client. -#. Repeat until the proxy is done sending data back to the client. -The GET path will attempt to contact all nodes participating in the EC scheme, -if not enough primaries respond then handoffs will be contacted just as with -replication. Etag and content length headers are updated for the client -response following reconstruction as the individual fragment archives metadata -is valid only for that fragment archive. +Note that the proxy does not require all objects servers to have a `.durable` +file for the EC archive that they return in response to a GET. The proxy +will be satisfied if just one object server has a `.durable` file at the same +timestamp as EC archives returned from other object servers. This means +that the proxy can successfully GET an object that had missing `.durable` files +when it was PUT (i.e. a partial PUT failure occurred). + +Note also that an object server may inform the proxy server that it has more +than one EC archive for different timestamps and/or fragment indexes, which may +cause the proxy server to issue multiple requests for distinct EC archives to +that object server. 
(This situation can temporarily occur after a ring +rebalance when a handoff node storing an archive has become a primary node and +received its primary archive but not yet moved the handoff archive to its +primary node.) + +The proxy may receive EC archives having different timestamps, and may +receive several EC archives having the same index. The proxy therefore +ensures that it has sufficient EC archives with the same timestamp +and distinct fragment indexes before considering a GET to be successful. Object Server ------------- diff --git a/swift/common/utils.py b/swift/common/utils.py index 458ce418b0..dad6448e0a 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -4089,3 +4089,12 @@ def o_tmpfile_supported(): return all([linkat.available, platform.system() == 'Linux', LooseVersion(platform.release()) >= LooseVersion('3.16')]) + + +def safe_json_loads(value): + if value: + try: + return json.loads(value) + except (TypeError, ValueError): + pass + return None diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index 43bdaa2dea..776ede0d91 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -827,15 +827,20 @@ class BaseDiskFileManager(object): self._process_ondisk_files(exts, results, **kwargs) # set final choice of files - if exts.get('.ts'): + if 'data_info' in results: + if exts.get('.meta'): + # only report a meta file if a data file has been chosen + results['meta_info'] = exts['.meta'][0] + ctype_info = exts['.meta'].pop() + if (ctype_info['ctype_timestamp'] + > results['data_info']['timestamp']): + results['ctype_info'] = ctype_info + elif exts.get('.ts'): + # only report a ts file if a data file has not been chosen + # (ts files will commonly already have been removed from exts if + # a data file was chosen, but that may not be the case if + # non-durable EC fragment(s) were chosen, hence the elif here) results['ts_info'] = exts['.ts'][0] - if 'data_info' in results and exts.get('.meta'): - # only report a meta file if a data file has been chosen - results['meta_info'] = exts['.meta'][0] - ctype_info = exts['.meta'].pop() - if (ctype_info['ctype_timestamp'] - > results['data_info']['timestamp']): - results['ctype_info'] = ctype_info # set ts_file, data_file, meta_file and ctype_file with path to # chosen file or None @@ -2635,6 +2640,41 @@ class ECDiskFile(BaseDiskFile): self._frag_index = None if frag_index is not None: self._frag_index = self.manager.validate_fragment_index(frag_index) + self._frag_prefs = self._validate_frag_prefs(kwargs.get('frag_prefs')) + self._durable_frag_set = None + + def _validate_frag_prefs(self, frag_prefs): + """ + Validate that frag_prefs is a list of dicts containing expected keys + 'timestamp' and 'exclude'. Convert timestamp values to Timestamp + instances and validate that exclude values are valid fragment indexes. + + :param frag_prefs: data to validate, should be a list of dicts. + :raise DiskFileError: if the frag_prefs data is invalid. + :return: a list of dicts with converted and validated values. + """ + # We *do* want to preserve an empty frag_prefs list because it + # indicates that a durable file is not required. 
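+        # A sketch of the expected (hypothetical) input, as decoded from an
+        # X-Backend-Fragment-Preferences header:
+        #     [{'timestamp': '1450000000.00000', 'exclude': [1, 3]},
+        #      {'timestamp': '1449990000.00000', 'exclude': []}]
+        # Each dict names a timestamp and the fragment indexes to avoid at
+        # that timestamp; timestamps are converted to Timestamp instances
+        # and every excluded index is validated below.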
+ if frag_prefs is None: + return None + + try: + return [ + {'timestamp': Timestamp(pref['timestamp']), + 'exclude': [self.manager.validate_fragment_index(fi) + for fi in pref['exclude']]} + for pref in frag_prefs] + except ValueError as e: + raise DiskFileError( + 'Bad timestamp in frag_prefs: %r: %s' + % (frag_prefs, e)) + except DiskFileError as e: + raise DiskFileError( + 'Bad fragment index in frag_prefs: %r: %s' + % (frag_prefs, e)) + except (KeyError, TypeError) as e: + raise DiskFileError( + 'Bad frag_prefs: %r: %s' % (frag_prefs, e)) @property def durable_timestamp(self): @@ -2671,13 +2711,14 @@ class ECDiskFile(BaseDiskFile): def _get_ondisk_files(self, files): """ The only difference between this method and the replication policy - DiskFile method is passing in the frag_index kwarg to our manager's - get_ondisk_files method. + DiskFile method is passing in the frag_index and frag_prefs kwargs to + our manager's get_ondisk_files method. :param files: list of file names """ self._ondisk_info = self.manager.get_ondisk_files( - files, self._datadir, frag_index=self._frag_index) + files, self._datadir, frag_index=self._frag_index, + frag_prefs=self._frag_prefs) return self._ondisk_info def purge(self, timestamp, frag_index): @@ -2804,14 +2845,49 @@ class ECDiskFileManager(BaseDiskFileManager): rv['frag_index'] = None return rv - def _process_ondisk_files(self, exts, results, frag_index=None, **kwargs): + def _process_ondisk_files(self, exts, results, frag_index=None, + frag_prefs=None, **kwargs): """ Implement EC policy specific handling of .data and .durable files. + If a frag_prefs keyword arg is provided then its value may determine + which fragment index at which timestamp is used to construct the + diskfile. The value of frag_prefs should be a list. Each item in the + frag_prefs list should be a dict that describes per-timestamp + preferences using the following items: + + * timestamp: An instance of :class:`~swift.common.utils.Timestamp`. + * exclude: A list of valid fragment indexes (i.e. whole numbers) + that should be EXCLUDED when choosing a fragment at the + timestamp. This list may be empty. + + For example:: + + [ + {'timestamp': , 'exclude': [1,3]}, + {'timestamp': , 'exclude': []} + ] + + The order of per-timestamp dicts in the frag_prefs list is significant + and indicates descending preference for fragments from each timestamp + i.e. a fragment that satisfies the first per-timestamp preference in + the frag_prefs will be preferred over a fragment that satisfies a + subsequent per-timestamp preferred, and so on. + + If a timestamp is not cited in any per-timestamp preference dict then + it is assumed that any fragment index at that timestamp may be used to + construct the diskfile. + + When a frag_prefs arg is provided, including an empty list, there is no + requirement for there to be a durable file at the same timestamp as a + data file that is chosen to construct the disk file + :param exts: dict of lists of file info, keyed by extension :param results: a dict that may be updated with results :param frag_index: if set, search for a specific fragment index .data file, otherwise accept the first valid .data file. + :param frag_prefs: if set, search for any fragment index .data file + that satisfies the frag_prefs. 
""" durable_info = None if exts.get('.durable'): @@ -2841,23 +2917,66 @@ class ECDiskFileManager(BaseDiskFileManager): if durable_info and durable_info['timestamp'] == timestamp: durable_frag_set = frag_set + # Choose which frag set to use + chosen_frag_set = None + if frag_prefs is not None: + candidate_frag_sets = dict(frag_sets) + # For each per-timestamp frag preference dict, do we have any frag + # indexes at that timestamp that are not in the exclusion list for + # that timestamp? If so choose the highest of those frag_indexes. + for ts, exclude_indexes in [ + (ts_pref['timestamp'], ts_pref['exclude']) + for ts_pref in frag_prefs + if ts_pref['timestamp'] in candidate_frag_sets]: + available_indexes = [info['frag_index'] + for info in candidate_frag_sets[ts]] + acceptable_indexes = list(set(available_indexes) - + set(exclude_indexes)) + if acceptable_indexes: + chosen_frag_set = candidate_frag_sets[ts] + # override any frag_index passed in as method param with + # the last (highest) acceptable_index + frag_index = acceptable_indexes[-1] + break + else: + # this frag_set has no acceptable frag index so + # remove it from the candidate frag_sets + candidate_frag_sets.pop(ts) + else: + # No acceptable frag index was found at any timestamp mentioned + # in the frag_prefs. Choose the newest remaining candidate + # frag_set - the proxy can decide if it wants the returned + # fragment with that time. + if candidate_frag_sets: + ts_newest = sorted(candidate_frag_sets.keys())[-1] + chosen_frag_set = candidate_frag_sets[ts_newest] + else: + chosen_frag_set = durable_frag_set + # Select a single chosen frag from the chosen frag_set, by either # matching against a specified frag_index or taking the highest index. chosen_frag = None - if durable_frag_set: + if chosen_frag_set: if frag_index is not None: # search the frag set to find the exact frag_index - for info in durable_frag_set: + for info in chosen_frag_set: if info['frag_index'] == frag_index: chosen_frag = info break else: - chosen_frag = durable_frag_set[-1] + chosen_frag = chosen_frag_set[-1] # If we successfully found a frag then set results if chosen_frag: results['data_info'] = chosen_frag results['durable_frag_set'] = durable_frag_set + results['chosen_frag_set'] = chosen_frag_set + if chosen_frag_set != durable_frag_set: + # hide meta files older than data file but newer than durable + # file so they don't get marked as obsolete (we already threw + # out .meta's that are older than a .durable) + exts['.meta'], _older = self._split_gt_timestamp( + exts['.meta'], chosen_frag['timestamp']) results['frag_sets'] = frag_sets # Mark any isolated .durable as obsolete @@ -2867,7 +2986,7 @@ class ECDiskFileManager(BaseDiskFileManager): # Fragments *may* be ready for reclaim, unless they are durable for frag_set in frag_sets.values(): - if frag_set == durable_frag_set: + if frag_set in (durable_frag_set, chosen_frag_set): continue results.setdefault('possible_reclaim', []).extend(frag_set) @@ -2876,19 +2995,24 @@ class ECDiskFileManager(BaseDiskFileManager): results.setdefault('possible_reclaim', []).extend( exts.get('.meta')) - def _verify_ondisk_files(self, results, frag_index=None, **kwargs): + def _verify_ondisk_files(self, results, frag_index=None, + frag_prefs=None, **kwargs): """ Verify that the final combination of on disk files complies with the erasure-coded diskfile contract. 
:param results: files that have been found and accepted :param frag_index: specifies a specific fragment index .data file + :param frag_prefs: if set, indicates that fragment preferences have + been specified and therefore that a selected fragment is not + required to be durable. :returns: True if the file combination is compliant, False otherwise """ if super(ECDiskFileManager, self)._verify_ondisk_files( results, **kwargs): have_data_file = results['data_file'] is not None - have_durable = results.get('durable_frag_set') is not None + have_durable = (results.get('durable_frag_set') is not None or + (have_data_file and frag_prefs is not None)) return have_data_file == have_durable return False diff --git a/swift/obj/server.py b/swift/obj/server.py index 55aa3eb9cc..bf911711b8 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -33,7 +33,7 @@ from swift.common.utils import public, get_logger, \ config_true_value, timing_stats, replication, \ normalize_delete_at_timestamp, get_log_line, Timestamp, \ get_expirer_container, parse_mime_headers, \ - iter_multipart_mime_documents, extract_swift_bytes + iter_multipart_mime_documents, extract_swift_bytes, safe_json_loads from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_object_creation, \ valid_timestamp, check_utf8 @@ -84,6 +84,15 @@ def drain(file_like, read_size, timeout): break +def _make_backend_fragments_header(fragments): + if fragments: + result = {} + for ts, frag_list in fragments.items(): + result[ts.internal] = frag_list + return json.dumps(result) + return None + + class EventletPlungerString(str): """ Eventlet won't send headers until it's accumulated at least @@ -853,10 +862,12 @@ class ObjectController(BaseStorageServer): """Handle HTTP GET requests for the Swift Object Server.""" device, partition, account, container, obj, policy = \ get_name_and_placement(request, 5, 5, True) + frag_prefs = safe_json_loads( + request.headers.get('X-Backend-Fragment-Preferences')) try: disk_file = self.get_diskfile( device, partition, account, container, obj, - policy=policy) + policy=policy, frag_prefs=frag_prefs) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: @@ -889,6 +900,13 @@ class ObjectController(BaseStorageServer): pass response.headers['X-Timestamp'] = file_x_ts.normal response.headers['X-Backend-Timestamp'] = file_x_ts.internal + response.headers['X-Backend-Data-Timestamp'] = \ + disk_file.data_timestamp.internal + if disk_file.durable_timestamp: + response.headers['X-Backend-Durable-Timestamp'] = \ + disk_file.durable_timestamp.internal + response.headers['X-Backend-Fragments'] = \ + _make_backend_fragments_header(disk_file.fragments) resp = request.get_response(response) except DiskFileXattrNotSupported: return HTTPInsufficientStorage(drive=device, request=request) @@ -906,10 +924,12 @@ class ObjectController(BaseStorageServer): """Handle HTTP HEAD requests for the Swift Object Server.""" device, partition, account, container, obj, policy = \ get_name_and_placement(request, 5, 5, True) + frag_prefs = safe_json_loads( + request.headers.get('X-Backend-Fragment-Preferences')) try: disk_file = self.get_diskfile( device, partition, account, container, obj, - policy=policy) + policy=policy, frag_prefs=frag_prefs) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: @@ -938,6 +958,13 @@ class ObjectController(BaseStorageServer): # Needed for container sync feature 
response.headers['X-Timestamp'] = ts.normal response.headers['X-Backend-Timestamp'] = ts.internal + response.headers['X-Backend-Data-Timestamp'] = \ + disk_file.data_timestamp.internal + if disk_file.durable_timestamp: + response.headers['X-Backend-Durable-Timestamp'] = \ + disk_file.durable_timestamp.internal + response.headers['X-Backend-Fragments'] = \ + _make_backend_fragments_header(disk_file.fragments) response.content_length = int(metadata['Content-Length']) try: response.content_encoding = metadata['Content-Encoding'] diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 2bc1ef2718..080d726aa7 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -729,7 +729,7 @@ def bytes_to_skip(record_size, range_start): class ResumingGetter(object): def __init__(self, app, req, server_type, node_iter, partition, path, backend_headers, concurrency=1, client_chunk_size=None, - newest=None): + newest=None, header_provider=None): self.app = app self.node_iter = node_iter self.server_type = server_type @@ -742,6 +742,8 @@ class ResumingGetter(object): self.used_nodes = [] self.used_source_etag = '' self.concurrency = concurrency + self.node = None + self.header_provider = header_provider # stuff from request self.req_method = req.method @@ -1093,7 +1095,7 @@ class ResumingGetter(object): @property def last_headers(self): if self.source_headers: - return self.source_headers[-1] + return HeaderKeyDict(self.source_headers[-1]) else: return None @@ -1101,13 +1103,17 @@ class ResumingGetter(object): self.app.logger.thread_locals = logger_thread_locals if node in self.used_nodes: return False + req_headers = dict(self.backend_headers) + # a request may be specialised with specific backend headers + if self.header_provider: + req_headers.update(self.header_provider()) start_node_timing = time.time() try: with ConnectionTimeout(self.app.conn_timeout): conn = http_connect( node['ip'], node['port'], node['device'], self.partition, self.req_method, self.path, - headers=self.backend_headers, + headers=req_headers, query_string=self.req_query_string) self.app.set_node_timing(node, time.time() - start_node_timing) @@ -1212,6 +1218,7 @@ class ResumingGetter(object): self.used_source_etag = src_headers.get( 'x-object-sysmeta-ec-etag', src_headers.get('etag', '')).strip('"') + self.node = node return source, node return None, None @@ -1316,6 +1323,7 @@ class NodeIter(object): self.primary_nodes = self.app.sort_nodes( list(itertools.islice(node_iter, num_primary_nodes))) self.handoff_iter = node_iter + self._node_provider = None def __iter__(self): self._node_iter = self._node_gen() @@ -1344,6 +1352,16 @@ class NodeIter(object): # all the primaries were skipped, and handoffs didn't help self.app.logger.increment('handoff_all_count') + def set_node_provider(self, callback): + """ + Install a callback function that will be used during a call to next() + to get an alternate node instead of returning the next node from the + iterator. + :param callback: A no argument function that should return a node dict + or None. 
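+
+        For example, the EC GET path installs
+        ``ECGetResponseCollection.provide_alternate_node`` as the provider so
+        that a node known to hold a wanted fragment is tried before the
+        iterator moves on to further primaries or handoffs.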
+ """ + self._node_provider = callback + def _node_gen(self): for node in self.primary_nodes: if not self.app.error_limited(node): @@ -1364,6 +1382,11 @@ class NodeIter(object): return def next(self): + if self._node_provider: + # give node provider the opportunity to inject a node + node = self._node_provider() + if node: + return node return next(self._node_iter) def __next__(self): diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 19a913d749..7b33cc30a1 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -47,7 +47,7 @@ from swift.common.utils import ( GreenAsyncPile, GreenthreadSafeIterator, Timestamp, normalize_delete_at_timestamp, public, get_expirer_container, document_iters_to_http_response_body, parse_content_range, - quorum_size, reiterate, close_if_possible) + quorum_size, reiterate, close_if_possible, safe_json_loads) from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_metadata, check_object_creation from swift.common import constraints @@ -1835,9 +1835,262 @@ def trailing_metadata(policy, client_obj_hasher, }) +class ECGetResponseBucket(object): + """ + A helper class to encapsulate the properties of buckets in which fragment + getters and alternate nodes are collected. + """ + def __init__(self, policy, timestamp_str): + """ + :param policy: an instance of ECStoragePolicy + :param timestamp_str: a string representation of a timestamp + """ + self.policy = policy + self.timestamp_str = timestamp_str + self.gets = collections.defaultdict(list) + self.alt_nodes = collections.defaultdict(list) + self._durable = False + self.status = self.headers = None + + def set_durable(self): + self._durable = True + + def add_response(self, getter, parts_iter): + if not self.gets: + self.status = getter.last_status + # stash first set of backend headers, which will be used to + # populate a client response + # TODO: each bucket is for a single *data* timestamp, but sources + # in the same bucket may have different *metadata* timestamps if + # some backends have more recent .meta files than others. Currently + # we just use the last received metadata headers - this behavior is + # ok and is consistent with a replication policy GET which + # similarly does not attempt to find the backend with the most + # recent metadata. We could alternatively choose to the *newest* + # metadata headers for self.headers by selecting the source with + # the latest X-Timestamp. + self.headers = getter.last_headers + elif (getter.last_headers.get('X-Object-Sysmeta-Ec-Etag') != + self.headers.get('X-Object-Sysmeta-Ec-Etag')): + # Fragments at the same timestamp with different etags are never + # expected. If somehow it happens then ignore those fragments + # to avoid mixing fragments that will not reconstruct otherwise + # an exception from pyeclib is almost certain. This strategy leaves + # a possibility that a set of consistent frags will be gathered. + raise ValueError("ETag mismatch") + + frag_index = getter.last_headers.get('X-Object-Sysmeta-Ec-Frag-Index') + frag_index = int(frag_index) if frag_index is not None else None + self.gets[frag_index].append((getter, parts_iter)) + + def get_responses(self): + """ + Return a list of all useful sources. Where there are multiple sources + associated with the same frag_index then only one is included. 
+ + :return: a list of sources, each source being a tuple of form + (ResumingGetter, iter) + """ + all_sources = [] + for frag_index, sources in self.gets.items(): + if frag_index is None: + # bad responses don't have a frag_index (and fake good + # responses from some unit tests) + all_sources.extend(sources) + else: + all_sources.extend(sources[:1]) + return all_sources + + def add_alternate_nodes(self, node, frag_indexes): + for frag_index in frag_indexes: + self.alt_nodes[frag_index].append(node) + + @property + def shortfall(self): + # A non-durable bucket always has a shortfall of at least 1 + result = self.policy.ec_ndata - len(self.get_responses()) + return max(result, 0 if self._durable else 1) + + @property + def shortfall_with_alts(self): + # The shortfall that we expect to have if we were to send requests + # for frags on the alt nodes. + alts = set(self.alt_nodes.keys()).difference(set(self.gets.keys())) + result = self.policy.ec_ndata - (len(self.get_responses()) + len(alts)) + return max(result, 0 if self._durable else 1) + + def __str__(self): + # return a string summarising bucket state, useful for debugging. + return '<%s, %s, %s, %s(%s), %s>' \ + % (self.timestamp_str, self.status, self._durable, + self.shortfall, self.shortfall_with_alts, len(self.gets)) + + +class ECGetResponseCollection(object): + """ + Manages all successful EC GET responses gathered by ResumingGetters. + + A response comprises a tuple of (, ). All + responses having the same data timestamp are placed in an + ECGetResponseBucket for that timestamp. The buckets are stored in the + 'buckets' dict which maps timestamp-> bucket. + + This class encapsulates logic for selecting the best bucket from the + collection, and for choosing alternate nodes. + """ + def __init__(self, policy): + """ + :param policy: an instance of ECStoragePolicy + """ + self.policy = policy + self.buckets = {} + self.node_iter_count = 0 + + def _get_bucket(self, timestamp_str): + """ + :param timestamp_str: a string representation of a timestamp + :return: ECGetResponseBucket for given timestamp + """ + return self.buckets.setdefault( + timestamp_str, ECGetResponseBucket(self.policy, timestamp_str)) + + def add_response(self, get, parts_iter): + """ + Add a response to the collection. + + :param get: An instance of + :class:`~swift.proxy.controllers.base.ResumingGetter` + :param parts_iter: An iterator over response body parts + :raises ValueError: if the response etag or status code values do not + match any values previously received for the same timestamp + """ + headers = get.last_headers + # Add the response to the appropriate bucket keyed by data file + # timestamp. Fall back to using X-Backend-Timestamp as key for object + # servers that have not been upgraded. + t_data_file = headers.get('X-Backend-Data-Timestamp') + t_obj = headers.get('X-Backend-Timestamp', headers.get('X-Timestamp')) + self._get_bucket(t_data_file or t_obj).add_response(get, parts_iter) + + # The node may also have alternate fragments indexes (possibly at + # different timestamps). For each list of alternate fragments indexes, + # find the bucket for their data file timestamp and add the node and + # list to that bucket's alternate nodes. + frag_sets = safe_json_loads(headers.get('X-Backend-Fragments')) or {} + for t_frag, frag_set in frag_sets.items(): + self._get_bucket(t_frag).add_alternate_nodes(get.node, frag_set) + # If the response includes a durable timestamp then mark that bucket as + # durable. 
Note that this may be a different bucket than the one this + # response got added to, and that we may never go and get a durable + # frag from this node; it is sufficient that we have been told that a + # .durable exists, somewhere, at t_durable. + t_durable = headers.get('X-Backend-Durable-Timestamp') + if not t_durable and not t_data_file: + # obj server not upgraded so assume this response's frag is durable + t_durable = t_obj + if t_durable: + self._get_bucket(t_durable).set_durable() + + def _sort_buckets(self): + def key_fn(bucket): + # Returns a tuple to use for sort ordering: + # buckets with no shortfall sort higher, + # otherwise buckets with lowest shortfall_with_alts sort higher, + # finally buckets with newer timestamps sort higher. + return (bucket.shortfall <= 0, + (not (bucket.shortfall <= 0) and + (-1 * bucket.shortfall_with_alts)), + bucket.timestamp_str) + + return sorted(self.buckets.values(), key=key_fn, reverse=True) + + @property + def best_bucket(self): + """ + Return the best bucket in the collection. + + The "best" bucket is the newest timestamp with sufficient getters, or + the closest to having a sufficient getters, unless it is bettered by a + bucket with potential alternate nodes. + + :return: An instance of :class:`~ECGetResponseBucket` or None if there + are no buckets in the collection. + """ + sorted_buckets = self._sort_buckets() + if sorted_buckets: + return sorted_buckets[0] + return None + + def _get_frag_prefs(self): + # Construct the current frag_prefs list, with best_bucket prefs first. + frag_prefs = [] + + for bucket in self._sort_buckets(): + if bucket.timestamp_str: + exclusions = [fi for fi in bucket.gets if fi is not None] + prefs = {'timestamp': bucket.timestamp_str, + 'exclude': exclusions} + frag_prefs.append(prefs) + + return frag_prefs + + def get_extra_headers(self): + frag_prefs = self._get_frag_prefs() + return {'X-Backend-Fragment-Preferences': json.dumps(frag_prefs)} + + def _get_alternate_nodes(self): + if self.node_iter_count <= self.policy.ec_ndata: + # It makes sense to wait before starting to use alternate nodes, + # because if we find sufficient frags on *distinct* nodes then we + # spread work across mode nodes. There's no formal proof that + # waiting for ec_ndata GETs is the right answer, but it seems + # reasonable to try *at least* that many primary nodes before + # resorting to alternate nodes. + return None + + bucket = self.best_bucket + if (bucket is None) or (bucket.shortfall <= 0): + return None + + alt_frags = set(bucket.alt_nodes.keys()) + got_frags = set(bucket.gets.keys()) + wanted_frags = list(alt_frags.difference(got_frags)) + + # We may have the same frag_index on more than one node so shuffle to + # avoid using the same frag_index consecutively, since we may not get a + # response from the last node provided before being asked to provide + # another node. + random.shuffle(wanted_frags) + + for frag_index in wanted_frags: + nodes = bucket.alt_nodes.get(frag_index) + if nodes: + return nodes + return None + + def has_alternate_node(self): + return True if self._get_alternate_nodes() else False + + def provide_alternate_node(self): + """ + Callback function that is installed in a NodeIter. Called on every call + to NodeIter.next(), which means we can track the number of nodes to + which GET requests have been made and selectively inject an alternate + node, if we have one. + + :return: A dict describing a node to which the next GET request + should be made. 
+ """ + self.node_iter_count += 1 + nodes = self._get_alternate_nodes() + if nodes: + return nodes.pop(0).copy() + + @ObjectControllerRouter.register(EC_POLICY) class ECObjectController(BaseObjectController): - def _fragment_GET_request(self, req, node_iter, partition, policy): + def _fragment_GET_request(self, req, node_iter, partition, policy, + header_provider=None): """ Makes a GET request for a fragment. """ @@ -1848,7 +2101,7 @@ class ECObjectController(BaseObjectController): partition, req.swift_entity_path, backend_headers, client_chunk_size=policy.fragment_size, - newest=False) + newest=False, header_provider=header_provider) return (getter, getter.response_parts_iter(req)) def _convert_range(self, req, policy): @@ -1914,93 +2167,130 @@ class ECObjectController(BaseObjectController): resp = self.GETorHEAD_base( req, _('Object'), node_iter, partition, req.swift_entity_path, concurrency) - else: # GET request - orig_range = None - range_specs = [] - if req.range: - orig_range = req.range - range_specs = self._convert_range(req, policy) + self._fix_response(req, resp) + return resp - safe_iter = GreenthreadSafeIterator(node_iter) - # Sending the request concurrently to all nodes, and responding - # with the first response isn't something useful for EC as all - # nodes contain different fragments. Also EC has implemented it's - # own specific implementation of concurrent gets to ec_ndata nodes. - # So we don't need to worry about plumbing and sending a - # concurrency value to ResumingGetter. - with ContextPool(policy.ec_ndata) as pool: - pile = GreenAsyncPile(pool) - for _junk in range(policy.ec_ndata): - pile.spawn(self._fragment_GET_request, - req, safe_iter, partition, - policy) + # GET request + orig_range = None + range_specs = [] + if req.range: + orig_range = req.range + range_specs = self._convert_range(req, policy) - bad_gets = [] - etag_buckets = collections.defaultdict(list) - best_etag = None - for get, parts_iter in pile: - if is_success(get.last_status): - etag = HeaderKeyDict( - get.last_headers)['X-Object-Sysmeta-Ec-Etag'] - etag_buckets[etag].append((get, parts_iter)) - if etag != best_etag and ( - len(etag_buckets[etag]) > - len(etag_buckets[best_etag])): - best_etag = etag - else: - bad_gets.append((get, parts_iter)) - matching_response_count = max( - len(etag_buckets[best_etag]), len(bad_gets)) - if (policy.ec_ndata - matching_response_count > - pile._pending) and node_iter.nodes_left > 0: - # we need more matching responses to reach ec_ndata - # than we have pending gets, as long as we still have - # nodes in node_iter we can spawn another - pile.spawn(self._fragment_GET_request, req, - safe_iter, partition, policy) + safe_iter = GreenthreadSafeIterator(node_iter) + # Sending the request concurrently to all nodes, and responding + # with the first response isn't something useful for EC as all + # nodes contain different fragments. Also EC has implemented it's + # own specific implementation of concurrent gets to ec_ndata nodes. + # So we don't need to worry about plumbing and sending a + # concurrency value to ResumingGetter. 
+ with ContextPool(policy.ec_ndata) as pool: + pile = GreenAsyncPile(pool) + buckets = ECGetResponseCollection(policy) + node_iter.set_node_provider(buckets.provide_alternate_node) + # include what may well be an empty X-Backend-Fragment-Preferences + # header from the buckets.get_extra_headers to let the object + # server know that it is ok to return non-durable fragments + for _junk in range(policy.ec_ndata): + pile.spawn(self._fragment_GET_request, + req, safe_iter, partition, + policy, buckets.get_extra_headers) - req.range = orig_range - if len(etag_buckets[best_etag]) >= policy.ec_ndata: - # headers can come from any of the getters - resp_headers = HeaderKeyDict( - etag_buckets[best_etag][0][0].source_headers[-1]) - resp_headers.pop('Content-Range', None) - eccl = resp_headers.get('X-Object-Sysmeta-Ec-Content-Length') - obj_length = int(eccl) if eccl is not None else None - - # This is only true if we didn't get a 206 response, but - # that's the only time this is used anyway. - fa_length = int(resp_headers['Content-Length']) - app_iter = ECAppIter( - req.swift_entity_path, - policy, - [iterator for getter, iterator in etag_buckets[best_etag]], - range_specs, fa_length, obj_length, - self.app.logger) - resp = Response( - request=req, - headers=resp_headers, - conditional_response=True, - app_iter=app_iter) + bad_bucket = ECGetResponseBucket(policy, None) + bad_bucket.set_durable() + best_bucket = None + extra_requests = 0 + # max_extra_requests is an arbitrary hard limit for spawning extra + # getters in case some unforeseen scenario, or a misbehaving object + # server, causes us to otherwise make endless requests e.g. if an + # object server were to ignore frag_prefs and always respond with + # a frag that is already in a bucket. + max_extra_requests = 2 * policy.ec_nparity + policy.ec_ndata + for get, parts_iter in pile: + if get.last_status is None: + # We may have spawned getters that find the node iterator + # has been exhausted. Ignore them. + # TODO: turns out that node_iter.nodes_left can bottom + # out at >0 when number of devs in ring is < 2* replicas, + # which definitely happens in tests and results in status + # of None. We should fix that but keep this guard because + # there is also a race between testing nodes_left/spawning + # a getter and an existing getter calling next(node_iter). 
+ continue try: - app_iter.kickoff(req, resp) - except HTTPException as err_resp: - # catch any HTTPException response here so that we can - # process response headers uniformly in _fix_response - resp = err_resp - else: - statuses = [] - reasons = [] - bodies = [] - headers = [] - for getter, body_parts_iter in bad_gets: - statuses.extend(getter.statuses) - reasons.extend(getter.reasons) - bodies.extend(getter.bodies) - headers.extend(getter.source_headers) - resp = self.best_response( - req, statuses, reasons, bodies, 'Object', - headers=headers) + if is_success(get.last_status): + # 2xx responses are managed by a response collection + buckets.add_response(get, parts_iter) + else: + # all other responses are lumped into a single bucket + bad_bucket.add_response(get, parts_iter) + except ValueError as err: + self.app.logger.error( + _("Problem with fragment response: %s"), err) + shortfall = bad_bucket.shortfall + best_bucket = buckets.best_bucket + if best_bucket: + shortfall = min(best_bucket.shortfall, shortfall) + if (extra_requests < max_extra_requests and + shortfall > pile._pending and + (node_iter.nodes_left > 0 or + buckets.has_alternate_node())): + # we need more matching responses to reach ec_ndata + # than we have pending gets, as long as we still have + # nodes in node_iter we can spawn another + extra_requests += 1 + pile.spawn(self._fragment_GET_request, req, + safe_iter, partition, policy, + buckets.get_extra_headers) + + req.range = orig_range + if best_bucket and best_bucket.shortfall <= 0: + # headers can come from any of the getters + resp_headers = best_bucket.headers + resp_headers.pop('Content-Range', None) + eccl = resp_headers.get('X-Object-Sysmeta-Ec-Content-Length') + obj_length = int(eccl) if eccl is not None else None + + # This is only true if we didn't get a 206 response, but + # that's the only time this is used anyway. + fa_length = int(resp_headers['Content-Length']) + app_iter = ECAppIter( + req.swift_entity_path, + policy, + [parts_iter for + _getter, parts_iter in best_bucket.get_responses()], + range_specs, fa_length, obj_length, + self.app.logger) + resp = Response( + request=req, + headers=resp_headers, + conditional_response=True, + app_iter=app_iter) + try: + app_iter.kickoff(req, resp) + except HTTPException as err_resp: + # catch any HTTPException response here so that we can + # process response headers uniformly in _fix_response + resp = err_resp + else: + # TODO: we can get here if all buckets are successful but none + # have ec_ndata getters, so bad_bucket may have no gets and we will + # return a 503 when a 404 may be more appropriate. We can also get + # here with less than ec_ndata 416's and may then return a 416 + # which is also questionable because a non-range get for same + # object would return 404 or 503. 
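+                # Aggregate the per-node statuses, reasons, bodies and
+                # headers from the bad bucket's getters so that
+                # best_response() can choose the most appropriate error
+                # response for the client.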
+ statuses = [] + reasons = [] + bodies = [] + headers = [] + for getter, _parts_iter in bad_bucket.get_responses(): + statuses.extend(getter.statuses) + reasons.extend(getter.reasons) + bodies.extend(getter.bodies) + headers.extend(getter.source_headers) + resp = self.best_response( + req, statuses, reasons, bodies, 'Object', + headers=headers) self._fix_response(req, resp) return resp diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 3640509a60..16816ba14b 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -3664,6 +3664,36 @@ cluster_dfw1 = http://dfw1.host/v1/ os.close(fd) shutil.rmtree(tempdir) + def test_safe_json_loads(self): + expectations = { + None: None, + '': None, + 0: None, + 1: None, + '"asdf"': 'asdf', + '[]': [], + '{}': {}, + "{'foo': 'bar'}": None, + '{"foo": "bar"}': {'foo': 'bar'}, + } + + failures = [] + for value, expected in expectations.items(): + try: + result = utils.safe_json_loads(value) + except Exception as e: + # it's called safe, if it blows up the test blows up + self.fail('%r caused safe method to throw %r!' % ( + value, e)) + try: + self.assertEqual(expected, result) + except AssertionError: + failures.append('%r => %r (expected %r)' % ( + value, result, expected)) + if failures: + self.fail('Invalid results from pure function:\n%s' % + '\n'.join(failures)) + class ResellerConfReader(unittest.TestCase): diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index bc30733d94..fdcb78673b 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -591,14 +591,16 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin): def tearDown(self): rmtree(self.tmpdir, ignore_errors=1) - def _get_diskfile(self, policy, frag_index=None): + def _get_diskfile(self, policy, frag_index=None, **kwargs): df_mgr = self.df_router[policy] return df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o', - policy=policy, frag_index=frag_index) + policy=policy, frag_index=frag_index, + **kwargs) def _test_get_ondisk_files(self, scenarios, policy, - frag_index=None): - class_under_test = self._get_diskfile(policy, frag_index=frag_index) + frag_index=None, **kwargs): + class_under_test = self._get_diskfile( + policy, frag_index=frag_index, **kwargs) for test in scenarios: # test => [('filename.ext', '.ext'|False, ...), ...] 
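+            # i.e. each entry is a (filename, expected_ext) pair, with
+            # expected_ext False when that file should not be chosen; a
+            # third element, when present, flags whether the file survives
+            # cleanup and is only consulted by the cleanup tests.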
expected = { @@ -610,7 +612,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin): files = list(list(zip(*test))[0]) for _order in ('ordered', 'shuffled', 'shuffled'): - class_under_test = self._get_diskfile(policy, frag_index) + class_under_test = self._get_diskfile( + policy, frag_index=frag_index, **kwargs) try: actual = class_under_test._get_ondisk_files(files) self._assertDictContainsSubset( @@ -621,8 +624,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin): self.fail('%s with files %s' % (str(e), files)) shuffle(files) - def _test_cleanup_ondisk_files_files(self, scenarios, policy, - reclaim_age=None): + def _test_cleanup_ondisk_files(self, scenarios, policy, + reclaim_age=None): # check that expected files are left in hashdir after cleanup for test in scenarios: class_under_test = self.df_router[policy] @@ -753,8 +756,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin): [('%s.meta' % older, False, False), ('%s.ts' % older, False, False)]] - self._test_cleanup_ondisk_files_files(scenarios, POLICIES.default, - reclaim_age=1000) + self._test_cleanup_ondisk_files(scenarios, POLICIES.default, + reclaim_age=1000) def test_construct_dev_path(self): res_path = self.df_mgr.construct_dev_path('abc') @@ -1165,7 +1168,7 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase): ] self._test_get_ondisk_files(scenarios, POLICIES[0], None) - self._test_cleanup_ondisk_files_files(scenarios, POLICIES[0]) + self._test_cleanup_ondisk_files(scenarios, POLICIES[0]) self._test_yield_hashes_cleanup(scenarios, POLICIES[0]) def test_get_ondisk_files_with_stray_meta(self): @@ -1248,8 +1251,8 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase): [('%s.meta' % older, '.meta'), ('%s.data' % much_older, '.data')]] - self._test_cleanup_ondisk_files_files(scenarios, POLICIES.default, - reclaim_age=1000) + self._test_cleanup_ondisk_files(scenarios, POLICIES.default, + reclaim_age=1000) def test_yield_hashes(self): old_ts = '1383180000.12345' @@ -1437,23 +1440,6 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): ('0000000007.00000#1.data', '.data'), ('0000000007.00000#0.data', False, True)], - # data with no durable is ignored - [('0000000007.00000#0.data', False, True)], - - # data newer than tombstone with no durable is ignored - [('0000000007.00000#0.data', False, True), - ('0000000006.00000.ts', '.ts', True)], - - # data newer than durable is ignored - [('0000000008.00000#1.data', False, True), - ('0000000007.00000.durable', '.durable'), - ('0000000007.00000#1.data', '.data'), - ('0000000007.00000#0.data', False, True)], - - # data newer than durable ignored, even if its only data - [('0000000008.00000#1.data', False, True), - ('0000000007.00000.durable', False, False)], - # data older than durable is ignored [('0000000007.00000.durable', '.durable'), ('0000000007.00000#1.data', '.data'), @@ -1489,16 +1475,79 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): ('0000000006.00000.ts', '.ts'), ('0000000006.00000.durable', False), ('0000000006.00000#0.data', False)], - - # missing durable invalidates data - [('0000000006.00000.meta', False, True), - ('0000000006.00000#0.data', False, True)] ] - self._test_get_ondisk_files(scenarios, POLICIES.default, None) - self._test_cleanup_ondisk_files_files(scenarios, POLICIES.default) + # these scenarios have same outcome regardless of whether any + # fragment preferences are specified + self._test_get_ondisk_files(scenarios, POLICIES.default, + frag_index=None) + 
self._test_get_ondisk_files(scenarios, POLICIES.default, + frag_index=None, frag_prefs=[]) + self._test_cleanup_ondisk_files(scenarios, POLICIES.default) self._test_yield_hashes_cleanup(scenarios, POLICIES.default) + # next scenarios have different outcomes dependent on whether a + # frag_prefs parameter is passed to diskfile constructor or not + scenarios = [ + # data with no durable is ignored + [('0000000007.00000#0.data', False, True)], + + # data newer than tombstone with no durable is ignored + [('0000000007.00000#0.data', False, True), + ('0000000006.00000.ts', '.ts', True)], + + # data newer than durable is ignored + [('0000000009.00000#2.data', False, True), + ('0000000009.00000#1.data', False, True), + ('0000000008.00000#3.data', False, True), + ('0000000007.00000.durable', '.durable'), + ('0000000007.00000#1.data', '.data'), + ('0000000007.00000#0.data', False, True)], + + # data newer than durable ignored, even if its only data + [('0000000008.00000#1.data', False, True), + ('0000000007.00000.durable', False, False)], + + # missing durable invalidates data, older meta deleted + [('0000000007.00000.meta', False, True), + ('0000000006.00000#0.data', False, True), + ('0000000005.00000.meta', False, False), + ('0000000004.00000#1.data', False, True)]] + + self._test_get_ondisk_files(scenarios, POLICIES.default, + frag_index=None) + self._test_cleanup_ondisk_files(scenarios, POLICIES.default) + + scenarios = [ + # data with no durable is chosen + [('0000000007.00000#0.data', '.data', True)], + + # data newer than tombstone with no durable is chosen + [('0000000007.00000#0.data', '.data', True), + ('0000000006.00000.ts', False, True)], + + # data newer than durable is chosen, older data preserved + [('0000000009.00000#2.data', '.data', True), + ('0000000009.00000#1.data', False, True), + ('0000000008.00000#3.data', False, True), + ('0000000007.00000.durable', False, True), + ('0000000007.00000#1.data', False, True), + ('0000000007.00000#0.data', False, True)], + + # data newer than durable chosen when its only data + [('0000000008.00000#1.data', '.data', True), + ('0000000007.00000.durable', False, False)], + + # data plus meta chosen without durable, older meta deleted + [('0000000007.00000.meta', '.meta', True), + ('0000000006.00000#0.data', '.data', True), + ('0000000005.00000.meta', False, False), + ('0000000004.00000#1.data', False, True)]] + + self._test_get_ondisk_files(scenarios, POLICIES.default, + frag_index=None, frag_prefs=[]) + self._test_cleanup_ondisk_files(scenarios, POLICIES.default) + def test_get_ondisk_files_with_ec_policy_and_frag_index(self): # Each scenario specifies a list of (filename, extension) tuples. 
If # extension is set then that filename should be returned by the method @@ -1512,20 +1561,20 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): [('0000000007.00000#2.data', False, True), ('0000000007.00000#1.data', False, True), ('0000000007.00000#0.data', False, True), - ('0000000006.00000.durable', '.durable')], + ('0000000006.00000.durable', False)], # specific frag older than durable is ignored [('0000000007.00000#2.data', False), ('0000000007.00000#1.data', False), ('0000000007.00000#0.data', False), - ('0000000008.00000.durable', '.durable')], + ('0000000008.00000.durable', False)], # specific frag older than newest durable is ignored # even if is also has a durable [('0000000007.00000#2.data', False), ('0000000007.00000#1.data', False), ('0000000007.00000.durable', False), - ('0000000008.00000#0.data', False), + ('0000000008.00000#0.data', False, True), ('0000000008.00000.durable', '.durable')], # meta included when frag index is specified @@ -1559,12 +1608,23 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): # frag_index, get_ondisk_files will tolerate .meta with # no .data [('0000000088.00000.meta', False, True), - ('0000000077.00000.durable', '.durable')] + ('0000000077.00000.durable', False, False)] ] self._test_get_ondisk_files(scenarios, POLICIES.default, frag_index=1) - # note: not calling self._test_cleanup_ondisk_files_files(scenarios, 0) - # here due to the anomalous scenario as commented above + self._test_cleanup_ondisk_files(scenarios, POLICIES.default) + + # scenarios for empty frag_prefs, meaning durable not required + scenarios = [ + # specific frag newer than durable is chosen + [('0000000007.00000#2.data', False, True), + ('0000000007.00000#1.data', '.data', True), + ('0000000007.00000#0.data', False, True), + ('0000000006.00000.durable', False, False)], + ] + self._test_get_ondisk_files(scenarios, POLICIES.default, frag_index=1, + frag_prefs=[]) + self._test_cleanup_ondisk_files(scenarios, POLICIES.default) def test_cleanup_ondisk_files_reclaim_with_data_files(self): # Each scenario specifies a list of (filename, extension, [survives]) @@ -1622,8 +1682,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): [('%s.meta' % older, False, False), ('%s.durable' % much_older, False, False)]] - self._test_cleanup_ondisk_files_files(scenarios, POLICIES.default, - reclaim_age=1000) + self._test_cleanup_ondisk_files(scenarios, POLICIES.default, + reclaim_age=1000) def test_get_ondisk_files_with_stray_meta(self): # get_ondisk_files ignores a stray .meta file @@ -4574,6 +4634,233 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase): df.open() self.assertEqual(ts1, df.durable_timestamp) + def test_open_with_fragment_preferences(self): + policy = POLICIES.default + df_mgr = self.df_router[policy] + + df = df_mgr.get_diskfile(self.existing_device, '0', + 'a', 'c', 'o', policy=policy) + + ts_1, ts_2, ts_3, ts_4 = (self.ts() for _ in range(4)) + + # create two durable frags, first with index 0 + with df.create() as writer: + data = 'test data' + writer.write(data) + frag_0_metadata = { + 'ETag': md5(data).hexdigest(), + 'X-Timestamp': ts_1.internal, + 'Content-Length': len(data), + 'X-Object-Sysmeta-Ec-Frag-Index': 0, + } + writer.put(frag_0_metadata) + writer.commit(ts_1) + + # second with index 3 + with df.create() as writer: + data = 'test data' + writer.write(data) + frag_3_metadata = { + 'ETag': md5(data).hexdigest(), + 'X-Timestamp': ts_1.internal, + 'Content-Length': len(data), + 
'X-Object-Sysmeta-Ec-Frag-Index': 3, + } + writer.put(frag_3_metadata) + writer.commit(ts_1) + + # sanity check: should have 2 * .data plus a .durable + self.assertEqual(3, len(os.listdir(df._datadir))) + + # add some .meta stuff + meta_1_metadata = { + 'X-Object-Meta-Foo': 'Bar', + 'X-Timestamp': ts_2.internal, + } + df = df_mgr.get_diskfile(self.existing_device, '0', + 'a', 'c', 'o', policy=policy) + df.write_metadata(meta_1_metadata) + # sanity check: should have 2 * .data, .durable, .meta + self.assertEqual(4, len(os.listdir(df._datadir))) + + # sanity: should get frag index 3 + df = df_mgr.get_diskfile(self.existing_device, '0', + 'a', 'c', 'o', policy=policy) + expected = dict(frag_3_metadata) + expected.update(meta_1_metadata) + self.assertEqual(expected, df.read_metadata()) + + # add a newer datafile for frag index 2 + df = df_mgr.get_diskfile(self.existing_device, '0', + 'a', 'c', 'o', policy=policy) + with df.create() as writer: + data = 'new test data' + writer.write(data) + frag_2_metadata = { + 'ETag': md5(data).hexdigest(), + 'X-Timestamp': ts_3.internal, + 'Content-Length': len(data), + 'X-Object-Sysmeta-Ec-Frag-Index': 2, + } + writer.put(frag_2_metadata) + # N.B. don't make it durable - skip call to commit() + # sanity check: should have 2* .data, .durable, .meta, .data + self.assertEqual(5, len(os.listdir(df._datadir))) + + # sanity check: with no frag preferences we get old metadata + df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o', + policy=policy) + self.assertEqual(expected, df.read_metadata()) + self.assertEqual(ts_2.internal, df.timestamp) + self.assertEqual(ts_1.internal, df.data_timestamp) + self.assertEqual(ts_1.internal, df.durable_timestamp) + self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments) + + # with empty frag preferences we get metadata from newer non-durable + # data file + df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o', + policy=policy, frag_prefs=[]) + self.assertEqual(frag_2_metadata, df.read_metadata()) + self.assertEqual(ts_3.internal, df.timestamp) + self.assertEqual(ts_3.internal, df.data_timestamp) + self.assertEqual(ts_1.internal, df.durable_timestamp) + self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments) + + # check we didn't destroy any potentially valid data by opening the + # non-durable data file + df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o', + policy=policy) + self.assertEqual(expected, df.read_metadata()) + + # now add some newer .meta stuff which should replace older .meta + meta_2_metadata = { + 'X-Object-Meta-Foo': 'BarBarBarAnne', + 'X-Timestamp': ts_4.internal, + } + df = df_mgr.get_diskfile(self.existing_device, '0', + 'a', 'c', 'o', policy=policy) + df.write_metadata(meta_2_metadata) + # sanity check: should have 2 * .data, .durable, .data, .meta + self.assertEqual(5, len(os.listdir(df._datadir))) + + # sanity check: with no frag preferences we get newer metadata applied + # to durable data file + expected = dict(frag_3_metadata) + expected.update(meta_2_metadata) + df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o', + policy=policy) + self.assertEqual(expected, df.read_metadata()) + self.assertEqual(ts_4.internal, df.timestamp) + self.assertEqual(ts_1.internal, df.data_timestamp) + self.assertEqual(ts_1.internal, df.durable_timestamp) + self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments) + + # with empty frag preferences we still get metadata from newer .meta + # but applied to non-durable data file + expected = dict(frag_2_metadata) + 
expected.update(meta_2_metadata) + df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o', + policy=policy, frag_prefs=[]) + self.assertEqual(expected, df.read_metadata()) + self.assertEqual(ts_4.internal, df.timestamp) + self.assertEqual(ts_3.internal, df.data_timestamp) + self.assertEqual(ts_1.internal, df.durable_timestamp) + self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments) + + # check we didn't destroy any potentially valid data by opening the + # non-durable data file + expected = dict(frag_3_metadata) + expected.update(meta_2_metadata) + df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o', + policy=policy) + self.assertEqual(expected, df.read_metadata()) + self.assertEqual(ts_4.internal, df.timestamp) + self.assertEqual(ts_1.internal, df.data_timestamp) + self.assertEqual(ts_1.internal, df.durable_timestamp) + self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments) + + # prefer frags at ts_1, exclude no indexes, expect highest frag index + prefs = [{'timestamp': ts_1.internal, 'exclude': []}, + {'timestamp': ts_2.internal, 'exclude': []}, + {'timestamp': ts_3.internal, 'exclude': []}] + expected = dict(frag_3_metadata) + expected.update(meta_2_metadata) + df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o', + policy=policy, frag_prefs=prefs) + self.assertEqual(expected, df.read_metadata()) + self.assertEqual(ts_4.internal, df.timestamp) + self.assertEqual(ts_1.internal, df.data_timestamp) + self.assertEqual(ts_1.internal, df.durable_timestamp) + self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments) + + # prefer frags at ts_1, exclude frag index 3 so expect frag index 0 + prefs = [{'timestamp': ts_1.internal, 'exclude': [3]}, + {'timestamp': ts_2.internal, 'exclude': []}, + {'timestamp': ts_3.internal, 'exclude': []}] + expected = dict(frag_0_metadata) + expected.update(meta_2_metadata) + df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o', + policy=policy, frag_prefs=prefs) + self.assertEqual(expected, df.read_metadata()) + self.assertEqual(ts_4.internal, df.timestamp) + self.assertEqual(ts_1.internal, df.data_timestamp) + self.assertEqual(ts_1.internal, df.durable_timestamp) + self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments) + + # now make ts_3 the preferred timestamp, excluded indexes don't exist + prefs = [{'timestamp': ts_3.internal, 'exclude': [4, 5, 6]}, + {'timestamp': ts_2.internal, 'exclude': []}, + {'timestamp': ts_1.internal, 'exclude': []}] + expected = dict(frag_2_metadata) + expected.update(meta_2_metadata) + df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o', + policy=policy, frag_prefs=prefs) + self.assertEqual(expected, df.read_metadata()) + self.assertEqual(ts_4.internal, df.timestamp) + self.assertEqual(ts_3.internal, df.data_timestamp) + self.assertEqual(ts_1.internal, df.durable_timestamp) + self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments) + + # now make ts_2 the preferred timestamp - there are no frags at ts_2, + # next preference is ts_3 but index 2 is excluded, then at ts_1 index 3 + # is excluded so we get frag 0 at ts_1 + prefs = [{'timestamp': ts_2.internal, 'exclude': [1]}, + {'timestamp': ts_3.internal, 'exclude': [2]}, + {'timestamp': ts_1.internal, 'exclude': [3]}] + + expected = dict(frag_0_metadata) + expected.update(meta_2_metadata) + df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o', + policy=policy, frag_prefs=prefs) + self.assertEqual(expected, df.read_metadata()) + self.assertEqual(ts_4.internal, df.timestamp) + 
self.assertEqual(ts_1.internal, df.data_timestamp) + self.assertEqual(ts_1.internal, df.durable_timestamp) + self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments) + + def test_open_with_bad_fragment_preferences(self): + policy = POLICIES.default + df_mgr = self.df_router[policy] + + for bad in ( + 'ouch', + 2, + [{'timestamp': '1234.5678', 'excludes': [1]}, {}], + [{'timestamp': 'not a timestamp', 'excludes': [1, 2]}], + [{'timestamp': '1234.5678', 'excludes': [1, -1]}], + [{'timestamp': '1234.5678', 'excludes': 1}], + [{'timestamp': '1234.5678'}], + [{'excludes': [1, 2]}] + + ): + try: + df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o', + policy=policy, frag_prefs=bad) + self.fail('Expected DiskFileError for bad frag_prefs: %r' + % bad) + except DiskFileError as e: + self.assertIn('frag_prefs', str(e)) + @patch_policies(with_ec_default=True) class TestSuffixHashes(unittest.TestCase): diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 40e0852a4b..4807ccf01f 100755 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -233,6 +233,8 @@ class TestObjectController(unittest.TestCase): 'Content-Encoding': 'gzip', 'X-Backend-Timestamp': post_timestamp, 'X-Timestamp': post_timestamp, + 'X-Backend-Data-Timestamp': put_timestamp, + 'X-Backend-Durable-Timestamp': put_timestamp, 'Last-Modified': strftime( '%a, %d %b %Y %H:%M:%S GMT', gmtime(math.ceil(float(post_timestamp)))), @@ -266,6 +268,8 @@ class TestObjectController(unittest.TestCase): 'X-Object-Sysmeta-Color': 'blue', 'X-Backend-Timestamp': post_timestamp, 'X-Timestamp': post_timestamp, + 'X-Backend-Data-Timestamp': put_timestamp, + 'X-Backend-Durable-Timestamp': put_timestamp, 'Last-Modified': strftime( '%a, %d %b %Y %H:%M:%S GMT', gmtime(math.ceil(float(post_timestamp)))), @@ -308,6 +312,8 @@ class TestObjectController(unittest.TestCase): 'X-Static-Large-Object': 'True', 'X-Backend-Timestamp': put_timestamp, 'X-Timestamp': put_timestamp, + 'X-Backend-Data-Timestamp': put_timestamp, + 'X-Backend-Durable-Timestamp': put_timestamp, 'Last-Modified': strftime( '%a, %d %b %Y %H:%M:%S GMT', gmtime(math.ceil(float(put_timestamp)))), @@ -338,6 +344,8 @@ class TestObjectController(unittest.TestCase): 'X-Static-Large-Object': 'True', 'X-Backend-Timestamp': post_timestamp, 'X-Timestamp': post_timestamp, + 'X-Backend-Data-Timestamp': put_timestamp, + 'X-Backend-Durable-Timestamp': put_timestamp, 'Last-Modified': strftime( '%a, %d %b %Y %H:%M:%S GMT', gmtime(math.ceil(float(post_timestamp)))), @@ -368,6 +376,8 @@ class TestObjectController(unittest.TestCase): 'X-Static-Large-Object': 'True', 'X-Backend-Timestamp': post_timestamp, 'X-Timestamp': post_timestamp, + 'X-Backend-Data-Timestamp': put_timestamp, + 'X-Backend-Durable-Timestamp': put_timestamp, 'Last-Modified': strftime( '%a, %d %b %Y %H:%M:%S GMT', gmtime(math.ceil(float(post_timestamp)))), @@ -3185,6 +3195,238 @@ class TestObjectController(unittest.TestCase): resp = req.get_response(self.object_controller) self.assertEqual(resp.status_int, 412) + def _create_ondisk_fragments(self, policy): + # Create some on disk files... 
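# Editor's sketch, not part of the patch: for an EC policy this helper is
# expected to leave the object's hash dir looking roughly like the listing
# below, assuming the <timestamp>#<frag_index>.data naming used for EC
# .data files at this point in the series:
#
#     <ts_0>#0.data    # PUT at ts_0, frag index 0
#     <ts_0>.durable   # written by commit() for the ts_0 PUT
#     <ts_1>.meta      # POST at ts_1
#     <ts_2>#2.data    # PUT at ts_2, frag index 2; commit() is patched out
#                      # below, so no <ts_2>.durable is written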
+ ts_iter = make_timestamp_iter() + + # PUT at ts_0 + ts_0 = next(ts_iter) + headers = {'X-Timestamp': ts_0.internal, + 'Content-Length': '5', + 'Content-Type': 'application/octet-stream', + 'X-Backend-Storage-Policy-Index': int(policy)} + if policy.policy_type == EC_POLICY: + headers['X-Object-Sysmeta-Ec-Frag-Index'] = '0' + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'PUT'}, + headers=headers) + req.body = 'OLDER' + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 201) + + # POST at ts_1 + ts_1 = next(ts_iter) + headers = {'X-Timestamp': ts_1.internal, + 'X-Backend-Storage-Policy-Index': int(policy)} + headers['X-Object-Meta-Test'] = 'abc' + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'POST'}, + headers=headers) + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 202) + + # PUT again at ts_2 but without a .durable file + ts_2 = next(ts_iter) + headers = {'X-Timestamp': ts_2.internal, + 'Content-Length': '5', + 'Content-Type': 'application/octet-stream', + 'X-Backend-Storage-Policy-Index': int(policy)} + if policy.policy_type == EC_POLICY: + headers['X-Object-Sysmeta-Ec-Frag-Index'] = '2' + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'PUT'}, + headers=headers) + req.body = 'NEWER' + # patch the commit method to do nothing so EC object gets + # no .durable file + with mock.patch('swift.obj.diskfile.ECDiskFileWriter.commit'): + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 201) + + return ts_0, ts_1, ts_2 + + def test_GET_HEAD_with_fragment_preferences(self): + for policy in POLICIES: + ts_0, ts_1, ts_2 = self._create_ondisk_fragments(policy) + + backend_frags = json.dumps({ts_0.internal: [0], + ts_2.internal: [2]}) + + def _assert_frag_0_at_ts_0(resp): + expect = { + 'X-Timestamp': ts_1.normal, + 'X-Backend-Timestamp': ts_1.internal, + 'X-Backend-Data-Timestamp': ts_0.internal, + 'X-Backend-Durable-Timestamp': ts_0.internal, + 'X-Backend-Fragments': backend_frags, + 'X-Object-Sysmeta-Ec-Frag-Index': '0', + 'X-Object-Meta-Test': 'abc'} + self.assertDictContainsSubset(expect, resp.headers) + + def _assert_repl_data_at_ts_2(): + self.assertIn(resp.status_int, (200, 202)) + expect = { + 'X-Timestamp': ts_2.normal, + 'X-Backend-Timestamp': ts_2.internal, + 'X-Backend-Data-Timestamp': ts_2.internal, + 'X-Backend-Durable-Timestamp': ts_2.internal} + self.assertDictContainsSubset(expect, resp.headers) + self.assertNotIn('X-Object-Meta-Test', resp.headers) + + # Sanity check: Request with no preferences should default to the + # durable frag + headers = {'X-Backend-Storage-Policy-Index': int(policy)} + req = Request.blank('/sda1/p/a/c/o', headers=headers, + environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(self.object_controller) + + if policy.policy_type == EC_POLICY: + _assert_frag_0_at_ts_0(resp) + self.assertEqual(resp.body, 'OLDER') + else: + _assert_repl_data_at_ts_2() + self.assertEqual(resp.body, 'NEWER') + + req = Request.blank('/sda1/p/a/c/o', headers=headers, + environ={'REQUEST_METHOD': 'HEAD'}) + resp = req.get_response(self.object_controller) + if policy.policy_type == EC_POLICY: + _assert_frag_0_at_ts_0(resp) + else: + _assert_repl_data_at_ts_2() + + # Request with preferences can select the older frag + prefs = json.dumps( + [{'timestamp': ts_0.internal, 'exclude': [1, 3]}]) + headers = {'X-Backend-Storage-Policy-Index': int(policy), + 'X-Backend-Fragment-Preferences': prefs} + req = 
Request.blank('/sda1/p/a/c/o', headers=headers, + environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(self.object_controller) + + if policy.policy_type == EC_POLICY: + _assert_frag_0_at_ts_0(resp) + self.assertEqual(resp.body, 'OLDER') + else: + _assert_repl_data_at_ts_2() + self.assertEqual(resp.body, 'NEWER') + + req = Request.blank('/sda1/p/a/c/o', headers=headers, + environ={'REQUEST_METHOD': 'HEAD'}) + resp = req.get_response(self.object_controller) + + if policy.policy_type == EC_POLICY: + _assert_frag_0_at_ts_0(resp) + else: + _assert_repl_data_at_ts_2() + + def _assert_frag_2_at_ts_2(resp): + self.assertIn(resp.status_int, (200, 202)) + # do not expect meta file to be included since it is older + expect = { + 'X-Timestamp': ts_2.normal, + 'X-Backend-Timestamp': ts_2.internal, + 'X-Backend-Data-Timestamp': ts_2.internal, + 'X-Backend-Durable-Timestamp': ts_0.internal, + 'X-Backend-Fragments': backend_frags, + 'X-Object-Sysmeta-Ec-Frag-Index': '2'} + self.assertDictContainsSubset(expect, resp.headers) + self.assertNotIn('X-Object-Meta-Test', resp.headers) + + # Request with preferences can select the newer non-durable frag + prefs = json.dumps( + [{'timestamp': ts_2.internal, 'exclude': [1, 3]}]) + headers = {'X-Backend-Storage-Policy-Index': int(policy), + 'X-Backend-Fragment-Preferences': prefs} + req = Request.blank('/sda1/p/a/c/o', headers=headers, + environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(self.object_controller) + + if policy.policy_type == EC_POLICY: + _assert_frag_2_at_ts_2(resp) + else: + _assert_repl_data_at_ts_2() + self.assertEqual(resp.body, 'NEWER') + + req = Request.blank('/sda1/p/a/c/o', headers=headers, + environ={'REQUEST_METHOD': 'HEAD'}) + resp = req.get_response(self.object_controller) + + if policy.policy_type == EC_POLICY: + _assert_frag_2_at_ts_2(resp) + else: + _assert_repl_data_at_ts_2() + + # Request with preference for ts_0 but excludes index 0 will + # default to newest frag + prefs = json.dumps( + [{'timestamp': ts_0.internal, 'exclude': [0]}]) + headers = {'X-Backend-Storage-Policy-Index': int(policy), + 'X-Backend-Fragment-Preferences': prefs} + req = Request.blank('/sda1/p/a/c/o', headers=headers, + environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(self.object_controller) + if policy.policy_type == EC_POLICY: + _assert_frag_2_at_ts_2(resp) + else: + _assert_repl_data_at_ts_2() + self.assertEqual(resp.body, 'NEWER') + + req = Request.blank('/sda1/p/a/c/o', headers=headers, + environ={'REQUEST_METHOD': 'HEAD'}) + resp = req.get_response(self.object_controller) + + if policy.policy_type == EC_POLICY: + _assert_frag_2_at_ts_2(resp) + else: + _assert_repl_data_at_ts_2() + + # Request with preferences that exclude all frags get nothing + prefs = json.dumps( + [{'timestamp': ts_0.internal, 'exclude': [0]}, + {'timestamp': ts_2.internal, 'exclude': [2]}]) + headers = {'X-Backend-Storage-Policy-Index': int(policy), + 'X-Backend-Fragment-Preferences': prefs} + req = Request.blank('/sda1/p/a/c/o', headers=headers, + environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(self.object_controller) + if policy.policy_type == EC_POLICY: + self.assertEqual(resp.status_int, 404) + else: + _assert_repl_data_at_ts_2() + self.assertEqual(resp.body, 'NEWER') + + req = Request.blank('/sda1/p/a/c/o', headers=headers, + environ={'REQUEST_METHOD': 'HEAD'}) + resp = req.get_response(self.object_controller) + + if policy.policy_type == EC_POLICY: + self.assertEqual(resp.status_int, 404) + else: + _assert_repl_data_at_ts_2() + + # 
Request with empty preferences will get non-durable + prefs = json.dumps([]) + headers = {'X-Backend-Storage-Policy-Index': int(policy), + 'X-Backend-Fragment-Preferences': prefs} + req = Request.blank('/sda1/p/a/c/o', headers=headers, + environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(self.object_controller) + if policy.policy_type == EC_POLICY: + _assert_frag_2_at_ts_2(resp) + else: + _assert_repl_data_at_ts_2() + self.assertEqual(resp.body, 'NEWER') + + req = Request.blank('/sda1/p/a/c/o', headers=headers, + environ={'REQUEST_METHOD': 'HEAD'}) + resp = req.get_response(self.object_controller) + + if policy.policy_type == EC_POLICY: + _assert_frag_2_at_ts_2(resp) + else: + _assert_repl_data_at_ts_2() + def test_GET_quarantine(self): # Test swift.obj.server.ObjectController.GET timestamp = normalize_timestamp(time()) diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index bec610c5a6..d8029d362b 100755 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -34,6 +34,8 @@ import swift from swift.common import utils, swob, exceptions from swift.common.exceptions import ChunkWriteTimeout from swift.common.header_key_dict import HeaderKeyDict +from swift.common.utils import Timestamp +from swift.obj import server from swift.proxy import server as proxy_server from swift.proxy.controllers import obj from swift.proxy.controllers.base import \ @@ -1372,11 +1374,13 @@ class TestReplicatedObjControllerVariousReplicas(BaseObjectControllerMixin, class StubResponse(object): - def __init__(self, status, body='', headers=None): + def __init__(self, status, body='', headers=None, frag_index=None): self.status = status self.body = body self.readable = BytesIO(body) self.headers = HeaderKeyDict(headers) + if frag_index is not None: + self.headers['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index fake_reason = ('Fake', 'This response is a lie.') self.reason = swob.RESPONSE_REASONS.get(status, fake_reason)[0] @@ -1453,6 +1457,11 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): controller_cls = obj.ECObjectController + def _add_frag_index(self, index, headers): + # helper method to add a frag index header to an existing header dict + hdr_name = 'X-Object-Sysmeta-Ec-Frag-Index' + return dict(headers.items() + [(hdr_name, index)]) + def test_determine_chunk_destinations(self): class FakePutter(object): def __init__(self, index): @@ -2213,11 +2222,13 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): for fragments in zip(*fragment_payloads)] return ec_archive_bodies - def _make_ec_object_stub(self, test_body=None, policy=None): + def _make_ec_object_stub(self, test_body=None, policy=None, + timestamp=None): policy = policy or self.policy segment_size = policy.ec_segment_size test_body = test_body or ( - 'test' * segment_size)[:-random.randint(0, 1000)] + 'test' * segment_size)[:-random.randint(1, 1000)] + timestamp = timestamp or Timestamp(time.time()) etag = md5(test_body).hexdigest() ec_archive_bodies = self._make_ec_archive_bodies(test_body, policy=policy) @@ -2225,19 +2236,34 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): 'body': test_body, 'etag': etag, 'frags': ec_archive_bodies, + 'timestamp': timestamp } def _fake_ec_node_response(self, node_frags): """ - Given a list of entries for each node in ring order, where the - entries are a dict (or list of dicts) which describe all of the - fragment(s); create a function suitable for use 
with - capture_http_requests that will accept a req object and return a - response that will suitably fake the behavior of an object - server who had the given fragments on disk at the time. + Given a list of entries for each node in ring order, where the entries + are a dict (or list of dicts) which describes the fragment (or + fragments) that are on the node; create a function suitable for use + with capture_http_requests that will accept a req object and return a + response that will suitably fake the behavior of an object server who + had the given fragments on disk at the time. + + :param node_frags: a list. Each item in the list describes the + fragments that are on a node; each item is a dict or list of dicts, + each dict describing a single fragment; where the item is a list, + repeated calls to get_response will return fragments in the order + of the list; each dict has keys: + - obj: an object stub, as generated by _make_ec_object_stub, + that defines all of the fragments that compose an object + at a specific timestamp. + - frag: the index of a fragment to be selected from the object + stub + - durable (optional): True if the selected fragment is durable + """ - node_map = {} + node_map = {} # maps node ip and port to node index all_nodes = [] + call_count = {} # maps node index to get_response call count for node def _build_node_map(req): node_key = lambda n: (n['ip'], n['port']) @@ -2248,6 +2274,7 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): all_nodes.extend(policy.object_ring.get_more_nodes(part)) for i, node in enumerate(all_nodes): node_map[node_key(node)] = i + call_count[i] = 0 # normalize node_frags to a list of fragments for each node even # if there's only one fragment in the dataset provided. @@ -2264,31 +2291,55 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): except KeyError: raise Exception("Couldn't find node %s:%s in %r" % ( req['ip'], req['port'], all_nodes)) - try: frags = node_frags[node_index] - except KeyError: + except IndexError: raise Exception('Found node %r:%r at index %s - ' 'but only got %s stub response nodes' % ( req['ip'], req['port'], node_index, len(node_frags))) + if not frags: + return StubResponse(404) + + # determine response fragment (if any) for this call + resp_frag = frags[call_count[node_index]] + call_count[node_index] += 1 + frag_prefs = req['headers'].get('X-Backend-Fragment-Preferences') + if not (frag_prefs or resp_frag.get('durable', True)): + return StubResponse(404) + + # prepare durable timestamp and backend frags header for this node + obj_stub = resp_frag['obj'] + ts2frags = defaultdict(list) + durable_timestamp = None + for frag in frags: + ts_frag = frag['obj']['timestamp'] + if frag.get('durable', True): + durable_timestamp = ts_frag.internal + ts2frags[ts_frag].append(frag['frag']) + try: - stub = random.choice(frags) - except IndexError: - stub = None - if stub: - body = stub['obj']['frags'][stub['frag']] - headers = { - 'X-Object-Sysmeta-Ec-Content-Length': len( - stub['obj']['body']), - 'X-Object-Sysmeta-Ec-Etag': stub['obj']['etag'], - 'X-Object-Sysmeta-Ec-Frag-Index': stub['frag'], - } - resp = StubResponse(200, body, headers) - else: - resp = StubResponse(404) - return resp + body = obj_stub['frags'][resp_frag['frag']] + except IndexError as err: + raise Exception( + 'Frag index %s not defined: node index %s, frags %r\n%s' % + (resp_frag['frag'], node_index, [f['frag'] for f in frags], + err)) + headers = { + 'X-Object-Sysmeta-Ec-Content-Length': 
len(obj_stub['body']), + 'X-Object-Sysmeta-Ec-Etag': obj_stub['etag'], + 'X-Object-Sysmeta-Ec-Frag-Index': resp_frag['frag'], + 'X-Backend-Timestamp': obj_stub['timestamp'].internal, + 'X-Timestamp': obj_stub['timestamp'].normal, + 'X-Backend-Data-Timestamp': obj_stub['timestamp'].internal, + 'X-Backend-Fragments': + server._make_backend_fragments_header(ts2frags) + } + if durable_timestamp: + headers['X-Backend-Durable-Timestamp'] = durable_timestamp + + return StubResponse(200, body, headers) return get_response @@ -2301,11 +2352,15 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): _part, primary_nodes = self.obj_ring.get_nodes('a', 'c', 'o') node_key = lambda n: (n['ip'], n['port']) + + ts = self._ts_iter.next() response_map = { node_key(n): StubResponse(200, ec_archive_bodies[i], { 'X-Object-Sysmeta-Ec-Content-Length': len(test_data), 'X-Object-Sysmeta-Ec-Etag': etag, 'X-Object-Sysmeta-Ec-Frag-Index': i, + 'X-Timestamp': ts.normal, + 'X-Backend-Timestamp': ts.internal }) for i, n in enumerate(primary_nodes) } @@ -2330,6 +2385,62 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): self.assertEqual(len(response_map), len(primary_nodes) - self.policy.ec_ndata) + def test_GET_with_no_success(self): + node_frags = [[]] * 28 # no frags on any node + + fake_response = self._fake_ec_node_response(node_frags) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 404) + self.assertEqual(len(log), 2 * self.replicas()) + + def test_GET_with_only_handoffs(self): + obj1 = self._make_ec_object_stub() + + node_frags = [[]] * 14 # all primaries missing + node_frags = node_frags + [ # handoffs + {'obj': obj1, 'frag': 0}, + {'obj': obj1, 'frag': 1}, + {'obj': obj1, 'frag': 2}, + {'obj': obj1, 'frag': 3}, + {'obj': obj1, 'frag': 4}, + {'obj': obj1, 'frag': 5}, + {'obj': obj1, 'frag': 6}, + {'obj': obj1, 'frag': 7}, + {'obj': obj1, 'frag': 8}, + {'obj': obj1, 'frag': 9}, + {'obj': obj1, 'frag': 10}, # parity + {'obj': obj1, 'frag': 11}, # parity + {'obj': obj1, 'frag': 12}, # parity + {'obj': obj1, 'frag': 13}, # parity + ] + + fake_response = self._fake_ec_node_response(node_frags) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.headers['etag'], obj1['etag']) + self.assertEqual(md5(resp.body).hexdigest(), obj1['etag']) + + collected_responses = defaultdict(list) + for conn in log: + etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag'] + index = conn.resp.headers['X-Object-Sysmeta-Ec-Frag-Index'] + collected_responses[etag].append(index) + + # GETS would be required to all primaries and then ndata handoffs + self.assertEqual(len(log), self.replicas() + self.policy.ec_ndata) + self.assertEqual(2, len(collected_responses)) + self.assertEqual(14, len(collected_responses[None])) # 404s + self.assertEqual(self.policy.ec_ndata, + len(collected_responses[obj1['etag']])) + def test_GET_with_single_missed_overwrite_does_not_need_handoff(self): obj1 = self._make_ec_object_stub() obj2 = self._make_ec_object_stub() @@ -2432,37 +2543,37 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): len(frags), etag)) def test_GET_with_missing_and_mixed_frags_will_dig_deep_but_succeed(self): - obj1 = self._make_ec_object_stub() - obj2 = self._make_ec_object_stub() + obj1 = 
self._make_ec_object_stub(timestamp=self.ts()) + obj2 = self._make_ec_object_stub(timestamp=self.ts()) node_frags = [ {'obj': obj1, 'frag': 0}, {'obj': obj2, 'frag': 0}, - {}, + [], {'obj': obj1, 'frag': 1}, {'obj': obj2, 'frag': 1}, - {}, + [], {'obj': obj1, 'frag': 2}, {'obj': obj2, 'frag': 2}, - {}, + [], {'obj': obj1, 'frag': 3}, {'obj': obj2, 'frag': 3}, - {}, + [], {'obj': obj1, 'frag': 4}, {'obj': obj2, 'frag': 4}, - {}, + [], {'obj': obj1, 'frag': 5}, {'obj': obj2, 'frag': 5}, - {}, + [], {'obj': obj1, 'frag': 6}, {'obj': obj2, 'frag': 6}, - {}, + [], {'obj': obj1, 'frag': 7}, {'obj': obj2, 'frag': 7}, - {}, + [], {'obj': obj1, 'frag': 8}, {'obj': obj2, 'frag': 8}, - {}, + [], {'obj': obj2, 'frag': 9}, ] @@ -2501,32 +2612,34 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): node_frags = [ {'obj': obj1, 'frag': 0}, {'obj': obj2, 'frag': 0}, - {}, + [], {'obj': obj1, 'frag': 1}, {'obj': obj2, 'frag': 1}, - {}, + [], {'obj': obj1, 'frag': 2}, {'obj': obj2, 'frag': 2}, - {}, + [], {'obj': obj1, 'frag': 3}, {'obj': obj2, 'frag': 3}, - {}, + [], {'obj': obj1, 'frag': 4}, {'obj': obj2, 'frag': 4}, - {}, + [], {'obj': obj1, 'frag': 5}, {'obj': obj2, 'frag': 5}, - {}, + [], {'obj': obj1, 'frag': 6}, {'obj': obj2, 'frag': 6}, - {}, + [], {'obj': obj1, 'frag': 7}, {'obj': obj2, 'frag': 7}, - {}, + [], {'obj': obj1, 'frag': 8}, {'obj': obj2, 'frag': 8}, - {}, - {}, + [], + # handoffs are iter'd in order so proxy will see 404 from this + # final handoff + [], ] fake_response = self._fake_ec_node_response(node_frags) @@ -2554,6 +2667,617 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): 'collected %s frags for etag %s' % ( len(frags), etag)) + def test_GET_with_duplicate_but_sufficient_frag_indexes(self): + obj1 = self._make_ec_object_stub() + # proxy should ignore duplicated frag indexes and continue search for + # a set of unique indexes, finding last one on a handoff + node_frags = [ + {'obj': obj1, 'frag': 0}, + {'obj': obj1, 'frag': 0}, # duplicate frag + {'obj': obj1, 'frag': 1}, + {'obj': obj1, 'frag': 1}, # duplicate frag + {'obj': obj1, 'frag': 2}, + {'obj': obj1, 'frag': 2}, # duplicate frag + {'obj': obj1, 'frag': 3}, + {'obj': obj1, 'frag': 3}, # duplicate frag + {'obj': obj1, 'frag': 4}, + {'obj': obj1, 'frag': 4}, # duplicate frag + {'obj': obj1, 'frag': 10}, + {'obj': obj1, 'frag': 11}, + {'obj': obj1, 'frag': 12}, + {'obj': obj1, 'frag': 13}, + {'obj': obj1, 'frag': 5}, # handoff + ] + + fake_response = self._fake_ec_node_response(node_frags) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.headers['etag'], obj1['etag']) + self.assertEqual(md5(resp.body).hexdigest(), obj1['etag']) + + # expect a request to all primaries plus one handoff + self.assertEqual(self.replicas() + 1, len(log)) + collected_indexes = defaultdict(list) + for conn in log: + fi = conn.resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index') + if fi is not None: + collected_indexes[fi].append(conn) + self.assertEqual(len(collected_indexes), self.policy.ec_ndata) + + def test_GET_with_duplicate_and_hidden_frag_indexes(self): + obj1 = self._make_ec_object_stub() + # proxy should ignore duplicated frag indexes and continue search for + # a set of unique indexes, finding last one on a handoff + node_frags = [ + [{'obj': obj1, 'frag': 0}, {'obj': obj1, 'frag': 5}], + {'obj': obj1, 'frag': 0}, # duplicate frag + {'obj': obj1, 
'frag': 1}, + {'obj': obj1, 'frag': 1}, # duplicate frag + {'obj': obj1, 'frag': 2}, + {'obj': obj1, 'frag': 2}, # duplicate frag + {'obj': obj1, 'frag': 3}, + {'obj': obj1, 'frag': 3}, # duplicate frag + {'obj': obj1, 'frag': 4}, + {'obj': obj1, 'frag': 4}, # duplicate frag + {'obj': obj1, 'frag': 10}, + {'obj': obj1, 'frag': 11}, + {'obj': obj1, 'frag': 12}, + {'obj': obj1, 'frag': 13}, + ] + + fake_response = self._fake_ec_node_response(node_frags) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.headers['etag'], obj1['etag']) + self.assertEqual(md5(resp.body).hexdigest(), obj1['etag']) + + # Expect a maximum of one request to each primary plus one extra + # request to node 1. Actual value could be less if the extra request + # occurs and quorum is reached before requests to nodes with a + # duplicate frag. + self.assertLessEqual(len(log), self.replicas() + 1) + collected_indexes = defaultdict(list) + for conn in log: + fi = conn.resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index') + if fi is not None: + collected_indexes[fi].append(conn) + self.assertEqual(len(collected_indexes), self.policy.ec_ndata) + + def test_GET_with_duplicate_but_insufficient_frag(self): + obj1 = self._make_ec_object_stub() + # proxy should ignore duplicated frag indexes and continue search for + # a set of unique indexes, but fails to find one + node_frags = [ + {'obj': obj1, 'frag': 0}, + {'obj': obj1, 'frag': 0}, # duplicate frag + {'obj': obj1, 'frag': 1}, + {'obj': obj1, 'frag': 1}, # duplicate frag + {'obj': obj1, 'frag': 2}, + {'obj': obj1, 'frag': 2}, # duplicate frag + {'obj': obj1, 'frag': 3}, + {'obj': obj1, 'frag': 3}, # duplicate frag + {'obj': obj1, 'frag': 4}, + {'obj': obj1, 'frag': 4}, # duplicate frag + {'obj': obj1, 'frag': 10}, + {'obj': obj1, 'frag': 11}, + {'obj': obj1, 'frag': 12}, + {'obj': obj1, 'frag': 13}, + ] + [[]] * 14 # 404 from handoffs + + fake_response = self._fake_ec_node_response(node_frags) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 404) + + # expect a request to all nodes + self.assertEqual(2 * self.replicas(), len(log)) + collected_indexes = defaultdict(list) + collected_etags = set() + for conn in log: + etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag'] + collected_etags.add(etag) # will be None from handoffs + fi = conn.resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index') + if fi is not None: + collected_indexes[fi].append(conn) + self.assertEqual(len(collected_indexes), self.policy.ec_ndata - 1) + self.assertEqual({obj1['etag'], None}, collected_etags) + + def test_GET_with_missing_and_mixed_frags_may_503(self): + obj1 = self._make_ec_object_stub() + obj2 = self._make_ec_object_stub() + # we get a 503 when all the handoffs return 200 + node_frags = [[]] * 14 # primaries have no frags + node_frags = node_frags + [ # handoffs all have frags + {'obj': obj1, 'frag': 0}, + {'obj': obj2, 'frag': 0}, + {'obj': obj1, 'frag': 1}, + {'obj': obj2, 'frag': 1}, + {'obj': obj1, 'frag': 2}, + {'obj': obj2, 'frag': 2}, + {'obj': obj1, 'frag': 3}, + {'obj': obj2, 'frag': 3}, + {'obj': obj1, 'frag': 4}, + {'obj': obj2, 'frag': 4}, + {'obj': obj1, 'frag': 5}, + {'obj': obj2, 'frag': 5}, + {'obj': obj1, 'frag': 6}, + {'obj': obj2, 'frag': 6}, + ] + fake_response = self._fake_ec_node_response(node_frags) + + req = 
swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 503) + # never get a quorum so all nodes are searched + self.assertEqual(len(log), 2 * self.replicas()) + collected_indexes = defaultdict(list) + for conn in log: + fi = conn.resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index') + if fi is not None: + collected_indexes[fi].append(conn) + self.assertEqual(len(collected_indexes), 7) + + def test_GET_with_mixed_frags_and_no_quorum_will_503(self): + # all nodes have a frag but there is no one set that reaches quorum, + # which means there is no backend 404 response, but proxy should still + # return 404 rather than 503 + obj1 = self._make_ec_object_stub() + obj2 = self._make_ec_object_stub() + obj3 = self._make_ec_object_stub() + obj4 = self._make_ec_object_stub() + + node_frags = [ + {'obj': obj1, 'frag': 0}, + {'obj': obj2, 'frag': 0}, + {'obj': obj3, 'frag': 0}, + {'obj': obj1, 'frag': 1}, + {'obj': obj2, 'frag': 1}, + {'obj': obj3, 'frag': 1}, + {'obj': obj1, 'frag': 2}, + {'obj': obj2, 'frag': 2}, + {'obj': obj3, 'frag': 2}, + {'obj': obj1, 'frag': 3}, + {'obj': obj2, 'frag': 3}, + {'obj': obj3, 'frag': 3}, + {'obj': obj1, 'frag': 4}, + {'obj': obj2, 'frag': 4}, + {'obj': obj3, 'frag': 4}, + {'obj': obj1, 'frag': 5}, + {'obj': obj2, 'frag': 5}, + {'obj': obj3, 'frag': 5}, + {'obj': obj1, 'frag': 6}, + {'obj': obj2, 'frag': 6}, + {'obj': obj3, 'frag': 6}, + {'obj': obj1, 'frag': 7}, + {'obj': obj2, 'frag': 7}, + {'obj': obj3, 'frag': 7}, + {'obj': obj1, 'frag': 8}, + {'obj': obj2, 'frag': 8}, + {'obj': obj3, 'frag': 8}, + {'obj': obj4, 'frag': 8}, + ] + + fake_response = self._fake_ec_node_response(node_frags) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 503) + + collected_etags = set() + collected_status = set() + for conn in log: + etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag'] + collected_etags.add(etag) + collected_status.add(conn.resp.status) + + # default node_iter will exhaust at 2 * replicas + self.assertEqual(len(log), 2 * self.replicas()) + self.assertEqual( + {obj1['etag'], obj2['etag'], obj3['etag'], obj4['etag']}, + collected_etags) + self.assertEqual({200}, collected_status) + + def test_GET_with_quorum_durable_files(self): + # verify that only (ec_nparity + 1) nodes need to be durable for a GET + # to be completed with ec_ndata requests. 
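# Editor's sketch, not part of the patch: the arithmetic behind the comment
# above, assuming the 10+4 policy these tests appear to use (frag indexes
# 0-9 data, 10-13 parity).
ec_ndata, ec_nparity = 10, 4
replicas = ec_ndata + ec_nparity        # 14 primary nodes
durable_nodes = ec_nparity + 1          # 5 nodes hold a .durable file
non_durable_nodes = replicas - durable_nodes
# only ec_ndata - 1 primaries lack a .durable, so any ec_ndata responses
# must include at least one durable indication
assert non_durable_nodes == ec_ndata - 1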
+ obj1 = self._make_ec_object_stub() + + node_frags = [ + {'obj': obj1, 'frag': 0, 'durable': True}, # durable + {'obj': obj1, 'frag': 1, 'durable': True}, # durable + {'obj': obj1, 'frag': 2, 'durable': True}, # durable + {'obj': obj1, 'frag': 3, 'durable': True}, # durable + {'obj': obj1, 'frag': 4, 'durable': True}, # durable + {'obj': obj1, 'frag': 5, 'durable': False}, + {'obj': obj1, 'frag': 6, 'durable': False}, + {'obj': obj1, 'frag': 7, 'durable': False}, + {'obj': obj1, 'frag': 8, 'durable': False}, + {'obj': obj1, 'frag': 9, 'durable': False}, + {'obj': obj1, 'frag': 10, 'durable': False}, # parity + {'obj': obj1, 'frag': 11, 'durable': False}, # parity + {'obj': obj1, 'frag': 12, 'durable': False}, # parity + {'obj': obj1, 'frag': 13, 'durable': False}, # parity + ] # handoffs not used in this scenario + + fake_response = self._fake_ec_node_response(list(node_frags)) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.headers['etag'], obj1['etag']) + self.assertEqual(md5(resp.body).hexdigest(), obj1['etag']) + + self.assertEqual(self.policy.ec_ndata, len(log)) + collected_durables = [] + for conn in log: + if (conn.resp.headers.get('X-Backend-Durable-Timestamp') + == conn.resp.headers.get('X-Backend-Data-Timestamp')): + collected_durables.append(conn) + # because nodes are shuffled we can't be sure how many durables are + # returned but it must be at least 1 and cannot exceed 5 + self.assertLessEqual(len(collected_durables), 5) + self.assertGreaterEqual(len(collected_durables), 1) + + def test_GET_with_single_durable_file(self): + # verify that a single durable is sufficient for a GET + # to be completed with ec_ndata requests. 
+ obj1 = self._make_ec_object_stub() + + node_frags = [ + {'obj': obj1, 'frag': 0, 'durable': True}, # durable + {'obj': obj1, 'frag': 1, 'durable': False}, + {'obj': obj1, 'frag': 2, 'durable': False}, + {'obj': obj1, 'frag': 3, 'durable': False}, + {'obj': obj1, 'frag': 4, 'durable': False}, + {'obj': obj1, 'frag': 5, 'durable': False}, + {'obj': obj1, 'frag': 6, 'durable': False}, + {'obj': obj1, 'frag': 7, 'durable': False}, + {'obj': obj1, 'frag': 8, 'durable': False}, + {'obj': obj1, 'frag': 9, 'durable': False}, + {'obj': obj1, 'frag': 10, 'durable': False}, # parity + {'obj': obj1, 'frag': 11, 'durable': False}, # parity + {'obj': obj1, 'frag': 12, 'durable': False}, # parity + {'obj': obj1, 'frag': 13, 'durable': False}, # parity + ] # handoffs not used in this scenario + + fake_response = self._fake_ec_node_response(list(node_frags)) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.headers['etag'], obj1['etag']) + self.assertEqual(md5(resp.body).hexdigest(), obj1['etag']) + + collected_durables = [] + for conn in log: + if (conn.resp.headers.get('X-Backend-Durable-Timestamp') + == conn.resp.headers.get('X-Backend-Data-Timestamp')): + collected_durables.append(conn) + # because nodes are shuffled we can't be sure how many non-durables + # are returned before the durable, but we do expect a single durable + self.assertEqual(1, len(collected_durables)) + + def test_GET_with_no_durable_files(self): + # verify that at least one durable is necessary for a successful GET + obj1 = self._make_ec_object_stub() + node_frags = [ + {'obj': obj1, 'frag': 0, 'durable': False}, + {'obj': obj1, 'frag': 1, 'durable': False}, + {'obj': obj1, 'frag': 2, 'durable': False}, + {'obj': obj1, 'frag': 3, 'durable': False}, + {'obj': obj1, 'frag': 4, 'durable': False}, + {'obj': obj1, 'frag': 5, 'durable': False}, + {'obj': obj1, 'frag': 6, 'durable': False}, + {'obj': obj1, 'frag': 7, 'durable': False}, + {'obj': obj1, 'frag': 8, 'durable': False}, + {'obj': obj1, 'frag': 9, 'durable': False}, + {'obj': obj1, 'frag': 10, 'durable': False}, # parity + {'obj': obj1, 'frag': 11, 'durable': False}, # parity + {'obj': obj1, 'frag': 12, 'durable': False}, # parity + {'obj': obj1, 'frag': 13, 'durable': False}, # parity + ] + [[]] * self.replicas() # handoffs + + fake_response = self._fake_ec_node_response(list(node_frags)) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 404) + # all 28 nodes tried with an optimistic get, none are durable and none + # report having a durable timestamp + self.assertEqual(28, len(log)) + + def test_GET_with_missing_durable_files_and_mixed_etags(self): + obj1 = self._make_ec_object_stub() + obj2 = self._make_ec_object_stub() + + # non-quorate durables for another object won't stop us finding the + # quorate object + node_frags = [ + # ec_ndata - 1 frags of obj2 are available and durable + {'obj': obj2, 'frag': 0, 'durable': True}, + {'obj': obj2, 'frag': 1, 'durable': True}, + {'obj': obj2, 'frag': 2, 'durable': True}, + {'obj': obj2, 'frag': 3, 'durable': True}, + {'obj': obj2, 'frag': 4, 'durable': True}, + {'obj': obj2, 'frag': 5, 'durable': True}, + {'obj': obj2, 'frag': 6, 'durable': True}, + {'obj': obj2, 'frag': 7, 'durable': True}, + {'obj': obj2, 'frag': 8, 'durable': True}, + # ec_ndata frags of obj1 are 
available and one is durable + {'obj': obj1, 'frag': 0, 'durable': False}, + {'obj': obj1, 'frag': 1, 'durable': False}, + {'obj': obj1, 'frag': 2, 'durable': False}, + {'obj': obj1, 'frag': 3, 'durable': False}, + {'obj': obj1, 'frag': 4, 'durable': False}, + {'obj': obj1, 'frag': 5, 'durable': False}, + {'obj': obj1, 'frag': 6, 'durable': False}, + {'obj': obj1, 'frag': 7, 'durable': False}, + {'obj': obj1, 'frag': 8, 'durable': False}, + {'obj': obj1, 'frag': 9, 'durable': True}, + ] + + fake_response = self._fake_ec_node_response(list(node_frags)) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response): + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.headers['etag'], obj1['etag']) + self.assertEqual(md5(resp.body).hexdigest(), obj1['etag']) + + # Quorum of non-durables for a different object won't + # prevent us hunting down the durable object + node_frags = [ + # primaries + {'obj': obj2, 'frag': 0, 'durable': False}, + {'obj': obj2, 'frag': 1, 'durable': False}, + {'obj': obj2, 'frag': 2, 'durable': False}, + {'obj': obj2, 'frag': 3, 'durable': False}, + {'obj': obj2, 'frag': 4, 'durable': False}, + {'obj': obj2, 'frag': 5, 'durable': False}, + {'obj': obj2, 'frag': 6, 'durable': False}, + {'obj': obj2, 'frag': 7, 'durable': False}, + {'obj': obj2, 'frag': 8, 'durable': False}, + {'obj': obj2, 'frag': 9, 'durable': False}, + {'obj': obj2, 'frag': 10, 'durable': False}, + {'obj': obj2, 'frag': 11, 'durable': False}, + {'obj': obj2, 'frag': 12, 'durable': False}, + {'obj': obj2, 'frag': 13, 'durable': False}, + # handoffs + {'obj': obj1, 'frag': 0, 'durable': False}, + {'obj': obj1, 'frag': 1, 'durable': False}, + {'obj': obj1, 'frag': 2, 'durable': False}, + {'obj': obj1, 'frag': 3, 'durable': False}, + {'obj': obj1, 'frag': 4, 'durable': False}, + {'obj': obj1, 'frag': 5, 'durable': False}, + {'obj': obj1, 'frag': 6, 'durable': False}, + {'obj': obj1, 'frag': 7, 'durable': False}, + {'obj': obj1, 'frag': 8, 'durable': False}, + {'obj': obj1, 'frag': 9, 'durable': False}, + {'obj': obj1, 'frag': 10, 'durable': False}, # parity + {'obj': obj1, 'frag': 11, 'durable': False}, # parity + {'obj': obj1, 'frag': 12, 'durable': False}, # parity + {'obj': obj1, 'frag': 13, 'durable': True}, # parity + ] + + fake_response = self._fake_ec_node_response(list(node_frags)) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response): + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.headers['etag'], obj1['etag']) + self.assertEqual(md5(resp.body).hexdigest(), obj1['etag']) + + def test_GET_with_missing_durables_and_older_durables(self): + # scenario: non-durable frags of newer obj1 obscure all durable frags + # of older obj2, so first 14 requests result in a non-durable set. + # At that point (or before) the proxy knows that a durable set of + # frags for obj2 exists so will fetch them, requiring another 10 + # directed requests. 
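# Editor's sketch, not part of the patch: the request-count bounds asserted
# in the test below, worked through for the assumed 10+4 policy.
ec_ndata, replicas = 10, 14
max_requests = replicas + ec_ndata   # 24: every non-durable obj1 frag is
                                     # seen before the 10 directed obj2 GETs
min_requests = 2 * ec_ndata          # 20: only ec_ndata obj1 frags are seen
                                     # before the 10 directed obj2 GETs
assert (min_requests, max_requests) == (20, 24)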
+ obj2 = self._make_ec_object_stub(timestamp=self._ts_iter.next()) + obj1 = self._make_ec_object_stub(timestamp=self._ts_iter.next()) + + node_frags = [ + [{'obj': obj1, 'frag': 0, 'durable': False}], # obj2 missing + [{'obj': obj1, 'frag': 1, 'durable': False}, + {'obj': obj2, 'frag': 1, 'durable': True}], + [{'obj': obj1, 'frag': 2, 'durable': False}], # obj2 missing + [{'obj': obj1, 'frag': 3, 'durable': False}, + {'obj': obj2, 'frag': 3, 'durable': True}], + [{'obj': obj1, 'frag': 4, 'durable': False}, + {'obj': obj2, 'frag': 4, 'durable': True}], + [{'obj': obj1, 'frag': 5, 'durable': False}, + {'obj': obj2, 'frag': 5, 'durable': True}], + [{'obj': obj1, 'frag': 6, 'durable': False}, + {'obj': obj2, 'frag': 6, 'durable': True}], + [{'obj': obj1, 'frag': 7, 'durable': False}, + {'obj': obj2, 'frag': 7, 'durable': True}], + [{'obj': obj1, 'frag': 8, 'durable': False}, + {'obj': obj2, 'frag': 8, 'durable': True}], + [{'obj': obj1, 'frag': 9, 'durable': False}], # obj2 missing + [{'obj': obj1, 'frag': 10, 'durable': False}, + {'obj': obj2, 'frag': 10, 'durable': True}], + [{'obj': obj1, 'frag': 11, 'durable': False}, + {'obj': obj2, 'frag': 11, 'durable': True}], + [{'obj': obj1, 'frag': 12, 'durable': False}], # obj2 missing + [{'obj': obj1, 'frag': 13, 'durable': False}, + {'obj': obj2, 'frag': 13, 'durable': True}], + ] + + fake_response = self._fake_ec_node_response(list(node_frags)) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.headers['etag'], obj2['etag']) + self.assertEqual(md5(resp.body).hexdigest(), obj2['etag']) + # max: proxy will GET all non-durable obj1 frags and then 10 obj frags + self.assertLessEqual(len(log), self.replicas() + self.policy.ec_ndata) + # min: proxy will GET 10 non-durable obj1 frags and then 10 obj frags + self.assertGreaterEqual(len(log), 2 * self.policy.ec_ndata) + + # scenario: obj3 has 14 frags but only 2 are durable and these are + # obscured by two non-durable frags of obj1. There is also a single + # non-durable frag of obj2. The proxy will need to do at least 10 + # GETs to see all the obj3 frags plus 1 more to GET a durable frag. + # The proxy may also do one more GET if the obj2 frag is found. 
+ obj2 = self._make_ec_object_stub(timestamp=self._ts_iter.next()) + obj3 = self._make_ec_object_stub(timestamp=self._ts_iter.next()) + obj1 = self._make_ec_object_stub(timestamp=self._ts_iter.next()) + + node_frags = [ + [{'obj': obj1, 'frag': 0, 'durable': False}, # obj1 frag + {'obj': obj3, 'frag': 0, 'durable': True}], + [{'obj': obj1, 'frag': 1, 'durable': False}, # obj1 frag + {'obj': obj3, 'frag': 1, 'durable': True}], + [{'obj': obj2, 'frag': 2, 'durable': False}, # obj2 frag + {'obj': obj3, 'frag': 2, 'durable': False}], + [{'obj': obj3, 'frag': 3, 'durable': False}], + [{'obj': obj3, 'frag': 4, 'durable': False}], + [{'obj': obj3, 'frag': 5, 'durable': False}], + [{'obj': obj3, 'frag': 6, 'durable': False}], + [{'obj': obj3, 'frag': 7, 'durable': False}], + [{'obj': obj3, 'frag': 8, 'durable': False}], + [{'obj': obj3, 'frag': 9, 'durable': False}], + [{'obj': obj3, 'frag': 10, 'durable': False}], + [{'obj': obj3, 'frag': 11, 'durable': False}], + [{'obj': obj3, 'frag': 12, 'durable': False}], + [{'obj': obj3, 'frag': 13, 'durable': False}], + ] + + fake_response = self._fake_ec_node_response(list(node_frags)) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.headers['etag'], obj3['etag']) + self.assertEqual(md5(resp.body).hexdigest(), obj3['etag']) + self.assertGreaterEqual(len(log), self.policy.ec_ndata + 1) + self.assertLessEqual(len(log), self.policy.ec_ndata + 3) + + def test_GET_with_missing_durables_and_older_non_durables(self): + # scenario: non-durable frags of newer obj1 obscure all frags + # of older obj2, so first 28 requests result in a non-durable set. + # There are only 10 frags for obj2 and one is not durable. 
+ obj2 = self._make_ec_object_stub(timestamp=self._ts_iter.next()) + obj1 = self._make_ec_object_stub(timestamp=self._ts_iter.next()) + + node_frags = [ + [{'obj': obj1, 'frag': 0, 'durable': False}], # obj2 missing + [{'obj': obj1, 'frag': 1, 'durable': False}, + {'obj': obj2, 'frag': 1, 'durable': False}], # obj2 non-durable + [{'obj': obj1, 'frag': 2, 'durable': False}], # obj2 missing + [{'obj': obj1, 'frag': 3, 'durable': False}, + {'obj': obj2, 'frag': 3, 'durable': True}], + [{'obj': obj1, 'frag': 4, 'durable': False}, + {'obj': obj2, 'frag': 4, 'durable': True}], + [{'obj': obj1, 'frag': 5, 'durable': False}, + {'obj': obj2, 'frag': 5, 'durable': True}], + [{'obj': obj1, 'frag': 6, 'durable': False}, + {'obj': obj2, 'frag': 6, 'durable': True}], + [{'obj': obj1, 'frag': 7, 'durable': False}, + {'obj': obj2, 'frag': 7, 'durable': True}], + [{'obj': obj1, 'frag': 8, 'durable': False}, + {'obj': obj2, 'frag': 8, 'durable': True}], + [{'obj': obj1, 'frag': 9, 'durable': False}], # obj2 missing + [{'obj': obj1, 'frag': 10, 'durable': False}, + {'obj': obj2, 'frag': 10, 'durable': True}], + [{'obj': obj1, 'frag': 11, 'durable': False}, + {'obj': obj2, 'frag': 11, 'durable': True}], + [{'obj': obj1, 'frag': 12, 'durable': False}], # obj2 missing + [{'obj': obj1, 'frag': 13, 'durable': False}, + {'obj': obj2, 'frag': 13, 'durable': True}], + [], # 1 empty primary + ] + + fake_response = self._fake_ec_node_response(list(node_frags)) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.headers['etag'], obj2['etag']) + self.assertEqual(md5(resp.body).hexdigest(), obj2['etag']) + # max: proxy will GET all non-durable obj1 frags and then 10 obj2 frags + self.assertLessEqual(len(log), self.replicas() + self.policy.ec_ndata) + # min: proxy will GET 10 non-durable obj1 frags and then 10 obj2 frags + self.assertGreaterEqual(len(log), 2 * self.policy.ec_ndata) + + def test_GET_with_mixed_etags_at_same_timestamp(self): + # this scenario should never occur but if there are somehow + # fragments for different content at the same timestamp then the + # object controller should handle it gracefully + ts = self.ts() # force equal timestamps for two objects + obj1 = self._make_ec_object_stub(timestamp=ts) + obj2 = self._make_ec_object_stub(timestamp=ts) + + node_frags = [ + # 7 frags of obj2 are available and durable + {'obj': obj2, 'frag': 0, 'durable': True}, + {'obj': obj2, 'frag': 1, 'durable': True}, + {'obj': obj2, 'frag': 2, 'durable': True}, + {'obj': obj2, 'frag': 3, 'durable': True}, + {'obj': obj2, 'frag': 4, 'durable': True}, + {'obj': obj2, 'frag': 5, 'durable': True}, + {'obj': obj2, 'frag': 6, 'durable': True}, + # 7 frags of obj1 are available and durable + {'obj': obj1, 'frag': 7, 'durable': True}, + {'obj': obj1, 'frag': 8, 'durable': True}, + {'obj': obj1, 'frag': 9, 'durable': True}, + {'obj': obj1, 'frag': 10, 'durable': True}, + {'obj': obj1, 'frag': 11, 'durable': True}, + {'obj': obj1, 'frag': 12, 'durable': True}, + {'obj': obj1, 'frag': 13, 'durable': True}, + ] + [[]] * self.replicas() # handoffs + + fake_response = self._fake_ec_node_response(list(node_frags)) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response) as log: + resp = req.get_response(self.app) + # read body to provoke any EC decode errors + self.assertFalse(resp.body) + + self.assertEqual(resp.status_int, 404) + self.assertEqual(len(log), 
self.replicas() * 2) + collected_etags = set() + for conn in log: + etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag'] + collected_etags.add(etag) # will be None from handoffs + self.assertEqual({obj1['etag'], obj2['etag'], None}, collected_etags) + log_lines = self.app.logger.get_lines_for_level('error') + self.assertEqual(log_lines, + ['Problem with fragment response: ETag mismatch'] * 7) + def test_GET_mixed_success_with_range(self): fragment_size = self.policy.fragment_size @@ -2568,22 +3292,23 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): frag_archive_size), 'X-Object-Sysmeta-Ec-Content-Length': len(ec_stub['body']), 'X-Object-Sysmeta-Ec-Etag': ec_stub['etag'], + 'X-Timestamp': Timestamp(self._ts_iter.next()).normal, } responses = [ - StubResponse(206, frag_archives[0][:fragment_size], headers), - StubResponse(206, frag_archives[1][:fragment_size], headers), - StubResponse(206, frag_archives[2][:fragment_size], headers), - StubResponse(206, frag_archives[3][:fragment_size], headers), - StubResponse(206, frag_archives[4][:fragment_size], headers), + StubResponse(206, frag_archives[0][:fragment_size], headers, 0), + StubResponse(206, frag_archives[1][:fragment_size], headers, 1), + StubResponse(206, frag_archives[2][:fragment_size], headers, 2), + StubResponse(206, frag_archives[3][:fragment_size], headers, 3), + StubResponse(206, frag_archives[4][:fragment_size], headers, 4), # data nodes with old frag - StubResponse(416), - StubResponse(416), - StubResponse(206, frag_archives[7][:fragment_size], headers), - StubResponse(206, frag_archives[8][:fragment_size], headers), - StubResponse(206, frag_archives[9][:fragment_size], headers), + StubResponse(416, frag_index=5), + StubResponse(416, frag_index=6), + StubResponse(206, frag_archives[7][:fragment_size], headers, 7), + StubResponse(206, frag_archives[8][:fragment_size], headers, 8), + StubResponse(206, frag_archives[9][:fragment_size], headers, 9), # hopefully we ask for two more - StubResponse(206, frag_archives[10][:fragment_size], headers), - StubResponse(206, frag_archives[11][:fragment_size], headers), + StubResponse(206, frag_archives[10][:fragment_size], headers, 10), + StubResponse(206, frag_archives[11][:fragment_size], headers, 11), ] def get_response(req): @@ -2597,22 +3322,50 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): self.assertEqual(resp.body, 'test') self.assertEqual(len(log), self.policy.ec_ndata + 2) + # verify that even when last responses to be collected are 416's + # the shortfall of 2xx responses still triggers extra spawned requests + responses = [ + StubResponse(206, frag_archives[0][:fragment_size], headers, 0), + StubResponse(206, frag_archives[1][:fragment_size], headers, 1), + StubResponse(206, frag_archives[2][:fragment_size], headers, 2), + StubResponse(206, frag_archives[3][:fragment_size], headers, 3), + StubResponse(206, frag_archives[4][:fragment_size], headers, 4), + StubResponse(206, frag_archives[7][:fragment_size], headers, 7), + StubResponse(206, frag_archives[8][:fragment_size], headers, 8), + StubResponse(206, frag_archives[9][:fragment_size], headers, 9), + StubResponse(206, frag_archives[10][:fragment_size], headers, 10), + # data nodes with old frag + StubResponse(416, frag_index=5), + # hopefully we ask for one more + StubResponse(416, frag_index=6), + # and hopefully we ask for another + StubResponse(206, frag_archives[11][:fragment_size], headers, 11), + ] + + req = swob.Request.blank('/v1/a/c/o', headers={'Range': 
'bytes=0-3'}) + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 206) + self.assertEqual(resp.body, 'test') + self.assertEqual(len(log), self.policy.ec_ndata + 2) + def test_GET_with_range_unsatisfiable_mixed_success(self): responses = [ - StubResponse(416), - StubResponse(416), - StubResponse(416), - StubResponse(416), - StubResponse(416), - StubResponse(416), - StubResponse(416), + StubResponse(416, frag_index=0), + StubResponse(416, frag_index=1), + StubResponse(416, frag_index=2), + StubResponse(416, frag_index=3), + StubResponse(416, frag_index=4), + StubResponse(416, frag_index=5), + StubResponse(416, frag_index=6), # sneak in bogus extra responses StubResponse(404), - StubResponse(206), + StubResponse(206, frag_index=8), # and then just "enough" more 416's - StubResponse(416), - StubResponse(416), - StubResponse(416), + StubResponse(416, frag_index=9), + StubResponse(416, frag_index=10), + StubResponse(416, frag_index=11), ] def get_response(req): @@ -2627,9 +3380,83 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): # ec_ndata responses that must agree, plus the bogus extras self.assertEqual(len(log), self.policy.ec_ndata + 2) + def test_GET_with_missing_and_range_unsatisifiable(self): + responses = [ # not quite ec_ndata frags on primaries + StubResponse(416, frag_index=0), + StubResponse(416, frag_index=1), + StubResponse(416, frag_index=2), + StubResponse(416, frag_index=3), + StubResponse(416, frag_index=4), + StubResponse(416, frag_index=5), + StubResponse(416, frag_index=6), + StubResponse(416, frag_index=7), + StubResponse(416, frag_index=8), + ] + + def get_response(req): + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o', headers={ + 'Range': 'bytes=%s-' % 100000000000000}) + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + + # TODO: does 416 make sense without a quorum, or should this be a 404? 
+ # a non-range GET of same object would return 404 + self.assertEqual(resp.status_int, 416) + self.assertEqual(len(log), 2 * self.replicas()) + + def test_GET_with_success_and_507_will_503(self): + responses = [ # only 9 good nodes + StubResponse(200), + StubResponse(200), + StubResponse(200), + StubResponse(200), + StubResponse(200), + StubResponse(200), + StubResponse(200), + StubResponse(200), + StubResponse(200), + ] + + def get_response(req): + # bad disk on all other nodes + return responses.pop(0) if responses else StubResponse(507) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 503) + self.assertEqual(len(log), 2 * self.replicas()) + + def test_GET_with_success_and_404_will_404(self): + responses = [ # only 9 good nodes + StubResponse(200), + StubResponse(200), + StubResponse(200), + StubResponse(200), + StubResponse(200), + StubResponse(200), + StubResponse(200), + StubResponse(200), + StubResponse(200), + ] + + def get_response(req): + # no frags on other nodes + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 404) + self.assertEqual(len(log), 2 * self.replicas()) + def test_GET_mixed_ranged_responses_success(self): segment_size = self.policy.ec_segment_size - fragment_size = self.policy.fragment_size + frag_size = self.policy.fragment_size new_data = ('test' * segment_size)[:-492] new_etag = md5(new_data).hexdigest() new_archives = self._make_ec_archive_bodies(new_data) @@ -2638,45 +3465,50 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): old_archives = self._make_ec_archive_bodies(old_data) frag_archive_size = len(new_archives[0]) - new_headers = { - 'Content-Type': 'text/plain', - 'Content-Length': fragment_size, - 'Content-Range': 'bytes 0-%s/%s' % (fragment_size - 1, - frag_archive_size), - 'X-Object-Sysmeta-Ec-Content-Length': len(new_data), - 'X-Object-Sysmeta-Ec-Etag': new_etag, - } + # here we deliberately omit X-Backend-Data-Timestamp to check that + # proxy will tolerate responses from object server that have not been + # upgraded to send that header old_headers = { 'Content-Type': 'text/plain', - 'Content-Length': fragment_size, - 'Content-Range': 'bytes 0-%s/%s' % (fragment_size - 1, + 'Content-Length': frag_size, + 'Content-Range': 'bytes 0-%s/%s' % (frag_size - 1, frag_archive_size), 'X-Object-Sysmeta-Ec-Content-Length': len(old_data), 'X-Object-Sysmeta-Ec-Etag': old_etag, + 'X-Backend-Timestamp': Timestamp(self._ts_iter.next()).internal + } + new_headers = { + 'Content-Type': 'text/plain', + 'Content-Length': frag_size, + 'Content-Range': 'bytes 0-%s/%s' % (frag_size - 1, + frag_archive_size), + 'X-Object-Sysmeta-Ec-Content-Length': len(new_data), + 'X-Object-Sysmeta-Ec-Etag': new_etag, + 'X-Backend-Timestamp': Timestamp(self._ts_iter.next()).internal } # 7 primaries with stale frags, 3 handoffs failed to get new frags responses = [ - StubResponse(206, old_archives[0][:fragment_size], old_headers), - StubResponse(206, new_archives[1][:fragment_size], new_headers), - StubResponse(206, old_archives[2][:fragment_size], old_headers), - StubResponse(206, new_archives[3][:fragment_size], new_headers), - StubResponse(206, old_archives[4][:fragment_size], old_headers), - StubResponse(206, new_archives[5][:fragment_size], new_headers), - 
StubResponse(206, old_archives[6][:fragment_size], old_headers), - StubResponse(206, new_archives[7][:fragment_size], new_headers), - StubResponse(206, old_archives[8][:fragment_size], old_headers), - StubResponse(206, new_archives[9][:fragment_size], new_headers), - StubResponse(206, old_archives[10][:fragment_size], old_headers), - StubResponse(206, new_archives[11][:fragment_size], new_headers), - StubResponse(206, old_archives[12][:fragment_size], old_headers), - StubResponse(206, new_archives[13][:fragment_size], new_headers), - StubResponse(206, new_archives[0][:fragment_size], new_headers), + StubResponse(206, old_archives[0][:frag_size], old_headers, 0), + StubResponse(206, new_archives[1][:frag_size], new_headers, 1), + StubResponse(206, old_archives[2][:frag_size], old_headers, 2), + StubResponse(206, new_archives[3][:frag_size], new_headers, 3), + StubResponse(206, old_archives[4][:frag_size], old_headers, 4), + StubResponse(206, new_archives[5][:frag_size], new_headers, 5), + StubResponse(206, old_archives[6][:frag_size], old_headers, 6), + StubResponse(206, new_archives[7][:frag_size], new_headers, 7), + StubResponse(206, old_archives[8][:frag_size], old_headers, 8), + StubResponse(206, new_archives[9][:frag_size], new_headers, 9), + StubResponse(206, old_archives[10][:frag_size], old_headers, 10), + StubResponse(206, new_archives[11][:frag_size], new_headers, 11), + StubResponse(206, old_archives[12][:frag_size], old_headers, 12), + StubResponse(206, new_archives[13][:frag_size], new_headers, 13), + StubResponse(206, new_archives[0][:frag_size], new_headers, 0), StubResponse(404), StubResponse(404), - StubResponse(206, new_archives[6][:fragment_size], new_headers), + StubResponse(206, new_archives[6][:frag_size], new_headers, 6), StubResponse(404), - StubResponse(206, new_archives[10][:fragment_size], new_headers), - StubResponse(206, new_archives[12][:fragment_size], new_headers), + StubResponse(206, new_archives[10][:frag_size], new_headers, 10), + StubResponse(206, new_archives[12][:frag_size], new_headers, 12), ] def get_response(req): @@ -2708,10 +3540,10 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): headers2 = {'X-Object-Sysmeta-Ec-Etag': etag1, 'X-Object-Sysmeta-Ec-Content-Length': '333'} - responses1 = [(200, body, headers1) - for body in ec_archive_bodies1] - responses2 = [(200, body, headers2) - for body in ec_archive_bodies2] + responses1 = [(200, body, self._add_frag_index(fi, headers1)) + for fi, body in enumerate(ec_archive_bodies1)] + responses2 = [(200, body, self._add_frag_index(fi, headers2)) + for fi, body in enumerate(ec_archive_bodies2)] req = swob.Request.blank('/v1/a/c/o') @@ -2764,8 +3596,9 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): ec_archive_bodies = self._make_ec_archive_bodies(test_data) headers = {'X-Object-Sysmeta-Ec-Etag': etag} self.app.recoverable_node_timeout = 0.01 - responses = [(200, SlowBody(body, 0.1), headers) - for body in ec_archive_bodies] + responses = [(200, SlowBody(body, 0.1), + self._add_frag_index(i, headers)) + for i, body in enumerate(ec_archive_bodies)] req = swob.Request.blank('/v1/a/c/o') @@ -2795,10 +3628,11 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): headers = {'X-Object-Sysmeta-Ec-Etag': etag} self.app.recoverable_node_timeout = 0.05 # first one is slow - responses = [(200, SlowBody(ec_archive_bodies[0], 0.1), headers)] + responses = [(200, SlowBody(ec_archive_bodies[0], 0.1), + self._add_frag_index(0, headers))] # ... 
the rest are fine - responses += [(200, body, headers) - for body in ec_archive_bodies[1:]] + responses += [(200, body, self._add_frag_index(i, headers)) + for i, body in enumerate(ec_archive_bodies[1:], start=1)] req = swob.Request.blank('/v1/a/c/o') @@ -2936,10 +3770,12 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): range_not_satisfiable_body = \ '
<html><h1>%s</h1><p>%s</p></html>
' % (title, exp) if start >= segment_size: - responses = [(416, range_not_satisfiable_body, headers) + responses = [(416, range_not_satisfiable_body, + self._add_frag_index(i, headers)) for i in range(POLICIES.default.ec_ndata)] else: - responses = [(200, ''.join(node_fragments[i]), headers) + responses = [(200, ''.join(node_fragments[i]), + self._add_frag_index(i, headers)) for i in range(POLICIES.default.ec_ndata)] status_codes, body_iter, headers = zip(*responses) expect_headers = { diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index bcd5895289..9e1a4b0213 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -24,11 +24,12 @@ import sys import traceback import unittest from contextlib import contextmanager -from shutil import rmtree, copyfile +from shutil import rmtree, copyfile, move import gc import time from textwrap import dedent from hashlib import md5 +import collections from pyeclib.ec_iface import ECDriverError from tempfile import mkdtemp, NamedTemporaryFile import weakref @@ -55,7 +56,7 @@ from swift.common.utils import hash_path, storage_directory, \ from test.unit import ( connect_tcp, readuntil2crlfs, FakeLogger, fake_http_connect, FakeRing, FakeMemcache, debug_logger, patch_policies, write_fake_ring, - mocked_http_conn, DEFAULT_TEST_EC_TYPE) + mocked_http_conn, DEFAULT_TEST_EC_TYPE, make_timestamp_iter) from swift.proxy import server as proxy_server from swift.proxy.controllers.obj import ReplicatedObjectController from swift.obj import server as object_server @@ -5595,6 +5596,255 @@ class TestECMismatchedFA(unittest.TestCase): self.assertEqual(resp.status_int, 503) +class TestECGets(unittest.TestCase): + def tearDown(self): + prosrv = _test_servers[0] + # don't leak error limits and poison other tests + prosrv._error_limiting = {} + + def _setup_nodes_and_do_GET(self, objs, node_state): + """ + A helper method that creates object fragments, stashes them in temp + dirs, and then moves selected fragments back into the hash_dirs on each + node according to a specified desired node state description. + + :param objs: a dict that maps object references to dicts that describe + the object timestamp and content. Object frags will be + created for each item in this dict. + :param node_state: a dict that maps a node index to the desired state + for that node. Each desired state is a list of + dicts, with each dict describing object reference, + frag_index and file extensions to be moved to the + node's hash_dir. 
+ """ + (prosrv, acc1srv, acc2srv, con1srv, con2srv, obj1srv, + obj2srv, obj3srv) = _test_servers + ec_policy = POLICIES[3] + container_name = uuid.uuid4().hex + obj_name = uuid.uuid4().hex + obj_path = os.path.join(os.sep, 'v1', 'a', container_name, obj_name) + + # PUT container, make sure it worked + container_path = os.path.join(os.sep, 'v1', 'a', container_name) + ec_container = Request.blank( + container_path, environ={"REQUEST_METHOD": "PUT"}, + headers={"X-Storage-Policy": "ec", "X-Auth-Token": "t"}) + resp = ec_container.get_response(prosrv) + self.assertIn(resp.status_int, (201, 202)) + + partition, nodes = \ + ec_policy.object_ring.get_nodes('a', container_name, obj_name) + + # map nodes to hash dirs + node_hash_dirs = {} + node_tmp_dirs = collections.defaultdict(dict) + for node in nodes: + node_hash_dirs[node['index']] = os.path.join( + _testdir, node['device'], storage_directory( + diskfile.get_data_dir(ec_policy), + partition, hash_path('a', container_name, obj_name))) + + def _put_object(ref, timestamp, body): + # PUT an object and then move its disk files to a temp dir + headers = {"X-Timestamp": timestamp.internal} + put_req1 = Request.blank(obj_path, method='PUT', headers=headers) + put_req1.body = body + resp = put_req1.get_response(prosrv) + self.assertEqual(resp.status_int, 201) + + # GET the obj, should work fine + get_req = Request.blank(obj_path, method="GET") + resp = get_req.get_response(prosrv) + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.body, body) + + # move all hash dir files to per-node, per-obj tempdir + for node_index, hash_dir in node_hash_dirs.items(): + node_tmp_dirs[node_index][ref] = mkdtemp() + for f in os.listdir(hash_dir): + move(os.path.join(hash_dir, f), + os.path.join(node_tmp_dirs[node_index][ref], f)) + + for obj_ref, obj_info in objs.items(): + _put_object(obj_ref, **obj_info) + + # sanity check - all hash_dirs are empty and GET returns a 404 + for hash_dir in node_hash_dirs.values(): + self.assertFalse(os.listdir(hash_dir)) + get_req = Request.blank(obj_path, method="GET") + resp = get_req.get_response(prosrv) + self.assertEqual(resp.status_int, 404) + + # node state is in form: + # {node_index: [{ref: object reference, + # frag_index: index, + # exts: ['.data' etc]}, ...], + # node_index: ...} + for node_index, state in node_state.items(): + dest = node_hash_dirs[node_index] + for frag_info in state: + src = node_tmp_dirs[frag_info['frag_index']][frag_info['ref']] + src_files = [f for f in os.listdir(src) + if f.endswith(frag_info['exts'])] + self.assertEqual(len(frag_info['exts']), len(src_files), + 'Bad test setup for node %s, obj %s' + % (node_index, frag_info['ref'])) + for f in src_files: + move(os.path.join(src, f), os.path.join(dest, f)) + + # do an object GET + get_req = Request.blank(obj_path, method='GET') + return get_req.get_response(prosrv) + + def test_GET_with_missing_durables(self): + # verify object GET behavior when durable files are missing + ts_iter = make_timestamp_iter() + objs = {'obj1': dict(timestamp=next(ts_iter), body='body')} + + # durable missing from 2/3 nodes + node_state = { + 0: [dict(ref='obj1', frag_index=0, exts=('.data', '.durable'))], + 1: [dict(ref='obj1', frag_index=1, exts=('.data',))], + 2: [dict(ref='obj1', frag_index=2, exts=('.data',))] + } + + resp = self._setup_nodes_and_do_GET(objs, node_state) + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.body, objs['obj1']['body']) + + # all files missing on 1 node, durable missing from 1/2 other nodes + # durable 
missing from 2/3 nodes + node_state = { + 0: [dict(ref='obj1', frag_index=0, exts=('.data', '.durable'))], + 1: [], + 2: [dict(ref='obj1', frag_index=2, exts=('.data',))] + } + + resp = self._setup_nodes_and_do_GET(objs, node_state) + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.body, objs['obj1']['body']) + + # durable missing from all 3 nodes + node_state = { + 0: [dict(ref='obj1', frag_index=0, exts=('.data',))], + 1: [dict(ref='obj1', frag_index=1, exts=('.data',))], + 2: [dict(ref='obj1', frag_index=2, exts=('.data',))] + } + + resp = self._setup_nodes_and_do_GET(objs, node_state) + self.assertEqual(resp.status_int, 503) + + def test_GET_with_multiple_frags_per_node(self): + # verify object GET behavior when multiple fragments are on same node + ts_iter = make_timestamp_iter() + objs = {'obj1': dict(timestamp=next(ts_iter), body='body')} + + # scenario: only two frags, both on same node + node_state = { + 0: [], + 1: [dict(ref='obj1', frag_index=0, exts=('.data', '.durable')), + dict(ref='obj1', frag_index=1, exts=('.data',))], + 2: [] + } + + resp = self._setup_nodes_and_do_GET(objs, node_state) + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.body, objs['obj1']['body']) + + # scenario: all 3 frags on same node + node_state = { + 0: [], + 1: [dict(ref='obj1', frag_index=0, exts=('.data', '.durable')), + dict(ref='obj1', frag_index=1, exts=('.data',)), + dict(ref='obj1', frag_index=2, exts=('.data',))], + 2: [] + } + + resp = self._setup_nodes_and_do_GET(objs, node_state) + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.body, objs['obj1']['body']) + + def test_GET_with_multiple_timestamps_on_nodes(self): + ts_iter = make_timestamp_iter() + + ts_1, ts_2, ts_3 = [next(ts_iter) for _ in range(3)] + objs = {'obj1': dict(timestamp=ts_1, body='body1'), + 'obj2': dict(timestamp=ts_2, body='body2'), + 'obj3': dict(timestamp=ts_3, body='body3')} + + # newer non-durable frags do not prevent proxy getting the durable obj1 + node_state = { + 0: [dict(ref='obj3', frag_index=0, exts=('.data',)), + dict(ref='obj2', frag_index=0, exts=('.data',)), + dict(ref='obj1', frag_index=0, exts=('.data', '.durable'))], + 1: [dict(ref='obj3', frag_index=1, exts=('.data',)), + dict(ref='obj2', frag_index=1, exts=('.data',)), + dict(ref='obj1', frag_index=1, exts=('.data', '.durable'))], + 2: [dict(ref='obj3', frag_index=2, exts=('.data',)), + dict(ref='obj2', frag_index=2, exts=('.data',)), + dict(ref='obj1', frag_index=2, exts=('.data', '.durable'))], + } + + resp = self._setup_nodes_and_do_GET(objs, node_state) + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.body, objs['obj1']['body']) + + # .durables at two timestamps: in this scenario proxy is guaranteed + # to see the durable at ts_2 with one of the first 2 responses, so will + # then prefer that when requesting from third obj server + node_state = { + 0: [dict(ref='obj3', frag_index=0, exts=('.data',)), + dict(ref='obj2', frag_index=0, exts=('.data',)), + dict(ref='obj1', frag_index=0, exts=('.data', '.durable'))], + 1: [dict(ref='obj3', frag_index=1, exts=('.data',)), + dict(ref='obj2', frag_index=1, exts=('.data', '.durable'))], + 2: [dict(ref='obj3', frag_index=2, exts=('.data',)), + dict(ref='obj2', frag_index=2, exts=('.data', '.durable'))], + } + + resp = self._setup_nodes_and_do_GET(objs, node_state) + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.body, objs['obj2']['body']) + + def test_GET_with_same_frag_index_on_multiple_nodes(self): + ts_iter = 
make_timestamp_iter() + + # this is a trick to be able to get identical frags placed onto + # multiple nodes: since we cannot *copy* frags, we generate three sets + # of identical frags at same timestamp so we have enough to *move* + ts_1 = next(ts_iter) + objs = {'obj1a': dict(timestamp=ts_1, body='body'), + 'obj1b': dict(timestamp=ts_1, body='body'), + 'obj1c': dict(timestamp=ts_1, body='body')} + + # arrange for duplicate frag indexes across nodes: because the object + # server prefers the highest available frag index, proxy will first get + # back two responses with frag index 1, and will then return to node 0 + # for frag_index 0. + node_state = { + 0: [dict(ref='obj1a', frag_index=0, exts=('.data',)), + dict(ref='obj1a', frag_index=1, exts=('.data',))], + 1: [dict(ref='obj1b', frag_index=1, exts=('.data', '.durable'))], + 2: [dict(ref='obj1c', frag_index=1, exts=('.data', '.durable'))] + } + + resp = self._setup_nodes_and_do_GET(objs, node_state) + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.body, objs['obj1a']['body']) + + # if all we have across nodes are frags with same index then expect a + # 404 (the third, 'extra', obj server GET will return 404 because it + # will be sent frag prefs that exclude frag_index 1) + node_state = { + 0: [dict(ref='obj1a', frag_index=1, exts=('.data',))], + 1: [dict(ref='obj1b', frag_index=1, exts=('.data', '.durable'))], + 2: [dict(ref='obj1c', frag_index=1, exts=('.data',))] + } + + resp = self._setup_nodes_and_do_GET(objs, node_state) + self.assertEqual(resp.status_int, 404) + + class TestObjectDisconnectCleanup(unittest.TestCase): # update this if you need to make more different devices in do_setup