Get several messages by ID using a queue filter parameter
This modifies the way multiple messages are queried by ID such that the IDs are now listed as a query parameter for GET requestes to a queue resource. Change-Id: If1245bdb7a75614311f5beb61f8835faa4521de8 Implements: blueprint v1-api
This commit is contained in:
parent
d7866a28d0
commit
20ddcb8e4c
@ -161,7 +161,7 @@ class Message(base.MessageBase):
|
||||
records = self.driver.run(sql, project, queue)
|
||||
for id, content, ttl, age in records:
|
||||
yield {
|
||||
'id': id,
|
||||
'id': _msgid_encode(id),
|
||||
'ttl': ttl,
|
||||
'age': int(age),
|
||||
'body': content,
|
||||
|
@ -52,8 +52,9 @@ class MessagesBaseTest(base.TestBase):
|
||||
]
|
||||
"""
|
||||
|
||||
path = '/v1/480924/queues/fizbit/messages'
|
||||
env = testing.create_environ(path,
|
||||
queue_path = '/v1/480924/queues/fizbit'
|
||||
messages_path = queue_path + '/messages'
|
||||
env = testing.create_environ(messages_path,
|
||||
method='POST',
|
||||
body=doc,
|
||||
headers=self.headers)
|
||||
@ -62,31 +63,47 @@ class MessagesBaseTest(base.TestBase):
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||
|
||||
msg_ids = self._get_msg_ids(self.srmock.headers_dict)
|
||||
print msg_ids
|
||||
self.assertEquals(len(msg_ids), 3)
|
||||
|
||||
body = json.loads(body[0])
|
||||
expected_resources = [unicode(path + '/' + id) for id in msg_ids]
|
||||
expected_resources = [unicode(messages_path + '/' + id)
|
||||
for id in msg_ids]
|
||||
self.assertEquals(expected_resources, body['resources'])
|
||||
self.assertFalse(body['partial'])
|
||||
|
||||
real_msgs = json.loads(doc)
|
||||
sample_messages = json.loads(doc)
|
||||
|
||||
self.assertEquals(len(msg_ids), len(real_msgs))
|
||||
self.assertEquals(len(msg_ids), len(sample_messages))
|
||||
|
||||
lookup = dict([(m['ttl'], m['body']) for m in real_msgs])
|
||||
lookup = dict([(m['ttl'], m['body']) for m in sample_messages])
|
||||
|
||||
# Test GET on the message resource directly
|
||||
for msg_id in msg_ids:
|
||||
message_uri = path + '/' + msg_id
|
||||
message_uri = messages_path + '/' + msg_id
|
||||
env = testing.create_environ(message_uri, method='GET')
|
||||
|
||||
body = self.app(env, self.srmock)
|
||||
body = self.app(env, self.srmock)[0]
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_200)
|
||||
self.assertEquals(self.srmock.headers_dict['Content-Location'],
|
||||
message_uri)
|
||||
|
||||
msg = json.loads(body[0])
|
||||
msg = json.loads(body)
|
||||
self.assertEquals(msg['href'], message_uri)
|
||||
self.assertEquals(msg['body'], lookup[msg['ttl']])
|
||||
|
||||
# Test bulk GET
|
||||
query_string = 'ids=' + ','.join(msg_ids)
|
||||
env = testing.create_environ(queue_path, method='GET',
|
||||
query_string=query_string)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_200)
|
||||
body = self.app(env, self.srmock)[0]
|
||||
document = json.loads(body)
|
||||
expected_ttls = set(m['ttl'] for m in sample_messages)
|
||||
actual_ttls = set(m['ttl'] for m in document)
|
||||
self.assertFalse(expected_ttls - actual_ttls)
|
||||
|
||||
def test_post_to_mia_queue(self):
|
||||
self._post_messages('/v1/480924/queues/nonexistent/messages')
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_404)
|
||||
|
||||
@ -124,7 +141,10 @@ class MessagesBaseTest(base.TestBase):
|
||||
|
||||
def test_delete(self):
|
||||
self._post_messages('/v1/480924/queues/fizbit/messages')
|
||||
[msg_id] = self._get_msg_ids(self.srmock.headers_dict)
|
||||
|
||||
# NOTE(kgriffs): This implictly tests that posting a single
|
||||
# message returns a message resource, not a queue resource.
|
||||
msg_id = self._get_msg_id(self.srmock.headers_dict)
|
||||
|
||||
env = testing.create_environ('/v1/480924/queues/fizbit/messages/'
|
||||
+ msg_id, method='GET')
|
||||
@ -224,8 +244,11 @@ class MessagesBaseTest(base.TestBase):
|
||||
headers=self.headers)
|
||||
self.app(env, self.srmock)
|
||||
|
||||
def _get_msg_ids(self, headers_dict):
|
||||
return headers_dict['Location'].rsplit('/', 1)[-1].split(',')
|
||||
def _get_msg_id(self, headers):
|
||||
return headers['Location'].rsplit('/', 1)[-1]
|
||||
|
||||
def _get_msg_ids(self, headers):
|
||||
return headers['Location'].rsplit('=', 1)[-1].split(',')
|
||||
|
||||
|
||||
class MessagesSQLiteTests(MessagesBaseTest):
|
||||
|
@ -44,12 +44,15 @@ class Driver(transport.DriverBase):
|
||||
|
||||
self.app = falcon.API()
|
||||
|
||||
# Queues Endpoints
|
||||
queue_controller = self.storage.queue_controller
|
||||
message_controller = self.storage.message_controller
|
||||
claim_controller = self.storage.claim_controller
|
||||
|
||||
# Queues Endpoints
|
||||
queue_collection = queues.CollectionResource(queue_controller)
|
||||
self.app.add_route('/v1/{project_id}/queues', queue_collection)
|
||||
|
||||
queue_item = queues.ItemResource(queue_controller)
|
||||
queue_item = queues.ItemResource(queue_controller, message_controller)
|
||||
self.app.add_route('/v1/{project_id}/queues/{queue_name}', queue_item)
|
||||
|
||||
stats_endpoint = stats.Resource(queue_controller)
|
||||
@ -57,7 +60,6 @@ class Driver(transport.DriverBase):
|
||||
'/stats', stats_endpoint)
|
||||
|
||||
# Messages Endpoints
|
||||
message_controller = self.storage.message_controller
|
||||
msg_collection = messages.CollectionResource(message_controller)
|
||||
self.app.add_route('/v1/{project_id}/queues/{queue_name}'
|
||||
'/messages', msg_collection)
|
||||
@ -67,7 +69,6 @@ class Driver(transport.DriverBase):
|
||||
'/messages/{message_id}', msg_item)
|
||||
|
||||
# Claims Endpoints
|
||||
claim_controller = self.storage.claim_controller
|
||||
claim_collection = claims.CollectionResource(claim_controller)
|
||||
self.app.add_route('/v1/{project_id}/queues/{queue_name}'
|
||||
'/claims', claim_collection)
|
||||
|
@ -86,8 +86,13 @@ class CollectionResource(object):
|
||||
|
||||
# Prepare the response
|
||||
resp.status = falcon.HTTP_201
|
||||
resource = ','.join(message_ids)
|
||||
resp.location = req.path + '/' + resource
|
||||
|
||||
if len(message_ids) == 1:
|
||||
base_path = req.path[0:req.path.rfind('/')]
|
||||
resp.location = base_path + '/' + message_ids[0]
|
||||
else:
|
||||
ids_value = ','.join(message_ids)
|
||||
resp.location = req.path + '?ids=' + ids_value
|
||||
|
||||
hrefs = [req.path + '/' + id for id in message_ids]
|
||||
body = {'resources': hrefs, 'partial': partial}
|
||||
@ -168,6 +173,7 @@ class ItemResource(object):
|
||||
|
||||
def on_get(self, req, resp, project_id, queue_name, message_id):
|
||||
try:
|
||||
print message_id
|
||||
message = self.message_controller.get(
|
||||
queue_name,
|
||||
message_id,
|
||||
|
@ -27,10 +27,56 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
class ItemResource(object):
|
||||
|
||||
__slots__ = ('queue_controller')
|
||||
__slots__ = ('queue_controller', 'message_controller')
|
||||
|
||||
def __init__(self, queue_controller):
|
||||
def __init__(self, queue_controller, message_controller):
|
||||
self.queue_controller = queue_controller
|
||||
self.message_controller = message_controller
|
||||
|
||||
#-----------------------------------------------------------------------
|
||||
# Helpers
|
||||
#-----------------------------------------------------------------------
|
||||
|
||||
def _get_metadata(self, project_id, queue_name):
|
||||
"""Returns non-serialized queue metadata."""
|
||||
try:
|
||||
return self.queue_controller.get(queue_name, project=project_id)
|
||||
except storage_exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound()
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Queue metdata could not be retrieved.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
def _get_messages_by_id(self, base_path, project_id, queue_name, ids):
|
||||
"""Returns one or more messages from the queue by ID."""
|
||||
try:
|
||||
messages = self.message_controller.get(
|
||||
queue_name,
|
||||
ids,
|
||||
project=project_id)
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Message could not be retrieved.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
# Prepare response
|
||||
messages = list(messages)
|
||||
if not messages:
|
||||
raise falcon.HTTPNotFound()
|
||||
|
||||
base_path += '/'
|
||||
for each_message in messages:
|
||||
print each_message
|
||||
each_message['href'] = base_path + each_message['id']
|
||||
del each_message['id']
|
||||
|
||||
return messages
|
||||
|
||||
#-----------------------------------------------------------------------
|
||||
# Interface
|
||||
#-----------------------------------------------------------------------
|
||||
|
||||
def on_put(self, req, resp, project_id, queue_name):
|
||||
# TODO(kgriffs): Migrate this check to input validator middleware
|
||||
@ -70,14 +116,13 @@ class ItemResource(object):
|
||||
resp.location = req.path
|
||||
|
||||
def on_get(self, req, resp, project_id, queue_name):
|
||||
try:
|
||||
doc = self.queue_controller.get(queue_name, project=project_id)
|
||||
except storage_exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound()
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Queue metdata could not be retrieved.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
message_ids = req.get_param_as_list('ids')
|
||||
if message_ids is None:
|
||||
doc = self._get_metadata(project_id, queue_name)
|
||||
else:
|
||||
base_path = req.path + '/messages'
|
||||
doc = self._get_messages_by_id(base_path, project_id, queue_name,
|
||||
message_ids)
|
||||
|
||||
resp.content_location = req.relative_uri
|
||||
resp.body = helpers.to_json(doc)
|
||||
|
Loading…
x
Reference in New Issue
Block a user