Add support for pop
implemented in v1.1
This patch implements pop in the client library. Pop is a new operation that has been implemented in v1.1. It allows a client to remove N messages from the top of queue. Although pop is a query parameter in the server, it has been implemented as if it were an action. Partially-Implements blueprint: api-v1.1 Change-Id: I29b771a6167185033821139bd795311f1a25a5e1
This commit is contained in:
parent
bde63a2f66
commit
062507d09a
@ -221,3 +221,20 @@ class TestV1Core(base.TestBase):
|
||||
|
||||
req = request.Request()
|
||||
core.health(self.transport, req)
|
||||
|
||||
|
||||
class TestV1_1Core(TestV1Core):
|
||||
|
||||
def test_message_pop(self):
|
||||
with mock.patch.object(self.transport, 'send',
|
||||
autospec=True) as send_method:
|
||||
resp = response.Response(None, '{}')
|
||||
send_method.return_value = resp
|
||||
|
||||
req = request.Request()
|
||||
core.message_pop(self.transport, req,
|
||||
'test', count=5)
|
||||
|
||||
self.assertIn('queue_name', req.params)
|
||||
self.assertIn('pop', req.params)
|
||||
self.assertEqual(req.params['pop'], 5)
|
||||
|
@ -25,7 +25,7 @@ class QueuesV1QueueHttpUnitTest(queues.QueuesV1QueueUnitTest):
|
||||
version = 1
|
||||
|
||||
|
||||
class QueuesV1_1QueueHttpUnitTest(queues.QueuesV1QueueUnitTest):
|
||||
class QueuesV1_1QueueHttpUnitTest(queues.QueuesV1_1QueueUnitTest):
|
||||
|
||||
transport_cls = http.HttpTransport
|
||||
url = 'http://127.0.0.1:8888/v1.1'
|
||||
|
@ -224,3 +224,16 @@ class V1(api.Api):
|
||||
|
||||
class V1_1(V1):
|
||||
label = 'v1.1'
|
||||
|
||||
|
||||
V1_1.schema.update({
|
||||
'message_pop': {
|
||||
'ref': 'queues/{queue_name}/messages/',
|
||||
'method': 'DELETE',
|
||||
'required': ['queue_name', 'pop'],
|
||||
'properties': {
|
||||
'queue_name': {'type': 'string'},
|
||||
'pop': {'type': 'integer'},
|
||||
}
|
||||
},
|
||||
})
|
||||
|
@ -294,6 +294,30 @@ def message_delete_many(transport, request, queue_name,
|
||||
transport.send(request)
|
||||
|
||||
|
||||
def message_pop(transport, request, queue_name,
|
||||
count, callback=None):
|
||||
"""Pops out `count` messages from `queue_name`
|
||||
|
||||
:param transport: Transport instance to use
|
||||
:type transport: `transport.base.Transport`
|
||||
:param request: Request instance ready to be sent.
|
||||
:type request: `transport.request.Request`
|
||||
:param queue_name: Queue reference name.
|
||||
:type queue_name: `six.text_type`
|
||||
:param count: Number of messages to pop.
|
||||
:type count: int
|
||||
:param callback: Optional callable to use as callback.
|
||||
If specified, this request will be sent asynchronously.
|
||||
(IGNORED UNTIL ASYNC SUPPORT IS COMPLETE)
|
||||
:type callback: Callable object.
|
||||
"""
|
||||
|
||||
request.operation = 'message_delete_many'
|
||||
request.params['queue_name'] = queue_name
|
||||
request.params['pop'] = count
|
||||
transport.send(request)
|
||||
|
||||
|
||||
def claim_create(transport, request, queue_name, **kwargs):
|
||||
"""Creates a Claim `claim_id` on the queue `queue_name`
|
||||
|
||||
|
@ -173,6 +173,23 @@ class Queue(object):
|
||||
return core.message_delete_many(trans, req, self._name,
|
||||
set(messages))
|
||||
|
||||
def pop(self, count=1):
|
||||
"""Pop `count` messages from the server
|
||||
|
||||
:param count: Number of messages to pop.
|
||||
:type count: int
|
||||
|
||||
:returns: List of messages
|
||||
:rtype: `list`
|
||||
"""
|
||||
|
||||
req, trans = self.client._request_and_transport()
|
||||
msgs = core.message_pop(trans, req, self._name, count=count)
|
||||
return iterator._Iterator(self.client,
|
||||
msgs,
|
||||
'messages',
|
||||
message.create_object(self))
|
||||
|
||||
def claim(self, id=None, ttl=None, grace=None,
|
||||
limit=None):
|
||||
return claim_api.Claim(self, id=id, ttl=ttl, grace=grace, limit=limit)
|
||||
|
@ -351,3 +351,52 @@ class QueuesV1QueueFunctionalTest(base.QueuesTestBase):
|
||||
messages = queue.delete_messages(*msgs_id)
|
||||
self.assertTrue(isinstance(messages, iterator._Iterator))
|
||||
self.assertEqual(len(list(messages)), 1)
|
||||
|
||||
|
||||
class QueuesV1_1QueueUnitTest(QueuesV1QueueUnitTest):
|
||||
|
||||
def test_message_pop(self):
|
||||
returned = [{
|
||||
'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b01',
|
||||
'ttl': 800,
|
||||
'age': 790,
|
||||
'body': {'event': 'ActivateAccount', 'mode': 'active'}
|
||||
}, {
|
||||
'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b02',
|
||||
'ttl': 800,
|
||||
'age': 790,
|
||||
'body': {'event': 'ActivateAccount', 'mode': 'active'}
|
||||
}]
|
||||
|
||||
with mock.patch.object(self.transport, 'send',
|
||||
autospec=True) as send_method:
|
||||
|
||||
resp = response.Response(None, json.dumps(returned))
|
||||
send_method.return_value = resp
|
||||
|
||||
msg = self.queue.pop(count=2)
|
||||
self.assertIsInstance(msg, iterator._Iterator)
|
||||
|
||||
# NOTE(flaper87): Nothing to assert here,
|
||||
# just checking our way down to the transport
|
||||
# doesn't crash.
|
||||
|
||||
|
||||
class QueuesV1_1QueueFunctionalTest(QueuesV1QueueFunctionalTest):
|
||||
|
||||
def test_message_pop(self):
|
||||
queue = self.client.queue("test_queue")
|
||||
queue._get_transport = mock.Mock(return_value=self.transport)
|
||||
|
||||
messages = [
|
||||
{'ttl': 60, 'body': 'Post It 1!'},
|
||||
{'ttl': 60, 'body': 'Post It 2!'},
|
||||
{'ttl': 60, 'body': 'Post It 2!'},
|
||||
]
|
||||
|
||||
messages = queue.pop(count=2)
|
||||
self.assertTrue(isinstance(messages, iterator._Iterator))
|
||||
self.assertEqual(len(list(messages)), 2)
|
||||
|
||||
remaining = queue.messages()
|
||||
self.assertEqual(len(list(remaining)), 1)
|
||||
|
Loading…
Reference in New Issue
Block a user