EC GET path: require fragments to be of same set

If they are not, exhaust the node iter to go get more.  Without this
change the problem shows up after a simple overwrite: a GET arrives
before the handoff has put the newer obj back on the 'alive again'
primary, so the proxy gets n-1 fragments of the newest set and 1 of
the older.

This patch bucketizes the fragments by etag and, if it doesn't have
enough of one set, continues to exhaust the node iterator until it
has a large enough matching set.
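
In rough pseudo-Python, the selection loop works like this (a minimal
sketch of the idea only, not the proxy code; fetch() and the dict
shape of the responses are illustrative, and at least one initial
response is assumed):

    import collections

    def collect_matching_fragments(first_responses, ndata, more_nodes,
                                   fetch):
        # bucket fragment GET responses by their EC etag; while the
        # largest bucket is still short of ndata, keep asking nodes
        buckets = collections.defaultdict(list)
        for resp in first_responses:
            buckets[resp['etag']].append(resp)
        while True:
            best = max(buckets, key=lambda e: len(buckets[e]))
            if len(buckets[best]) >= ndata:
                return buckets[best]  # a decodable matching set
            try:
                node = next(more_nodes)  # exhaust the node iter
            except StopIteration:
                return None  # not enough matching fragments anywhere
            resp = fetch(node)
            buckets[resp['etag']].append(resp)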

Change-Id: Ib710a133ce1be278365067fd0d6610d80f1f7372
Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
Co-Authored-By: Alistair Coles <alistair.coles@hp.com>
Closes-Bug: 1457691
paul luse 2015-08-12 13:32:50 -07:00 committed by Clay Gerrard
parent bb5e38569e
commit 893f30c61d
9 changed files with 828 additions and 116 deletions

swift/common/utils.py

@@ -2268,6 +2268,7 @@ class GreenAsyncPile(object):
             size = size_or_pool
         self._responses = eventlet.queue.LightQueue(size)
         self._inflight = 0
+        self._pending = 0
 
     def _run_func(self, func, args, kwargs):
         try:
@@ -2279,6 +2280,7 @@ class GreenAsyncPile(object):
         """
         Spawn a job in a green thread on the pile.
         """
+        self._pending += 1
         self._inflight += 1
         self._pool.spawn(self._run_func, func, args, kwargs)
@@ -2303,12 +2305,13 @@ class GreenAsyncPile(object):
     def next(self):
         try:
-            return self._responses.get_nowait()
+            rv = self._responses.get_nowait()
         except Empty:
             if self._inflight == 0:
                 raise StopIteration()
             else:
-                return self._responses.get()
+                rv = self._responses.get()
+        self._pending -= 1
+        return rv
class ModifiedParseResult(ParseResult):

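A quick illustrative trace of the new counter (assuming
swift.common.utils.GreenAsyncPile as patched above; _pending counts
results spawned but not yet consumed):

    from swift.common.utils import GreenAsyncPile

    pile = GreenAsyncPile(2)
    pile.spawn(lambda: 'a')
    pile.spawn(lambda: 'b')
    assert pile._pending == 2  # two results not yet consumed
    pile.next()                # consume one result
    assert pile._pending == 1  # decremented on consumption
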
swift/proxy/controllers/base.py

@@ -28,6 +28,7 @@ import os
 import time
 import functools
 import inspect
+import itertools
 import operator
 from sys import exc_info
 from swift import gettext_ as _
@@ -1125,6 +1126,99 @@ class GetOrHeadHandler(ResumingGetter):
return res
class NodeIter(object):
"""
Yields nodes for a ring partition, skipping over error
limited nodes and stopping at the configurable number of nodes. If a
node yielded subsequently gets error limited, an extra node will be
yielded to take its place.
Note that if you're going to iterate over this concurrently from
multiple greenthreads, you'll want to use a
swift.common.utils.GreenthreadSafeIterator to serialize access.
Otherwise, you may get ValueErrors from concurrent access. (You also
may not, depending on how logging is configured, the vagaries of
socket IO and eventlet, and the phase of the moon.)
:param app: a proxy app
    :param ring: ring to get nodes from
:param partition: ring partition to yield nodes for
:param node_iter: optional iterable of nodes to try. Useful if you
want to filter or reorder the nodes.
"""
def __init__(self, app, ring, partition, node_iter=None):
self.app = app
self.ring = ring
self.partition = partition
part_nodes = ring.get_part_nodes(partition)
if node_iter is None:
node_iter = itertools.chain(
part_nodes, ring.get_more_nodes(partition))
num_primary_nodes = len(part_nodes)
self.nodes_left = self.app.request_node_count(num_primary_nodes)
self.expected_handoffs = self.nodes_left - num_primary_nodes
# Use of list() here forcibly yanks the first N nodes (the primary
# nodes) from node_iter, so the rest of its values are handoffs.
self.primary_nodes = self.app.sort_nodes(
list(itertools.islice(node_iter, num_primary_nodes)))
self.handoff_iter = node_iter
def __iter__(self):
self._node_iter = self._node_gen()
return self
def log_handoffs(self, handoffs):
"""
Log handoff requests if handoff logging is enabled and the
handoff was not expected.
        We only log handoffs when we've pushed the handoff count past
        what we would expect under normal circumstances, i.e.
        (request_node_count - num_primaries); when handoffs go higher
        than that it means one of the primaries must have been skipped
        because of error limiting before we consumed all of our
        nodes_left.
"""
if not self.app.log_handoffs:
return
extra_handoffs = handoffs - self.expected_handoffs
if extra_handoffs > 0:
self.app.logger.increment('handoff_count')
self.app.logger.warning(
'Handoff requested (%d)' % handoffs)
if (extra_handoffs == len(self.primary_nodes)):
# all the primaries were skipped, and handoffs didn't help
self.app.logger.increment('handoff_all_count')
def _node_gen(self):
for node in self.primary_nodes:
if not self.app.error_limited(node):
yield node
if not self.app.error_limited(node):
self.nodes_left -= 1
if self.nodes_left <= 0:
return
handoffs = 0
for node in self.handoff_iter:
if not self.app.error_limited(node):
handoffs += 1
self.log_handoffs(handoffs)
yield node
if not self.app.error_limited(node):
self.nodes_left -= 1
if self.nodes_left <= 0:
return
def next(self):
return next(self._node_iter)
def __next__(self):
return self.next()
class Controller(object):
"""Base WSGI controller class for the proxy"""
server_type = 'Base'

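As the docstring notes, concurrent consumers must serialize access; a
minimal usage sketch (assuming an app, ring, and partition are already
in hand):

    from swift.common.utils import GreenthreadSafeIterator
    from swift.proxy.controllers.base import NodeIter

    node_iter = NodeIter(app, ring, partition)
    safe_iter = GreenthreadSafeIterator(node_iter)
    # greenthreads pull nodes from safe_iter, while the EC GET path can
    # still consult node_iter.nodes_left to decide whether spawning
    # another fragment request could possibly be satisfied
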
swift/proxy/controllers/obj.py

@@ -1951,44 +1951,43 @@ class ECObjectController(BaseObjectController):
         orig_range = req.range
         range_specs = self._convert_range(req, policy)
 
-        node_iter = GreenthreadSafeIterator(node_iter)
-        num_gets = policy.ec_ndata
-        with ContextPool(num_gets) as pool:
+        safe_iter = GreenthreadSafeIterator(node_iter)
+        with ContextPool(policy.ec_ndata) as pool:
             pile = GreenAsyncPile(pool)
-            for _junk in range(num_gets):
+            for _junk in range(policy.ec_ndata):
                 pile.spawn(self._fragment_GET_request,
-                           req, node_iter, partition,
+                           req, safe_iter, partition,
                            policy)
 
-            gets = list(pile)
-            good_gets = []
             bad_gets = []
-            for get, parts_iter in gets:
+            etag_buckets = collections.defaultdict(list)
+            best_etag = None
+            for get, parts_iter in pile:
                 if is_success(get.last_status):
-                    good_gets.append((get, parts_iter))
+                    etag = HeaderKeyDict(
+                        get.last_headers)['X-Object-Sysmeta-Ec-Etag']
+                    etag_buckets[etag].append((get, parts_iter))
+                    if etag != best_etag and (
+                            len(etag_buckets[etag]) >
+                            len(etag_buckets[best_etag])):
+                        best_etag = etag
                 else:
                     bad_gets.append((get, parts_iter))
+                matching_response_count = max(
+                    len(etag_buckets[best_etag]), len(bad_gets))
+                if (policy.ec_ndata - matching_response_count >
+                        pile._pending) and node_iter.nodes_left > 0:
+                    # we need more matching responses to reach ec_ndata
+                    # than we have pending gets, as long as we still have
+                    # nodes in node_iter we can spawn another
+                    pile.spawn(self._fragment_GET_request, req,
+                               safe_iter, partition, policy)
 
         req.range = orig_range
-        if len(good_gets) == num_gets:
-            # If these aren't all for the same object, then error out so
-            # at least the client doesn't get garbage. We can do a lot
-            # better here with more work, but this'll work for now.
-            found_obj_etags = set(
-                HeaderKeyDict(
-                    getter.last_headers)['X-Object-Sysmeta-Ec-Etag']
-                for getter, _junk in good_gets)
-            if len(found_obj_etags) > 1:
-                self.app.logger.debug(
-                    "Returning 503 for %s; found too many etags (%s)",
-                    req.path,
-                    ", ".join(found_obj_etags))
-                return HTTPServiceUnavailable(request=req)
-            # we found enough pieces to decode the object, so now let's
-            # decode the object
+        if len(etag_buckets[best_etag]) >= policy.ec_ndata:
             # headers can come from any of the getters
             resp_headers = HeaderKeyDict(
-                good_gets[0][0].source_headers[-1])
+                etag_buckets[best_etag][0][0].source_headers[-1])
             resp_headers.pop('Content-Range', None)
             eccl = resp_headers.get('X-Object-Sysmeta-Ec-Content-Length')
             obj_length = int(eccl) if eccl is not None else None
@@ -1996,11 +1995,10 @@ class ECObjectController(BaseObjectController):
# This is only true if we didn't get a 206 response, but
# that's the only time this is used anyway.
fa_length = int(resp_headers['Content-Length'])
app_iter = ECAppIter(
req.swift_entity_path,
policy,
-                [iterator for getter, iterator in good_gets],
+                [iterator for getter, iterator in etag_buckets[best_etag]],
range_specs, fa_length, obj_length,
self.app.logger)
resp = Response(

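The spawn condition above is just bookkeeping arithmetic; a worked
example with illustrative numbers:

    ec_ndata = 10
    matching_response_count = 8  # size of the best etag bucket so far
    pending = 1                  # GETs spawned but not yet consumed
    nodes_left = 5               # NodeIter still has nodes to offer
    if (ec_ndata - matching_response_count > pending) and nodes_left > 0:
        print('spawn another fragment GET')  # 10 - 8 = 2 > 1 -> spawn
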
swift/proxy/server.py

@@ -19,7 +19,6 @@ import socket
 from swift import gettext_ as _
 from random import shuffle
 from time import time
-import itertools
 import functools
 import sys
@@ -36,7 +35,7 @@ from swift.common.utils import cache_from_env, get_logger, \
 from swift.common.constraints import check_utf8, valid_api_version
 from swift.proxy.controllers import AccountController, ContainerController, \
     ObjectControllerRouter, InfoController
-from swift.proxy.controllers.base import get_container_info
+from swift.proxy.controllers.base import get_container_info, NodeIter
 from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
     HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \
     HTTPServerError, HTTPException, Request, HTTPServiceUnavailable
@@ -507,60 +506,7 @@ class Application(object):
                    'port': node['port'], 'device': node['device']})
 
     def iter_nodes(self, ring, partition, node_iter=None):
-        """
-        Yields nodes for a ring partition, skipping over error
-        limited nodes and stopping at the configurable number of nodes. If a
-        node yielded subsequently gets error limited, an extra node will be
-        yielded to take its place.
-
-        Note that if you're going to iterate over this concurrently from
-        multiple greenthreads, you'll want to use a
-        swift.common.utils.GreenthreadSafeIterator to serialize access.
-        Otherwise, you may get ValueErrors from concurrent access. (You also
-        may not, depending on how logging is configured, the vagaries of
-        socket IO and eventlet, and the phase of the moon.)
-
-        :param ring: ring to get yield nodes from
-        :param partition: ring partition to yield nodes for
-        :param node_iter: optional iterable of nodes to try. Useful if you
-            want to filter or reorder the nodes.
-        """
-        part_nodes = ring.get_part_nodes(partition)
-        if node_iter is None:
-            node_iter = itertools.chain(part_nodes,
-                                        ring.get_more_nodes(partition))
-        num_primary_nodes = len(part_nodes)
-
-        # Use of list() here forcibly yanks the first N nodes (the primary
-        # nodes) from node_iter, so the rest of its values are handoffs.
-        primary_nodes = self.sort_nodes(
-            list(itertools.islice(node_iter, num_primary_nodes)))
-        handoff_nodes = node_iter
-        nodes_left = self.request_node_count(len(primary_nodes))
-
-        log_handoffs_threshold = nodes_left - len(primary_nodes)
-        for node in primary_nodes:
-            if not self.error_limited(node):
-                yield node
-                if not self.error_limited(node):
-                    nodes_left -= 1
-                    if nodes_left <= 0:
-                        return
-        handoffs = 0
-        for node in handoff_nodes:
-            if not self.error_limited(node):
-                handoffs += 1
-                if self.log_handoffs and handoffs > log_handoffs_threshold:
-                    self.logger.increment('handoff_count')
-                    self.logger.warning(
-                        'Handoff requested (%d)' % handoffs)
-                    if handoffs - log_handoffs_threshold == len(primary_nodes):
-                        self.logger.increment('handoff_all_count')
-                yield node
-                if not self.error_limited(node):
-                    nodes_left -= 1
-                    if nodes_left <= 0:
-                        return
+        return NodeIter(self, ring, partition, node_iter=node_iter)
def exception_occurred(self, node, typ, additional_info,
**kwargs):

test/probe/common.py

@@ -20,6 +20,8 @@ import sys
 from time import sleep, time
 from collections import defaultdict
 import unittest
+from hashlib import md5
+from uuid import uuid4
 
 from nose import SkipTest
 from six.moves.http_client import HTTPConnection
@@ -262,6 +264,49 @@ def resetswift():
Manager(['all']).stop()
class Body(object):
def __init__(self, total=3.5 * 2 ** 20):
        self.length = int(total)  # default total is a float; __len__ must return an int
self.hasher = md5()
self.read_amount = 0
self.chunk = uuid4().hex * 2 ** 10
self.buff = ''
@property
def etag(self):
return self.hasher.hexdigest()
def __len__(self):
return self.length
def read(self, amount):
if len(self.buff) < amount:
try:
self.buff += next(self)
except StopIteration:
pass
rv, self.buff = self.buff[:amount], self.buff[amount:]
return rv
def __iter__(self):
return self
def next(self):
if self.buff:
rv, self.buff = self.buff, ''
return rv
if self.read_amount >= self.length:
raise StopIteration()
rv = self.chunk[:int(self.length - self.read_amount)]
self.read_amount += len(rv)
self.hasher.update(rv)
return rv
def __next__(self):
return next(self)
class ProbeTest(unittest.TestCase):
"""
Don't instantiate this directly, use a child class instead.

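A quick illustration of how the probe tests use this Body helper
(assuming test.probe.common.Body as added above):

    body = Body(total=1024)
    uploaded = ''.join(chunk for chunk in body)
    assert len(uploaded) == 1024
    print(body.etag)  # md5 of everything the body produced
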
test/probe/test_object_handoff.py

@@ -16,13 +16,17 @@
 from unittest import main
 from uuid import uuid4
 import random
+from hashlib import md5
+from collections import defaultdict
 
 from swiftclient import client
 from swift.common import direct_client
 from swift.common.exceptions import ClientException
 from swift.common.manager import Manager
-from test.probe.common import kill_server, ReplProbeTest, start_server
+from test.probe.common import (kill_server, start_server, ReplProbeTest,
+                               ECProbeTest, Body)
class TestObjectHandoff(ReplProbeTest):
@@ -211,5 +215,89 @@ class TestObjectHandoff(ReplProbeTest):
self.fail("Expected ClientException but didn't get it")
class TestECObjectHandoffOverwrite(ECProbeTest):
def get_object(self, container_name, object_name):
headers, body = client.get_object(self.url, self.token,
container_name,
object_name,
resp_chunk_size=64 * 2 ** 10)
resp_checksum = md5()
for chunk in body:
resp_checksum.update(chunk)
return resp_checksum.hexdigest()
def test_ec_handoff_overwrite(self):
container_name = 'container-%s' % uuid4()
object_name = 'object-%s' % uuid4()
# create EC container
headers = {'X-Storage-Policy': self.policy.name}
client.put_container(self.url, self.token, container_name,
headers=headers)
# PUT object
old_contents = Body()
client.put_object(self.url, self.token, container_name,
object_name, contents=old_contents)
# get our node lists
opart, onodes = self.object_ring.get_nodes(
self.account, container_name, object_name)
# shutdown one of the primary data nodes
failed_primary = random.choice(onodes)
failed_primary_device_path = self.device_dir('object', failed_primary)
self.kill_drive(failed_primary_device_path)
# overwrite our object with some new data
new_contents = Body()
client.put_object(self.url, self.token, container_name,
object_name, contents=new_contents)
self.assertNotEqual(new_contents.etag, old_contents.etag)
# restore failed primary device
self.revive_drive(failed_primary_device_path)
# sanity - failed node has old contents
req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)}
headers = direct_client.direct_head_object(
failed_primary, opart, self.account, container_name,
object_name, headers=req_headers)
self.assertEqual(headers['X-Object-Sysmeta-EC-Etag'],
old_contents.etag)
        # we have 1 primary with the wrong (old) etag, and we should have
        # 5 with the new etag plus a handoff with the new etag, so killing
        # 2 other primaries forces the proxy to try to GET from all
        # primaries plus the handoff.
other_nodes = [n for n in onodes if n != failed_primary]
random.shuffle(other_nodes)
for node in other_nodes[:2]:
self.kill_drive(self.device_dir('object', node))
# sanity, after taking out two primaries we should be down to
# only four primaries, one of which has the old etag - but we
# also have a handoff with the new etag out there
found_frags = defaultdict(int)
req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)}
for node in onodes + list(self.object_ring.get_more_nodes(opart)):
try:
headers = direct_client.direct_head_object(
node, opart, self.account, container_name,
object_name, headers=req_headers)
except Exception:
continue
found_frags[headers['X-Object-Sysmeta-EC-Etag']] += 1
self.assertEqual(found_frags, {
new_contents.etag: 4, # this should be enough to rebuild!
old_contents.etag: 1,
})
# clear node error limiting
Manager(['proxy']).restart()
resp_etag = self.get_object(container_name, object_name)
self.assertEqual(resp_etag, new_contents.etag)
if __name__ == '__main__':
main()

test/probe/test_reconstructor_revert.py

@@ -21,7 +21,7 @@ import random
 import shutil
 
 from collections import defaultdict
 
-from test.probe.common import ECProbeTest
+from test.probe.common import ECProbeTest, Body
from swift.common import direct_client
from swift.common.storage_policy import EC_POLICY
@@ -31,32 +31,6 @@ from swift.obj import reconstructor
 from swiftclient import client
 
 
-class Body(object):
-
-    def __init__(self, total=3.5 * 2 ** 20):
-        self.total = total
-        self.hasher = md5()
-        self.size = 0
-        self.chunk = 'test' * 16 * 2 ** 10
-
-    @property
-    def etag(self):
-        return self.hasher.hexdigest()
-
-    def __iter__(self):
-        return self
-
-    def next(self):
-        if self.size > self.total:
-            raise StopIteration()
-        self.size += len(self.chunk)
-        self.hasher.update(self.chunk)
-        return self.chunk
-
-    def __next__(self):
-        return next(self)
class TestReconstructorRevert(ECProbeTest):
def setUp(self):

test/unit/common/test_utils.py

@@ -4530,6 +4530,22 @@ class TestGreenAsyncPile(unittest.TestCase):
self.assertEqual(pile.waitall(0.5), [0.1, 0.1])
self.assertEqual(completed[0], 2)
def test_pending(self):
pile = utils.GreenAsyncPile(3)
self.assertEqual(0, pile._pending)
for repeats in range(2):
            # repeat to verify that pending goes back up after going down
for i in range(4):
pile.spawn(lambda: i)
self.assertEqual(4, pile._pending)
for i in range(3, -1, -1):
pile.next()
self.assertEqual(i, pile._pending)
# sanity check - the pile is empty
self.assertRaises(StopIteration, pile.next)
# pending remains 0
self.assertEqual(0, pile._pending)
class TestLRUCache(unittest.TestCase):

test/unit/proxy/controllers/test_obj.py

@@ -26,6 +26,7 @@ from hashlib import md5
 import mock
 from eventlet import Timeout
 from six import BytesIO
+from six.moves import range
 
 import swift
@@ -913,6 +914,76 @@ class TestObjControllerLegacyCache(TestReplicatedObjController):
self.assertEqual(resp.status_int, 503)
class StubResponse(object):
def __init__(self, status, body='', headers=None):
self.status = status
self.body = body
self.readable = BytesIO(body)
self.headers = swob.HeaderKeyDict(headers)
fake_reason = ('Fake', 'This response is a lie.')
self.reason = swob.RESPONSE_REASONS.get(status, fake_reason)[0]
def getheader(self, header_name, default=None):
return self.headers.get(header_name, default)
def getheaders(self):
if 'Content-Length' not in self.headers:
self.headers['Content-Length'] = len(self.body)
return self.headers.items()
def read(self, amt=0):
return self.readable.read(amt)
@contextmanager
def capture_http_requests(get_response):
class FakeConn(object):
def __init__(self, req):
self.req = req
self.resp = None
def getresponse(self):
self.resp = get_response(self.req)
return self.resp
class ConnectionLog(object):
def __init__(self):
self.connections = []
def __len__(self):
return len(self.connections)
def __getitem__(self, i):
return self.connections[i]
def __iter__(self):
return iter(self.connections)
def __call__(self, ip, port, method, path, headers, qs, ssl):
req = {
'ip': ip,
'port': port,
'method': method,
'path': path,
'headers': headers,
'qs': qs,
'ssl': ssl,
}
conn = FakeConn(req)
self.connections.append(conn)
return conn
fake_conn = ConnectionLog()
with mock.patch('swift.common.bufferedhttp.http_connect_raw',
new=fake_conn):
yield fake_conn
@patch_policies(with_ec_default=True)
class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
container_info = {
@@ -1344,6 +1415,483 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
for fragments in zip(*fragment_payloads)]
return ec_archive_bodies
def _make_ec_object_stub(self, test_body=None, policy=None):
policy = policy or self.policy
segment_size = policy.ec_segment_size
test_body = test_body or (
'test' * segment_size)[:-random.randint(0, 1000)]
etag = md5(test_body).hexdigest()
ec_archive_bodies = self._make_ec_archive_bodies(test_body,
policy=policy)
return {
'body': test_body,
'etag': etag,
'frags': ec_archive_bodies,
}
def _fake_ec_node_response(self, node_frags):
"""
        Given a list of entries for each node in ring order, where each
        entry is a dict (or list of dicts) describing the fragment(s)
        held by that node, create a function suitable for use with
        capture_http_requests that accepts a req object and returns a
        response faking the behavior of an object server that had the
        given fragments on disk at the time.
"""
node_map = {}
all_nodes = []
def _build_node_map(req):
node_key = lambda n: (n['ip'], n['port'])
part = utils.split_path(req['path'], 5, 5, True)[1]
policy = POLICIES[int(
req['headers']['X-Backend-Storage-Policy-Index'])]
all_nodes.extend(policy.object_ring.get_part_nodes(part))
all_nodes.extend(policy.object_ring.get_more_nodes(part))
for i, node in enumerate(all_nodes):
node_map[node_key(node)] = i
# normalize node_frags to a list of fragments for each node even
# if there's only one fragment in the dataset provided.
for i, frags in enumerate(node_frags):
if isinstance(frags, dict):
node_frags[i] = [frags]
def get_response(req):
if not node_map:
_build_node_map(req)
try:
node_index = node_map[(req['ip'], req['port'])]
except KeyError:
raise Exception("Couldn't find node %s:%s in %r" % (
req['ip'], req['port'], all_nodes))
try:
frags = node_frags[node_index]
            except IndexError:
raise Exception('Found node %r:%r at index %s - '
'but only got %s stub response nodes' % (
req['ip'], req['port'], node_index,
len(node_frags)))
try:
stub = random.choice(frags)
except IndexError:
stub = None
if stub:
body = stub['obj']['frags'][stub['frag']]
headers = {
'X-Object-Sysmeta-Ec-Content-Length': len(
stub['obj']['body']),
'X-Object-Sysmeta-Ec-Etag': stub['obj']['etag'],
'X-Object-Sysmeta-Ec-Frag-Index': stub['frag'],
}
resp = StubResponse(200, body, headers)
else:
resp = StubResponse(404)
return resp
return get_response
def test_GET_with_frags_swapped_around(self):
segment_size = self.policy.ec_segment_size
test_data = ('test' * segment_size)[:-657]
etag = md5(test_data).hexdigest()
ec_archive_bodies = self._make_ec_archive_bodies(test_data)
_part, primary_nodes = self.obj_ring.get_nodes('a', 'c', 'o')
node_key = lambda n: (n['ip'], n['port'])
response_map = {
node_key(n): StubResponse(200, ec_archive_bodies[i], {
'X-Object-Sysmeta-Ec-Content-Length': len(test_data),
'X-Object-Sysmeta-Ec-Etag': etag,
'X-Object-Sysmeta-Ec-Frag-Index': i,
}) for i, n in enumerate(primary_nodes)
}
# swap a parity response into a data node
data_node = random.choice(primary_nodes[:self.policy.ec_ndata])
parity_node = random.choice(primary_nodes[self.policy.ec_ndata:])
(response_map[node_key(data_node)],
response_map[node_key(parity_node)]) = \
(response_map[node_key(parity_node)],
response_map[node_key(data_node)])
def get_response(req):
req_key = (req['ip'], req['port'])
return response_map.pop(req_key)
req = swob.Request.blank('/v1/a/c/o')
with capture_http_requests(get_response) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(log), self.policy.ec_ndata)
self.assertEqual(len(response_map),
len(primary_nodes) - self.policy.ec_ndata)
def test_GET_with_single_missed_overwrite_does_not_need_handoff(self):
obj1 = self._make_ec_object_stub()
obj2 = self._make_ec_object_stub()
node_frags = [
{'obj': obj2, 'frag': 0},
{'obj': obj2, 'frag': 1},
            {'obj': obj1, 'frag': 2},  # missed overwrite
{'obj': obj2, 'frag': 3},
{'obj': obj2, 'frag': 4},
{'obj': obj2, 'frag': 5},
{'obj': obj2, 'frag': 6},
{'obj': obj2, 'frag': 7},
{'obj': obj2, 'frag': 8},
{'obj': obj2, 'frag': 9},
{'obj': obj2, 'frag': 10}, # parity
{'obj': obj2, 'frag': 11}, # parity
{'obj': obj2, 'frag': 12}, # parity
{'obj': obj2, 'frag': 13}, # parity
# {'obj': obj2, 'frag': 2}, # handoff (not used in this test)
]
fake_response = self._fake_ec_node_response(node_frags)
req = swob.Request.blank('/v1/a/c/o')
with capture_http_requests(fake_response) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.headers['etag'], obj2['etag'])
self.assertEqual(md5(resp.body).hexdigest(), obj2['etag'])
collected_responses = defaultdict(set)
for conn in log:
etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag']
index = conn.resp.headers['X-Object-Sysmeta-Ec-Frag-Index']
collected_responses[etag].add(index)
# because the primary nodes are shuffled, it's possible the proxy
# didn't even notice the missed overwrite frag - but it might have
self.assertLessEqual(len(log), self.policy.ec_ndata + 1)
self.assertLessEqual(len(collected_responses), 2)
# ... regardless we should never need to fetch more than ec_ndata
# frags for any given etag
for etag, frags in collected_responses.items():
self.assertTrue(len(frags) <= self.policy.ec_ndata,
'collected %s frags for etag %s' % (
len(frags), etag))
def test_GET_with_many_missed_overwrite_will_need_handoff(self):
obj1 = self._make_ec_object_stub()
obj2 = self._make_ec_object_stub()
node_frags = [
{'obj': obj2, 'frag': 0},
{'obj': obj2, 'frag': 1},
{'obj': obj1, 'frag': 2}, # missed
{'obj': obj2, 'frag': 3},
{'obj': obj2, 'frag': 4},
{'obj': obj2, 'frag': 5},
{'obj': obj1, 'frag': 6}, # missed
{'obj': obj2, 'frag': 7},
{'obj': obj2, 'frag': 8},
{'obj': obj1, 'frag': 9}, # missed
{'obj': obj1, 'frag': 10}, # missed
{'obj': obj1, 'frag': 11}, # missed
{'obj': obj2, 'frag': 12},
{'obj': obj2, 'frag': 13},
{'obj': obj2, 'frag': 6}, # handoff
]
fake_response = self._fake_ec_node_response(node_frags)
req = swob.Request.blank('/v1/a/c/o')
with capture_http_requests(fake_response) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.headers['etag'], obj2['etag'])
self.assertEqual(md5(resp.body).hexdigest(), obj2['etag'])
collected_responses = defaultdict(set)
for conn in log:
etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag']
index = conn.resp.headers['X-Object-Sysmeta-Ec-Frag-Index']
collected_responses[etag].add(index)
        # there's not enough of the obj2 etag on the primaries, so we
        # will have collected responses for both etags and made one
        # more request to the handoff node
self.assertEqual(len(log), self.replicas() + 1)
self.assertEqual(len(collected_responses), 2)
# ... regardless we should never need to fetch more than ec_ndata
# frags for any given etag
for etag, frags in collected_responses.items():
self.assertTrue(len(frags) <= self.policy.ec_ndata,
'collected %s frags for etag %s' % (
len(frags), etag))
def test_GET_with_missing_and_mixed_frags_will_dig_deep_but_succeed(self):
obj1 = self._make_ec_object_stub()
obj2 = self._make_ec_object_stub()
node_frags = [
{'obj': obj1, 'frag': 0},
{'obj': obj2, 'frag': 0},
{},
{'obj': obj1, 'frag': 1},
{'obj': obj2, 'frag': 1},
{},
{'obj': obj1, 'frag': 2},
{'obj': obj2, 'frag': 2},
{},
{'obj': obj1, 'frag': 3},
{'obj': obj2, 'frag': 3},
{},
{'obj': obj1, 'frag': 4},
{'obj': obj2, 'frag': 4},
{},
{'obj': obj1, 'frag': 5},
{'obj': obj2, 'frag': 5},
{},
{'obj': obj1, 'frag': 6},
{'obj': obj2, 'frag': 6},
{},
{'obj': obj1, 'frag': 7},
{'obj': obj2, 'frag': 7},
{},
{'obj': obj1, 'frag': 8},
{'obj': obj2, 'frag': 8},
{},
{'obj': obj2, 'frag': 9},
]
fake_response = self._fake_ec_node_response(node_frags)
req = swob.Request.blank('/v1/a/c/o')
with capture_http_requests(fake_response) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.headers['etag'], obj2['etag'])
self.assertEqual(md5(resp.body).hexdigest(), obj2['etag'])
collected_responses = defaultdict(set)
for conn in log:
etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag']
index = conn.resp.headers['X-Object-Sysmeta-Ec-Frag-Index']
collected_responses[etag].add(index)
# we go exactly as long as we have to, finding two different
# etags and some 404's (i.e. collected_responses[None])
self.assertEqual(len(log), len(node_frags))
self.assertEqual(len(collected_responses), 3)
# ... regardless we should never need to fetch more than ec_ndata
# frags for any given etag
for etag, frags in collected_responses.items():
self.assertTrue(len(frags) <= self.policy.ec_ndata,
'collected %s frags for etag %s' % (
len(frags), etag))
def test_GET_with_missing_and_mixed_frags_will_dig_deep_but_stop(self):
obj1 = self._make_ec_object_stub()
obj2 = self._make_ec_object_stub()
node_frags = [
{'obj': obj1, 'frag': 0},
{'obj': obj2, 'frag': 0},
{},
{'obj': obj1, 'frag': 1},
{'obj': obj2, 'frag': 1},
{},
{'obj': obj1, 'frag': 2},
{'obj': obj2, 'frag': 2},
{},
{'obj': obj1, 'frag': 3},
{'obj': obj2, 'frag': 3},
{},
{'obj': obj1, 'frag': 4},
{'obj': obj2, 'frag': 4},
{},
{'obj': obj1, 'frag': 5},
{'obj': obj2, 'frag': 5},
{},
{'obj': obj1, 'frag': 6},
{'obj': obj2, 'frag': 6},
{},
{'obj': obj1, 'frag': 7},
{'obj': obj2, 'frag': 7},
{},
{'obj': obj1, 'frag': 8},
{'obj': obj2, 'frag': 8},
{},
{},
]
fake_response = self._fake_ec_node_response(node_frags)
req = swob.Request.blank('/v1/a/c/o')
with capture_http_requests(fake_response) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 404)
collected_responses = defaultdict(set)
for conn in log:
etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag']
index = conn.resp.headers['X-Object-Sysmeta-Ec-Frag-Index']
collected_responses[etag].add(index)
# default node_iter will exhaust at 2 * replicas
self.assertEqual(len(log), 2 * self.replicas())
self.assertEqual(len(collected_responses), 3)
# ... regardless we should never need to fetch more than ec_ndata
# frags for any given etag
for etag, frags in collected_responses.items():
self.assertTrue(len(frags) <= self.policy.ec_ndata,
'collected %s frags for etag %s' % (
len(frags), etag))
def test_GET_mixed_success_with_range(self):
fragment_size = self.policy.fragment_size
ec_stub = self._make_ec_object_stub()
frag_archives = ec_stub['frags']
frag_archive_size = len(ec_stub['frags'][0])
headers = {
'Content-Type': 'text/plain',
'Content-Length': fragment_size,
'Content-Range': 'bytes 0-%s/%s' % (fragment_size - 1,
frag_archive_size),
'X-Object-Sysmeta-Ec-Content-Length': len(ec_stub['body']),
'X-Object-Sysmeta-Ec-Etag': ec_stub['etag'],
}
responses = [
StubResponse(206, frag_archives[0][:fragment_size], headers),
StubResponse(206, frag_archives[1][:fragment_size], headers),
StubResponse(206, frag_archives[2][:fragment_size], headers),
StubResponse(206, frag_archives[3][:fragment_size], headers),
StubResponse(206, frag_archives[4][:fragment_size], headers),
# data nodes with old frag
StubResponse(416),
StubResponse(416),
StubResponse(206, frag_archives[7][:fragment_size], headers),
StubResponse(206, frag_archives[8][:fragment_size], headers),
StubResponse(206, frag_archives[9][:fragment_size], headers),
# hopefully we ask for two more
StubResponse(206, frag_archives[10][:fragment_size], headers),
StubResponse(206, frag_archives[11][:fragment_size], headers),
]
def get_response(req):
return responses.pop(0) if responses else StubResponse(404)
req = swob.Request.blank('/v1/a/c/o', headers={'Range': 'bytes=0-3'})
with capture_http_requests(get_response) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 206)
self.assertEqual(resp.body, 'test')
self.assertEqual(len(log), self.policy.ec_ndata + 2)
def test_GET_with_range_unsatisfiable_mixed_success(self):
responses = [
StubResponse(416),
StubResponse(416),
StubResponse(416),
StubResponse(416),
StubResponse(416),
StubResponse(416),
StubResponse(416),
# sneak in bogus extra responses
StubResponse(404),
StubResponse(206),
# and then just "enough" more 416's
StubResponse(416),
StubResponse(416),
StubResponse(416),
]
def get_response(req):
return responses.pop(0) if responses else StubResponse(404)
req = swob.Request.blank('/v1/a/c/o', headers={
'Range': 'bytes=%s-' % 100000000000000})
with capture_http_requests(get_response) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 416)
# ec_ndata responses that must agree, plus the bogus extras
self.assertEqual(len(log), self.policy.ec_ndata + 2)
def test_GET_mixed_ranged_responses_success(self):
segment_size = self.policy.ec_segment_size
fragment_size = self.policy.fragment_size
new_data = ('test' * segment_size)[:-492]
new_etag = md5(new_data).hexdigest()
new_archives = self._make_ec_archive_bodies(new_data)
old_data = ('junk' * segment_size)[:-492]
old_etag = md5(old_data).hexdigest()
old_archives = self._make_ec_archive_bodies(old_data)
frag_archive_size = len(new_archives[0])
new_headers = {
'Content-Type': 'text/plain',
'Content-Length': fragment_size,
'Content-Range': 'bytes 0-%s/%s' % (fragment_size - 1,
frag_archive_size),
'X-Object-Sysmeta-Ec-Content-Length': len(new_data),
'X-Object-Sysmeta-Ec-Etag': new_etag,
}
old_headers = {
'Content-Type': 'text/plain',
'Content-Length': fragment_size,
'Content-Range': 'bytes 0-%s/%s' % (fragment_size - 1,
frag_archive_size),
'X-Object-Sysmeta-Ec-Content-Length': len(old_data),
'X-Object-Sysmeta-Ec-Etag': old_etag,
}
# 7 primaries with stale frags, 3 handoffs failed to get new frags
responses = [
StubResponse(206, old_archives[0][:fragment_size], old_headers),
StubResponse(206, new_archives[1][:fragment_size], new_headers),
StubResponse(206, old_archives[2][:fragment_size], old_headers),
StubResponse(206, new_archives[3][:fragment_size], new_headers),
StubResponse(206, old_archives[4][:fragment_size], old_headers),
StubResponse(206, new_archives[5][:fragment_size], new_headers),
StubResponse(206, old_archives[6][:fragment_size], old_headers),
StubResponse(206, new_archives[7][:fragment_size], new_headers),
StubResponse(206, old_archives[8][:fragment_size], old_headers),
StubResponse(206, new_archives[9][:fragment_size], new_headers),
StubResponse(206, old_archives[10][:fragment_size], old_headers),
StubResponse(206, new_archives[11][:fragment_size], new_headers),
StubResponse(206, old_archives[12][:fragment_size], old_headers),
StubResponse(206, new_archives[13][:fragment_size], new_headers),
StubResponse(206, new_archives[0][:fragment_size], new_headers),
StubResponse(404),
StubResponse(404),
StubResponse(206, new_archives[6][:fragment_size], new_headers),
StubResponse(404),
StubResponse(206, new_archives[10][:fragment_size], new_headers),
StubResponse(206, new_archives[12][:fragment_size], new_headers),
]
def get_response(req):
return responses.pop(0) if responses else StubResponse(404)
req = swob.Request.blank('/v1/a/c/o')
with capture_http_requests(get_response) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, new_data[:segment_size])
self.assertEqual(len(log), self.policy.ec_ndata + 10)
def test_GET_mismatched_fragment_archives(self):
segment_size = self.policy.ec_segment_size
test_data1 = ('test' * segment_size)[:-333]