diff --git a/marconi/storage/base.py b/marconi/storage/base.py index 44b1c6021..f27443cd8 100644 --- a/marconi/storage/base.py +++ b/marconi/storage/base.py @@ -251,6 +251,18 @@ class MessageBase(ControllerBase): """ raise NotImplementedError + @abc.abstractmethod + def bulk_delete(self, queue, message_ids, project=None): + """Base method for deleting multiple messages. + + :param queue: Name of the queue to post + message to. + :param message_ids: A sequence of message IDs + to be deleted. + :param project: Project id + """ + raise NotImplementedError + class ClaimBase(ControllerBase): diff --git a/marconi/storage/mongodb/messages.py b/marconi/storage/mongodb/messages.py index 2103d3732..598b080eb 100644 --- a/marconi/storage/mongodb/messages.py +++ b/marconi/storage/mongodb/messages.py @@ -587,6 +587,20 @@ class MessageController(storage.MessageBase): except exceptions.QueueDoesNotExist: pass + @utils.raises_conn_error + def bulk_delete(self, queue, message_ids, project=None): + try: + message_ids = [utils.to_oid(id) for id in message_ids] + query = { + 'q': self._get_queue_id(queue, project), + '_id': {'$in': message_ids}, + } + + self._col.remove(query, w=0) + + except exceptions.QueueDoesNotExist: + pass + def _basic_message(msg, now): oid = msg['_id'] diff --git a/marconi/storage/sqlite/messages.py b/marconi/storage/sqlite/messages.py index b7be109d6..e5a1767f1 100644 --- a/marconi/storage/sqlite/messages.py +++ b/marconi/storage/sqlite/messages.py @@ -211,3 +211,17 @@ class MessageController(base.MessageBase): if not self.driver.affected: raise exceptions.ClaimNotPermitted(utils.msgid_encode(id), claim) + + def bulk_delete(self, queue, message_ids, project): + if project is None: + project = '' + + message_ids = ','.join(["'%s'" % utils.msgid_decode(id) + for id in message_ids]) + + self.driver.run(''' + delete from Messages + where id in (%s) + and qid = (select id from Queues + where project = ? and name = ?) + ''' % message_ids, project, queue) diff --git a/marconi/tests/storage/base.py b/marconi/tests/storage/base.py index a0e911830..fa1ce090a 100644 --- a/marconi/tests/storage/base.py +++ b/marconi/tests/storage/base.py @@ -189,7 +189,7 @@ class MessageControllerTest(ControllerBaseTest): load_messages(5, self.queue_name, echo=True, project=self.project, marker=next(interaction), client_uuid='my_uuid') - def test_get_multi_by_id(self): + def test_multi_ids(self): messages_in = [{'ttl': 120, 'body': 0}, {'ttl': 240, 'body': 1}] ids = self.controller.post(self.queue_name, messages_in, project=self.project, @@ -201,6 +201,14 @@ class MessageControllerTest(ControllerBaseTest): for idx, message in enumerate(messages_out): self.assertEquals(message['body'], idx) + self.controller.bulk_delete(self.queue_name, ids, + project=self.project) + + with testing.expect(StopIteration): + result = self.controller.bulk_get(self.queue_name, ids, + project=self.project) + next(result) + def test_claim_effects(self): _insert_fixtures(self.controller, self.queue_name, project=self.project, client_uuid='my_uuid', num=12) diff --git a/marconi/tests/util/faulty_storage.py b/marconi/tests/util/faulty_storage.py index 78efd3f59..8e6bccfdc 100644 --- a/marconi/tests/util/faulty_storage.py +++ b/marconi/tests/util/faulty_storage.py @@ -76,3 +76,6 @@ class MessageController(storage.MessageBase): def delete(self, queue, message_id, project=None, claim=None): raise NotImplementedError() + + def bulk_delete(self, queue, message_ids, project=None): + raise NotImplementedError()