diff --git a/marconi/storage/mongodb/controllers.py b/marconi/storage/mongodb/controllers.py index 18df5c57d..b5aab1c33 100644 --- a/marconi/storage/mongodb/controllers.py +++ b/marconi/storage/mongodb/controllers.py @@ -239,7 +239,10 @@ class MessageController(storage.MessageBase): return utils.HookedCursor(msgs, denormalizer) def unclaim(self, claim_id): - cid = utils.to_oid(claim_id) + try: + cid = utils.to_oid(claim_id) + except ValueError: + return self._col.update({"c.id": cid}, {"$set": {"c": {"id": None, "e": 0}}}, upsert=False, multi=True) @@ -247,8 +250,12 @@ class MessageController(storage.MessageBase): def list(self, queue, tenant=None, marker=None, limit=10, echo=False, client_uuid=None): - qid = self._get_queue_id(queue, tenant) - messages = self.active(qid, marker, echo, client_uuid) + try: + qid = self._get_queue_id(queue, tenant) + messages = self.active(qid, marker, echo, client_uuid) + except ValueError: + return + messages = messages.limit(limit).sort("_id") marker_id = {} @@ -272,7 +279,11 @@ class MessageController(storage.MessageBase): def get(self, queue, message_id, tenant=None): # Base query, always check expire time - mid = utils.to_oid(message_id) + try: + mid = utils.to_oid(message_id) + except ValueError: + raise exceptions.MessageDoesNotExist(message_id, queue, tenant) + now = timeutils.utcnow() query = { @@ -284,7 +295,7 @@ class MessageController(storage.MessageBase): message = self._col.find_one(query) if message is None: - raise exceptions.MessageDoesNotExist(mid, queue, tenant) + raise exceptions.MessageDoesNotExist(message_id, queue, tenant) oid = message["_id"] age = now - utils.oid_utc(oid) @@ -320,9 +331,14 @@ class MessageController(storage.MessageBase): def delete(self, queue, message_id, tenant=None, claim=None): try: + try: + mid = utils.to_oid(message_id) + except ValueError: + return + query = { "q": self._get_queue_id(queue, tenant), - "_id": utils.to_oid(message_id) + "_id": mid } if claim: @@ -333,7 +349,11 @@ class MessageController(storage.MessageBase): if message is None: return - cid = utils.to_oid(claim) + try: + cid = utils.to_oid(claim) + except ValueError: + raise exceptions.ClaimNotPermitted(message_id, claim) + if not ("c" in message and message["c"]["id"] == cid and message["c"]["e"] > now): @@ -385,7 +405,12 @@ class ClaimController(storage.ClaimBase): # Base query, always check expire time now = timeutils.utcnow() - cid = utils.to_oid(claim_id) + + try: + cid = utils.to_oid(claim_id) + except ValueError: + raise exceptions.ClaimDoesNotExist() + age = now - utils.oid_utc(cid) def messages(msg_iter): @@ -492,7 +517,11 @@ class ClaimController(storage.ClaimBase): return (str(oid), messages) def update(self, queue, claim_id, metadata, tenant=None): - cid = utils.to_oid(claim_id) + try: + cid = utils.to_oid(claim_id) + except ValueError: + raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant) + now = timeutils.utcnow() ttl = int(metadata.get("ttl", 60)) ttl_delta = datetime.timedelta(seconds=ttl) @@ -510,7 +539,7 @@ class ClaimController(storage.ClaimBase): try: claimed.next() except StopIteration: - raise exceptions.ClaimDoesNotExist(cid, queue, tenant) + raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant) meta = { "id": cid, diff --git a/marconi/tests/storage/base.py b/marconi/tests/storage/base.py index 64eb4872f..4962d08e5 100644 --- a/marconi/tests/storage/base.py +++ b/marconi/tests/storage/base.py @@ -15,6 +15,7 @@ from marconi import storage +from marconi.storage import exceptions from marconi.tests import util as testing @@ -244,6 +245,32 @@ class MessageControllerTest(ControllerBaseTest): tenant=self.tenant) self.assertEquals(countof['messages']['free'], 0) + def test_illformed_id(self): + # any ill-formed IDs should be regarded as non-existing ones. + + self.queue_controller.upsert('unused', {}, '480924') + self.controller.delete('unused', 'illformed', '480924') + + msgs = list(self.controller.list('unused', '480924', + marker='illformed')) + + self.assertEquals(len(msgs), 0) + + with testing.expected(exceptions.DoesNotExist): + self.controller.get('unused', 'illformed', '480924') + + def test_illformed_claim(self): + self.queue_controller.upsert('unused', {}, '480924') + [msgid] = self.controller.post('unused', + [{'body': {}, 'ttl': 10}], + tenant='480924', + client_uuid='unused') + + with testing.expected(exceptions.NotPermitted): + self.controller.delete('unused', msgid, + tenant='480924', + claim='illformed') + class ClaimControllerTest(ControllerBaseTest): """ @@ -337,6 +364,16 @@ class ClaimControllerTest(ControllerBaseTest): self.controller.update(self.queue_name, claim_id, meta, tenant=self.tenant) + def test_illformed_id(self): + # any ill-formed IDs should be regarded as non-existing ones. + + self.queue_controller.upsert('unused', {}, '480924') + self.controller.delete('unused', 'illformed', '480924') + + with testing.expected(exceptions.DoesNotExist): + self.controller.update('unused', 'illformed', + {'ttl': 40}, '480924') + def _insert_fixtures(controller, queue_name, tenant=None, client_uuid=None, num=4): diff --git a/marconi/tests/storage/test_impl_sqlite.py b/marconi/tests/storage/test_impl_sqlite.py index fbf8791e1..23f0fa5fb 100644 --- a/marconi/tests/storage/test_impl_sqlite.py +++ b/marconi/tests/storage/test_impl_sqlite.py @@ -13,11 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from marconi.storage import exceptions from marconi.storage import sqlite from marconi.storage.sqlite import controllers from marconi.tests.storage import base -from marconi.tests import util as testing class SQliteQueueTests(base.QueueControllerTest): @@ -29,56 +27,7 @@ class SQliteMessageTests(base.MessageControllerTest): driver_class = sqlite.Driver controller_class = controllers.Message - def setUp(self): - super(SQliteMessageTests, self).setUp() - self.queue_controller.upsert('unused', {}, '480924') - - def tearDown(self): - self.queue_controller.delete('unused', '480924') - super(SQliteMessageTests, self).tearDown() - - def test_illformed_id(self): - # any ill-formed IDs should be regarded as non-existing ones. - - self.controller.delete('unused', 'illformed', '480924') - - msgs = list(self.controller.list('unused', '480924', - marker='illformed')) - - self.assertEquals(len(msgs), 0) - - with testing.expected(exceptions.DoesNotExist): - self.controller.get('unused', 'illformed', '480924') - - def test_illformed_claim(self): - [msgid] = self.controller.post('unused', - [{'body': {}, 'ttl': 10}], - tenant='480924', - client_uuid='unused') - - with testing.expected(exceptions.NotPermitted): - self.controller.delete('unused', msgid, - tenant='480924', - claim='illformed') - class SQliteClaimTests(base.ClaimControllerTest): driver_class = sqlite.Driver controller_class = controllers.Claim - - def setUp(self): - super(SQliteClaimTests, self).setUp() - self.queue_controller.upsert('unused', {}, '480924') - - def tearDown(self): - self.queue_controller.delete('unused', '480924') - super(SQliteClaimTests, self).tearDown() - - def test_illformed_id(self): - # any ill-formed IDs should be regarded as non-existing ones. - - self.controller.delete('unused', 'illformed', '480924') - - with testing.expected(exceptions.DoesNotExist): - self.controller.update('unused', 'illformed', - {'ttl': 40}, '480924')