diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index 545554ddd3..6a69058f86 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -390,6 +390,39 @@ use = egg:swift#recon # the environment (default). For more information, see # https://bugs.launchpad.net/liberasurecode/+bug/1886088 # write_legacy_ec_crc = +# +# When attempting to reconstruct a missing fragment on another node from a +# fragment on the local node, the reconstructor may fail to fetch sufficient +# fragments to reconstruct the missing fragment. This may be because most or +# all of the remote fragments have been deleted, and the local fragment is +# stale, in which case the reconstructor will never succeed in reconstructing +# the apparently missing fragment and will log errors. If the object's +# tombstones have been reclaimed then the stale fragment will never be deleted +# (see https://bugs.launchpad.net/swift/+bug/1655608). If an operator suspects +# that stale fragments have been re-introduced to the cluster and is seeing +# error logs similar to those in the bug report, then the quarantine_threshold +# option may be set to a value greater than zero. This enables the +# reconstructor to quarantine the stale fragments when it fails to fetch more +# than the quarantine_threshold number of fragments (including the stale +# fragment) during an attempt to reconstruct. For example, setting the +# quarantine_threshold to 1 would cause a fragment to be quarantined if no +# other fragments can be fetched. The value may be reset to zero after the +# reconstructor has run on all affected nodes and the error logs are no longer +# seen. +# Note: the quarantine_threshold applies equally to all policies, but for each +# policy it is effectively capped at (ec_ndata - 1) so that a fragment is never +# quarantined when sufficient fragments exist to reconstruct the object. +# Fragments are not quarantined until they are older than the reclaim_age. +# quarantine_threshold = 0 +# +# Sets the maximum number of nodes to which requests will be made before +# quarantining a fragment. You can use '* replicas' at the end to have it use +# the number given times the number of replicas for the ring being used for the +# requests. The minimum number of nodes to which requests are made is the +# number of replicas for the policy minus 1 (the node on which the fragment is +# to be rebuilt). The minimum is only exceeded if request_node_count is +# greater, and only for the purposes of quarantining. +# request_node_count = 2 * replicas [object-updater] # You can override the default log routing for this app here (don't use set!): diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index d0bc1caae7..dea16eb95f 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -220,8 +220,8 @@ use = egg:swift#proxy # to use up to replica count threads when waiting on a response. In # conjunction with the concurrency_timeout option this will allow swift to send # out GET/HEAD requests to the storage nodes concurrently and answer as soon as -# the minimum number of backend responses are availabe - in replicated contexts -# this will be the first backend replica to respond. +# the minimum number of backend responses are available - in replicated +# contexts this will be the first backend replica to respond. 
# concurrent_gets = off # # This parameter controls how long to wait before firing off the next diff --git a/swift/common/utils.py b/swift/common/utils.py index ad157e58ab..404d384c9d 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -515,6 +515,23 @@ def config_percent_value(value): raise ValueError("%s: %s" % (str(err), value)) +def config_request_node_count_value(value): + try: + value_parts = value.lower().split() + rnc_value = int(value_parts[0]) + except (ValueError, AttributeError): + pass + else: + if len(value_parts) == 1: + return lambda replicas: rnc_value + elif (len(value_parts) == 3 and + value_parts[1] == '*' and + value_parts[2] == 'replicas'): + return lambda replicas: rnc_value * replicas + raise ValueError( + 'Invalid request_node_count value: %r' % value) + + def append_underscore(prefix): if prefix and not prefix.endswith('_'): prefix += '_' diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index de29903245..5d73ba00dd 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -12,7 +12,7 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. - +import itertools import json import errno import os @@ -33,7 +33,8 @@ from swift.common.utils import ( dump_recon_cache, mkdirs, config_true_value, GreenAsyncPile, Timestamp, remove_file, load_recon_cache, parse_override_options, distribute_evenly, - PrefixLoggerAdapter, remove_directory) + PrefixLoggerAdapter, remove_directory, config_request_node_count_value, + non_negative_int) from swift.common.header_key_dict import HeaderKeyDict from swift.common.bufferedhttp import http_connect from swift.common.daemon import Daemon @@ -102,8 +103,8 @@ class ResponseBucket(object): def __init__(self): # count of all responses associated with this Bucket self.num_responses = 0 - # map {frag_index: response} for subset of responses that - # could be used to rebuild the missing fragment + # map {frag_index: response} for subset of responses that could be used + # to rebuild the missing fragment self.useful_responses = {} # set if a durable timestamp was seen in responses self.durable = False @@ -232,6 +233,10 @@ class ObjectReconstructor(Daemon): 'of handoffs_only.') self.rebuild_handoff_node_count = int(conf.get( 'rebuild_handoff_node_count', 2)) + self.quarantine_threshold = non_negative_int( + conf.get('quarantine_threshold', 0)) + self.request_node_count = config_request_node_count_value( + conf.get('request_node_count', '2 * replicas')) # When upgrading from liberasurecode<=1.5.0, you may want to continue # writing legacy CRCs until all nodes are upgraded and capabale of @@ -368,23 +373,24 @@ class ObjectReconstructor(Daemon): return False return True - def _get_response(self, node, part, path, headers, full_path): + def _get_response(self, node, policy, partition, path, headers): """ Helper method for reconstruction that GETs a single EC fragment archive :param node: the node to GET from - :param part: the partition + :param policy: the job policy + :param partition: the partition :param path: path of the desired EC archive relative to partition dir :param headers: the headers to send - :param full_path: full path to desired EC archive :returns: response """ + full_path = _full_path(node, partition, path, policy) resp = None try: with ConnectionTimeout(self.conn_timeout): conn = http_connect(node['ip'], node['port'], node['device'], - part, 'GET', path, headers=headers) + partition, 'GET', path, headers=headers) with 
Timeout(self.node_timeout): resp = conn.getresponse() resp.full_path = full_path @@ -462,7 +468,11 @@ class ObjectReconstructor(Daemon): fi_to_rebuild) return None - if fi_to_rebuild == resp_frag_index: + durable_timestamp = resp.headers.get('X-Backend-Durable-Timestamp') + if durable_timestamp: + buckets[Timestamp(durable_timestamp)].durable = True + + if resp_frag_index == fi_to_rebuild: # TODO: With duplicated EC frags it's not unreasonable to find the # very fragment we're trying to rebuild exists on another primary # node. In this case we should stream it directly from the remote @@ -471,19 +481,43 @@ class ObjectReconstructor(Daemon): 'Found existing frag #%s at %s while rebuilding to %s', fi_to_rebuild, resp.full_path, _full_path(node, partition, path, policy)) - return None - - durable_timestamp = resp.headers.get('X-Backend-Durable-Timestamp') - if durable_timestamp: - buckets[Timestamp(durable_timestamp)].durable = True - - if resp_frag_index not in bucket.useful_responses: + elif resp_frag_index not in bucket.useful_responses: bucket.useful_responses[resp_frag_index] = resp - return bucket - return None + # else: duplicate frag_index isn't useful for rebuilding - def _make_fragment_requests(self, job, node, datafile_metadata, buckets, - error_responses, nodes): + return bucket + + def _is_quarantine_candidate(self, policy, buckets, error_responses, df): + # This condition is deliberately strict because it determines if + # more requests will be issued and ultimately if the fragment + # will be quarantined. + if list(error_responses.keys()) != [404]: + # only quarantine if all other responses are 404 so we are + # confident there are no other frags on queried nodes + return False + + local_timestamp = Timestamp(df.get_datafile_metadata()['X-Timestamp']) + if list(buckets.keys()) != [local_timestamp]: + # don't quarantine if there's insufficient other timestamp + # frags, or no response for the local frag timestamp: we + # possibly could quarantine, but this unexpected case may be + # worth more investigation + return False + + if time.time() - float(local_timestamp) <= df.manager.reclaim_age: + # If the fragment has not yet passed reclaim age then it is + # likely that a tombstone will be reverted to this node, or + # neighbor frags will get reverted from handoffs to *other* nodes + # and we'll discover we *do* have enough to reconstruct. Don't + # quarantine it yet: better that it is cleaned up 'normally'. + return False + + bucket = buckets[local_timestamp] + return (bucket.num_responses <= self.quarantine_threshold and + bucket.num_responses < policy.ec_ndata and + df._frag_index in bucket.useful_responses) + + def _make_fragment_requests(self, job, node, df, buckets, error_responses): """ Issue requests for fragments to the list of ``nodes`` and sort the responses into per-timestamp ``buckets`` or per-status @@ -492,16 +526,15 @@ class ObjectReconstructor(Daemon): :param job: job from ssync_sender. :param node: node to which we're rebuilding. - :param datafile_metadata: the datafile metadata to attach to - the rebuilt fragment archive + :param df: an instance of :class:`~swift.obj.diskfile.BaseDiskFile`. :param buckets: dict of per-timestamp buckets for ok responses. :param error_responses: dict of per-status lists of error responses. - :param nodes: A list of nodes. :return: A per-timestamp with sufficient responses, or None if there is no such bucket. 
""" policy = job['policy'] partition = job['partition'] + datafile_metadata = df.get_datafile_metadata() # the fragment index we need to reconstruct is the position index # of the node we're rebuilding to within the primary part list @@ -521,24 +554,80 @@ class ObjectReconstructor(Daemon): headers['X-Backend-Fragment-Preferences'] = json.dumps(frag_prefs) path = datafile_metadata['name'] - pile = GreenAsyncPile(len(nodes)) - for _node in nodes: - full_get_path = _full_path(_node, partition, path, policy) - pile.spawn(self._get_response, _node, partition, - path, headers, full_get_path) + ring = policy.object_ring + primary_nodes = ring.get_part_nodes(partition) + # primary_node_count is the maximum number of nodes to consume in a + # normal rebuild attempt when there is no quarantine candidate, + # including the node to which we are rebuilding + primary_node_count = len(primary_nodes) + # don't try and fetch a fragment from the node we're rebuilding to + filtered_primary_nodes = [n for n in primary_nodes + if n['id'] != node['id']] + # concurrency is the number of requests fired off in initial batch + concurrency = len(filtered_primary_nodes) + # max_node_count is the maximum number of nodes to consume when + # verifying a quarantine candidate and is at least primary_node_count + max_node_count = max(primary_node_count, + self.request_node_count(primary_node_count)) + pile = GreenAsyncPile(concurrency) + for primary_node in filtered_primary_nodes: + pile.spawn(self._get_response, primary_node, policy, partition, + path, headers) + + useful_bucket = None for resp in pile: bucket = self._handle_fragment_response( node, policy, partition, fi_to_rebuild, path, buckets, error_responses, resp) if bucket and len(bucket.useful_responses) >= policy.ec_ndata: - frag_indexes = list(bucket.useful_responses.keys()) - self.logger.debug('Reconstruct frag #%s with frag indexes %s' - % (fi_to_rebuild, frag_indexes)) - return bucket - return None + useful_bucket = bucket + break - def reconstruct_fa(self, job, node, datafile_metadata): + # Once all rebuild nodes have responded, if we have a quarantine + # candidate, go beyond primary_node_count and on to handoffs. The + # first non-404 response will prevent quarantine, but the expected + # common case is all 404 responses so we use some concurrency to get an + # outcome faster at the risk of some unnecessary requests in the + # uncommon case. 
+ if (not useful_bucket and + self._is_quarantine_candidate( + policy, buckets, error_responses, df)): + node_count = primary_node_count + handoff_iter = itertools.islice(ring.get_more_nodes(partition), + max_node_count - node_count) + for handoff_node in itertools.islice(handoff_iter, concurrency): + node_count += 1 + pile.spawn(self._get_response, handoff_node, policy, partition, + path, headers) + for resp in pile: + bucket = self._handle_fragment_response( + node, policy, partition, fi_to_rebuild, path, buckets, + error_responses, resp) + if bucket and len(bucket.useful_responses) >= policy.ec_ndata: + useful_bucket = bucket + self.logger.debug( + 'Reconstructing frag from handoffs, node_count=%d' + % node_count) + break + elif self._is_quarantine_candidate( + policy, buckets, error_responses, df): + try: + handoff_node = next(handoff_iter) + node_count += 1 + pile.spawn(self._get_response, handoff_node, policy, + partition, path, headers) + except StopIteration: + pass + # else: this frag is no longer a quarantine candidate, so we + # could break right here and ignore any remaining responses, + # but given that we may have actually found another frag we'll + # optimistically wait for any remaining responses in case a + # useful bucket is assembled. + + return useful_bucket + + def reconstruct_fa(self, job, node, df): """ Reconstructs a fragment archive - this method is called from ssync after a remote node responds that is missing this object - the local @@ -547,8 +636,7 @@ class ObjectReconstructor(Daemon): :param job: job from ssync_sender. :param node: node to which we're rebuilding. - :param datafile_metadata: the datafile metadata to attach to - the rebuilt fragment archive + :param df: an instance of :class:`~swift.obj.diskfile.BaseDiskFile`. :returns: a DiskFile like class for use by ssync. :raises DiskFileQuarantined: if the fragment archive cannot be reconstructed and has as a result been quarantined. 
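A minimal illustrative sketch, not part of the patch: how the node budget computed in _make_fragment_requests above plays out, assuming a hypothetical 10+4 EC policy (14 primaries) and the default request_node_count of '2 * replicas'. The helper name and the 10+4 numbers are only for illustration.

def node_budget(num_primaries, request_node_count_factor=2):
    # normal rebuild attempt: one GET per primary, excluding the node the
    # fragment is being rebuilt to
    concurrency = num_primaries - 1
    # quarantine verification may continue on to handoff nodes, capped at
    # max(number of primaries, request_node_count)
    max_node_count = max(num_primaries,
                         request_node_count_factor * num_primaries)
    return concurrency, max_node_count

# 10+4 policy: 13 initial primary GETs; if the local fragment still looks like
# a quarantine candidate, up to 14 further handoff GETs may follow (28 total).
print(node_budget(14))  # -> (13, 28)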
@@ -559,6 +647,7 @@ class ObjectReconstructor(Daemon): # the fragment index we need to reconstruct is the position index # of the node we're rebuilding to within the primary part list fi_to_rebuild = node['backend_index'] + datafile_metadata = df.get_datafile_metadata() local_timestamp = Timestamp(datafile_metadata['X-Timestamp']) path = datafile_metadata['name'] @@ -566,12 +655,13 @@ class ObjectReconstructor(Daemon): error_responses = defaultdict(list) # map status code -> response list # don't try and fetch a fragment from the node we're rebuilding to - part_nodes = [n for n in policy.object_ring.get_part_nodes(partition) - if n['id'] != node['id']] useful_bucket = self._make_fragment_requests( - job, node, datafile_metadata, buckets, error_responses, part_nodes) + job, node, df, buckets, error_responses) if useful_bucket: + frag_indexes = list(useful_bucket.useful_responses.keys()) + self.logger.debug('Reconstruct frag #%s with frag indexes %s' + % (fi_to_rebuild, frag_indexes)) responses = list(useful_bucket.useful_responses.values()) rebuilt_fragment_iter = self.make_rebuilt_fragment_iter( responses[:policy.ec_ndata], path, policy, fi_to_rebuild) @@ -601,6 +691,10 @@ class ObjectReconstructor(Daemon): errors, 'durable' if durable else 'non-durable', full_path, fi_to_rebuild)) + if self._is_quarantine_candidate(policy, buckets, error_responses, df): + raise df._quarantine( + df._data_file, "Solitary fragment #%s" % df._frag_index) + raise DiskFileError('Unable to reconstruct EC archive') def _reconstruct(self, policy, fragment_payload, frag_index): diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py index 2a42fbf01e..27d143210c 100644 --- a/swift/obj/ssync_sender.py +++ b/swift/obj/ssync_sender.py @@ -374,7 +374,7 @@ class Sender(object): # from the data file only. 
df_alt = self.job.get( 'sync_diskfile_builder', lambda *args: df)( - self.job, self.node, df.get_datafile_metadata()) + self.job, self.node, df) self.send_put(connection, url_path, df_alt, durable=is_durable) if want.get('meta') and df.data_timestamp != df.timestamp: diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 21802c47fb..6bd7fbe918 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -35,7 +35,8 @@ from swift.common.ring import Ring from swift.common.utils import Watchdog, get_logger, \ get_remote_client, split_path, config_true_value, generate_trans_id, \ affinity_key_function, affinity_locality_predicate, list_from_csv, \ - register_swift_info, parse_prefixed_conf, config_auto_int_value + register_swift_info, parse_prefixed_conf, config_auto_int_value, \ + config_request_node_count_value from swift.common.constraints import check_utf8, valid_api_version from swift.proxy.controllers import AccountController, ContainerController, \ ObjectControllerRouter, InfoController @@ -279,16 +280,8 @@ class Application(object): conf.get('strict_cors_mode', 't')) self.node_timings = {} self.timing_expiry = int(conf.get('timing_expiry', 300)) - value = conf.get('request_node_count', '2 * replicas').lower().split() - if len(value) == 1: - rnc_value = int(value[0]) - self.request_node_count = lambda replicas: rnc_value - elif len(value) == 3 and value[1] == '*' and value[2] == 'replicas': - rnc_value = int(value[0]) - self.request_node_count = lambda replicas: rnc_value * replicas - else: - raise ValueError( - 'Invalid request_node_count value: %r' % ''.join(value)) + value = conf.get('request_node_count', '2 * replicas') + self.request_node_count = config_request_node_count_value(value) # swift_owner_headers are stripped by the account and container # controllers; we should extend header stripping to object controller # when a privileged object header is implemented. 
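A brief usage sketch of the helper this hunk switches to (illustrative, not part of the patch; assumes swift is importable): config_request_node_count_value accepts either an absolute integer or 'N * replicas' and returns a callable that maps a replica count to a node count.

from swift.common.utils import config_request_node_count_value

absolute = config_request_node_count_value('10')
scaled = config_request_node_count_value('2 * replicas')
print(absolute(14))  # -> 10, independent of the replica count
print(scaled(14))    # -> 28 for a 14-node (e.g. 10+4 EC) ring
# Any other form ('auto', '2.5 * replicas', ...) raises ValueError.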
diff --git a/test/probe/common.py b/test/probe/common.py index 5c437e20d4..fac9daea08 100644 --- a/test/probe/common.py +++ b/test/probe/common.py @@ -33,7 +33,7 @@ from six.moves.http_client import HTTPConnection from six.moves.urllib.parse import urlparse from swiftclient import get_auth, head_account, client -from swift.common import internal_client, direct_client +from swift.common import internal_client, direct_client, utils from swift.common.direct_client import DirectClientException from swift.common.ring import Ring from swift.common.utils import readconf, renamer, \ @@ -41,6 +41,7 @@ from swift.common.utils import readconf, renamer, \ from swift.common.manager import Manager from swift.common.storage_policy import POLICIES, EC_POLICY, REPL_POLICY from swift.obj.diskfile import get_data_dir +from test.debug_logger import debug_logger from test.probe import CHECK_SERVER_TIMEOUT, VALIDATE_RSYNC, PROXY_BASE_URL @@ -186,6 +187,8 @@ def store_config_paths(name, configs): server_names = [name, '%s-replicator' % name] if name == 'container': server_names.append('container-sharder') + elif name == 'object': + server_names.append('object-reconstructor') for server_name in server_names: for server in Manager([server_name]): for i, conf in enumerate(server.conf_files(), 1): @@ -563,6 +566,15 @@ class ProbeTest(unittest.TestCase): for ent in os.listdir(ap_dir_fullpath)]) return async_pendings + def run_custom_daemon(self, klass, conf_section, conf_index, + custom_conf, **kwargs): + conf_file = self.configs[conf_section][conf_index] + conf = utils.readconf(conf_file, conf_section) + conf.update(custom_conf) + daemon = klass(conf, debug_logger('probe')) + daemon.run_once(**kwargs) + return daemon + class ReplProbeTest(ProbeTest): @@ -679,6 +691,7 @@ class ECProbeTest(ProbeTest): self.direct_get(onode, opart, require_durable=require_durable) except direct_client.DirectClientException as err: self.assertEqual(err.http_status, status) + return err else: self.fail('Node data on %r was not fully destroyed!' % (onode,)) diff --git a/test/probe/test_reconstructor_rebuild.py b/test/probe/test_reconstructor_rebuild.py index 9f8e491a5b..e87cd95785 100644 --- a/test/probe/test_reconstructor_rebuild.py +++ b/test/probe/test_reconstructor_rebuild.py @@ -13,7 +13,7 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. 
- +import itertools from contextlib import contextmanager import unittest import uuid @@ -22,7 +22,9 @@ import time import six from swift.common.direct_client import DirectClientException +from swift.common.manager import Manager from swift.common.utils import md5 +from swift.obj.reconstructor import ObjectReconstructor from test.probe.common import ECProbeTest from swift.common import direct_client @@ -369,6 +371,160 @@ class TestReconstructorRebuild(ECProbeTest): 'X-Backend-Storage-Policy-Index': int(self.policy)}) self.assertNotIn('X-Delete-At', headers) + def test_rebuild_quarantines_lonely_frag(self): + # fail one device while the object is deleted so we are left with one + # fragment and some tombstones + failed_node = self.onodes[0] + device_path = self.device_dir(failed_node) + self.kill_drive(device_path) + self.assert_direct_get_fails(failed_node, self.opart, 507) # sanity + + # delete object + client.delete_object(self.url, self.token, self.container_name, + self.object_name) + + # check we have tombstones + for node in self.onodes[1:]: + err = self.assert_direct_get_fails(node, self.opart, 404) + self.assertIn('X-Backend-Timestamp', err.http_headers) + + # run the reconstructor with zero reclaim age to clean up tombstones + for conf_index in self.configs['object-reconstructor'].keys(): + self.run_custom_daemon( + ObjectReconstructor, 'object-reconstructor', conf_index, + {'reclaim_age': '0'}) + + # check we no longer have tombstones + for node in self.onodes[1:]: + err = self.assert_direct_get_fails(node, self.opart, 404) + self.assertNotIn('X-Timestamp', err.http_headers) + + # revive the failed device and check it has a fragment + self.revive_drive(device_path) + self.assert_direct_get_succeeds(failed_node, self.opart) + + # restart proxy to clear error-limiting so that the revived drive + # participates again + Manager(['proxy-server']).restart() + + # client GET will fail with 503 ... + with self.assertRaises(ClientException) as cm: + client.get_object(self.url, self.token, self.container_name, + self.object_name) + self.assertEqual(503, cm.exception.http_status) + # ... 
but client GET succeeds + headers = client.head_object(self.url, self.token, self.container_name, + self.object_name) + for key in self.headers_post: + self.assertIn(key, headers) + self.assertEqual(self.headers_post[key], headers[key]) + + # run the reconstructor without quarantine_threshold set + error_lines = [] + warning_lines = [] + for conf_index in self.configs['object-reconstructor'].keys(): + reconstructor = self.run_custom_daemon( + ObjectReconstructor, 'object-reconstructor', conf_index, + {'reclaim_age': '0'}) + logger = reconstructor.logger.logger + error_lines.append(logger.get_lines_for_level('error')) + warning_lines.append(logger.get_lines_for_level('warning')) + + # check logs for errors + found_lines = False + for lines in error_lines: + if not lines: + continue + self.assertFalse(found_lines, error_lines) + found_lines = True + for line in itertools.islice(lines, 0, 6, 2): + self.assertIn( + 'Unable to get enough responses (1/4 from 1 ok ' + 'responses)', line, lines) + for line in itertools.islice(lines, 1, 7, 2): + self.assertIn( + 'Unable to get enough responses (4 x 404 error ' + 'responses)', line, lines) + self.assertTrue(found_lines, 'error lines not found') + + for lines in warning_lines: + self.assertEqual([], lines) + + # check we have still have a single fragment and no tombstones + self.assert_direct_get_succeeds(failed_node, self.opart) + for node in self.onodes[1:]: + err = self.assert_direct_get_fails(node, self.opart, 404) + self.assertNotIn('X-Timestamp', err.http_headers) + + # run the reconstructor to quarantine the lonely frag + error_lines = [] + warning_lines = [] + for conf_index in self.configs['object-reconstructor'].keys(): + reconstructor = self.run_custom_daemon( + ObjectReconstructor, 'object-reconstructor', conf_index, + {'reclaim_age': '0', 'quarantine_threshold': '1'}) + logger = reconstructor.logger.logger + error_lines.append(logger.get_lines_for_level('error')) + warning_lines.append(logger.get_lines_for_level('warning')) + + # check logs for errors + found_lines = False + for index, lines in enumerate(error_lines): + if not lines: + continue + self.assertFalse(found_lines, error_lines) + found_lines = True + for line in itertools.islice(lines, 0, 6, 2): + self.assertIn( + 'Unable to get enough responses (1/4 from 1 ok ' + 'responses)', line, lines) + for line in itertools.islice(lines, 1, 7, 2): + self.assertIn( + 'Unable to get enough responses (6 x 404 error ' + 'responses)', line, lines) + self.assertTrue(found_lines, 'error lines not found') + + # check logs for quarantine warning + found_lines = False + for lines in warning_lines: + if not lines: + continue + self.assertFalse(found_lines, warning_lines) + found_lines = True + self.assertEqual(1, len(lines), lines) + self.assertIn('Quarantined object', lines[0]) + self.assertTrue(found_lines, 'warning lines not found') + + # check we have nothing + for node in self.onodes: + err = self.assert_direct_get_fails(node, self.opart, 404) + self.assertNotIn('X-Backend-Timestamp', err.http_headers) + # client HEAD and GET now both 404 + with self.assertRaises(ClientException) as cm: + client.get_object(self.url, self.token, self.container_name, + self.object_name) + self.assertEqual(404, cm.exception.http_status) + with self.assertRaises(ClientException) as cm: + client.head_object(self.url, self.token, self.container_name, + self.object_name) + self.assertEqual(404, cm.exception.http_status) + + # run the reconstructor once more - should see no errors in logs! 
+ error_lines = [] + warning_lines = [] + for conf_index in self.configs['object-reconstructor'].keys(): + reconstructor = self.run_custom_daemon( + ObjectReconstructor, 'object-reconstructor', conf_index, + {'reclaim_age': '0', 'quarantine_threshold': '1'}) + logger = reconstructor.logger.logger + error_lines.append(logger.get_lines_for_level('error')) + warning_lines.append(logger.get_lines_for_level('warning')) + + for lines in error_lines: + self.assertEqual([], lines) + for lines in warning_lines: + self.assertEqual([], lines) + if six.PY2: class TestReconstructorRebuildUTF8(TestReconstructorRebuild): diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py index 96511f6fce..2e8647c21f 100644 --- a/test/probe/test_sharder.py +++ b/test/probe/test_sharder.py @@ -40,7 +40,6 @@ from test.probe import PROXY_BASE_URL from test.probe.brain import BrainSplitter from test.probe.common import ReplProbeTest, get_server_number, \ wait_for_server_to_hangup -from test.debug_logger import debug_logger import mock @@ -415,12 +414,8 @@ class BaseTestContainerSharding(ReplProbeTest): additional_args='--partitions=%s' % part) def run_custom_sharder(self, conf_index, custom_conf, **kwargs): - conf_file = self.configs['container-sharder'][conf_index] - conf = utils.readconf(conf_file, 'container-sharder') - conf.update(custom_conf) - sharder = ContainerSharder(conf, logger=debug_logger('probe')) - sharder.run_once(**kwargs) - return sharder + return self.run_custom_daemon(ContainerSharder, 'container-sharder', + conf_index, custom_conf, **kwargs) class TestContainerShardingNonUTF8(BaseTestContainerSharding): diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 5582edf8d4..a6d1741752 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -18,6 +18,7 @@ from __future__ import print_function import hashlib +from test import annotate_failure from test.debug_logger import debug_logger from test.unit import temptree, make_timestamp_iter, with_tempdir, \ mock_timestamp_now, FakeIterable @@ -3156,6 +3157,27 @@ cluster_dfw1 = http://dfw1.host/v1/ 'less than 100, not "{}"'.format(val), cm.exception.args[0]) + def test_config_request_node_count_value(self): + def do_test(value, replicas, expected): + self.assertEqual( + expected, + utils.config_request_node_count_value(value)(replicas)) + + do_test('0', 10, 0) + do_test('1 * replicas', 3, 3) + do_test('1 * replicas', 11, 11) + do_test('2 * replicas', 3, 6) + do_test('2 * replicas', 11, 22) + do_test('11', 11, 11) + do_test('10', 11, 10) + do_test('12', 11, 12) + + for bad in ('1.1', 1.1, 'auto', 'bad', + '2.5 * replicas', 'two * replicas'): + with annotate_failure(bad): + with self.assertRaises(ValueError): + utils.config_request_node_count_value(bad) + def test_config_auto_int_value(self): expectations = { # (value, default) : expected, diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index dc98fae975..86ddddb712 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -34,7 +34,7 @@ from gzip import GzipFile from shutil import rmtree from six.moves.urllib.parse import unquote from swift.common import utils -from swift.common.exceptions import DiskFileError +from swift.common.exceptions import DiskFileError, DiskFileQuarantined from swift.common.header_key_dict import HeaderKeyDict from swift.common.utils import dump_recon_cache, md5 from swift.obj import diskfile, reconstructor as object_reconstructor @@ -42,6 +42,7 @@ 
from swift.common import ring from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy, POLICIES, EC_POLICY) from swift.obj.reconstructor import SYNC, REVERT +from test import annotate_failure from test.debug_logger import debug_logger from test.unit import (patch_policies, mocked_http_conn, FabricatedRing, @@ -785,10 +786,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): def do_test(stat_code): with mocked_http_conn(stat_code): - resp = self.reconstructor._get_response(node, part, - path='nada', - headers={}, - full_path='nada/nada') + resp = self.reconstructor._get_response( + node, self.policy, part, path='nada', headers={}) return resp for status in (200, 400, 404, 503): @@ -4528,17 +4527,27 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): class TestReconstructFragmentArchive(BaseTestObjectReconstructor): - obj_path = b'/a/c/o' # subclass overrides this + obj_name = b'o' # subclass overrides this def setUp(self): super(TestReconstructFragmentArchive, self).setUp() + self.obj_path = b'/a/c/' + self.obj_name self.obj_timestamp = self.ts() - self.obj_metadata = { - 'name': self.obj_path, - 'Content-Length': '0', - 'ETag': 'etag', - 'X-Timestamp': self.obj_timestamp.normal - } + + def _create_fragment(self, frag_index, body=b'test data'): + utils.mkdirs(os.path.join(self.devices, 'sda1')) + df_mgr = self.reconstructor._df_router[self.policy] + if six.PY2: + obj_name = self.obj_name + else: + obj_name = self.obj_name.decode('utf8') + self.df = df_mgr.get_diskfile('sda1', 9, 'a', 'c', obj_name, + policy=self.policy) + write_diskfile(self.df, self.obj_timestamp, data=body, + frag_index=frag_index) + self.df.open() + self.logger.clear() + return self.df def test_reconstruct_fa_no_errors(self): job = { @@ -4565,9 +4574,9 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): called_headers = [] orig_func = object_reconstructor.ObjectReconstructor._get_response - def _get_response_hook(self, node, part, path, headers, policy): + def _get_response_hook(self, node, policy, part, path, headers): called_headers.append(headers) - return orig_func(self, node, part, path, headers, policy) + return orig_func(self, node, policy, part, path, headers) codes, body_iter, headers = zip(*responses) get_response_path = \ @@ -4576,7 +4585,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): with mocked_http_conn( *codes, body_iter=body_iter, headers=headers): df = self.reconstructor.reconstruct_fa( - job, node, self.obj_metadata) + job, node, self._create_fragment(2, body=b'')) self.assertEqual(0, df.content_length) fixed_body = b''.join(df.reader()) self.assertEqual(len(fixed_body), len(broken_body)) @@ -4635,7 +4644,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers_iter): df = self.reconstructor.reconstruct_fa( - job, node, dict(self.obj_metadata)) + job, node, self._create_fragment(2)) fixed_body = b''.join(df.reader()) self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual( @@ -4676,7 +4685,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers_iter): df = self.reconstructor.reconstruct_fa( - job, node, dict(self.obj_metadata)) + job, node, self._create_fragment(2)) fixed_body = b''.join(df.reader()) self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual( @@ -4722,7 +4731,7 @@ class 
TestReconstructFragmentArchive(BaseTestObjectReconstructor): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers_iter): df = self.reconstructor.reconstruct_fa( - job, node, dict(self.obj_metadata)) + job, node, self._create_fragment(2)) fixed_body = b''.join(df.reader()) # ... this bad response should be ignored like any other failure self.assertEqual(len(fixed_body), len(broken_body)) @@ -4761,7 +4770,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers_iter): df = self.reconstructor.reconstruct_fa( - job, node, dict(self.obj_metadata)) + job, node, self._create_fragment(2)) fixed_body = b''.join(df.reader()) self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual( @@ -4783,7 +4792,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): range(policy.object_ring.replicas - 1)] with mocked_http_conn(*codes): self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa, - job, node, self.obj_metadata) + job, node, self._create_fragment(2)) error_lines = self.logger.get_lines_for_level('error') # # of replicas failed and one more error log to report not enough # responses to reconstruct. @@ -4799,6 +4808,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): self.assertFalse(self.logger.get_lines_for_level('warning')) def test_reconstruct_fa_all_404s_fails(self): + self._create_fragment(2) job = { 'partition': 0, 'policy': self.policy, @@ -4811,7 +4821,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): codes = [404 for i in range(policy.object_ring.replicas - 1)] with mocked_http_conn(*codes): self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa, - job, node, self.obj_metadata) + job, node, self.df) error_lines = self.logger.get_lines_for_level('error') # only 1 log to report not enough responses self.assertEqual(1, len(error_lines)) @@ -4823,7 +4833,51 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): # no warning self.assertFalse(self.logger.get_lines_for_level('warning')) + def test_reconstruct_fa_all_404s_fails_custom_request_node_count(self): + # verify that when quarantine_threshold is not set the number of + # requests is capped at replicas - 1 regardless of request_node_count + self._create_fragment(2) + job = { + 'partition': 0, + 'policy': self.policy, + } + part_nodes = self.policy.object_ring.get_part_nodes(0) + node = part_nodes[1] + node['backend_index'] = self.policy.get_backend_index(node['index']) + ring = self.policy.object_ring + # sanity check: number of handoffs available == replicas + self.assertEqual(ring.max_more_nodes, ring.replicas) + for request_node_count in (0, + self.policy.ec_ndata - 1, + ring.replicas + 1, + 2 * ring.replicas - 1, + 2 * ring.replicas, + 3 * ring.replicas, + 99 * ring.replicas): + with annotate_failure(request_node_count): + self.logger.clear() + self.reconstructor.request_node_count = \ + lambda replicas: request_node_count + # request count capped at num primaries - 1 + exp_requests = ring.replicas - 1 + codes = [404 for i in range(exp_requests)] + with mocked_http_conn(*codes): + self.assertRaises(DiskFileError, + self.reconstructor.reconstruct_fa, + job, node, self.df) + error_lines = self.logger.get_lines_for_level('error') + # only 1 log to report not enough responses + self.assertEqual(1, len(error_lines)) + self.assertIn( + 'Unable to get enough responses (%s x 404 error responses)' + % exp_requests, + error_lines[0], + "Unexpected 
error line found: %s" % error_lines[0]) + # no warning + self.assertFalse(self.logger.get_lines_for_level('warning')) + def test_reconstruct_fa_mixture_of_errors_fails(self): + self._create_fragment(2) job = { 'partition': 0, 'policy': self.policy, @@ -4839,7 +4893,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): range(policy.object_ring.replicas - 4)] with mocked_http_conn(*codes): self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa, - job, node, self.obj_metadata) + job, node, self.df) exp_timeouts = len([c for c in codes if isinstance(c, Timeout)]) exp_404s = len([c for c in codes if c == 404]) exp_507s = len([c for c in codes if c == 507]) @@ -4901,7 +4955,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): codes, body_iter, headers = zip(*responses) with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): df = self.reconstructor.reconstruct_fa( - job, node, self.obj_metadata) + job, node, self._create_fragment(2)) fixed_body = b''.join(df.reader()) self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual( @@ -4941,7 +4995,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): codes, body_iter, headers = zip(*responses) with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): df = self.reconstructor.reconstruct_fa( - job, node, dict(self.obj_metadata)) + job, node, self._create_fragment(2)) fixed_body = b''.join(df.reader()) self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual( @@ -4958,7 +5012,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): codes, body_iter, headers = zip(*responses) with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): df = self.reconstructor.reconstruct_fa( - job, node, dict(self.obj_metadata)) + job, node, self._create_fragment(2)) fixed_body = b''.join(df.reader()) self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual( @@ -4995,7 +5049,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): codes, body_iter, headers = zip(*responses) with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): df = self.reconstructor.reconstruct_fa( - job, node, dict(self.obj_metadata)) + job, node, self._create_fragment(2)) fixed_body = b''.join(df.reader()) self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual( @@ -5016,7 +5070,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): codes, body_iter, headers = zip(*responses) with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): df = self.reconstructor.reconstruct_fa( - job, node, dict(self.obj_metadata)) + job, node, self._create_fragment(2)) fixed_body = b''.join(df.reader()) self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual( @@ -5080,7 +5134,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): codes, body_iter, headers = zip(*responses) with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa, - job, node, self.obj_metadata) + job, node, self._create_fragment(2)) error_lines = self.logger.get_lines_for_level('error') # 1 error log per etag to report not enough responses @@ -5111,6 +5165,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): self.assertFalse(self.logger.get_lines_for_level('warning')) def test_reconstruct_fa_with_mixed_etags_same_timestamp_fail(self): + self._create_fragment(2) job = { 'partition': 0, 'policy': self.policy, @@ 
-5157,7 +5212,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): codes, body_iter, headers = zip(*responses) with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa, - job, node, self.obj_metadata) + job, node, self.df) error_lines = self.logger.get_lines_for_level('error') self.assertGreater(len(error_lines), 1) @@ -5219,7 +5274,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): codes, body_iter, headers = zip(*responses) with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): df = self.reconstructor.reconstruct_fa( - job, node, self.obj_metadata) + job, node, self._create_fragment(2)) fixed_body = b''.join(df.reader()) self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual( @@ -5234,7 +5289,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): # redundant frag found once in first ec_ndata responses self.assertIn( 'Found existing frag #%s at' % broken_index, - debug_log_lines[0]) + debug_log_lines[0], debug_log_lines) # N.B. in the future, we could avoid those check because # definitely sending the copy rather than reconstruct will @@ -5251,6 +5306,537 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): debug_log_lines[1][len(log_prefix):]) self.assertNotIn(broken_index, got_frag_index_list) + def test_quarantine_threshold_conf(self): + reconstructor = object_reconstructor.ObjectReconstructor({}) + self.assertEqual(0, reconstructor.quarantine_threshold) + + reconstructor = object_reconstructor.ObjectReconstructor( + {'quarantine_threshold': '0'}) + self.assertEqual(0, reconstructor.quarantine_threshold) + + reconstructor = object_reconstructor.ObjectReconstructor( + {'quarantine_threshold': '1'}) + self.assertEqual(1, reconstructor.quarantine_threshold) + + reconstructor = object_reconstructor.ObjectReconstructor( + {'quarantine_threshold': 2.0}) + self.assertEqual(2, reconstructor.quarantine_threshold) + + for bad in ('1.1', 1.1, '-1', -1, 'auto', 'bad'): + with annotate_failure(bad): + with self.assertRaises(ValueError): + object_reconstructor.ObjectReconstructor( + {'quarantine_threshold': bad}) + + def test_request_node_count_conf(self): + # default is 1 * replicas + reconstructor = object_reconstructor.ObjectReconstructor({}) + self.assertEqual(6, reconstructor.request_node_count(3)) + self.assertEqual(22, reconstructor.request_node_count(11)) + + def do_test(value, replicas, expected): + reconstructor = object_reconstructor.ObjectReconstructor( + {'request_node_count': value}) + self.assertEqual(expected, + reconstructor.request_node_count(replicas)) + do_test('0', 10, 0) + do_test('1 * replicas', 3, 3) + do_test('1 * replicas', 11, 11) + do_test('2 * replicas', 3, 6) + do_test('2 * replicas', 11, 22) + do_test('11', 11, 11) + do_test('10', 11, 10) + do_test('12', 11, 12) + + for bad in ('1.1', 1.1, 'auto', 'bad', + '2.5 * replicas', 'two * replicas'): + with annotate_failure(bad): + with self.assertRaises(ValueError): + object_reconstructor.ObjectReconstructor( + {'request_node_count': bad}) + + def _do_test_reconstruct_insufficient_frags( + self, extra_conf, num_frags, other_responses, + local_frag_index=2, frag_index_to_rebuild=1, + resp_timestamps=None, resp_etags=None): + # num_frags is number of ok responses, other_responses is bad responses + # By default frag_index_to_rebuild is less than local_frag_index and + # all frag responses have indexes >= local_frag_index + 
self.assertGreater(num_frags, 0) + self.logger.clear() + self._configure_reconstructor(**extra_conf) + self._create_fragment(local_frag_index) + job = { + 'partition': 0, + 'policy': self.policy, + } + part_nodes = self.policy.object_ring.get_part_nodes(0) + node = part_nodes[frag_index_to_rebuild] + node['backend_index'] = self.policy.get_backend_index(node['index']) + + test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777] + etag = md5(test_data, usedforsecurity=False).hexdigest() + ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data) + frags = ec_archive_bodies[ + local_frag_index:local_frag_index + num_frags] + + if resp_etags: + self.assertEqual(len(frags), len(resp_etags)) + etags = [] + for other_etag in resp_etags: + # use default etag where other_etag is None + etags.append(other_etag if other_etag else etag) + else: + etags = [etag] * len(frags) + + def make_header(body): + headers = get_header_frag_index(self, body) + headers.update({'X-Object-Sysmeta-Ec-Etag': etags.pop(0)}) + return headers + + responses = [(200, frag, make_header(frag)) for frag in frags] + codes, body_iter, headers = zip(*(responses + other_responses)) + resp_timestamps = (resp_timestamps if resp_timestamps + else [self.obj_timestamp] * len(codes)) + resp_timestamps = [ts.internal for ts in resp_timestamps] + with mocked_http_conn(*codes, body_iter=body_iter, + headers=headers, + timestamps=resp_timestamps): + with self.assertRaises(DiskFileError) as cm: + self.reconstructor.reconstruct_fa( + job, node, self._create_fragment(2)) + return cm.exception + + def _verify_error_lines(self, num_frags, other_responses, + exp_useful_responses): + error_lines = self.logger.get_lines_for_level('error') + self.assertEqual(2, len(error_lines), error_lines) + self.assertIn( + 'Unable to get enough responses (%d/%d from %d ok responses)' + % (exp_useful_responses, self.policy.ec_ndata, num_frags), + error_lines[0]) + bad_codes = collections.Counter( + status for status, _, _ in other_responses) + errors = ', '.join('%s x %s' % (num, code) + for code, num in sorted(bad_codes.items())) + self.assertIn('Unable to get enough responses (%s error responses)' + % errors, error_lines[1]) + + def _assert_diskfile_quarantined(self): + warning_lines = self.logger.get_lines_for_level('warning') + self.assertEqual(1, len(warning_lines), warning_lines) + self.assertIn('Quarantined object', warning_lines[0]) + + # Check the diskfile has moved to quarantine dir + data_filename = os.path.basename(self.df._data_file) + df_hash = os.path.basename(self.df._datadir) + quarantine_dir = os.path.join( + self.df._device_path, 'quarantined', + diskfile.get_data_dir(self.policy), df_hash) + self.assertTrue(os.path.isdir(quarantine_dir)) + quarantine_file = os.path.join(quarantine_dir, data_filename) + self.assertTrue(os.path.isfile(quarantine_file)) + with open(quarantine_file, 'r') as fd: + self.assertEqual('test data', fd.read()) + self.assertFalse(os.path.exists(self.df._data_file)) + + def _assert_diskfile_not_quarantined(self): + # Check the diskfile has not moved to quarantine dir + quarantine_dir = os.path.join( + self.df._device_path, 'quarantined') + self.assertFalse(os.path.isdir(quarantine_dir)) + self.assertTrue(os.path.exists(self.df._data_file)) + with open(self.df._data_file, 'r') as fd: + self.assertEqual('test data', fd.read()) + + def test_reconstruct_fa_quarantine_threshold_one_rnc_two_replicas(self): + # use default request_node_count == 2 * replicas + num_other_resps = 2 * 
self.policy.object_ring.replicas - 2 + other_responses = [(404, None, None)] * num_other_resps + conf = {'quarantine_threshold': 1, 'reclaim_age': 0} + exc = self._do_test_reconstruct_insufficient_frags( + conf, 1, other_responses) + self.assertIsInstance(exc, DiskFileQuarantined) + self._assert_diskfile_quarantined() + self._verify_error_lines(1, other_responses, 1) + + def test_reconstruct_fa_quarantine_threshold_one_rnc_three_replicas(self): + num_other_resps = 3 * self.policy.object_ring.replicas - 2 + other_responses = [(404, None, None)] * num_other_resps + conf = {'quarantine_threshold': 1, 'reclaim_age': 0, + 'request_node_count': '3 * replicas'} + # set ring get_more_nodes to yield enough handoffs + self.policy.object_ring.max_more_nodes = ( + 2 * self.policy.object_ring.replicas) + exc = self._do_test_reconstruct_insufficient_frags( + conf, 1, other_responses) + self.assertIsInstance(exc, DiskFileQuarantined) + self._assert_diskfile_quarantined() + self._verify_error_lines(1, other_responses, 1) + + def test_reconstruct_fa_quarantine_threshold_one_rnc_four_replicas(self): + # verify handoff search exhausting handoff node iter + num_other_resps = 3 * self.policy.object_ring.replicas - 2 + other_responses = [(404, None, None)] * num_other_resps + conf = {'quarantine_threshold': 1, 'reclaim_age': 0, + 'request_node_count': '4 * replicas'} + # limit ring get_more_nodes to yield less than + # (request_node_count - 1 * replicas) nodes + self.policy.object_ring.max_more_nodes = ( + 2 * self.policy.object_ring.replicas) + exc = self._do_test_reconstruct_insufficient_frags( + conf, 1, other_responses) + self.assertIsInstance(exc, DiskFileQuarantined) + self._assert_diskfile_quarantined() + self._verify_error_lines(1, other_responses, 1) + + def test_reconstruct_fa_quarantine_threshold_one_rnc_absolute_number(self): + def do_test(rnc_num): + if rnc_num < self.policy.object_ring.replicas: + num_other_resps = self.policy.object_ring.replicas - 2 + else: + num_other_resps = rnc_num - 2 + other_responses = [(404, None, None)] * num_other_resps + conf = {'quarantine_threshold': 1, 'reclaim_age': 0, + 'request_node_count': str(rnc_num)} + # set ring get_more_nodes to yield enough handoffs + self.policy.object_ring.max_more_nodes = ( + 2 * self.policy.object_ring.replicas) + exc = self._do_test_reconstruct_insufficient_frags( + conf, 1, other_responses) + self.assertIsInstance(exc, DiskFileQuarantined) + self._assert_diskfile_quarantined() + self._verify_error_lines(1, other_responses, 1) + + for rnc_num in range(0, 3 * self.policy.object_ring.replicas): + do_test(rnc_num) + + def test_reconstruct_fa_quarantine_threshold_two(self): + num_other_resps = 2 * self.policy.object_ring.replicas - 3 + other_responses = [(404, None, None)] * num_other_resps + conf = {'quarantine_threshold': 2, 'reclaim_age': 0} + exc = self._do_test_reconstruct_insufficient_frags( + conf, 2, other_responses) + self.assertIsInstance(exc, DiskFileQuarantined) + self._assert_diskfile_quarantined() + self._verify_error_lines(2, other_responses, 2) + + def test_reconstruct_fa_no_quarantine_more_than_threshold_frags(self): + # default config + num_other_resps = self.policy.object_ring.replicas - 2 + other_responses = [(404, None, None)] * num_other_resps + exc = self._do_test_reconstruct_insufficient_frags( + {'reclaim_age': 0}, 1, other_responses) + self.assertIsInstance(exc, DiskFileError) + self._assert_diskfile_not_quarantined() + + # configured quarantine_threshold + for quarantine_threshold in range(self.policy.ec_ndata): 
+ for num_frags in range(quarantine_threshold + 1, + self.policy.ec_ndata): + num_other_resps = (self.policy.object_ring.replicas - + num_frags - 1) + other_responses = [(404, None, None)] * num_other_resps + exc = self._do_test_reconstruct_insufficient_frags( + {'quarantine_threshold': quarantine_threshold, + 'reclaim_age': 0}, + num_frags, other_responses) + self.assertIsInstance(exc, DiskFileError) + self._assert_diskfile_not_quarantined() + self._verify_error_lines(num_frags, other_responses, num_frags) + warning_lines = self.logger.get_lines_for_level('warning') + self.assertEqual([], warning_lines) + + # responses include the frag_index_to_rebuild - verify that response is + # counted against the threshold + num_other_resps = self.policy.object_ring.replicas - 3 + other_responses = [(404, None, None)] * num_other_resps + exc = self._do_test_reconstruct_insufficient_frags( + {'quarantine_threshold': 1, 'reclaim_age': 0}, 2, other_responses, + local_frag_index=2, frag_index_to_rebuild=3) + self.assertIsInstance(exc, DiskFileError) + self._assert_diskfile_not_quarantined() + self._verify_error_lines(2, other_responses, 1) + + def test_reconstruct_fa_no_quarantine_non_404_response(self): + num_frags = 1 + ring = self.policy.object_ring + for bad_status in (400, 503, 507): + # a non-404 in primary responses will prevent quarantine + num_other_resps = ring.replicas - num_frags - 1 + other_responses = [(404, None, None)] * (num_other_resps - 1) + other_responses.append((bad_status, None, None)) + exc = self._do_test_reconstruct_insufficient_frags( + {'quarantine_threshold': 1, 'reclaim_age': 0}, + num_frags, other_responses) + self.assertIsInstance(exc, DiskFileError) + self._assert_diskfile_not_quarantined() + self._verify_error_lines(num_frags, other_responses, num_frags) + warning_lines = self.logger.get_lines_for_level('warning') + self.assertEqual(1, len(warning_lines), warning_lines) + self.assertIn('Invalid response %s' % bad_status, warning_lines[0]) + + # a non-404 in handoff responses will prevent quarantine; non-404 + # is the *final* handoff response... + ring.max_more_nodes = (13 * ring.replicas) + for request_node_count in (2, 3, 13): + num_other_resps = (request_node_count * ring.replicas + - num_frags - 1) + other_responses = [(404, None, None)] * (num_other_resps - 1) + other_responses.append((bad_status, None, None)) + with annotate_failure( + 'request_node_count=%d' % request_node_count): + exc = self._do_test_reconstruct_insufficient_frags( + {'quarantine_threshold': 1, + 'reclaim_age': 0, + 'request_node_count': '%s * replicas' + % request_node_count}, + num_frags, other_responses) + self.assertIsInstance(exc, DiskFileError) + self._assert_diskfile_not_quarantined() + self._verify_error_lines(num_frags, other_responses, num_frags) + warning_lines = self.logger.get_lines_for_level('warning') + self.assertEqual(1, len(warning_lines), warning_lines) + self.assertIn('Invalid response %s' % bad_status, + warning_lines[0]) + + # a non-404 in handoff responses will prevent quarantine; non-404 + # is part way through all handoffs so not all handoffs are used + # regardless of how big request_node_count is + non_404_handoff = 3 + for request_node_count in (2, 3, 13): + # replicas - 1 - num_frags other_responses from primaries, + # plus a batch of replicas - 1 during which non-404 shows up, + # plus some that trickle out before the non-404 shows up, but + # limited to (request_node_count * replicas - num_frags - 1) + # e.g. 
for 10+4 policy with request_node_count > 2 + # - batch of 13 requests go to primaries, + # - 12 other_responses are consumed, + # - then a batch of 13 handoff requests is sent, + # - the non-404 is the 4th response in that batch, + # - so 3 more requests will have been trickled out + batch_size = ring.replicas - 1 + num_other_resps = min( + 2 * batch_size - num_frags + non_404_handoff, + request_node_count * ring.replicas - 1 - num_frags) + other_responses = [(404, None, None)] * (num_other_resps - 1) + other_responses.insert( + batch_size - num_frags + non_404_handoff, + (bad_status, None, None)) + exc = self._do_test_reconstruct_insufficient_frags( + {'quarantine_threshold': 1, 'reclaim_age': 0, + 'request_node_count': '%s * replicas' + % request_node_count}, + num_frags, other_responses) + self.assertIsInstance(exc, DiskFileError) + self._assert_diskfile_not_quarantined() + self._verify_error_lines(num_frags, other_responses, num_frags) + warning_lines = self.logger.get_lines_for_level('warning') + self.assertEqual(1, len(warning_lines), warning_lines) + self.assertIn('Invalid response %s' % bad_status, + warning_lines[0]) + + def test_reconstruct_fa_no_quarantine_frag_not_old_enough(self): + # verify that solitary fragment is not quarantined if it has not + # reached reclaim_age + num_other_resps = self.policy.object_ring.replicas - 2 + other_responses = [(404, None, None)] * num_other_resps + exc = self._do_test_reconstruct_insufficient_frags( + {'quarantine_threshold': 1, 'reclaim_age': 10000}, + 1, other_responses) + self.assertIsInstance(exc, DiskFileError) + self._assert_diskfile_not_quarantined() + self._verify_error_lines(1, other_responses, 1) + + exc = self._do_test_reconstruct_insufficient_frags( + {'quarantine_threshold': 1}, # default reclaim_age + 1, other_responses) + self.assertIsInstance(exc, DiskFileError) + self._assert_diskfile_not_quarantined() + self._verify_error_lines(1, other_responses, 1) + + def test_reconstruct_fa_no_quarantine_frag_resp_different_timestamp(self): + # verify that solitary fragment is not quarantined if the only frag + # response is for a different timestamp than the local frag + resp_timestamp = utils.Timestamp(float(self.obj_timestamp) + 1) + num_other_resps = self.policy.object_ring.replicas - 2 + other_responses = [(404, None, None)] * num_other_resps + resp_timestamps = [resp_timestamp] * (num_other_resps + 1) + exc = self._do_test_reconstruct_insufficient_frags( + {'quarantine_threshold': 1, 'reclaim_age': 0}, + 1, other_responses, resp_timestamps=resp_timestamps) + self.assertIsInstance(exc, DiskFileError) + self._assert_diskfile_not_quarantined() + self._verify_error_lines(1, other_responses, 1) + + def test_reconstruct_fa_no_quarantine_frag_resp_mixed_timestamps(self): + # verify that solitary fragment is not quarantined if there is a + # response for a frag at different timestamp in addition to the + # response for the solitary local frag + resp_timestamp = utils.Timestamp(float(self.obj_timestamp) + 1) + num_other_resps = self.policy.object_ring.replicas - 3 + other_responses = [(404, None, None)] * num_other_resps + resp_timestamps = ([self.obj_timestamp] + + [resp_timestamp] * (num_other_resps + 1)) + exc = self._do_test_reconstruct_insufficient_frags( + {'quarantine_threshold': 1, 'reclaim_age': 0}, + 2, other_responses, resp_timestamps=resp_timestamps) + self.assertIsInstance(exc, DiskFileError) + self._assert_diskfile_not_quarantined() + error_lines = self.logger.get_lines_for_level('error') + self.assertEqual(3, 
len(error_lines), error_lines) + self.assertIn( + 'Unable to get enough responses (1/%d from 1 ok responses)' + % (self.policy.ec_ndata,), error_lines[0]) + self.assertIn( + 'Unable to get enough responses (1/%d from 1 ok responses)' + % (self.policy.ec_ndata,), error_lines[1]) + self.assertIn( + 'Unable to get enough responses (%d x 404 error responses)' + % num_other_resps, error_lines[2]) + + def test_reconstruct_fa_no_quarantine_frag_resp_mixed_etags(self): + # verify that solitary fragment is not quarantined if there is a + # response for a frag with different etag in addition to the + # response for the solitary local frag + etags = [None, 'unexpected_etag'] + num_other_resps = self.policy.object_ring.replicas - 3 + other_responses = [(404, None, None)] * num_other_resps + exc = self._do_test_reconstruct_insufficient_frags( + {'quarantine_threshold': 1, 'reclaim_age': 0}, + 2, other_responses, resp_etags=etags) + self.assertIsInstance(exc, DiskFileError) + self._assert_diskfile_not_quarantined() + error_lines = self.logger.get_lines_for_level('error') + self.assertEqual(3, len(error_lines), error_lines) + self.assertIn( + 'Mixed Etag', error_lines[0]) + self.assertIn( + 'Unable to get enough responses (1/%d from 2 ok responses)' + % (self.policy.ec_ndata,), error_lines[1]) + self.assertIn( + 'Unable to get enough responses (%d x 404 error responses)' + % num_other_resps, error_lines[2]) + + def _do_test_reconstruct_fa_no_quarantine_bad_headers(self, bad_headers): + # verify that responses with invalid headers count against the + # quarantine_threshold + self._configure_reconstructor(reclaim_age=0, quarantine_threshold=1) + local_frag_index = 2 + self._create_fragment(local_frag_index) + job = { + 'partition': 0, + 'policy': self.policy, + } + part_nodes = self.policy.object_ring.get_part_nodes(0) + node = part_nodes[0] + node['backend_index'] = self.policy.get_backend_index(node['index']) + + test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777] + etag = md5(test_data, usedforsecurity=False).hexdigest() + ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data) + + def make_header(body): + headers = get_header_frag_index(self, body) + headers.update({'X-Object-Sysmeta-Ec-Etag': etag}) + return headers + + responses = [] + body = ec_archive_bodies[2] + headers = make_header(body) + responses.append((200, body, headers)) + body = ec_archive_bodies[3] + headers = make_header(body) + headers.update(bad_headers) + responses.append((200, body, headers)) + other_responses = ([(404, None, None)] * + (self.policy.object_ring.replicas - 3)) + codes, body_iter, headers = zip(*(responses + other_responses)) + resp_timestamps = [self.obj_timestamp] * len(codes) + resp_timestamps = [ts.internal for ts in resp_timestamps] + with mocked_http_conn(*codes, body_iter=body_iter, + headers=headers, + timestamps=resp_timestamps): + with self.assertRaises(DiskFileError) as cm: + self.reconstructor.reconstruct_fa( + job, node, self._create_fragment(2)) + self.assertIsInstance(cm.exception, DiskFileError) + self._assert_diskfile_not_quarantined() + error_lines = self.logger.get_lines_for_level('error') + self.assertEqual(2, len(error_lines), error_lines) + self.assertIn( + 'Unable to get enough responses (1/%d from 1 ok responses)' + % (self.policy.ec_ndata,), error_lines[0]) + self.assertIn( + 'Unable to get enough responses ' + '(1 x unknown, %d x 404 error responses)' + % len(other_responses), error_lines[1]) + + def test_reconstruct_fa_no_quarantine_invalid_frag_index_header(self): 
+ self._do_test_reconstruct_fa_no_quarantine_bad_headers( + {'X-Object-Sysmeta-Ec-Frag-Index': 'two'}) + + def test_reconstruct_fa_no_quarantine_missing_frag_index_header(self): + self._do_test_reconstruct_fa_no_quarantine_bad_headers( + {'X-Object-Sysmeta-Ec-Frag-Index': ''}) + + def test_reconstruct_fa_no_quarantine_missing_timestamp_header(self): + self._do_test_reconstruct_fa_no_quarantine_bad_headers( + {'X-Backend-Data-Timestamp': ''}) + + def test_reconstruct_fa_no_quarantine_missing_etag_header(self): + self._do_test_reconstruct_fa_no_quarantine_bad_headers( + {'X-Object-Sysmeta-Ec-Etag': ''}) + + def test_reconstruct_fa_frags_on_handoffs(self): + # just a lonely old frag on primaries: this appears to be a quarantine + # candidate, but unexpectedly the other frags are found on handoffs so + # expect rebuild + # set reclaim_age to 0 to make the lonely frag old enough for quarantine + self._configure_reconstructor(quarantine_threshold=1, reclaim_age=0) + job = { + 'partition': 0, + 'policy': self.policy, + } + part_nodes = self.policy.object_ring.get_part_nodes(0) + node = part_nodes[1] + node['backend_index'] = self.policy.get_backend_index(node['index']) + + test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777] + etag = md5(test_data, usedforsecurity=False).hexdigest() + ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data) + broken_body = ec_archive_bodies.pop(1) + + # arrange for just one 200 to come from a primary, then 404s, then 200s + # from handoffs + responses = list() + for i, body in enumerate(ec_archive_bodies): + if i == 1: + # skip: this is the frag index we're rebuilding; insert 404s + responses.extend( + ((404, None, None),) * self.policy.object_ring.replicas) + headers = get_header_frag_index(self, body) + headers.update({'X-Object-Sysmeta-Ec-Etag': etag}) + responses.append((200, body, headers)) + + codes, body_iter, headers = zip(*responses) + with mocked_http_conn( + *codes, body_iter=body_iter, headers=headers, + timestamps=[self.obj_timestamp.internal] * len(codes)): + df = self.reconstructor.reconstruct_fa( + job, node, self._create_fragment(0, body=b'')) + self.assertEqual(0, df.content_length) + fixed_body = b''.join(df.reader()) + self.assertEqual(len(fixed_body), len(broken_body)) + self.assertEqual(md5(fixed_body, usedforsecurity=False).hexdigest(), + md5(broken_body, usedforsecurity=False).hexdigest()) + # no error or warning lines expected + self.assertFalse(self.logger.get_lines_for_level('error')) + self.assertFalse(self.logger.get_lines_for_level('warning')) + debug_lines = self.logger.get_lines_for_level('debug') + self.assertIn('Reconstructing frag from handoffs, node_count=%d' + % (self.policy.object_ring.replicas * 2), debug_lines) + def test_reconstruct_fa_finds_duplicate_does_not_fail(self): job = { 'partition': 0, 'policy': self.policy, @@ -5280,7 +5866,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): codes, body_iter, headers = zip(*responses) with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): df = self.reconstructor.reconstruct_fa( - job, node, self.obj_metadata) + job, node, self._create_fragment(2)) fixed_body = b''.join(df.reader()) self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual( @@ -5339,7 +5925,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): with mocked_http_conn( *codes, body_iter=body_iter, headers=headers) as mock_conn: df = self.reconstructor.reconstruct_fa( - job, node, self.obj_metadata) + job, node, self._create_fragment(2)) fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual( @@ -5408,7 +5994,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): with mocked_http_conn( *codes, body_iter=body_iter, headers=headers) as mock_conn: df = self.reconstructor.reconstruct_fa( - job, node, self.obj_metadata) + job, node, self._create_fragment(2)) fixed_body = b''.join(df.reader()) self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual( @@ -5439,7 +6025,60 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): @patch_policies(with_ec_default=True) class TestReconstructFragmentArchiveUTF8(TestReconstructFragmentArchive): # repeat superclass tests with an object path that contains non-ascii chars - obj_path = b'/a/c/o\xc3\xa8' + obj_name = b'o\xc3\xa8' + + +@patch_policies([ECStoragePolicy(0, name='ec', is_default=True, + ec_type=DEFAULT_TEST_EC_TYPE, + ec_ndata=10, ec_nparity=4, + ec_segment_size=4096, + ec_duplication_factor=2), + StoragePolicy(1, name='other')], + fake_ring_args=[{'replicas': 28}, {'replicas': 3}]) +class TestReconstructFragmentArchiveECDuplicationFactor( + TestReconstructFragmentArchive): + def test_reconstruct_fa_no_quarantine_duplicate_frags(self): + # verify that quarantine does not happen if the only other response in + # addition to the lonely frag's own response is for the same + # (duplicate) frag index + self._configure_reconstructor(quarantine_threshold=1, reclaim_age=0) + local_frag_index = 2 + self._create_fragment(local_frag_index) + job = { + 'partition': 0, + 'policy': self.policy, + } + part_nodes = self.policy.object_ring.get_part_nodes(0) + node = part_nodes[0] + node['backend_index'] = self.policy.get_backend_index(node['index']) + + test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777] + etag = md5(test_data, usedforsecurity=False).hexdigest() + ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data) + frags = [ + ec_archive_bodies[local_frag_index], + ec_archive_bodies[local_frag_index + + self.policy.ec_n_unique_fragments]] + + def make_header(body): + headers = get_header_frag_index(self, body) + headers.update({'X-Object-Sysmeta-Ec-Etag': etag}) + return headers + + responses = [(200, frag, make_header(frag)) for frag in frags] + other_responses = ([(404, None, None)] * + (self.policy.ec_n_unique_fragments * 2 - 3)) + codes, body_iter, headers = zip(*(responses + other_responses)) + resp_timestamps = [self.obj_timestamp.internal] * len(codes) + with mocked_http_conn(*codes, body_iter=body_iter, + headers=headers, + timestamps=resp_timestamps): + with self.assertRaises(DiskFileError) as cm: + self.reconstructor.reconstruct_fa( + job, node, self._create_fragment(2)) + self.assertIsInstance(cm.exception, DiskFileError) + self._assert_diskfile_not_quarantined() + self._verify_error_lines(2, other_responses, 1) @patch_policies([ECStoragePolicy(0, name='ec', is_default=True, @@ -5455,6 +6094,13 @@ class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor): self.fabricated_ring = FabricatedRing(replicas=28, devices=56) def _test_reconstruct_with_duplicate_frags_no_errors(self, index): + utils.mkdirs(os.path.join(self.devices, 'sda1')) + df_mgr = self.reconstructor._df_router[self.policy] + df = df_mgr.get_diskfile('sda1', 9, 'a', 'c', 'o', + policy=self.policy) + write_diskfile(df, self.ts(), data=b'', frag_index=2) + df.open() + job = { 'partition': 0, 'policy': self.policy, @@ -5462,12 +6108,6 @@ class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor): part_nodes = 
self.policy.object_ring.get_part_nodes(0) node = part_nodes[index] node['backend_index'] = self.policy.get_backend_index(node['index']) - metadata = { - 'name': '/a/c/o', - 'Content-Length': 0, - 'ETag': 'etag', - 'X-Timestamp': '1234567890.12345', - } test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777] etag = md5(test_data, usedforsecurity=False).hexdigest() @@ -5486,9 +6126,9 @@ class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor): called_headers = [] orig_func = object_reconstructor.ObjectReconstructor._get_response - def _get_response_hook(self, node, part, path, headers, policy): + def _get_response_hook(self, node, policy, part, path, headers): called_headers.append(headers) - return orig_func(self, node, part, path, headers, policy) + return orig_func(self, node, policy, part, path, headers) # need parity + 1 node failures to reach duplicated fragments failed_start_at = ( @@ -5505,7 +6145,7 @@ class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor): with mocked_http_conn( *codes, body_iter=body_iter, headers=headers): df = self.reconstructor.reconstruct_fa( - job, node, metadata) + job, node, df) fixed_body = b''.join(df.reader()) self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual( diff --git a/test/unit/obj/test_ssync.py b/test/unit/obj/test_ssync.py index 1322e89c32..be98e66f58 100644 --- a/test/unit/obj/test_ssync.py +++ b/test/unit/obj/test_ssync.py @@ -16,11 +16,9 @@ from collections import defaultdict import mock import os -import time import unittest import eventlet -import itertools from six.moves import urllib from swift.common.exceptions import DiskFileNotExist, DiskFileError, \ @@ -28,8 +26,7 @@ from swift.common.exceptions import DiskFileNotExist, DiskFileError, \ from swift.common import swob from swift.common import utils from swift.common.storage_policy import POLICIES, EC_POLICY -from swift.common.utils import Timestamp -from swift.obj import ssync_sender, server +from swift.obj import ssync_sender, server, diskfile from swift.obj.reconstructor import RebuildingECDiskFileStream, \ ObjectReconstructor from swift.obj.replicator import ObjectReplicator @@ -38,7 +35,7 @@ from test import listen_zero from test.debug_logger import debug_logger from test.unit.obj.common import BaseTest from test.unit import patch_policies, encode_frag_archive_bodies, \ - skip_if_no_xattrs, quiet_eventlet_exceptions + skip_if_no_xattrs, quiet_eventlet_exceptions, make_timestamp_iter class TestBaseSsync(BaseTest): @@ -62,8 +59,7 @@ class TestBaseSsync(BaseTest): 'log_requests': 'false'} self.rx_logger = debug_logger() self.rx_controller = server.ObjectController(conf, self.rx_logger) - self.ts_iter = (Timestamp(t) - for t in itertools.count(int(time.time()))) + self.ts_iter = make_timestamp_iter() self.rx_ip = '127.0.0.1' sock = listen_zero() self.rx_server = eventlet.spawn( @@ -653,11 +649,12 @@ class TestSsyncEC(TestBaseSsyncEC): reconstruct_fa_calls = [] - def fake_reconstruct_fa(job, node, metadata): - reconstruct_fa_calls.append((job, node, policy, metadata)) + def fake_reconstruct_fa(job, node, df): + reconstruct_fa_calls.append((job, node, policy, df)) if len(reconstruct_fa_calls) == 2: # simulate second reconstruct failing raise DiskFileError + metadata = df.get_datafile_metadata() content = self._get_object_data(metadata['name'], frag_index=rx_node_index) return RebuildingECDiskFileStream( @@ -702,7 +699,8 @@ class TestSsyncEC(TestBaseSsyncEC): # remove the failed df from expected synced df's expect_sync_paths = 
['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5'] - failed_path = reconstruct_fa_calls[1][3]['name'] + failed_df = reconstruct_fa_calls[1][3] + failed_path = failed_df.get_datafile_metadata()['name'] expect_sync_paths.remove(failed_path) failed_obj = None for obj, diskfiles in tx_objs.items(): @@ -843,26 +841,26 @@ class TestSsyncEC(TestBaseSsyncEC): class FakeResponse(object): - def __init__(self, frag_index, obj_data, length=None): - self.headers = { - 'X-Object-Sysmeta-Ec-Frag-Index': str(frag_index), - 'X-Object-Sysmeta-Ec-Etag': 'the etag', - 'X-Backend-Timestamp': '1234567890.12345' - } + def __init__(self, frag_index, obj_data, length=None, status=200): self.frag_index = frag_index self.obj_data = obj_data self.data = b'' self.length = length - self.status = 200 + self.status = status - def init(self, path): + def init(self, path, conf): if isinstance(self.obj_data, Exception): self.data = self.obj_data else: self.data = self.obj_data[path][self.frag_index] + self.conf = conf def getheaders(self): - return self.headers + return { + 'X-Object-Sysmeta-Ec-Frag-Index': str(self.frag_index), + 'X-Object-Sysmeta-Ec-Etag': 'the etag', + 'X-Backend-Timestamp': self.conf['timestamp'].internal + } def read(self, length): if isinstance(self.data, Exception): @@ -878,7 +876,9 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): self.rx_node_index = 0 self.tx_node_index = 1 - # create sender side diskfiles... + # create sender side diskfiles...ensure their timestamps are in the + # past so that tests that set reclaim_age=0 succeed in reclaiming + self.ts_iter = make_timestamp_iter(offset=-1000) self.tx_objs = {} tx_df_mgr = self.daemon._df_router[self.policy] t1 = next(self.ts_iter) @@ -887,6 +887,8 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): t2 = next(self.ts_iter) self.tx_objs['o2'] = self._create_ondisk_files( tx_df_mgr, 'o2', self.policy, t2, (self.tx_node_index,)) + self.response_confs = {'/a/c/o1': {'timestamp': t1}, + '/a/c/o2': {'timestamp': t2}} self.suffixes = set() for diskfiles in list(self.tx_objs.values()): @@ -900,7 +902,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): self.frag_length = int( self.tx_objs['o1'][0].get_metadata()['Content-Length']) - def _test_reconstructor_sync_job(self, frag_responses): + def _test_reconstructor_sync_job(self, frag_responses, custom_conf=None): # Helper method to mock reconstructor to consume given lists of fake # responses while reconstructing a fragment for a sync type job. The # tests verify that when the reconstructed fragment iter fails in some @@ -908,25 +910,31 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): # node which have incorrect data. # See https://bugs.launchpad.net/swift/+bug/1631144 + custom_conf = custom_conf if custom_conf else {} # frag_responses is a list of two lists of responses to each # reconstructor GET request for a fragment archive. The two items in # the outer list are lists of responses for each of the two fragments - # to be reconstructed. Items in the inner lists are responses for each - # of the other fragments fetched during the reconstructor rebuild. + # to be reconstructed, and are used in the order that ssync syncs the + # fragments. Items in the inner lists are responses for each of the + # other fragments fetched during the reconstructor rebuild. 
path_to_responses = {} fake_get_response_calls = [] - def fake_get_response(recon, node, part, path, headers, policy): + def fake_get_response(recon, node, policy, part, path, headers): # select a list of fake responses for this path and return the next - # from the list + # from the list: we don't know the order in which paths will show + # up but we do want frag_responses[0] to be used first, so the + # frag_responses aren't bound to a path until this point if path not in path_to_responses: path_to_responses[path] = frag_responses.pop(0) response = path_to_responses[path].pop() - # the frag_responses list is in ssync task order, we only know the + # the frag_responses list is in ssync task order: we only know the # path when consuming the responses so initialise the path in the # response now if response: - response.init(path) + response.init(path, self.response_confs[path]) + # should be full path but just used for logging... + response.full_path = path fake_get_response_calls.append(path) return response @@ -944,17 +952,19 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): mock.patch.object( self.policy.object_ring, 'get_part_nodes', fake_get_part_nodes): - self.reconstructor = ObjectReconstructor( - {}, logger=self.logger) + conf = self.daemon_conf + conf.update(custom_conf) + self.reconstructor = ObjectReconstructor(conf, logger=self.logger) job = { 'device': self.device, 'partition': self.partition, 'policy': self.policy, + 'frag_index': self.tx_node_index, 'sync_diskfile_builder': self.reconstructor.reconstruct_fa } sender = ssync_sender.Sender( - self.daemon, self.job_node, job, self.suffixes) + self.reconstructor, self.job_node, job, self.suffixes) sender.connect, trace = self.make_connect_wrapper(sender) sender() return trace @@ -975,7 +985,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): df = self._open_rx_diskfile( obj_name, self.policy, self.rx_node_index) msgs.append('Unexpected rx diskfile for %r with content %r' % - (obj_name, ''.join([d for d in df.reader()]))) + (obj_name, b''.join([d for d in df.reader()]))) except DiskFileNotExist: pass # expected outcome if msgs: @@ -987,6 +997,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): # trampoline for the receiver to write a log eventlet.sleep(0) log_lines = self.rx_logger.get_lines_for_level('warning') + self.assertEqual(1, len(log_lines), self.rx_logger.all_log_lines()) self.assertIn('ssync subrequest failed with 499', log_lines[0]) self.assertFalse(log_lines[1:]) @@ -1009,7 +1020,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): df = self._open_rx_diskfile( obj_name, self.policy, self.rx_node_index) msgs.append('Unexpected rx diskfile for %r with content %r' % - (obj_name, ''.join([d for d in df.reader()]))) + (obj_name, b''.join([d for d in df.reader()]))) except DiskFileNotExist: pass # expected outcome if msgs: @@ -1053,7 +1064,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): df = self._open_rx_diskfile( obj_name, self.policy, self.rx_node_index) msgs.append('Unexpected rx diskfile for %r with content %r' % - (obj_name, ''.join([d for d in df.reader()]))) + (obj_name, b''.join([d for d in df.reader()]))) except DiskFileNotExist: pass # expected outcome if msgs: @@ -1111,7 +1122,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): df = self._open_rx_diskfile( obj_name, self.policy, self.rx_node_index) msgs.append('Unexpected rx diskfile for %r with content %r' % - (obj_name, ''.join([d for d in df.reader()]))) + (obj_name, b''.join([d for d in df.reader()]))) 
except DiskFileNotExist: pass # expected outcome if msgs: @@ -1123,6 +1134,109 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): self.assertFalse(self.rx_logger.get_lines_for_level('warning')) self.assertFalse(self.rx_logger.get_lines_for_level('error')) + def test_sync_reconstructor_quarantines_lonely_frag(self): + # First fragment to sync gets only one response for reconstructor to + # rebuild with, and that response is for the tx_node frag index: it + # should be quarantined, but after that the ssync session should still + # proceed with rebuilding the second frag. + lonely_frag_responses = [ + FakeResponse(i, self.obj_data, status=404) + for i in range(self.policy.ec_ndata + self.policy.ec_nparity)] + lonely_frag_responses[self.tx_node_index].status = 200 + frag_responses = [ + lonely_frag_responses, + [FakeResponse(i, self.obj_data) + for i in range(self.policy.ec_ndata + self.policy.ec_nparity)]] + + # configure reconstructor to quarantine the lonely frag + custom_conf = {'reclaim_age': 0, 'quarantine_threshold': 1} + trace = self._test_reconstructor_sync_job(frag_responses, custom_conf) + results = self._analyze_trace(trace) + self.assertEqual(2, len(results['tx_missing'])) + self.assertEqual(2, len(results['rx_missing'])) + self.assertEqual(1, len(results['tx_updates'])) + self.assertFalse(results['rx_updates']) + self.assertEqual('PUT', results['tx_updates'][0].get('method')) + synced_obj_path = results['tx_updates'][0].get('path') + synced_obj_name = synced_obj_path[-2:] + + # verify that the second frag was rebuilt on rx node... + msgs = [] + try: + df = self._open_rx_diskfile( + synced_obj_name, self.policy, self.rx_node_index) + self.assertEqual( + self._get_object_data(synced_obj_path, + frag_index=self.rx_node_index), + b''.join([d for d in df.reader()])) + except DiskFileNotExist: + msgs.append('Missing rx diskfile for %r' % synced_obj_name) + # ...and it is still on tx node...
+ try: + df = self._open_tx_diskfile( + synced_obj_name, self.policy, self.tx_node_index) + self.assertEqual( + self._get_object_data(df._name, + frag_index=self.tx_node_index), + b''.join([d for d in df.reader()])) + except DiskFileNotExist: + msgs.append('Missing tx diskfile for %r' % synced_obj_name) + + # verify that the lonely frag was not rebuilt on rx node and was + # removed on tx node + obj_names = list(self.tx_objs) + obj_names.remove(synced_obj_name) + quarantined_obj_name = obj_names[0] + try: + df = self._open_rx_diskfile( + quarantined_obj_name, self.policy, self.rx_node_index) + msgs.append( + 'Unexpected rx diskfile for %r with content %r' % + (quarantined_obj_name, b''.join([d for d in df.reader()]))) + except DiskFileNotExist: + pass # expected outcome + try: + df = self._open_tx_diskfile( + quarantined_obj_name, self.policy, self.tx_node_index) + msgs.append( + 'Unexpected tx diskfile for %r with content %r' % + (quarantined_obj_name, b''.join([d for d in df.reader()]))) + except DiskFileNotExist: + pass # expected outcome + + if msgs: + self.fail('Failed with:\n%s' % '\n'.join(msgs)) + error_lines = self.logger.get_lines_for_level('error') + self.assertEqual(2, len(error_lines), error_lines) + self.assertIn('Unable to get enough responses', error_lines[0]) + self.assertIn('Unable to get enough responses', error_lines[1]) + warning_lines = self.logger.get_lines_for_level('warning') + self.assertEqual(1, len(warning_lines), warning_lines) + self.assertIn('Quarantined object', warning_lines[0]) + + # check we have a quarantined data file + df_mgr = self.daemon._df_router[self.policy] + quarantined_df = df_mgr.get_diskfile( + self.device, self.partition, account='a', container='c', + obj=quarantined_obj_name, policy=self.policy, + frag_index=self.tx_node_index) + df_hash = os.path.basename(quarantined_df._datadir) + quarantine_dir = os.path.join( + quarantined_df._device_path, 'quarantined', + diskfile.get_data_dir(self.policy), df_hash) + self.assertTrue(os.path.isdir(quarantine_dir)) + data_file = os.listdir(quarantine_dir)[0] + with open(os.path.join(quarantine_dir, data_file), 'rb') as fd: + self.assertEqual( + self._get_object_data(quarantined_df._name, + frag_index=self.tx_node_index), + fd.read()) + + # trampoline for the receiver to write a log + eventlet.sleep(0) + self.assertFalse(self.rx_logger.get_lines_for_level('warning')) + self.assertFalse(self.rx_logger.get_lines_for_level('error')) + def test_sync_reconstructor_rebuild_ok(self): # Sanity test for this class of tests. Both fragments get a full # complement of responses and rebuild correctly. 
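Note for reviewers: the ssync test above drives reconstruct_fa through a reconstructor configured with the same operator-facing options that the reconstructor unit tests in this patch exercise (quarantine_threshold, request_node_count, reclaim_age). The snippet below is a minimal sketch, not part of the patch, of how those options are consumed when a reconstructor is built from a config dict, as the tests do; the devices path and option values are illustrative assumptions, and the assertions simply restate what the tests verify (the threshold parses as a non-negative int, and request_node_count becomes a callable of the ring's replica count).

    # Sketch only: values are illustrative, not prescribed by this patch.
    from swift.obj.reconstructor import ObjectReconstructor

    conf = {
        'devices': '/srv/node',                # assumed device root
        'quarantine_threshold': '1',           # quarantine a frag if no more
                                               # than 1 useful response is found
        'request_node_count': '2 * replicas',  # max nodes to ask before
                                               # deciding a frag is lonely
        # reclaim_age keeps its default here; fragments younger than
        # reclaim_age are never quarantined, which is why the tests above
        # set reclaim_age=0.
    }
    reconstructor = ObjectReconstructor(conf)
    assert reconstructor.quarantine_threshold == 1
    assert reconstructor.request_node_count(14) == 28  # e.g. a 10+4 EC ring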
diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index e8f7eef532..ee25376b83 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -4699,10 +4699,30 @@ class TestReplicatedObjectController( self.assertEqual(resp.status_int, 503) def test_node_request_setting(self): - baseapp = proxy_server.Application({'request_node_count': '3'}, + # default is 2 * replicas + baseapp = proxy_server.Application({}, container_ring=FakeRing(), account_ring=FakeRing()) - self.assertEqual(baseapp.request_node_count(3), 3) + self.assertEqual(6, baseapp.request_node_count(3)) + + def do_test(value, replicas, expected): + baseapp = proxy_server.Application({'request_node_count': value}, + container_ring=FakeRing(), + account_ring=FakeRing()) + self.assertEqual(expected, baseapp.request_node_count(replicas)) + + do_test('3', 4, 3) + do_test('1 * replicas', 4, 4) + do_test('2 * replicas', 4, 8) + do_test('4', 4, 4) + do_test('5', 4, 5) + + for bad in ('1.1', 1.1, 'auto', 'bad', + '2.5 * replicas', 'two * replicas'): + with self.assertRaises(ValueError): + proxy_server.Application({'request_node_count': bad}, + container_ring=FakeRing(), + account_ring=FakeRing()) def test_iter_nodes(self): with save_globals():