Merge "Fix route for bulk-get message by ID"
This commit is contained in:
commit
cecb5c42e2
@ -13,7 +13,6 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from marconi import storage
|
||||
from marconi.storage import exceptions
|
||||
from marconi.tests import util as testing
|
||||
@ -389,7 +388,6 @@ class ClaimControllerTest(ControllerBaseTest):
|
||||
project=self.project)
|
||||
|
||||
for message in messages:
|
||||
self.assertEquals(message['age'], 0)
|
||||
self.assertEquals(message['ttl'], 777)
|
||||
|
||||
def test_extend_lifetime_with_grace_1(self):
|
||||
@ -403,7 +401,6 @@ class ClaimControllerTest(ControllerBaseTest):
|
||||
project=self.project)
|
||||
|
||||
for message in messages:
|
||||
self.assertEquals(message['age'], 0)
|
||||
self.assertEquals(message['ttl'], 800)
|
||||
|
||||
def test_extend_lifetime_with_grace_2(self):
|
||||
@ -419,7 +416,6 @@ class ClaimControllerTest(ControllerBaseTest):
|
||||
project=self.project)
|
||||
|
||||
for message in messages:
|
||||
self.assertEquals(message['age'], 0)
|
||||
self.assertEquals(message['ttl'], 122)
|
||||
|
||||
def test_do_not_extend_lifetime(self):
|
||||
|
@ -40,31 +40,24 @@ class MessagesBaseTest(base.TestBase):
|
||||
self.simulate_delete(self.queue_path, self.project_id)
|
||||
super(MessagesBaseTest, self).tearDown()
|
||||
|
||||
def test_post(self):
|
||||
doc = """
|
||||
[
|
||||
{"body": 239, "ttl": 10},
|
||||
{"body": {"key": "value"}, "ttl": 20},
|
||||
{"body": [1, 3], "ttl": 30}
|
||||
]
|
||||
"""
|
||||
def _test_post(self, sample_messages):
|
||||
sample_doc = json.dumps(sample_messages)
|
||||
|
||||
messages_path = self.queue_path + '/messages'
|
||||
result = self.simulate_post(messages_path, self.project_id,
|
||||
body=doc, headers=self.headers)
|
||||
body=sample_doc, headers=self.headers)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||
|
||||
result_doc = json.loads(result[0])
|
||||
|
||||
msg_ids = self._get_msg_ids(self.srmock.headers_dict)
|
||||
self.assertEquals(len(msg_ids), 3)
|
||||
self.assertEquals(len(msg_ids), len(sample_messages))
|
||||
|
||||
expected_resources = [unicode(messages_path + '/' + id)
|
||||
for id in msg_ids]
|
||||
self.assertEquals(expected_resources, result_doc['resources'])
|
||||
self.assertFalse(result_doc['partial'])
|
||||
|
||||
sample_messages = json.loads(doc)
|
||||
self.assertEquals(len(msg_ids), len(sample_messages))
|
||||
|
||||
lookup = dict([(m['ttl'], m['body']) for m in sample_messages])
|
||||
@ -89,7 +82,7 @@ class MessagesBaseTest(base.TestBase):
|
||||
|
||||
# Test bulk GET
|
||||
query_string = 'ids=' + ','.join(msg_ids)
|
||||
result = self.simulate_get(self.queue_path, self.project_id,
|
||||
result = self.simulate_get(messages_path, self.project_id,
|
||||
query_string=query_string)
|
||||
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_200)
|
||||
@ -98,6 +91,22 @@ class MessagesBaseTest(base.TestBase):
|
||||
actual_ttls = set(m['ttl'] for m in result_doc)
|
||||
self.assertFalse(expected_ttls - actual_ttls)
|
||||
|
||||
def test_post_single(self):
|
||||
sample_messages = [
|
||||
{'body': {'key': 'value'}, 'ttl': 20},
|
||||
]
|
||||
|
||||
self._test_post(sample_messages)
|
||||
|
||||
def test_post_multiple(self):
|
||||
sample_messages = [
|
||||
{'body': 239, 'ttl': 10},
|
||||
{'body': {'key': 'value'}, 'ttl': 20},
|
||||
{'body': [1, 3], 'ttl': 30},
|
||||
]
|
||||
|
||||
self._test_post(sample_messages)
|
||||
|
||||
def test_post_to_mia_queue(self):
|
||||
self._post_messages('/v1/queues/nonexistent/messages')
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_404)
|
||||
@ -114,8 +123,6 @@ class MessagesBaseTest(base.TestBase):
|
||||
path = self.queue_path + '/messages'
|
||||
self._post_messages(path)
|
||||
|
||||
# NOTE(kgriffs): This implictly tests that posting a single
|
||||
# message returns a message resource, not a queue resource.
|
||||
msg_id = self._get_msg_id(self.srmock.headers_dict)
|
||||
|
||||
self.simulate_get(path + '/' + msg_id, self.project_id)
|
||||
@ -145,6 +152,7 @@ class MessagesBaseTest(base.TestBase):
|
||||
query_string=query_string,
|
||||
headers=self.headers)
|
||||
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_200)
|
||||
self.assertEquals(self.srmock.headers_dict['Content-Location'],
|
||||
path + '?' + query_string)
|
||||
|
||||
@ -205,7 +213,7 @@ class MessagesBaseTest(base.TestBase):
|
||||
headers=self.headers)
|
||||
|
||||
def _get_msg_id(self, headers):
|
||||
return headers['Location'].rsplit('/', 1)[-1]
|
||||
return self._get_msg_ids(headers)[0]
|
||||
|
||||
def _get_msg_ids(self, headers):
|
||||
return headers['Location'].rsplit('=', 1)[-1].split(',')
|
||||
|
@ -35,6 +35,98 @@ class CollectionResource(object):
|
||||
def __init__(self, message_controller):
|
||||
self.message_controller = message_controller
|
||||
|
||||
#-----------------------------------------------------------------------
|
||||
# Helpers
|
||||
#-----------------------------------------------------------------------
|
||||
|
||||
def _get_by_id(self, base_path, project_id, queue_name, ids):
|
||||
"""Returns one or more messages from the queue by ID."""
|
||||
try:
|
||||
messages = self.message_controller.get(
|
||||
queue_name,
|
||||
ids,
|
||||
project=project_id)
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Message could not be retrieved.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
# Prepare response
|
||||
messages = list(messages)
|
||||
if not messages:
|
||||
raise falcon.HTTPNotFound()
|
||||
|
||||
base_path += '/'
|
||||
for each_message in messages:
|
||||
each_message['href'] = base_path + each_message['id']
|
||||
del each_message['id']
|
||||
|
||||
return messages
|
||||
|
||||
def _get(self, req, project_id, queue_name):
|
||||
uuid = req.get_header('Client-ID', required=True)
|
||||
|
||||
# TODO(kgriffs): Optimize
|
||||
kwargs = helpers.purge({
|
||||
'marker': req.get_param('marker'),
|
||||
'limit': req.get_param_as_int('limit'),
|
||||
'echo': req.get_param_as_bool('echo'),
|
||||
})
|
||||
|
||||
try:
|
||||
results = self.message_controller.list(
|
||||
queue_name,
|
||||
project=project_id,
|
||||
client_uuid=uuid,
|
||||
**kwargs)
|
||||
|
||||
# Buffer messages
|
||||
cursor = results.next()
|
||||
messages = list(cursor)
|
||||
|
||||
except storage_exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound()
|
||||
|
||||
except storage_exceptions.MalformedMarker:
|
||||
title = _('Invalid query string parameter')
|
||||
description = _('The value for the query string '
|
||||
'parameter "marker" could not be '
|
||||
'parsed. We recommend using the '
|
||||
'"next" URI from a previous '
|
||||
'request directly, rather than '
|
||||
'constructing the URI manually. ')
|
||||
|
||||
raise falcon.HTTPBadRequest(title, description)
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Messages could not be listed.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
if not messages:
|
||||
return None
|
||||
|
||||
# Found some messages, so prepare the response
|
||||
kwargs['marker'] = results.next()
|
||||
for each_message in messages:
|
||||
each_message['href'] = req.path + '/' + each_message['id']
|
||||
del each_message['id']
|
||||
|
||||
return {
|
||||
'messages': messages,
|
||||
'links': [
|
||||
{
|
||||
'rel': 'next',
|
||||
'href': req.path + falcon.to_query_str(kwargs)
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
#-----------------------------------------------------------------------
|
||||
# Interface
|
||||
#-----------------------------------------------------------------------
|
||||
|
||||
def on_post(self, req, resp, project_id, queue_name):
|
||||
uuid = req.get_header('Client-ID', required=True)
|
||||
|
||||
@ -87,81 +179,28 @@ class CollectionResource(object):
|
||||
# Prepare the response
|
||||
resp.status = falcon.HTTP_201
|
||||
|
||||
if len(message_ids) == 1:
|
||||
base_path = req.path[0:req.path.rfind('/')]
|
||||
resp.location = base_path + '/' + message_ids[0]
|
||||
else:
|
||||
ids_value = ','.join(message_ids)
|
||||
resp.location = req.path + '?ids=' + ids_value
|
||||
ids_value = ','.join(message_ids)
|
||||
resp.location = req.path + '?ids=' + ids_value
|
||||
|
||||
hrefs = [req.path + '/' + id for id in message_ids]
|
||||
body = {'resources': hrefs, 'partial': partial}
|
||||
resp.body = helpers.to_json(body)
|
||||
|
||||
def on_get(self, req, resp, project_id, queue_name):
|
||||
uuid = req.get_header('Client-ID', required=True)
|
||||
|
||||
# TODO(kgriffs): Optimize
|
||||
kwargs = helpers.purge({
|
||||
'marker': req.get_param('marker'),
|
||||
'limit': req.get_param_as_int('limit'),
|
||||
'echo': req.get_param_as_bool('echo'),
|
||||
})
|
||||
|
||||
try:
|
||||
results = self.message_controller.list(
|
||||
queue_name,
|
||||
project=project_id,
|
||||
client_uuid=uuid,
|
||||
**kwargs)
|
||||
|
||||
# Buffer messages
|
||||
cursor = results.next()
|
||||
messages = list(cursor)
|
||||
|
||||
except storage_exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound()
|
||||
|
||||
except storage_exceptions.MalformedMarker:
|
||||
title = _('Invalid query string parameter')
|
||||
description = _('The value for the query string '
|
||||
'parameter "marker" could not be '
|
||||
'parsed. We recommend using the '
|
||||
'"next" URI from a previous '
|
||||
'request directly, rather than '
|
||||
'constructing the URI manually. ')
|
||||
|
||||
raise falcon.HTTPBadRequest(title, description)
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Messages could not be listed.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
# Check for no content
|
||||
if len(messages) == 0:
|
||||
resp.status = falcon.HTTP_204
|
||||
return
|
||||
|
||||
# Found some messages, so prepare the response
|
||||
kwargs['marker'] = results.next()
|
||||
for each_message in messages:
|
||||
each_message['href'] = req.path + '/' + each_message['id']
|
||||
del each_message['id']
|
||||
|
||||
response_body = {
|
||||
'messages': messages,
|
||||
'links': [
|
||||
{
|
||||
'rel': 'next',
|
||||
'href': req.path + falcon.to_query_str(kwargs)
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
resp.content_location = req.relative_uri
|
||||
resp.body = helpers.to_json(response_body)
|
||||
resp.status = falcon.HTTP_200
|
||||
|
||||
ids = req.get_param_as_list('ids')
|
||||
if ids is None:
|
||||
response = self._get(req, project_id, queue_name)
|
||||
|
||||
if response is None:
|
||||
resp.status = falcon.HTTP_204
|
||||
return
|
||||
else:
|
||||
base_path = req.path + '/messages'
|
||||
response = self._get_by_id(base_path, project_id, queue_name, ids)
|
||||
|
||||
resp.body = helpers.to_json(response)
|
||||
|
||||
|
||||
class ItemResource(object):
|
||||
|
@ -48,31 +48,6 @@ class ItemResource(object):
|
||||
description = _('Queue metdata could not be retrieved.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
def _get_messages_by_id(self, base_path, project_id, queue_name, ids):
|
||||
"""Returns one or more messages from the queue by ID."""
|
||||
try:
|
||||
messages = self.message_controller.get(
|
||||
queue_name,
|
||||
ids,
|
||||
project=project_id)
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Message could not be retrieved.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
# Prepare response
|
||||
messages = list(messages)
|
||||
if not messages:
|
||||
raise falcon.HTTPNotFound()
|
||||
|
||||
base_path += '/'
|
||||
for each_message in messages:
|
||||
each_message['href'] = base_path + each_message['id']
|
||||
del each_message['id']
|
||||
|
||||
return messages
|
||||
|
||||
#-----------------------------------------------------------------------
|
||||
# Interface
|
||||
#-----------------------------------------------------------------------
|
||||
|
Loading…
x
Reference in New Issue
Block a user