fix: fetching nonexistent message/claims raises 503s

The proposed fix moves away from throwing an exception everytime a
malformed ID is encountered and instead returns None. This way,
partial bulk get and bulk deletes are possible.

Storage drivers affected:
- mongo
- sqlite

Changes/expected behavior:
- GET /v1/queues/exists/messages/malformed => 404
- GET /v1/queues/exists/messages?ids=malformed,malformed => 204
- DELETE /v1/queues/exists/messages?ids=malformed,malformed => 204
- GET /v1/queues/exists/claims/malformed => 404
- PATCH /v1/queues/exists/claims/malformed => 404
- DELETE /v1/queues/exists/claims/malformed => 204

In partcular, regarding bulk_get on messages, malformed IDs or
messages that are not found by ID are now ignored. bulk_delete also
works in this fashion now.

Regression tests are added to the unit test suite to reflect these
changes.

Change-Id: I4f14cd8b4cfd1dd190dccd8724c20f5fac99c629
Closes-Bug: #1203842
This commit is contained in:
Alejandro Cabrera 2013-08-12 18:09:25 -04:00
parent e53c7ec194
commit f284850116
11 changed files with 155 additions and 89 deletions

View File

@ -66,11 +66,9 @@ class ClaimController(storage.ClaimBase):
# Base query, always check expire time
now = timeutils.utcnow()
try:
cid = utils.to_oid(claim_id)
except ValueError:
raise exceptions.ClaimDoesNotExist()
cid = utils.to_oid(claim_id)
if cid is None:
raise exceptions.ClaimDoesNotExist(queue, project, claim_id)
def messages(msg_iter):
msg = next(msg_iter)
@ -188,9 +186,8 @@ class ClaimController(storage.ClaimBase):
@utils.raises_conn_error
def update(self, queue, claim_id, metadata, project=None):
try:
cid = utils.to_oid(claim_id)
except ValueError:
cid = utils.to_oid(claim_id)
if cid is None:
raise exceptions.ClaimDoesNotExist(claim_id, queue, project)
now = timeutils.utcnow()

View File

@ -368,9 +368,11 @@ class MessageController(storage.MessageBase):
return utils.HookedCursor(msgs, denormalizer)
def unclaim(self, queue_name, claim_id, project=None):
try:
cid = utils.to_oid(claim_id)
except ValueError:
cid = utils.to_oid(claim_id)
# NOTE(cpp-cabrera): early abort - avoid a DB query if we're handling
# an invalid ID
if cid is None:
return
# NOTE(cpp-cabrera): unclaim by setting the claim ID to None
@ -435,7 +437,15 @@ class MessageController(storage.MessageBase):
@utils.raises_conn_error
def get(self, queue_name, message_id, project=None):
"""Gets a single message by ID.
:raises: exceptions.MessageDoesNotExist
"""
mid = utils.to_oid(message_id)
if mid is None:
raise exceptions.MessageDoesNotExist(message_id, queue_name,
project)
now = timeutils.utcnow()
query = {
@ -455,7 +465,10 @@ class MessageController(storage.MessageBase):
@utils.raises_conn_error
def bulk_get(self, queue_name, message_ids, project=None):
message_ids = [utils.to_oid(id) for id in message_ids]
message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid]
if not message_ids:
return ()
now = timeutils.utcnow()
# Base query, always check expire time
@ -597,50 +610,52 @@ class MessageController(storage.MessageBase):
@utils.raises_conn_error
def delete(self, queue_name, message_id, project=None, claim=None):
try:
mid = utils.to_oid(message_id)
# NOTE(cpp-cabrera): return early - this is an invalid message
# id so we won't be able to find it any way
mid = utils.to_oid(message_id)
if mid is None:
return
query = {
'q': queue_name,
'p': project,
'_id': mid
}
query = {
'q': queue_name,
'p': project,
'_id': mid
}
if claim:
now = timeutils.utcnow()
query['e'] = {'$gt': now}
message = self._col.find_one(query)
if claim:
# NOTE(cpp-cabrera): return early - the user gaves us an
# invalid claim id and that renders the rest of this
# request moot
cid = utils.to_oid(claim)
if cid is None:
return
if message is None:
return
now = timeutils.utcnow()
query['e'] = {'$gt': now}
message = self._col.find_one(query)
cid = utils.to_oid(claim)
if message is None:
return None
if not ('c' in message and
message['c']['id'] == cid and
message['c']['e'] > now):
raise exceptions.ClaimNotPermitted(message_id, claim)
if not ('c' in message and
message['c']['id'] == cid and
message['c']['e'] > now):
raise exceptions.ClaimNotPermitted(message_id, claim)
self._col.remove(query['_id'], w=0)
else:
self._col.remove(query, w=0)
except exceptions.QueueDoesNotExist:
pass
self._col.remove(query['_id'], w=0)
else:
self._col.remove(query, w=0)
@utils.raises_conn_error
def bulk_delete(self, queue_name, message_ids, project=None):
try:
message_ids = [utils.to_oid(id) for id in message_ids]
query = {
'q': queue_name,
'p': project,
'_id': {'$in': message_ids},
}
message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid]
query = {
'q': queue_name,
'p': project,
'_id': {'$in': message_ids},
}
self._col.remove(query, w=0)
except exceptions.QueueDoesNotExist:
pass
self._col.remove(query, w=0)
def _basic_message(msg, now):

View File

@ -117,23 +117,22 @@ def calculate_backoff(attempt, max_attempts, max_sleep, max_jitter=0):
def to_oid(obj):
"""Creates a new ObjectId based on the input.
Raises MalformedID when TypeError or berrors.InvalidId
Returns None when TypeError or berrors.InvalidId
is raised by the ObjectID class.
:param obj: Anything that can be passed as an
input to `objectid.ObjectId`
:raises: MalformedID
"""
try:
return objectid.ObjectId(obj)
except (TypeError, berrors.InvalidId):
msg = u'Invalid oid: %s' % obj
raise storage_exceptions.MalformedID(msg)
return None
def oid_utc(oid):
"""Converts an ObjectId to a non-tz-aware datetime."""
"""Converts an ObjectId to a non-tz-aware datetime.
:raises: TypeError if oid isn't an ObjectId
"""
try:
return timeutils.normalize_time(oid.generation_time)
except AttributeError:

View File

@ -47,6 +47,10 @@ class ClaimController(base.ClaimBase):
if project is None:
project = ''
cid = utils.cid_decode(claim_id)
if cid is None:
raise exceptions.ClaimDoesNotExist(claim_id, queue, project)
with self.driver('deferred'):
try:
id, ttl, age = self.driver.get('''
@ -55,7 +59,7 @@ class ClaimController(base.ClaimBase):
on Q.id = C.qid
where C.ttl > julianday() * 86400.0 - C.created
and C.id = ? and project = ? and name = ?
''', utils.cid_decode(claim_id), project, queue)
''', cid, project, queue)
return (
{
@ -66,7 +70,7 @@ class ClaimController(base.ClaimBase):
self.__get(id)
)
except (utils.NoResult, exceptions.MalformedID()):
except utils.NoResult:
raise exceptions.ClaimDoesNotExist(claim_id, queue, project)
def create(self, queue, metadata, project, limit=10):
@ -125,9 +129,8 @@ class ClaimController(base.ClaimBase):
if project is None:
project = ''
try:
id = utils.cid_decode(claim_id)
except exceptions.MalformedID:
id = utils.cid_decode(claim_id)
if id is None:
raise exceptions.ClaimDoesNotExist(claim_id, queue, project)
with self.driver('deferred'):
@ -165,9 +168,8 @@ class ClaimController(base.ClaimBase):
if project is None:
project = ''
try:
cid = utils.cid_decode(claim_id)
except exceptions.MalformedID:
cid = utils.cid_decode(claim_id)
if cid is None:
return
self.driver.run('''

View File

@ -76,7 +76,7 @@ class Driver(storage.DriverBase):
return next(self.run(sql, *args))
except StopIteration:
raise utils.NoResult
raise utils.NoResult()
@property
def affected(self):

View File

@ -41,6 +41,10 @@ class MessageController(base.MessageBase):
if project is None:
project = ''
mid = utils.msgid_decode(message_id)
if mid is None:
raise exceptions.MessageDoesNotExist(message_id, queue, project)
try:
content, ttl, age = self.driver.get('''
select content, ttl, julianday() * 86400.0 - created
@ -48,24 +52,26 @@ class MessageController(base.MessageBase):
on qid = Q.id
where ttl > julianday() * 86400.0 - created
and M.id = ? and project = ? and name = ?
''', utils.msgid_decode(message_id), project, queue)
return {
'id': message_id,
'ttl': ttl,
'age': int(age),
'body': content,
}
''', mid, project, queue)
except utils.NoResult:
raise exceptions.MessageDoesNotExist(message_id, queue, project)
return {
'id': message_id,
'ttl': ttl,
'age': int(age),
'body': content,
}
def bulk_get(self, queue, message_ids, project):
if project is None:
project = ''
message_ids = ["'%s'" % utils.msgid_decode(id) for id in message_ids]
message_ids = ','.join(message_ids)
message_ids = ','.join(
["'%s'" % id for id in
map(utils.msgid_decode, message_ids) if id is not None]
)
sql = '''
select M.id, content, ttl, julianday() * 86400.0 - created
@ -214,6 +220,8 @@ class MessageController(base.MessageBase):
project = ''
id = utils.msgid_decode(message_id)
if id is None:
return
if not claim:
self.driver.run('''
@ -240,6 +248,10 @@ class MessageController(base.MessageBase):
def __delete_claimed(self, id, claim):
# Precondition: id exists in a specific queue
cid = utils.cid_decode(claim)
if cid is None:
return
self.driver.run('''
delete from Messages
where id = ?
@ -248,17 +260,19 @@ class MessageController(base.MessageBase):
on id = cid
where ttl > julianday() * 86400.0 - created
and id = ?)
''', id, utils.cid_decode(claim))
''', id, cid)
if not self.driver.affected:
raise exceptions.ClaimNotPermitted(utils.msgid_encode(id), claim)
raise exceptions.ClaimNotPermitted(id, claim)
def bulk_delete(self, queue, message_ids, project):
if project is None:
project = ''
message_ids = ','.join(["'%s'" % utils.msgid_decode(id)
for id in message_ids])
message_ids = ','.join(
["'%s'" % id for id in
map(utils.msgid_decode, message_ids) if id]
)
self.driver.run('''
delete from Messages

View File

@ -46,7 +46,7 @@ def msgid_encode(id):
return hex(id ^ 0x5c693a53)[2:]
except TypeError:
raise exceptions.MalformedID()
return None
def msgid_decode(id):
@ -54,7 +54,7 @@ def msgid_decode(id):
return int(id, 16) ^ 0x5c693a53
except ValueError:
raise exceptions.MalformedID()
return None
def marker_encode(id):
@ -78,7 +78,7 @@ def cid_decode(id):
return int(id, 16) ^ 0x63c9a59c
except ValueError:
raise exceptions.MalformedID()
return None
def julian_to_unix(julian_sec):

View File

@ -363,15 +363,18 @@ class MessageControllerTest(ControllerBaseTest):
# more likely just list the messages, not try to
# guess an ID of an arbitrary message.
# NOTE(cpp-cabrera): A malformed ID should result in an empty
# query. Raising an exception for validating IDs makes the
# implementation more verbose instead of taking advantage of
# the Maybe/Optional protocol, particularly when dealing with
# bulk operations.
queue = 'foo'
project = '480924'
self.queue_controller.create(queue, project)
bad_message_id = 'xyz'
with testing.expect(exceptions.MalformedID):
self.controller.delete(queue, bad_message_id, project)
with testing.expect(exceptions.MalformedID):
self.controller.delete(queue, bad_message_id, project)
with testing.expect(exceptions.MessageDoesNotExist):
self.controller.get(queue, bad_message_id, project)
def test_bad_claim_id(self):
@ -382,10 +385,9 @@ class MessageControllerTest(ControllerBaseTest):
client_uuid='unused')
bad_claim_id = '; DROP TABLE queues'
with testing.expect(exceptions.MalformedID):
self.controller.delete('unused', msgid,
project='480924',
claim=bad_claim_id)
self.controller.delete('unused', msgid,
project='480924',
claim=bad_claim_id)
def test_bad_marker(self):
queue = 'foo'

View File

@ -198,6 +198,26 @@ class ClaimsBaseTest(base.TestBase):
body='{"ttl": 100, "grace": 60}')
self.assertEquals(self.srmock.status, falcon.HTTP_404)
# NOTE(cpp-cabrera): regression test against bug #1203842
def test_get_nonexistent_claim_404s(self):
path = '/v1/queues/notthere'
self.simulate_get(path + '/claims/a')
self.assertEquals(self.srmock.status, falcon.HTTP_404)
def test_delete_nonexistent_claim_204s(self):
path = '/v1/queues/notthere'
self.simulate_delete(path + '/claims/a')
self.assertEquals(self.srmock.status, falcon.HTTP_204)
def test_patch_nonexistent_claim_404s(self):
path = '/v1/queues/notthere'
patch_data = json.dumps({'ttl': 100})
self.simulate_patch(path + '/claims/a', body=patch_data)
self.assertEquals(self.srmock.status, falcon.HTTP_404)
class ClaimsMongoDBTests(ClaimsBaseTest):

View File

@ -322,6 +322,25 @@ class MessagesBaseTest(base.TestBase):
headers=self.headers)
self.assertEquals(self.srmock.status, falcon.HTTP_200)
# NOTE(cpp-cabrera): regression test against bug #1203842
def test_get_nonexistent_message_404s(self):
path = '/v1/queues/notthere'
self.simulate_get(path + '/messages/a')
self.assertEquals(self.srmock.status, falcon.HTTP_404)
def test_get_multiple_invalid_messages_204s(self):
path = '/v1/queues/notthere'
self.simulate_get(path + '/messages', query_string='ids=a,b,c')
self.assertEquals(self.srmock.status, falcon.HTTP_204)
def test_delete_multiple_invalid_messages_204s(self):
path = '/v1/queues/notthere'
self.simulate_delete(path + '/messages', query_string='ids=a,b,c')
self.assertEquals(self.srmock.status, falcon.HTTP_204)
def _post_messages(self, target, repeat=1):
doc = json.dumps([{'body': 239, 'ttl': 300}] * repeat)
self.simulate_post(target, self.project_id, body=doc,

View File

@ -12,7 +12,6 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
from marconi.common import config
@ -255,8 +254,7 @@ class CollectionResource(object):
description = _(u'Messages could not be deleted.')
raise wsgi_exceptions.HTTPServiceUnavailable(description)
else:
resp.status = falcon.HTTP_204
resp.status = falcon.HTTP_204
class ItemResource(object):