From d34769fa76b428649021084fb8d4991514f9e8e0 Mon Sep 17 00:00:00 2001 From: Zhihao Yuan Date: Mon, 8 Apr 2013 18:18:31 -0400 Subject: [PATCH] All endpoints in WSGI pick up the faulty driver. Change-Id: I97cdac4235def4fed2a195064d331ff26b9f340d Implements: blueprint transport-wsgi --- marconi/tests/transport/wsgi/test_claims.py | 56 +++++++++++++--- marconi/tests/transport/wsgi/test_messages.py | 60 ++++++++++++++--- .../transport/wsgi/test_queue_lifecycle.py | 4 ++ marconi/transport/wsgi/claims.py | 40 +++++++++-- marconi/transport/wsgi/messages.py | 67 +++++++++++++------ marconi/transport/wsgi/stats.py | 22 ++++-- 6 files changed, 198 insertions(+), 51 deletions(-) diff --git a/marconi/tests/transport/wsgi/test_claims.py b/marconi/tests/transport/wsgi/test_claims.py index 91b8162f0..2e7dff4b2 100644 --- a/marconi/tests/transport/wsgi/test_claims.py +++ b/marconi/tests/transport/wsgi/test_claims.py @@ -18,20 +18,13 @@ import json import falcon from falcon import testing -import marconi -from marconi.tests import util +from marconi.tests.transport.wsgi import base -class TestClaims(util.TestBase): +class ClaimsBaseTest(base.TestBase): def setUp(self): - super(TestClaims, self).setUp() - - conf_file = self.conf_path('wsgi_sqlite.conf') - boot = marconi.Bootstrap(conf_file) - - self.app = boot.transport.app - self.srmock = testing.StartResponseMock() + super(ClaimsBaseTest, self).setUp() doc = '{ "_ttl": 60 }' env = testing.create_environ('/v1/480924/queues/fizbit', @@ -197,4 +190,45 @@ class TestClaims(util.TestBase): method="DELETE") self.app(env, self.srmock) - super(TestClaims, self).tearDown() + super(ClaimsBaseTest, self).tearDown() + + +class ClaimsSQLiteTests(ClaimsBaseTest): + + config_filename = 'wsgi_sqlite.conf' + + +class ClaimsFaultyDriverTests(base.TestBase): + + config_filename = 'wsgi_faulty.conf' + + def test_simple(self): + doc = '{ "ttl": 100 }' + env = testing.create_environ('/v1/480924/queues/fizbit/claims', + method="POST", + body=doc) + + self.app(env, self.srmock) + self.assertEquals(self.srmock.status, falcon.HTTP_503) + + env = testing.create_environ('/v1/480924/queues/fizbit/claims' + '/nonexistent', + method="GET") + + self.app(env, self.srmock) + self.assertEquals(self.srmock.status, falcon.HTTP_503) + + env = testing.create_environ('/v1/480924/queues/fizbit/claims' + '/nonexistent', + method="PATCH", + body=doc) + + self.app(env, self.srmock) + self.assertEquals(self.srmock.status, falcon.HTTP_503) + + env = testing.create_environ('/v1/480924/queues/fizbit/claims' + '/nonexistent', + method="DELETE") + + self.app(env, self.srmock) + self.assertEquals(self.srmock.status, falcon.HTTP_503) diff --git a/marconi/tests/transport/wsgi/test_messages.py b/marconi/tests/transport/wsgi/test_messages.py index 7836f901c..6d8546e36 100644 --- a/marconi/tests/transport/wsgi/test_messages.py +++ b/marconi/tests/transport/wsgi/test_messages.py @@ -18,20 +18,13 @@ import json import falcon from falcon import testing -import marconi -from marconi.tests import util +from marconi.tests.transport.wsgi import base -class TestMessages(util.TestBase): +class MessagesBaseTest(base.TestBase): def setUp(self): - super(TestMessages, self).setUp() - - conf_file = self.conf_path('wsgi_sqlite.conf') - boot = marconi.Bootstrap(conf_file) - - self.app = boot.transport.app - self.srmock = testing.StartResponseMock() + super(MessagesBaseTest, self).setUp() doc = '{ "_ttl": 60 }' env = testing.create_environ('/v1/480924/queues/fizbit', @@ -193,7 +186,7 @@ class TestMessages(util.TestBase): method="DELETE") self.app(env, self.srmock) - super(TestMessages, self).tearDown() + super(MessagesBaseTest, self).tearDown() def _post_messages(self, target, repeat=1): doc = json.dumps([{"body": 239, "ttl": 30}] * repeat) @@ -206,3 +199,48 @@ class TestMessages(util.TestBase): def _get_msg_ids(self, headers_dict): return headers_dict['Location'].rsplit('/', 1)[-1].split(',') + + +class MessagesSQLiteTests(MessagesBaseTest): + + config_filename = 'wsgi_sqlite.conf' + + +class MessagesFaultyDriverTests(base.TestBase): + + config_filename = 'wsgi_faulty.conf' + + def test_simple(self): + doc = '[{"body": 239, "ttl": 10}]' + headers = { + 'Client-ID': '30387f00', + } + + env = testing.create_environ('/v1/480924/queues/fizbit/messages', + method="POST", + body=doc, + headers=headers) + + self.app(env, self.srmock) + self.assertEquals(self.srmock.status, falcon.HTTP_503) + + env = testing.create_environ('/v1/480924/queues/fizbit/messages', + method="GET", + headers=headers) + + self.app(env, self.srmock) + self.assertEquals(self.srmock.status, falcon.HTTP_503) + + env = testing.create_environ('/v1/480924/queues/fizbit/messages' + '/nonexistent', + method="GET") + + self.app(env, self.srmock) + self.assertEquals(self.srmock.status, falcon.HTTP_503) + + env = testing.create_environ('/v1/480924/queues/fizbit/messages' + '/nonexistent', + method="DELETE") + + self.app(env, self.srmock) + self.assertEquals(self.srmock.status, falcon.HTTP_503) diff --git a/marconi/tests/transport/wsgi/test_queue_lifecycle.py b/marconi/tests/transport/wsgi/test_queue_lifecycle.py index 2fa67dc7b..2df131347 100644 --- a/marconi/tests/transport/wsgi/test_queue_lifecycle.py +++ b/marconi/tests/transport/wsgi/test_queue_lifecycle.py @@ -225,6 +225,10 @@ class QueueFaultyDriverTests(base.TestBase): self.assertEquals(self.srmock.status, falcon.HTTP_503) self.assertNotEquals(result_doc, json.loads(doc)) + env = testing.create_environ('/v1/480924/queues/gumshoe/stats') + self.app(env, self.srmock) + self.assertEquals(self.srmock.status, falcon.HTTP_503) + env = testing.create_environ('/v1/480924/queues') self.app(env, self.srmock) self.assertEquals(self.srmock.status, falcon.HTTP_503) diff --git a/marconi/transport/wsgi/claims.py b/marconi/transport/wsgi/claims.py index 08d20a4a1..c046d2b0d 100644 --- a/marconi/transport/wsgi/claims.py +++ b/marconi/transport/wsgi/claims.py @@ -13,12 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + import falcon from marconi.storage import exceptions from marconi.transport import helpers +LOG = logging.getLogger(__name__) + + class CollectionResource(object): __slots__ = ('claim_ctrl') @@ -65,6 +70,12 @@ class CollectionResource(object): except exceptions.DoesNotExist: raise falcon.HTTPNotFound + except Exception as ex: + LOG.exception(ex) + title = _('Service temporarily unavailable') + msg = _('Please try again in a few seconds.') + raise falcon.HTTPServiceUnavailable(title, msg, 30) + class ItemResource(object): @@ -96,6 +107,12 @@ class ItemResource(object): except exceptions.DoesNotExist: raise falcon.HTTPNotFound + except Exception as ex: + LOG.exception(ex) + title = _('Service temporarily unavailable') + msg = _('Please try again in a few seconds.') + raise falcon.HTTPServiceUnavailable(title, msg, 30) + def on_patch(self, req, resp, tenant_id, queue_name, claim_id): if req.content_length is None or req.content_length == 0: raise falcon.HTTPBadRequest(_('Bad request'), @@ -117,12 +134,25 @@ class ItemResource(object): except exceptions.DoesNotExist: raise falcon.HTTPNotFound - def on_delete(self, req, resp, tenant_id, queue_name, claim_id): - self.claim_ctrl.delete(queue_name, - claim_id=claim_id, - tenant=tenant_id) + except Exception as ex: + LOG.exception(ex) + title = _('Service temporarily unavailable') + msg = _('Please try again in a few seconds.') + raise falcon.HTTPServiceUnavailable(title, msg, 30) - resp.status = falcon.HTTP_204 + def on_delete(self, req, resp, tenant_id, queue_name, claim_id): + try: + self.claim_ctrl.delete(queue_name, + claim_id=claim_id, + tenant=tenant_id) + + resp.status = falcon.HTTP_204 + + except Exception as ex: + LOG.exception(ex) + title = _('Service temporarily unavailable') + msg = _('Please try again in a few seconds.') + raise falcon.HTTPServiceUnavailable(title, msg, 30) def _filtered(obj): diff --git a/marconi/transport/wsgi/messages.py b/marconi/transport/wsgi/messages.py index 923af2aa0..65c076b2c 100644 --- a/marconi/transport/wsgi/messages.py +++ b/marconi/transport/wsgi/messages.py @@ -13,12 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + import falcon from marconi.storage import exceptions from marconi.transport import helpers +LOG = logging.getLogger(__name__) + + class CollectionResource(object): __slots__ = ('msg_ctrl') @@ -63,6 +68,12 @@ class CollectionResource(object): except exceptions.DoesNotExist: raise falcon.HTTPNotFound + except Exception as ex: + LOG.exception(ex) + title = _('Service temporarily unavailable') + msg = _('Please try again in a few seconds.') + raise falcon.HTTPServiceUnavailable(title, msg, 30) + def on_get(self, req, resp, tenant_id, queue_name): uuid = req.get_header('Client-ID', required=True) @@ -84,28 +95,34 @@ class CollectionResource(object): **kwargs) resp_dict['messages'] = list(msgs) + if len(resp_dict['messages']) != 0: + kwargs['marker'] = resp_dict['messages'][-1]['marker'] + for msg in resp_dict['messages']: + msg['href'] = req.path + '/' + msg['id'] + del msg['id'] + del msg['marker'] + + resp_dict['links'] = [ + { + 'rel': 'next', + 'href': req.path + falcon.to_query_str(kwargs) + } + ] + + resp.content_location = req.relative_uri + resp.body = helpers.to_json(resp_dict) + resp.status = falcon.HTTP_200 + else: + resp.status = falcon.HTTP_204 + except exceptions.DoesNotExist: raise falcon.HTTPNotFound - if len(resp_dict['messages']) != 0: - kwargs['marker'] = resp_dict['messages'][-1]['marker'] - for msg in resp_dict['messages']: - msg['href'] = req.path + '/' + msg['id'] - del msg['id'] - del msg['marker'] - - resp_dict['links'] = [ - { - 'rel': 'next', - 'href': req.path + falcon.to_query_str(kwargs) - } - ] - - resp.content_location = req.relative_uri - resp.body = helpers.to_json(resp_dict) - resp.status = falcon.HTTP_200 - else: - resp.status = falcon.HTTP_204 + except Exception as ex: + LOG.exception(ex) + title = _('Service temporarily unavailable') + msg = _('Please try again in a few seconds.') + raise falcon.HTTPServiceUnavailable(title, msg, 30) class ItemResource(object): @@ -131,6 +148,12 @@ class ItemResource(object): except exceptions.DoesNotExist: raise falcon.HTTPNotFound + except Exception as ex: + LOG.exception(ex) + title = _('Service temporarily unavailable') + msg = _('Please try again in a few seconds.') + raise falcon.HTTPServiceUnavailable(title, msg, 30) + def on_delete(self, req, resp, tenant_id, queue_name, message_id): try: self.msg_ctrl.delete(queue_name, @@ -142,3 +165,9 @@ class ItemResource(object): except exceptions.NotPermitted: resp.status = falcon.HTTP_403 + + except Exception as ex: + LOG.exception(ex) + title = _('Service temporarily unavailable') + msg = _('Please try again in a few seconds.') + raise falcon.HTTPServiceUnavailable(title, msg, 30) diff --git a/marconi/transport/wsgi/stats.py b/marconi/transport/wsgi/stats.py index e26a72c12..19b65e816 100644 --- a/marconi/transport/wsgi/stats.py +++ b/marconi/transport/wsgi/stats.py @@ -13,11 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + import falcon from marconi.transport import helpers +LOG = logging.getLogger(__name__) + + class Resource(object): __slots__ = ('queue_ctrl') @@ -26,9 +31,16 @@ class Resource(object): self.queue_ctrl = queue_controller def on_get(self, req, resp, tenant_id, queue_name): - resp_dict = self.queue_ctrl.stats(queue_name, - tenant=tenant_id) + try: + resp_dict = self.queue_ctrl.stats(queue_name, + tenant=tenant_id) - resp.content_location = req.path - resp.body = helpers.to_json(resp_dict) - resp.status = falcon.HTTP_200 + resp.content_location = req.path + resp.body = helpers.to_json(resp_dict) + resp.status = falcon.HTTP_200 + + except Exception as ex: + LOG.exception(ex) + title = _('Service temporarily unavailable') + msg = _('Please try again in a few seconds.') + raise falcon.HTTPServiceUnavailable(title, msg, 30)