diff --git a/swift/common/utils.py b/swift/common/utils.py index 093db76ac8..8923246c54 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -2268,6 +2268,7 @@ class GreenAsyncPile(object): size = size_or_pool self._responses = eventlet.queue.LightQueue(size) self._inflight = 0 + self._pending = 0 def _run_func(self, func, args, kwargs): try: @@ -2279,6 +2280,7 @@ class GreenAsyncPile(object): """ Spawn a job in a green thread on the pile. """ + self._pending += 1 self._inflight += 1 self._pool.spawn(self._run_func, func, args, kwargs) @@ -2303,12 +2305,13 @@ class GreenAsyncPile(object): def next(self): try: - return self._responses.get_nowait() + rv = self._responses.get_nowait() except Empty: if self._inflight == 0: raise StopIteration() - else: - return self._responses.get() + rv = self._responses.get() + self._pending -= 1 + return rv class ModifiedParseResult(ParseResult): diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 70940f9c16..65d9acdc0f 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -28,6 +28,7 @@ import os import time import functools import inspect +import itertools import operator from sys import exc_info from swift import gettext_ as _ @@ -1125,6 +1126,99 @@ class GetOrHeadHandler(ResumingGetter): return res +class NodeIter(object): + """ + Yields nodes for a ring partition, skipping over error + limited nodes and stopping at the configurable number of nodes. If a + node yielded subsequently gets error limited, an extra node will be + yielded to take its place. + + Note that if you're going to iterate over this concurrently from + multiple greenthreads, you'll want to use a + swift.common.utils.GreenthreadSafeIterator to serialize access. + Otherwise, you may get ValueErrors from concurrent access. (You also + may not, depending on how logging is configured, the vagaries of + socket IO and eventlet, and the phase of the moon.) + + :param app: a proxy app + :param ring: ring to get yield nodes from + :param partition: ring partition to yield nodes for + :param node_iter: optional iterable of nodes to try. Useful if you + want to filter or reorder the nodes. + """ + + def __init__(self, app, ring, partition, node_iter=None): + self.app = app + self.ring = ring + self.partition = partition + + part_nodes = ring.get_part_nodes(partition) + if node_iter is None: + node_iter = itertools.chain( + part_nodes, ring.get_more_nodes(partition)) + num_primary_nodes = len(part_nodes) + self.nodes_left = self.app.request_node_count(num_primary_nodes) + self.expected_handoffs = self.nodes_left - num_primary_nodes + + # Use of list() here forcibly yanks the first N nodes (the primary + # nodes) from node_iter, so the rest of its values are handoffs. + self.primary_nodes = self.app.sort_nodes( + list(itertools.islice(node_iter, num_primary_nodes))) + self.handoff_iter = node_iter + + def __iter__(self): + self._node_iter = self._node_gen() + return self + + def log_handoffs(self, handoffs): + """ + Log handoff requests if handoff logging is enabled and the + handoff was not expected. + + We only log handoffs when we've pushed the handoff count further + than we would normally have expected under normal circumstances, + that is (request_node_count - num_primaries), when handoffs goes + higher than that it means one of the primaries must have been + skipped because of error limiting before we consumed all of our + nodes_left. 
+ """ + if not self.app.log_handoffs: + return + extra_handoffs = handoffs - self.expected_handoffs + if extra_handoffs > 0: + self.app.logger.increment('handoff_count') + self.app.logger.warning( + 'Handoff requested (%d)' % handoffs) + if (extra_handoffs == len(self.primary_nodes)): + # all the primaries were skipped, and handoffs didn't help + self.app.logger.increment('handoff_all_count') + + def _node_gen(self): + for node in self.primary_nodes: + if not self.app.error_limited(node): + yield node + if not self.app.error_limited(node): + self.nodes_left -= 1 + if self.nodes_left <= 0: + return + handoffs = 0 + for node in self.handoff_iter: + if not self.app.error_limited(node): + handoffs += 1 + self.log_handoffs(handoffs) + yield node + if not self.app.error_limited(node): + self.nodes_left -= 1 + if self.nodes_left <= 0: + return + + def next(self): + return next(self._node_iter) + + def __next__(self): + return self.next() + + class Controller(object): """Base WSGI controller class for the proxy""" server_type = 'Base' diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index c82c83150d..22a4a4eb30 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -1951,44 +1951,43 @@ class ECObjectController(BaseObjectController): orig_range = req.range range_specs = self._convert_range(req, policy) - node_iter = GreenthreadSafeIterator(node_iter) - num_gets = policy.ec_ndata - with ContextPool(num_gets) as pool: + safe_iter = GreenthreadSafeIterator(node_iter) + with ContextPool(policy.ec_ndata) as pool: pile = GreenAsyncPile(pool) - for _junk in range(num_gets): + for _junk in range(policy.ec_ndata): pile.spawn(self._fragment_GET_request, - req, node_iter, partition, + req, safe_iter, partition, policy) - gets = list(pile) - good_gets = [] bad_gets = [] - for get, parts_iter in gets: + etag_buckets = collections.defaultdict(list) + best_etag = None + for get, parts_iter in pile: if is_success(get.last_status): - good_gets.append((get, parts_iter)) + etag = HeaderKeyDict( + get.last_headers)['X-Object-Sysmeta-Ec-Etag'] + etag_buckets[etag].append((get, parts_iter)) + if etag != best_etag and ( + len(etag_buckets[etag]) > + len(etag_buckets[best_etag])): + best_etag = etag else: bad_gets.append((get, parts_iter)) + matching_response_count = max( + len(etag_buckets[best_etag]), len(bad_gets)) + if (policy.ec_ndata - matching_response_count > + pile._pending) and node_iter.nodes_left > 0: + # we need more matching responses to reach ec_ndata + # than we have pending gets, as long as we still have + # nodes in node_iter we can spawn another + pile.spawn(self._fragment_GET_request, req, + safe_iter, partition, policy) req.range = orig_range - if len(good_gets) == num_gets: - # If these aren't all for the same object, then error out so - # at least the client doesn't get garbage. We can do a lot - # better here with more work, but this'll work for now. 
- found_obj_etags = set( - HeaderKeyDict( - getter.last_headers)['X-Object-Sysmeta-Ec-Etag'] - for getter, _junk in good_gets) - if len(found_obj_etags) > 1: - self.app.logger.debug( - "Returning 503 for %s; found too many etags (%s)", - req.path, - ", ".join(found_obj_etags)) - return HTTPServiceUnavailable(request=req) - - # we found enough pieces to decode the object, so now let's - # decode the object + if len(etag_buckets[best_etag]) >= policy.ec_ndata: + # headers can come from any of the getters resp_headers = HeaderKeyDict( - good_gets[0][0].source_headers[-1]) + etag_buckets[best_etag][0][0].source_headers[-1]) resp_headers.pop('Content-Range', None) eccl = resp_headers.get('X-Object-Sysmeta-Ec-Content-Length') obj_length = int(eccl) if eccl is not None else None @@ -1996,11 +1995,10 @@ class ECObjectController(BaseObjectController): # This is only true if we didn't get a 206 response, but # that's the only time this is used anyway. fa_length = int(resp_headers['Content-Length']) - app_iter = ECAppIter( req.swift_entity_path, policy, - [iterator for getter, iterator in good_gets], + [iterator for getter, iterator in etag_buckets[best_etag]], range_specs, fa_length, obj_length, self.app.logger) resp = Response( diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 0401691b6e..b49181dc37 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -19,7 +19,6 @@ import socket from swift import gettext_ as _ from random import shuffle from time import time -import itertools import functools import sys @@ -36,7 +35,7 @@ from swift.common.utils import cache_from_env, get_logger, \ from swift.common.constraints import check_utf8, valid_api_version from swift.proxy.controllers import AccountController, ContainerController, \ ObjectControllerRouter, InfoController -from swift.proxy.controllers.base import get_container_info +from swift.proxy.controllers.base import get_container_info, NodeIter from swift.common.swob import HTTPBadRequest, HTTPForbidden, \ HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \ HTTPServerError, HTTPException, Request, HTTPServiceUnavailable @@ -507,60 +506,7 @@ class Application(object): 'port': node['port'], 'device': node['device']}) def iter_nodes(self, ring, partition, node_iter=None): - """ - Yields nodes for a ring partition, skipping over error - limited nodes and stopping at the configurable number of nodes. If a - node yielded subsequently gets error limited, an extra node will be - yielded to take its place. - - Note that if you're going to iterate over this concurrently from - multiple greenthreads, you'll want to use a - swift.common.utils.GreenthreadSafeIterator to serialize access. - Otherwise, you may get ValueErrors from concurrent access. (You also - may not, depending on how logging is configured, the vagaries of - socket IO and eventlet, and the phase of the moon.) - - :param ring: ring to get yield nodes from - :param partition: ring partition to yield nodes for - :param node_iter: optional iterable of nodes to try. Useful if you - want to filter or reorder the nodes. - """ - part_nodes = ring.get_part_nodes(partition) - if node_iter is None: - node_iter = itertools.chain(part_nodes, - ring.get_more_nodes(partition)) - num_primary_nodes = len(part_nodes) - - # Use of list() here forcibly yanks the first N nodes (the primary - # nodes) from node_iter, so the rest of its values are handoffs. 
- primary_nodes = self.sort_nodes( - list(itertools.islice(node_iter, num_primary_nodes))) - handoff_nodes = node_iter - nodes_left = self.request_node_count(len(primary_nodes)) - - log_handoffs_threshold = nodes_left - len(primary_nodes) - for node in primary_nodes: - if not self.error_limited(node): - yield node - if not self.error_limited(node): - nodes_left -= 1 - if nodes_left <= 0: - return - handoffs = 0 - for node in handoff_nodes: - if not self.error_limited(node): - handoffs += 1 - if self.log_handoffs and handoffs > log_handoffs_threshold: - self.logger.increment('handoff_count') - self.logger.warning( - 'Handoff requested (%d)' % handoffs) - if handoffs - log_handoffs_threshold == len(primary_nodes): - self.logger.increment('handoff_all_count') - yield node - if not self.error_limited(node): - nodes_left -= 1 - if nodes_left <= 0: - return + return NodeIter(self, ring, partition, node_iter=node_iter) def exception_occurred(self, node, typ, additional_info, **kwargs): diff --git a/test/probe/common.py b/test/probe/common.py index 1479ba9ddc..45a907444d 100644 --- a/test/probe/common.py +++ b/test/probe/common.py @@ -20,6 +20,8 @@ import sys from time import sleep, time from collections import defaultdict import unittest +from hashlib import md5 +from uuid import uuid4 from nose import SkipTest from six.moves.http_client import HTTPConnection @@ -262,6 +264,49 @@ def resetswift(): Manager(['all']).stop() +class Body(object): + + def __init__(self, total=3.5 * 2 ** 20): + self.length = total + self.hasher = md5() + self.read_amount = 0 + self.chunk = uuid4().hex * 2 ** 10 + self.buff = '' + + @property + def etag(self): + return self.hasher.hexdigest() + + def __len__(self): + return self.length + + def read(self, amount): + if len(self.buff) < amount: + try: + self.buff += next(self) + except StopIteration: + pass + rv, self.buff = self.buff[:amount], self.buff[amount:] + return rv + + def __iter__(self): + return self + + def next(self): + if self.buff: + rv, self.buff = self.buff, '' + return rv + if self.read_amount >= self.length: + raise StopIteration() + rv = self.chunk[:int(self.length - self.read_amount)] + self.read_amount += len(rv) + self.hasher.update(rv) + return rv + + def __next__(self): + return next(self) + + class ProbeTest(unittest.TestCase): """ Don't instantiate this directly, use a child class instead. 
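The Body helper added above acts as both an iterator and a file-like object, md5-hashing everything it emits so tests can compare downloaded data against its .etag. A minimal usage sketch follows (illustrative only; the consume() helper is invented for this example and is not part of the patch):

    # Illustrative sketch, not part of the patch: drive the Body helper
    # through its read() interface and check the running etag.
    from hashlib import md5
    from test.probe.common import Body

    def consume(body, chunk_size=64 * 2 ** 10):
        # hypothetical helper: read until exhausted, hashing as we go
        digest = md5()
        while True:
            chunk = body.read(chunk_size)
            if not chunk:
                break
            digest.update(chunk)
        return digest.hexdigest()

    body = Body(total=1024)              # 1 KiB of repeatable test data
    assert consume(body) == body.etag    # etag covers everything generated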
diff --git a/test/probe/test_object_handoff.py b/test/probe/test_object_handoff.py index c7df4b9e07..f3b02c53cd 100755 --- a/test/probe/test_object_handoff.py +++ b/test/probe/test_object_handoff.py @@ -16,13 +16,17 @@ from unittest import main from uuid import uuid4 +import random +from hashlib import md5 +from collections import defaultdict from swiftclient import client from swift.common import direct_client from swift.common.exceptions import ClientException from swift.common.manager import Manager -from test.probe.common import kill_server, ReplProbeTest, start_server +from test.probe.common import (kill_server, start_server, ReplProbeTest, + ECProbeTest, Body) class TestObjectHandoff(ReplProbeTest): @@ -211,5 +215,89 @@ class TestObjectHandoff(ReplProbeTest): self.fail("Expected ClientException but didn't get it") +class TestECObjectHandoffOverwrite(ECProbeTest): + + def get_object(self, container_name, object_name): + headers, body = client.get_object(self.url, self.token, + container_name, + object_name, + resp_chunk_size=64 * 2 ** 10) + resp_checksum = md5() + for chunk in body: + resp_checksum.update(chunk) + return resp_checksum.hexdigest() + + def test_ec_handoff_overwrite(self): + container_name = 'container-%s' % uuid4() + object_name = 'object-%s' % uuid4() + + # create EC container + headers = {'X-Storage-Policy': self.policy.name} + client.put_container(self.url, self.token, container_name, + headers=headers) + + # PUT object + old_contents = Body() + client.put_object(self.url, self.token, container_name, + object_name, contents=old_contents) + + # get our node lists + opart, onodes = self.object_ring.get_nodes( + self.account, container_name, object_name) + + # shutdown one of the primary data nodes + failed_primary = random.choice(onodes) + failed_primary_device_path = self.device_dir('object', failed_primary) + self.kill_drive(failed_primary_device_path) + + # overwrite our object with some new data + new_contents = Body() + client.put_object(self.url, self.token, container_name, + object_name, contents=new_contents) + self.assertNotEqual(new_contents.etag, old_contents.etag) + + # restore failed primary device + self.revive_drive(failed_primary_device_path) + + # sanity - failed node has old contents + req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)} + headers = direct_client.direct_head_object( + failed_primary, opart, self.account, container_name, + object_name, headers=req_headers) + self.assertEqual(headers['X-Object-Sysmeta-EC-Etag'], + old_contents.etag) + + # we have 1 primary with wrong old etag, and we should have 5 with + # new etag plus a handoff with the new etag, so killing 2 other + # primaries forces proxy to try to GET from all primaries plus handoff. 
+ other_nodes = [n for n in onodes if n != failed_primary] + random.shuffle(other_nodes) + for node in other_nodes[:2]: + self.kill_drive(self.device_dir('object', node)) + + # sanity, after taking out two primaries we should be down to + # only four primaries, one of which has the old etag - but we + # also have a handoff with the new etag out there + found_frags = defaultdict(int) + req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)} + for node in onodes + list(self.object_ring.get_more_nodes(opart)): + try: + headers = direct_client.direct_head_object( + node, opart, self.account, container_name, + object_name, headers=req_headers) + except Exception: + continue + found_frags[headers['X-Object-Sysmeta-EC-Etag']] += 1 + self.assertEqual(found_frags, { + new_contents.etag: 4, # this should be enough to rebuild! + old_contents.etag: 1, + }) + + # clear node error limiting + Manager(['proxy']).restart() + + resp_etag = self.get_object(container_name, object_name) + self.assertEqual(resp_etag, new_contents.etag) + if __name__ == '__main__': main() diff --git a/test/probe/test_reconstructor_revert.py b/test/probe/test_reconstructor_revert.py index 5e10c1337e..df4dc8beac 100755 --- a/test/probe/test_reconstructor_revert.py +++ b/test/probe/test_reconstructor_revert.py @@ -21,7 +21,7 @@ import random import shutil from collections import defaultdict -from test.probe.common import ECProbeTest +from test.probe.common import ECProbeTest, Body from swift.common import direct_client from swift.common.storage_policy import EC_POLICY @@ -31,32 +31,6 @@ from swift.obj import reconstructor from swiftclient import client -class Body(object): - - def __init__(self, total=3.5 * 2 ** 20): - self.total = total - self.hasher = md5() - self.size = 0 - self.chunk = 'test' * 16 * 2 ** 10 - - @property - def etag(self): - return self.hasher.hexdigest() - - def __iter__(self): - return self - - def next(self): - if self.size > self.total: - raise StopIteration() - self.size += len(self.chunk) - self.hasher.update(self.chunk) - return self.chunk - - def __next__(self): - return next(self) - - class TestReconstructorRevert(ECProbeTest): def setUp(self): diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 5402ab1de8..653d939cce 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -4530,6 +4530,22 @@ class TestGreenAsyncPile(unittest.TestCase): self.assertEqual(pile.waitall(0.5), [0.1, 0.1]) self.assertEqual(completed[0], 2) + def test_pending(self): + pile = utils.GreenAsyncPile(3) + self.assertEqual(0, pile._pending) + for repeats in range(2): + # repeat to verify that pending will go again up after going down + for i in range(4): + pile.spawn(lambda: i) + self.assertEqual(4, pile._pending) + for i in range(3, -1, -1): + pile.next() + self.assertEqual(i, pile._pending) + # sanity check - the pile is empty + self.assertRaises(StopIteration, pile.next) + # pending remains 0 + self.assertEqual(0, pile._pending) + class TestLRUCache(unittest.TestCase): diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index ea4b165c70..af695ef23f 100755 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -26,6 +26,7 @@ from hashlib import md5 import mock from eventlet import Timeout +from six import BytesIO from six.moves import range import swift @@ -913,6 +914,76 @@ class TestObjControllerLegacyCache(TestReplicatedObjController): self.assertEqual(resp.status_int, 503) 
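The ECObjectController GET changes earlier in this patch bucket successful fragment GETs by X-Object-Sysmeta-Ec-Etag, keep track of the best-populated bucket, and spawn an extra fragment GET only when the requests still pending cannot possibly bring that bucket up to ec_ndata. A condensed standalone sketch of that decision (illustrative only; best_bucket() and its signature are invented, not code from the patch):

    # Illustrative sketch of the bucket-selection logic, not the patch itself.
    import collections

    def best_bucket(results, ec_ndata, pending, nodes_left):
        # results: iterable of (success, etag) pairs collected so far
        buckets = collections.defaultdict(list)
        bad_gets = []
        best_etag = None
        for ok, etag in results:
            if ok:
                buckets[etag].append(etag)
                if etag != best_etag and (
                        len(buckets[etag]) > len(buckets[best_etag])):
                    best_etag = etag
            else:
                bad_gets.append(etag)
        matching = max(len(buckets[best_etag]), len(bad_gets))
        # only spawn another backend GET if the outstanding (pending)
        # requests cannot close the gap and the ring still has nodes left
        need_more = (ec_ndata - matching > pending) and nodes_left > 0
        return best_etag, need_more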
+class StubResponse(object): + + def __init__(self, status, body='', headers=None): + self.status = status + self.body = body + self.readable = BytesIO(body) + self.headers = swob.HeaderKeyDict(headers) + fake_reason = ('Fake', 'This response is a lie.') + self.reason = swob.RESPONSE_REASONS.get(status, fake_reason)[0] + + def getheader(self, header_name, default=None): + return self.headers.get(header_name, default) + + def getheaders(self): + if 'Content-Length' not in self.headers: + self.headers['Content-Length'] = len(self.body) + return self.headers.items() + + def read(self, amt=0): + return self.readable.read(amt) + + +@contextmanager +def capture_http_requests(get_response): + + class FakeConn(object): + + def __init__(self, req): + self.req = req + self.resp = None + + def getresponse(self): + self.resp = get_response(self.req) + return self.resp + + class ConnectionLog(object): + + def __init__(self): + self.connections = [] + + def __len__(self): + return len(self.connections) + + def __getitem__(self, i): + return self.connections[i] + + def __iter__(self): + return iter(self.connections) + + def __call__(self, ip, port, method, path, headers, qs, ssl): + req = { + 'ip': ip, + 'port': port, + 'method': method, + 'path': path, + 'headers': headers, + 'qs': qs, + 'ssl': ssl, + } + conn = FakeConn(req) + self.connections.append(conn) + return conn + + fake_conn = ConnectionLog() + + with mock.patch('swift.common.bufferedhttp.http_connect_raw', + new=fake_conn): + yield fake_conn + + @patch_policies(with_ec_default=True) class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): container_info = { @@ -1344,6 +1415,483 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): for fragments in zip(*fragment_payloads)] return ec_archive_bodies + def _make_ec_object_stub(self, test_body=None, policy=None): + policy = policy or self.policy + segment_size = policy.ec_segment_size + test_body = test_body or ( + 'test' * segment_size)[:-random.randint(0, 1000)] + etag = md5(test_body).hexdigest() + ec_archive_bodies = self._make_ec_archive_bodies(test_body, + policy=policy) + return { + 'body': test_body, + 'etag': etag, + 'frags': ec_archive_bodies, + } + + def _fake_ec_node_response(self, node_frags): + """ + Given a list of entries for each node in ring order, where the + entries are a dict (or list of dicts) which describe all of the + fragment(s); create a function suitable for use with + capture_http_requests that will accept a req object and return a + response that will suitably fake the behavior of an object + server who had the given fragments on disk at the time. + """ + node_map = {} + all_nodes = [] + + def _build_node_map(req): + node_key = lambda n: (n['ip'], n['port']) + part = utils.split_path(req['path'], 5, 5, True)[1] + policy = POLICIES[int( + req['headers']['X-Backend-Storage-Policy-Index'])] + all_nodes.extend(policy.object_ring.get_part_nodes(part)) + all_nodes.extend(policy.object_ring.get_more_nodes(part)) + for i, node in enumerate(all_nodes): + node_map[node_key(node)] = i + + # normalize node_frags to a list of fragments for each node even + # if there's only one fragment in the dataset provided. 
+ for i, frags in enumerate(node_frags): + if isinstance(frags, dict): + node_frags[i] = [frags] + + def get_response(req): + if not node_map: + _build_node_map(req) + + try: + node_index = node_map[(req['ip'], req['port'])] + except KeyError: + raise Exception("Couldn't find node %s:%s in %r" % ( + req['ip'], req['port'], all_nodes)) + + try: + frags = node_frags[node_index] + except KeyError: + raise Exception('Found node %r:%r at index %s - ' + 'but only got %s stub response nodes' % ( + req['ip'], req['port'], node_index, + len(node_frags))) + + try: + stub = random.choice(frags) + except IndexError: + stub = None + if stub: + body = stub['obj']['frags'][stub['frag']] + headers = { + 'X-Object-Sysmeta-Ec-Content-Length': len( + stub['obj']['body']), + 'X-Object-Sysmeta-Ec-Etag': stub['obj']['etag'], + 'X-Object-Sysmeta-Ec-Frag-Index': stub['frag'], + } + resp = StubResponse(200, body, headers) + else: + resp = StubResponse(404) + return resp + + return get_response + + def test_GET_with_frags_swapped_around(self): + segment_size = self.policy.ec_segment_size + test_data = ('test' * segment_size)[:-657] + etag = md5(test_data).hexdigest() + ec_archive_bodies = self._make_ec_archive_bodies(test_data) + + _part, primary_nodes = self.obj_ring.get_nodes('a', 'c', 'o') + + node_key = lambda n: (n['ip'], n['port']) + response_map = { + node_key(n): StubResponse(200, ec_archive_bodies[i], { + 'X-Object-Sysmeta-Ec-Content-Length': len(test_data), + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Object-Sysmeta-Ec-Frag-Index': i, + }) for i, n in enumerate(primary_nodes) + } + + # swap a parity response into a data node + data_node = random.choice(primary_nodes[:self.policy.ec_ndata]) + parity_node = random.choice(primary_nodes[self.policy.ec_ndata:]) + (response_map[node_key(data_node)], + response_map[node_key(parity_node)]) = \ + (response_map[node_key(parity_node)], + response_map[node_key(data_node)]) + + def get_response(req): + req_key = (req['ip'], req['port']) + return response_map.pop(req_key) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 200) + self.assertEqual(len(log), self.policy.ec_ndata) + self.assertEqual(len(response_map), + len(primary_nodes) - self.policy.ec_ndata) + + def test_GET_with_single_missed_overwrite_does_not_need_handoff(self): + obj1 = self._make_ec_object_stub() + obj2 = self._make_ec_object_stub() + + node_frags = [ + {'obj': obj2, 'frag': 0}, + {'obj': obj2, 'frag': 1}, + {'obj': obj1, 'frag': 2}, # missed over write + {'obj': obj2, 'frag': 3}, + {'obj': obj2, 'frag': 4}, + {'obj': obj2, 'frag': 5}, + {'obj': obj2, 'frag': 6}, + {'obj': obj2, 'frag': 7}, + {'obj': obj2, 'frag': 8}, + {'obj': obj2, 'frag': 9}, + {'obj': obj2, 'frag': 10}, # parity + {'obj': obj2, 'frag': 11}, # parity + {'obj': obj2, 'frag': 12}, # parity + {'obj': obj2, 'frag': 13}, # parity + # {'obj': obj2, 'frag': 2}, # handoff (not used in this test) + ] + + fake_response = self._fake_ec_node_response(node_frags) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.headers['etag'], obj2['etag']) + self.assertEqual(md5(resp.body).hexdigest(), obj2['etag']) + + collected_responses = defaultdict(set) + for conn in log: + etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag'] + index = conn.resp.headers['X-Object-Sysmeta-Ec-Frag-Index'] + 
collected_responses[etag].add(index) + + # because the primary nodes are shuffled, it's possible the proxy + # didn't even notice the missed overwrite frag - but it might have + self.assertLessEqual(len(log), self.policy.ec_ndata + 1) + self.assertLessEqual(len(collected_responses), 2) + + # ... regardless we should never need to fetch more than ec_ndata + # frags for any given etag + for etag, frags in collected_responses.items(): + self.assertTrue(len(frags) <= self.policy.ec_ndata, + 'collected %s frags for etag %s' % ( + len(frags), etag)) + + def test_GET_with_many_missed_overwrite_will_need_handoff(self): + obj1 = self._make_ec_object_stub() + obj2 = self._make_ec_object_stub() + + node_frags = [ + {'obj': obj2, 'frag': 0}, + {'obj': obj2, 'frag': 1}, + {'obj': obj1, 'frag': 2}, # missed + {'obj': obj2, 'frag': 3}, + {'obj': obj2, 'frag': 4}, + {'obj': obj2, 'frag': 5}, + {'obj': obj1, 'frag': 6}, # missed + {'obj': obj2, 'frag': 7}, + {'obj': obj2, 'frag': 8}, + {'obj': obj1, 'frag': 9}, # missed + {'obj': obj1, 'frag': 10}, # missed + {'obj': obj1, 'frag': 11}, # missed + {'obj': obj2, 'frag': 12}, + {'obj': obj2, 'frag': 13}, + {'obj': obj2, 'frag': 6}, # handoff + ] + + fake_response = self._fake_ec_node_response(node_frags) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.headers['etag'], obj2['etag']) + self.assertEqual(md5(resp.body).hexdigest(), obj2['etag']) + + collected_responses = defaultdict(set) + for conn in log: + etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag'] + index = conn.resp.headers['X-Object-Sysmeta-Ec-Frag-Index'] + collected_responses[etag].add(index) + + # there's not enough of the obj2 etag on the primaries, we would + # have collected responses for both etags, and would have made + # one more request to the handoff node + self.assertEqual(len(log), self.replicas() + 1) + self.assertEqual(len(collected_responses), 2) + + # ... 
regardless we should never need to fetch more than ec_ndata + # frags for any given etag + for etag, frags in collected_responses.items(): + self.assertTrue(len(frags) <= self.policy.ec_ndata, + 'collected %s frags for etag %s' % ( + len(frags), etag)) + + def test_GET_with_missing_and_mixed_frags_will_dig_deep_but_succeed(self): + obj1 = self._make_ec_object_stub() + obj2 = self._make_ec_object_stub() + + node_frags = [ + {'obj': obj1, 'frag': 0}, + {'obj': obj2, 'frag': 0}, + {}, + {'obj': obj1, 'frag': 1}, + {'obj': obj2, 'frag': 1}, + {}, + {'obj': obj1, 'frag': 2}, + {'obj': obj2, 'frag': 2}, + {}, + {'obj': obj1, 'frag': 3}, + {'obj': obj2, 'frag': 3}, + {}, + {'obj': obj1, 'frag': 4}, + {'obj': obj2, 'frag': 4}, + {}, + {'obj': obj1, 'frag': 5}, + {'obj': obj2, 'frag': 5}, + {}, + {'obj': obj1, 'frag': 6}, + {'obj': obj2, 'frag': 6}, + {}, + {'obj': obj1, 'frag': 7}, + {'obj': obj2, 'frag': 7}, + {}, + {'obj': obj1, 'frag': 8}, + {'obj': obj2, 'frag': 8}, + {}, + {'obj': obj2, 'frag': 9}, + ] + + fake_response = self._fake_ec_node_response(node_frags) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.headers['etag'], obj2['etag']) + self.assertEqual(md5(resp.body).hexdigest(), obj2['etag']) + + collected_responses = defaultdict(set) + for conn in log: + etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag'] + index = conn.resp.headers['X-Object-Sysmeta-Ec-Frag-Index'] + collected_responses[etag].add(index) + + # we go exactly as long as we have to, finding two different + # etags and some 404's (i.e. collected_responses[None]) + self.assertEqual(len(log), len(node_frags)) + self.assertEqual(len(collected_responses), 3) + + # ... regardless we should never need to fetch more than ec_ndata + # frags for any given etag + for etag, frags in collected_responses.items(): + self.assertTrue(len(frags) <= self.policy.ec_ndata, + 'collected %s frags for etag %s' % ( + len(frags), etag)) + + def test_GET_with_missing_and_mixed_frags_will_dig_deep_but_stop(self): + obj1 = self._make_ec_object_stub() + obj2 = self._make_ec_object_stub() + + node_frags = [ + {'obj': obj1, 'frag': 0}, + {'obj': obj2, 'frag': 0}, + {}, + {'obj': obj1, 'frag': 1}, + {'obj': obj2, 'frag': 1}, + {}, + {'obj': obj1, 'frag': 2}, + {'obj': obj2, 'frag': 2}, + {}, + {'obj': obj1, 'frag': 3}, + {'obj': obj2, 'frag': 3}, + {}, + {'obj': obj1, 'frag': 4}, + {'obj': obj2, 'frag': 4}, + {}, + {'obj': obj1, 'frag': 5}, + {'obj': obj2, 'frag': 5}, + {}, + {'obj': obj1, 'frag': 6}, + {'obj': obj2, 'frag': 6}, + {}, + {'obj': obj1, 'frag': 7}, + {'obj': obj2, 'frag': 7}, + {}, + {'obj': obj1, 'frag': 8}, + {'obj': obj2, 'frag': 8}, + {}, + {}, + ] + + fake_response = self._fake_ec_node_response(node_frags) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 404) + + collected_responses = defaultdict(set) + for conn in log: + etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag'] + index = conn.resp.headers['X-Object-Sysmeta-Ec-Frag-Index'] + collected_responses[etag].add(index) + + # default node_iter will exhaust at 2 * replicas + self.assertEqual(len(log), 2 * self.replicas()) + self.assertEqual(len(collected_responses), 3) + + # ... 
regardless we should never need to fetch more than ec_ndata + # frags for any given etag + for etag, frags in collected_responses.items(): + self.assertTrue(len(frags) <= self.policy.ec_ndata, + 'collected %s frags for etag %s' % ( + len(frags), etag)) + + def test_GET_mixed_success_with_range(self): + fragment_size = self.policy.fragment_size + + ec_stub = self._make_ec_object_stub() + frag_archives = ec_stub['frags'] + frag_archive_size = len(ec_stub['frags'][0]) + + headers = { + 'Content-Type': 'text/plain', + 'Content-Length': fragment_size, + 'Content-Range': 'bytes 0-%s/%s' % (fragment_size - 1, + frag_archive_size), + 'X-Object-Sysmeta-Ec-Content-Length': len(ec_stub['body']), + 'X-Object-Sysmeta-Ec-Etag': ec_stub['etag'], + } + responses = [ + StubResponse(206, frag_archives[0][:fragment_size], headers), + StubResponse(206, frag_archives[1][:fragment_size], headers), + StubResponse(206, frag_archives[2][:fragment_size], headers), + StubResponse(206, frag_archives[3][:fragment_size], headers), + StubResponse(206, frag_archives[4][:fragment_size], headers), + # data nodes with old frag + StubResponse(416), + StubResponse(416), + StubResponse(206, frag_archives[7][:fragment_size], headers), + StubResponse(206, frag_archives[8][:fragment_size], headers), + StubResponse(206, frag_archives[9][:fragment_size], headers), + # hopefully we ask for two more + StubResponse(206, frag_archives[10][:fragment_size], headers), + StubResponse(206, frag_archives[11][:fragment_size], headers), + ] + + def get_response(req): + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o', headers={'Range': 'bytes=0-3'}) + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 206) + self.assertEqual(resp.body, 'test') + self.assertEqual(len(log), self.policy.ec_ndata + 2) + + def test_GET_with_range_unsatisfiable_mixed_success(self): + responses = [ + StubResponse(416), + StubResponse(416), + StubResponse(416), + StubResponse(416), + StubResponse(416), + StubResponse(416), + StubResponse(416), + # sneak in bogus extra responses + StubResponse(404), + StubResponse(206), + # and then just "enough" more 416's + StubResponse(416), + StubResponse(416), + StubResponse(416), + ] + + def get_response(req): + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o', headers={ + 'Range': 'bytes=%s-' % 100000000000000}) + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 416) + # ec_ndata responses that must agree, plus the bogus extras + self.assertEqual(len(log), self.policy.ec_ndata + 2) + + def test_GET_mixed_ranged_responses_success(self): + segment_size = self.policy.ec_segment_size + fragment_size = self.policy.fragment_size + new_data = ('test' * segment_size)[:-492] + new_etag = md5(new_data).hexdigest() + new_archives = self._make_ec_archive_bodies(new_data) + old_data = ('junk' * segment_size)[:-492] + old_etag = md5(old_data).hexdigest() + old_archives = self._make_ec_archive_bodies(old_data) + frag_archive_size = len(new_archives[0]) + + new_headers = { + 'Content-Type': 'text/plain', + 'Content-Length': fragment_size, + 'Content-Range': 'bytes 0-%s/%s' % (fragment_size - 1, + frag_archive_size), + 'X-Object-Sysmeta-Ec-Content-Length': len(new_data), + 'X-Object-Sysmeta-Ec-Etag': new_etag, + } + old_headers = { + 'Content-Type': 'text/plain', + 'Content-Length': fragment_size, + 
'Content-Range': 'bytes 0-%s/%s' % (fragment_size - 1, + frag_archive_size), + 'X-Object-Sysmeta-Ec-Content-Length': len(old_data), + 'X-Object-Sysmeta-Ec-Etag': old_etag, + } + # 7 primaries with stale frags, 3 handoffs failed to get new frags + responses = [ + StubResponse(206, old_archives[0][:fragment_size], old_headers), + StubResponse(206, new_archives[1][:fragment_size], new_headers), + StubResponse(206, old_archives[2][:fragment_size], old_headers), + StubResponse(206, new_archives[3][:fragment_size], new_headers), + StubResponse(206, old_archives[4][:fragment_size], old_headers), + StubResponse(206, new_archives[5][:fragment_size], new_headers), + StubResponse(206, old_archives[6][:fragment_size], old_headers), + StubResponse(206, new_archives[7][:fragment_size], new_headers), + StubResponse(206, old_archives[8][:fragment_size], old_headers), + StubResponse(206, new_archives[9][:fragment_size], new_headers), + StubResponse(206, old_archives[10][:fragment_size], old_headers), + StubResponse(206, new_archives[11][:fragment_size], new_headers), + StubResponse(206, old_archives[12][:fragment_size], old_headers), + StubResponse(206, new_archives[13][:fragment_size], new_headers), + StubResponse(206, new_archives[0][:fragment_size], new_headers), + StubResponse(404), + StubResponse(404), + StubResponse(206, new_archives[6][:fragment_size], new_headers), + StubResponse(404), + StubResponse(206, new_archives[10][:fragment_size], new_headers), + StubResponse(206, new_archives[12][:fragment_size], new_headers), + ] + + def get_response(req): + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.body, new_data[:segment_size]) + self.assertEqual(len(log), self.policy.ec_ndata + 10) + def test_GET_mismatched_fragment_archives(self): segment_size = self.policy.ec_segment_size test_data1 = ('test' * segment_size)[:-333]