diff --git a/swift/common/utils.py b/swift/common/utils.py index acd6d161ed..02be46a5ea 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -3254,6 +3254,9 @@ class GreenAsyncPileWaitallTimeout(Timeout): pass +DEAD = object() + + class GreenAsyncPile(object): """ Runs jobs in a pool of green threads, and the results can be retrieved by @@ -3282,6 +3285,8 @@ class GreenAsyncPile(object): def _run_func(self, func, args, kwargs): try: self._responses.put(func(*args, **kwargs)) + except Exception: + self._responses.put(DEAD) finally: self._inflight -= 1 @@ -3332,14 +3337,17 @@ class GreenAsyncPile(object): return self def next(self): - try: - rv = self._responses.get_nowait() - except eventlet.queue.Empty: - if self._inflight == 0: - raise StopIteration() - rv = self._responses.get() - self._pending -= 1 - return rv + while True: + try: + rv = self._responses.get_nowait() + except eventlet.queue.Empty: + if self._inflight == 0: + raise StopIteration() + rv = self._responses.get() + self._pending -= 1 + if rv == DEAD: + continue + return rv __next__ = next diff --git a/test/unit/common/middleware/s3api/test_multi_delete.py b/test/unit/common/middleware/s3api/test_multi_delete.py index 6ea5f59003..b5295721c8 100644 --- a/test/unit/common/middleware/s3api/test_multi_delete.py +++ b/test/unit/common/middleware/s3api/test_multi_delete.py @@ -18,6 +18,7 @@ import json import unittest from datetime import datetime from hashlib import md5 +import mock from swift.common import swob from swift.common.swob import Request @@ -320,6 +321,27 @@ class TestS3ApiMultiDelete(S3ApiTestCase): status, headers, body = self.call_s3api(req) self.assertEqual(self._get_error_code(body), 'MalformedXML') + @s3acl + def test_object_multi_DELETE_unhandled_exception(self): + exploding_resp = mock.MagicMock( + side_effect=Exception('kaboom')) + self.swift.register('DELETE', '/v1/AUTH_test/bucket/Key1', + exploding_resp, {}, None) + elem = Element('Delete') + obj = SubElement(elem, 'Object') + SubElement(obj, 'Key').text = 'Key1' + body = tostring(elem, use_s3ns=False) + content_md5 = base64.b64encode(md5(body).digest()).strip() + + req = Request.blank('/bucket?delete', + environ={'REQUEST_METHOD': 'POST'}, + headers={'Authorization': 'AWS test:tester:hmac', + 'Date': self.get_date_header(), + 'Content-MD5': content_md5}, + body=body) + status, headers, body = self.call_s3api(req) + self.assertEqual(status.split()[0], '200') + def _test_object_multi_DELETE(self, account): self.keys = ['Key1', 'Key2'] self.swift.register( diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index c8393b96f5..40afc6351e 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -5939,6 +5939,13 @@ class TestAuditLocationGenerator(unittest.TestCase): class TestGreenAsyncPile(unittest.TestCase): + + def setUp(self): + self.timeout = Timeout(5.0) + + def tearDown(self): + self.timeout.cancel() + def test_runs_everything(self): def run_test(): tests_ran[0] += 1 @@ -6045,6 +6052,58 @@ class TestGreenAsyncPile(unittest.TestCase): # pending remains 0 self.assertEqual(0, pile._pending) + def _exploder(self, arg): + if isinstance(arg, Exception): + raise arg + else: + return arg + + def test_blocking_last_next_explodes(self): + pile = utils.GreenAsyncPile(2) + pile.spawn(self._exploder, 1) + pile.spawn(self._exploder, 2) + pile.spawn(self._exploder, Exception('kaboom')) + self.assertEqual(1, next(pile)) + self.assertEqual(2, next(pile)) + with self.assertRaises(StopIteration): + next(pile) + self.assertEqual(pile.inflight, 0) + self.assertEqual(pile._pending, 0) + + def test_no_blocking_last_next_explodes(self): + pile = utils.GreenAsyncPile(10) + pile.spawn(self._exploder, 1) + self.assertEqual(1, next(pile)) + pile.spawn(self._exploder, 2) + self.assertEqual(2, next(pile)) + pile.spawn(self._exploder, Exception('kaboom')) + with self.assertRaises(StopIteration): + next(pile) + self.assertEqual(pile.inflight, 0) + self.assertEqual(pile._pending, 0) + + def test_exceptions_in_streaming_pile(self): + with utils.StreamingPile(2) as pile: + results = list(pile.asyncstarmap(self._exploder, [ + (1,), + (Exception('kaboom'),), + (3,), + ])) + self.assertEqual(results, [1, 3]) + self.assertEqual(pile.inflight, 0) + self.assertEqual(pile._pending, 0) + + def test_exceptions_at_end_of_streaming_pile(self): + with utils.StreamingPile(2) as pile: + results = list(pile.asyncstarmap(self._exploder, [ + (1,), + (2,), + (Exception('kaboom'),), + ])) + self.assertEqual(results, [1, 2]) + self.assertEqual(pile.inflight, 0) + self.assertEqual(pile._pending, 0) + class TestLRUCache(unittest.TestCase): diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index b98b6ad0fb..3a73b7b98b 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -1570,6 +1570,45 @@ class TestReplicatedObjController(CommonObjectControllerMixin, resp = req.get_response(self.app) self.assertEqual(resp.status_int, 404) + def test_GET_primaries_explode(self): + req = swift.common.swob.Request.blank('/v1/a/c/o') + codes = [Exception('kaboom!')] * self.obj_ring.replicas + ( + [404] * self.obj_ring.max_more_nodes) + with set_http_connect(*codes): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 503) + + def test_GET_primaries_timeout(self): + req = swift.common.swob.Request.blank('/v1/a/c/o') + codes = [Timeout()] * self.obj_ring.replicas + ( + [404] * self.obj_ring.max_more_nodes) + with set_http_connect(*codes): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 503) + + def test_GET_primaries_mixed_explode_and_timeout(self): + req = swift.common.swob.Request.blank('/v1/a/c/o') + primaries = [] + for i in range(self.obj_ring.replicas): + if i % 2: + primaries.append(Timeout()) + else: + primaries.append(Exception('kaboom!')) + codes = primaries + [404] * self.obj_ring.max_more_nodes + with set_http_connect(*codes): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 503) + + def test_primary_returns_some_nonsense_timestamp(self): + req = swift.common.swob.Request.blank('/v1/a/c/o') + # an un-handled ValueError in _make_node_request should just continue + # to the next node rather than hang the request + headers = [{'X-Backend-Timestamp': 'not-a-timestamp'}, {}] + codes = [200, 200] + with set_http_connect(*codes, headers=headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + def test_GET_not_found_when_404_newer(self): # if proxy receives a 404, it keeps waiting for other connections until # max number of nodes in hopes of finding an object, but if 404 is