diff --git a/marconi/storage/mongodb/claims.py b/marconi/storage/mongodb/claims.py index ec7e98f1e..4771ac805 100644 --- a/marconi/storage/mongodb/claims.py +++ b/marconi/storage/mongodb/claims.py @@ -66,11 +66,9 @@ class ClaimController(storage.ClaimBase): # Base query, always check expire time now = timeutils.utcnow() - - try: - cid = utils.to_oid(claim_id) - except ValueError: - raise exceptions.ClaimDoesNotExist() + cid = utils.to_oid(claim_id) + if cid is None: + raise exceptions.ClaimDoesNotExist(queue, project, claim_id) def messages(msg_iter): msg = next(msg_iter) @@ -188,9 +186,8 @@ class ClaimController(storage.ClaimBase): @utils.raises_conn_error def update(self, queue, claim_id, metadata, project=None): - try: - cid = utils.to_oid(claim_id) - except ValueError: + cid = utils.to_oid(claim_id) + if cid is None: raise exceptions.ClaimDoesNotExist(claim_id, queue, project) now = timeutils.utcnow() diff --git a/marconi/storage/mongodb/messages.py b/marconi/storage/mongodb/messages.py index 9ca03dfb3..ef85145dc 100644 --- a/marconi/storage/mongodb/messages.py +++ b/marconi/storage/mongodb/messages.py @@ -368,9 +368,11 @@ class MessageController(storage.MessageBase): return utils.HookedCursor(msgs, denormalizer) def unclaim(self, queue_name, claim_id, project=None): - try: - cid = utils.to_oid(claim_id) - except ValueError: + cid = utils.to_oid(claim_id) + + # NOTE(cpp-cabrera): early abort - avoid a DB query if we're handling + # an invalid ID + if cid is None: return # NOTE(cpp-cabrera): unclaim by setting the claim ID to None @@ -435,7 +437,15 @@ class MessageController(storage.MessageBase): @utils.raises_conn_error def get(self, queue_name, message_id, project=None): + """Gets a single message by ID. + + :raises: exceptions.MessageDoesNotExist + """ mid = utils.to_oid(message_id) + if mid is None: + raise exceptions.MessageDoesNotExist(message_id, queue_name, + project) + now = timeutils.utcnow() query = { @@ -455,7 +465,10 @@ class MessageController(storage.MessageBase): @utils.raises_conn_error def bulk_get(self, queue_name, message_ids, project=None): - message_ids = [utils.to_oid(id) for id in message_ids] + message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid] + if not message_ids: + return () + now = timeutils.utcnow() # Base query, always check expire time @@ -597,50 +610,52 @@ class MessageController(storage.MessageBase): @utils.raises_conn_error def delete(self, queue_name, message_id, project=None, claim=None): - try: - mid = utils.to_oid(message_id) + # NOTE(cpp-cabrera): return early - this is an invalid message + # id so we won't be able to find it any way + mid = utils.to_oid(message_id) + if mid is None: + return - query = { - 'q': queue_name, - 'p': project, - '_id': mid - } + query = { + 'q': queue_name, + 'p': project, + '_id': mid + } - if claim: - now = timeutils.utcnow() - query['e'] = {'$gt': now} - message = self._col.find_one(query) + if claim: + # NOTE(cpp-cabrera): return early - the user gaves us an + # invalid claim id and that renders the rest of this + # request moot + cid = utils.to_oid(claim) + if cid is None: + return - if message is None: - return + now = timeutils.utcnow() + query['e'] = {'$gt': now} + message = self._col.find_one(query) - cid = utils.to_oid(claim) + if message is None: + return None - if not ('c' in message and - message['c']['id'] == cid and - message['c']['e'] > now): - raise exceptions.ClaimNotPermitted(message_id, claim) + if not ('c' in message and + message['c']['id'] == cid and + message['c']['e'] > now): + raise exceptions.ClaimNotPermitted(message_id, claim) - self._col.remove(query['_id'], w=0) - else: - self._col.remove(query, w=0) - except exceptions.QueueDoesNotExist: - pass + self._col.remove(query['_id'], w=0) + else: + self._col.remove(query, w=0) @utils.raises_conn_error def bulk_delete(self, queue_name, message_ids, project=None): - try: - message_ids = [utils.to_oid(id) for id in message_ids] - query = { - 'q': queue_name, - 'p': project, - '_id': {'$in': message_ids}, - } + message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid] + query = { + 'q': queue_name, + 'p': project, + '_id': {'$in': message_ids}, + } - self._col.remove(query, w=0) - - except exceptions.QueueDoesNotExist: - pass + self._col.remove(query, w=0) def _basic_message(msg, now): diff --git a/marconi/storage/mongodb/utils.py b/marconi/storage/mongodb/utils.py index 0e011527d..f2b59c247 100644 --- a/marconi/storage/mongodb/utils.py +++ b/marconi/storage/mongodb/utils.py @@ -117,23 +117,22 @@ def calculate_backoff(attempt, max_attempts, max_sleep, max_jitter=0): def to_oid(obj): """Creates a new ObjectId based on the input. - Raises MalformedID when TypeError or berrors.InvalidId + Returns None when TypeError or berrors.InvalidId is raised by the ObjectID class. :param obj: Anything that can be passed as an input to `objectid.ObjectId` - - :raises: MalformedID """ try: return objectid.ObjectId(obj) except (TypeError, berrors.InvalidId): - msg = u'Invalid oid: %s' % obj - raise storage_exceptions.MalformedID(msg) + return None def oid_utc(oid): - """Converts an ObjectId to a non-tz-aware datetime.""" + """Converts an ObjectId to a non-tz-aware datetime. + :raises: TypeError if oid isn't an ObjectId + """ try: return timeutils.normalize_time(oid.generation_time) except AttributeError: diff --git a/marconi/storage/sqlite/claims.py b/marconi/storage/sqlite/claims.py index 69a258691..4af19dacd 100644 --- a/marconi/storage/sqlite/claims.py +++ b/marconi/storage/sqlite/claims.py @@ -47,6 +47,10 @@ class ClaimController(base.ClaimBase): if project is None: project = '' + cid = utils.cid_decode(claim_id) + if cid is None: + raise exceptions.ClaimDoesNotExist(claim_id, queue, project) + with self.driver('deferred'): try: id, ttl, age = self.driver.get(''' @@ -55,7 +59,7 @@ class ClaimController(base.ClaimBase): on Q.id = C.qid where C.ttl > julianday() * 86400.0 - C.created and C.id = ? and project = ? and name = ? - ''', utils.cid_decode(claim_id), project, queue) + ''', cid, project, queue) return ( { @@ -66,7 +70,7 @@ class ClaimController(base.ClaimBase): self.__get(id) ) - except (utils.NoResult, exceptions.MalformedID()): + except utils.NoResult: raise exceptions.ClaimDoesNotExist(claim_id, queue, project) def create(self, queue, metadata, project, limit=10): @@ -125,9 +129,8 @@ class ClaimController(base.ClaimBase): if project is None: project = '' - try: - id = utils.cid_decode(claim_id) - except exceptions.MalformedID: + id = utils.cid_decode(claim_id) + if id is None: raise exceptions.ClaimDoesNotExist(claim_id, queue, project) with self.driver('deferred'): @@ -165,9 +168,8 @@ class ClaimController(base.ClaimBase): if project is None: project = '' - try: - cid = utils.cid_decode(claim_id) - except exceptions.MalformedID: + cid = utils.cid_decode(claim_id) + if cid is None: return self.driver.run(''' diff --git a/marconi/storage/sqlite/driver.py b/marconi/storage/sqlite/driver.py index c5736f79e..911284168 100644 --- a/marconi/storage/sqlite/driver.py +++ b/marconi/storage/sqlite/driver.py @@ -76,7 +76,7 @@ class Driver(storage.DriverBase): return next(self.run(sql, *args)) except StopIteration: - raise utils.NoResult + raise utils.NoResult() @property def affected(self): diff --git a/marconi/storage/sqlite/messages.py b/marconi/storage/sqlite/messages.py index 782d293fa..3fea195f8 100644 --- a/marconi/storage/sqlite/messages.py +++ b/marconi/storage/sqlite/messages.py @@ -41,6 +41,10 @@ class MessageController(base.MessageBase): if project is None: project = '' + mid = utils.msgid_decode(message_id) + if mid is None: + raise exceptions.MessageDoesNotExist(message_id, queue, project) + try: content, ttl, age = self.driver.get(''' select content, ttl, julianday() * 86400.0 - created @@ -48,24 +52,26 @@ class MessageController(base.MessageBase): on qid = Q.id where ttl > julianday() * 86400.0 - created and M.id = ? and project = ? and name = ? - ''', utils.msgid_decode(message_id), project, queue) - - return { - 'id': message_id, - 'ttl': ttl, - 'age': int(age), - 'body': content, - } + ''', mid, project, queue) except utils.NoResult: raise exceptions.MessageDoesNotExist(message_id, queue, project) + return { + 'id': message_id, + 'ttl': ttl, + 'age': int(age), + 'body': content, + } + def bulk_get(self, queue, message_ids, project): if project is None: project = '' - message_ids = ["'%s'" % utils.msgid_decode(id) for id in message_ids] - message_ids = ','.join(message_ids) + message_ids = ','.join( + ["'%s'" % id for id in + map(utils.msgid_decode, message_ids) if id is not None] + ) sql = ''' select M.id, content, ttl, julianday() * 86400.0 - created @@ -214,6 +220,8 @@ class MessageController(base.MessageBase): project = '' id = utils.msgid_decode(message_id) + if id is None: + return if not claim: self.driver.run(''' @@ -240,6 +248,10 @@ class MessageController(base.MessageBase): def __delete_claimed(self, id, claim): # Precondition: id exists in a specific queue + cid = utils.cid_decode(claim) + if cid is None: + return + self.driver.run(''' delete from Messages where id = ? @@ -248,17 +260,19 @@ class MessageController(base.MessageBase): on id = cid where ttl > julianday() * 86400.0 - created and id = ?) - ''', id, utils.cid_decode(claim)) + ''', id, cid) if not self.driver.affected: - raise exceptions.ClaimNotPermitted(utils.msgid_encode(id), claim) + raise exceptions.ClaimNotPermitted(id, claim) def bulk_delete(self, queue, message_ids, project): if project is None: project = '' - message_ids = ','.join(["'%s'" % utils.msgid_decode(id) - for id in message_ids]) + message_ids = ','.join( + ["'%s'" % id for id in + map(utils.msgid_decode, message_ids) if id] + ) self.driver.run(''' delete from Messages diff --git a/marconi/storage/sqlite/utils.py b/marconi/storage/sqlite/utils.py index 9957f8328..d97f130bf 100644 --- a/marconi/storage/sqlite/utils.py +++ b/marconi/storage/sqlite/utils.py @@ -46,7 +46,7 @@ def msgid_encode(id): return hex(id ^ 0x5c693a53)[2:] except TypeError: - raise exceptions.MalformedID() + return None def msgid_decode(id): @@ -54,7 +54,7 @@ def msgid_decode(id): return int(id, 16) ^ 0x5c693a53 except ValueError: - raise exceptions.MalformedID() + return None def marker_encode(id): @@ -78,7 +78,7 @@ def cid_decode(id): return int(id, 16) ^ 0x63c9a59c except ValueError: - raise exceptions.MalformedID() + return None def julian_to_unix(julian_sec): diff --git a/marconi/tests/storage/base.py b/marconi/tests/storage/base.py index 34bb8593a..65184258c 100644 --- a/marconi/tests/storage/base.py +++ b/marconi/tests/storage/base.py @@ -363,15 +363,18 @@ class MessageControllerTest(ControllerBaseTest): # more likely just list the messages, not try to # guess an ID of an arbitrary message. + # NOTE(cpp-cabrera): A malformed ID should result in an empty + # query. Raising an exception for validating IDs makes the + # implementation more verbose instead of taking advantage of + # the Maybe/Optional protocol, particularly when dealing with + # bulk operations. queue = 'foo' project = '480924' self.queue_controller.create(queue, project) bad_message_id = 'xyz' - with testing.expect(exceptions.MalformedID): - self.controller.delete(queue, bad_message_id, project) - - with testing.expect(exceptions.MalformedID): + self.controller.delete(queue, bad_message_id, project) + with testing.expect(exceptions.MessageDoesNotExist): self.controller.get(queue, bad_message_id, project) def test_bad_claim_id(self): @@ -382,10 +385,9 @@ class MessageControllerTest(ControllerBaseTest): client_uuid='unused') bad_claim_id = '; DROP TABLE queues' - with testing.expect(exceptions.MalformedID): - self.controller.delete('unused', msgid, - project='480924', - claim=bad_claim_id) + self.controller.delete('unused', msgid, + project='480924', + claim=bad_claim_id) def test_bad_marker(self): queue = 'foo' diff --git a/marconi/tests/transport/wsgi/test_claims.py b/marconi/tests/transport/wsgi/test_claims.py index 5f7848c0d..e604a77da 100644 --- a/marconi/tests/transport/wsgi/test_claims.py +++ b/marconi/tests/transport/wsgi/test_claims.py @@ -198,6 +198,26 @@ class ClaimsBaseTest(base.TestBase): body='{"ttl": 100, "grace": 60}') self.assertEquals(self.srmock.status, falcon.HTTP_404) + # NOTE(cpp-cabrera): regression test against bug #1203842 + def test_get_nonexistent_claim_404s(self): + path = '/v1/queues/notthere' + + self.simulate_get(path + '/claims/a') + self.assertEquals(self.srmock.status, falcon.HTTP_404) + + def test_delete_nonexistent_claim_204s(self): + path = '/v1/queues/notthere' + + self.simulate_delete(path + '/claims/a') + self.assertEquals(self.srmock.status, falcon.HTTP_204) + + def test_patch_nonexistent_claim_404s(self): + path = '/v1/queues/notthere' + + patch_data = json.dumps({'ttl': 100}) + self.simulate_patch(path + '/claims/a', body=patch_data) + self.assertEquals(self.srmock.status, falcon.HTTP_404) + class ClaimsMongoDBTests(ClaimsBaseTest): diff --git a/marconi/tests/transport/wsgi/test_messages.py b/marconi/tests/transport/wsgi/test_messages.py index e7d12683d..25bf9b406 100644 --- a/marconi/tests/transport/wsgi/test_messages.py +++ b/marconi/tests/transport/wsgi/test_messages.py @@ -322,6 +322,25 @@ class MessagesBaseTest(base.TestBase): headers=self.headers) self.assertEquals(self.srmock.status, falcon.HTTP_200) + # NOTE(cpp-cabrera): regression test against bug #1203842 + def test_get_nonexistent_message_404s(self): + path = '/v1/queues/notthere' + + self.simulate_get(path + '/messages/a') + self.assertEquals(self.srmock.status, falcon.HTTP_404) + + def test_get_multiple_invalid_messages_204s(self): + path = '/v1/queues/notthere' + + self.simulate_get(path + '/messages', query_string='ids=a,b,c') + self.assertEquals(self.srmock.status, falcon.HTTP_204) + + def test_delete_multiple_invalid_messages_204s(self): + path = '/v1/queues/notthere' + + self.simulate_delete(path + '/messages', query_string='ids=a,b,c') + self.assertEquals(self.srmock.status, falcon.HTTP_204) + def _post_messages(self, target, repeat=1): doc = json.dumps([{'body': 239, 'ttl': 300}] * repeat) self.simulate_post(target, self.project_id, body=doc, diff --git a/marconi/transport/wsgi/messages.py b/marconi/transport/wsgi/messages.py index 7152c4d22..d03e7415c 100644 --- a/marconi/transport/wsgi/messages.py +++ b/marconi/transport/wsgi/messages.py @@ -12,7 +12,6 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. - import falcon from marconi.common import config @@ -255,8 +254,7 @@ class CollectionResource(object): description = _(u'Messages could not be deleted.') raise wsgi_exceptions.HTTPServiceUnavailable(description) - else: - resp.status = falcon.HTTP_204 + resp.status = falcon.HTTP_204 class ItemResource(object):