Clean up exception handling in the WSGI transport driver.
This patch localizes try...except blocks, DRYs construction of certain HTTP exceptions, and modifies what types of exceptions are thrown (in some cases), to be more precise. I also factored out some input validation code that happened to be mixed up with all the exception handling. Implements: blueprint grizzly-debt Change-Id: I1030308bed1ec55477b176bf022118d53b46b7ff
This commit is contained in:
parent
14061a2466
commit
5f87afafcd
136
marconi/tests/transport/wsgi/test_helpers.py
Normal file
136
marconi/tests/transport/wsgi/test_helpers.py
Normal file
@ -0,0 +1,136 @@
|
||||
# Copyright (c) 2013 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
#
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import io
|
||||
|
||||
import falcon
|
||||
import json
|
||||
import testtools
|
||||
|
||||
from marconi.transport.wsgi import helpers
|
||||
|
||||
|
||||
class TestWSGIHelpers(testtools.TestCase):
|
||||
|
||||
def test_get_checked_field_missing(self):
|
||||
doc = {}
|
||||
|
||||
self.assertRaises(falcon.HTTPBadRequest,
|
||||
helpers.get_checked_field, doc, 'openstack', int)
|
||||
|
||||
self.assertRaises(falcon.HTTPBadRequest,
|
||||
helpers.get_checked_field, doc, 42, int)
|
||||
|
||||
doc = {'openstac': 10}
|
||||
|
||||
self.assertRaises(falcon.HTTPBadRequest,
|
||||
helpers.get_checked_field, doc, 'openstack', int)
|
||||
|
||||
def test_get_checked_field_bad_type(self):
|
||||
doc = {'openstack': '10'}
|
||||
|
||||
self.assertRaises(falcon.HTTPBadRequest,
|
||||
helpers.get_checked_field, doc, 'openstack', int)
|
||||
|
||||
doc = {'openstack': 10, 'openstack-mq': 'test'}
|
||||
|
||||
self.assertRaises(falcon.HTTPBadRequest,
|
||||
helpers.get_checked_field, doc, 'openstack', str)
|
||||
|
||||
doc = {'openstack': '[1, 2]'}
|
||||
|
||||
self.assertRaises(falcon.HTTPBadRequest,
|
||||
helpers.get_checked_field, doc, 'openstack', list)
|
||||
|
||||
def test_get_checked_field(self):
|
||||
doc = {'hello': 'world', 'teh answer': 42, 'question': []}
|
||||
|
||||
value = helpers.get_checked_field(doc, 'hello', str)
|
||||
self.assertEquals(value, 'world')
|
||||
|
||||
value = helpers.get_checked_field(doc, 'teh answer', int)
|
||||
self.assertEquals(value, 42)
|
||||
|
||||
value = helpers.get_checked_field(doc, 'question', list)
|
||||
self.assertEquals(value, [])
|
||||
|
||||
def test_filter_missing(self):
|
||||
doc = {'body': {'event': 'start_backup'}}
|
||||
spec = (('tag', dict),)
|
||||
self.assertRaises(falcon.HTTPBadRequest,
|
||||
helpers.filter, doc, spec)
|
||||
|
||||
def test_filter_bad_type(self):
|
||||
doc = {'ttl': '300', 'bogus': 'yogabbagabba'}
|
||||
spec = [('ttl', int)]
|
||||
self.assertRaises(falcon.HTTPBadRequest,
|
||||
helpers.filter, doc, spec)
|
||||
|
||||
def test_filter(self):
|
||||
doc = {'body': {'event': 'start_backup'}}
|
||||
|
||||
def spec():
|
||||
yield ('body', dict)
|
||||
|
||||
filtered = helpers.filter(doc, spec())
|
||||
self.assertEqual(filtered, doc)
|
||||
|
||||
doc = {'ttl': 300, 'bogus': 'yogabbagabba'}
|
||||
spec = [('ttl', int)]
|
||||
filtered = helpers.filter(doc, spec)
|
||||
self.assertEqual(filtered, {'ttl': 300})
|
||||
|
||||
doc = {'body': {'event': 'start_backup'}, 'ttl': 300}
|
||||
spec = (('body', dict), ('ttl', int))
|
||||
filtered = helpers.filter(doc, spec)
|
||||
self.assertEqual(filtered, doc)
|
||||
|
||||
def test_filter_star(self):
|
||||
doc = {'ttl': 300, 'body': {'event': 'start_backup'}}
|
||||
|
||||
spec = [('body', '*'), ('ttl', '*')]
|
||||
filtered = helpers.filter(doc, spec)
|
||||
|
||||
self.assertEqual(filtered, doc)
|
||||
|
||||
def test_filter_stream_expect_obj(self):
|
||||
obj = {u'body': {'event': 'start_backup'}, 'id': 'DEADBEEF'}
|
||||
|
||||
stream = io.StringIO(json.dumps(obj, ensure_ascii=False))
|
||||
spec = [('body', dict), ('id', basestring)]
|
||||
filtered_object, = helpers.filter_stream(stream, spec)
|
||||
|
||||
self.assertEqual(filtered_object, obj)
|
||||
|
||||
stream.seek(0)
|
||||
self.assertRaises(falcon.HTTPBadRequest,
|
||||
helpers.filter_stream, stream, spec,
|
||||
doctype=helpers.JSONArray)
|
||||
|
||||
def test_filter_stream_expect_array(self):
|
||||
array = [{u'body': {u'x': 1}}, {u'body': {u'x': 2}}]
|
||||
|
||||
stream = io.StringIO(json.dumps(array, ensure_ascii=False))
|
||||
spec = [('body', dict)]
|
||||
filtered_objects = list(helpers.filter_stream(
|
||||
stream, spec, doctype=helpers.JSONArray))
|
||||
|
||||
self.assertEqual(filtered_objects, array)
|
||||
|
||||
stream.seek(0)
|
||||
self.assertRaises(falcon.HTTPBadRequest,
|
||||
helpers.filter_stream, stream, spec,
|
||||
doctype=helpers.JSONObject)
|
@ -16,20 +16,21 @@
|
||||
import simplejson as json
|
||||
|
||||
|
||||
class MalformedJSON(Exception):
|
||||
class MalformedJSON(ValueError):
|
||||
"""JSON string is not valid."""
|
||||
pass
|
||||
|
||||
|
||||
def read_json(stream):
|
||||
"""Like json.load, but raises an exception upon failure.
|
||||
"""Like json.load, but converts ValueError to MalformedJSON upon failure.
|
||||
|
||||
:param stream: a file-like object
|
||||
"""
|
||||
try:
|
||||
return json.load(stream)
|
||||
|
||||
except Exception:
|
||||
raise MalformedJSON
|
||||
except ValueError as ex:
|
||||
raise MalformedJSON(ex.message)
|
||||
|
||||
|
||||
def to_json(obj):
|
||||
|
@ -16,11 +16,14 @@
|
||||
import falcon
|
||||
|
||||
import marconi.openstack.common.log as logging
|
||||
from marconi.storage import exceptions
|
||||
from marconi.storage import exceptions as storage_exceptions
|
||||
from marconi.transport import helpers
|
||||
from marconi.transport.wsgi import exceptions as wsgi_exceptions
|
||||
from marconi.transport.wsgi import helpers as wsgi_helpers
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CLAIM_METADATA_SPEC = (('ttl', int),)
|
||||
|
||||
|
||||
class CollectionResource(object):
|
||||
@ -31,46 +34,47 @@ class CollectionResource(object):
|
||||
self.claim_ctrl = claim_controller
|
||||
|
||||
def on_post(self, req, resp, project_id, queue_name):
|
||||
if req.content_length is None or req.content_length == 0:
|
||||
raise falcon.HTTPBadRequest(_('Bad request'),
|
||||
_('Missing claim metadata.'))
|
||||
|
||||
# Check for an explicit limit on the # of messages to claim
|
||||
limit = req.get_param_as_int('limit')
|
||||
claim_options = {} if limit is None else {'limit': limit}
|
||||
|
||||
# Read claim metadata (e.g., TTL) and raise appropriate
|
||||
# HTTP errors as needed.
|
||||
metadata, = wsgi_helpers.filter_stream(req.stream, CLAIM_METADATA_SPEC)
|
||||
|
||||
# Claim some messages
|
||||
try:
|
||||
metadata = _filtered(helpers.read_json(req.stream))
|
||||
cid, msgs = self.claim_ctrl.create(
|
||||
queue_name,
|
||||
metadata=metadata,
|
||||
project=project_id,
|
||||
**claim_options)
|
||||
|
||||
# Buffer claimed messages
|
||||
#TODO(kgriffs): optimize, along with serialization (below)
|
||||
resp_msgs = list(msgs)
|
||||
|
||||
if len(resp_msgs) != 0:
|
||||
for msg in resp_msgs:
|
||||
msg['href'] = _msg_uri_from_claim(
|
||||
req.path.rpartition('/')[0], msg['id'], cid)
|
||||
del msg['id']
|
||||
|
||||
resp.location = req.path + '/' + cid
|
||||
resp.body = helpers.to_json(resp_msgs)
|
||||
resp.status = falcon.HTTP_200
|
||||
else:
|
||||
resp.status = falcon.HTTP_204
|
||||
|
||||
except helpers.MalformedJSON:
|
||||
raise falcon.HTTPBadRequest(_('Bad request'),
|
||||
_('Malformed claim metadata.'))
|
||||
|
||||
except exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound
|
||||
|
||||
except storage_exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound()
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
title = _('Service temporarily unavailable')
|
||||
msg = _('Please try again in a few seconds.')
|
||||
raise falcon.HTTPServiceUnavailable(title, msg, 30)
|
||||
description = _('Claim could not be created.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
# Serialize claimed messages, if any. This logic assumes
|
||||
# the storage driver returned well-formed messages.
|
||||
if len(resp_msgs) != 0:
|
||||
for msg in resp_msgs:
|
||||
msg['href'] = _msg_uri_from_claim(
|
||||
req.path.rpartition('/')[0], msg['id'], cid)
|
||||
|
||||
del msg['id']
|
||||
|
||||
resp.location = req.path + '/' + cid
|
||||
resp.body = helpers.to_json(resp_msgs)
|
||||
resp.status = falcon.HTTP_200
|
||||
else:
|
||||
resp.status = falcon.HTTP_204
|
||||
|
||||
|
||||
class ItemResource(object):
|
||||
@ -87,35 +91,37 @@ class ItemResource(object):
|
||||
claim_id=claim_id,
|
||||
project=project_id)
|
||||
|
||||
# Buffer claimed messages
|
||||
#TODO(kgriffs): Optimize along with serialization (see below)
|
||||
meta['messages'] = list(msgs)
|
||||
for msg in meta['messages']:
|
||||
msg['href'] = _msg_uri_from_claim(
|
||||
req.path.rsplit('/', 2)[0], msg['id'], meta['id'])
|
||||
del msg['id']
|
||||
|
||||
meta['href'] = req.path
|
||||
del meta['id']
|
||||
|
||||
resp.content_location = req.relative_uri
|
||||
resp.body = helpers.to_json(meta)
|
||||
resp.status = falcon.HTTP_200
|
||||
|
||||
except exceptions.DoesNotExist:
|
||||
except storage_exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
title = _('Service temporarily unavailable')
|
||||
msg = _('Please try again in a few seconds.')
|
||||
raise falcon.HTTPServiceUnavailable(title, msg, 30)
|
||||
description = _('Claim could not be queried.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
# Serialize claimed messages
|
||||
#TODO(kgriffs): Optimize
|
||||
for msg in meta['messages']:
|
||||
msg['href'] = _msg_uri_from_claim(
|
||||
req.path.rsplit('/', 2)[0], msg['id'], meta['id'])
|
||||
del msg['id']
|
||||
|
||||
meta['href'] = req.path
|
||||
del meta['id']
|
||||
|
||||
resp.content_location = req.relative_uri
|
||||
resp.body = helpers.to_json(meta)
|
||||
resp.status = falcon.HTTP_200
|
||||
|
||||
def on_patch(self, req, resp, project_id, queue_name, claim_id):
|
||||
if req.content_length is None or req.content_length == 0:
|
||||
raise falcon.HTTPBadRequest(_('Bad request'),
|
||||
_('Missing claim metadata.'))
|
||||
# Read claim metadata (e.g., TTL) and raise appropriate
|
||||
# HTTP errors as needed.
|
||||
metadata, = wsgi_helpers.filter_stream(req.stream, CLAIM_METADATA_SPEC)
|
||||
|
||||
try:
|
||||
metadata = _filtered(helpers.read_json(req.stream))
|
||||
self.claim_ctrl.update(queue_name,
|
||||
claim_id=claim_id,
|
||||
metadata=metadata,
|
||||
@ -123,18 +129,12 @@ class ItemResource(object):
|
||||
|
||||
resp.status = falcon.HTTP_204
|
||||
|
||||
except helpers.MalformedJSON:
|
||||
raise falcon.HTTPBadRequest(_('Bad request'),
|
||||
_('Malformed claim metadata.'))
|
||||
|
||||
except exceptions.DoesNotExist:
|
||||
except storage_exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
title = _('Service temporarily unavailable')
|
||||
msg = _('Please try again in a few seconds.')
|
||||
raise falcon.HTTPServiceUnavailable(title, msg, 30)
|
||||
description = _('Claim could not be updated.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
def on_delete(self, req, resp, project_id, queue_name, claim_id):
|
||||
try:
|
||||
@ -146,20 +146,11 @@ class ItemResource(object):
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
title = _('Service temporarily unavailable')
|
||||
msg = _('Please try again in a few seconds.')
|
||||
raise falcon.HTTPServiceUnavailable(title, msg, 30)
|
||||
|
||||
|
||||
def _filtered(obj):
|
||||
try:
|
||||
#TODO(zyuan): verify the TTL
|
||||
return {'ttl': obj['ttl']}
|
||||
|
||||
except Exception:
|
||||
raise helpers.MalformedJSON
|
||||
description = _('Claim could not be deleted.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
|
||||
#TODO(kgriffs): Clean up/optimize and move to wsgi.helpers
|
||||
def _msg_uri_from_claim(base_path, msg_id, claim_id):
|
||||
return '/'.join(
|
||||
[base_path, 'messages', msg_id]
|
||||
|
46
marconi/transport/wsgi/exceptions.py
Normal file
46
marconi/transport/wsgi/exceptions.py
Normal file
@ -0,0 +1,46 @@
|
||||
# Copyright (c) 2013 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import falcon
|
||||
|
||||
|
||||
class HTTPServiceUnavailable(falcon.HTTPServiceUnavailable):
|
||||
"""Wraps falcon.HTTPServiceUnavailable with Marconi messaging."""
|
||||
|
||||
TITLE = _('Service temporarily unavailable')
|
||||
DESCRIPTION = ('Please try again in a few seconds.')
|
||||
|
||||
def __init__(self, description, retry_after=30):
|
||||
description = description + ' ' + self.DESCRIPTION
|
||||
super(HTTPServiceUnavailable, self).__init__(
|
||||
self.TITLE, description, retry_after)
|
||||
|
||||
|
||||
class HTTPBadRequestBody(falcon.HTTPBadRequest):
|
||||
"""Wraps falcon.HTTPBadRequest with a contextual title."""
|
||||
|
||||
TITLE = _('Invalid request body')
|
||||
|
||||
def __init__(self, description):
|
||||
super(HTTPBadRequestBody, self).__init__(self.TITLE, description)
|
||||
|
||||
|
||||
class HTTPDocumentTypeNotSupported(HTTPBadRequestBody):
|
||||
"""Wraps HTTPBadRequestBody with a standard description."""
|
||||
|
||||
DESCRIPTION = ('Document type not supported.')
|
||||
|
||||
def __init__(self):
|
||||
super(HTTPDocumentTypeNotSupported, self).__init__(self.DESCRIPTION)
|
133
marconi/transport/wsgi/helpers.py
Normal file
133
marconi/transport/wsgi/helpers.py
Normal file
@ -0,0 +1,133 @@
|
||||
# Copyright (c) 2013 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
#
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import marconi.openstack.common.log as logging
|
||||
|
||||
from marconi.transport import helpers
|
||||
from marconi.transport.wsgi import exceptions
|
||||
|
||||
|
||||
JSONObject = dict
|
||||
"""Represents a JSON object in Python."""
|
||||
|
||||
JSONArray = list
|
||||
"""Represents a JSON array in Phython."""
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
#TODO(kgriffs): Consider moving this to Falcon and/or Oslo
|
||||
def filter_stream(stream, spec, doctype=JSONObject):
|
||||
"""Reads, deserializes, and validates a document from a stream.
|
||||
|
||||
:param stream: file-like object from which to read an object or
|
||||
array of objects.
|
||||
:param spec: iterable describing expected fields, yielding
|
||||
tuples with the form of: (field_name, value_type). Note that
|
||||
value_type may either be a Python type, or the special
|
||||
string '*' to accept any type.
|
||||
:param doctype: type of document to expect; must be either
|
||||
JSONObject or JSONArray.
|
||||
:raises: HTTPBadRequest, HTTPServiceUnavailable
|
||||
:returns: A sanitized, filtered version of the document read
|
||||
from the stream. If the document contains a list of objects,
|
||||
each object will be filtered and yielded in turn. If, on
|
||||
the other hand, the document is expected to contain a
|
||||
single object, that object will be filtered and returned as
|
||||
a single-element iterable.
|
||||
"""
|
||||
|
||||
try:
|
||||
document = helpers.read_json(stream)
|
||||
except helpers.MalformedJSON as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Body could not be parsed.')
|
||||
raise exceptions.HTTPBadRequestBody(description)
|
||||
except Exception as ex:
|
||||
# Error while reading from the network/server
|
||||
LOG.exception(ex)
|
||||
description = _('Request body could not be read.')
|
||||
raise exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
if doctype is JSONObject:
|
||||
if not isinstance(document, JSONObject):
|
||||
raise exceptions.HTTPDocumentTypeNotSupported()
|
||||
|
||||
return (filter(document, spec),)
|
||||
|
||||
if doctype is JSONArray:
|
||||
if not isinstance(document, JSONArray):
|
||||
raise exceptions.HTTPDocumentTypeNotSupported()
|
||||
|
||||
return (filter(obj, spec) for obj in document)
|
||||
|
||||
raise ValueError("doctype not in (JSONObject, JSONArray)")
|
||||
|
||||
|
||||
#TODO(kgriffs): Consider moving this to Falcon and/or Oslo
|
||||
def filter(document, spec):
|
||||
"""Validates and retrieves typed fields from a single document.
|
||||
|
||||
Sanitizes a dict-like document by checking it against a
|
||||
list of field spec, and returning only those fields
|
||||
specified.
|
||||
|
||||
:param document: dict-like object
|
||||
:param spec: iterable describing expected fields, yielding
|
||||
tuples with the form of: (field_name, value_type). Note that
|
||||
value_type may either be a Python type, or the special
|
||||
string '*' to accept any type.
|
||||
:raises: HTTPBadRequest if any field is missing or not an
|
||||
instance of the specified type
|
||||
:returns: A filtered dict containing only the fields
|
||||
listed in the spec
|
||||
"""
|
||||
|
||||
filtered = {}
|
||||
for name, value_type in spec:
|
||||
filtered[name] = get_checked_field(document, name, value_type)
|
||||
|
||||
return filtered
|
||||
|
||||
|
||||
#TODO(kgriffs): Consider moving this to Falcon and/or Oslo
|
||||
def get_checked_field(document, name, value_type):
|
||||
"""Validates and retrieves a typed field from a document.
|
||||
|
||||
This function attempts to look up doc[name], and raises
|
||||
appropriate HTTP errors if the field is missing or not an
|
||||
instance of the given type.
|
||||
|
||||
:param document: dict-like object
|
||||
:param name: field name
|
||||
:param value_type: expected value type, or '*' to accept any type
|
||||
:raises: HTTPBadRequest if the field is missing or not an
|
||||
instance of value_type
|
||||
:returns: value obtained from doc[name]
|
||||
"""
|
||||
|
||||
try:
|
||||
value = document[name]
|
||||
except KeyError:
|
||||
description = _('Missing "{name}" field.').format(name=name)
|
||||
raise exceptions.HTTPBadRequestBody(description)
|
||||
|
||||
if value_type == '*' or isinstance(value, value_type):
|
||||
return value
|
||||
|
||||
description = _('The value of the "{name}" field must be a {vtype}.')
|
||||
description = description.format(name=name, vtype=value_type.__name__)
|
||||
raise exceptions.HTTPBadRequestBody(description)
|
@ -16,11 +16,14 @@
|
||||
import falcon
|
||||
|
||||
import marconi.openstack.common.log as logging
|
||||
from marconi.storage import exceptions
|
||||
from marconi.storage import exceptions as storage_exceptions
|
||||
from marconi.transport import helpers
|
||||
from marconi.transport.wsgi import exceptions as wsgi_exceptions
|
||||
from marconi.transport.wsgi import helpers as wsgi_helpers
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
MESSAGE_POST_SPEC = (('ttl', int), ('body', '*'))
|
||||
|
||||
|
||||
class CollectionResource(object):
|
||||
@ -33,50 +36,40 @@ class CollectionResource(object):
|
||||
def on_post(self, req, resp, project_id, queue_name):
|
||||
uuid = req.get_header('Client-ID', required=True)
|
||||
|
||||
if req.content_length is None or req.content_length == 0:
|
||||
raise falcon.HTTPBadRequest(_('Bad request'),
|
||||
_('Missing message contents.'))
|
||||
|
||||
def filtered(ls):
|
||||
try:
|
||||
if len(ls) < 1:
|
||||
raise helpers.MalformedJSON
|
||||
|
||||
for m in ls:
|
||||
#TODO(zyuan): verify the TTL values
|
||||
yield {'ttl': m['ttl'], 'body': m['body']}
|
||||
|
||||
except Exception:
|
||||
raise helpers.MalformedJSON
|
||||
# Pull out just the fields we care about
|
||||
messages = wsgi_helpers.filter_stream(
|
||||
req.stream,
|
||||
MESSAGE_POST_SPEC,
|
||||
doctype=wsgi_helpers.JSONArray)
|
||||
|
||||
# Enqueue the messages
|
||||
try:
|
||||
ls = filtered(helpers.read_json(req.stream))
|
||||
ns = self.msg_ctrl.post(queue_name,
|
||||
messages=ls,
|
||||
project=project_id,
|
||||
client_uuid=uuid)
|
||||
|
||||
resp.location = req.path + '/' + ','.join(
|
||||
[n.encode('utf-8') for n in ns])
|
||||
resp.status = falcon.HTTP_201
|
||||
|
||||
except helpers.MalformedJSON:
|
||||
raise falcon.HTTPBadRequest(_('Bad request'),
|
||||
_('Malformed messages.'))
|
||||
|
||||
except exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound
|
||||
message_ids = self.msg_ctrl.post(queue_name,
|
||||
messages=messages,
|
||||
project=project_id,
|
||||
client_uuid=uuid)
|
||||
|
||||
except storage_exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound()
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
title = _('Service temporarily unavailable')
|
||||
msg = _('Please try again in a few seconds.')
|
||||
raise falcon.HTTPServiceUnavailable(title, msg, 30)
|
||||
description = _('Messages could not be enqueued.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
# See if anything happened
|
||||
if len(message_ids) == 0:
|
||||
description = _('No messages to enqueue.')
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(description)
|
||||
|
||||
#TODO(kgriffs): Optimize
|
||||
resource = ','.join([id.encode('utf-8') for id in message_ids])
|
||||
resp.location = req.path + '/' + resource
|
||||
resp.status = falcon.HTTP_201
|
||||
|
||||
def on_get(self, req, resp, project_id, queue_name):
|
||||
uuid = req.get_header('Client-ID', required=True)
|
||||
|
||||
#TODO(zyuan): where do we define the limits?
|
||||
#TODO(kgriffs): Optimize
|
||||
kwargs = helpers.purge({
|
||||
'marker': req.get_param('marker'),
|
||||
'limit': req.get_param_as_int('limit'),
|
||||
@ -84,41 +77,45 @@ class CollectionResource(object):
|
||||
})
|
||||
|
||||
try:
|
||||
interaction = self.msg_ctrl.list(queue_name,
|
||||
project=project_id,
|
||||
client_uuid=uuid,
|
||||
**kwargs)
|
||||
resp_dict = {
|
||||
'messages': list(interaction.next())
|
||||
}
|
||||
results = self.msg_ctrl.list(queue_name,
|
||||
project=project_id,
|
||||
client_uuid=uuid,
|
||||
**kwargs)
|
||||
# Buffer messages
|
||||
cursor = results.next()
|
||||
messages = list(cursor)
|
||||
|
||||
if len(resp_dict['messages']) != 0:
|
||||
kwargs['marker'] = interaction.next()
|
||||
for msg in resp_dict['messages']:
|
||||
msg['href'] = req.path + '/' + msg['id']
|
||||
del msg['id']
|
||||
|
||||
resp_dict['links'] = [
|
||||
{
|
||||
'rel': 'next',
|
||||
'href': req.path + falcon.to_query_str(kwargs)
|
||||
}
|
||||
]
|
||||
|
||||
resp.content_location = req.relative_uri
|
||||
resp.body = helpers.to_json(resp_dict)
|
||||
resp.status = falcon.HTTP_200
|
||||
else:
|
||||
resp.status = falcon.HTTP_204
|
||||
|
||||
except exceptions.DoesNotExist:
|
||||
except storage_exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
title = _('Service temporarily unavailable')
|
||||
msg = _('Please try again in a few seconds.')
|
||||
raise falcon.HTTPServiceUnavailable(title, msg, 30)
|
||||
description = _('Messages could not be listed.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
# Check for no content
|
||||
if len(messages) == 0:
|
||||
resp.status = falcon.HTTP_204
|
||||
return
|
||||
|
||||
# Found some messages, so prepare the response
|
||||
kwargs['marker'] = results.next()
|
||||
for each_message in messages:
|
||||
each_message['href'] = req.path + '/' + each_message['id']
|
||||
del each_message['id']
|
||||
|
||||
response_body = {
|
||||
'messages': messages,
|
||||
'links': [
|
||||
{
|
||||
'rel': 'next',
|
||||
'href': req.path + falcon.to_query_str(kwargs)
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
resp.content_location = req.relative_uri
|
||||
resp.body = helpers.to_json(response_body)
|
||||
resp.status = falcon.HTTP_200
|
||||
|
||||
|
||||
class ItemResource(object):
|
||||
@ -130,25 +127,24 @@ class ItemResource(object):
|
||||
|
||||
def on_get(self, req, resp, project_id, queue_name, message_id):
|
||||
try:
|
||||
msg = self.msg_ctrl.get(queue_name,
|
||||
message_id=message_id,
|
||||
project=project_id)
|
||||
|
||||
msg['href'] = req.path
|
||||
del msg['id']
|
||||
|
||||
resp.content_location = req.relative_uri
|
||||
resp.body = helpers.to_json(msg)
|
||||
resp.status = falcon.HTTP_200
|
||||
|
||||
except exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound
|
||||
message = self.msg_ctrl.get(queue_name,
|
||||
message_id=message_id,
|
||||
project=project_id)
|
||||
|
||||
except storage_exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound()
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
title = _('Service temporarily unavailable')
|
||||
msg = _('Please try again in a few seconds.')
|
||||
raise falcon.HTTPServiceUnavailable(title, msg, 30)
|
||||
description = _('Message could not be retrieved.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
# Prepare response
|
||||
message['href'] = req.path
|
||||
del message['id']
|
||||
|
||||
resp.content_location = req.relative_uri
|
||||
resp.body = helpers.to_json(message)
|
||||
resp.status = falcon.HTTP_200
|
||||
|
||||
def on_delete(self, req, resp, project_id, queue_name, message_id):
|
||||
try:
|
||||
@ -157,13 +153,16 @@ class ItemResource(object):
|
||||
project=project_id,
|
||||
claim=req.get_param('claim_id'))
|
||||
|
||||
resp.status = falcon.HTTP_204
|
||||
|
||||
except exceptions.NotPermitted:
|
||||
resp.status = falcon.HTTP_403
|
||||
|
||||
except storage_exceptions.NotPermitted as ex:
|
||||
LOG.exception(ex)
|
||||
title = _('Invalid claim')
|
||||
description = _('The specified claim either does not '
|
||||
'exist or has expired.')
|
||||
raise falcon.HTTPForbidden(title, description)
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
title = _('Service temporarily unavailable')
|
||||
msg = _('Please try again in a few seconds.')
|
||||
raise falcon.HTTPServiceUnavailable(title, msg, 30)
|
||||
description = _('Message could not be deleted.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
# Alles guete
|
||||
resp.status = falcon.HTTP_204
|
||||
|
@ -16,9 +16,10 @@
|
||||
import falcon
|
||||
|
||||
import marconi.openstack.common.log as logging
|
||||
from marconi.storage import exceptions
|
||||
from marconi.storage import exceptions as storage_exceptions
|
||||
from marconi import transport
|
||||
from marconi.transport import helpers
|
||||
from marconi.transport.wsgi import exceptions as wsgi_exceptions
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -32,48 +33,53 @@ class ItemResource(object):
|
||||
self.queue_ctrl = queue_controller
|
||||
|
||||
def on_put(self, req, resp, project_id, queue_name):
|
||||
#TODO(kgriffs): Migrate this check to input validator middleware
|
||||
if req.content_length > transport.MAX_QUEUE_METADATA_SIZE:
|
||||
raise falcon.HTTPBadRequest(_('Bad request'),
|
||||
_('Queue metadata size is too large.'))
|
||||
|
||||
if req.content_length is None or req.content_length == 0:
|
||||
raise falcon.HTTPBadRequest(_('Bad request'),
|
||||
_('Missing queue metadata.'))
|
||||
description = _('Queue metadata size is too large.')
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(description)
|
||||
|
||||
# Deserialize queue metadata
|
||||
try:
|
||||
metadata = helpers.read_json(req.stream)
|
||||
except helpers.MalformedJSON:
|
||||
description = _('Request body could not be parsed.')
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(description)
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Request body could not be read.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
# Metadata must be a JSON object
|
||||
if not isinstance(metadata, dict):
|
||||
description = _('Queue metadata must be an object.')
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(description)
|
||||
|
||||
# Create or update the queue
|
||||
try:
|
||||
metadata = _filtered(helpers.read_json(req.stream))
|
||||
created = self.queue_ctrl.upsert(queue_name,
|
||||
metadata=metadata,
|
||||
project=project_id)
|
||||
except helpers.MalformedJSON:
|
||||
raise falcon.HTTPBadRequest(_('Bad request'),
|
||||
_('Malformed queue metadata.'))
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
title = _('Service temporarily unavailable')
|
||||
msg = _('Please try again in a few seconds.')
|
||||
raise falcon.HTTPServiceUnavailable(title, msg, 30)
|
||||
description = _('Queue could not be created.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
resp.status = falcon.HTTP_201 if created else falcon.HTTP_204
|
||||
resp.location = req.path
|
||||
|
||||
def on_get(self, req, resp, project_id, queue_name):
|
||||
try:
|
||||
doc = self.queue_ctrl.get(queue_name,
|
||||
project=project_id)
|
||||
|
||||
resp.content_location = req.relative_uri
|
||||
resp.body = helpers.to_json(doc)
|
||||
|
||||
except exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound
|
||||
|
||||
doc = self.queue_ctrl.get(queue_name, project=project_id)
|
||||
except storage_exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound()
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
title = _('Service temporarily unavailable')
|
||||
msg = _('Please try again in a few seconds.')
|
||||
raise falcon.HTTPServiceUnavailable(title, msg, 30)
|
||||
description = _('Queue metdata could not be retrieved.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
resp.content_location = req.relative_uri
|
||||
resp.body = helpers.to_json(doc)
|
||||
|
||||
def on_delete(self, req, resp, project_id, queue_name):
|
||||
try:
|
||||
@ -82,9 +88,8 @@ class ItemResource(object):
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
title = _('Service temporarily unavailable')
|
||||
msg = _('Please try again in a few seconds.')
|
||||
raise falcon.HTTPServiceUnavailable(title, msg, 30)
|
||||
description = _('Queue could not be deleted.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
resp.status = falcon.HTTP_204
|
||||
|
||||
@ -97,7 +102,7 @@ class CollectionResource(object):
|
||||
self.queue_ctrl = queue_controller
|
||||
|
||||
def on_get(self, req, resp, project_id):
|
||||
#TODO(zyuan): where do we define the limits?
|
||||
#TODO(kgriffs): Optimize
|
||||
kwargs = helpers.purge({
|
||||
'marker': req.get_param('marker'),
|
||||
'limit': req.get_param_as_int('limit'),
|
||||
@ -105,40 +110,35 @@ class CollectionResource(object):
|
||||
})
|
||||
|
||||
try:
|
||||
interaction = self.queue_ctrl.list(project=project_id, **kwargs)
|
||||
|
||||
resp_dict = {
|
||||
'queues': list(interaction.next())
|
||||
}
|
||||
|
||||
if len(resp_dict['queues']) != 0:
|
||||
kwargs['marker'] = interaction.next()
|
||||
for queue in resp_dict['queues']:
|
||||
queue['href'] = req.path + '/' + queue['name']
|
||||
|
||||
resp_dict['links'] = [
|
||||
{
|
||||
'rel': 'next',
|
||||
'href': req.path + falcon.to_query_str(kwargs)
|
||||
}
|
||||
]
|
||||
|
||||
resp.content_location = req.relative_uri
|
||||
resp.body = helpers.to_json(resp_dict)
|
||||
resp.status = falcon.HTTP_200
|
||||
else:
|
||||
resp.status = falcon.HTTP_204
|
||||
|
||||
results = self.queue_ctrl.list(project=project_id, **kwargs)
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
title = _('Service temporarily unavailable')
|
||||
msg = _('Please try again in a few seconds.')
|
||||
raise falcon.HTTPServiceUnavailable(title, msg, 30)
|
||||
description = _('Queues could not be listed.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
# Buffer list of queues
|
||||
queues = list(results.next())
|
||||
|
||||
def _filtered(obj):
|
||||
#TODO(zyuan): remove this check once we have a reserved field
|
||||
if type(obj) is not dict:
|
||||
raise helpers.MalformedJSON
|
||||
# Check for an empty list
|
||||
if len(queues) == 0:
|
||||
resp.status = falcon.HTTP_204
|
||||
return
|
||||
|
||||
return obj
|
||||
# Got some. Prepare the response.
|
||||
kwargs['marker'] = results.next()
|
||||
for each_queue in queues:
|
||||
each_queue['href'] = req.path + '/' + each_queue['name']
|
||||
|
||||
response_body = {
|
||||
'queues': queues,
|
||||
'links': [
|
||||
{
|
||||
'rel': 'next',
|
||||
'href': req.path + falcon.to_query_str(kwargs)
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
resp.content_location = req.relative_uri
|
||||
resp.body = helpers.to_json(response_body)
|
||||
resp.status = falcon.HTTP_200
|
||||
|
Loading…
x
Reference in New Issue
Block a user