feat(validation): validate user input
The validation is implemented as a set of free-standing functions in a separated module; each driver which uses storage functions should call call the corresponding validation functions first. Fixes: bug #1177971 Fixes: bug #1177976 Fixes: bug #1177979 Fixes: bug #1206175 Fixes: bug #1208873 Change-Id: I0b9c9766a9c3de42f2a2eb470cba0ab7265e669a
This commit is contained in:
parent
2248203b9e
commit
f1e71796b4
@ -20,3 +20,7 @@ class InvalidDriver(Exception):
|
||||
|
||||
class PatternNotFound(Exception):
|
||||
"""A string did not match the expected pattern or regex."""
|
||||
|
||||
|
||||
class ValidationFailed(ValueError):
|
||||
"""User input exceeds the API restrictions."""
|
||||
|
6
marconi/tests/etc/wsgi_sqlite_validation.conf
Normal file
6
marconi/tests/etc/wsgi_sqlite_validation.conf
Normal file
@ -0,0 +1,6 @@
|
||||
[drivers]
|
||||
transport = wsgi
|
||||
storage = sqlite
|
||||
|
||||
[limits:transport]
|
||||
message_size_uplimit = 256
|
@ -42,7 +42,7 @@ class ClaimsBaseTest(base.TestBase):
|
||||
self.simulate_put(self.queue_path, self.project_id, body=doc)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||
|
||||
doc = json.dumps([{'body': 239, 'ttl': 30}] * 10)
|
||||
doc = json.dumps([{'body': 239, 'ttl': 300}] * 10)
|
||||
self.simulate_post(self.queue_path + '/messages', self.project_id,
|
||||
body=doc, headers={'Client-ID': '30387f00'})
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||
@ -58,15 +58,35 @@ class ClaimsBaseTest(base.TestBase):
|
||||
body=doc)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
# Payload exceeded
|
||||
self.simulate_post(self.claims_path, self.project_id,
|
||||
body='{"ttl": 100, "grace": 60}',
|
||||
query_string='limit=21')
|
||||
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
# Unacceptable TTL or grace
|
||||
for ttl, grace in ((-1, -1), (59, 60), (60, 59),
|
||||
(60, 43201), (43201, 60)):
|
||||
self.simulate_post(self.claims_path, self.project_id,
|
||||
body=json.dumps({'ttl': ttl, 'grace': grace}))
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
def test_bad_patch(self):
|
||||
self.simulate_post(self.claims_path, self.project_id,
|
||||
body='{"ttl": 10, "grace": 30}')
|
||||
body='{"ttl": 100, "grace": 60}')
|
||||
href = self.srmock.headers_dict['Location']
|
||||
|
||||
for doc in (None, '[', '"crunchy"'):
|
||||
self.simulate_patch(href, self.project_id, body=doc)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
# Unacceptable new TTL
|
||||
for ttl in (-1, 59, 43201):
|
||||
self.simulate_post(self.claims_path, self.project_id,
|
||||
body=json.dumps({'ttl': ttl}))
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
def test_too_much_metadata(self):
|
||||
doc = '{"ttl": 100, "grace": 60}'
|
||||
long_doc = doc + (' ' *
|
||||
@ -82,7 +102,7 @@ class ClaimsBaseTest(base.TestBase):
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
def test_lifecycle(self):
|
||||
doc = '{"ttl": 10, "grace": 30}'
|
||||
doc = '{"ttl": 100, "grace": 60}'
|
||||
|
||||
# First, claim some messages
|
||||
body = self.simulate_post(self.claims_path, self.project_id, body=doc)
|
||||
@ -117,7 +137,7 @@ class ClaimsBaseTest(base.TestBase):
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_200)
|
||||
self.assertEquals(self.srmock.headers_dict['Content-Location'],
|
||||
claim_href)
|
||||
self.assertEquals(claim['ttl'], 10)
|
||||
self.assertEquals(claim['ttl'], 100)
|
||||
|
||||
# Delete the message and its associated claim
|
||||
self.simulate_delete(message_href, self.project_id,
|
||||
@ -170,7 +190,7 @@ class ClaimsBaseTest(base.TestBase):
|
||||
|
||||
def test_nonexistent(self):
|
||||
self.simulate_post('/v1/queues/nonexistent/claims', self.project_id,
|
||||
body='{"ttl": 10, "grace": 30}')
|
||||
body='{"ttl": 100, "grace": 60}')
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_404)
|
||||
|
||||
|
||||
@ -203,7 +223,7 @@ class ClaimsFaultyDriverTests(base.TestBaseFaulty):
|
||||
def test_simple(self):
|
||||
project_id = '480924'
|
||||
claims_path = '/v1/queues/fizbit/claims'
|
||||
doc = '{"ttl": 100, "grace": 30}'
|
||||
doc = '{"ttl": 100, "grace": 60}'
|
||||
|
||||
self.simulate_post(claims_path, project_id, body=doc)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_503)
|
||||
|
@ -96,18 +96,48 @@ class MessagesBaseTest(base.TestBase):
|
||||
actual_ttls = set(m['ttl'] for m in result_doc)
|
||||
self.assertFalse(expected_ttls - actual_ttls)
|
||||
|
||||
def test_exceeded_payloads(self):
|
||||
# Get a valid message id
|
||||
path = self.queue_path + '/messages'
|
||||
self._post_messages(path)
|
||||
|
||||
msg_id = self._get_msg_id(self.srmock.headers_dict)
|
||||
|
||||
# Posting restriction
|
||||
self._post_messages(path, repeat=23)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
# Bulk GET restriction
|
||||
query_string = 'ids=' + ','.join([msg_id] * 21)
|
||||
self.simulate_get(path, self.project_id, query_string=query_string)
|
||||
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
# Listing restriction
|
||||
self.simulate_get(path, self.project_id,
|
||||
query_string='limit=21',
|
||||
headers=self.headers)
|
||||
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
# Bulk deletion restriction
|
||||
query_string = 'ids=' + ','.join([msg_id] * 22)
|
||||
self.simulate_delete(path, self.project_id, query_string=query_string)
|
||||
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
def test_post_single(self):
|
||||
sample_messages = [
|
||||
{'body': {'key': 'value'}, 'ttl': 20},
|
||||
{'body': {'key': 'value'}, 'ttl': 200},
|
||||
]
|
||||
|
||||
self._test_post(sample_messages)
|
||||
|
||||
def test_post_multiple(self):
|
||||
sample_messages = [
|
||||
{'body': 239, 'ttl': 10},
|
||||
{'body': {'key': 'value'}, 'ttl': 20},
|
||||
{'body': [1, 3], 'ttl': 30},
|
||||
{'body': 239, 'ttl': 100},
|
||||
{'body': {'key': 'value'}, 'ttl': 200},
|
||||
{'body': [1, 3], 'ttl': 300},
|
||||
]
|
||||
|
||||
self._test_post(sample_messages)
|
||||
@ -117,15 +147,25 @@ class MessagesBaseTest(base.TestBase):
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_404)
|
||||
|
||||
def test_post_bad_message(self):
|
||||
messages_path = self.queue_path + '/messages'
|
||||
|
||||
for document in (None, '[', '[]', '{}', '.'):
|
||||
self.simulate_post(self.queue_path + '/messages',
|
||||
self.simulate_post(messages_path,
|
||||
body=document,
|
||||
headers=self.headers)
|
||||
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
# Unacceptable TTL
|
||||
for ttl in (-1, 59, 1209601):
|
||||
self.simulate_post(messages_path,
|
||||
body=json.dumps([{'ttl': ttl,
|
||||
'body': None}]),
|
||||
headers=self.headers)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
def test_exceeded_message_posting(self):
|
||||
#TODO(zyuan): read `20` from the input validation module
|
||||
# Total (raw request) size
|
||||
doc = json.dumps([{'body': "some body", 'ttl': 100}] * 20, indent=4)
|
||||
long_doc = doc + (' ' *
|
||||
(self.wsgi_cfg.content_max_length - len(doc) + 1))
|
||||
@ -136,6 +176,16 @@ class MessagesBaseTest(base.TestBase):
|
||||
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
# Each message's size
|
||||
for long_body in ('a' * 255, # +2 string quotes
|
||||
{'a': 0, 'b': 'x' * 243}): # w/o whitespaces
|
||||
doc = json.dumps([{'body': long_body, 'ttl': 100}])
|
||||
self.simulate_post(self.queue_path + '/messages',
|
||||
body=doc,
|
||||
headers=self.headers)
|
||||
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
def test_unsupported_json(self):
|
||||
for document in ('{"overflow": 9223372036854775808}',
|
||||
'{"underflow": -9223372036854775809}'):
|
||||
@ -254,7 +304,7 @@ class MessagesBaseTest(base.TestBase):
|
||||
def test_no_uuid(self):
|
||||
path = self.queue_path + '/messages'
|
||||
|
||||
self.simulate_post(path, '7e7e7e', body='[{"body": 0, "ttl": 0}]')
|
||||
self.simulate_post(path, '7e7e7e', body='[{"body": 0, "ttl": 100}]')
|
||||
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
@ -262,7 +312,7 @@ class MessagesBaseTest(base.TestBase):
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
def _post_messages(self, target, repeat=1):
|
||||
doc = json.dumps([{'body': 239, 'ttl': 30}] * repeat)
|
||||
doc = json.dumps([{'body': 239, 'ttl': 300}] * repeat)
|
||||
self.simulate_post(target, self.project_id, body=doc,
|
||||
headers=self.headers)
|
||||
|
||||
@ -275,7 +325,7 @@ class MessagesBaseTest(base.TestBase):
|
||||
|
||||
class MessagesSQLiteTests(MessagesBaseTest):
|
||||
|
||||
config_filename = 'wsgi_sqlite.conf'
|
||||
config_filename = 'wsgi_sqlite_validation.conf'
|
||||
|
||||
|
||||
class MessagesMongoDBTests(MessagesBaseTest):
|
||||
@ -296,7 +346,7 @@ class MessagesFaultyDriverTests(base.TestBaseFaulty):
|
||||
def test_simple(self):
|
||||
project_id = 'xyz'
|
||||
path = '/v1/queues/fizbit/messages'
|
||||
doc = '[{"body": 239, "ttl": 10}]'
|
||||
doc = '[{"body": 239, "ttl": 100}]'
|
||||
headers = {
|
||||
'Client-ID': '30387f00',
|
||||
}
|
||||
|
@ -84,10 +84,26 @@ class QueueLifecycleBaseTest(base.TestBase):
|
||||
self.simulate_get(path + '/metadata', project_id)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_404)
|
||||
|
||||
def test_name_restrictions(self):
|
||||
self.simulate_put('/v1/queues/Nice-Boat_2')
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||
|
||||
self.simulate_put('/v1/queues/Nice-Bo@t')
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
self.simulate_put('/v1/queues/_' + 'niceboat' * 8)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
def test_no_metadata(self):
|
||||
self.simulate_put('/v1/queues/fizbat')
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||
|
||||
self.simulate_put('/v1/queues/fizbat/metadata')
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
self.simulate_put('/v1/queues/fizbat/metadata', body='')
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
def test_bad_metadata(self):
|
||||
self.simulate_put('/v1/queues/fizbat', '7e55e1a7e')
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||
@ -166,6 +182,10 @@ class QueueLifecycleBaseTest(base.TestBase):
|
||||
self.simulate_get('/v1/queues', project_id)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_204)
|
||||
|
||||
# Payload exceeded
|
||||
self.simulate_get('/v1/queues', project_id, query_string='limit=21')
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
# Create some
|
||||
self.simulate_put('/v1/queues/q1', project_id, body='{"_ttl": 30 }')
|
||||
self.simulate_put('/v1/queues/q2', project_id, body='{}')
|
||||
|
@ -158,3 +158,17 @@ class TestWSGIutils(testtools.TestCase):
|
||||
self.assertRaises(falcon.HTTPBadRequest,
|
||||
utils.filter_stream, stream, len(document), spec,
|
||||
doctype=utils.JSONObject)
|
||||
|
||||
def test_filter_stream_wrong_use(self):
|
||||
document = u'3'
|
||||
stream = io.StringIO(document)
|
||||
spec = None
|
||||
self.assertRaises(TypeError,
|
||||
utils.filter_stream, stream, len(document), spec,
|
||||
doctype=int)
|
||||
|
||||
def test_filter_stream_no_reading(self):
|
||||
stream = None
|
||||
length = None
|
||||
self.assertRaises(falcon.HTTPBadRequest,
|
||||
utils.filter_stream, stream, length, None)
|
||||
|
149
marconi/transport/validation.py
Normal file
149
marconi/transport/validation.py
Normal file
@ -0,0 +1,149 @@
|
||||
# Copyright (c) 2013 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import re
|
||||
|
||||
import simplejson as json
|
||||
|
||||
from marconi.common import config
|
||||
from marconi.common import exceptions
|
||||
|
||||
OPTIONS = {
|
||||
'queue_payload_uplimit': 20,
|
||||
'message_payload_uplimit': 20,
|
||||
'message_size_uplimit': 256 * 1024,
|
||||
'message_ttl_max': 1209600,
|
||||
'claim_ttl_max': 43200,
|
||||
'claim_grace_max': 43200,
|
||||
}
|
||||
|
||||
CFG = config.namespace('limits:transport').from_options(**OPTIONS)
|
||||
|
||||
QUEUE_NAME_REGEX = re.compile('^[\w-]+$')
|
||||
|
||||
|
||||
def queue_creation(name):
|
||||
"""Restrictions on a queue name.
|
||||
|
||||
:param name: The queue name
|
||||
:raises: ValidationFailed if the name is longer than 64 bytes or
|
||||
contains bytes other than ASCII digits, letters, underscore,
|
||||
and dash.
|
||||
"""
|
||||
|
||||
if len(name) > 64:
|
||||
raise exceptions.ValidationFailed(
|
||||
'queue name longer than 64 bytes')
|
||||
|
||||
if not QUEUE_NAME_REGEX.match(name):
|
||||
raise exceptions.ValidationFailed(
|
||||
'queue name contains forbidden characters')
|
||||
|
||||
|
||||
def queue_listing(limit=None, **kwargs):
|
||||
"""Restrictions involving a list of queues.
|
||||
|
||||
:param limit: The expected number of queues in the list
|
||||
:param kwargs: Ignored arguments passed to storage API
|
||||
:raises: ValidationFailed if the limit is exceeded
|
||||
"""
|
||||
|
||||
if limit is not None and not (0 < limit <= CFG.queue_payload_uplimit):
|
||||
raise exceptions.ValidationFailed(
|
||||
'queue payload count not in (0, %d]' %
|
||||
CFG.queue_payload_uplimit)
|
||||
|
||||
|
||||
def message_posting(messages, check_size=True):
|
||||
"""Restrictions on a list of messages.
|
||||
|
||||
:param messages: A list of messages
|
||||
:param check_size: Whether the size checking for each message
|
||||
is required
|
||||
:raises: ValidationFailed if any message has a out-of-range
|
||||
TTL, or an oversize message body.
|
||||
"""
|
||||
|
||||
message_listing(limit=len(messages))
|
||||
|
||||
for msg in messages:
|
||||
message_content(msg, check_size)
|
||||
|
||||
|
||||
def message_content(message, check_size):
|
||||
"""Restrictions on each message."""
|
||||
|
||||
if not (60 <= message['ttl'] <= CFG.message_ttl_max):
|
||||
raise exceptions.ValidationFailed(
|
||||
'message TTL not in [60, %d]' %
|
||||
CFG.message_ttl_max)
|
||||
|
||||
if check_size:
|
||||
# UTF-8 encoded, without whitespace
|
||||
# TODO(zyuan): Replace this redundent re-serialization
|
||||
# with a sizing-only parser.
|
||||
body_length = len(json.dumps(message['body'],
|
||||
ensure_ascii=False,
|
||||
separators=(',', ':')))
|
||||
if body_length > CFG.message_size_uplimit:
|
||||
raise exceptions.ValidationFailed(
|
||||
'message body larger than %d bytes' %
|
||||
CFG.message_size_uplimit)
|
||||
|
||||
|
||||
def message_listing(limit=None, **kwargs):
|
||||
"""Restrictions involving a list of messages.
|
||||
|
||||
:param limit: The expected number of messages in the list
|
||||
:param kwargs: Ignored arguments passed to storage API
|
||||
:raises: ValidationFailed if the limit is exceeded
|
||||
"""
|
||||
|
||||
if limit is not None and not (0 < limit <= CFG.message_payload_uplimit):
|
||||
raise exceptions.ValidationFailed(
|
||||
'message payload count not in (0, %d]' %
|
||||
CFG.message_payload_uplimit)
|
||||
|
||||
|
||||
def claim_creation(metadata, **kwargs):
|
||||
"""Restrictions on the claim parameters upon creation.
|
||||
|
||||
:param metadata: The claim metadata
|
||||
:param kwargs: Other arguments passed to storage API
|
||||
:raises: ValidationFailed if either TTL or grace is out of range,
|
||||
or the expected number of messages exceed the limit.
|
||||
"""
|
||||
|
||||
message_listing(**kwargs)
|
||||
claim_updating(metadata)
|
||||
|
||||
if not (60 <= metadata['grace'] <= CFG.claim_grace_max):
|
||||
raise exceptions.ValidationFailed(
|
||||
'claim grace not in [60, %d]' %
|
||||
CFG.claim_grace_max)
|
||||
|
||||
|
||||
def claim_updating(metadata):
|
||||
"""Restrictions on the claim TTL.
|
||||
|
||||
:param metadata: The claim metadata
|
||||
:param kwargs: Ignored arguments passed to storage API
|
||||
:raises: ValidationFailed if the TTL is out of range
|
||||
"""
|
||||
|
||||
if not (60 <= metadata['ttl'] <= CFG.claim_ttl_max):
|
||||
raise exceptions.ValidationFailed(
|
||||
'claim TTL not in [60, %d]' %
|
||||
CFG.claim_ttl_max)
|
@ -16,9 +16,11 @@
|
||||
import falcon
|
||||
|
||||
from marconi.common import config
|
||||
from marconi.common import exceptions as input_exceptions
|
||||
import marconi.openstack.common.log as logging
|
||||
from marconi.storage import exceptions as storage_exceptions
|
||||
from marconi.transport import utils
|
||||
from marconi.transport import validation
|
||||
from marconi.transport.wsgi import exceptions as wsgi_exceptions
|
||||
from marconi.transport.wsgi import utils as wsgi_utils
|
||||
|
||||
@ -60,6 +62,7 @@ class CollectionResource(object):
|
||||
|
||||
# Claim some messages
|
||||
try:
|
||||
validation.claim_creation(metadata, **claim_options)
|
||||
cid, msgs = self.claim_controller.create(
|
||||
queue_name,
|
||||
metadata=metadata,
|
||||
@ -70,8 +73,12 @@ class CollectionResource(object):
|
||||
# TODO(kgriffs): optimize, along with serialization (below)
|
||||
resp_msgs = list(msgs)
|
||||
|
||||
except input_exceptions.ValidationFailed as ex:
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(str(ex))
|
||||
|
||||
except storage_exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound()
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Claim could not be created.')
|
||||
@ -155,6 +162,7 @@ class ItemResource(object):
|
||||
CLAIM_PATCH_SPEC)
|
||||
|
||||
try:
|
||||
validation.claim_updating(metadata)
|
||||
self.claim_controller.update(queue_name,
|
||||
claim_id=claim_id,
|
||||
metadata=metadata,
|
||||
@ -162,8 +170,12 @@ class ItemResource(object):
|
||||
|
||||
resp.status = falcon.HTTP_204
|
||||
|
||||
except input_exceptions.ValidationFailed as ex:
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(str(ex))
|
||||
|
||||
except storage_exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound()
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Claim could not be updated.')
|
||||
|
@ -16,9 +16,11 @@
|
||||
import falcon
|
||||
|
||||
from marconi.common import config
|
||||
from marconi.common import exceptions as input_exceptions
|
||||
import marconi.openstack.common.log as logging
|
||||
from marconi.storage import exceptions as storage_exceptions
|
||||
from marconi.transport import utils
|
||||
from marconi.transport import validation
|
||||
from marconi.transport.wsgi import exceptions as wsgi_exceptions
|
||||
from marconi.transport.wsgi import utils as wsgi_utils
|
||||
|
||||
@ -45,11 +47,15 @@ class CollectionResource(object):
|
||||
def _get_by_id(self, base_path, project_id, queue_name, ids):
|
||||
"""Returns one or more messages from the queue by ID."""
|
||||
try:
|
||||
validation.message_listing(limit=len(ids))
|
||||
messages = self.message_controller.bulk_get(
|
||||
queue_name,
|
||||
message_ids=ids,
|
||||
project=project_id)
|
||||
|
||||
except input_exceptions.ValidationFailed as ex:
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(str(ex))
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Message could not be retrieved.')
|
||||
@ -79,6 +85,7 @@ class CollectionResource(object):
|
||||
})
|
||||
|
||||
try:
|
||||
validation.message_listing(**kwargs)
|
||||
results = self.message_controller.list(
|
||||
queue_name,
|
||||
project=project_id,
|
||||
@ -89,6 +96,9 @@ class CollectionResource(object):
|
||||
cursor = next(results)
|
||||
messages = list(cursor)
|
||||
|
||||
except input_exceptions.ValidationFailed as ex:
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(str(ex))
|
||||
|
||||
except storage_exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound()
|
||||
|
||||
@ -161,14 +171,24 @@ class CollectionResource(object):
|
||||
partial = False
|
||||
|
||||
try:
|
||||
# No need to check each message's size if it
|
||||
# can not exceed the request size limit
|
||||
validation.message_posting(
|
||||
messages, check_size=(
|
||||
validation.CFG.message_size_uplimit <
|
||||
CFG.content_max_length))
|
||||
message_ids = self.message_controller.post(
|
||||
queue_name,
|
||||
messages=messages,
|
||||
project=project_id,
|
||||
client_uuid=uuid)
|
||||
|
||||
except input_exceptions.ValidationFailed as ex:
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(str(ex))
|
||||
|
||||
except storage_exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound()
|
||||
|
||||
except storage_exceptions.MessageConflict as ex:
|
||||
LOG.exception(ex)
|
||||
partial = True
|
||||
@ -221,11 +241,15 @@ class CollectionResource(object):
|
||||
ids = req.get_param_as_list('ids', required=True)
|
||||
|
||||
try:
|
||||
validation.message_listing(limit=len(ids))
|
||||
self.message_controller.bulk_delete(
|
||||
queue_name,
|
||||
message_ids=ids,
|
||||
project=project_id)
|
||||
|
||||
except input_exceptions.ValidationFailed as ex:
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(str(ex))
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = 'Messages could not be deleted.'
|
||||
|
@ -20,6 +20,7 @@ import marconi.openstack.common.log as logging
|
||||
from marconi.storage import exceptions as storage_exceptions
|
||||
from marconi.transport import utils
|
||||
from marconi.transport.wsgi import exceptions as wsgi_exceptions
|
||||
from marconi.transport.wsgi import utils as wsgi_utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -66,20 +67,9 @@ class Resource(object):
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(description)
|
||||
|
||||
# Deserialize queue metadata
|
||||
try:
|
||||
metadata = utils.read_json(req.stream, req.content_length)
|
||||
except utils.MalformedJSON:
|
||||
description = _('Request body could not be parsed.')
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(description)
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Request body could not be read.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
# Metadata must be a JSON object
|
||||
if not isinstance(metadata, dict):
|
||||
description = _('Queue metadata must be an object.')
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(description)
|
||||
metadata, = wsgi_utils.filter_stream(req.stream,
|
||||
req.content_length,
|
||||
spec=None)
|
||||
|
||||
try:
|
||||
self.queue_ctrl.set_metadata(queue_name,
|
||||
|
@ -15,8 +15,10 @@
|
||||
|
||||
import falcon
|
||||
|
||||
from marconi.common import exceptions as input_exceptions
|
||||
import marconi.openstack.common.log as logging
|
||||
from marconi.transport import utils
|
||||
from marconi.transport import validation
|
||||
from marconi.transport.wsgi import exceptions as wsgi_exceptions
|
||||
|
||||
|
||||
@ -37,10 +39,14 @@ class ItemResource(object):
|
||||
{"queue": queue_name, "project": project_id})
|
||||
|
||||
try:
|
||||
validation.queue_creation(name=queue_name)
|
||||
created = self.queue_controller.create(
|
||||
queue_name,
|
||||
project=project_id)
|
||||
|
||||
except input_exceptions.ValidationFailed as ex:
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(str(ex))
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Queue could not be created.')
|
||||
@ -95,7 +101,12 @@ class CollectionResource(object):
|
||||
})
|
||||
|
||||
try:
|
||||
validation.queue_listing(**kwargs)
|
||||
results = self.queue_controller.list(project=project_id, **kwargs)
|
||||
|
||||
except input_exceptions.ValidationFailed as ex:
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(str(ex))
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Queues could not be listed.')
|
||||
|
Loading…
x
Reference in New Issue
Block a user