Merge "Fix ttl and grace story for claims updates"
This commit is contained in:
commit
ac00d2b8ed
@ -131,7 +131,7 @@ class TestClaims(base.V1_1FunctionalTestBase):
|
||||
# Patch Claim
|
||||
claim_location = result.headers['Location']
|
||||
url = self.cfg.zaqar.url + claim_location
|
||||
doc_updated = {"ttl": 300}
|
||||
doc_updated = {"ttl": 300, 'grace': 60}
|
||||
|
||||
result = self.client.patch(url, data=doc_updated)
|
||||
self.assertEqual(result.status_code, 204)
|
||||
@ -139,7 +139,7 @@ class TestClaims(base.V1_1FunctionalTestBase):
|
||||
# verify that the claim TTL is updated
|
||||
result = self.client.get(url)
|
||||
new_ttl = result.json()['ttl']
|
||||
self.assertEqual(new_ttl, 300)
|
||||
self.assertEqual(doc_updated['ttl'], new_ttl)
|
||||
|
||||
test_claim_patch.tags = ['smoke', 'positive']
|
||||
|
||||
|
@ -432,11 +432,13 @@ class MongodbClaimTests(MongodbSetupMixin, base.ClaimControllerTest):
|
||||
time.sleep(1)
|
||||
self.assertRaises(storage.errors.ClaimDoesNotExist,
|
||||
self.controller.update, self.queue_name,
|
||||
claim_id, {}, project=self.project)
|
||||
claim_id, {'ttl': 1, 'grace': 0},
|
||||
project=self.project)
|
||||
|
||||
self.assertRaises(storage.errors.ClaimDoesNotExist,
|
||||
self.controller.update, self.queue_name,
|
||||
claim_id, {}, project=self.project)
|
||||
claim_id, {'ttl': 1, 'grace': 0},
|
||||
project=self.project)
|
||||
|
||||
|
||||
#
|
||||
|
@ -133,6 +133,7 @@ class ClaimController(storage.Claim):
|
||||
|
||||
now = timeutils.utcnow_ts()
|
||||
claim_expires = now + ttl
|
||||
claim_expires_dt = datetime.datetime.utcfromtimestamp(claim_expires)
|
||||
|
||||
message_ttl = ttl + grace
|
||||
message_expiration = datetime.datetime.utcfromtimestamp(
|
||||
@ -181,7 +182,7 @@ class ClaimController(storage.Claim):
|
||||
# expire before claim.
|
||||
new_values = {'e': message_expiration, 't': message_ttl}
|
||||
collection.update({'p_q': utils.scope_queue_name(queue, project),
|
||||
'e': {'$lt': message_expiration},
|
||||
'e': {'$lt': claim_expires_dt},
|
||||
'c.id': oid},
|
||||
{'$set': new_values},
|
||||
upsert=False, multi=True)
|
||||
@ -205,8 +206,13 @@ class ClaimController(storage.Claim):
|
||||
raise errors.ClaimDoesNotExist(claim_id, queue, project)
|
||||
|
||||
now = timeutils.utcnow_ts()
|
||||
ttl = int(metadata.get('ttl', 60))
|
||||
expires = now + ttl
|
||||
grace = metadata['grace']
|
||||
ttl = metadata['ttl']
|
||||
claim_expires = now + ttl
|
||||
claim_expires_dt = datetime.datetime.utcfromtimestamp(claim_expires)
|
||||
message_ttl = ttl + grace
|
||||
message_expires = datetime.datetime.utcfromtimestamp(
|
||||
claim_expires + grace)
|
||||
|
||||
msg_ctrl = self.driver.message_controller
|
||||
claimed = msg_ctrl._claimed(queue, cid, expires=now,
|
||||
@ -220,7 +226,7 @@ class ClaimController(storage.Claim):
|
||||
meta = {
|
||||
'id': cid,
|
||||
't': ttl,
|
||||
'e': expires,
|
||||
'e': claim_expires,
|
||||
}
|
||||
|
||||
# TODO(kgriffs): Create methods for these so we don't interact
|
||||
@ -236,9 +242,10 @@ class ClaimController(storage.Claim):
|
||||
# `expires` on messages that would
|
||||
# expire before claim.
|
||||
collection.update({'p_q': scope,
|
||||
'e': {'$lt': expires},
|
||||
'e': {'$lt': claim_expires_dt},
|
||||
'c.id': cid},
|
||||
{'$set': {'e': expires, 't': ttl}},
|
||||
{'$set': {'e': message_expires,
|
||||
't': message_ttl}},
|
||||
upsert=False, multi=True)
|
||||
|
||||
@utils.raises_conn_error
|
||||
|
@ -249,8 +249,8 @@ class ClaimController(storage.Claim, scripting.Mixin):
|
||||
def create(self, queue, metadata, project=None,
|
||||
limit=storage.DEFAULT_MESSAGES_PER_CLAIM):
|
||||
|
||||
claim_ttl = int(metadata.get('ttl', 60))
|
||||
grace = int(metadata.get('grace', 60))
|
||||
claim_ttl = metadata['ttl']
|
||||
grace = metadata['grace']
|
||||
|
||||
now = timeutils.utcnow_ts()
|
||||
msg_ttl = claim_ttl + grace
|
||||
@ -314,10 +314,10 @@ class ClaimController(storage.Claim, scripting.Mixin):
|
||||
|
||||
now = timeutils.utcnow_ts()
|
||||
|
||||
claim_ttl = int(metadata.get('ttl', 60))
|
||||
claim_ttl = metadata['ttl']
|
||||
claim_expires = now + claim_ttl
|
||||
|
||||
grace = int(metadata.get('grace', 60))
|
||||
grace = metadata['grace']
|
||||
msg_ttl = claim_ttl + grace
|
||||
msg_expires = claim_expires + grace
|
||||
|
||||
@ -338,7 +338,7 @@ class ClaimController(storage.Claim, scripting.Mixin):
|
||||
msg.claim_id = claim_id
|
||||
msg.claim_expires = claim_expires
|
||||
|
||||
if _msg_would_expire(msg, msg_expires):
|
||||
if _msg_would_expire(msg, claim_expires):
|
||||
msg.ttl = msg_ttl
|
||||
msg.expires = msg_expires
|
||||
|
||||
|
@ -70,7 +70,7 @@ while (#claimed_msgs < limit) do
|
||||
'c.e', claim_expires)
|
||||
|
||||
-- Will the message expire early?
|
||||
if tonumber(msg_expires_prev) < msg_expires then
|
||||
if tonumber(msg_expires_prev) < claim_expires then
|
||||
redis.call('HMSET', mid,
|
||||
't', msg_ttl,
|
||||
'e', msg_expires)
|
||||
@ -85,4 +85,4 @@ while (#claimed_msgs < limit) do
|
||||
end
|
||||
end
|
||||
|
||||
return claimed_msgs
|
||||
return claimed_msgs
|
||||
|
@ -150,7 +150,7 @@ class ClaimController(storage.Claim):
|
||||
raise errors.ClaimDoesNotExist(claim_id, queue, project)
|
||||
|
||||
update = (tables.Messages.update().
|
||||
values(ttl=metadata['ttl']).
|
||||
values(ttl=metadata['ttl'] + metadata['grace']).
|
||||
where(sa.and_(
|
||||
tables.Messages.c.ttl < metadata['ttl'],
|
||||
tables.Messages.c.cid == cid)))
|
||||
|
@ -27,7 +27,7 @@ from zaqar.queues.transport.wsgi import utils as wsgi_utils
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CLAIM_POST_SPEC = (('ttl', int, None), ('grace', int, None))
|
||||
CLAIM_PATCH_SPEC = (('ttl', int, None),)
|
||||
CLAIM_PATCH_SPEC = (('ttl', int, None), ('grace', int, 0))
|
||||
|
||||
|
||||
class Resource(object):
|
||||
|
@ -66,7 +66,9 @@ def public_endpoints(driver):
|
||||
('/queues/{queue_name}/claims/{claim_id}',
|
||||
claims.ItemResource(driver._wsgi_conf,
|
||||
driver._validate,
|
||||
claim_controller)),
|
||||
claim_controller,
|
||||
defaults.claim_ttl,
|
||||
defaults.claim_grace)),
|
||||
|
||||
# Ping
|
||||
('/ping',
|
||||
|
@ -26,11 +26,8 @@ from zaqar.queues.transport.wsgi import utils as wsgi_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CLAIM_PATCH_SPEC = (('ttl', int, None),)
|
||||
|
||||
|
||||
class CollectionResource(object):
|
||||
|
||||
__slots__ = (
|
||||
'_claim_controller',
|
||||
'_validate',
|
||||
@ -116,12 +113,18 @@ class CollectionResource(object):
|
||||
|
||||
class ItemResource(object):
|
||||
|
||||
__slots__ = ('_claim_controller', '_validate')
|
||||
__slots__ = ('_claim_controller', '_validate', '_claim_patch_spec')
|
||||
|
||||
def __init__(self, wsgi_conf, validate, claim_controller):
|
||||
def __init__(self, wsgi_conf, validate, claim_controller,
|
||||
default_claim_ttl, default_grace_ttl):
|
||||
self._claim_controller = claim_controller
|
||||
self._validate = validate
|
||||
|
||||
self._claim_patch_spec = (
|
||||
('ttl', int, default_claim_ttl),
|
||||
('grace', int, default_grace_ttl),
|
||||
)
|
||||
|
||||
def on_get(self, req, resp, project_id, queue_name, claim_id):
|
||||
LOG.debug(u'Claim item GET - claim: %(claim_id)s, '
|
||||
u'queue: %(queue_name)s, project: %(project_id)s',
|
||||
@ -169,7 +172,7 @@ class ItemResource(object):
|
||||
# Read claim metadata (e.g., TTL) and raise appropriate
|
||||
# HTTP errors as needed.
|
||||
document = wsgi_utils.deserialize(req.stream, req.content_length)
|
||||
metadata = wsgi_utils.sanitize(document, CLAIM_PATCH_SPEC)
|
||||
metadata = wsgi_utils.sanitize(document, self._claim_patch_spec)
|
||||
|
||||
try:
|
||||
self._validate.claim_updating(metadata)
|
||||
|
@ -778,7 +778,7 @@ class ClaimControllerTest(ControllerBaseTest):
|
||||
for msg1, msg2 in zip(messages, messages2):
|
||||
self.assertEqual(msg1['body'], msg2['body'])
|
||||
|
||||
self.assertEqual(claim['ttl'], 100)
|
||||
self.assertEqual(claim['ttl'], new_meta['ttl'])
|
||||
self.assertEqual(claim['id'], claim_id)
|
||||
|
||||
# Make sure delete works
|
||||
@ -845,15 +845,13 @@ class ClaimControllerTest(ControllerBaseTest):
|
||||
project=self.project, client_uuid=uuid.uuid4(),
|
||||
num=20, ttl=120)
|
||||
|
||||
# Although ttl is less than the message's TTL, the grace
|
||||
# period puts it just over the edge.
|
||||
meta = {'ttl': 100, 'grace': 22}
|
||||
meta = {'ttl': 121, 'grace': 22}
|
||||
|
||||
claim_id, messages = self.controller.create(self.queue_name, meta,
|
||||
project=self.project)
|
||||
|
||||
for message in messages:
|
||||
self.assertEqual(message['ttl'], 122)
|
||||
self.assertEqual(message['ttl'], 143)
|
||||
|
||||
def test_do_not_extend_lifetime(self):
|
||||
_insert_fixtures(self.message_controller, self.queue_name,
|
||||
|
@ -208,7 +208,7 @@ class ClaimsBaseTest(base.V1_1Base):
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_404)
|
||||
|
||||
# Update the claim
|
||||
new_claim_ttl = '{"ttl": 60}'
|
||||
new_claim_ttl = '{"ttl": 60, "grace": 60}'
|
||||
creation = timeutils.utcnow()
|
||||
self.simulate_patch(claim_href, body=new_claim_ttl,
|
||||
headers=self.headers)
|
||||
|
Loading…
Reference in New Issue
Block a user