Add support for bulk message delete in v1
Bulk message delete was missing in the client. This patch implements that feature for both v1 and v1.1. Change-Id: Ief73fe5f67ae71d396c6122048c26020ea9592bb
This commit is contained in:
parent
81fcab985c
commit
bde63a2f66
@ -177,6 +177,21 @@ class TestV1Core(base.TestBase):
|
||||
core.message_delete(self.transport, req,
|
||||
'test', 'message_id')
|
||||
|
||||
def test_message_delete_many(self):
|
||||
with mock.patch.object(self.transport, 'send',
|
||||
autospec=True) as send_method:
|
||||
resp = response.Response(None, None)
|
||||
send_method.return_value = resp
|
||||
|
||||
ids = ['a', 'b']
|
||||
req = request.Request()
|
||||
core.message_delete_many(self.transport, req,
|
||||
'test', ids=ids)
|
||||
|
||||
self.assertIn('queue_name', req.params)
|
||||
self.assertIn('ids', req.params)
|
||||
self.assertEqual(ids, req.params['ids'])
|
||||
|
||||
# ADMIN API
|
||||
def test_shard_create(self):
|
||||
with mock.patch.object(self.transport, 'send',
|
||||
|
@ -144,6 +144,16 @@ class V1(api.Api):
|
||||
}
|
||||
},
|
||||
|
||||
'message_delete_many': {
|
||||
'ref': 'queues/{queue_name}/messages/',
|
||||
'method': 'DELETE',
|
||||
'required': ['queue_name', 'ids'],
|
||||
'properties': {
|
||||
'queue_name': {'type': 'string'},
|
||||
'ids': {'type': 'string'},
|
||||
}
|
||||
},
|
||||
|
||||
'shard_create': {
|
||||
'ref': 'shards/{shard_name}',
|
||||
'method': 'PUT',
|
||||
|
@ -270,6 +270,30 @@ def message_delete(transport, request, queue_name, message_id,
|
||||
transport.send(request)
|
||||
|
||||
|
||||
def message_delete_many(transport, request, queue_name,
|
||||
ids, callback=None):
|
||||
"""Deletes `ids` messages from `queue_name`
|
||||
|
||||
:param transport: Transport instance to use
|
||||
:type transport: `transport.base.Transport`
|
||||
:param request: Request instance ready to be sent.
|
||||
:type request: `transport.request.Request`
|
||||
:param queue_name: Queue reference name.
|
||||
:type queue_name: `six.text_type`
|
||||
:param ids: Ids of the messages to delete
|
||||
:type ids: List of `six.text_type`
|
||||
:param callback: Optional callable to use as callback.
|
||||
If specified, this request will be sent asynchronously.
|
||||
(IGNORED UNTIL ASYNC SUPPORT IS COMPLETE)
|
||||
:type callback: Callable object.
|
||||
"""
|
||||
|
||||
request.operation = 'message_delete_many'
|
||||
request.params['queue_name'] = queue_name
|
||||
request.params['ids'] = ids
|
||||
transport.send(request)
|
||||
|
||||
|
||||
def claim_create(transport, request, queue_name, **kwargs):
|
||||
"""Creates a Claim `claim_id` on the queue `queue_name`
|
||||
|
||||
|
@ -162,6 +162,17 @@ class Queue(object):
|
||||
'messages',
|
||||
message.create_object(self))
|
||||
|
||||
def delete_messages(self, *messages):
|
||||
"""Deletes a set of messages from the server
|
||||
|
||||
:param messages: List of messages' ids to delete.
|
||||
:type messages: *args of `six.string_type`
|
||||
"""
|
||||
|
||||
req, trans = self.client._request_and_transport()
|
||||
return core.message_delete_many(trans, req, self._name,
|
||||
set(messages))
|
||||
|
||||
def claim(self, id=None, ttl=None, grace=None,
|
||||
limit=None):
|
||||
return claim_api.Claim(self, id=id, ttl=ttl, grace=grace, limit=limit)
|
||||
|
@ -198,6 +198,21 @@ class QueuesV1QueueUnitTest(base.QueuesTestBase):
|
||||
# just checking our way down to the transport
|
||||
# doesn't crash.
|
||||
|
||||
def test_message_delete_many(self):
|
||||
with mock.patch.object(self.transport, 'send',
|
||||
autospec=True) as send_method:
|
||||
|
||||
resp = response.Response(None, None)
|
||||
send_method.return_value = resp
|
||||
|
||||
rst = self.queue.delete_messages('50b68a50d6f5b8c8a7c62b01',
|
||||
'50b68a50d6f5b8c8a7c62b02')
|
||||
self.assertEqual(rst, None)
|
||||
|
||||
# NOTE(flaper87): Nothing to assert here,
|
||||
# just checking our way down to the transport
|
||||
# doesn't crash.
|
||||
|
||||
|
||||
class QueuesV1QueueFunctionalTest(base.QueuesTestBase):
|
||||
|
||||
@ -321,3 +336,18 @@ class QueuesV1QueueFunctionalTest(base.QueuesTestBase):
|
||||
messages = queue.messages(*msgs_id)
|
||||
self.assertTrue(isinstance(messages, iterator._Iterator))
|
||||
self.assertEqual(len(list(messages)), 3)
|
||||
|
||||
def test_message_delete_many_functional(self):
|
||||
queue = self.client.queue("test_queue")
|
||||
queue._get_transport = mock.Mock(return_value=self.transport)
|
||||
|
||||
messages = [
|
||||
{'ttl': 60, 'body': 'Post It 1!'},
|
||||
{'ttl': 60, 'body': 'Post It 2!'},
|
||||
]
|
||||
|
||||
res = queue.post(messages)['resources']
|
||||
msgs_id = [ref.split('/')[-1] for ref in res]
|
||||
messages = queue.delete_messages(*msgs_id)
|
||||
self.assertTrue(isinstance(messages, iterator._Iterator))
|
||||
self.assertEqual(len(list(messages)), 1)
|
||||
|
Loading…
x
Reference in New Issue
Block a user