diff --git a/zaqar/storage/swift/claims.py b/zaqar/storage/swift/claims.py index 54945d066..58060953c 100644 --- a/zaqar/storage/swift/claims.py +++ b/zaqar/storage/swift/claims.py @@ -103,8 +103,12 @@ class ClaimController(storage.Claim): dlq = True if ('_max_claim_count' in queue_meta and '_dead_letter_queue' in queue_meta) else False + include_delayed = False if queue_meta.get('_default_message_delay', + 0) else True + messages, marker = message_ctrl._list(queue, project, limit=limit, - include_claimed=False) + include_claimed=False, + include_delayed=include_delayed) claimed = [] for msg in messages: diff --git a/zaqar/storage/swift/messages.py b/zaqar/storage/swift/messages.py index 9599134f9..f2b25a431 100644 --- a/zaqar/storage/swift/messages.py +++ b/zaqar/storage/swift/messages.py @@ -49,6 +49,8 @@ class MessageController(storage.Message): +--------------+-----------------------------------------+ | Claim ID | Object content 'claim_id' | +--------------+-----------------------------------------+ + | Delay Expires| Object content 'delay_expires' | + +--------------+-----------------------------------------+ | Expires | Object Delete-After header | +--------------------------------------------------------+ """ @@ -90,7 +92,8 @@ class MessageController(storage.Message): def _list(self, queue, project=None, marker=None, limit=storage.DEFAULT_MESSAGES_PER_PAGE, echo=False, client_uuid=None, - include_claimed=False, sort=1): + include_claimed=False, include_delayed=False, + sort=1): """List messages in the queue, oldest first(ish) Time ordering and message inclusion in lists are soft, there is no @@ -128,6 +131,12 @@ class MessageController(storage.Message): queue, msg['claim_id'], project) return claim_obj is not None and claim_obj['ttl'] > 0 + def is_delayed(msg, headers): + if include_delayed: + return False + now = timeutils.utcnow_ts() + return msg.get('delay_expires', 0) > now + def is_echo(msg, headers): if echo: return False @@ -136,6 +145,7 @@ class MessageController(storage.Message): filters = [ is_echo, is_claimed, + is_delayed, ] marker = {} get_object = functools.partial(client.get_object, container) @@ -149,9 +159,9 @@ class MessageController(storage.Message): def list(self, queue, project=None, marker=None, limit=storage.DEFAULT_MESSAGES_PER_PAGE, echo=False, client_uuid=None, - include_claimed=False): + include_claimed=False, include_delayed=False,): return self._list(queue, project, marker, limit, echo, - client_uuid, include_claimed) + client_uuid, include_claimed, include_delayed) def first(self, queue, project=None, sort=1): if sort not in (1, -1): @@ -212,9 +222,11 @@ class MessageController(storage.Message): def _create_msg(self, queue, msg, client_uuid, project): slug = str(uuid.uuid1()) + now = timeutils.utcnow_ts() contents = jsonutils.dumps( {'body': msg.get('body', {}), 'claim_id': None, - 'ttl': msg['ttl'], 'claim_count': 0}) + 'ttl': msg['ttl'], 'claim_count': 0, + 'delay_expires': now + msg.get('delay', 0)}) utils._put_or_create_container( self._client, utils._message_container(queue, project),