Pay attention to all punctual nodes
The proxy sends requests to all storage nodes, but it only needs a quorum of them to respond before the proxy can, in turn, respond to the client. Thus, it gets quorum, and then briefly waits to see if the remainder of the storage nodes respond before giving up on them. However, the proxy was not paying any attention to the responses from the non-quorum storage nodes. This would lead to some odd behavior, like a container PUT where the backends returned (201, 201, 202) would become a 201 to the client, but the permutation (201, 202, 201) would become 202. Further, on object PUT, if the last node responded with an error code, that error wouldn't count towards error-limiting. The fix is quite simple: after getting quorum, the make_requests() method was calling a method that returns responses from the remainder of the nodes, but it was ignoring that return value and making up responses with dummy values instead. Now, prior to making up dummy responses, the proxy first uses the responses it already has, and only fills in dummy responses for nodes that really didn't respond in time. Change-Id: I0206b6b2272b0e7dcc80fb6c51840d8dae25cee2
This commit is contained in:
parent
315af1737b
commit
0221f1f847
@ -1072,7 +1072,12 @@ class Controller(object):
|
||||
if self.have_quorum(statuses, len(start_nodes)):
|
||||
break
|
||||
# give any pending requests *some* chance to finish
|
||||
pile.waitall(self.app.post_quorum_timeout)
|
||||
finished_quickly = pile.waitall(self.app.post_quorum_timeout)
|
||||
for resp in finished_quickly:
|
||||
if not resp:
|
||||
continue
|
||||
response.append(resp)
|
||||
statuses.append(resp[0])
|
||||
while len(response) < len(start_nodes):
|
||||
response.append((HTTP_SERVICE_UNAVAILABLE, '', '', ''))
|
||||
statuses, reasons, resp_headers, bodies = zip(*response)
|
||||
|
@ -398,24 +398,33 @@ class ObjectController(Controller):
|
||||
pile = GreenAsyncPile(len(conns))
|
||||
for conn in conns:
|
||||
pile.spawn(get_conn_response, conn)
|
||||
|
||||
def _handle_response(conn, response):
|
||||
statuses.append(response.status)
|
||||
reasons.append(response.reason)
|
||||
bodies.append(response.read())
|
||||
if response.status >= HTTP_INTERNAL_SERVER_ERROR:
|
||||
self.app.error_occurred(
|
||||
conn.node,
|
||||
_('ERROR %(status)d %(body)s From Object Server '
|
||||
're: %(path)s') %
|
||||
{'status': response.status,
|
||||
'body': bodies[-1][:1024], 'path': req.path})
|
||||
elif is_success(response.status):
|
||||
etags.add(response.getheader('etag').strip('"'))
|
||||
|
||||
for (conn, response) in pile:
|
||||
if response:
|
||||
statuses.append(response.status)
|
||||
reasons.append(response.reason)
|
||||
bodies.append(response.read())
|
||||
if response.status >= HTTP_INTERNAL_SERVER_ERROR:
|
||||
self.app.error_occurred(
|
||||
conn.node,
|
||||
_('ERROR %(status)d %(body)s From Object Server '
|
||||
're: %(path)s') %
|
||||
{'status': response.status,
|
||||
'body': bodies[-1][:1024], 'path': req.path})
|
||||
elif is_success(response.status):
|
||||
etags.add(response.getheader('etag').strip('"'))
|
||||
_handle_response(conn, response)
|
||||
if self.have_quorum(statuses, len(nodes)):
|
||||
break
|
||||
|
||||
# give any pending requests *some* chance to finish
|
||||
pile.waitall(self.app.post_quorum_timeout)
|
||||
finished_quickly = pile.waitall(self.app.post_quorum_timeout)
|
||||
for (conn, response) in finished_quickly:
|
||||
if response:
|
||||
_handle_response(conn, response)
|
||||
|
||||
while len(statuses) < len(nodes):
|
||||
statuses.append(HTTP_SERVICE_UNAVAILABLE)
|
||||
reasons.append('')
|
||||
|
@ -1588,6 +1588,7 @@ class TestObjectController(unittest.TestCase):
|
||||
test_status_map((200, 200, 201, 201, 500), 201)
|
||||
test_status_map((200, 200, 204, 404, 404), 404)
|
||||
test_status_map((200, 200, 204, 500, 404), 503)
|
||||
test_status_map((200, 200, 202, 202, 204), 204)
|
||||
|
||||
def test_PUT_connect_exceptions(self):
|
||||
with save_globals():
|
||||
@ -2614,6 +2615,7 @@ class TestObjectController(unittest.TestCase):
|
||||
'container', 'object')
|
||||
controller.app.sort_nodes = lambda l: l
|
||||
object_ring = controller.app.get_object_ring(None)
|
||||
# acc con obj obj obj
|
||||
self.assert_status_map(controller.PUT, (200, 200, 503, 200, 200),
|
||||
200)
|
||||
|
||||
@ -2625,6 +2627,24 @@ class TestObjectController(unittest.TestCase):
|
||||
self.assert_('last_error' not in object_ring.devs[1])
|
||||
self.assert_('last_error' not in object_ring.devs[2])
|
||||
|
||||
def test_PUT_error_limiting_last_node(self):
|
||||
with save_globals():
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
'container', 'object')
|
||||
controller.app.sort_nodes = lambda l: l
|
||||
object_ring = controller.app.get_object_ring(None)
|
||||
# acc con obj obj obj
|
||||
self.assert_status_map(controller.PUT, (200, 200, 200, 200, 503),
|
||||
200)
|
||||
|
||||
# 2, not 1, because assert_status_map() calls the method twice
|
||||
self.assertEquals(object_ring.devs[0].get('errors', 0), 0)
|
||||
self.assertEquals(object_ring.devs[1].get('errors', 0), 0)
|
||||
self.assertEquals(object_ring.devs[2].get('errors', 0), 2)
|
||||
self.assert_('last_error' not in object_ring.devs[0])
|
||||
self.assert_('last_error' not in object_ring.devs[1])
|
||||
self.assert_('last_error' in object_ring.devs[2])
|
||||
|
||||
def test_acc_or_con_missing_returns_404(self):
|
||||
with save_globals():
|
||||
self.app.memcache = FakeMemcacheReturnsNone()
|
||||
@ -6106,6 +6126,12 @@ class TestAccountController(unittest.TestCase):
|
||||
self.assert_status_map(controller.PUT, (201, -1, -1), 503)
|
||||
self.assert_status_map(controller.PUT, (503, 503, -1), 503)
|
||||
|
||||
def test_PUT_status(self):
|
||||
with save_globals():
|
||||
self.app.allow_account_management = True
|
||||
controller = proxy_server.AccountController(self.app, 'account')
|
||||
self.assert_status_map(controller.PUT, (201, 201, 202), 202)
|
||||
|
||||
def test_PUT_metadata(self):
|
||||
self.metadata_helper('PUT')
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user