Merge "Implement Lazy Create Queue in v1.1 API"
This commit is contained in:
commit
f4e24e85a5
@ -50,7 +50,8 @@ def public_endpoints(driver):
|
||||
('/queues/{queue_name}/messages',
|
||||
messages.CollectionResource(driver._wsgi_conf,
|
||||
driver._validate,
|
||||
message_controller)),
|
||||
message_controller,
|
||||
queue_controller)),
|
||||
('/queues/{queue_name}/messages/{message_id}',
|
||||
messages.ItemResource(message_controller)),
|
||||
|
||||
|
@ -31,12 +31,15 @@ MESSAGE_POST_SPEC = (('ttl', int), ('body', '*'))
|
||||
|
||||
class CollectionResource(object):
|
||||
|
||||
__slots__ = ('message_controller', '_wsgi_conf', '_validate')
|
||||
__slots__ = ('message_controller', '_wsgi_conf', '_validate',
|
||||
'queue_controller')
|
||||
|
||||
def __init__(self, wsgi_conf, validate, message_controller):
|
||||
def __init__(self, wsgi_conf, validate, message_controller,
|
||||
queue_controller):
|
||||
self._wsgi_conf = wsgi_conf
|
||||
self._validate = validate
|
||||
self.message_controller = message_controller
|
||||
self.queue_controller = queue_controller
|
||||
|
||||
#-----------------------------------------------------------------------
|
||||
# Helpers
|
||||
@ -109,13 +112,14 @@ class CollectionResource(object):
|
||||
raise wsgi_errors.HTTPServiceUnavailable(description)
|
||||
|
||||
if not messages:
|
||||
return None
|
||||
messages = []
|
||||
|
||||
# Found some messages, so prepare the response
|
||||
kwargs['marker'] = next(results)
|
||||
for each_message in messages:
|
||||
each_message['href'] = req.path + '/' + each_message['id']
|
||||
del each_message['id']
|
||||
else:
|
||||
# Found some messages, so prepare the response
|
||||
kwargs['marker'] = next(results)
|
||||
for each_message in messages:
|
||||
each_message['href'] = req.path + '/' + each_message['id']
|
||||
del each_message['id']
|
||||
|
||||
return {
|
||||
'messages': messages,
|
||||
@ -158,6 +162,9 @@ class CollectionResource(object):
|
||||
try:
|
||||
self._validate.message_posting(messages)
|
||||
|
||||
if not self.queue_controller.exists(queue_name, project_id):
|
||||
self.queue_controller.create(queue_name, project_id)
|
||||
|
||||
message_ids = self.message_controller.post(
|
||||
queue_name,
|
||||
messages=messages,
|
||||
@ -205,16 +212,21 @@ class CollectionResource(object):
|
||||
resp.content_location = req.relative_uri
|
||||
|
||||
ids = req.get_param_as_list('ids')
|
||||
|
||||
if ids is None:
|
||||
response = self._get(req, project_id, queue_name)
|
||||
|
||||
else:
|
||||
response = self._get_by_id(req.path, project_id, queue_name, ids)
|
||||
|
||||
if response is None:
|
||||
resp.status = falcon.HTTP_204
|
||||
return
|
||||
# NOTE(TheSriram): Trying to get a message by id, should
|
||||
# return the message if its present, otherwise a 404 since
|
||||
# the message might have been deleted.
|
||||
resp.status = falcon.HTTP_404
|
||||
|
||||
resp.body = utils.to_json(response)
|
||||
else:
|
||||
resp.body = utils.to_json(response)
|
||||
# status defaults to 200
|
||||
|
||||
def on_delete(self, req, resp, project_id, queue_name):
|
||||
|
@ -54,6 +54,17 @@ class Resource(object):
|
||||
resp.body = utils.to_json(resp_dict)
|
||||
# status defaults to 200
|
||||
|
||||
except storage_errors.QueueDoesNotExist as ex:
|
||||
resp_dict = {
|
||||
'messages': {
|
||||
'claimed': 0,
|
||||
'free': 0,
|
||||
'total': 0
|
||||
}
|
||||
}
|
||||
resp.content_location = req.path
|
||||
resp.body = utils.to_json(resp_dict)
|
||||
|
||||
except storage_errors.DoesNotExist as ex:
|
||||
LOG.debug(ex)
|
||||
raise falcon.HTTPNotFound()
|
||||
|
@ -14,6 +14,7 @@
|
||||
|
||||
from falcon import testing as ftest
|
||||
|
||||
from marconi.openstack.common import jsonutils
|
||||
from marconi.queues import bootstrap
|
||||
from marconi.queues.transport.wsgi import driver
|
||||
from marconi import tests as testing
|
||||
@ -123,10 +124,13 @@ class V1BaseFaulty(TestBaseFaulty):
|
||||
|
||||
|
||||
class V1_1Base(TestBase):
|
||||
"""Base class for V1 API Tests.
|
||||
"""Base class for V1.1 API Tests.
|
||||
Should contain methods specific to V1.1 of the API
|
||||
"""
|
||||
|
||||
def _empty_message_list(self, body):
|
||||
self.assertEqual(jsonutils.loads(body[0])['messages'], [])
|
||||
|
||||
def simulate_request(self, path, **kwargs):
|
||||
"""Simulate a request.
|
||||
|
||||
|
@ -121,13 +121,15 @@ class ClaimsBaseTest(base.V1_1Base):
|
||||
body = self.simulate_get(self.messages_path,
|
||||
headers=self.headers,
|
||||
query_string="echo=true")
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||
self._empty_message_list(body)
|
||||
|
||||
# Listing messages, by default, won't include claimed, won't echo
|
||||
body = self.simulate_get(self.messages_path,
|
||||
headers=self.headers,
|
||||
query_string="echo=false")
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||
self._empty_message_list(body)
|
||||
|
||||
# List messages, include_claimed, but don't echo
|
||||
body = self.simulate_get(self.messages_path,
|
||||
@ -135,7 +137,8 @@ class ClaimsBaseTest(base.V1_1Base):
|
||||
'&echo=false',
|
||||
headers=self.headers)
|
||||
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||
self._empty_message_list(body)
|
||||
|
||||
# List messages with a different client-id and echo=false.
|
||||
# Should return some messages
|
||||
|
@ -78,7 +78,8 @@ class TestDefaultLimits(base.V1_1Base):
|
||||
headers=self.headers,
|
||||
query_string='echo=false')
|
||||
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||
self._empty_message_list(result)
|
||||
|
||||
self._prepare_messages(storage.DEFAULT_MESSAGES_PER_PAGE + 1)
|
||||
result = self.simulate_get(self.messages_path,
|
||||
|
@ -31,6 +31,7 @@ from marconi.tests.queues.transport.wsgi import base
|
||||
|
||||
@ddt.ddt
|
||||
class MessagesBaseTest(base.V1_1Base):
|
||||
|
||||
def setUp(self):
|
||||
super(MessagesBaseTest, self).setUp()
|
||||
|
||||
@ -194,7 +195,7 @@ class MessagesBaseTest(base.V1_1Base):
|
||||
|
||||
game_title = 'v' * validation.QUEUE_NAME_MAX_LEN
|
||||
self._post_messages(queues_path + game_title + '/messages')
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_404)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_201)
|
||||
|
||||
game_title += 'v'
|
||||
self._post_messages(queues_path + game_title + '/messages')
|
||||
@ -202,12 +203,14 @@ class MessagesBaseTest(base.V1_1Base):
|
||||
|
||||
def test_post_to_missing_queue(self):
|
||||
self._post_messages(self.url_prefix + '/queues/nonexistent/messages')
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_404)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_201)
|
||||
|
||||
def test_get_from_missing_queue(self):
|
||||
self.simulate_get(self.url_prefix + '/queues/nonexistent/messages',
|
||||
headers=self.headers)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
||||
body = self.simulate_get(self.url_prefix +
|
||||
'/queues/nonexistent/messages',
|
||||
headers=self.headers)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||
self._empty_message_list(body)
|
||||
|
||||
@ddt.data('', '0xdeadbeef', '550893e0-2b6e-11e3-835a-5cf9dd72369')
|
||||
def test_bad_client_id(self, text_id):
|
||||
@ -235,7 +238,7 @@ class MessagesBaseTest(base.V1_1Base):
|
||||
def test_unacceptable_ttl(self, ttl):
|
||||
self.simulate_post(self.queue_path + '/messages',
|
||||
body=jsonutils.dumps([{'ttl': ttl,
|
||||
'body': None}]),
|
||||
'body': None}]),
|
||||
headers=self.headers)
|
||||
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_400)
|
||||
@ -294,7 +297,7 @@ class MessagesBaseTest(base.V1_1Base):
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
||||
|
||||
self.simulate_get(target, query_string=params, headers=self.headers)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_404)
|
||||
|
||||
# Safe to delete non-existing ones
|
||||
self.simulate_delete(target, query_string=params, headers=self.headers)
|
||||
@ -321,7 +324,7 @@ class MessagesBaseTest(base.V1_1Base):
|
||||
path + '?' + query_string)
|
||||
|
||||
cnt = 0
|
||||
while self.srmock.status == falcon.HTTP_200:
|
||||
while jsonutils.loads(body[0])['messages'] != []:
|
||||
contents = jsonutils.loads(body[0])
|
||||
[target, params] = contents['links'][0]['href'].split('?')
|
||||
|
||||
@ -335,7 +338,8 @@ class MessagesBaseTest(base.V1_1Base):
|
||||
cnt += 1
|
||||
|
||||
self.assertEqual(cnt, 4)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||
self._empty_message_list(body)
|
||||
|
||||
# Stats
|
||||
body = self.simulate_get(self.queue_path + '/stats',
|
||||
@ -354,20 +358,23 @@ class MessagesBaseTest(base.V1_1Base):
|
||||
matchers.MatchesRegex(expected_pattern))
|
||||
|
||||
# NOTE(kgriffs): Try to get messages for a missing queue
|
||||
self.simulate_get(self.url_prefix + '/queues/nonexistent/messages',
|
||||
headers=self.headers)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
||||
body = self.simulate_get(self.url_prefix +
|
||||
'/queues/nonexistent/messages',
|
||||
headers=self.headers)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||
self._empty_message_list(body)
|
||||
|
||||
def test_list_with_bad_marker(self):
|
||||
path = self.queue_path + '/messages'
|
||||
self._post_messages(path, repeat=5)
|
||||
|
||||
query_string = 'limit=3&echo=true&marker=sfhlsfdjh2048'
|
||||
self.simulate_get(path,
|
||||
query_string=query_string,
|
||||
headers=self.headers)
|
||||
body = self.simulate_get(path,
|
||||
query_string=query_string,
|
||||
headers=self.headers)
|
||||
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||
self._empty_message_list(body)
|
||||
|
||||
def test_no_uuid(self):
|
||||
headers = {
|
||||
@ -412,11 +419,11 @@ class MessagesBaseTest(base.V1_1Base):
|
||||
self.simulate_get(path, headers=self.headers)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_404)
|
||||
|
||||
def test_get_multiple_invalid_messages_204s(self):
|
||||
def test_get_multiple_invalid_messages_404s(self):
|
||||
path = self.url_prefix + '/queues/notthere/messages'
|
||||
self.simulate_get(path, query_string='ids=a,b,c',
|
||||
headers=self.headers)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_404)
|
||||
|
||||
def test_delete_multiple_invalid_messages_204s(self):
|
||||
path = self.url_prefix + '/queues/notthere/messages'
|
||||
|
@ -67,9 +67,9 @@ class QueueLifecycleBaseTest(base.V1_1Base):
|
||||
gumshoe_queue_path_metadata = self.gumshoe_queue_path + '/metadata'
|
||||
gumshoe_queue_path_stats = self.gumshoe_queue_path + '/stats'
|
||||
|
||||
# Stats not found - queue not created yet
|
||||
# Stats are empty - queue not created yet
|
||||
self.simulate_get(gumshoe_queue_path_stats, headers=headers)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_404)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||
|
||||
# Metadata not found - queue not created yet
|
||||
self.simulate_get(gumshoe_queue_path_metadata, headers=headers)
|
||||
@ -114,7 +114,7 @@ class QueueLifecycleBaseTest(base.V1_1Base):
|
||||
|
||||
# Get non-existent stats
|
||||
self.simulate_get(gumshoe_queue_path_stats, headers=headers)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_404)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||
|
||||
# Get non-existent metadata
|
||||
self.simulate_get(gumshoe_queue_path_metadata, headers=headers)
|
||||
|
Loading…
x
Reference in New Issue
Block a user