Fix missing container update
At PUT object request, proxy server makes backend headers (e.g. X-Container-Partition) which help object-servers to determine the container-server they should update. In addition, the backend headers are created as many as the number of container replicas. (i.e. 3 replica in container ring, 3 backend headers will be created) On EC case, Swift fans out fragment archives to backend object-servers. Basically the number of fragment archives will be more than the container replica number and proxy-server assumes a request as success when quorum number of object-server succeeded to store. That would cause to make an orphaned object which is stored but not container updated. For example, assuming k=10, m=4, container replica=3 case: Assuming, proxy-server attempts to make 14 backend streams but unfortunately first 3 nodes returns 507 (disk failure) and then the Swift doesn't have any other disks. In the case, proxy keeps 11 backend streams to store and current Swift assumes it as sufficient because it is more than or equals quorum (right now k+1 is sufficient i.e. 11 backend streams are enough to store) However, in the case, the 11 streams doesn't have the container update header so that the request will succeed but container will be never updated. This patch allows to extract container updates up to object quorum_size + 1 to more nodes to ensure the updates. This approach sacrifices the container update cost a bit because duplicated updates will be there but quorum sizes + 1 seems reasonable (even if it's reaplicated case) to pay to ensure that instead of whole objects incude the update headers. Now Swift will work like as follows: For example: k=10, m=4, qurum_size=11 (k+1), 3 replica for container. CU: container update CA: commit ack That result in like as CU CU CU CU CU CU CU CU CU CU CU CU [507, 507, 507, 201, 201, 201, 201, 201, 201, 201, 201, 201, 201, 201] CA CA CA CA CA In this case, at least 3 container updates are saved. For another example: 7 replicated objects, qurum_size=4 (7//2+1), 3 replica for container. CU: container update CA: commit ack (201s for successful PUT on replicated) CU CU CU CU CU [507, 507, 507, 201, 201, 201, 201] CA CA CA CA In this replicated case, at least 2 container updates are saved. Cleaned up some unit tests so that modifying policies doesn't leak between tests. Co-Authored-By: John Dickinson <me@not.mn> Co-Authored-By: Sam Merritt <sam@swiftstack.com> Closes-Bug: #1460920 Change-Id: I04132858f44b42ee7ecf3b7994cb22a19d001d70
This commit is contained in:
parent
25d5e686a1
commit
3f943cfcf2
@ -276,19 +276,33 @@ class BaseObjectController(Controller):
|
||||
container_partition, containers,
|
||||
delete_at_container=None, delete_at_partition=None,
|
||||
delete_at_nodes=None):
|
||||
policy_index = req.headers['X-Backend-Storage-Policy-Index']
|
||||
policy = POLICIES.get_by_index(policy_index)
|
||||
headers = [self.generate_request_headers(req, additional=req.headers)
|
||||
for _junk in range(n_outgoing)]
|
||||
|
||||
def set_container_update(index, container):
|
||||
headers[index]['X-Container-Partition'] = container_partition
|
||||
headers[index]['X-Container-Host'] = csv_append(
|
||||
headers[index].get('X-Container-Host'),
|
||||
'%(ip)s:%(port)s' % container)
|
||||
headers[index]['X-Container-Device'] = csv_append(
|
||||
headers[index].get('X-Container-Device'),
|
||||
container['device'])
|
||||
|
||||
for i, container in enumerate(containers):
|
||||
i = i % len(headers)
|
||||
set_container_update(i, container)
|
||||
|
||||
headers[i]['X-Container-Partition'] = container_partition
|
||||
headers[i]['X-Container-Host'] = csv_append(
|
||||
headers[i].get('X-Container-Host'),
|
||||
'%(ip)s:%(port)s' % container)
|
||||
headers[i]['X-Container-Device'] = csv_append(
|
||||
headers[i].get('X-Container-Device'),
|
||||
container['device'])
|
||||
# if # of container_updates is not enough against # of replicas
|
||||
# (or fragments). Fill them like as pigeon hole problem.
|
||||
# TODO?: apply these to X-Delete-At-Container?
|
||||
n_updates_needed = min(policy.quorum + 1, n_outgoing)
|
||||
container_iter = itertools.cycle(containers)
|
||||
existing_updates = len(containers)
|
||||
while existing_updates < n_updates_needed:
|
||||
set_container_update(existing_updates, next(container_iter))
|
||||
existing_updates += 1
|
||||
|
||||
for i, node in enumerate(delete_at_nodes or []):
|
||||
i = i % len(headers)
|
||||
|
@ -229,9 +229,7 @@ class FakeRing(Ring):
|
||||
return [dict(node, index=i) for i, node in enumerate(list(self._devs))]
|
||||
|
||||
def get_more_nodes(self, part):
|
||||
# replicas^2 is the true cap
|
||||
for x in range(self.replicas, min(self.replicas + self.max_more_nodes,
|
||||
self.replicas * self.replicas)):
|
||||
for x in range(self.replicas, (self.replicas + self.max_more_nodes)):
|
||||
yield {'ip': '10.0.0.%s' % x,
|
||||
'replication_ip': '10.0.0.%s' % x,
|
||||
'port': self._base_port + x,
|
||||
|
@ -34,7 +34,7 @@ from swift.common import utils, swob
|
||||
from swift.proxy import server as proxy_server
|
||||
from swift.proxy.controllers import obj
|
||||
from swift.proxy.controllers.base import get_info as _real_get_info
|
||||
from swift.common.storage_policy import POLICIES, ECDriverError
|
||||
from swift.common.storage_policy import POLICIES, ECDriverError, StoragePolicy
|
||||
|
||||
from test.unit import FakeRing, FakeMemcache, fake_http_connect, \
|
||||
debug_logger, patch_policies, SlowBody, FakeStatus
|
||||
@ -168,16 +168,15 @@ class BaseObjectControllerMixin(object):
|
||||
# make our fake ring have plenty of nodes, and not get limited
|
||||
# artificially by the proxy max request node count
|
||||
object_ring.max_more_nodes = 100000
|
||||
self.app.request_node_count = lambda r: 100000
|
||||
# nothing magic about * 2 + 3, just a way to make it bigger
|
||||
self.app.request_node_count = lambda r: r * 2 + 3
|
||||
|
||||
all_nodes = object_ring.get_part_nodes(1)
|
||||
all_nodes.extend(object_ring.get_more_nodes(1))
|
||||
|
||||
# i guess fake_ring wants the get_more_nodes iter to more safely be
|
||||
# converted to a list with a smallish sort of limit which *can* be
|
||||
# lower than max_more_nodes
|
||||
fake_rings_real_max_more_nodes_value = object_ring.replicas ** 2
|
||||
self.assertEqual(len(all_nodes), fake_rings_real_max_more_nodes_value)
|
||||
# limit to the number we're going to look at in this request
|
||||
nodes_requested = self.app.request_node_count(object_ring.replicas)
|
||||
all_nodes = all_nodes[:nodes_requested]
|
||||
|
||||
# make sure we have enough local nodes (sanity)
|
||||
all_local_nodes = [n for n in all_nodes if
|
||||
@ -240,6 +239,10 @@ class BaseObjectControllerMixin(object):
|
||||
self.assertEqual(resp.status_int, 204)
|
||||
|
||||
def test_DELETE_missing_one(self):
|
||||
# Obviously this test doesn't work if we're testing 1 replica.
|
||||
# In that case, we don't have any failovers to check.
|
||||
if self.replicas() == 1:
|
||||
return
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
|
||||
codes = [404] + [204] * (self.replicas() - 1)
|
||||
random.shuffle(codes)
|
||||
@ -248,6 +251,10 @@ class BaseObjectControllerMixin(object):
|
||||
self.assertEqual(resp.status_int, 204)
|
||||
|
||||
def test_DELETE_not_found(self):
|
||||
# Obviously this test doesn't work if we're testing 1 replica.
|
||||
# In that case, we don't have any failovers to check.
|
||||
if self.replicas() == 1:
|
||||
return
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
|
||||
codes = [404] * (self.replicas() - 1) + [204]
|
||||
with set_http_connect(*codes):
|
||||
@ -387,7 +394,7 @@ class BaseObjectControllerMixin(object):
|
||||
def test_HEAD_x_newest(self):
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o', method='HEAD',
|
||||
headers={'X-Newest': 'true'})
|
||||
with set_http_connect(200, 200, 200):
|
||||
with set_http_connect(*([200] * self.replicas())):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
@ -395,14 +402,14 @@ class BaseObjectControllerMixin(object):
|
||||
req = swob.Request.blank('/v1/a/c/o', method='HEAD',
|
||||
headers={'X-Newest': 'true'})
|
||||
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
|
||||
timestamps = [next(ts) for i in range(3)]
|
||||
timestamps = [next(ts) for i in range(self.replicas())]
|
||||
newest_timestamp = timestamps[-1]
|
||||
random.shuffle(timestamps)
|
||||
backend_response_headers = [{
|
||||
'X-Backend-Timestamp': t.internal,
|
||||
'X-Timestamp': t.normal
|
||||
} for t in timestamps]
|
||||
with set_http_connect(200, 200, 200,
|
||||
with set_http_connect(*([200] * self.replicas()),
|
||||
headers=backend_response_headers):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
@ -413,14 +420,14 @@ class BaseObjectControllerMixin(object):
|
||||
headers={'X-Newest': 'true'})
|
||||
ts = (utils.Timestamp(time.time(), offset=offset)
|
||||
for offset in itertools.count())
|
||||
timestamps = [next(ts) for i in range(3)]
|
||||
timestamps = [next(ts) for i in range(self.replicas())]
|
||||
newest_timestamp = timestamps[-1]
|
||||
random.shuffle(timestamps)
|
||||
backend_response_headers = [{
|
||||
'X-Backend-Timestamp': t.internal,
|
||||
'X-Timestamp': t.normal
|
||||
} for t in timestamps]
|
||||
with set_http_connect(200, 200, 200,
|
||||
with set_http_connect(*([200] * self.replicas()),
|
||||
headers=backend_response_headers):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
@ -478,6 +485,40 @@ class BaseObjectControllerMixin(object):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 411)
|
||||
|
||||
def test_container_update_backend_requests(self):
|
||||
for policy in POLICIES:
|
||||
req = swift.common.swob.Request.blank(
|
||||
'/v1/a/c/o', method='PUT',
|
||||
headers={'Content-Length': '0',
|
||||
'X-Backend-Storage-Policy-Index': int(policy)})
|
||||
controller = self.controller_cls(self.app, 'a', 'c', 'o')
|
||||
|
||||
# This is the number of container updates we're doing, simulating
|
||||
# 1 to 15 container replicas.
|
||||
for num_containers in range(1, 16):
|
||||
containers = [{'ip': '1.0.0.%s' % i,
|
||||
'port': '60%s' % str(i).zfill(2),
|
||||
'device': 'sdb'} for i in range(num_containers)]
|
||||
|
||||
backend_headers = controller._backend_requests(
|
||||
req, self.replicas(policy), 1, containers)
|
||||
|
||||
# how many of the backend headers have a container update
|
||||
container_updates = len(
|
||||
[headers for headers in backend_headers
|
||||
if 'X-Container-Partition' in headers])
|
||||
|
||||
if num_containers <= self.quorum(policy):
|
||||
# filling case
|
||||
expected = min(self.quorum(policy) + 1,
|
||||
self.replicas(policy))
|
||||
else:
|
||||
# container updates >= object replicas
|
||||
expected = min(num_containers,
|
||||
self.replicas(policy))
|
||||
|
||||
self.assertEqual(container_updates, expected)
|
||||
|
||||
# end of BaseObjectControllerMixin
|
||||
|
||||
|
||||
@ -889,6 +930,18 @@ class TestReplicatedObjController(BaseObjectControllerMixin,
|
||||
self.assertEqual(req.environ.get('swift.log_info'), None)
|
||||
|
||||
|
||||
@patch_policies(
|
||||
[StoragePolicy(0, '1-replica', True),
|
||||
StoragePolicy(1, '5-replica', False),
|
||||
StoragePolicy(2, '8-replica', False),
|
||||
StoragePolicy(3, '15-replica', False)],
|
||||
fake_ring_args=[
|
||||
{'replicas': 1}, {'replicas': 5}, {'replicas': 8}, {'replicas': 15}])
|
||||
class TestReplicatedObjControllerVariousReplicas(BaseObjectControllerMixin,
|
||||
unittest.TestCase):
|
||||
controller_cls = obj.ReplicatedObjectController
|
||||
|
||||
|
||||
@patch_policies(legacy_only=True)
|
||||
class TestObjControllerLegacyCache(TestReplicatedObjController):
|
||||
"""
|
||||
|
@ -3650,7 +3650,7 @@ class TestObjectController(unittest.TestCase):
|
||||
collected_nodes.append(node)
|
||||
self.assertEqual(len(collected_nodes), 5)
|
||||
|
||||
object_ring.max_more_nodes = 20
|
||||
object_ring.max_more_nodes = 6
|
||||
self.app.request_node_count = lambda r: 20
|
||||
partition, nodes = object_ring.get_nodes('account',
|
||||
'container',
|
||||
@ -5829,15 +5829,15 @@ class TestObjectController(unittest.TestCase):
|
||||
|
||||
self.assertEqual(
|
||||
seen_headers, [
|
||||
{'X-Container-Host': '10.0.0.0:1000',
|
||||
'X-Container-Partition': '0',
|
||||
'X-Container-Device': 'sda'},
|
||||
{'X-Container-Host': '10.0.0.0:1000',
|
||||
'X-Container-Partition': '0',
|
||||
'X-Container-Device': 'sda'},
|
||||
{'X-Container-Host': '10.0.0.1:1001',
|
||||
'X-Container-Partition': '0',
|
||||
'X-Container-Device': 'sdb'},
|
||||
{'X-Container-Host': None,
|
||||
'X-Container-Partition': None,
|
||||
'X-Container-Device': None}])
|
||||
'X-Container-Device': 'sdb'}])
|
||||
|
||||
def test_PUT_x_container_headers_with_more_container_replicas(self):
|
||||
self.app.container_ring.set_replicas(4)
|
||||
|
Loading…
x
Reference in New Issue
Block a user