From 91a86d287178836f2ded2460db50d7afbb5289e4 Mon Sep 17 00:00:00 2001 From: Victoria Martinez de la Cruz Date: Thu, 19 Mar 2015 00:51:20 -0300 Subject: [PATCH] API v1.1 Messages endpoints This change adds the endpoints for messages to the persistent transport API. It also fixes some details on the representation for queues. DocImpact Change-Id: I9167f562dd7cb56a27d92f1ea6d9febf7d0865ae --- zaqar/api/handler.py | 5 +- zaqar/api/v1_1/endpoints.py | 359 ++++++++++- zaqar/api/v1_1/request.py | 3 +- zaqar/bootstrap.py | 4 +- zaqar/common/api/errors.py | 41 ++ zaqar/common/api/utils.py | 159 ++++- zaqar/tests/unit/transport/websocket/base.py | 2 +- .../transport/websocket/v1_1/test_messages.py | 608 ++++++++++++++++++ 8 files changed, 1148 insertions(+), 33 deletions(-) create mode 100644 zaqar/common/api/errors.py create mode 100644 zaqar/tests/unit/transport/websocket/v1_1/test_messages.py diff --git a/zaqar/api/handler.py b/zaqar/api/handler.py index c83550ef0..56251a04a 100644 --- a/zaqar/api/handler.py +++ b/zaqar/api/handler.py @@ -21,8 +21,9 @@ class Handler(object): The handler validates and process the requests """ - def __init__(self, storage, control, validate): - self.v1_1_endpoints = endpoints.Endpoints(storage, control, validate) + def __init__(self, storage, control, validate, defaults): + self.v1_1_endpoints = endpoints.Endpoints(storage, control, + validate, defaults) def process_request(self, req): # FIXME(vkmc): Control API version diff --git a/zaqar/api/v1_1/endpoints.py b/zaqar/api/v1_1/endpoints.py index 5fc4c6271..881d10ec7 100644 --- a/zaqar/api/v1_1/endpoints.py +++ b/zaqar/api/v1_1/endpoints.py @@ -14,6 +14,7 @@ from oslo_log import log as logging +from zaqar.common.api import errors as api_errors from zaqar.common.api import response from zaqar.common.api import utils as api_utils from zaqar.i18n import _ @@ -26,7 +27,7 @@ LOG = logging.getLogger(__name__) class Endpoints(object): """v1.1 API Endpoints.""" - def __init__(self, storage, control, validate): + def __init__(self, storage, control, validate, defaults): self._queue_controller = storage.queue_controller self._message_controller = storage.message_controller self._claim_controller = storage.claim_controller @@ -36,6 +37,9 @@ class Endpoints(object): self._validate = validate + self._defaults = defaults + + # Queues @api_utils.raises_conn_error def queue_list(self, req): """Gets a list of queues @@ -50,22 +54,13 @@ class Endpoints(object): LOG.debug(u'Queue list - project: %(project)s', {'project': project_id}) - kwargs = {} - - if req._body.get('marker') is not None: - kwargs['marker'] = req._body.get('marker') - - if req._body.get('limit') is not None: - kwargs['limit'] = req._body.get('limit') - - if req._body.get('detailed') is not None: - kwargs['detailed'] = req._body.get('detailed') - try: + kwargs = api_utils.get_headers(req) + self._validate.queue_listing(**kwargs) results = self._queue_controller.list( project=project_id, **kwargs) - except validation.ValidationFailed as ex: + except (ValueError, validation.ValidationFailed) as ex: LOG.debug(ex) headers = {'status': 400} return api_utils.error_response(req, ex, headers) @@ -82,9 +77,7 @@ class Endpoints(object): body = {'queues': queues} headers = {'status': 200} - resp = response.Response(req, body, headers) - - return resp + return response.Response(req, body, headers) @api_utils.raises_conn_error def queue_create(self, req): @@ -121,8 +114,7 @@ class Endpoints(object): else: body = _('Queue %s created.') % queue_name headers = {'status': 201} if created else {'status': 204} - resp = response.Response(req, body, headers) - return resp + return response.Response(req, body, headers) @api_utils.raises_conn_error def queue_delete(self, req): @@ -148,8 +140,7 @@ class Endpoints(object): else: body = _('Queue %s removed.') % queue_name headers = {'status': 204} - resp = response.Response(req, body, headers) - return resp + return response.Response(req, body, headers) @api_utils.raises_conn_error def queue_get(self, req): @@ -183,8 +174,7 @@ class Endpoints(object): else: body = resp_dict headers = {'status': 200} - resp = response.Response(req, body, headers) - return resp + return response.Response(req, body, headers) @api_utils.raises_conn_error def queue_get_stats(self, req): @@ -198,7 +188,7 @@ class Endpoints(object): project_id = req._headers.get('X-Project-ID') queue_name = req._body.get('queue_name') - LOG.debug(u'Queue get queue stats - queue: %(queue)s, ' + LOG.debug(u'Get queue stats - queue: %(queue)s, ' u'project: %(project)s', {'queue': queue_name, 'project': project_id}) @@ -217,8 +207,7 @@ class Endpoints(object): } body = resp_dict headers = {'status': 404} - resp = response.Response(req, body, headers) - return resp + return response.Response(req, body, headers) except storage_errors.ExceptionBase as ex: LOG.exception(ex) error = _('Cannot retrieve queue %s stats.') % queue_name @@ -226,5 +215,321 @@ class Endpoints(object): return api_utils.error_response(req, ex, headers, error) else: headers = {'status': 200} - resp = response.Response(req, body, headers) - return resp \ No newline at end of file + return response.Response(req, body, headers) + + # Messages + @api_utils.raises_conn_error + def message_list(self, req): + """Gets a list of messages on a queue + + :param req: Request instance ready to be sent. + :type req: `api.common.Request` + :return: resp: Response instance + :type: resp: `api.common.Response` + """ + project_id = req._headers.get('X-Project-ID') + queue_name = req._body.get('queue_name') + + LOG.debug(u'Message list - queue: %(queue)s, ' + u'project: %(project)s', + {'queue': queue_name, 'project': project_id}) + + try: + kwargs = api_utils.get_headers(req) + + client_uuid = api_utils.get_client_uuid(req) + + self._validate.message_listing(**kwargs) + results = self._message_controller.list( + queue_name, + project=project_id, + client_uuid=client_uuid, + **kwargs) + + # Buffer messages + cursor = next(results) + messages = list(cursor) + except (ValueError, api_errors.BadRequest, + validation.ValidationFailed) as ex: + LOG.debug(ex) + headers = {'status': 400} + return api_utils.error_response(req, ex, headers) + except storage_errors.DoesNotExist as ex: + LOG.debug(ex) + headers = {'status': 404} + return api_utils.error_response(req, ex, headers) + + if messages: + # Found some messages, so prepare the response + kwargs['marker'] = next(results) + messages = [api_utils.format_message(message) + for message in messages] + + headers = {'status': 200} + body = {'messages': messages} + + return response.Response(req, body, headers) + + @api_utils.raises_conn_error + def message_get(self, req): + """Gets a message from a queue + + :param req: Request instance ready to be sent. + :type req: `api.common.Request` + :return: resp: Response instance + :type: resp: `api.common.Response` + """ + project_id = req._headers.get('X-Project-ID') + queue_name = req._body.get('queue_name') + message_id = req._body.get('message_id') + + LOG.debug(u'Message get - message: %(message)s, ' + u'queue: %(queue)s, project: %(project)s', + {'message': message_id, + 'queue': queue_name, + 'project': project_id}) + try: + message = self._message_controller.get( + queue_name, + message_id, + project=project_id) + + except storage_errors.DoesNotExist as ex: + LOG.debug(ex) + headers = {'status': 404} + return api_utils.error_response(req, ex, headers) + + # Prepare response + message = api_utils.format_message(message) + + headers = {'status': 200} + body = {'messages': message} + + return response.Response(req, body, headers) + + @api_utils.raises_conn_error + def message_get_many(self, req): + """Gets a set of messages from a queue + + :param req: Request instance ready to be sent. + :type req: `api.common.Request` + :return: resp: Response instance + :type: resp: `api.common.Response` + """ + project_id = req._headers.get('X-Project-ID') + queue_name = req._body.get('queue_name') + message_ids = list(req._body.get('message_ids')) + + LOG.debug(u'Message get - queue: %(queue)s, ' + u'project: %(project)s', + {'queue': queue_name, 'project': project_id}) + + try: + self._validate.message_listing(limit=len(message_ids)) + messages = self._message_controller.bulk_get( + queue_name, + message_ids=message_ids, + project=project_id) + except validation.ValidationFailed as ex: + LOG.debug(ex) + headers = {'status': 400} + return api_utils.error_response(req, ex, headers) + + # Prepare response + messages = list(messages) + messages = [api_utils.format_message(message) + for message in messages] + + headers = {'status': 200} + body = {'messages': messages} + + return response.Response(req, body, headers) + + @api_utils.raises_conn_error + def message_post(self, req): + """Post a set of messages to a queue + + :param req: Request instance ready to be sent. + :type req: `api.common.Request` + :return: resp: Response instance + :type: resp: `api.common.Response` + """ + project_id = req._headers.get('X-Project-ID') + queue_name = req._body.get('queue_name') + + LOG.debug(u'Messages post - queue: %(queue)s, ' + u'project: %(project)s', + {'queue': queue_name, 'project': project_id}) + + messages = req._body.get('messages') + + if messages is None: + ex = _(u'Invalid request.') + error = _(u'No messages were found in the request body.') + headers = {'status': 400} + return api_utils.error_response(req, ex, headers, error) + + try: + # Place JSON size restriction before parsing + self._validate.message_length(len(str(messages))) + except validation.ValidationFailed as ex: + LOG.debug(ex) + headers = {'status': 400} + return api_utils.error_response(req, ex, headers) + + _message_post_spec = ( + ('ttl', int, self._defaults.message_ttl), + ('body', '*', None), + ) + + messages = api_utils.sanitize(messages, + _message_post_spec, + doctype=list) + + try: + client_uuid = api_utils.get_client_uuid(req) + + self._validate.message_posting(messages) + + if not self._queue_controller.exists(queue_name, project_id): + self._validate.queue_identification(queue_name, project_id) + self._queue_controller.create(queue_name, project=project_id) + + message_ids = self._message_controller.post( + queue_name, + messages=messages, + project=project_id, + client_uuid=client_uuid) + except (api_errors.BadRequest, validation.ValidationFailed) as ex: + LOG.debug(ex) + headers = {'status': 400} + return api_utils.error_response(req, ex, headers) + except storage_errors.DoesNotExist as ex: + LOG.debug(ex) + headers = {'status': 404} + return api_utils.error_response(req, ex, headers) + except storage_errors.MessageConflict as ex: + LOG.exception(ex) + error = _(u'No messages could be enqueued.') + headers = {'status': 500} + return api_utils.error_response(req, ex, headers, error) + + # Prepare the response + headers = {'status': 201} + body = {'message_ids': message_ids} + + return response.Response(req, body, headers) + + @api_utils.raises_conn_error + def message_delete(self, req): + """Delete a message from a queue + + :param req: Request instance ready to be sent. + :type req: `api.common.Request` + :return: resp: Response instance + :type: resp: `api.common.Response` + """ + project_id = req._headers.get('X-Project-ID') + queue_name = req._body.get('queue_name') + message_id = req._body.get('message_id') + + LOG.debug(u'Messages item DELETE - message: %(message)s, ' + u'queue: %(queue)s, project: %(project)s', + {'message': message_id, + 'queue': queue_name, + 'project': project_id}) + + claim_id = req._body.get('claim_id') + + try: + self._message_controller.delete( + queue_name, + message_id=message_id, + project=project_id, + claim=claim_id) + except storage_errors.MessageNotClaimed as ex: + LOG.debug(ex) + error = _(u'A claim was specified, but the message ' + u'is not currently claimed.') + headers = {'status': 400} + return api_utils.error_response(req, ex, headers, error) + except storage_errors.ClaimDoesNotExist as ex: + LOG.debug(ex) + error = _(u'The specified claim does not exist or ' + u'has expired.') + headers = {'status': 400} + return api_utils.error_response(req, ex, headers, error) + except storage_errors.NotPermitted as ex: + LOG.debug(ex) + error = _(u'This message is claimed; it cannot be ' + u'deleted without a valid claim ID.') + headers = {'status': 403} + return api_utils.error_response(req, ex, headers, error) + + headers = {'status': 204} + body = {} + + return response.Response(req, body, headers) + + @api_utils.raises_conn_error + def message_delete_many(self, req): + """Deletes a set of messages from a queue + + :param req: Request instance ready to be sent. + :type req: `api.common.Request` + :return: resp: Response instance + :type: resp: `api.common.Response` + """ + project_id = req._headers.get('X-Project-ID') + queue_name = req._body.get('queue_name') + message_ids = req._body.get('message_ids') + pop_limit = req._body.get('pop_limit') + + LOG.debug(u'Messages collection DELETE - queue: %(queue)s,' + u'project: %(project)s, messages: %(message_ids)s', + {'queue': queue_name, 'project': project_id, + 'message_ids': message_ids}) + + try: + self._validate.message_deletion(message_ids, pop_limit) + + except validation.ValidationFailed as ex: + LOG.debug(ex) + headers = {'status': 400} + return api_utils.error_response(req, ex, headers) + + if message_ids: + return self._delete_messages_by_id(req, queue_name, message_ids, + project_id) + elif pop_limit: + return self._pop_messages(req, queue_name, project_id, pop_limit) + + @api_utils.raises_conn_error + def _delete_messages_by_id(self, req, queue_name, ids, project_id): + self._message_controller.bulk_delete(queue_name, message_ids=ids, + project=project_id) + + headers = {'status': 204} + body = {} + + return response.Response(req, body, headers) + + @api_utils.raises_conn_error + def _pop_messages(self, req, queue_name, project_id, pop_limit): + + LOG.debug(u'Pop messages - queue: %(queue)s, project: %(project)s', + {'queue': queue_name, 'project': project_id}) + + messages = self._message_controller.pop( + queue_name, + project=project_id, + limit=pop_limit) + + # Prepare response + if not messages: + messages = [] + + headers = {'status': 200} + body = {'messages': messages} + + return response.Response(req, body, headers) diff --git a/zaqar/api/v1_1/request.py b/zaqar/api/v1_1/request.py index 875472e36..aba8f6878 100644 --- a/zaqar/api/v1_1/request.py +++ b/zaqar/api/v1_1/request.py @@ -239,8 +239,9 @@ class RequestSchema(api.Api): 'type': 'object', 'properties': { 'queue_name': {'type': 'string'}, + 'messages': {'type': 'array'}, }, - 'required': ['queue_name'], + 'required': ['queue_name', 'messages'], } }, 'required': ['action', 'headers', 'body'] diff --git a/zaqar/bootstrap.py b/zaqar/bootstrap.py index 82164e8e5..b62159aa2 100644 --- a/zaqar/bootstrap.py +++ b/zaqar/bootstrap.py @@ -25,6 +25,7 @@ from zaqar.openstack.common.cache import cache as oslo_cache from zaqar.storage import pipeline from zaqar.storage import pooling from zaqar.storage import utils as storage_utils +from zaqar.transport import base from zaqar.transport import validation LOG = log.getLogger(__name__) @@ -102,7 +103,8 @@ class Bootstrap(object): def api(self): LOG.debug(u'Loading API handler') validate = validation.Validator(self.conf) - return handler.Handler(self.storage, self.control, validate) + defaults = base.ResourceDefaults(self.conf) + return handler.Handler(self.storage, self.control, validate, defaults) @decorators.lazy_property(write=False) def storage(self): diff --git a/zaqar/common/api/errors.py b/zaqar/common/api/errors.py new file mode 100644 index 000000000..f70c1846a --- /dev/null +++ b/zaqar/common/api/errors.py @@ -0,0 +1,41 @@ +# Copyright (c) 2015 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class ExceptionBase(Exception): + + msg_format = '' + + def __init__(self, **kwargs): + msg = self.msg_format.format(**kwargs) + super(ExceptionBase, self).__init__(msg) + + +class BadRequest(ExceptionBase): + """Raised when an invalid request is received.""" + + msg_format = u'Bad request. {error} {description}' + + def __init__(self, error, description): + """Initializes the error with contextual information. + + :param description: Error description + """ + + super(BadRequest, self).__init__(error=error, description=description) + + +class DocumentTypeNotSupported(ExceptionBase): + """Raised when the content of a request has an unsupported format.""" diff --git a/zaqar/common/api/utils.py b/zaqar/common/api/utils.py index b259563bc..47b363a18 100644 --- a/zaqar/common/api/utils.py +++ b/zaqar/common/api/utils.py @@ -13,15 +13,162 @@ # the License. import functools +import uuid from oslo_log import log as logging +from oslo_utils import strutils +import zaqar.common.api.errors as api_errors import zaqar.common.api.response as response from zaqar.i18n import _ LOG = logging.getLogger(__name__) +def sanitize(document, spec=None, doctype=dict): + """Validates a document and drops undesired fields. + + :param document: A dict to verify according to `spec`. + :param spec: (Default None) Iterable describing expected fields, + yielding tuples with the form of: + + (field_name, value_type, default_value) + + Note that value_type may either be a Python type, or the + special string '*' to accept any type. default_value is the + default to give the field if it is missing, or None to require + that the field be present. + + If spec is None, the incoming documents will not be validated. + :param doctype: type of document to expect; must be either + JSONObject or JSONArray. + :raises: DocumentTypeNotSupported, TypeError + :returns: A sanitized, filtered version of the document. If the + document is a list of objects, each object will be filtered + and returned in a new list. If, on the other hand, the document + is expected to contain a single object, that object's fields will + be filtered and the resulting object will be returned. + """ + + if doctype is dict: + if not isinstance(document, dict): + raise api_errors.DocumentTypeNotSupported() + + return document if spec is None else filter(document, spec) + + if doctype is list: + if not isinstance(document, list): + raise api_errors.DocumentTypeNotSupported() + + if spec is None: + return document + + return [filter(obj, spec) for obj in document] + + raise TypeError(_(u'Doctype must be either a JSONObject or JSONArray')) + + +def filter(document, spec): + """Validates and retrieves typed fields from a single document. + + Sanitizes a dict-like document by checking it against a + list of field spec, and returning only those fields + specified. + + :param document: dict-like object + :param spec: iterable describing expected fields, yielding + tuples with the form of: (field_name, value_type). Note that + value_type may either be a Python type, or the special + string '*' to accept any type. + :raises: BadRequest if any field is missing or not an + instance of the specified type + :returns: A filtered dict containing only the fields + listed in the spec + """ + + filtered = {} + for name, value_type, default_value in spec: + filtered[name] = get_checked_field(document, name, + value_type, default_value) + + return filtered + + +def get_checked_field(document, name, value_type, default_value): + """Validates and retrieves a typed field from a document. + + This function attempts to look up doc[name], and raises + appropriate errors if the field is missing or not an + instance of the given type. + + :param document: dict-like object + :param name: field name + :param value_type: expected value type, or '*' to accept any type + :param default_value: Default value to use if the value is missing, + or None to make the value required. + :raises: BadRequest if the field is missing or not an + instance of value_type + :returns: value obtained from doc[name] + """ + + try: + value = document[name] + except KeyError: + if default_value is not None: + value = default_value + else: + description = _(u'Missing "{name}" field.').format(name=name) + raise api_errors.BadRequest(description) + + # PERF(kgriffs): We do our own little spec thing because it is way + # faster than jsonschema. + if value_type == '*' or isinstance(value, value_type): + return value + + description = _(u'The value of the "{name}" field must be a {vtype}.') + description = description.format(name=name, vtype=value_type.__name__) + raise api_errors.BadRequest(description) + + +def get_client_uuid(req): + """Read a required Client-ID from a request. + + :param req: Request object + :raises: BadRequest if the Client-ID header is missing or + does not represent a valid UUID + :returns: A UUID object + """ + + try: + return uuid.UUID(req._headers.get('Client-ID')) + except ValueError: + description = _(u'Malformed hexadecimal UUID.') + raise api_errors.BadRequest(_(u'Wrong UUID value'), description) + + +def get_headers(req): + kwargs = {} + + if req._body.get('marker') is not None: + kwargs['marker'] = req._body.get('marker') + + if req._body.get('limit') is not None: + kwargs['limit'] = int(req._body.get('limit')) + + if req._body.get('detailed') is not None: + kwargs['detailed'] = strutils.bool_from_string( + req._body.get('detailed')) + + if req._body.get('echo') is not None: + kwargs['echo'] = strutils.bool_from_string(req._body.get('echo')) + + if req._body.get('include_claimed') is not None: + kwargs['include_claimed'] = strutils.bool_from_string( + req._body.get('include_claimed')) + + return kwargs + + def raises_conn_error(func): """Handles generic Exceptions @@ -36,8 +183,9 @@ def raises_conn_error(func): except Exception as ex: LOG.exception(ex) error = _("Unexpected error.") + headers = {'status': 500} req = kwargs.get('req') - return error_response(req, ex, error) + return error_response(req, ex, headers, error) return wrapper @@ -46,3 +194,12 @@ def error_response(req, exception, headers=None, error=None): body = {'exception': str(exception), 'error': error} resp = response.Response(req, body, headers) return resp + + +def format_message(message): + return { + 'id': message['id'], + 'ttl': message['ttl'], + 'age': message['age'], + 'body': message['body'], + } diff --git a/zaqar/tests/unit/transport/websocket/base.py b/zaqar/tests/unit/transport/websocket/base.py index e77e32646..bcb94f178 100644 --- a/zaqar/tests/unit/transport/websocket/base.py +++ b/zaqar/tests/unit/transport/websocket/base.py @@ -37,7 +37,7 @@ class TestBase(testing.TestBase): self.conf.register_opts(driver._WS_OPTIONS, group=driver._WS_GROUP) - self.wsgi_cfg = self.conf[driver._WS_GROUP] + self.ws_cfg = self.conf[driver._WS_GROUP] self.conf.unreliable = True self.conf.admin_mode = True diff --git a/zaqar/tests/unit/transport/websocket/v1_1/test_messages.py b/zaqar/tests/unit/transport/websocket/v1_1/test_messages.py new file mode 100644 index 000000000..93101e6d0 --- /dev/null +++ b/zaqar/tests/unit/transport/websocket/v1_1/test_messages.py @@ -0,0 +1,608 @@ +# Copyright (c) 2015 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import json +import uuid + +import ddt +import mock +from oslo_utils import timeutils +import six +from testtools import matchers + +from zaqar.tests.unit.transport.websocket import base +from zaqar.tests.unit.transport.websocket import utils as test_utils +from zaqar.transport import validation + + +@ddt.ddt +class MessagesBaseTest(base.V1_1Base): + + config_file = "websocket_mongodb.conf" + + def setUp(self): + super(MessagesBaseTest, self).setUp() + self.protocol = self.transport.factory() + + self.default_message_ttl = 3600 + + self.project_id = '7e55e1a7e' + self.headers = { + 'Client-ID': str(uuid.uuid4()), + 'X-Project-ID': self.project_id + } + + action = "queue_create" + body = {"queue_name": "kitkat"} + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 201) + + with mock.patch.object(self.protocol, 'sendMessage') as msg_mock: + msg_mock.side_effect = validator + self.protocol.onMessage(req, False) + + def tearDown(self): + super(MessagesBaseTest, self).tearDown() + action = "queue_delete" + body = {"queue_name": "kitkat"} + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock.start() + + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 204) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + def _test_post(self, sample_messages): + action = "message_post" + body = {"queue_name": "kitkat", + "messages": sample_messages} + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock = send_mock.start() + + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 201) + self.msg_ids = resp['body']['message_ids'] + self.assertEqual(len(self.msg_ids), len(sample_messages)) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + self.assertTrue(send_mock.called) + + lookup = dict([(m['ttl'], m['body']) for m in sample_messages]) + + # Test GET on the message resource directly + # NOTE(cpp-cabrera): force the passing of time to age a message + timeutils_utcnow = 'zaqar.openstack.common.timeutils.utcnow' + now = timeutils.utcnow() + datetime.timedelta(seconds=10) + with mock.patch(timeutils_utcnow) as mock_utcnow: + mock_utcnow.return_value = now + for msg_id in self.msg_ids: + headers = self.headers.copy() + headers['X-Project-ID'] = '777777' + # Wrong project ID + action = "message_get" + body = {"queue_name": "kitkat", + "message_id": msg_id} + + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 404) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + # Correct project ID + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 200) + + # Check message properties + message = resp['body']['messages'] + self.assertEqual(message['body'], lookup[message['ttl']]) + self.assertEqual(message['id'], msg_id) + + # no negative age + # NOTE(cpp-cabrera): testtools lacks + # GreaterThanEqual on py26 + self.assertThat(message['age'], + matchers.GreaterThan(-1)) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + # Test bulk GET + action = "message_get_many" + body = {"queue_name": "kitkat", + "message_ids": self.msg_ids} + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 200) + expected_ttls = set(m['ttl'] for m in sample_messages) + actual_ttls = set(m['ttl'] for m in resp['body']['messages']) + self.assertFalse(expected_ttls - actual_ttls) + actual_ids = set(m['id'] for m in resp['body']['messages']) + self.assertFalse(set(self.msg_ids) - actual_ids) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + def test_exceeded_payloads(self): + # Get a valid message id + resp = self._post_messages("kitkat") + msg_id = resp['body']['message_ids'] + + # Bulk GET restriction + get_msg_ids = msg_id * 21 + action = "message_get_many" + body = {"queue_name": "kitkat", + "message_ids": get_msg_ids} + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock.start() + + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 400) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + # Listing restriction + body['limit'] = 21 + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + + # Bulk deletion restriction + del_msg_ids = msg_id * 22 + action = "message_get_many" + body = {"queue_name": "kitkat", + "message_ids": del_msg_ids} + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + + def test_post_single(self): + sample_messages = [ + {'body': {'key': 'value'}, 'ttl': 200}, + ] + + self._test_post(sample_messages) + + def test_post_multiple(self): + sample_messages = [ + {'body': 239, 'ttl': 100}, + {'body': {'key': 'value'}, 'ttl': 200}, + {'body': [1, 3], 'ttl': 300}, + ] + + self._test_post(sample_messages) + + def test_post_optional_ttl(self): + messages = [{'body': 239}, + {'body': {'key': 'value'}, 'ttl': 200}] + + action = "message_post" + body = {"queue_name": "kitkat", + "messages": messages} + req = test_utils.create_request(action, body, self.headers) + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock = send_mock.start() + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 201) + self.msg_id = resp['body']['message_ids'][0] + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + action = "message_get" + body = {"queue_name": "kitkat", + "message_id": self.msg_id} + + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 200) + self.assertEqual(self.default_message_ttl, + resp['body']['messages']['ttl']) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + self.assertTrue(send_mock.called) + + def test_post_to_non_ascii_queue(self): + queue_name = u'non-ascii-n\u0153me' + + if six.PY2: + queue_name = queue_name.encode('utf-8') + + resp = self._post_messages(queue_name) + self.assertEqual(resp['headers']['status'], 400) + + def test_post_with_long_queue_name(self): + # NOTE(kgriffs): This test verifies that routes with + # embedded queue name params go through the validation + # hook, regardless of the target resource. + + queue_name = 'v' * validation.QUEUE_NAME_MAX_LEN + + resp = self._post_messages(queue_name) + self.assertEqual(resp['headers']['status'], 201) + + queue_name += 'v' + resp = self._post_messages(queue_name) + self.assertEqual(resp['headers']['status'], 400) + + def test_post_to_missing_queue(self): + queue_name = 'nonexistent' + resp = self._post_messages(queue_name) + self.assertEqual(resp['headers']['status'], 201) + + def test_get_from_missing_queue(self): + action = "message_list" + body = {"queue_name": "anothernonexistent"} + req = test_utils.create_request(action, body, self.headers) + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock = send_mock.start() + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 200) + self.assertEqual(resp['body']['messages'], []) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + @ddt.data('', '0xdeadbeef', '550893e0-2b6e-11e3-835a-5cf9dd72369') + def test_bad_client_id(self, text_id): + action = "message_post" + body = { + "queue_name": "kinder", + "messages": [{"ttl": 60, + "body": ""}] + } + headers = { + 'Client-ID': text_id, + 'X-Project-ID': self.project_id + } + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock = send_mock.start() + + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 400) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + action = "message_get" + body = { + "queue_name": "kinder", + "limit": 3, + "echo": True + } + + req = test_utils.create_request(action, body, headers) + self.protocol.onMessage(req, False) + + @ddt.data(None, '[', '[]', '{}', '.') + def test_post_bad_message(self, document): + action = "message_post" + body = { + "queue_name": "kinder", + "messages": document + } + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock = send_mock.start() + + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 400) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + @ddt.data(-1, 59, 1209601) + def test_unacceptable_ttl(self, ttl): + action = "message_post" + body = {"queue_name": "kinder", + "messages": [{"ttl": ttl, "body": ""}]} + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock = send_mock.start() + + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 400) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + def test_exceeded_message_posting(self): + # Total (raw request) size + document = [{'body': "some body", 'ttl': 100}] * 8000 + action = "message_post" + body = { + "queue_name": "kinder", + "messages": document + } + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock = send_mock.start() + + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 400) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + @ddt.data('{"overflow": 9223372036854775808}', + '{"underflow": -9223372036854775809}') + def test_unsupported_json(self, document): + action = "message_post" + body = { + "queue_name": "fizz", + "messages": document + } + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock = send_mock.start() + + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 400) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + def test_delete(self): + resp = self._post_messages("tofi") + msg_id = resp['body']['message_ids'][0] + + action = "message_get" + body = {"queue_name": "tofi", + "message_id": msg_id} + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock = send_mock.start() + + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 200) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + self.assertTrue(send_mock.called) + + # Delete queue + action = "message_delete" + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 204) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + self.assertTrue(send_mock.called) + + # Get non existent queue + action = "message_get" + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 404) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + self.assertTrue(send_mock.called) + + # Safe to delete non-existing ones + action = "message_delete" + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 204) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + self.assertTrue(send_mock.called) + + def test_bulk_delete(self): + resp = self._post_messages("nerds", repeat=5) + msg_ids = resp['body']['message_ids'] + + action = "message_delete_many" + body = {"queue_name": "nerds", + "message_ids": msg_ids} + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock = send_mock.start() + + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 204) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + self.assertTrue(send_mock.called) + + action = "message_get" + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 400) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + self.assertTrue(send_mock.called) + + # Safe to delete non-existing ones + action = "message_delete_many" + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 204) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + self.assertTrue(send_mock.called) + + # Even after the queue is gone + action = "queue_delete" + body = {"queue_name": "nerds"} + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + + action = "message_delete_many" + body = {"queue_name": "nerds", + "message_ids": msg_ids} + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + + def test_get_nonexistent_message_404s(self): + action = "message_get" + body = {"queue_name": "notthere", + "message_id": "a"} + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock.start() + + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 404) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + def test_get_multiple_invalid_messages_404s(self): + action = "message_get_many" + body = {"queue_name": "notnotthere", + "message_ids": ["a", "b", "c"]} + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock.start() + + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 404) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + def test_delete_multiple_invalid_messages_204s(self): + action = "message_delete" + body = {"queue_name": "yetanothernotthere", + "message_ids": ["a", "b", "c"]} + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock.start() + + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 204) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + def _post_messages(self, queue_name, repeat=1): + messages = [{'body': 239, 'ttl': 300}] * repeat + + action = "message_post" + body = {"queue_name": queue_name, + "messages": messages} + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock = send_mock.start() + + req = test_utils.create_request(action, body, self.headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.response = resp + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + self.assertTrue(send_mock.called) + + return self.response