Make GreenAsyncPile not hang

It's probably weird that StreamingPile has this interfaces that swallows
exceptions, but this seems better than hanging.

Change-Id: I8fe45c0f0d291efc84f3edf5d6b7cd116b5c7835
This commit is contained in:
Clay Gerrard 2018-12-06 16:36:16 -08:00 committed by Tim Burke
parent 3189410f9d
commit 25aeb0ca49
4 changed files with 136 additions and 8 deletions

View File

@ -3254,6 +3254,9 @@ class GreenAsyncPileWaitallTimeout(Timeout):
pass
DEAD = object()
class GreenAsyncPile(object):
"""
Runs jobs in a pool of green threads, and the results can be retrieved by
@ -3282,6 +3285,8 @@ class GreenAsyncPile(object):
def _run_func(self, func, args, kwargs):
try:
self._responses.put(func(*args, **kwargs))
except Exception:
self._responses.put(DEAD)
finally:
self._inflight -= 1
@ -3332,14 +3337,17 @@ class GreenAsyncPile(object):
return self
def next(self):
try:
rv = self._responses.get_nowait()
except eventlet.queue.Empty:
if self._inflight == 0:
raise StopIteration()
rv = self._responses.get()
self._pending -= 1
return rv
while True:
try:
rv = self._responses.get_nowait()
except eventlet.queue.Empty:
if self._inflight == 0:
raise StopIteration()
rv = self._responses.get()
self._pending -= 1
if rv == DEAD:
continue
return rv
__next__ = next

View File

@ -18,6 +18,7 @@ import json
import unittest
from datetime import datetime
from hashlib import md5
import mock
from swift.common import swob
from swift.common.swob import Request
@ -320,6 +321,27 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'MalformedXML')
@s3acl
def test_object_multi_DELETE_unhandled_exception(self):
exploding_resp = mock.MagicMock(
side_effect=Exception('kaboom'))
self.swift.register('DELETE', '/v1/AUTH_test/bucket/Key1',
exploding_resp, {}, None)
elem = Element('Delete')
obj = SubElement(elem, 'Object')
SubElement(obj, 'Key').text = 'Key1'
body = tostring(elem, use_s3ns=False)
content_md5 = base64.b64encode(md5(body).digest()).strip()
req = Request.blank('/bucket?delete',
environ={'REQUEST_METHOD': 'POST'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'Content-MD5': content_md5},
body=body)
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
def _test_object_multi_DELETE(self, account):
self.keys = ['Key1', 'Key2']
self.swift.register(

View File

@ -5939,6 +5939,13 @@ class TestAuditLocationGenerator(unittest.TestCase):
class TestGreenAsyncPile(unittest.TestCase):
def setUp(self):
self.timeout = Timeout(5.0)
def tearDown(self):
self.timeout.cancel()
def test_runs_everything(self):
def run_test():
tests_ran[0] += 1
@ -6045,6 +6052,58 @@ class TestGreenAsyncPile(unittest.TestCase):
# pending remains 0
self.assertEqual(0, pile._pending)
def _exploder(self, arg):
if isinstance(arg, Exception):
raise arg
else:
return arg
def test_blocking_last_next_explodes(self):
pile = utils.GreenAsyncPile(2)
pile.spawn(self._exploder, 1)
pile.spawn(self._exploder, 2)
pile.spawn(self._exploder, Exception('kaboom'))
self.assertEqual(1, next(pile))
self.assertEqual(2, next(pile))
with self.assertRaises(StopIteration):
next(pile)
self.assertEqual(pile.inflight, 0)
self.assertEqual(pile._pending, 0)
def test_no_blocking_last_next_explodes(self):
pile = utils.GreenAsyncPile(10)
pile.spawn(self._exploder, 1)
self.assertEqual(1, next(pile))
pile.spawn(self._exploder, 2)
self.assertEqual(2, next(pile))
pile.spawn(self._exploder, Exception('kaboom'))
with self.assertRaises(StopIteration):
next(pile)
self.assertEqual(pile.inflight, 0)
self.assertEqual(pile._pending, 0)
def test_exceptions_in_streaming_pile(self):
with utils.StreamingPile(2) as pile:
results = list(pile.asyncstarmap(self._exploder, [
(1,),
(Exception('kaboom'),),
(3,),
]))
self.assertEqual(results, [1, 3])
self.assertEqual(pile.inflight, 0)
self.assertEqual(pile._pending, 0)
def test_exceptions_at_end_of_streaming_pile(self):
with utils.StreamingPile(2) as pile:
results = list(pile.asyncstarmap(self._exploder, [
(1,),
(2,),
(Exception('kaboom'),),
]))
self.assertEqual(results, [1, 2])
self.assertEqual(pile.inflight, 0)
self.assertEqual(pile._pending, 0)
class TestLRUCache(unittest.TestCase):

View File

@ -1570,6 +1570,45 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 404)
def test_GET_primaries_explode(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
codes = [Exception('kaboom!')] * self.obj_ring.replicas + (
[404] * self.obj_ring.max_more_nodes)
with set_http_connect(*codes):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 503)
def test_GET_primaries_timeout(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
codes = [Timeout()] * self.obj_ring.replicas + (
[404] * self.obj_ring.max_more_nodes)
with set_http_connect(*codes):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 503)
def test_GET_primaries_mixed_explode_and_timeout(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
primaries = []
for i in range(self.obj_ring.replicas):
if i % 2:
primaries.append(Timeout())
else:
primaries.append(Exception('kaboom!'))
codes = primaries + [404] * self.obj_ring.max_more_nodes
with set_http_connect(*codes):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 503)
def test_primary_returns_some_nonsense_timestamp(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
# an un-handled ValueError in _make_node_request should just continue
# to the next node rather than hang the request
headers = [{'X-Backend-Timestamp': 'not-a-timestamp'}, {}]
codes = [200, 200]
with set_http_connect(*codes, headers=headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
def test_GET_not_found_when_404_newer(self):
# if proxy receives a 404, it keeps waiting for other connections until
# max number of nodes in hopes of finding an object, but if 404 is