diff --git a/marconi/common/exceptions.py b/marconi/common/exceptions.py index 5c1db2cbe..639d4298d 100644 --- a/marconi/common/exceptions.py +++ b/marconi/common/exceptions.py @@ -20,3 +20,7 @@ class InvalidDriver(Exception): class PatternNotFound(Exception): """A string did not match the expected pattern or regex.""" + + +class ValidationFailed(ValueError): + """User input exceeds the API restrictions.""" diff --git a/marconi/tests/etc/wsgi_sqlite_validation.conf b/marconi/tests/etc/wsgi_sqlite_validation.conf new file mode 100644 index 000000000..293d56acd --- /dev/null +++ b/marconi/tests/etc/wsgi_sqlite_validation.conf @@ -0,0 +1,6 @@ +[drivers] +transport = wsgi +storage = sqlite + +[limits:transport] +message_size_uplimit = 256 diff --git a/marconi/tests/transport/wsgi/test_claims.py b/marconi/tests/transport/wsgi/test_claims.py index 33bf4cacd..837d7a428 100644 --- a/marconi/tests/transport/wsgi/test_claims.py +++ b/marconi/tests/transport/wsgi/test_claims.py @@ -42,7 +42,7 @@ class ClaimsBaseTest(base.TestBase): self.simulate_put(self.queue_path, self.project_id, body=doc) self.assertEquals(self.srmock.status, falcon.HTTP_201) - doc = json.dumps([{'body': 239, 'ttl': 30}] * 10) + doc = json.dumps([{'body': 239, 'ttl': 300}] * 10) self.simulate_post(self.queue_path + '/messages', self.project_id, body=doc, headers={'Client-ID': '30387f00'}) self.assertEquals(self.srmock.status, falcon.HTTP_201) @@ -58,15 +58,35 @@ class ClaimsBaseTest(base.TestBase): body=doc) self.assertEquals(self.srmock.status, falcon.HTTP_400) + # Payload exceeded + self.simulate_post(self.claims_path, self.project_id, + body='{"ttl": 100, "grace": 60}', + query_string='limit=21') + + self.assertEquals(self.srmock.status, falcon.HTTP_400) + + # Unacceptable TTL or grace + for ttl, grace in ((-1, -1), (59, 60), (60, 59), + (60, 43201), (43201, 60)): + self.simulate_post(self.claims_path, self.project_id, + body=json.dumps({'ttl': ttl, 'grace': grace})) + self.assertEquals(self.srmock.status, falcon.HTTP_400) + def test_bad_patch(self): self.simulate_post(self.claims_path, self.project_id, - body='{"ttl": 10, "grace": 30}') + body='{"ttl": 100, "grace": 60}') href = self.srmock.headers_dict['Location'] for doc in (None, '[', '"crunchy"'): self.simulate_patch(href, self.project_id, body=doc) self.assertEquals(self.srmock.status, falcon.HTTP_400) + # Unacceptable new TTL + for ttl in (-1, 59, 43201): + self.simulate_post(self.claims_path, self.project_id, + body=json.dumps({'ttl': ttl})) + self.assertEquals(self.srmock.status, falcon.HTTP_400) + def test_too_much_metadata(self): doc = '{"ttl": 100, "grace": 60}' long_doc = doc + (' ' * @@ -82,7 +102,7 @@ class ClaimsBaseTest(base.TestBase): self.assertEquals(self.srmock.status, falcon.HTTP_400) def test_lifecycle(self): - doc = '{"ttl": 10, "grace": 30}' + doc = '{"ttl": 100, "grace": 60}' # First, claim some messages body = self.simulate_post(self.claims_path, self.project_id, body=doc) @@ -117,7 +137,7 @@ class ClaimsBaseTest(base.TestBase): self.assertEquals(self.srmock.status, falcon.HTTP_200) self.assertEquals(self.srmock.headers_dict['Content-Location'], claim_href) - self.assertEquals(claim['ttl'], 10) + self.assertEquals(claim['ttl'], 100) # Delete the message and its associated claim self.simulate_delete(message_href, self.project_id, @@ -170,7 +190,7 @@ class ClaimsBaseTest(base.TestBase): def test_nonexistent(self): self.simulate_post('/v1/queues/nonexistent/claims', self.project_id, - body='{"ttl": 10, "grace": 30}') + body='{"ttl": 100, "grace": 60}') self.assertEquals(self.srmock.status, falcon.HTTP_404) @@ -203,7 +223,7 @@ class ClaimsFaultyDriverTests(base.TestBaseFaulty): def test_simple(self): project_id = '480924' claims_path = '/v1/queues/fizbit/claims' - doc = '{"ttl": 100, "grace": 30}' + doc = '{"ttl": 100, "grace": 60}' self.simulate_post(claims_path, project_id, body=doc) self.assertEquals(self.srmock.status, falcon.HTTP_503) diff --git a/marconi/tests/transport/wsgi/test_messages.py b/marconi/tests/transport/wsgi/test_messages.py index 480f90e66..2d93f7489 100644 --- a/marconi/tests/transport/wsgi/test_messages.py +++ b/marconi/tests/transport/wsgi/test_messages.py @@ -96,18 +96,48 @@ class MessagesBaseTest(base.TestBase): actual_ttls = set(m['ttl'] for m in result_doc) self.assertFalse(expected_ttls - actual_ttls) + def test_exceeded_payloads(self): + # Get a valid message id + path = self.queue_path + '/messages' + self._post_messages(path) + + msg_id = self._get_msg_id(self.srmock.headers_dict) + + # Posting restriction + self._post_messages(path, repeat=23) + self.assertEquals(self.srmock.status, falcon.HTTP_400) + + # Bulk GET restriction + query_string = 'ids=' + ','.join([msg_id] * 21) + self.simulate_get(path, self.project_id, query_string=query_string) + + self.assertEquals(self.srmock.status, falcon.HTTP_400) + + # Listing restriction + self.simulate_get(path, self.project_id, + query_string='limit=21', + headers=self.headers) + + self.assertEquals(self.srmock.status, falcon.HTTP_400) + + # Bulk deletion restriction + query_string = 'ids=' + ','.join([msg_id] * 22) + self.simulate_delete(path, self.project_id, query_string=query_string) + + self.assertEquals(self.srmock.status, falcon.HTTP_400) + def test_post_single(self): sample_messages = [ - {'body': {'key': 'value'}, 'ttl': 20}, + {'body': {'key': 'value'}, 'ttl': 200}, ] self._test_post(sample_messages) def test_post_multiple(self): sample_messages = [ - {'body': 239, 'ttl': 10}, - {'body': {'key': 'value'}, 'ttl': 20}, - {'body': [1, 3], 'ttl': 30}, + {'body': 239, 'ttl': 100}, + {'body': {'key': 'value'}, 'ttl': 200}, + {'body': [1, 3], 'ttl': 300}, ] self._test_post(sample_messages) @@ -117,15 +147,25 @@ class MessagesBaseTest(base.TestBase): self.assertEquals(self.srmock.status, falcon.HTTP_404) def test_post_bad_message(self): + messages_path = self.queue_path + '/messages' + for document in (None, '[', '[]', '{}', '.'): - self.simulate_post(self.queue_path + '/messages', + self.simulate_post(messages_path, body=document, headers=self.headers) self.assertEquals(self.srmock.status, falcon.HTTP_400) + # Unacceptable TTL + for ttl in (-1, 59, 1209601): + self.simulate_post(messages_path, + body=json.dumps([{'ttl': ttl, + 'body': None}]), + headers=self.headers) + self.assertEquals(self.srmock.status, falcon.HTTP_400) + def test_exceeded_message_posting(self): - #TODO(zyuan): read `20` from the input validation module + # Total (raw request) size doc = json.dumps([{'body': "some body", 'ttl': 100}] * 20, indent=4) long_doc = doc + (' ' * (self.wsgi_cfg.content_max_length - len(doc) + 1)) @@ -136,6 +176,16 @@ class MessagesBaseTest(base.TestBase): self.assertEquals(self.srmock.status, falcon.HTTP_400) + # Each message's size + for long_body in ('a' * 255, # +2 string quotes + {'a': 0, 'b': 'x' * 243}): # w/o whitespaces + doc = json.dumps([{'body': long_body, 'ttl': 100}]) + self.simulate_post(self.queue_path + '/messages', + body=doc, + headers=self.headers) + + self.assertEquals(self.srmock.status, falcon.HTTP_400) + def test_unsupported_json(self): for document in ('{"overflow": 9223372036854775808}', '{"underflow": -9223372036854775809}'): @@ -254,7 +304,7 @@ class MessagesBaseTest(base.TestBase): def test_no_uuid(self): path = self.queue_path + '/messages' - self.simulate_post(path, '7e7e7e', body='[{"body": 0, "ttl": 0}]') + self.simulate_post(path, '7e7e7e', body='[{"body": 0, "ttl": 100}]') self.assertEquals(self.srmock.status, falcon.HTTP_400) @@ -262,7 +312,7 @@ class MessagesBaseTest(base.TestBase): self.assertEquals(self.srmock.status, falcon.HTTP_400) def _post_messages(self, target, repeat=1): - doc = json.dumps([{'body': 239, 'ttl': 30}] * repeat) + doc = json.dumps([{'body': 239, 'ttl': 300}] * repeat) self.simulate_post(target, self.project_id, body=doc, headers=self.headers) @@ -275,7 +325,7 @@ class MessagesBaseTest(base.TestBase): class MessagesSQLiteTests(MessagesBaseTest): - config_filename = 'wsgi_sqlite.conf' + config_filename = 'wsgi_sqlite_validation.conf' class MessagesMongoDBTests(MessagesBaseTest): @@ -296,7 +346,7 @@ class MessagesFaultyDriverTests(base.TestBaseFaulty): def test_simple(self): project_id = 'xyz' path = '/v1/queues/fizbit/messages' - doc = '[{"body": 239, "ttl": 10}]' + doc = '[{"body": 239, "ttl": 100}]' headers = { 'Client-ID': '30387f00', } diff --git a/marconi/tests/transport/wsgi/test_queue_lifecycle.py b/marconi/tests/transport/wsgi/test_queue_lifecycle.py index 04b7a2856..9e56bc82a 100644 --- a/marconi/tests/transport/wsgi/test_queue_lifecycle.py +++ b/marconi/tests/transport/wsgi/test_queue_lifecycle.py @@ -84,10 +84,26 @@ class QueueLifecycleBaseTest(base.TestBase): self.simulate_get(path + '/metadata', project_id) self.assertEquals(self.srmock.status, falcon.HTTP_404) + def test_name_restrictions(self): + self.simulate_put('/v1/queues/Nice-Boat_2') + self.assertEquals(self.srmock.status, falcon.HTTP_201) + + self.simulate_put('/v1/queues/Nice-Bo@t') + self.assertEquals(self.srmock.status, falcon.HTTP_400) + + self.simulate_put('/v1/queues/_' + 'niceboat' * 8) + self.assertEquals(self.srmock.status, falcon.HTTP_400) + def test_no_metadata(self): self.simulate_put('/v1/queues/fizbat') self.assertEquals(self.srmock.status, falcon.HTTP_201) + self.simulate_put('/v1/queues/fizbat/metadata') + self.assertEquals(self.srmock.status, falcon.HTTP_400) + + self.simulate_put('/v1/queues/fizbat/metadata', body='') + self.assertEquals(self.srmock.status, falcon.HTTP_400) + def test_bad_metadata(self): self.simulate_put('/v1/queues/fizbat', '7e55e1a7e') self.assertEquals(self.srmock.status, falcon.HTTP_201) @@ -166,6 +182,10 @@ class QueueLifecycleBaseTest(base.TestBase): self.simulate_get('/v1/queues', project_id) self.assertEquals(self.srmock.status, falcon.HTTP_204) + # Payload exceeded + self.simulate_get('/v1/queues', project_id, query_string='limit=21') + self.assertEquals(self.srmock.status, falcon.HTTP_400) + # Create some self.simulate_put('/v1/queues/q1', project_id, body='{"_ttl": 30 }') self.simulate_put('/v1/queues/q2', project_id, body='{}') diff --git a/marconi/tests/transport/wsgi/test_utils.py b/marconi/tests/transport/wsgi/test_utils.py index 8db0fb596..9eda06052 100644 --- a/marconi/tests/transport/wsgi/test_utils.py +++ b/marconi/tests/transport/wsgi/test_utils.py @@ -158,3 +158,17 @@ class TestWSGIutils(testtools.TestCase): self.assertRaises(falcon.HTTPBadRequest, utils.filter_stream, stream, len(document), spec, doctype=utils.JSONObject) + + def test_filter_stream_wrong_use(self): + document = u'3' + stream = io.StringIO(document) + spec = None + self.assertRaises(TypeError, + utils.filter_stream, stream, len(document), spec, + doctype=int) + + def test_filter_stream_no_reading(self): + stream = None + length = None + self.assertRaises(falcon.HTTPBadRequest, + utils.filter_stream, stream, length, None) diff --git a/marconi/transport/validation.py b/marconi/transport/validation.py new file mode 100644 index 000000000..32ce14f8c --- /dev/null +++ b/marconi/transport/validation.py @@ -0,0 +1,149 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import re + +import simplejson as json + +from marconi.common import config +from marconi.common import exceptions + +OPTIONS = { + 'queue_payload_uplimit': 20, + 'message_payload_uplimit': 20, + 'message_size_uplimit': 256 * 1024, + 'message_ttl_max': 1209600, + 'claim_ttl_max': 43200, + 'claim_grace_max': 43200, +} + +CFG = config.namespace('limits:transport').from_options(**OPTIONS) + +QUEUE_NAME_REGEX = re.compile('^[\w-]+$') + + +def queue_creation(name): + """Restrictions on a queue name. + + :param name: The queue name + :raises: ValidationFailed if the name is longer than 64 bytes or + contains bytes other than ASCII digits, letters, underscore, + and dash. + """ + + if len(name) > 64: + raise exceptions.ValidationFailed( + 'queue name longer than 64 bytes') + + if not QUEUE_NAME_REGEX.match(name): + raise exceptions.ValidationFailed( + 'queue name contains forbidden characters') + + +def queue_listing(limit=None, **kwargs): + """Restrictions involving a list of queues. + + :param limit: The expected number of queues in the list + :param kwargs: Ignored arguments passed to storage API + :raises: ValidationFailed if the limit is exceeded + """ + + if limit is not None and not (0 < limit <= CFG.queue_payload_uplimit): + raise exceptions.ValidationFailed( + 'queue payload count not in (0, %d]' % + CFG.queue_payload_uplimit) + + +def message_posting(messages, check_size=True): + """Restrictions on a list of messages. + + :param messages: A list of messages + :param check_size: Whether the size checking for each message + is required + :raises: ValidationFailed if any message has a out-of-range + TTL, or an oversize message body. + """ + + message_listing(limit=len(messages)) + + for msg in messages: + message_content(msg, check_size) + + +def message_content(message, check_size): + """Restrictions on each message.""" + + if not (60 <= message['ttl'] <= CFG.message_ttl_max): + raise exceptions.ValidationFailed( + 'message TTL not in [60, %d]' % + CFG.message_ttl_max) + + if check_size: + # UTF-8 encoded, without whitespace + # TODO(zyuan): Replace this redundent re-serialization + # with a sizing-only parser. + body_length = len(json.dumps(message['body'], + ensure_ascii=False, + separators=(',', ':'))) + if body_length > CFG.message_size_uplimit: + raise exceptions.ValidationFailed( + 'message body larger than %d bytes' % + CFG.message_size_uplimit) + + +def message_listing(limit=None, **kwargs): + """Restrictions involving a list of messages. + + :param limit: The expected number of messages in the list + :param kwargs: Ignored arguments passed to storage API + :raises: ValidationFailed if the limit is exceeded + """ + + if limit is not None and not (0 < limit <= CFG.message_payload_uplimit): + raise exceptions.ValidationFailed( + 'message payload count not in (0, %d]' % + CFG.message_payload_uplimit) + + +def claim_creation(metadata, **kwargs): + """Restrictions on the claim parameters upon creation. + + :param metadata: The claim metadata + :param kwargs: Other arguments passed to storage API + :raises: ValidationFailed if either TTL or grace is out of range, + or the expected number of messages exceed the limit. + """ + + message_listing(**kwargs) + claim_updating(metadata) + + if not (60 <= metadata['grace'] <= CFG.claim_grace_max): + raise exceptions.ValidationFailed( + 'claim grace not in [60, %d]' % + CFG.claim_grace_max) + + +def claim_updating(metadata): + """Restrictions on the claim TTL. + + :param metadata: The claim metadata + :param kwargs: Ignored arguments passed to storage API + :raises: ValidationFailed if the TTL is out of range + """ + + if not (60 <= metadata['ttl'] <= CFG.claim_ttl_max): + raise exceptions.ValidationFailed( + 'claim TTL not in [60, %d]' % + CFG.claim_ttl_max) diff --git a/marconi/transport/wsgi/claims.py b/marconi/transport/wsgi/claims.py index 70ff22313..a15a45167 100644 --- a/marconi/transport/wsgi/claims.py +++ b/marconi/transport/wsgi/claims.py @@ -16,9 +16,11 @@ import falcon from marconi.common import config +from marconi.common import exceptions as input_exceptions import marconi.openstack.common.log as logging from marconi.storage import exceptions as storage_exceptions from marconi.transport import utils +from marconi.transport import validation from marconi.transport.wsgi import exceptions as wsgi_exceptions from marconi.transport.wsgi import utils as wsgi_utils @@ -60,6 +62,7 @@ class CollectionResource(object): # Claim some messages try: + validation.claim_creation(metadata, **claim_options) cid, msgs = self.claim_controller.create( queue_name, metadata=metadata, @@ -70,8 +73,12 @@ class CollectionResource(object): # TODO(kgriffs): optimize, along with serialization (below) resp_msgs = list(msgs) + except input_exceptions.ValidationFailed as ex: + raise wsgi_exceptions.HTTPBadRequestBody(str(ex)) + except storage_exceptions.DoesNotExist: raise falcon.HTTPNotFound() + except Exception as ex: LOG.exception(ex) description = _('Claim could not be created.') @@ -155,6 +162,7 @@ class ItemResource(object): CLAIM_PATCH_SPEC) try: + validation.claim_updating(metadata) self.claim_controller.update(queue_name, claim_id=claim_id, metadata=metadata, @@ -162,8 +170,12 @@ class ItemResource(object): resp.status = falcon.HTTP_204 + except input_exceptions.ValidationFailed as ex: + raise wsgi_exceptions.HTTPBadRequestBody(str(ex)) + except storage_exceptions.DoesNotExist: raise falcon.HTTPNotFound() + except Exception as ex: LOG.exception(ex) description = _('Claim could not be updated.') diff --git a/marconi/transport/wsgi/messages.py b/marconi/transport/wsgi/messages.py index 11281a650..f0b6cf366 100644 --- a/marconi/transport/wsgi/messages.py +++ b/marconi/transport/wsgi/messages.py @@ -16,9 +16,11 @@ import falcon from marconi.common import config +from marconi.common import exceptions as input_exceptions import marconi.openstack.common.log as logging from marconi.storage import exceptions as storage_exceptions from marconi.transport import utils +from marconi.transport import validation from marconi.transport.wsgi import exceptions as wsgi_exceptions from marconi.transport.wsgi import utils as wsgi_utils @@ -45,11 +47,15 @@ class CollectionResource(object): def _get_by_id(self, base_path, project_id, queue_name, ids): """Returns one or more messages from the queue by ID.""" try: + validation.message_listing(limit=len(ids)) messages = self.message_controller.bulk_get( queue_name, message_ids=ids, project=project_id) + except input_exceptions.ValidationFailed as ex: + raise wsgi_exceptions.HTTPBadRequestBody(str(ex)) + except Exception as ex: LOG.exception(ex) description = _('Message could not be retrieved.') @@ -79,6 +85,7 @@ class CollectionResource(object): }) try: + validation.message_listing(**kwargs) results = self.message_controller.list( queue_name, project=project_id, @@ -89,6 +96,9 @@ class CollectionResource(object): cursor = next(results) messages = list(cursor) + except input_exceptions.ValidationFailed as ex: + raise wsgi_exceptions.HTTPBadRequestBody(str(ex)) + except storage_exceptions.DoesNotExist: raise falcon.HTTPNotFound() @@ -161,14 +171,24 @@ class CollectionResource(object): partial = False try: + # No need to check each message's size if it + # can not exceed the request size limit + validation.message_posting( + messages, check_size=( + validation.CFG.message_size_uplimit < + CFG.content_max_length)) message_ids = self.message_controller.post( queue_name, messages=messages, project=project_id, client_uuid=uuid) + except input_exceptions.ValidationFailed as ex: + raise wsgi_exceptions.HTTPBadRequestBody(str(ex)) + except storage_exceptions.DoesNotExist: raise falcon.HTTPNotFound() + except storage_exceptions.MessageConflict as ex: LOG.exception(ex) partial = True @@ -221,11 +241,15 @@ class CollectionResource(object): ids = req.get_param_as_list('ids', required=True) try: + validation.message_listing(limit=len(ids)) self.message_controller.bulk_delete( queue_name, message_ids=ids, project=project_id) + except input_exceptions.ValidationFailed as ex: + raise wsgi_exceptions.HTTPBadRequestBody(str(ex)) + except Exception as ex: LOG.exception(ex) description = 'Messages could not be deleted.' diff --git a/marconi/transport/wsgi/metadata.py b/marconi/transport/wsgi/metadata.py index a99f4fb24..3e3667954 100644 --- a/marconi/transport/wsgi/metadata.py +++ b/marconi/transport/wsgi/metadata.py @@ -20,6 +20,7 @@ import marconi.openstack.common.log as logging from marconi.storage import exceptions as storage_exceptions from marconi.transport import utils from marconi.transport.wsgi import exceptions as wsgi_exceptions +from marconi.transport.wsgi import utils as wsgi_utils LOG = logging.getLogger(__name__) @@ -66,20 +67,9 @@ class Resource(object): raise wsgi_exceptions.HTTPBadRequestBody(description) # Deserialize queue metadata - try: - metadata = utils.read_json(req.stream, req.content_length) - except utils.MalformedJSON: - description = _('Request body could not be parsed.') - raise wsgi_exceptions.HTTPBadRequestBody(description) - except Exception as ex: - LOG.exception(ex) - description = _('Request body could not be read.') - raise wsgi_exceptions.HTTPServiceUnavailable(description) - - # Metadata must be a JSON object - if not isinstance(metadata, dict): - description = _('Queue metadata must be an object.') - raise wsgi_exceptions.HTTPBadRequestBody(description) + metadata, = wsgi_utils.filter_stream(req.stream, + req.content_length, + spec=None) try: self.queue_ctrl.set_metadata(queue_name, diff --git a/marconi/transport/wsgi/queues.py b/marconi/transport/wsgi/queues.py index 15e40673e..2b5a57af2 100644 --- a/marconi/transport/wsgi/queues.py +++ b/marconi/transport/wsgi/queues.py @@ -15,8 +15,10 @@ import falcon +from marconi.common import exceptions as input_exceptions import marconi.openstack.common.log as logging from marconi.transport import utils +from marconi.transport import validation from marconi.transport.wsgi import exceptions as wsgi_exceptions @@ -37,10 +39,14 @@ class ItemResource(object): {"queue": queue_name, "project": project_id}) try: + validation.queue_creation(name=queue_name) created = self.queue_controller.create( queue_name, project=project_id) + except input_exceptions.ValidationFailed as ex: + raise wsgi_exceptions.HTTPBadRequestBody(str(ex)) + except Exception as ex: LOG.exception(ex) description = _('Queue could not be created.') @@ -95,7 +101,12 @@ class CollectionResource(object): }) try: + validation.queue_listing(**kwargs) results = self.queue_controller.list(project=project_id, **kwargs) + + except input_exceptions.ValidationFailed as ex: + raise wsgi_exceptions.HTTPBadRequestBody(str(ex)) + except Exception as ex: LOG.exception(ex) description = _('Queues could not be listed.')