zaqar/tests/functional/wsgi/v1_1/test_queues.py
Flavio Percoco 5bf16193e7 Make Client-ID a required header
It was decided to make Client-ID a required header. This patch adds a
new `before hook` that enforces this new rule.

Partially-Implements blueprint: api-v1.1-header-changes

Change-Id: I7b20d56812a0dff4f8ed3e6da75b62ef7c0f307e
2014-09-02 20:55:21 +02:00

377 lines
12 KiB
Python

# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import uuid
import ddt
import six
from zaqar.tests.functional import base # noqa
from zaqar.tests.functional import helpers
class NamedBinaryStr(six.binary_type):
    """Byte-string subclass whose instances can carry a ``__name__``.

    Instances of the built-in byte-string type do not accept new
    attributes; subclassing gives ``annotated`` an object on which it
    can set ``__name__``.
    """
class NamedUnicodeStr(six.text_type):
    """Text-string subclass whose instances can carry a ``__name__``.

    The wrapped text is additionally kept on ``self.value``; string
    conversion and encoding are delegated to that stored value.
    """

    def __init__(self, value):
        # Remember the original text so __str__/encode can hand it back.
        self.value = value

    def __str__(self):
        """Return the wrapped text unchanged."""
        wrapped = self.value
        return wrapped

    def encode(self, enc):
        """Encode the wrapped text using the codec named by ``enc``."""
        wrapped = self.value
        return wrapped.encode(enc)

    def __format__(self, formatstr):
        """Return an empty string regardless of ``formatstr``.

        Workaround for a ddt bug: ddt always calls ``__format__``, even
        when ``__name__`` is present, and that blows up for Unicode
        strings under Python 2.
        """
        return ''
class NamedDict(dict):
    """``dict`` subclass whose instances can carry a ``__name__``.

    Plain ``dict`` objects do not accept new attributes; this subclass
    lets ``annotated`` set ``__name__`` on dictionary test inputs.
    """
def annotated(test_name, test_input):
    """Wrap ``test_input`` in a look-alike object named ``test_name``.

    :param test_name: value stored on the wrapper's ``__name__``.
    :param test_input: the raw datum; dicts become ``NamedDict``, text
        strings become ``NamedUnicodeStr`` and anything else is passed
        to ``NamedBinaryStr``.
    :returns: the wrapper instance with ``__name__`` set.
    """
    # Choose the wrapper type first, then build and label it in one place.
    if isinstance(test_input, dict):
        wrapper_cls = NamedDict
    elif isinstance(test_input, six.text_type):
        wrapper_cls = NamedUnicodeStr
    else:
        wrapper_cls = NamedBinaryStr

    named_input = wrapper_cls(test_input)
    named_input.__name__ = test_name
    return named_input
@ddt.ddt
class TestInsertQueue(base.V1_1FunctionalTestBase):
    """Functional tests for creating (inserting) queues through v1.1."""

    server_class = base.ZaqarServer

    def setUp(self):
        """Point the client at the v1.1 base URL and install headers."""
        super(TestInsertQueue, self).setUp()
        self.base_url = '{0}/{1}'.format(self.cfg.zaqar.url,
                                         "v1.1")

        self.header = helpers.create_zaqar_headers(self.cfg)
        # Header names that must be present on a successful create response.
        self.headers_response_empty = {'location'}
        self.client.set_base_url(self.base_url)
        self.client.headers = self.header

    @ddt.data('qtestqueue', 'TESTqueue', 'hyphen-name', '_undersore',
              annotated('test_insert_queue_long_name', 'i' * 64))
    def test_insert_queue(self, queue_name):
        """Create a queue with a valid name and expect 201."""
        self.url = self.base_url + '/queues/' + queue_name
        self.addCleanup(self.client.delete, self.url)

        result = self.client.put(self.url)
        self.assertEqual(result.status_code, 201)

        response_headers = set(result.headers.keys())
        self.assertIsSubset(self.headers_response_empty, response_headers)

    test_insert_queue.tags = ['positive', 'smoke']

    @ddt.data(annotated('test_insert_queue_non_ascii_name',
                        u'\u6c49\u5b57\u6f22\u5b57'),
              '@$@^qw',
              annotated('test_insert_queue_invalid_name_length', 'i' * 65))
    def test_insert_queue_invalid_name(self, queue_name):
        """Create a queue with an invalid name and expect 400."""
        # Under Python 2 the unicode wrapper must be turned into bytes
        # before it is concatenated into the (byte-string) URL.
        if six.PY2 and isinstance(queue_name, NamedUnicodeStr):
            queue_name = queue_name.encode('utf-8')

        self.url = self.base_url + '/queues/' + queue_name
        self.addCleanup(self.client.delete, self.url)

        result = self.client.put(self.url)
        self.assertEqual(result.status_code, 400)

    test_insert_queue_invalid_name.tags = ['negative']

    def test_insert_queue_invalid_authtoken(self):
        """Insert Queue with invalid authtoken."""
        # NOTE(flaper87): Currently, tearDown
        # depends on this attribute. Needs to
        # be fixed.
        self.url = self.base_url + '/queues/invalidauthtoken'

        # Decide whether to skip before registering the cleanup, so that a
        # skipped test does not issue a DELETE request against the server.
        if not self.cfg.auth.auth_on:
            self.skipTest("Auth is not on!")

        self.addCleanup(self.client.delete, self.url)

        # Work on a copy so the shared default headers keep their token.
        header = copy.copy(self.header)
        header['X-Auth-Token'] = 'invalid'

        result = self.client.put(self.url, headers=header)
        self.assertEqual(result.status_code, 401)

    test_insert_queue_invalid_authtoken.tags = ['negative']

    def test_insert_queue_header_plaintext(self):
        """Insert Queue with 'Accept': 'plain/text'."""
        path = '/queues/plaintextheader'
        self.addCleanup(self.client.delete, path)

        header = {"Accept": 'plain/text'}
        result = self.client.put(path, headers=header)
        self.assertEqual(result.status_code, 406)

    test_insert_queue_header_plaintext.tags = ['negative']

    def test_insert_queue_header_asterisk(self):
        """Insert Queue with 'Accept': '*/*'."""
        path = '/queues/asteriskinheader'
        headers = {'Accept': '*/*',
                   'Client-ID': str(uuid.uuid4()),
                   'X-Project-ID': '518b51ea133c4facadae42c328d6b77b'}
        # The cleanup reuses the same headers that created the queue.
        self.addCleanup(self.client.delete, url=path, headers=headers)

        result = self.client.put(path, headers=headers)
        self.assertEqual(result.status_code, 201)

    test_insert_queue_header_asterisk.tags = ['positive']

    def test_insert_queue_with_metadata(self):
        """Insert queue with a non-empty request body."""
        self.url = self.base_url + '/queues/hasmetadata'
        doc = {"queue": "Has Metadata"}
        self.addCleanup(self.client.delete, self.url)

        result = self.client.put(self.url, data=doc)
        self.assertEqual(result.status_code, 201)

        # Read the queue back and check the body it was created with.
        result = self.client.get(self.url)
        self.assertEqual(result.status_code, 200)
        self.assertEqual(result.json(), {"queue": "Has Metadata"})

    # This test asserts the success path (201 then 200), so it is tagged
    # like the other successful-create tests in this class.
    test_insert_queue_with_metadata.tags = ['positive']

    def tearDown(self):
        super(TestInsertQueue, self).tearDown()
@ddt.ddt
class TestQueueMisc(base.V1_1FunctionalTestBase):
    """Functional tests for listing, checking and getting queue stats."""

    server_class = base.ZaqarServer

    def setUp(self):
        """Set the client base URL and pick a fresh per-test queue URL."""
        super(TestQueueMisc, self).setUp()
        self.base_url = self.cfg.zaqar.url
        self.client.set_base_url(self.base_url)

        # uuid1() gives every test its own queue name.
        queue_path = '/{0}/queues/{1}'.format("v1.1", uuid.uuid1())
        self.queue_url = self.base_url + queue_path

    def test_list_queues(self):
        """List Queues."""
        self.client.put(self.queue_url)
        self.addCleanup(self.client.delete, self.queue_url)

        listing_path = '/{0}/queues'.format("v1.1")
        resp = self.client.get(listing_path)

        self.assertEqual(resp.status_code, 200)
        self.assertSchema(resp.json(), 'queue_list')

    test_list_queues.tags = ['smoke', 'positive']

    def test_list_queues_detailed(self):
        """List Queues with detailed = True."""
        self.client.put(self.queue_url)
        self.addCleanup(self.client.delete, self.queue_url)

        listing_path = '/{0}/queues'.format("v1.1")
        resp = self.client.get(listing_path, params={'detailed': True})

        self.assertEqual(resp.status_code, 200)
        self.assertSchema(resp.json(), 'queue_list')

        # A detailed listing must include each queue's metadata.
        first_queue = resp.json()['queues'][0]
        self.assertIn('metadata', first_queue.keys())

    test_list_queues_detailed.tags = ['smoke', 'positive']

    @ddt.data(0, -1, 1001)
    def test_list_queue_invalid_limit(self, limit):
        """List Queues with a limit value that is not allowed."""
        listing_path = '/{0}/queues'.format("v1.1")
        resp = self.client.get(listing_path, params={'limit': limit})

        self.assertEqual(resp.status_code, 400)

    test_list_queue_invalid_limit.tags = ['negative']

    def test_check_queue_exists(self):
        """Send HEAD to an existing queue; the test expects 405."""
        self.client.put(self.queue_url)
        self.addCleanup(self.client.delete, self.queue_url)

        resp = self.client.head(self.queue_url)
        self.assertEqual(resp.status_code, 405)

    test_check_queue_exists.tags = ['negative']

    def test_get_queue_malformed_marker(self):
        """List queues with invalid marker."""
        path = '/{0}/queues?marker=zzz'.format("v1.1")

        resp = self.client.get(path)
        self.assertEqual(resp.status_code, 204)

    test_get_queue_malformed_marker.tags = ['negative']

    def test_get_stats_empty_queue(self):
        """Get queue stats on an empty queue."""
        created = self.client.put(self.queue_url)
        self.addCleanup(self.client.delete, self.queue_url)
        self.assertEqual(created.status_code, 201)

        # Get stats on an empty queue
        stats = self.client.get(self.queue_url + '/stats')
        self.assertEqual(stats.status_code, 200)

        empty_counts = {'claimed': 0, 'total': 0, 'free': 0}
        self.assertEqual(stats.json(), {'messages': empty_counts})

    test_get_stats_empty_queue.tags = ['positive']

    @ddt.data(0, 1)
    def test_get_queue_stats_claimed(self, claimed):
        """Get stats on a queue."""
        created = self.client.put(self.queue_url)
        self.addCleanup(self.client.delete, self.queue_url)
        self.assertEqual(created.status_code, 201)

        # Post Messages to the test queue
        messages = helpers.create_message_body_v1_1(
            messagecount=self.limits.max_messages_per_claim_or_pop)
        posted = self.client.post(self.queue_url + '/messages',
                                  data=messages)
        self.assertEqual(posted.status_code, 201)

        # Only claim when the data-driven parameter asks for it.
        if claimed > 0:
            claim_url = self.queue_url + '/claims?limit=' + str(claimed)
            claim_doc = {'ttl': 300, 'grace': 300}
            claim = self.client.post(claim_url, data=claim_doc)
            self.assertEqual(claim.status_code, 201)

        # Get stats on the queue.
        stats = self.client.get(self.queue_url + '/stats')
        self.assertEqual(stats.status_code, 200)
        self.assertQueueStats(stats.json(), claimed)

    test_get_queue_stats_claimed.tags = ['positive']

    def test_ping_queue(self):
        # Placeholder: no assertions are made here.
        pass

    def tearDown(self):
        super(TestQueueMisc, self).tearDown()
class TestQueueNonExisting(base.V1_1FunctionalTestBase):
    """Test Actions on non existing queue."""

    server_class = base.ZaqarServer

    def setUp(self):
        """Point the client at a fixed queue URL that is never created."""
        super(TestQueueNonExisting, self).setUp()
        # NOTE(review): this guard compares against "v1" although every URL
        # in this class targets "v1.1" -- confirm the intended version check.
        if self.cfg.version != "v1":
            self.skipTest("Not Supported")

        self.base_url = '{0}/{1}'.format(self.cfg.zaqar.url,
                                         "v1.1")
        self.queue_url = (self.base_url +
                          '/queues/0a5b1b85-4263-11e3-b034-28cfe91478b9')
        # All requests below use paths relative to the queue URL.
        self.client.set_base_url(self.queue_url)

        # Build the headers once; the helper used to be called twice and
        # the first result was immediately discarded.
        self.header = helpers.create_zaqar_headers(self.cfg)
        self.headers_response_empty = {'location'}

    def test_get_stats(self):
        """Get stats on non existing Queue."""
        result = self.client.get('/stats')
        self.assertEqual(result.status_code, 200)
        self.assertEqual(result.json(), [])

    def test_get_metadata(self):
        """Get metadata on non existing Queue."""
        result = self.client.get('/')
        self.assertEqual(result.status_code, 200)
        self.assertEqual(result.json(), [])

    def test_get_messages(self):
        """Get messages on non existing Queue."""
        result = self.client.get('/messages')
        self.assertEqual(result.status_code, 200)
        self.assertEqual(result.json(), [])

    def test_post_messages(self):
        """Post messages to a non existing Queue."""
        doc = [{"ttl": 200, "body": {"Home": ""}}]
        result = self.client.post('/messages', data=doc)
        self.assertEqual(result.status_code, 201)

        # check existence of queue
        result = self.client.get()
        self.assertEqual(result.status_code, 200)
        self.assertNotEqual(result.json(), [])

    def test_claim_messages(self):
        """Claim messages from a non existing Queue."""
        doc = {"ttl": 200, "grace": 300}
        result = self.client.post('/claims', data=doc)
        self.assertEqual(result.status_code, 200)
        self.assertEqual(result.json(), [])

    def test_delete_queue(self):
        """Delete non existing Queue."""
        result = self.client.delete()
        self.assertEqual(result.status_code, 204)