Don't send commits for quorum *BAD* requests on EC

In EC PUT request case, proxy-server may send commits to object-servers
it may make .durable file even though the request failed due to a lack
of quorum number.

For example:
- Considering the case that almost all object-servers fail by 422
  Unprocessable Entity
- Using ec scheme 4 + 2
- 5 (quorum size) object-server failed with 422, 1 object-servers
  succeeded as 201 created

How it works:
- Client creates a PUT request
- Proxy will open connections to backend object-servers
- Proxy will send whole encoded chunks to object-servers
- Proxy will send content-md5 as footers.
- Proxy will get responses [422, 422, 422, 422, 422, 201] (currently
  this list will be regarded as "we have quorum response")
- And then proxy will send commits to object-servers (the only
  object-server with 201 will create .durable file)
- Proxy will return 503 because the commits results in no response
  statuses from object-servers except the 201 node.

This patch fixes the quorum handling at ObjectController to check
that it has *successful* quorum responses before sending durable commits.

Closes-Bug: #1491748
Change-Id: Icc099993be76bcc687191f332db56d62856a500f
This commit is contained in:
Kota Tsuyuzaki 2015-09-03 00:40:41 -07:00
parent cb683d391c
commit 8f1c7409e7
3 changed files with 163 additions and 12 deletions

View File

@ -50,7 +50,8 @@ from swift.common.http import is_informational, is_success, is_redirection, \
HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \ HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \
HTTP_INSUFFICIENT_STORAGE, HTTP_UNAUTHORIZED, HTTP_CONTINUE HTTP_INSUFFICIENT_STORAGE, HTTP_UNAUTHORIZED, HTTP_CONTINUE
from swift.common.swob import Request, Response, HeaderKeyDict, Range, \ from swift.common.swob import Request, Response, HeaderKeyDict, Range, \
HTTPException, HTTPRequestedRangeNotSatisfiable, HTTPServiceUnavailable HTTPException, HTTPRequestedRangeNotSatisfiable, HTTPServiceUnavailable, \
status_map
from swift.common.request_helpers import strip_sys_meta_prefix, \ from swift.common.request_helpers import strip_sys_meta_prefix, \
strip_user_meta_prefix, is_user_meta, is_sys_meta, is_sys_or_user_meta strip_user_meta_prefix, is_user_meta, is_sys_meta, is_sys_or_user_meta
from swift.common.storage_policy import POLICIES from swift.common.storage_policy import POLICIES
@ -1542,7 +1543,6 @@ class Controller(object):
[(i, s) for i, s in enumerate(statuses) [(i, s) for i, s in enumerate(statuses)
if hundred <= s < hundred + 100] if hundred <= s < hundred + 100]
if len(hstatuses) >= quorum_size: if len(hstatuses) >= quorum_size:
resp = Response(request=req)
try: try:
status_index, status = max( status_index, status = max(
((i, stat) for i, stat in hstatuses ((i, stat) for i, stat in hstatuses
@ -1551,6 +1551,7 @@ class Controller(object):
except ValueError: except ValueError:
# All statuses were indices to avoid # All statuses were indices to avoid
continue continue
resp = status_map[status](request=req)
resp.status = '%s %s' % (status, reasons[status_index]) resp.status = '%s %s' % (status, reasons[status_index])
resp.body = bodies[status_index] resp.body = bodies[status_index]
if headers: if headers:

View File

@ -55,10 +55,10 @@ from swift.common.exceptions import ChunkReadTimeout, \
InsufficientStorage, FooterNotSupported, MultiphasePUTNotSupported, \ InsufficientStorage, FooterNotSupported, MultiphasePUTNotSupported, \
PutterConnectError PutterConnectError
from swift.common.http import ( from swift.common.http import (
is_success, is_server_error, HTTP_CONTINUE, HTTP_CREATED, is_informational, is_success, is_client_error, is_server_error,
HTTP_MULTIPLE_CHOICES, HTTP_INTERNAL_SERVER_ERROR, HTTP_CONTINUE, HTTP_CREATED, HTTP_MULTIPLE_CHOICES,
HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE, HTTP_INTERNAL_SERVER_ERROR, HTTP_SERVICE_UNAVAILABLE,
HTTP_PRECONDITION_FAILED, HTTP_CONFLICT, is_informational) HTTP_INSUFFICIENT_STORAGE, HTTP_PRECONDITION_FAILED, HTTP_CONFLICT)
from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY, from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY,
ECDriverError, PolicyError) ECDriverError, PolicyError)
from swift.proxy.controllers.base import Controller, delay_denial, \ from swift.proxy.controllers.base import Controller, delay_denial, \
@ -2206,6 +2206,23 @@ class ECObjectController(BaseObjectController):
_('Not enough object servers ack\'ed (got %d)'), _('Not enough object servers ack\'ed (got %d)'),
statuses.count(HTTP_CONTINUE)) statuses.count(HTTP_CONTINUE))
raise HTTPServiceUnavailable(request=req) raise HTTPServiceUnavailable(request=req)
elif not self._have_adequate_informational(
statuses, min_conns):
resp = self.best_response(req, statuses, reasons, bodies,
_('Object PUT'),
quorum_size=min_conns)
if is_client_error(resp.status_int):
# if 4xx occurred in this state it is absolutely
# a bad conversation between proxy-server and
# object-server (even if it's
# HTTP_UNPROCESSABLE_ENTITY) so we should regard this
# as HTTPServiceUnavailable.
raise HTTPServiceUnavailable(request=req)
else:
# Other errors should use raw best_response
raise resp
# quorum achieved, start 2nd phase - send commit # quorum achieved, start 2nd phase - send commit
# confirmation to participating object servers # confirmation to participating object servers
# so they write a .durable state file indicating # so they write a .durable state file indicating
@ -2232,20 +2249,36 @@ class ECObjectController(BaseObjectController):
self.app.logger.increment('client_disconnects') self.app.logger.increment('client_disconnects')
raise HTTPClientDisconnect(request=req) raise HTTPClientDisconnect(request=req)
def _have_adequate_successes(self, statuses, min_responses): def _have_adequate_responses(
self, statuses, min_responses, conditional_func):
""" """
Given a list of statuses from several requests, determine if a Given a list of statuses from several requests, determine if a
satisfactory number of nodes have responded with 2xx statuses to satisfactory number of nodes have responded with 1xx or 2xx statuses to
deem the transaction for a succssful response to the client. deem the transaction for a succssful response to the client.
:param statuses: list of statuses returned so far :param statuses: list of statuses returned so far
:param min_responses: minimal pass criterion for number of successes :param min_responses: minimal pass criterion for number of successes
:param conditional_func: a callable function to check http status code
:returns: True or False, depending on current number of successes :returns: True or False, depending on current number of successes
""" """
if sum(1 for s in statuses if is_success(s)) >= min_responses: if sum(1 for s in statuses if (conditional_func(s))) >= min_responses:
return True return True
return False return False
def _have_adequate_successes(self, statuses, min_responses):
"""
Partial method of _have_adequate_responses for 2xx
"""
return self._have_adequate_responses(
statuses, min_responses, is_success)
def _have_adequate_informational(self, statuses, min_responses):
"""
Partial method of _have_adequate_responses for 2xx
"""
return self._have_adequate_responses(
statuses, min_responses, is_informational)
def _await_response(self, conn, final_phase): def _await_response(self, conn, final_phase):
return conn.await_response( return conn.await_response(
self.app.node_timeout, not final_phase) self.app.node_timeout, not final_phase)
@ -2300,9 +2333,9 @@ class ECObjectController(BaseObjectController):
reasons.append(response.reason) reasons.append(response.reason)
if final_phase: if final_phase:
body = response.read() body = response.read()
bodies.append(body)
else: else:
body = '' body = ''
bodies.append(body)
if response.status == HTTP_INSUFFICIENT_STORAGE: if response.status == HTTP_INSUFFICIENT_STORAGE:
putter.failed = True putter.failed = True
self.app.error_limit(putter.node, self.app.error_limit(putter.node,
@ -2341,7 +2374,8 @@ class ECObjectController(BaseObjectController):
bodies.append('') bodies.append('')
else: else:
# intermediate response phase - set return value to true only # intermediate response phase - set return value to true only
# if there are enough 100-continue acknowledgements # if there are responses having same value of *any* status
# except 5xx
if self.have_quorum(statuses, num_nodes, quorum=min_responses): if self.have_quorum(statuses, num_nodes, quorum=min_responses):
quorum = True quorum = True

View File

@ -71,7 +71,7 @@ from swift.proxy.controllers.base import get_container_memcache_key, \
import swift.proxy.controllers import swift.proxy.controllers
import swift.proxy.controllers.obj import swift.proxy.controllers.obj
from swift.common.swob import Request, Response, HTTPUnauthorized, \ from swift.common.swob import Request, Response, HTTPUnauthorized, \
HTTPException, HeaderKeyDict HTTPException, HeaderKeyDict, HTTPBadRequest
from swift.common import storage_policy from swift.common import storage_policy
from swift.common.storage_policy import StoragePolicy, ECStoragePolicy, \ from swift.common.storage_policy import StoragePolicy, ECStoragePolicy, \
StoragePolicyCollection, POLICIES StoragePolicyCollection, POLICIES
@ -2013,6 +2013,122 @@ class TestObjectController(unittest.TestCase):
pass pass
self.assertEqual(found, 2) self.assertEqual(found, 2)
@unpatch_policies
def test_PUT_ec_fragment_quorum_archive_etag_mismatch(self):
ec_policy = POLICIES[3]
self.put_container("ec", "ec-con")
def busted_md5_constructor(initial_str=""):
hasher = md5(initial_str)
hasher.update('wrong')
return hasher
obj = 'uvarovite-esurience-cerated-symphysic'
prolis = _test_sockets[0]
prosrv = _test_servers[0]
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
call_count = [0]
def mock_committer(self):
call_count[0] += 1
commit_confirmation = \
'swift.proxy.controllers.obj.ECPutter.send_commit_confirmation'
with nested(
mock.patch('swift.obj.server.md5', busted_md5_constructor),
mock.patch(commit_confirmation, mock_committer)) as \
(_junk, commit_call):
fd = sock.makefile()
fd.write('PUT /v1/a/ec-con/quorum HTTP/1.1\r\n'
'Host: localhost\r\n'
'Connection: close\r\n'
'Etag: %s\r\n'
'Content-Length: %d\r\n'
'X-Storage-Token: t\r\n'
'Content-Type: application/octet-stream\r\n'
'\r\n%s' % (md5(obj).hexdigest(), len(obj), obj))
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 503' # no quorum
self.assertEqual(headers[:len(exp)], exp)
# Don't send commit to object-server if quorum responses consist of 4xx
self.assertEqual(0, call_count[0])
# no fragment archives should have landed on disk
partition, nodes = prosrv.get_object_ring(3).get_nodes(
'a', 'ec-con', 'quorum')
conf = {'devices': _testdir, 'mount_check': 'false'}
df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy]
for node in nodes:
df = df_mgr.get_diskfile(node['device'], partition,
'a', 'ec-con', 'quorum',
policy=POLICIES[3])
self.assertFalse(os.path.exists(df._datadir))
@unpatch_policies
def test_PUT_ec_fragment_quorum_bad_request(self):
ec_policy = POLICIES[3]
self.put_container("ec", "ec-con")
obj = 'uvarovite-esurience-cerated-symphysic'
prolis = _test_sockets[0]
prosrv = _test_servers[0]
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
call_count = [0]
def mock_committer(self):
call_count[0] += 1
read_footer = \
'swift.obj.server.ObjectController._read_metadata_footer'
commit_confirmation = \
'swift.proxy.controllers.obj.ECPutter.send_commit_confirmation'
with nested(
mock.patch(read_footer),
mock.patch(commit_confirmation, mock_committer)) as \
(read_footer_call, commit_call):
# Emulate missing footer MIME doc in all object-servers
read_footer_call.side_effect = HTTPBadRequest(
body="couldn't find footer MIME doc")
fd = sock.makefile()
fd.write('PUT /v1/a/ec-con/quorum HTTP/1.1\r\n'
'Host: localhost\r\n'
'Connection: close\r\n'
'Etag: %s\r\n'
'Content-Length: %d\r\n'
'X-Storage-Token: t\r\n'
'Content-Type: application/octet-stream\r\n'
'\r\n%s' % (md5(obj).hexdigest(), len(obj), obj))
fd.flush()
headers = readuntil2crlfs(fd)
# Don't show a result of the bad conversation between proxy-server
# and object-server
exp = 'HTTP/1.1 503'
self.assertEqual(headers[:len(exp)], exp)
# Don't send commit to object-server if quorum responses consist of 4xx
self.assertEqual(0, call_count[0])
# no fragment archives should have landed on disk
partition, nodes = prosrv.get_object_ring(3).get_nodes(
'a', 'ec-con', 'quorum')
conf = {'devices': _testdir, 'mount_check': 'false'}
df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy]
for node in nodes:
df = df_mgr.get_diskfile(node['device'], partition,
'a', 'ec-con', 'quorum',
policy=POLICIES[3])
self.assertFalse(os.path.exists(df._datadir))
@unpatch_policies @unpatch_policies
def test_PUT_ec_if_none_match(self): def test_PUT_ec_if_none_match(self):
self.put_container("ec", "ec-con") self.put_container("ec", "ec-con")