Support delayed queues for swift
Implement blueprint delayed-queues Change-Id: I32f27e9e6faefcba688467bbc1b87ce253cd74b0
This commit is contained in:
parent
8b071a46bb
commit
bde1f5cd6a
@ -103,8 +103,12 @@ class ClaimController(storage.Claim):
|
||||
dlq = True if ('_max_claim_count' in queue_meta and
|
||||
'_dead_letter_queue' in queue_meta) else False
|
||||
|
||||
include_delayed = False if queue_meta.get('_default_message_delay',
|
||||
0) else True
|
||||
|
||||
messages, marker = message_ctrl._list(queue, project, limit=limit,
|
||||
include_claimed=False)
|
||||
include_claimed=False,
|
||||
include_delayed=include_delayed)
|
||||
|
||||
claimed = []
|
||||
for msg in messages:
|
||||
|
@ -49,6 +49,8 @@ class MessageController(storage.Message):
|
||||
+--------------+-----------------------------------------+
|
||||
| Claim ID | Object content 'claim_id' |
|
||||
+--------------+-----------------------------------------+
|
||||
| Delay Expires| Object content 'delay_expires' |
|
||||
+--------------+-----------------------------------------+
|
||||
| Expires | Object Delete-After header |
|
||||
+--------------------------------------------------------+
|
||||
"""
|
||||
@ -90,7 +92,8 @@ class MessageController(storage.Message):
|
||||
def _list(self, queue, project=None, marker=None,
|
||||
limit=storage.DEFAULT_MESSAGES_PER_PAGE,
|
||||
echo=False, client_uuid=None,
|
||||
include_claimed=False, sort=1):
|
||||
include_claimed=False, include_delayed=False,
|
||||
sort=1):
|
||||
"""List messages in the queue, oldest first(ish)
|
||||
|
||||
Time ordering and message inclusion in lists are soft, there is no
|
||||
@ -128,6 +131,12 @@ class MessageController(storage.Message):
|
||||
queue, msg['claim_id'], project)
|
||||
return claim_obj is not None and claim_obj['ttl'] > 0
|
||||
|
||||
def is_delayed(msg, headers):
|
||||
if include_delayed:
|
||||
return False
|
||||
now = timeutils.utcnow_ts()
|
||||
return msg.get('delay_expires', 0) > now
|
||||
|
||||
def is_echo(msg, headers):
|
||||
if echo:
|
||||
return False
|
||||
@ -136,6 +145,7 @@ class MessageController(storage.Message):
|
||||
filters = [
|
||||
is_echo,
|
||||
is_claimed,
|
||||
is_delayed,
|
||||
]
|
||||
marker = {}
|
||||
get_object = functools.partial(client.get_object, container)
|
||||
@ -149,9 +159,9 @@ class MessageController(storage.Message):
|
||||
def list(self, queue, project=None, marker=None,
|
||||
limit=storage.DEFAULT_MESSAGES_PER_PAGE,
|
||||
echo=False, client_uuid=None,
|
||||
include_claimed=False):
|
||||
include_claimed=False, include_delayed=False,):
|
||||
return self._list(queue, project, marker, limit, echo,
|
||||
client_uuid, include_claimed)
|
||||
client_uuid, include_claimed, include_delayed)
|
||||
|
||||
def first(self, queue, project=None, sort=1):
|
||||
if sort not in (1, -1):
|
||||
@ -212,9 +222,11 @@ class MessageController(storage.Message):
|
||||
|
||||
def _create_msg(self, queue, msg, client_uuid, project):
|
||||
slug = str(uuid.uuid1())
|
||||
now = timeutils.utcnow_ts()
|
||||
contents = jsonutils.dumps(
|
||||
{'body': msg.get('body', {}), 'claim_id': None,
|
||||
'ttl': msg['ttl'], 'claim_count': 0})
|
||||
'ttl': msg['ttl'], 'claim_count': 0,
|
||||
'delay_expires': now + msg.get('delay', 0)})
|
||||
utils._put_or_create_container(
|
||||
self._client,
|
||||
utils._message_container(queue, project),
|
||||
|
Loading…
x
Reference in New Issue
Block a user