Merge "Don't send commits for quorum *BAD* requests on EC"
This commit is contained in:
commit
608bdd7245
@ -50,7 +50,8 @@ from swift.common.http import is_informational, is_success, is_redirection, \
|
||||
HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \
|
||||
HTTP_INSUFFICIENT_STORAGE, HTTP_UNAUTHORIZED, HTTP_CONTINUE
|
||||
from swift.common.swob import Request, Response, HeaderKeyDict, Range, \
|
||||
HTTPException, HTTPRequestedRangeNotSatisfiable, HTTPServiceUnavailable
|
||||
HTTPException, HTTPRequestedRangeNotSatisfiable, HTTPServiceUnavailable, \
|
||||
status_map
|
||||
from swift.common.request_helpers import strip_sys_meta_prefix, \
|
||||
strip_user_meta_prefix, is_user_meta, is_sys_meta, is_sys_or_user_meta
|
||||
from swift.common.storage_policy import POLICIES
|
||||
@ -1542,7 +1543,6 @@ class Controller(object):
|
||||
[(i, s) for i, s in enumerate(statuses)
|
||||
if hundred <= s < hundred + 100]
|
||||
if len(hstatuses) >= quorum_size:
|
||||
resp = Response(request=req)
|
||||
try:
|
||||
status_index, status = max(
|
||||
((i, stat) for i, stat in hstatuses
|
||||
@ -1551,6 +1551,7 @@ class Controller(object):
|
||||
except ValueError:
|
||||
# All statuses were indices to avoid
|
||||
continue
|
||||
resp = status_map[status](request=req)
|
||||
resp.status = '%s %s' % (status, reasons[status_index])
|
||||
resp.body = bodies[status_index]
|
||||
if headers:
|
||||
|
@ -55,10 +55,10 @@ from swift.common.exceptions import ChunkReadTimeout, \
|
||||
InsufficientStorage, FooterNotSupported, MultiphasePUTNotSupported, \
|
||||
PutterConnectError, ChunkReadError
|
||||
from swift.common.http import (
|
||||
is_success, is_server_error, HTTP_CONTINUE, HTTP_CREATED,
|
||||
HTTP_MULTIPLE_CHOICES, HTTP_INTERNAL_SERVER_ERROR,
|
||||
HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE,
|
||||
HTTP_PRECONDITION_FAILED, HTTP_CONFLICT, is_informational)
|
||||
is_informational, is_success, is_client_error, is_server_error,
|
||||
HTTP_CONTINUE, HTTP_CREATED, HTTP_MULTIPLE_CHOICES,
|
||||
HTTP_INTERNAL_SERVER_ERROR, HTTP_SERVICE_UNAVAILABLE,
|
||||
HTTP_INSUFFICIENT_STORAGE, HTTP_PRECONDITION_FAILED, HTTP_CONFLICT)
|
||||
from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY,
|
||||
ECDriverError, PolicyError)
|
||||
from swift.proxy.controllers.base import Controller, delay_denial, \
|
||||
@ -2240,6 +2240,23 @@ class ECObjectController(BaseObjectController):
|
||||
_('Not enough object servers ack\'ed (got %d)'),
|
||||
statuses.count(HTTP_CONTINUE))
|
||||
raise HTTPServiceUnavailable(request=req)
|
||||
|
||||
elif not self._have_adequate_informational(
|
||||
statuses, min_conns):
|
||||
resp = self.best_response(req, statuses, reasons, bodies,
|
||||
_('Object PUT'),
|
||||
quorum_size=min_conns)
|
||||
if is_client_error(resp.status_int):
|
||||
# if 4xx occurred in this state it is absolutely
|
||||
# a bad conversation between proxy-server and
|
||||
# object-server (even if it's
|
||||
# HTTP_UNPROCESSABLE_ENTITY) so we should regard this
|
||||
# as HTTPServiceUnavailable.
|
||||
raise HTTPServiceUnavailable(request=req)
|
||||
else:
|
||||
# Other errors should use raw best_response
|
||||
raise resp
|
||||
|
||||
# quorum achieved, start 2nd phase - send commit
|
||||
# confirmation to participating object servers
|
||||
# so they write a .durable state file indicating
|
||||
@ -2266,20 +2283,36 @@ class ECObjectController(BaseObjectController):
|
||||
_('ERROR Exception causing client disconnect'))
|
||||
raise HTTPClientDisconnect(request=req)
|
||||
|
||||
def _have_adequate_successes(self, statuses, min_responses):
|
||||
def _have_adequate_responses(
|
||||
self, statuses, min_responses, conditional_func):
|
||||
"""
|
||||
Given a list of statuses from several requests, determine if a
|
||||
satisfactory number of nodes have responded with 2xx statuses to
|
||||
satisfactory number of nodes have responded with 1xx or 2xx statuses to
|
||||
deem the transaction for a succssful response to the client.
|
||||
|
||||
:param statuses: list of statuses returned so far
|
||||
:param min_responses: minimal pass criterion for number of successes
|
||||
:param conditional_func: a callable function to check http status code
|
||||
:returns: True or False, depending on current number of successes
|
||||
"""
|
||||
if sum(1 for s in statuses if is_success(s)) >= min_responses:
|
||||
if sum(1 for s in statuses if (conditional_func(s))) >= min_responses:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _have_adequate_successes(self, statuses, min_responses):
|
||||
"""
|
||||
Partial method of _have_adequate_responses for 2xx
|
||||
"""
|
||||
return self._have_adequate_responses(
|
||||
statuses, min_responses, is_success)
|
||||
|
||||
def _have_adequate_informational(self, statuses, min_responses):
|
||||
"""
|
||||
Partial method of _have_adequate_responses for 2xx
|
||||
"""
|
||||
return self._have_adequate_responses(
|
||||
statuses, min_responses, is_informational)
|
||||
|
||||
def _await_response(self, conn, final_phase):
|
||||
return conn.await_response(
|
||||
self.app.node_timeout, not final_phase)
|
||||
@ -2334,9 +2367,9 @@ class ECObjectController(BaseObjectController):
|
||||
reasons.append(response.reason)
|
||||
if final_phase:
|
||||
body = response.read()
|
||||
bodies.append(body)
|
||||
else:
|
||||
body = ''
|
||||
bodies.append(body)
|
||||
if response.status == HTTP_INSUFFICIENT_STORAGE:
|
||||
putter.failed = True
|
||||
self.app.error_limit(putter.node,
|
||||
@ -2375,7 +2408,8 @@ class ECObjectController(BaseObjectController):
|
||||
bodies.append('')
|
||||
else:
|
||||
# intermediate response phase - set return value to true only
|
||||
# if there are enough 100-continue acknowledgements
|
||||
# if there are responses having same value of *any* status
|
||||
# except 5xx
|
||||
if self.have_quorum(statuses, num_nodes, quorum=min_responses):
|
||||
quorum = True
|
||||
|
||||
|
@ -73,7 +73,7 @@ from swift.proxy.controllers.base import get_container_memcache_key, \
|
||||
import swift.proxy.controllers
|
||||
import swift.proxy.controllers.obj
|
||||
from swift.common.swob import Request, Response, HTTPUnauthorized, \
|
||||
HTTPException, HeaderKeyDict
|
||||
HTTPException, HeaderKeyDict, HTTPBadRequest
|
||||
from swift.common import storage_policy
|
||||
from swift.common.storage_policy import StoragePolicy, ECStoragePolicy, \
|
||||
StoragePolicyCollection, POLICIES
|
||||
@ -2015,6 +2015,122 @@ class TestObjectController(unittest.TestCase):
|
||||
pass
|
||||
self.assertEqual(found, 2)
|
||||
|
||||
@unpatch_policies
|
||||
def test_PUT_ec_fragment_quorum_archive_etag_mismatch(self):
|
||||
ec_policy = POLICIES[3]
|
||||
self.put_container("ec", "ec-con")
|
||||
|
||||
def busted_md5_constructor(initial_str=""):
|
||||
hasher = md5(initial_str)
|
||||
hasher.update('wrong')
|
||||
return hasher
|
||||
|
||||
obj = 'uvarovite-esurience-cerated-symphysic'
|
||||
prolis = _test_sockets[0]
|
||||
prosrv = _test_servers[0]
|
||||
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
|
||||
|
||||
call_count = [0]
|
||||
|
||||
def mock_committer(self):
|
||||
call_count[0] += 1
|
||||
|
||||
commit_confirmation = \
|
||||
'swift.proxy.controllers.obj.ECPutter.send_commit_confirmation'
|
||||
|
||||
with nested(
|
||||
mock.patch('swift.obj.server.md5', busted_md5_constructor),
|
||||
mock.patch(commit_confirmation, mock_committer)) as \
|
||||
(_junk, commit_call):
|
||||
fd = sock.makefile()
|
||||
fd.write('PUT /v1/a/ec-con/quorum HTTP/1.1\r\n'
|
||||
'Host: localhost\r\n'
|
||||
'Connection: close\r\n'
|
||||
'Etag: %s\r\n'
|
||||
'Content-Length: %d\r\n'
|
||||
'X-Storage-Token: t\r\n'
|
||||
'Content-Type: application/octet-stream\r\n'
|
||||
'\r\n%s' % (md5(obj).hexdigest(), len(obj), obj))
|
||||
fd.flush()
|
||||
headers = readuntil2crlfs(fd)
|
||||
exp = 'HTTP/1.1 503' # no quorum
|
||||
self.assertEqual(headers[:len(exp)], exp)
|
||||
# Don't send commit to object-server if quorum responses consist of 4xx
|
||||
self.assertEqual(0, call_count[0])
|
||||
|
||||
# no fragment archives should have landed on disk
|
||||
partition, nodes = prosrv.get_object_ring(3).get_nodes(
|
||||
'a', 'ec-con', 'quorum')
|
||||
conf = {'devices': _testdir, 'mount_check': 'false'}
|
||||
|
||||
df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy]
|
||||
|
||||
for node in nodes:
|
||||
df = df_mgr.get_diskfile(node['device'], partition,
|
||||
'a', 'ec-con', 'quorum',
|
||||
policy=POLICIES[3])
|
||||
self.assertFalse(os.path.exists(df._datadir))
|
||||
|
||||
@unpatch_policies
|
||||
def test_PUT_ec_fragment_quorum_bad_request(self):
|
||||
ec_policy = POLICIES[3]
|
||||
self.put_container("ec", "ec-con")
|
||||
|
||||
obj = 'uvarovite-esurience-cerated-symphysic'
|
||||
prolis = _test_sockets[0]
|
||||
prosrv = _test_servers[0]
|
||||
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
|
||||
|
||||
call_count = [0]
|
||||
|
||||
def mock_committer(self):
|
||||
call_count[0] += 1
|
||||
|
||||
read_footer = \
|
||||
'swift.obj.server.ObjectController._read_metadata_footer'
|
||||
commit_confirmation = \
|
||||
'swift.proxy.controllers.obj.ECPutter.send_commit_confirmation'
|
||||
|
||||
with nested(
|
||||
mock.patch(read_footer),
|
||||
mock.patch(commit_confirmation, mock_committer)) as \
|
||||
(read_footer_call, commit_call):
|
||||
# Emulate missing footer MIME doc in all object-servers
|
||||
read_footer_call.side_effect = HTTPBadRequest(
|
||||
body="couldn't find footer MIME doc")
|
||||
|
||||
fd = sock.makefile()
|
||||
fd.write('PUT /v1/a/ec-con/quorum HTTP/1.1\r\n'
|
||||
'Host: localhost\r\n'
|
||||
'Connection: close\r\n'
|
||||
'Etag: %s\r\n'
|
||||
'Content-Length: %d\r\n'
|
||||
'X-Storage-Token: t\r\n'
|
||||
'Content-Type: application/octet-stream\r\n'
|
||||
'\r\n%s' % (md5(obj).hexdigest(), len(obj), obj))
|
||||
fd.flush()
|
||||
headers = readuntil2crlfs(fd)
|
||||
|
||||
# Don't show a result of the bad conversation between proxy-server
|
||||
# and object-server
|
||||
exp = 'HTTP/1.1 503'
|
||||
self.assertEqual(headers[:len(exp)], exp)
|
||||
# Don't send commit to object-server if quorum responses consist of 4xx
|
||||
self.assertEqual(0, call_count[0])
|
||||
|
||||
# no fragment archives should have landed on disk
|
||||
partition, nodes = prosrv.get_object_ring(3).get_nodes(
|
||||
'a', 'ec-con', 'quorum')
|
||||
conf = {'devices': _testdir, 'mount_check': 'false'}
|
||||
|
||||
df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy]
|
||||
|
||||
for node in nodes:
|
||||
df = df_mgr.get_diskfile(node['device'], partition,
|
||||
'a', 'ec-con', 'quorum',
|
||||
policy=POLICIES[3])
|
||||
self.assertFalse(os.path.exists(df._datadir))
|
||||
|
||||
@unpatch_policies
|
||||
def test_PUT_ec_if_none_match(self):
|
||||
self.put_container("ec", "ec-con")
|
||||
|
Loading…
Reference in New Issue
Block a user