From f8f2a333b9b28500338544b2432b8dbd2c413a14 Mon Sep 17 00:00:00 2001 From: Victoria Martinez de la Cruz Date: Wed, 24 Jun 2015 06:37:27 -0300 Subject: [PATCH] API v1.1 Claims endpoints This change adds the endpoints for claims to the persistent transport API. DocImpact blueprint persistent-transport Change-Id: I6d9b8f288d62ab05f8a34f2b3e35782e78779e34 --- zaqar/api/handler.py | 3 + zaqar/api/v1_1/endpoints.py | 187 ++++++++ zaqar/common/api/utils.py | 13 +- zaqar/tests/unit/transport/websocket/base.py | 1 + .../transport/websocket/v1_1/test_claims.py | 434 ++++++++++++++++++ 5 files changed, 637 insertions(+), 1 deletion(-) create mode 100644 zaqar/tests/unit/transport/websocket/v1_1/test_claims.py diff --git a/zaqar/api/handler.py b/zaqar/api/handler.py index d3f1712f0..e957a24d0 100644 --- a/zaqar/api/handler.py +++ b/zaqar/api/handler.py @@ -70,3 +70,6 @@ class Handler(object): return request.Request(action=action, body=body, headers=headers, api="v1.1") + + def get_defaults(self): + return self.v1_1_endpoints._defaults diff --git a/zaqar/api/v1_1/endpoints.py b/zaqar/api/v1_1/endpoints.py index 1e86314a2..43de3a52e 100644 --- a/zaqar/api/v1_1/endpoints.py +++ b/zaqar/api/v1_1/endpoints.py @@ -538,3 +538,190 @@ class Endpoints(object): body = {'messages': messages} return response.Response(req, body, headers) + + # Claims + @api_utils.raises_conn_error + def claim_create(self, req): + """Creates a claim + + :param req: Request instance ready to be sent. + :type req: `api.common.Request` + :return: resp: Response instance + :type: resp: `api.common.Response` + """ + project_id = req._headers.get('X-Project-ID') + queue_name = req._body.get('queue_name') + + LOG.debug(u'Claims create - queue: %(queue)s, ' + u'project: %(project)s', + {'queue': queue_name, 'project': project_id}) + + self._claim_post_spec = ( + ('ttl', int, self._defaults.claim_ttl), + ('grace', int, self._defaults.claim_grace), + ) + + # Claim some messages + + # NOTE(vkmc): We build a dict with the ttl and grace + # This is the metadata the storage is waiting for + kwargs = api_utils.get_headers(req) + # Read claim metadata (e.g., ttl) and raise appropriate + # errors as needed. + metadata = api_utils.sanitize(kwargs, self._claim_post_spec) + + limit = (None if kwargs.get('limit') is None + else kwargs.get('limit')) + + claim_options = {} if limit is None else {'limit': limit} + + try: + self._validate.claim_creation(metadata, limit=limit) + except (ValueError, validation.ValidationFailed) as ex: + LOG.debug(ex) + headers = {'status': 400} + return api_utils.error_response(req, ex, headers) + + cid, msgs = self._claim_controller.create( + queue_name, + metadata=metadata, + project=project_id, + **claim_options) + + # Buffer claimed messages + # TODO(vkmc): optimize, along with serialization (below) + resp_msgs = list(msgs) + + # Serialize claimed messages, if any. This logic assumes + # the storage driver returned well-formed messages. + if len(resp_msgs) != 0: + resp_msgs = [api_utils.format_message(msg, cid) + for msg in resp_msgs] + + headers = {'status': 201} + body = {'claim_id': cid, 'messages': resp_msgs} + else: + headers = {'status': 204} + body = {'claim_id': cid} + + return response.Response(req, body, headers) + + @api_utils.raises_conn_error + def claim_get(self, req): + """Gets a claim + + :param req: Request instance ready to be sent. + :type req: `api.common.Request` + :return: resp: Response instance + :type: resp: `api.common.Response` + """ + project_id = req._headers.get('X-Project-ID') + queue_name = req._body.get('queue_name') + claim_id = req._body.get('claim_id') + + LOG.debug(u'Claim get - claim: %(claim_id)s, ' + u'queue: %(queue_name)s, project: %(project_id)s', + {'queue_name': queue_name, + 'project_id': project_id, + 'claim_id': claim_id}) + try: + meta, msgs = self._claim_controller.get( + queue_name, + claim_id=claim_id, + project=project_id) + + # Buffer claimed messages + # TODO(vkmc): Optimize along with serialization (see below) + meta['messages'] = list(msgs) + except storage_errors.DoesNotExist as ex: + LOG.debug(ex) + error = _('Claim %s does not exist.') % claim_id + headers = {'status': 404} + return api_utils.error_response(req, ex, headers, error) + + # Serialize claimed messages + # TODO(vkmc): Optimize + meta['messages'] = [api_utils.format_message(msg, claim_id) + for msg in meta['messages']] + + del meta['id'] + + headers = {'status': 200} + body = meta + + return response.Response(req, body, headers) + + @api_utils.raises_conn_error + def claim_update(self, req): + """Updates a claim + + :param req: Request instance ready to be sent. + :type req: `api.common.Request` + :return: resp: Response instance + :type: resp: `api.common.Response` + """ + project_id = req._headers.get('X-Project-ID') + queue_name = req._body.get('queue_name') + claim_id = req._body.get('claim_id') + + LOG.debug(u'Claim update - claim: %(claim_id)s, ' + u'queue: %(queue_name)s, project:%(project_id)s' % + {'queue_name': queue_name, + 'project_id': project_id, + 'claim_id': claim_id}) + + self._claim_patch_spec = ( + ('ttl', int, self._defaults.claim_ttl), + ('grace', int, self._defaults.claim_grace), + ) + + # Read claim metadata (e.g., TTL) and raise appropriate + # HTTP errors as needed. + metadata = api_utils.sanitize(req._body, self._claim_patch_spec) + + try: + self._validate.claim_updating(metadata) + self._claim_controller.update(queue_name, + claim_id=claim_id, + metadata=metadata, + project=project_id) + headers = {'status': 204} + body = _('Claim %s updated.') % claim_id + return response.Response(req, body, headers) + except validation.ValidationFailed as ex: + LOG.debug(ex) + headers = {'status': 400} + return api_utils.error_response(req, ex, headers) + except storage_errors.DoesNotExist as ex: + LOG.debug(ex) + error = _('Claim %s does not exist.') % claim_id + headers = {'status': 404} + return api_utils.error_response(req, ex, headers, error) + + @api_utils.raises_conn_error + def claim_delete(self, req): + """Deletes a claim + + :param req: Request instance ready to be sent. + :type req: `api.common.Request` + :return: resp: Response instance + :type: resp: `api.common.Response` + """ + project_id = req._headers.get('X-Project-ID') + queue_name = req._body.get('queue_name') + claim_id = req._body.get('claim_id') + + LOG.debug(u'Claim delete - claim: %(claim_id)s, ' + u'queue: %(queue_name)s, project: %(project_id)s' % + {'queue_name': queue_name, + 'project_id': project_id, + 'claim_id': claim_id}) + + self._claim_controller.delete(queue_name, + claim_id=claim_id, + project=project_id) + + headers = {'status': 204} + body = _('Claim %s deleted.') % claim_id + + return response.Response(req, body, headers) diff --git a/zaqar/common/api/utils.py b/zaqar/common/api/utils.py index 6f1e2bf0d..9b7d9830c 100644 --- a/zaqar/common/api/utils.py +++ b/zaqar/common/api/utils.py @@ -149,6 +149,10 @@ def get_client_uuid(req): def get_headers(req): kwargs = {} + # TODO(vkmc) We should add a control here to make sure + # that the headers/request combination is possible + # e.g. we cannot have messages_post with grace + if req._body.get('marker') is not None: kwargs['marker'] = req._body.get('marker') @@ -166,6 +170,12 @@ def get_headers(req): kwargs['include_claimed'] = strutils.bool_from_string( req._body.get('include_claimed')) + if req._body.get('ttl') is not None: + kwargs['ttl'] = int(req._body.get('ttl')) + + if req._body.get('grace') is not None: + kwargs['grace'] = int(req._body.get('grace')) + return kwargs @@ -196,9 +206,10 @@ def error_response(req, exception, headers=None, error=None): return resp -def format_message(message): +def format_message(message, claim_id=None): return { 'id': message['id'], + 'claim_id': claim_id, 'ttl': message['ttl'], 'age': message['age'], 'body': message['body'], diff --git a/zaqar/tests/unit/transport/websocket/base.py b/zaqar/tests/unit/transport/websocket/base.py index c4ac561f1..3792eae08 100644 --- a/zaqar/tests/unit/transport/websocket/base.py +++ b/zaqar/tests/unit/transport/websocket/base.py @@ -45,6 +45,7 @@ class TestBase(testing.TestBase): self.boot = bootstrap.Bootstrap(self.conf) self.transport = self.boot.transport + self.api = self.boot.api def tearDown(self): if self.conf.pooling: diff --git a/zaqar/tests/unit/transport/websocket/v1_1/test_claims.py b/zaqar/tests/unit/transport/websocket/v1_1/test_claims.py new file mode 100644 index 000000000..fb691097d --- /dev/null +++ b/zaqar/tests/unit/transport/websocket/v1_1/test_claims.py @@ -0,0 +1,434 @@ +# Copyright (c) 2015 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy +# of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +import json +import uuid + +import ddt +import mock +from oslo_utils import timeutils + +from zaqar.tests.unit.transport.websocket import base +from zaqar.tests.unit.transport.websocket import utils as test_utils + + +@ddt.ddt +class ClaimsBaseTest(base.V1_1Base): + + config_file = "websocket_mongodb.conf" + + def setUp(self): + super(ClaimsBaseTest, self).setUp() + self.protocol = self.transport.factory() + self.defaults = self.api.get_defaults() + + self.project_id = '7e55e1a7e' + self.headers = { + 'Client-ID': str(uuid.uuid4()), + 'X-Project-ID': self.project_id + } + + action = "queue_create" + body = {"queue_name": "skittle"} + req = test_utils.create_request(action, body, self.headers) + + with mock.patch.object(self.protocol, 'sendMessage') as msg_mock: + self.protocol.onMessage(req, False) + resp = json.loads(msg_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 201) + + action = "message_post" + body = {"queue_name": "skittle", + "messages": [ + {'body': 239, 'ttl': 300}, + {'body': {'key_1': 'value_1'}, 'ttl': 300}, + {'body': [1, 3], 'ttl': 300}, + {'body': 439, 'ttl': 300}, + {'body': {'key_2': 'value_2'}, 'ttl': 300}, + {'body': ['a', 'b'], 'ttl': 300}, + {'body': 639, 'ttl': 300}, + {'body': {'key_3': 'value_3'}, 'ttl': 300}, + {'body': ["aa", "bb"], 'ttl': 300}] + } + + send_mock = mock.Mock() + self.protocol.sendMessage = send_mock + + req = test_utils.create_request(action, body, self.headers) + + self.protocol.onMessage(req, False) + + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 201) + + def tearDown(self): + super(ClaimsBaseTest, self).tearDown() + action = 'queue_delete' + body = {'queue_name': 'skittle'} + + send_mock = mock.Mock() + self.protocol.sendMessage = send_mock + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 204) + + @ddt.data('[', '[]', '.', '"fail"') + def test_bad_claim(self, doc): + action = "claim_create" + body = doc + + send_mock = mock.Mock() + self.protocol.sendMessage = send_mock + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 400) + + action = "claim_update" + body = doc + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 400) + + def test_exceeded_claim(self): + action = "claim_create" + body = {"queue_name": "skittle", + "ttl": 100, + "grace": 60, + "limit": 21} + + send_mock = mock.Mock() + self.protocol.sendMessage = send_mock + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 400) + + @ddt.data((-1, -1), (59, 60), (60, 59), (60, 43201), (43201, 60)) + def test_unacceptable_ttl_or_grace(self, ttl_grace): + ttl, grace = ttl_grace + action = "claim_create" + body = {"queue_name": "skittle", + "ttl": ttl, + "grace": grace} + + send_mock = mock.Mock() + self.protocol.sendMessage = send_mock + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 400) + + @ddt.data(-1, 59, 43201) + def test_unacceptable_new_ttl(self, ttl): + claim = self._get_a_claim() + + action = "claim_update" + body = {"queue_name": "skittle", + "claim_id": claim['body']['claim_id'], + "ttl": ttl} + + send_mock = mock.Mock() + self.protocol.sendMessage = send_mock + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 400) + + def test_default_ttl_and_grace(self): + action = "claim_create" + body = {"queue_name": "skittle"} + + send_mock = mock.Mock() + self.protocol.sendMessage = send_mock + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 201) + + action = "claim_get" + body = {"queue_name": "skittle", + "claim_id": resp['body']['claim_id']} + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + + self.assertEqual(resp['headers']['status'], 200) + self.assertEqual(self.defaults.claim_ttl, resp['body']['ttl']) + + def test_lifecycle(self): + # First, claim some messages + action = "claim_create" + body = {"queue_name": "skittle", + "ttl": 100, + "grace": 60} + + send_mock = mock.Mock() + self.protocol.sendMessage = send_mock + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 201) + claimed_messages = resp['body']['messages'] + claim_id = resp['body']['claim_id'] + + # No more messages to claim + body = {"queue_name": "skittle", + "ttl": 100, + "grace": 60} + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 204) + + # Listing messages, by default, won't include claimed, will echo + action = "message_list" + body = {"queue_name": "skittle", + "echo": True} + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 200) + self.assertEqual(resp['body']['messages'], []) + + # Listing messages, by default, won't include claimed, won't echo + + body = {"queue_name": "skittle", + "echo": False} + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 200) + self.assertEqual(resp['body']['messages'], []) + + # List messages, include_claimed, but don't echo + + body = {"queue_name": "skittle", + "include_claimed": True, + "echo": False} + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 200) + self.assertEqual(resp['body']['messages'], []) + + # List messages with a different client-id and echo=false. + # Should return some messages + + body = {"queue_name": "skittle", + "echo": False} + + headers = { + 'Client-ID': str(uuid.uuid4()), + 'X-Project-ID': self.project_id + } + + req = test_utils.create_request(action, body, headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 200) + + # Include claimed messages this time, and echo + + body = {"queue_name": "skittle", + "include_claimed": True, + "echo": True} + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 200) + self.assertEqual(len(resp['body']['messages']), len(claimed_messages)) + + message_id_1 = resp['body']['messages'][0]['id'] + message_id_2 = resp['body']['messages'][1]['id'] + + # Try to delete the message without submitting a claim_id + action = "message_delete" + body = {"queue_name": "skittle", + "message_id": message_id_1} + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 403) + + # Delete the message and its associated claim + body = {"queue_name": "skittle", + "message_id": message_id_1, + "claim_id": claim_id} + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 204) + + # Try to get it from the wrong project + headers = { + 'Client-ID': str(uuid.uuid4()), + 'X-Project-ID': 'someproject' + } + + action = "message_get" + body = {"queue_name": "skittle", + "message_id": message_id_2} + req = test_utils.create_request(action, body, headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 404) + + # Get the message + action = "message_get" + body = {"queue_name": "skittle", + "message_id": message_id_2} + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 200) + + # Update the claim + creation = timeutils.utcnow() + action = "claim_update" + body = {"queue_name": "skittle", + "ttl": 60, + "grace": 60, + "claim_id": claim_id} + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 204) + + # Get the claimed messages (again) + action = "claim_get" + body = {"queue_name": "skittle", + "claim_id": claim_id} + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + query = timeutils.utcnow() + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 200) + self.assertEqual(resp['body']['ttl'], 60) + + message_id_3 = resp['body']['messages'][0]['id'] + + estimated_age = timeutils.delta_seconds(creation, query) + self.assertTrue(estimated_age > resp['body']['age']) + + # Delete the claim + action = "claim_delete" + body = {"queue_name": "skittle", + "claim_id": claim_id} + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 204) + + # Try to delete a message with an invalid claim ID + action = "message_delete" + body = {"queue_name": "skittle", + "message_id": message_id_3, + "claim_id": claim_id} + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 400) + + # Make sure it wasn't deleted! + action = "message_get" + body = {"queue_name": "skittle", + "message_id": message_id_2} + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 200) + + # Try to get a claim that doesn't exist + action = "claim_get" + body = {"queue_name": "skittle", + "claim_id": claim_id} + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 404) + + # Try to update a claim that doesn't exist + action = "claim_update" + body = {"queue_name": "skittle", + "ttl": 60, + "grace": 60, + "claim_id": claim_id} + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 404) + + def test_post_claim_nonexistent_queue(self): + action = "claim_create" + body = {"queue_name": "nonexistent", + "ttl": 100, + "grace": 60} + + send_mock = mock.Mock() + self.protocol.sendMessage = send_mock + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 204) + + def test_get_claim_nonexistent_queue(self): + action = "claim_get" + body = {"queue_name": "nonexistent", + "claim_id": "aaabbbba"} + + send_mock = mock.Mock() + self.protocol.sendMessage = send_mock + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 404) + + def _get_a_claim(self): + action = "claim_create" + body = {"queue_name": "skittle", + "ttl": 100, + "grace": 60} + + send_mock = mock.Mock() + self.protocol.sendMessage = send_mock + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(resp['headers']['status'], 201) + + return resp