Merge "API v1.1 Messages endpoints"

This commit is contained in:
Jenkins 2015-07-06 23:13:56 +00:00 committed by Gerrit Code Review
commit 0a2d30f770
8 changed files with 1148 additions and 33 deletions

View File

@ -21,8 +21,9 @@ class Handler(object):
The handler validates and process the requests
"""
def __init__(self, storage, control, validate):
self.v1_1_endpoints = endpoints.Endpoints(storage, control, validate)
def __init__(self, storage, control, validate, defaults):
self.v1_1_endpoints = endpoints.Endpoints(storage, control,
validate, defaults)
def process_request(self, req):
# FIXME(vkmc): Control API version

View File

@ -14,6 +14,7 @@
from oslo_log import log as logging
from zaqar.common.api import errors as api_errors
from zaqar.common.api import response
from zaqar.common.api import utils as api_utils
from zaqar.i18n import _
@ -26,7 +27,7 @@ LOG = logging.getLogger(__name__)
class Endpoints(object):
"""v1.1 API Endpoints."""
def __init__(self, storage, control, validate):
def __init__(self, storage, control, validate, defaults):
self._queue_controller = storage.queue_controller
self._message_controller = storage.message_controller
self._claim_controller = storage.claim_controller
@ -36,6 +37,9 @@ class Endpoints(object):
self._validate = validate
self._defaults = defaults
# Queues
@api_utils.raises_conn_error
def queue_list(self, req):
"""Gets a list of queues
@ -50,22 +54,13 @@ class Endpoints(object):
LOG.debug(u'Queue list - project: %(project)s',
{'project': project_id})
kwargs = {}
if req._body.get('marker') is not None:
kwargs['marker'] = req._body.get('marker')
if req._body.get('limit') is not None:
kwargs['limit'] = req._body.get('limit')
if req._body.get('detailed') is not None:
kwargs['detailed'] = req._body.get('detailed')
try:
kwargs = api_utils.get_headers(req)
self._validate.queue_listing(**kwargs)
results = self._queue_controller.list(
project=project_id, **kwargs)
except validation.ValidationFailed as ex:
except (ValueError, validation.ValidationFailed) as ex:
LOG.debug(ex)
headers = {'status': 400}
return api_utils.error_response(req, ex, headers)
@ -82,9 +77,7 @@ class Endpoints(object):
body = {'queues': queues}
headers = {'status': 200}
resp = response.Response(req, body, headers)
return resp
return response.Response(req, body, headers)
@api_utils.raises_conn_error
def queue_create(self, req):
@ -121,8 +114,7 @@ class Endpoints(object):
else:
body = _('Queue %s created.') % queue_name
headers = {'status': 201} if created else {'status': 204}
resp = response.Response(req, body, headers)
return resp
return response.Response(req, body, headers)
@api_utils.raises_conn_error
def queue_delete(self, req):
@ -148,8 +140,7 @@ class Endpoints(object):
else:
body = _('Queue %s removed.') % queue_name
headers = {'status': 204}
resp = response.Response(req, body, headers)
return resp
return response.Response(req, body, headers)
@api_utils.raises_conn_error
def queue_get(self, req):
@ -183,8 +174,7 @@ class Endpoints(object):
else:
body = resp_dict
headers = {'status': 200}
resp = response.Response(req, body, headers)
return resp
return response.Response(req, body, headers)
@api_utils.raises_conn_error
def queue_get_stats(self, req):
@ -198,7 +188,7 @@ class Endpoints(object):
project_id = req._headers.get('X-Project-ID')
queue_name = req._body.get('queue_name')
LOG.debug(u'Queue get queue stats - queue: %(queue)s, '
LOG.debug(u'Get queue stats - queue: %(queue)s, '
u'project: %(project)s',
{'queue': queue_name, 'project': project_id})
@ -217,8 +207,7 @@ class Endpoints(object):
}
body = resp_dict
headers = {'status': 404}
resp = response.Response(req, body, headers)
return resp
return response.Response(req, body, headers)
except storage_errors.ExceptionBase as ex:
LOG.exception(ex)
error = _('Cannot retrieve queue %s stats.') % queue_name
@ -226,5 +215,321 @@ class Endpoints(object):
return api_utils.error_response(req, ex, headers, error)
else:
headers = {'status': 200}
resp = response.Response(req, body, headers)
return resp
return response.Response(req, body, headers)
# Messages
@api_utils.raises_conn_error
def message_list(self, req):
"""Gets a list of messages on a queue
:param req: Request instance ready to be sent.
:type req: `api.common.Request`
:return: resp: Response instance
:type: resp: `api.common.Response`
"""
project_id = req._headers.get('X-Project-ID')
queue_name = req._body.get('queue_name')
LOG.debug(u'Message list - queue: %(queue)s, '
u'project: %(project)s',
{'queue': queue_name, 'project': project_id})
try:
kwargs = api_utils.get_headers(req)
client_uuid = api_utils.get_client_uuid(req)
self._validate.message_listing(**kwargs)
results = self._message_controller.list(
queue_name,
project=project_id,
client_uuid=client_uuid,
**kwargs)
# Buffer messages
cursor = next(results)
messages = list(cursor)
except (ValueError, api_errors.BadRequest,
validation.ValidationFailed) as ex:
LOG.debug(ex)
headers = {'status': 400}
return api_utils.error_response(req, ex, headers)
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
headers = {'status': 404}
return api_utils.error_response(req, ex, headers)
if messages:
# Found some messages, so prepare the response
kwargs['marker'] = next(results)
messages = [api_utils.format_message(message)
for message in messages]
headers = {'status': 200}
body = {'messages': messages}
return response.Response(req, body, headers)
@api_utils.raises_conn_error
def message_get(self, req):
"""Gets a message from a queue
:param req: Request instance ready to be sent.
:type req: `api.common.Request`
:return: resp: Response instance
:type: resp: `api.common.Response`
"""
project_id = req._headers.get('X-Project-ID')
queue_name = req._body.get('queue_name')
message_id = req._body.get('message_id')
LOG.debug(u'Message get - message: %(message)s, '
u'queue: %(queue)s, project: %(project)s',
{'message': message_id,
'queue': queue_name,
'project': project_id})
try:
message = self._message_controller.get(
queue_name,
message_id,
project=project_id)
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
headers = {'status': 404}
return api_utils.error_response(req, ex, headers)
# Prepare response
message = api_utils.format_message(message)
headers = {'status': 200}
body = {'messages': message}
return response.Response(req, body, headers)
@api_utils.raises_conn_error
def message_get_many(self, req):
"""Gets a set of messages from a queue
:param req: Request instance ready to be sent.
:type req: `api.common.Request`
:return: resp: Response instance
:type: resp: `api.common.Response`
"""
project_id = req._headers.get('X-Project-ID')
queue_name = req._body.get('queue_name')
message_ids = list(req._body.get('message_ids'))
LOG.debug(u'Message get - queue: %(queue)s, '
u'project: %(project)s',
{'queue': queue_name, 'project': project_id})
try:
self._validate.message_listing(limit=len(message_ids))
messages = self._message_controller.bulk_get(
queue_name,
message_ids=message_ids,
project=project_id)
except validation.ValidationFailed as ex:
LOG.debug(ex)
headers = {'status': 400}
return api_utils.error_response(req, ex, headers)
# Prepare response
messages = list(messages)
messages = [api_utils.format_message(message)
for message in messages]
headers = {'status': 200}
body = {'messages': messages}
return response.Response(req, body, headers)
@api_utils.raises_conn_error
def message_post(self, req):
"""Post a set of messages to a queue
:param req: Request instance ready to be sent.
:type req: `api.common.Request`
:return: resp: Response instance
:type: resp: `api.common.Response`
"""
project_id = req._headers.get('X-Project-ID')
queue_name = req._body.get('queue_name')
LOG.debug(u'Messages post - queue: %(queue)s, '
u'project: %(project)s',
{'queue': queue_name, 'project': project_id})
messages = req._body.get('messages')
if messages is None:
ex = _(u'Invalid request.')
error = _(u'No messages were found in the request body.')
headers = {'status': 400}
return api_utils.error_response(req, ex, headers, error)
try:
# Place JSON size restriction before parsing
self._validate.message_length(len(str(messages)))
except validation.ValidationFailed as ex:
LOG.debug(ex)
headers = {'status': 400}
return api_utils.error_response(req, ex, headers)
_message_post_spec = (
('ttl', int, self._defaults.message_ttl),
('body', '*', None),
)
messages = api_utils.sanitize(messages,
_message_post_spec,
doctype=list)
try:
client_uuid = api_utils.get_client_uuid(req)
self._validate.message_posting(messages)
if not self._queue_controller.exists(queue_name, project_id):
self._validate.queue_identification(queue_name, project_id)
self._queue_controller.create(queue_name, project=project_id)
message_ids = self._message_controller.post(
queue_name,
messages=messages,
project=project_id,
client_uuid=client_uuid)
except (api_errors.BadRequest, validation.ValidationFailed) as ex:
LOG.debug(ex)
headers = {'status': 400}
return api_utils.error_response(req, ex, headers)
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
headers = {'status': 404}
return api_utils.error_response(req, ex, headers)
except storage_errors.MessageConflict as ex:
LOG.exception(ex)
error = _(u'No messages could be enqueued.')
headers = {'status': 500}
return api_utils.error_response(req, ex, headers, error)
# Prepare the response
headers = {'status': 201}
body = {'message_ids': message_ids}
return response.Response(req, body, headers)
@api_utils.raises_conn_error
def message_delete(self, req):
"""Delete a message from a queue
:param req: Request instance ready to be sent.
:type req: `api.common.Request`
:return: resp: Response instance
:type: resp: `api.common.Response`
"""
project_id = req._headers.get('X-Project-ID')
queue_name = req._body.get('queue_name')
message_id = req._body.get('message_id')
LOG.debug(u'Messages item DELETE - message: %(message)s, '
u'queue: %(queue)s, project: %(project)s',
{'message': message_id,
'queue': queue_name,
'project': project_id})
claim_id = req._body.get('claim_id')
try:
self._message_controller.delete(
queue_name,
message_id=message_id,
project=project_id,
claim=claim_id)
except storage_errors.MessageNotClaimed as ex:
LOG.debug(ex)
error = _(u'A claim was specified, but the message '
u'is not currently claimed.')
headers = {'status': 400}
return api_utils.error_response(req, ex, headers, error)
except storage_errors.ClaimDoesNotExist as ex:
LOG.debug(ex)
error = _(u'The specified claim does not exist or '
u'has expired.')
headers = {'status': 400}
return api_utils.error_response(req, ex, headers, error)
except storage_errors.NotPermitted as ex:
LOG.debug(ex)
error = _(u'This message is claimed; it cannot be '
u'deleted without a valid claim ID.')
headers = {'status': 403}
return api_utils.error_response(req, ex, headers, error)
headers = {'status': 204}
body = {}
return response.Response(req, body, headers)
@api_utils.raises_conn_error
def message_delete_many(self, req):
"""Deletes a set of messages from a queue
:param req: Request instance ready to be sent.
:type req: `api.common.Request`
:return: resp: Response instance
:type: resp: `api.common.Response`
"""
project_id = req._headers.get('X-Project-ID')
queue_name = req._body.get('queue_name')
message_ids = req._body.get('message_ids')
pop_limit = req._body.get('pop_limit')
LOG.debug(u'Messages collection DELETE - queue: %(queue)s,'
u'project: %(project)s, messages: %(message_ids)s',
{'queue': queue_name, 'project': project_id,
'message_ids': message_ids})
try:
self._validate.message_deletion(message_ids, pop_limit)
except validation.ValidationFailed as ex:
LOG.debug(ex)
headers = {'status': 400}
return api_utils.error_response(req, ex, headers)
if message_ids:
return self._delete_messages_by_id(req, queue_name, message_ids,
project_id)
elif pop_limit:
return self._pop_messages(req, queue_name, project_id, pop_limit)
@api_utils.raises_conn_error
def _delete_messages_by_id(self, req, queue_name, ids, project_id):
self._message_controller.bulk_delete(queue_name, message_ids=ids,
project=project_id)
headers = {'status': 204}
body = {}
return response.Response(req, body, headers)
@api_utils.raises_conn_error
def _pop_messages(self, req, queue_name, project_id, pop_limit):
LOG.debug(u'Pop messages - queue: %(queue)s, project: %(project)s',
{'queue': queue_name, 'project': project_id})
messages = self._message_controller.pop(
queue_name,
project=project_id,
limit=pop_limit)
# Prepare response
if not messages:
messages = []
headers = {'status': 200}
body = {'messages': messages}
return response.Response(req, body, headers)

View File

@ -239,8 +239,9 @@ class RequestSchema(api.Api):
'type': 'object',
'properties': {
'queue_name': {'type': 'string'},
'messages': {'type': 'array'},
},
'required': ['queue_name'],
'required': ['queue_name', 'messages'],
}
},
'required': ['action', 'headers', 'body']

View File

@ -25,6 +25,7 @@ from zaqar.openstack.common.cache import cache as oslo_cache
from zaqar.storage import pipeline
from zaqar.storage import pooling
from zaqar.storage import utils as storage_utils
from zaqar.transport import base
from zaqar.transport import validation
LOG = log.getLogger(__name__)
@ -102,7 +103,8 @@ class Bootstrap(object):
def api(self):
LOG.debug(u'Loading API handler')
validate = validation.Validator(self.conf)
return handler.Handler(self.storage, self.control, validate)
defaults = base.ResourceDefaults(self.conf)
return handler.Handler(self.storage, self.control, validate, defaults)
@decorators.lazy_property(write=False)
def storage(self):

View File

@ -0,0 +1,41 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class ExceptionBase(Exception):
msg_format = ''
def __init__(self, **kwargs):
msg = self.msg_format.format(**kwargs)
super(ExceptionBase, self).__init__(msg)
class BadRequest(ExceptionBase):
"""Raised when an invalid request is received."""
msg_format = u'Bad request. {error} {description}'
def __init__(self, error, description):
"""Initializes the error with contextual information.
:param description: Error description
"""
super(BadRequest, self).__init__(error=error, description=description)
class DocumentTypeNotSupported(ExceptionBase):
"""Raised when the content of a request has an unsupported format."""

View File

@ -13,15 +13,162 @@
# the License.
import functools
import uuid
from oslo_log import log as logging
from oslo_utils import strutils
import zaqar.common.api.errors as api_errors
import zaqar.common.api.response as response
from zaqar.i18n import _
LOG = logging.getLogger(__name__)
def sanitize(document, spec=None, doctype=dict):
"""Validates a document and drops undesired fields.
:param document: A dict to verify according to `spec`.
:param spec: (Default None) Iterable describing expected fields,
yielding tuples with the form of:
(field_name, value_type, default_value)
Note that value_type may either be a Python type, or the
special string '*' to accept any type. default_value is the
default to give the field if it is missing, or None to require
that the field be present.
If spec is None, the incoming documents will not be validated.
:param doctype: type of document to expect; must be either
JSONObject or JSONArray.
:raises: DocumentTypeNotSupported, TypeError
:returns: A sanitized, filtered version of the document. If the
document is a list of objects, each object will be filtered
and returned in a new list. If, on the other hand, the document
is expected to contain a single object, that object's fields will
be filtered and the resulting object will be returned.
"""
if doctype is dict:
if not isinstance(document, dict):
raise api_errors.DocumentTypeNotSupported()
return document if spec is None else filter(document, spec)
if doctype is list:
if not isinstance(document, list):
raise api_errors.DocumentTypeNotSupported()
if spec is None:
return document
return [filter(obj, spec) for obj in document]
raise TypeError(_(u'Doctype must be either a JSONObject or JSONArray'))
def filter(document, spec):
"""Validates and retrieves typed fields from a single document.
Sanitizes a dict-like document by checking it against a
list of field spec, and returning only those fields
specified.
:param document: dict-like object
:param spec: iterable describing expected fields, yielding
tuples with the form of: (field_name, value_type). Note that
value_type may either be a Python type, or the special
string '*' to accept any type.
:raises: BadRequest if any field is missing or not an
instance of the specified type
:returns: A filtered dict containing only the fields
listed in the spec
"""
filtered = {}
for name, value_type, default_value in spec:
filtered[name] = get_checked_field(document, name,
value_type, default_value)
return filtered
def get_checked_field(document, name, value_type, default_value):
"""Validates and retrieves a typed field from a document.
This function attempts to look up doc[name], and raises
appropriate errors if the field is missing or not an
instance of the given type.
:param document: dict-like object
:param name: field name
:param value_type: expected value type, or '*' to accept any type
:param default_value: Default value to use if the value is missing,
or None to make the value required.
:raises: BadRequest if the field is missing or not an
instance of value_type
:returns: value obtained from doc[name]
"""
try:
value = document[name]
except KeyError:
if default_value is not None:
value = default_value
else:
description = _(u'Missing "{name}" field.').format(name=name)
raise api_errors.BadRequest(description)
# PERF(kgriffs): We do our own little spec thing because it is way
# faster than jsonschema.
if value_type == '*' or isinstance(value, value_type):
return value
description = _(u'The value of the "{name}" field must be a {vtype}.')
description = description.format(name=name, vtype=value_type.__name__)
raise api_errors.BadRequest(description)
def get_client_uuid(req):
"""Read a required Client-ID from a request.
:param req: Request object
:raises: BadRequest if the Client-ID header is missing or
does not represent a valid UUID
:returns: A UUID object
"""
try:
return uuid.UUID(req._headers.get('Client-ID'))
except ValueError:
description = _(u'Malformed hexadecimal UUID.')
raise api_errors.BadRequest(_(u'Wrong UUID value'), description)
def get_headers(req):
kwargs = {}
if req._body.get('marker') is not None:
kwargs['marker'] = req._body.get('marker')
if req._body.get('limit') is not None:
kwargs['limit'] = int(req._body.get('limit'))
if req._body.get('detailed') is not None:
kwargs['detailed'] = strutils.bool_from_string(
req._body.get('detailed'))
if req._body.get('echo') is not None:
kwargs['echo'] = strutils.bool_from_string(req._body.get('echo'))
if req._body.get('include_claimed') is not None:
kwargs['include_claimed'] = strutils.bool_from_string(
req._body.get('include_claimed'))
return kwargs
def raises_conn_error(func):
"""Handles generic Exceptions
@ -36,8 +183,9 @@ def raises_conn_error(func):
except Exception as ex:
LOG.exception(ex)
error = _("Unexpected error.")
headers = {'status': 500}
req = kwargs.get('req')
return error_response(req, ex, error)
return error_response(req, ex, headers, error)
return wrapper
@ -46,3 +194,12 @@ def error_response(req, exception, headers=None, error=None):
body = {'exception': str(exception), 'error': error}
resp = response.Response(req, body, headers)
return resp
def format_message(message):
return {
'id': message['id'],
'ttl': message['ttl'],
'age': message['age'],
'body': message['body'],
}

View File

@ -37,7 +37,7 @@ class TestBase(testing.TestBase):
self.conf.register_opts(driver._WS_OPTIONS,
group=driver._WS_GROUP)
self.wsgi_cfg = self.conf[driver._WS_GROUP]
self.ws_cfg = self.conf[driver._WS_GROUP]
self.conf.unreliable = True
self.conf.admin_mode = True

View File

@ -0,0 +1,608 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import uuid
import ddt
import mock
from oslo_utils import timeutils
import six
from testtools import matchers
from zaqar.tests.unit.transport.websocket import base
from zaqar.tests.unit.transport.websocket import utils as test_utils
from zaqar.transport import validation
@ddt.ddt
class MessagesBaseTest(base.V1_1Base):
config_file = "websocket_mongodb.conf"
def setUp(self):
super(MessagesBaseTest, self).setUp()
self.protocol = self.transport.factory()
self.default_message_ttl = 3600
self.project_id = '7e55e1a7e'
self.headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': self.project_id
}
action = "queue_create"
body = {"queue_name": "kitkat"}
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 201)
with mock.patch.object(self.protocol, 'sendMessage') as msg_mock:
msg_mock.side_effect = validator
self.protocol.onMessage(req, False)
def tearDown(self):
super(MessagesBaseTest, self).tearDown()
action = "queue_delete"
body = {"queue_name": "kitkat"}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock.start()
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 204)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
def _test_post(self, sample_messages):
action = "message_post"
body = {"queue_name": "kitkat",
"messages": sample_messages}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock = send_mock.start()
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 201)
self.msg_ids = resp['body']['message_ids']
self.assertEqual(len(self.msg_ids), len(sample_messages))
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
self.assertTrue(send_mock.called)
lookup = dict([(m['ttl'], m['body']) for m in sample_messages])
# Test GET on the message resource directly
# NOTE(cpp-cabrera): force the passing of time to age a message
timeutils_utcnow = 'zaqar.openstack.common.timeutils.utcnow'
now = timeutils.utcnow() + datetime.timedelta(seconds=10)
with mock.patch(timeutils_utcnow) as mock_utcnow:
mock_utcnow.return_value = now
for msg_id in self.msg_ids:
headers = self.headers.copy()
headers['X-Project-ID'] = '777777'
# Wrong project ID
action = "message_get"
body = {"queue_name": "kitkat",
"message_id": msg_id}
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 404)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
# Correct project ID
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 200)
# Check message properties
message = resp['body']['messages']
self.assertEqual(message['body'], lookup[message['ttl']])
self.assertEqual(message['id'], msg_id)
# no negative age
# NOTE(cpp-cabrera): testtools lacks
# GreaterThanEqual on py26
self.assertThat(message['age'],
matchers.GreaterThan(-1))
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
# Test bulk GET
action = "message_get_many"
body = {"queue_name": "kitkat",
"message_ids": self.msg_ids}
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 200)
expected_ttls = set(m['ttl'] for m in sample_messages)
actual_ttls = set(m['ttl'] for m in resp['body']['messages'])
self.assertFalse(expected_ttls - actual_ttls)
actual_ids = set(m['id'] for m in resp['body']['messages'])
self.assertFalse(set(self.msg_ids) - actual_ids)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
def test_exceeded_payloads(self):
# Get a valid message id
resp = self._post_messages("kitkat")
msg_id = resp['body']['message_ids']
# Bulk GET restriction
get_msg_ids = msg_id * 21
action = "message_get_many"
body = {"queue_name": "kitkat",
"message_ids": get_msg_ids}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock.start()
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 400)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
# Listing restriction
body['limit'] = 21
req = test_utils.create_request(action, body, self.headers)
self.protocol.onMessage(req, False)
# Bulk deletion restriction
del_msg_ids = msg_id * 22
action = "message_get_many"
body = {"queue_name": "kitkat",
"message_ids": del_msg_ids}
req = test_utils.create_request(action, body, self.headers)
self.protocol.onMessage(req, False)
def test_post_single(self):
sample_messages = [
{'body': {'key': 'value'}, 'ttl': 200},
]
self._test_post(sample_messages)
def test_post_multiple(self):
sample_messages = [
{'body': 239, 'ttl': 100},
{'body': {'key': 'value'}, 'ttl': 200},
{'body': [1, 3], 'ttl': 300},
]
self._test_post(sample_messages)
def test_post_optional_ttl(self):
messages = [{'body': 239},
{'body': {'key': 'value'}, 'ttl': 200}]
action = "message_post"
body = {"queue_name": "kitkat",
"messages": messages}
req = test_utils.create_request(action, body, self.headers)
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock = send_mock.start()
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 201)
self.msg_id = resp['body']['message_ids'][0]
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
action = "message_get"
body = {"queue_name": "kitkat",
"message_id": self.msg_id}
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 200)
self.assertEqual(self.default_message_ttl,
resp['body']['messages']['ttl'])
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
self.assertTrue(send_mock.called)
def test_post_to_non_ascii_queue(self):
queue_name = u'non-ascii-n\u0153me'
if six.PY2:
queue_name = queue_name.encode('utf-8')
resp = self._post_messages(queue_name)
self.assertEqual(resp['headers']['status'], 400)
def test_post_with_long_queue_name(self):
# NOTE(kgriffs): This test verifies that routes with
# embedded queue name params go through the validation
# hook, regardless of the target resource.
queue_name = 'v' * validation.QUEUE_NAME_MAX_LEN
resp = self._post_messages(queue_name)
self.assertEqual(resp['headers']['status'], 201)
queue_name += 'v'
resp = self._post_messages(queue_name)
self.assertEqual(resp['headers']['status'], 400)
def test_post_to_missing_queue(self):
queue_name = 'nonexistent'
resp = self._post_messages(queue_name)
self.assertEqual(resp['headers']['status'], 201)
def test_get_from_missing_queue(self):
action = "message_list"
body = {"queue_name": "anothernonexistent"}
req = test_utils.create_request(action, body, self.headers)
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock = send_mock.start()
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 200)
self.assertEqual(resp['body']['messages'], [])
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
@ddt.data('', '0xdeadbeef', '550893e0-2b6e-11e3-835a-5cf9dd72369')
def test_bad_client_id(self, text_id):
action = "message_post"
body = {
"queue_name": "kinder",
"messages": [{"ttl": 60,
"body": ""}]
}
headers = {
'Client-ID': text_id,
'X-Project-ID': self.project_id
}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock = send_mock.start()
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 400)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
action = "message_get"
body = {
"queue_name": "kinder",
"limit": 3,
"echo": True
}
req = test_utils.create_request(action, body, headers)
self.protocol.onMessage(req, False)
@ddt.data(None, '[', '[]', '{}', '.')
def test_post_bad_message(self, document):
action = "message_post"
body = {
"queue_name": "kinder",
"messages": document
}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock = send_mock.start()
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 400)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
@ddt.data(-1, 59, 1209601)
def test_unacceptable_ttl(self, ttl):
action = "message_post"
body = {"queue_name": "kinder",
"messages": [{"ttl": ttl, "body": ""}]}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock = send_mock.start()
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 400)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
def test_exceeded_message_posting(self):
# Total (raw request) size
document = [{'body': "some body", 'ttl': 100}] * 8000
action = "message_post"
body = {
"queue_name": "kinder",
"messages": document
}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock = send_mock.start()
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 400)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
@ddt.data('{"overflow": 9223372036854775808}',
'{"underflow": -9223372036854775809}')
def test_unsupported_json(self, document):
action = "message_post"
body = {
"queue_name": "fizz",
"messages": document
}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock = send_mock.start()
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 400)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
def test_delete(self):
resp = self._post_messages("tofi")
msg_id = resp['body']['message_ids'][0]
action = "message_get"
body = {"queue_name": "tofi",
"message_id": msg_id}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock = send_mock.start()
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 200)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
self.assertTrue(send_mock.called)
# Delete queue
action = "message_delete"
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 204)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
self.assertTrue(send_mock.called)
# Get non existent queue
action = "message_get"
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 404)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
self.assertTrue(send_mock.called)
# Safe to delete non-existing ones
action = "message_delete"
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 204)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
self.assertTrue(send_mock.called)
def test_bulk_delete(self):
resp = self._post_messages("nerds", repeat=5)
msg_ids = resp['body']['message_ids']
action = "message_delete_many"
body = {"queue_name": "nerds",
"message_ids": msg_ids}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock = send_mock.start()
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 204)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
self.assertTrue(send_mock.called)
action = "message_get"
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 400)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
self.assertTrue(send_mock.called)
# Safe to delete non-existing ones
action = "message_delete_many"
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 204)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
self.assertTrue(send_mock.called)
# Even after the queue is gone
action = "queue_delete"
body = {"queue_name": "nerds"}
req = test_utils.create_request(action, body, self.headers)
self.protocol.onMessage(req, False)
action = "message_delete_many"
body = {"queue_name": "nerds",
"message_ids": msg_ids}
req = test_utils.create_request(action, body, self.headers)
self.protocol.onMessage(req, False)
def test_get_nonexistent_message_404s(self):
action = "message_get"
body = {"queue_name": "notthere",
"message_id": "a"}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock.start()
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 404)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
def test_get_multiple_invalid_messages_404s(self):
action = "message_get_many"
body = {"queue_name": "notnotthere",
"message_ids": ["a", "b", "c"]}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock.start()
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 404)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
def test_delete_multiple_invalid_messages_204s(self):
action = "message_delete"
body = {"queue_name": "yetanothernotthere",
"message_ids": ["a", "b", "c"]}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock.start()
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 204)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
def _post_messages(self, queue_name, repeat=1):
messages = [{'body': 239, 'ttl': 300}] * repeat
action = "message_post"
body = {"queue_name": queue_name,
"messages": messages}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock = send_mock.start()
req = test_utils.create_request(action, body, self.headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.response = resp
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
self.assertTrue(send_mock.called)
return self.response