fix: claimed message require claim_id to delete
But bulk deletion can still delete both unclaimed and claimed messages. However, the major purpose to support bulk deletion is to make it easy for a user to issue a DELETE on the Location header returned from a POST; if a user intend to use claim, then this URI does not help. Change-Id: Ieed92fc0a07d2e873b729242fb2fb14536462780 Closes-Bug: #1215484
This commit is contained in:
parent
3bfc487d2e
commit
3376b920ed
@ -84,6 +84,13 @@ class MessageDoesNotExist(DoesNotExist):
|
|||||||
super(MessageDoesNotExist, self).__init__(msg)
|
super(MessageDoesNotExist, self).__init__(msg)
|
||||||
|
|
||||||
|
|
||||||
|
class MessageIsClaimed(NotPermitted):
|
||||||
|
|
||||||
|
def __init__(self, mid):
|
||||||
|
msg = (u'Message %(mid)s is claimed' % dict(mid=mid))
|
||||||
|
super(MessageIsClaimed, self).__init__(msg)
|
||||||
|
|
||||||
|
|
||||||
class ClaimDoesNotExist(DoesNotExist):
|
class ClaimDoesNotExist(DoesNotExist):
|
||||||
|
|
||||||
def __init__(self, cid, queue, project):
|
def __init__(self, cid, queue, project):
|
||||||
@ -93,9 +100,9 @@ class ClaimDoesNotExist(DoesNotExist):
|
|||||||
super(ClaimDoesNotExist, self).__init__(msg)
|
super(ClaimDoesNotExist, self).__init__(msg)
|
||||||
|
|
||||||
|
|
||||||
class ClaimNotPermitted(NotPermitted):
|
class MessageIsClaimedBy(NotPermitted):
|
||||||
|
|
||||||
def __init__(self, mid, cid):
|
def __init__(self, mid, cid):
|
||||||
msg = (u'Message %(mid)s is not claimed by %(cid)s' %
|
msg = (u'Message %(mid)s is not claimed by %(cid)s' %
|
||||||
dict(cid=cid, mid=mid))
|
dict(cid=cid, mid=mid))
|
||||||
super(ClaimNotPermitted, self).__init__(msg)
|
super(MessageIsClaimedBy, self).__init__(msg)
|
||||||
|
@ -609,29 +609,32 @@ class MessageController(storage.MessageBase):
|
|||||||
'_id': mid
|
'_id': mid
|
||||||
}
|
}
|
||||||
|
|
||||||
if claim:
|
# NOTE(cpp-cabrera): return early - the user gaves us an
|
||||||
# NOTE(cpp-cabrera): return early - the user gaves us an
|
# invalid claim id and that renders the rest of this
|
||||||
# invalid claim id and that renders the rest of this
|
# request moot
|
||||||
# request moot
|
cid = utils.to_oid(claim)
|
||||||
cid = utils.to_oid(claim)
|
if cid is None:
|
||||||
if cid is None:
|
return
|
||||||
return
|
|
||||||
|
|
||||||
now = timeutils.utcnow()
|
now = timeutils.utcnow()
|
||||||
query['e'] = {'$gt': now}
|
query['e'] = {'$gt': now}
|
||||||
message = self._col.find_one(query)
|
message = self._col.find_one(query)
|
||||||
|
|
||||||
if message is None:
|
if message is None:
|
||||||
return None
|
return
|
||||||
|
|
||||||
if not ('c' in message and
|
is_claimed = (message['c']['id'] is not None and
|
||||||
message['c']['id'] == cid and
|
message['c']['e'] > now)
|
||||||
message['c']['e'] > now):
|
|
||||||
raise exceptions.ClaimNotPermitted(message_id, claim)
|
if claim is None:
|
||||||
|
if is_claimed:
|
||||||
|
raise exceptions.MessageIsClaimed(message_id)
|
||||||
|
|
||||||
self._col.remove(query['_id'], w=0)
|
|
||||||
else:
|
else:
|
||||||
self._col.remove(query, w=0)
|
if message['c']['id'] != cid:
|
||||||
|
raise exceptions.MessageIsClaimedBy(message_id, claim)
|
||||||
|
|
||||||
|
self._col.remove(query['_id'], w=0)
|
||||||
|
|
||||||
@utils.raises_conn_error
|
@utils.raises_conn_error
|
||||||
def bulk_delete(self, queue_name, message_ids, project=None):
|
def bulk_delete(self, queue_name, message_ids, project=None):
|
||||||
|
@ -223,15 +223,6 @@ class MessageController(base.MessageBase):
|
|||||||
if id is None:
|
if id is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
if not claim:
|
|
||||||
self.driver.run('''
|
|
||||||
delete from Messages
|
|
||||||
where id = ?
|
|
||||||
and qid = (select id from Queues
|
|
||||||
where project = ? and name = ?)
|
|
||||||
''', id, project, queue)
|
|
||||||
return
|
|
||||||
|
|
||||||
with self.driver('immediate'):
|
with self.driver('immediate'):
|
||||||
message_exists, = self.driver.get('''
|
message_exists, = self.driver.get('''
|
||||||
select count(M.id)
|
select count(M.id)
|
||||||
@ -244,7 +235,23 @@ class MessageController(base.MessageBase):
|
|||||||
if not message_exists:
|
if not message_exists:
|
||||||
return
|
return
|
||||||
|
|
||||||
self.__delete_claimed(id, claim)
|
if claim is None:
|
||||||
|
self.__delete_unclaimed(id)
|
||||||
|
else:
|
||||||
|
self.__delete_claimed(id, claim)
|
||||||
|
|
||||||
|
def __delete_unclaimed(self, id):
|
||||||
|
self.driver.run('''
|
||||||
|
delete from Messages
|
||||||
|
where id = ?
|
||||||
|
and not exists (select *
|
||||||
|
from Claims join Locked
|
||||||
|
on id = cid
|
||||||
|
where ttl > julianday() * 86400.0 - created)
|
||||||
|
''', id)
|
||||||
|
|
||||||
|
if not self.driver.affected:
|
||||||
|
raise exceptions.MessageIsClaimed(id)
|
||||||
|
|
||||||
def __delete_claimed(self, id, claim):
|
def __delete_claimed(self, id, claim):
|
||||||
# Precondition: id exists in a specific queue
|
# Precondition: id exists in a specific queue
|
||||||
@ -263,7 +270,7 @@ class MessageController(base.MessageBase):
|
|||||||
''', id, cid)
|
''', id, cid)
|
||||||
|
|
||||||
if not self.driver.affected:
|
if not self.driver.affected:
|
||||||
raise exceptions.ClaimNotPermitted(id, claim)
|
raise exceptions.MessageIsClaimedBy(id, claim)
|
||||||
|
|
||||||
def bulk_delete(self, queue, message_ids, project):
|
def bulk_delete(self, queue, message_ids, project):
|
||||||
if project is None:
|
if project is None:
|
||||||
|
@ -145,6 +145,10 @@ class ClaimsBaseTest(base.TestBase):
|
|||||||
claim_href)
|
claim_href)
|
||||||
self.assertEquals(claim['ttl'], 100)
|
self.assertEquals(claim['ttl'], 100)
|
||||||
|
|
||||||
|
# Try to delete the message without submitting a claim_id
|
||||||
|
self.simulate_delete(message_href, self.project_id)
|
||||||
|
self.assertEquals(self.srmock.status, falcon.HTTP_403)
|
||||||
|
|
||||||
# Delete the message and its associated claim
|
# Delete the message and its associated claim
|
||||||
self.simulate_delete(message_href, self.project_id,
|
self.simulate_delete(message_href, self.project_id,
|
||||||
query_string=params)
|
query_string=params)
|
||||||
|
@ -290,10 +290,11 @@ class ItemResource(object):
|
|||||||
|
|
||||||
except storage_exceptions.NotPermitted as ex:
|
except storage_exceptions.NotPermitted as ex:
|
||||||
LOG.exception(ex)
|
LOG.exception(ex)
|
||||||
title = _(u'Invalid claim')
|
title = _(u'Unable to delete')
|
||||||
description = _(u'The specified claim either does not '
|
description = _(u'This message is claimed; it cannot be '
|
||||||
u'exist or has expired.')
|
u'deleted without a valid claim_id.')
|
||||||
raise falcon.HTTPForbidden(title, description)
|
raise falcon.HTTPForbidden(title, description)
|
||||||
|
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
LOG.exception(ex)
|
LOG.exception(ex)
|
||||||
description = _(u'Message could not be deleted.')
|
description = _(u'Message could not be deleted.')
|
||||||
|
Loading…
x
Reference in New Issue
Block a user