Merge "API v1.1 Claims endpoints"
This commit is contained in:
commit
d55f1de3bc
@ -70,3 +70,6 @@ class Handler(object):
|
|||||||
|
|
||||||
return request.Request(action=action, body=body,
|
return request.Request(action=action, body=body,
|
||||||
headers=headers, api="v1.1")
|
headers=headers, api="v1.1")
|
||||||
|
|
||||||
|
def get_defaults(self):
|
||||||
|
return self.v1_1_endpoints._defaults
|
||||||
|
@ -538,3 +538,190 @@ class Endpoints(object):
|
|||||||
body = {'messages': messages}
|
body = {'messages': messages}
|
||||||
|
|
||||||
return response.Response(req, body, headers)
|
return response.Response(req, body, headers)
|
||||||
|
|
||||||
|
# Claims
|
||||||
|
@api_utils.raises_conn_error
|
||||||
|
def claim_create(self, req):
|
||||||
|
"""Creates a claim
|
||||||
|
|
||||||
|
:param req: Request instance ready to be sent.
|
||||||
|
:type req: `api.common.Request`
|
||||||
|
:return: resp: Response instance
|
||||||
|
:type: resp: `api.common.Response`
|
||||||
|
"""
|
||||||
|
project_id = req._headers.get('X-Project-ID')
|
||||||
|
queue_name = req._body.get('queue_name')
|
||||||
|
|
||||||
|
LOG.debug(u'Claims create - queue: %(queue)s, '
|
||||||
|
u'project: %(project)s',
|
||||||
|
{'queue': queue_name, 'project': project_id})
|
||||||
|
|
||||||
|
self._claim_post_spec = (
|
||||||
|
('ttl', int, self._defaults.claim_ttl),
|
||||||
|
('grace', int, self._defaults.claim_grace),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Claim some messages
|
||||||
|
|
||||||
|
# NOTE(vkmc): We build a dict with the ttl and grace
|
||||||
|
# This is the metadata the storage is waiting for
|
||||||
|
kwargs = api_utils.get_headers(req)
|
||||||
|
# Read claim metadata (e.g., ttl) and raise appropriate
|
||||||
|
# errors as needed.
|
||||||
|
metadata = api_utils.sanitize(kwargs, self._claim_post_spec)
|
||||||
|
|
||||||
|
limit = (None if kwargs.get('limit') is None
|
||||||
|
else kwargs.get('limit'))
|
||||||
|
|
||||||
|
claim_options = {} if limit is None else {'limit': limit}
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._validate.claim_creation(metadata, limit=limit)
|
||||||
|
except (ValueError, validation.ValidationFailed) as ex:
|
||||||
|
LOG.debug(ex)
|
||||||
|
headers = {'status': 400}
|
||||||
|
return api_utils.error_response(req, ex, headers)
|
||||||
|
|
||||||
|
cid, msgs = self._claim_controller.create(
|
||||||
|
queue_name,
|
||||||
|
metadata=metadata,
|
||||||
|
project=project_id,
|
||||||
|
**claim_options)
|
||||||
|
|
||||||
|
# Buffer claimed messages
|
||||||
|
# TODO(vkmc): optimize, along with serialization (below)
|
||||||
|
resp_msgs = list(msgs)
|
||||||
|
|
||||||
|
# Serialize claimed messages, if any. This logic assumes
|
||||||
|
# the storage driver returned well-formed messages.
|
||||||
|
if len(resp_msgs) != 0:
|
||||||
|
resp_msgs = [api_utils.format_message(msg, cid)
|
||||||
|
for msg in resp_msgs]
|
||||||
|
|
||||||
|
headers = {'status': 201}
|
||||||
|
body = {'claim_id': cid, 'messages': resp_msgs}
|
||||||
|
else:
|
||||||
|
headers = {'status': 204}
|
||||||
|
body = {'claim_id': cid}
|
||||||
|
|
||||||
|
return response.Response(req, body, headers)
|
||||||
|
|
||||||
|
@api_utils.raises_conn_error
|
||||||
|
def claim_get(self, req):
|
||||||
|
"""Gets a claim
|
||||||
|
|
||||||
|
:param req: Request instance ready to be sent.
|
||||||
|
:type req: `api.common.Request`
|
||||||
|
:return: resp: Response instance
|
||||||
|
:type: resp: `api.common.Response`
|
||||||
|
"""
|
||||||
|
project_id = req._headers.get('X-Project-ID')
|
||||||
|
queue_name = req._body.get('queue_name')
|
||||||
|
claim_id = req._body.get('claim_id')
|
||||||
|
|
||||||
|
LOG.debug(u'Claim get - claim: %(claim_id)s, '
|
||||||
|
u'queue: %(queue_name)s, project: %(project_id)s',
|
||||||
|
{'queue_name': queue_name,
|
||||||
|
'project_id': project_id,
|
||||||
|
'claim_id': claim_id})
|
||||||
|
try:
|
||||||
|
meta, msgs = self._claim_controller.get(
|
||||||
|
queue_name,
|
||||||
|
claim_id=claim_id,
|
||||||
|
project=project_id)
|
||||||
|
|
||||||
|
# Buffer claimed messages
|
||||||
|
# TODO(vkmc): Optimize along with serialization (see below)
|
||||||
|
meta['messages'] = list(msgs)
|
||||||
|
except storage_errors.DoesNotExist as ex:
|
||||||
|
LOG.debug(ex)
|
||||||
|
error = _('Claim %s does not exist.') % claim_id
|
||||||
|
headers = {'status': 404}
|
||||||
|
return api_utils.error_response(req, ex, headers, error)
|
||||||
|
|
||||||
|
# Serialize claimed messages
|
||||||
|
# TODO(vkmc): Optimize
|
||||||
|
meta['messages'] = [api_utils.format_message(msg, claim_id)
|
||||||
|
for msg in meta['messages']]
|
||||||
|
|
||||||
|
del meta['id']
|
||||||
|
|
||||||
|
headers = {'status': 200}
|
||||||
|
body = meta
|
||||||
|
|
||||||
|
return response.Response(req, body, headers)
|
||||||
|
|
||||||
|
@api_utils.raises_conn_error
|
||||||
|
def claim_update(self, req):
|
||||||
|
"""Updates a claim
|
||||||
|
|
||||||
|
:param req: Request instance ready to be sent.
|
||||||
|
:type req: `api.common.Request`
|
||||||
|
:return: resp: Response instance
|
||||||
|
:type: resp: `api.common.Response`
|
||||||
|
"""
|
||||||
|
project_id = req._headers.get('X-Project-ID')
|
||||||
|
queue_name = req._body.get('queue_name')
|
||||||
|
claim_id = req._body.get('claim_id')
|
||||||
|
|
||||||
|
LOG.debug(u'Claim update - claim: %(claim_id)s, '
|
||||||
|
u'queue: %(queue_name)s, project:%(project_id)s' %
|
||||||
|
{'queue_name': queue_name,
|
||||||
|
'project_id': project_id,
|
||||||
|
'claim_id': claim_id})
|
||||||
|
|
||||||
|
self._claim_patch_spec = (
|
||||||
|
('ttl', int, self._defaults.claim_ttl),
|
||||||
|
('grace', int, self._defaults.claim_grace),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Read claim metadata (e.g., TTL) and raise appropriate
|
||||||
|
# HTTP errors as needed.
|
||||||
|
metadata = api_utils.sanitize(req._body, self._claim_patch_spec)
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._validate.claim_updating(metadata)
|
||||||
|
self._claim_controller.update(queue_name,
|
||||||
|
claim_id=claim_id,
|
||||||
|
metadata=metadata,
|
||||||
|
project=project_id)
|
||||||
|
headers = {'status': 204}
|
||||||
|
body = _('Claim %s updated.') % claim_id
|
||||||
|
return response.Response(req, body, headers)
|
||||||
|
except validation.ValidationFailed as ex:
|
||||||
|
LOG.debug(ex)
|
||||||
|
headers = {'status': 400}
|
||||||
|
return api_utils.error_response(req, ex, headers)
|
||||||
|
except storage_errors.DoesNotExist as ex:
|
||||||
|
LOG.debug(ex)
|
||||||
|
error = _('Claim %s does not exist.') % claim_id
|
||||||
|
headers = {'status': 404}
|
||||||
|
return api_utils.error_response(req, ex, headers, error)
|
||||||
|
|
||||||
|
@api_utils.raises_conn_error
|
||||||
|
def claim_delete(self, req):
|
||||||
|
"""Deletes a claim
|
||||||
|
|
||||||
|
:param req: Request instance ready to be sent.
|
||||||
|
:type req: `api.common.Request`
|
||||||
|
:return: resp: Response instance
|
||||||
|
:type: resp: `api.common.Response`
|
||||||
|
"""
|
||||||
|
project_id = req._headers.get('X-Project-ID')
|
||||||
|
queue_name = req._body.get('queue_name')
|
||||||
|
claim_id = req._body.get('claim_id')
|
||||||
|
|
||||||
|
LOG.debug(u'Claim delete - claim: %(claim_id)s, '
|
||||||
|
u'queue: %(queue_name)s, project: %(project_id)s' %
|
||||||
|
{'queue_name': queue_name,
|
||||||
|
'project_id': project_id,
|
||||||
|
'claim_id': claim_id})
|
||||||
|
|
||||||
|
self._claim_controller.delete(queue_name,
|
||||||
|
claim_id=claim_id,
|
||||||
|
project=project_id)
|
||||||
|
|
||||||
|
headers = {'status': 204}
|
||||||
|
body = _('Claim %s deleted.') % claim_id
|
||||||
|
|
||||||
|
return response.Response(req, body, headers)
|
||||||
|
@ -149,6 +149,10 @@ def get_client_uuid(req):
|
|||||||
def get_headers(req):
|
def get_headers(req):
|
||||||
kwargs = {}
|
kwargs = {}
|
||||||
|
|
||||||
|
# TODO(vkmc) We should add a control here to make sure
|
||||||
|
# that the headers/request combination is possible
|
||||||
|
# e.g. we cannot have messages_post with grace
|
||||||
|
|
||||||
if req._body.get('marker') is not None:
|
if req._body.get('marker') is not None:
|
||||||
kwargs['marker'] = req._body.get('marker')
|
kwargs['marker'] = req._body.get('marker')
|
||||||
|
|
||||||
@ -166,6 +170,12 @@ def get_headers(req):
|
|||||||
kwargs['include_claimed'] = strutils.bool_from_string(
|
kwargs['include_claimed'] = strutils.bool_from_string(
|
||||||
req._body.get('include_claimed'))
|
req._body.get('include_claimed'))
|
||||||
|
|
||||||
|
if req._body.get('ttl') is not None:
|
||||||
|
kwargs['ttl'] = int(req._body.get('ttl'))
|
||||||
|
|
||||||
|
if req._body.get('grace') is not None:
|
||||||
|
kwargs['grace'] = int(req._body.get('grace'))
|
||||||
|
|
||||||
return kwargs
|
return kwargs
|
||||||
|
|
||||||
|
|
||||||
@ -196,9 +206,10 @@ def error_response(req, exception, headers=None, error=None):
|
|||||||
return resp
|
return resp
|
||||||
|
|
||||||
|
|
||||||
def format_message(message):
|
def format_message(message, claim_id=None):
|
||||||
return {
|
return {
|
||||||
'id': message['id'],
|
'id': message['id'],
|
||||||
|
'claim_id': claim_id,
|
||||||
'ttl': message['ttl'],
|
'ttl': message['ttl'],
|
||||||
'age': message['age'],
|
'age': message['age'],
|
||||||
'body': message['body'],
|
'body': message['body'],
|
||||||
|
@ -45,6 +45,7 @@ class TestBase(testing.TestBase):
|
|||||||
self.boot = bootstrap.Bootstrap(self.conf)
|
self.boot = bootstrap.Bootstrap(self.conf)
|
||||||
|
|
||||||
self.transport = self.boot.transport
|
self.transport = self.boot.transport
|
||||||
|
self.api = self.boot.api
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
if self.conf.pooling:
|
if self.conf.pooling:
|
||||||
|
434
zaqar/tests/unit/transport/websocket/v1_1/test_claims.py
Normal file
434
zaqar/tests/unit/transport/websocket/v1_1/test_claims.py
Normal file
@ -0,0 +1,434 @@
|
|||||||
|
# Copyright (c) 2015 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
||||||
|
# use this file except in compliance with the License. You may obtain a copy
|
||||||
|
# of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations under
|
||||||
|
# the License.
|
||||||
|
|
||||||
|
import json
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
import ddt
|
||||||
|
import mock
|
||||||
|
from oslo_utils import timeutils
|
||||||
|
|
||||||
|
from zaqar.tests.unit.transport.websocket import base
|
||||||
|
from zaqar.tests.unit.transport.websocket import utils as test_utils
|
||||||
|
|
||||||
|
|
||||||
|
@ddt.ddt
|
||||||
|
class ClaimsBaseTest(base.V1_1Base):
|
||||||
|
|
||||||
|
config_file = "websocket_mongodb.conf"
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(ClaimsBaseTest, self).setUp()
|
||||||
|
self.protocol = self.transport.factory()
|
||||||
|
self.defaults = self.api.get_defaults()
|
||||||
|
|
||||||
|
self.project_id = '7e55e1a7e'
|
||||||
|
self.headers = {
|
||||||
|
'Client-ID': str(uuid.uuid4()),
|
||||||
|
'X-Project-ID': self.project_id
|
||||||
|
}
|
||||||
|
|
||||||
|
action = "queue_create"
|
||||||
|
body = {"queue_name": "skittle"}
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
|
||||||
|
with mock.patch.object(self.protocol, 'sendMessage') as msg_mock:
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(msg_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 201)
|
||||||
|
|
||||||
|
action = "message_post"
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"messages": [
|
||||||
|
{'body': 239, 'ttl': 300},
|
||||||
|
{'body': {'key_1': 'value_1'}, 'ttl': 300},
|
||||||
|
{'body': [1, 3], 'ttl': 300},
|
||||||
|
{'body': 439, 'ttl': 300},
|
||||||
|
{'body': {'key_2': 'value_2'}, 'ttl': 300},
|
||||||
|
{'body': ['a', 'b'], 'ttl': 300},
|
||||||
|
{'body': 639, 'ttl': 300},
|
||||||
|
{'body': {'key_3': 'value_3'}, 'ttl': 300},
|
||||||
|
{'body': ["aa", "bb"], 'ttl': 300}]
|
||||||
|
}
|
||||||
|
|
||||||
|
send_mock = mock.Mock()
|
||||||
|
self.protocol.sendMessage = send_mock
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 201)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
super(ClaimsBaseTest, self).tearDown()
|
||||||
|
action = 'queue_delete'
|
||||||
|
body = {'queue_name': 'skittle'}
|
||||||
|
|
||||||
|
send_mock = mock.Mock()
|
||||||
|
self.protocol.sendMessage = send_mock
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 204)
|
||||||
|
|
||||||
|
@ddt.data('[', '[]', '.', '"fail"')
|
||||||
|
def test_bad_claim(self, doc):
|
||||||
|
action = "claim_create"
|
||||||
|
body = doc
|
||||||
|
|
||||||
|
send_mock = mock.Mock()
|
||||||
|
self.protocol.sendMessage = send_mock
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 400)
|
||||||
|
|
||||||
|
action = "claim_update"
|
||||||
|
body = doc
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 400)
|
||||||
|
|
||||||
|
def test_exceeded_claim(self):
|
||||||
|
action = "claim_create"
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"ttl": 100,
|
||||||
|
"grace": 60,
|
||||||
|
"limit": 21}
|
||||||
|
|
||||||
|
send_mock = mock.Mock()
|
||||||
|
self.protocol.sendMessage = send_mock
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 400)
|
||||||
|
|
||||||
|
@ddt.data((-1, -1), (59, 60), (60, 59), (60, 43201), (43201, 60))
|
||||||
|
def test_unacceptable_ttl_or_grace(self, ttl_grace):
|
||||||
|
ttl, grace = ttl_grace
|
||||||
|
action = "claim_create"
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"ttl": ttl,
|
||||||
|
"grace": grace}
|
||||||
|
|
||||||
|
send_mock = mock.Mock()
|
||||||
|
self.protocol.sendMessage = send_mock
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 400)
|
||||||
|
|
||||||
|
@ddt.data(-1, 59, 43201)
|
||||||
|
def test_unacceptable_new_ttl(self, ttl):
|
||||||
|
claim = self._get_a_claim()
|
||||||
|
|
||||||
|
action = "claim_update"
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"claim_id": claim['body']['claim_id'],
|
||||||
|
"ttl": ttl}
|
||||||
|
|
||||||
|
send_mock = mock.Mock()
|
||||||
|
self.protocol.sendMessage = send_mock
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 400)
|
||||||
|
|
||||||
|
def test_default_ttl_and_grace(self):
|
||||||
|
action = "claim_create"
|
||||||
|
body = {"queue_name": "skittle"}
|
||||||
|
|
||||||
|
send_mock = mock.Mock()
|
||||||
|
self.protocol.sendMessage = send_mock
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 201)
|
||||||
|
|
||||||
|
action = "claim_get"
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"claim_id": resp['body']['claim_id']}
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
|
||||||
|
self.assertEqual(resp['headers']['status'], 200)
|
||||||
|
self.assertEqual(self.defaults.claim_ttl, resp['body']['ttl'])
|
||||||
|
|
||||||
|
def test_lifecycle(self):
|
||||||
|
# First, claim some messages
|
||||||
|
action = "claim_create"
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"ttl": 100,
|
||||||
|
"grace": 60}
|
||||||
|
|
||||||
|
send_mock = mock.Mock()
|
||||||
|
self.protocol.sendMessage = send_mock
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 201)
|
||||||
|
claimed_messages = resp['body']['messages']
|
||||||
|
claim_id = resp['body']['claim_id']
|
||||||
|
|
||||||
|
# No more messages to claim
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"ttl": 100,
|
||||||
|
"grace": 60}
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 204)
|
||||||
|
|
||||||
|
# Listing messages, by default, won't include claimed, will echo
|
||||||
|
action = "message_list"
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"echo": True}
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 200)
|
||||||
|
self.assertEqual(resp['body']['messages'], [])
|
||||||
|
|
||||||
|
# Listing messages, by default, won't include claimed, won't echo
|
||||||
|
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"echo": False}
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 200)
|
||||||
|
self.assertEqual(resp['body']['messages'], [])
|
||||||
|
|
||||||
|
# List messages, include_claimed, but don't echo
|
||||||
|
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"include_claimed": True,
|
||||||
|
"echo": False}
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 200)
|
||||||
|
self.assertEqual(resp['body']['messages'], [])
|
||||||
|
|
||||||
|
# List messages with a different client-id and echo=false.
|
||||||
|
# Should return some messages
|
||||||
|
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"echo": False}
|
||||||
|
|
||||||
|
headers = {
|
||||||
|
'Client-ID': str(uuid.uuid4()),
|
||||||
|
'X-Project-ID': self.project_id
|
||||||
|
}
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 200)
|
||||||
|
|
||||||
|
# Include claimed messages this time, and echo
|
||||||
|
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"include_claimed": True,
|
||||||
|
"echo": True}
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 200)
|
||||||
|
self.assertEqual(len(resp['body']['messages']), len(claimed_messages))
|
||||||
|
|
||||||
|
message_id_1 = resp['body']['messages'][0]['id']
|
||||||
|
message_id_2 = resp['body']['messages'][1]['id']
|
||||||
|
|
||||||
|
# Try to delete the message without submitting a claim_id
|
||||||
|
action = "message_delete"
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"message_id": message_id_1}
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 403)
|
||||||
|
|
||||||
|
# Delete the message and its associated claim
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"message_id": message_id_1,
|
||||||
|
"claim_id": claim_id}
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 204)
|
||||||
|
|
||||||
|
# Try to get it from the wrong project
|
||||||
|
headers = {
|
||||||
|
'Client-ID': str(uuid.uuid4()),
|
||||||
|
'X-Project-ID': 'someproject'
|
||||||
|
}
|
||||||
|
|
||||||
|
action = "message_get"
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"message_id": message_id_2}
|
||||||
|
req = test_utils.create_request(action, body, headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 404)
|
||||||
|
|
||||||
|
# Get the message
|
||||||
|
action = "message_get"
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"message_id": message_id_2}
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 200)
|
||||||
|
|
||||||
|
# Update the claim
|
||||||
|
creation = timeutils.utcnow()
|
||||||
|
action = "claim_update"
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"ttl": 60,
|
||||||
|
"grace": 60,
|
||||||
|
"claim_id": claim_id}
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 204)
|
||||||
|
|
||||||
|
# Get the claimed messages (again)
|
||||||
|
action = "claim_get"
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"claim_id": claim_id}
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
query = timeutils.utcnow()
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 200)
|
||||||
|
self.assertEqual(resp['body']['ttl'], 60)
|
||||||
|
|
||||||
|
message_id_3 = resp['body']['messages'][0]['id']
|
||||||
|
|
||||||
|
estimated_age = timeutils.delta_seconds(creation, query)
|
||||||
|
self.assertTrue(estimated_age > resp['body']['age'])
|
||||||
|
|
||||||
|
# Delete the claim
|
||||||
|
action = "claim_delete"
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"claim_id": claim_id}
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 204)
|
||||||
|
|
||||||
|
# Try to delete a message with an invalid claim ID
|
||||||
|
action = "message_delete"
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"message_id": message_id_3,
|
||||||
|
"claim_id": claim_id}
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 400)
|
||||||
|
|
||||||
|
# Make sure it wasn't deleted!
|
||||||
|
action = "message_get"
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"message_id": message_id_2}
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 200)
|
||||||
|
|
||||||
|
# Try to get a claim that doesn't exist
|
||||||
|
action = "claim_get"
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"claim_id": claim_id}
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 404)
|
||||||
|
|
||||||
|
# Try to update a claim that doesn't exist
|
||||||
|
action = "claim_update"
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"ttl": 60,
|
||||||
|
"grace": 60,
|
||||||
|
"claim_id": claim_id}
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 404)
|
||||||
|
|
||||||
|
def test_post_claim_nonexistent_queue(self):
|
||||||
|
action = "claim_create"
|
||||||
|
body = {"queue_name": "nonexistent",
|
||||||
|
"ttl": 100,
|
||||||
|
"grace": 60}
|
||||||
|
|
||||||
|
send_mock = mock.Mock()
|
||||||
|
self.protocol.sendMessage = send_mock
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 204)
|
||||||
|
|
||||||
|
def test_get_claim_nonexistent_queue(self):
|
||||||
|
action = "claim_get"
|
||||||
|
body = {"queue_name": "nonexistent",
|
||||||
|
"claim_id": "aaabbbba"}
|
||||||
|
|
||||||
|
send_mock = mock.Mock()
|
||||||
|
self.protocol.sendMessage = send_mock
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 404)
|
||||||
|
|
||||||
|
def _get_a_claim(self):
|
||||||
|
action = "claim_create"
|
||||||
|
body = {"queue_name": "skittle",
|
||||||
|
"ttl": 100,
|
||||||
|
"grace": 60}
|
||||||
|
|
||||||
|
send_mock = mock.Mock()
|
||||||
|
self.protocol.sendMessage = send_mock
|
||||||
|
|
||||||
|
req = test_utils.create_request(action, body, self.headers)
|
||||||
|
self.protocol.onMessage(req, False)
|
||||||
|
resp = json.loads(send_mock.call_args[0][0])
|
||||||
|
self.assertEqual(resp['headers']['status'], 201)
|
||||||
|
|
||||||
|
return resp
|
Loading…
Reference in New Issue
Block a user