Implement grace period for claimed message lifetimes
This commit adds a grace period to claims such that claimed messages are guaranteed not to expire before the claim itself expires, plus an additional "grace" number of seconds. This provides fudge room so that when one worker crashes, an alternate worker has some time to reclaim the orphaned messages once they are released from the stalled claim. Change-Id: Id0e194a85e67f20babfb97e441e11703d6bf5563 Implements: blueprint claimed-message-grace
This commit is contained in:
parent
05054b5be9
commit
b316dff65b
@ -118,25 +118,29 @@ class ClaimController(storage.ClaimBase):
|
||||
number of messages.
|
||||
|
||||
This 2 queries are required because there's no way, as for the
|
||||
time being, to executed an update on a limited number of records
|
||||
time being, to execute an update on a limited number of records.
|
||||
"""
|
||||
msg_ctrl = self.driver.message_controller
|
||||
|
||||
# We don't need the qid here but
|
||||
# we need to verify it exists.
|
||||
qid = self._get_queue_id(queue, project)
|
||||
|
||||
ttl = int(metadata.get('ttl', 60))
|
||||
ttl = metadata['ttl']
|
||||
grace = metadata['grace']
|
||||
oid = objectid.ObjectId()
|
||||
|
||||
now = timeutils.utcnow()
|
||||
ttl_delta = datetime.timedelta(seconds=ttl)
|
||||
expires = now + ttl_delta
|
||||
claim_expires = now + ttl_delta
|
||||
|
||||
grace_delta = datetime.timedelta(seconds=grace)
|
||||
message_expires = claim_expires + grace_delta
|
||||
message_ttl = ttl + grace
|
||||
|
||||
meta = {
|
||||
'id': oid,
|
||||
't': ttl,
|
||||
'e': expires,
|
||||
'e': claim_expires,
|
||||
}
|
||||
|
||||
# Get a list of active, not claimed nor expired
|
||||
@ -170,14 +174,16 @@ class ClaimController(storage.ClaimBase):
|
||||
# This sets the expiration time to
|
||||
# `expires` on messages that would
|
||||
# expire before claim.
|
||||
msg_ctrl._col.update({'q': queue,
|
||||
'e': {'$lt': expires},
|
||||
new_values = {'e': message_expires, 't': message_ttl}
|
||||
msg_ctrl._col.update({'q': qid,
|
||||
'e': {'$lt': message_expires},
|
||||
'c.id': oid},
|
||||
{'$set': {'e': expires, 't': ttl}},
|
||||
{'$set': new_values},
|
||||
upsert=False, multi=True)
|
||||
|
||||
if updated != 0:
|
||||
claim, messages = self.get(queue, oid, project=project)
|
||||
|
||||
return (str(oid), messages)
|
||||
|
||||
def update(self, queue, claim_id, metadata, project=None):
|
||||
|
@ -100,7 +100,8 @@ class ClaimController(base.ClaimBase):
|
||||
and qid = ?
|
||||
limit ?''', qid, limit)
|
||||
|
||||
self.__update_claimed(id, metadata['ttl'])
|
||||
messages_ttl = metadata['ttl'] + metadata['grace']
|
||||
self.__update_claimed(id, messages_ttl)
|
||||
|
||||
return (utils.cid_encode(id), self.__get(id))
|
||||
|
||||
|
@ -202,7 +202,7 @@ class MessageControllerTest(ControllerBaseTest):
|
||||
_insert_fixtures(self.controller, self.queue_name,
|
||||
project=self.project, client_uuid='my_uuid', num=12)
|
||||
|
||||
meta = {'ttl': 70}
|
||||
meta = {'ttl': 70, 'grace': 60}
|
||||
|
||||
another_cid, _ = self.claim_controller.create(self.queue_name, meta,
|
||||
project=self.project)
|
||||
@ -325,7 +325,7 @@ class ClaimControllerTest(ControllerBaseTest):
|
||||
_insert_fixtures(self.message_controller, self.queue_name,
|
||||
project=self.project, client_uuid='my_uuid', num=20)
|
||||
|
||||
meta = {'ttl': 70}
|
||||
meta = {'ttl': 70, 'grace': 30}
|
||||
|
||||
# Make sure create works
|
||||
claim_id, messages = self.controller.create(self.queue_name, meta,
|
||||
@ -351,7 +351,7 @@ class ClaimControllerTest(ControllerBaseTest):
|
||||
self.assertEquals(claim['ttl'], 70)
|
||||
self.assertEquals(claim['id'], claim_id)
|
||||
|
||||
new_meta = {'ttl': 100}
|
||||
new_meta = {'ttl': 100, 'grace': 60}
|
||||
self.controller.update(self.queue_name, claim_id,
|
||||
new_meta, project=self.project)
|
||||
|
||||
@ -378,8 +378,66 @@ class ClaimControllerTest(ControllerBaseTest):
|
||||
self.controller.get, self.queue_name,
|
||||
claim_id, project=self.project)
|
||||
|
||||
def test_extend_lifetime(self):
|
||||
_insert_fixtures(self.message_controller, self.queue_name,
|
||||
project=self.project, client_uuid='my_uuid',
|
||||
num=20, ttl=120)
|
||||
|
||||
meta = {'ttl': 777, 'grace': 0}
|
||||
|
||||
claim_id, messages = self.controller.create(self.queue_name, meta,
|
||||
project=self.project)
|
||||
|
||||
for message in messages:
|
||||
self.assertEquals(message['age'], 0)
|
||||
self.assertEquals(message['ttl'], 777)
|
||||
|
||||
def test_extend_lifetime_with_grace_1(self):
|
||||
_insert_fixtures(self.message_controller, self.queue_name,
|
||||
project=self.project, client_uuid='my_uuid',
|
||||
num=20, ttl=120)
|
||||
|
||||
meta = {'ttl': 777, 'grace': 23}
|
||||
|
||||
claim_id, messages = self.controller.create(self.queue_name, meta,
|
||||
project=self.project)
|
||||
|
||||
for message in messages:
|
||||
self.assertEquals(message['age'], 0)
|
||||
self.assertEquals(message['ttl'], 800)
|
||||
|
||||
def test_extend_lifetime_with_grace_2(self):
|
||||
_insert_fixtures(self.message_controller, self.queue_name,
|
||||
project=self.project, client_uuid='my_uuid',
|
||||
num=20, ttl=120)
|
||||
|
||||
# Although ttl is less than the message's TTL, the grace
|
||||
# period puts it just over the edge.
|
||||
meta = {'ttl': 100, 'grace': 22}
|
||||
|
||||
claim_id, messages = self.controller.create(self.queue_name, meta,
|
||||
project=self.project)
|
||||
|
||||
for message in messages:
|
||||
self.assertEquals(message['age'], 0)
|
||||
self.assertEquals(message['ttl'], 122)
|
||||
|
||||
def test_do_not_extend_lifetime(self):
|
||||
_insert_fixtures(self.message_controller, self.queue_name,
|
||||
project=self.project, client_uuid='my_uuid',
|
||||
num=20, ttl=120)
|
||||
|
||||
# Choose a ttl that is less than the message's current TTL
|
||||
meta = {'ttl': 60, 'grace': 30}
|
||||
|
||||
claim_id, messages = self.controller.create(self.queue_name, meta,
|
||||
project=self.project)
|
||||
|
||||
for message in messages:
|
||||
self.assertEquals(message['ttl'], 120)
|
||||
|
||||
def test_expired_claim(self):
|
||||
meta = {'ttl': 0}
|
||||
meta = {'ttl': 0, 'grace': 60}
|
||||
|
||||
claim_id, messages = self.controller.create(self.queue_name, meta,
|
||||
project=self.project)
|
||||
@ -404,12 +462,12 @@ class ClaimControllerTest(ControllerBaseTest):
|
||||
|
||||
|
||||
def _insert_fixtures(controller, queue_name, project=None,
|
||||
client_uuid=None, num=4):
|
||||
client_uuid=None, num=4, ttl=120):
|
||||
|
||||
def messages():
|
||||
for n in xrange(num):
|
||||
yield {
|
||||
'ttl': 120,
|
||||
'ttl': ttl,
|
||||
'body': {
|
||||
'event': 'Event number %s' % n
|
||||
}}
|
||||
|
@ -255,7 +255,7 @@ class MongodbClaimTests(base.ClaimControllerTest):
|
||||
epoch, project=self.project)
|
||||
|
||||
claim_id, messages = self.controller.create(self.queue_name,
|
||||
{'ttl': 1},
|
||||
{'ttl': 1, 'grace': 0},
|
||||
project=self.project)
|
||||
|
||||
# Lets let it expire
|
||||
|
@ -33,7 +33,7 @@ class ClaimsBaseTest(base.TestBase):
|
||||
self.queue_path = '/v1/queues/fizbit'
|
||||
self.claims_path = self.queue_path + '/claims'
|
||||
|
||||
doc = '{"_ttl": 60 }'
|
||||
doc = '{"_ttl": 60}'
|
||||
|
||||
self.simulate_put(self.queue_path, self.project_id, body=doc)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||
@ -56,7 +56,7 @@ class ClaimsBaseTest(base.TestBase):
|
||||
|
||||
def test_bad_patch(self):
|
||||
self.simulate_post(self.claims_path, self.project_id,
|
||||
body='{"ttl": 10}')
|
||||
body='{"ttl": 10, "grace": 30}')
|
||||
href = self.srmock.headers_dict['Location']
|
||||
|
||||
for doc in (None, '[', '"crunchy"'):
|
||||
@ -64,15 +64,15 @@ class ClaimsBaseTest(base.TestBase):
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
def test_lifecycle(self):
|
||||
doc = '{"ttl": 10}'
|
||||
doc = '{"ttl": 10, "grace": 30}'
|
||||
|
||||
# First, claim some messages
|
||||
body = self.simulate_post(self.claims_path, self.project_id, body=doc)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_200)
|
||||
|
||||
claim = json.loads(body[0])
|
||||
claimed = json.loads(body[0])
|
||||
claim_href = self.srmock.headers_dict['Location']
|
||||
message_href, params = claim[0]['href'].split('?')
|
||||
message_href, params = claimed[0]['href'].split('?')
|
||||
|
||||
# No more messages to claim
|
||||
self.simulate_post(self.claims_path, self.project_id, body=doc,
|
||||
@ -102,7 +102,8 @@ class ClaimsBaseTest(base.TestBase):
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_404)
|
||||
|
||||
# Update the claim
|
||||
self.simulate_patch(claim_href, self.project_id, body='{"ttl": 60}')
|
||||
new_claim = '{"ttl": 60, "grace": 60}'
|
||||
self.simulate_patch(claim_href, self.project_id, body=new_claim)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_204)
|
||||
|
||||
# Get the claimed messages (again)
|
||||
@ -138,7 +139,7 @@ class ClaimsBaseTest(base.TestBase):
|
||||
|
||||
def test_nonexistent(self):
|
||||
self.simulate_post('/v1/queues/nonexistent/claims', self.project_id,
|
||||
body='{"ttl": 10}')
|
||||
body='{"ttl": 10, "grace": 30}')
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_404)
|
||||
|
||||
|
||||
@ -171,7 +172,7 @@ class ClaimsFaultyDriverTests(base.TestBaseFaulty):
|
||||
def test_simple(self):
|
||||
project_id = '480924'
|
||||
claims_path = '/v1/queues/fizbit/claims'
|
||||
doc = '{"ttl": 100}'
|
||||
doc = '{"ttl": 100, "grace": 30}'
|
||||
|
||||
self.simulate_post(claims_path, project_id, body=doc)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_503)
|
||||
|
@ -23,7 +23,7 @@ from marconi.transport.wsgi import helpers as wsgi_helpers
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CLAIM_METADATA_SPEC = (('ttl', int),)
|
||||
CLAIM_METADATA_SPEC = (('ttl', int), ('grace', int))
|
||||
|
||||
|
||||
class CollectionResource(object):
|
||||
|
Loading…
x
Reference in New Issue
Block a user