Merge "feat(storage): message bulk deletion"
This commit is contained in:
commit
b70b433a11
@ -251,6 +251,18 @@ class MessageBase(ControllerBase):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def bulk_delete(self, queue, message_ids, project=None):
|
||||
"""Base method for deleting multiple messages.
|
||||
|
||||
:param queue: Name of the queue to post
|
||||
message to.
|
||||
:param message_ids: A sequence of message IDs
|
||||
to be deleted.
|
||||
:param project: Project id
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class ClaimBase(ControllerBase):
|
||||
|
||||
|
@ -587,6 +587,20 @@ class MessageController(storage.MessageBase):
|
||||
except exceptions.QueueDoesNotExist:
|
||||
pass
|
||||
|
||||
@utils.raises_conn_error
|
||||
def bulk_delete(self, queue, message_ids, project=None):
|
||||
try:
|
||||
message_ids = [utils.to_oid(id) for id in message_ids]
|
||||
query = {
|
||||
'q': self._get_queue_id(queue, project),
|
||||
'_id': {'$in': message_ids},
|
||||
}
|
||||
|
||||
self._col.remove(query, w=0)
|
||||
|
||||
except exceptions.QueueDoesNotExist:
|
||||
pass
|
||||
|
||||
|
||||
def _basic_message(msg, now):
|
||||
oid = msg['_id']
|
||||
|
@ -211,3 +211,17 @@ class MessageController(base.MessageBase):
|
||||
|
||||
if not self.driver.affected:
|
||||
raise exceptions.ClaimNotPermitted(utils.msgid_encode(id), claim)
|
||||
|
||||
def bulk_delete(self, queue, message_ids, project):
|
||||
if project is None:
|
||||
project = ''
|
||||
|
||||
message_ids = ','.join(["'%s'" % utils.msgid_decode(id)
|
||||
for id in message_ids])
|
||||
|
||||
self.driver.run('''
|
||||
delete from Messages
|
||||
where id in (%s)
|
||||
and qid = (select id from Queues
|
||||
where project = ? and name = ?)
|
||||
''' % message_ids, project, queue)
|
||||
|
@ -189,7 +189,7 @@ class MessageControllerTest(ControllerBaseTest):
|
||||
load_messages(5, self.queue_name, echo=True, project=self.project,
|
||||
marker=next(interaction), client_uuid='my_uuid')
|
||||
|
||||
def test_get_multi_by_id(self):
|
||||
def test_multi_ids(self):
|
||||
messages_in = [{'ttl': 120, 'body': 0}, {'ttl': 240, 'body': 1}]
|
||||
ids = self.controller.post(self.queue_name, messages_in,
|
||||
project=self.project,
|
||||
@ -201,6 +201,14 @@ class MessageControllerTest(ControllerBaseTest):
|
||||
for idx, message in enumerate(messages_out):
|
||||
self.assertEquals(message['body'], idx)
|
||||
|
||||
self.controller.bulk_delete(self.queue_name, ids,
|
||||
project=self.project)
|
||||
|
||||
with testing.expect(StopIteration):
|
||||
result = self.controller.bulk_get(self.queue_name, ids,
|
||||
project=self.project)
|
||||
next(result)
|
||||
|
||||
def test_claim_effects(self):
|
||||
_insert_fixtures(self.controller, self.queue_name,
|
||||
project=self.project, client_uuid='my_uuid', num=12)
|
||||
|
@ -76,3 +76,6 @@ class MessageController(storage.MessageBase):
|
||||
|
||||
def delete(self, queue, message_id, project=None, claim=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def bulk_delete(self, queue, message_ids, project=None):
|
||||
raise NotImplementedError()
|
||||
|
Loading…
x
Reference in New Issue
Block a user