From 7c6d52ccbf59989f75ed1e0ad90807765c505618 Mon Sep 17 00:00:00 2001 From: Fei Long Wang Date: Mon, 11 Jan 2016 15:32:19 +1300 Subject: [PATCH] Add `_max_messages_post_size` and `_default_message_ttl` for queue Now Zaqar supports metadata as the attributes of a queue, so that user can fill it with some key-value pair. However, Zaqar itself isn't using it very much(only the '_flavor'). Meanwhile, we have some configurations defined in zaqar.conf against messages or claims which are global defined. For example, the max message size. So the idea is adding more reserved attributes for queue so that the queues of Zaqar are more flexible. This patch adds the `_max_messages_post_size` and `_default_message_ttl` for message posting, tests are added as well. APIImpact DocImpact blueprint more-reserved-queue-attributes Change-Id: I7ba84b6e862ff15684a1b1d919c0fed60f4c58b2 --- zaqar/api/v2/endpoints.py | 40 +++++++++---- .../unit/transport/wsgi/v2_0/test_messages.py | 41 ++++++++++++- .../transport/wsgi/v2_0/test_validation.py | 37 ++++++++++++ zaqar/transport/validation.py | 58 ++++++++++++++++++- zaqar/transport/wsgi/v2_0/messages.py | 40 ++++++++++--- zaqar/transport/wsgi/v2_0/queues.py | 5 +- 6 files changed, 201 insertions(+), 20 deletions(-) diff --git a/zaqar/api/v2/endpoints.py b/zaqar/api/v2/endpoints.py index 76e4775e0..22f966247 100644 --- a/zaqar/api/v2/endpoints.py +++ b/zaqar/api/v2/endpoints.py @@ -100,6 +100,7 @@ class Endpoints(object): try: self._validate.queue_identification(queue_name, project_id) self._validate.queue_metadata_length(len(str(metadata))) + self._validate.queue_metadata_putting(metadata) created = self._queue_controller.create(queue_name, metadata=metadata, project=project_id) @@ -371,18 +372,41 @@ class Endpoints(object): return api_utils.error_response(req, ex, headers, error) try: + # NOTE(flwang): Replace 'exists' with 'get_metadata' won't impact + # the performance since both of them will call + # collection.find_one() + queue_meta = None + try: + queue_meta = self._queue_controller.get_metadata(queue_name, + project_id) + except storage_errors.DoesNotExist as ex: + self._validate.queue_identification(queue_name, project_id) + self._queue_controller.create(queue_name, project=project_id) + # NOTE(flwang): Queue is created in lazy mode, so no metadata + # set. + queue_meta = {} + + queue_max_msg_size = queue_meta.get('_max_messages_post_size', + None) + queue_default_ttl = queue_meta.get('_default_message_ttl', None) + + # TODO(flwang): To avoid any unexpected regression issue, we just + # leave the _message_post_spec attribute of class as it's. It + # should be removed in Newton release. + if queue_default_ttl: + _message_post_spec = (('ttl', int, queue_default_ttl), + ('body', '*', None),) + else: + _message_post_spec = (('ttl', int, self._defaults.message_ttl), + ('body', '*', None),) # Place JSON size restriction before parsing - self._validate.message_length(len(str(messages))) + self._validate.message_length(len(str(messages)), + max_msg_post_size=queue_max_msg_size) except validation.ValidationFailed as ex: LOG.debug(ex) headers = {'status': 400} return api_utils.error_response(req, ex, headers) - _message_post_spec = ( - ('ttl', int, self._defaults.message_ttl), - ('body', '*', None), - ) - try: messages = api_utils.sanitize(messages, _message_post_spec, @@ -397,10 +421,6 @@ class Endpoints(object): self._validate.message_posting(messages) - if not self._queue_controller.exists(queue_name, project_id): - self._validate.queue_identification(queue_name, project_id) - self._queue_controller.create(queue_name, project=project_id) - message_ids = self._message_controller.post( queue_name, messages=messages, diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_messages.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_messages.py index bd99ac6a3..a7fdd8d27 100644 --- a/zaqar/tests/unit/transport/wsgi/v2_0/test_messages.py +++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_messages.py @@ -265,6 +265,45 @@ class TestMessagesMongoDB(base.V2Base): self._post_messages(self.url_prefix + '/queues/nonexistent/messages') self.assertEqual(falcon.HTTP_201, self.srmock.status) + def test_post_using_queue_default_message_ttl(self): + queue_path = self.url_prefix + '/queues/test_queue1' + messages_path = queue_path + '/messages' + doc = '{"_default_message_ttl": 999}' + self.simulate_put(queue_path, body=doc, headers=self.headers) + self.addCleanup(self.simulate_delete, queue_path, headers=self.headers) + sample_messages = { + 'messages': [ + {'body': {'key': 'value'}}, + ], + } + + sample_doc = jsonutils.dumps(sample_messages) + result = self.simulate_post(messages_path, + body=sample_doc, headers=self.headers) + result_doc = jsonutils.loads(result[0]) + href = result_doc['resources'][0] + result = self.simulate_get(href, headers=self.headers) + message = jsonutils.loads(result[0]) + + self.assertEqual(999, message['ttl']) + + def test_post_using_queue_max_messages_post_size(self): + queue_path = self.url_prefix + '/queues/test_queue2' + messages_path = queue_path + '/messages' + doc = '{"_max_messages_post_size": 1023}' + self.simulate_put(queue_path, body=doc, headers=self.headers) + self.addCleanup(self.simulate_delete, queue_path, headers=self.headers) + sample_messages = { + 'messages': [ + {'body': {'key': 'a' * 1204}}, + ], + } + + sample_doc = jsonutils.dumps(sample_messages) + self.simulate_post(messages_path, + body=sample_doc, headers=self.headers) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + def test_get_from_missing_queue(self): body = self.simulate_get(self.url_prefix + '/queues/nonexistent/messages', @@ -633,7 +672,7 @@ class TestMessagesFaultyDriver(base.V2BaseFaulty): self.simulate_post(path, body=body, headers=headers) - self.assertEqual(falcon.HTTP_503, self.srmock.status) + self.assertEqual(falcon.HTTP_500, self.srmock.status) self.simulate_get(path, headers=headers) diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_validation.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_validation.py index e6bbe9b01..71564c73e 100644 --- a/zaqar/tests/unit/transport/wsgi/v2_0/test_validation.py +++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_validation.py @@ -98,3 +98,40 @@ class TestValidation(base.V2Base): body='{"timespace": "Shangri-la"}', headers=empty_headers) self.assertEqual(falcon.HTTP_400, self.srmock.status) + + def test_queue_metadata_putting(self): + # Test _default_message_ttl + # TTL normal case + queue_1 = self.url_prefix + '/queues/queue1' + self.simulate_put(queue_1, + self.project_id, + body='{"_default_message_ttl": 60}') + self.addCleanup(self.simulate_delete, queue_1, headers=self.headers) + self.assertEqual(falcon.HTTP_201, self.srmock.status) + + # TTL under min + self.simulate_put(queue_1, + self.project_id, + body='{"_default_message_ttl": 59}') + self.assertEqual(falcon.HTTP_400, self.srmock.status) + + # TTL over max + self.simulate_put(queue_1, + self.project_id, + body='{"_default_message_ttl": 1209601}') + self.assertEqual(falcon.HTTP_400, self.srmock.status) + + # Test _max_messages_post_size + # Size normal case + queue_2 = self.url_prefix + '/queues/queue2' + self.simulate_put(queue_2, + self.project_id, + body='{"_max_messages_post_size": 255}') + self.addCleanup(self.simulate_delete, queue_2, headers=self.headers) + self.assertEqual(falcon.HTTP_201, self.srmock.status) + + # Size over max + self.simulate_put(queue_2, + self.project_id, + body='{"_max_messages_post_size": 257}') + self.assertEqual(falcon.HTTP_400, self.srmock.status) diff --git a/zaqar/transport/validation.py b/zaqar/transport/validation.py index 40cfc6825..08af1775f 100644 --- a/zaqar/transport/validation.py +++ b/zaqar/transport/validation.py @@ -153,6 +153,42 @@ class Validator(object): msg = _(u'Queue metadata is too large. Max size: {0}') raise ValidationFailed(msg, self._limits_conf.max_queue_metadata) + def queue_metadata_putting(self, queue_metadata): + """Checking if the reserved attributes of the queue are valid. + + :param queue_metadata: Queue's metadata. + :raises: ValidationFailed if any reserved attribute is invalid. + """ + if not queue_metadata: + return + + queue_default_ttl = queue_metadata.get('_default_message_ttl', None) + if queue_default_ttl and not isinstance(queue_default_ttl, int): + msg = _(u'_default_message_ttl must be integer.') + raise ValidationFailed(msg) + + if queue_default_ttl: + if not (MIN_MESSAGE_TTL <= queue_default_ttl <= + self._limits_conf.max_message_ttl): + msg = _(u'_default_message_ttl can not exceed {0} ' + 'seconds, and must be at least {1} seconds long.') + raise ValidationFailed( + msg, self._limits_conf.max_message_ttl, MIN_MESSAGE_TTL) + + queue_max_msg_size = queue_metadata.get('_max_messages_post_size', + None) + if queue_max_msg_size and not isinstance(queue_max_msg_size, int): + msg = _(u'_max_messages_post_size must be integer.') + raise ValidationFailed(msg) + + if queue_max_msg_size: + if not (0 < queue_max_msg_size <= + self._limits_conf.max_messages_post_size): + raise ValidationFailed( + _(u'_max_messages_post_size can not exceed {0}, ' + ' and must be at least greater than 0.'), + self._limits_conf.max_messages_post_size) + def message_posting(self, messages): """Restrictions on a list of messages. @@ -167,7 +203,7 @@ class Validator(object): for msg in messages: self.message_content(msg) - def message_length(self, content_length): + def message_length(self, content_length, max_msg_post_size=None): """Restrictions on message post length. :param content_length: Queue request's length. @@ -175,6 +211,26 @@ class Validator(object): """ if content_length is None: return + + if max_msg_post_size: + try: + min_max_size = min(max_msg_post_size, + self._limits_conf.max_messages_post_size) + if content_length > min_max_size: + raise ValidationFailed( + _(u'Message collection size is too large. The max ' + 'size for current queue is {0}. It is calculated ' + 'by max size = min(max_messages_post_size_config: ' + '{1}, max_messages_post_size_queue: {2}).'), + min_max_size, + self._limits_conf.max_messages_post_size, + max_msg_post_size) + except TypeError: + # NOTE(flwang): If there is a type error when using min(), + # it only happens in py3.x, it will be skipped and compare + # the message length with the size defined in config file. + pass + if content_length > self._limits_conf.max_messages_post_size: raise ValidationFailed( _(u'Message collection size is too large. Max size {0}'), diff --git a/zaqar/transport/wsgi/v2_0/messages.py b/zaqar/transport/wsgi/v2_0/messages.py index 638ef4147..79d91bc79 100644 --- a/zaqar/transport/wsgi/v2_0/messages.py +++ b/zaqar/transport/wsgi/v2_0/messages.py @@ -38,6 +38,7 @@ class CollectionResource(object): '_wsgi_conf', '_validate', '_message_post_spec', + '_default_message_ttl' ) def __init__(self, wsgi_conf, validate, @@ -48,9 +49,10 @@ class CollectionResource(object): self._validate = validate self._message_controller = message_controller self._queue_controller = queue_controller + self._default_message_ttl = default_message_ttl self._message_post_spec = ( - ('ttl', int, default_message_ttl), + ('ttl', int, self._default_message_ttl), ('body', '*', None), ) @@ -155,10 +157,37 @@ class CollectionResource(object): @acl.enforce("messages:create") def on_post(self, req, resp, project_id, queue_name): client_uuid = wsgi_helpers.get_client_uuid(req) - try: + # NOTE(flwang): Replace 'exists' with 'get_metadata' won't impact + # the performance since both of them will call + # collection.find_one() + queue_meta = None + try: + queue_meta = self._queue_controller.get_metadata(queue_name, + project_id) + except storage_errors.DoesNotExist as ex: + self._validate.queue_identification(queue_name, project_id) + self._queue_controller.create(queue_name, project=project_id) + # NOTE(flwang): Queue is created in lazy mode, so no metadata + # set. + queue_meta = {} + + queue_max_msg_size = queue_meta.get('_max_messages_post_size', + None) + queue_default_ttl = queue_meta.get('_default_message_ttl', None) + + # TODO(flwang): To avoid any unexpected regression issue, we just + # leave the _message_post_spec attribute of class as it's. It + # should be removed in Newton release. + if queue_default_ttl: + message_post_spec = (('ttl', int, queue_default_ttl), + ('body', '*', None),) + else: + message_post_spec = (('ttl', int, self._default_message_ttl), + ('body', '*', None),) # Place JSON size restriction before parsing - self._validate.message_length(req.content_length) + self._validate.message_length(req.content_length, + max_msg_post_size=queue_max_msg_size) except validation.ValidationFailed as ex: LOG.debug(ex) raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) @@ -171,15 +200,12 @@ class CollectionResource(object): raise wsgi_errors.HTTPBadRequestAPI(description) messages = wsgi_utils.sanitize(document['messages'], - self._message_post_spec, + message_post_spec, doctype=wsgi_utils.JSONArray) try: self._validate.message_posting(messages) - if not self._queue_controller.exists(queue_name, project_id): - self._queue_controller.create(queue_name, project=project_id) - message_ids = self._message_controller.post( queue_name, messages=messages, diff --git a/zaqar/transport/wsgi/v2_0/queues.py b/zaqar/transport/wsgi/v2_0/queues.py index 82b397819..ead561416 100644 --- a/zaqar/transport/wsgi/v2_0/queues.py +++ b/zaqar/transport/wsgi/v2_0/queues.py @@ -74,6 +74,7 @@ class ItemResource(object): metadata = wsgi_utils.sanitize(document, spec=None) try: + self._validate.queue_metadata_putting(metadata) created = self._queue_controller.create(queue_name, metadata=metadata, project=project_id) @@ -81,7 +82,9 @@ class ItemResource(object): except storage_errors.FlavorDoesNotExist as ex: LOG.exception(ex) raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) - + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) except Exception as ex: LOG.exception(ex) description = _(u'Queue could not be created.')