Merge "Support dead letter queue for swift"
This commit is contained in:
commit
088a08c531
@ -93,25 +93,70 @@ class ClaimController(storage.Claim):
|
|||||||
def create(self, queue, metadata, project=None,
|
def create(self, queue, metadata, project=None,
|
||||||
limit=storage.DEFAULT_MESSAGES_PER_CLAIM):
|
limit=storage.DEFAULT_MESSAGES_PER_CLAIM):
|
||||||
message_ctrl = self.driver.message_controller
|
message_ctrl = self.driver.message_controller
|
||||||
|
queue_ctrl = self.driver.queue_controller
|
||||||
|
queue_meta = queue_ctrl.get(queue, project=project)
|
||||||
ttl = metadata['ttl']
|
ttl = metadata['ttl']
|
||||||
grace = metadata['grace']
|
grace = metadata['grace']
|
||||||
msg_ts = ttl + grace
|
msg_ts = ttl + grace
|
||||||
claim_id = uuidutils.generate_uuid()
|
claim_id = uuidutils.generate_uuid()
|
||||||
|
|
||||||
|
dlq = True if ('_max_claim_count' in queue_meta and
|
||||||
|
'_dead_letter_queue' in queue_meta) else False
|
||||||
|
|
||||||
messages, marker = message_ctrl._list(queue, project, limit=limit,
|
messages, marker = message_ctrl._list(queue, project, limit=limit,
|
||||||
include_claimed=False)
|
include_claimed=False)
|
||||||
|
|
||||||
claimed = []
|
claimed = []
|
||||||
for msg in messages:
|
for msg in messages:
|
||||||
|
claim_count = msg.get('claim_count', 0)
|
||||||
md5 = hashlib.md5()
|
md5 = hashlib.md5()
|
||||||
md5.update(
|
md5.update(
|
||||||
jsonutils.dumps(
|
jsonutils.dumps(
|
||||||
{'body': msg['body'], 'claim_id': None,
|
{'body': msg['body'], 'claim_id': None,
|
||||||
'ttl': msg['ttl']}))
|
'ttl': msg['ttl'],
|
||||||
|
'claim_count': claim_count}))
|
||||||
md5 = md5.hexdigest()
|
md5 = md5.hexdigest()
|
||||||
msg_ttl = max(msg['ttl'], msg_ts)
|
msg_ttl = max(msg['ttl'], msg_ts)
|
||||||
|
move_to_dlq = False
|
||||||
|
if dlq:
|
||||||
|
if claim_count < queue_meta['_max_claim_count']:
|
||||||
|
# Check if the message's claim count has exceeded the
|
||||||
|
# max claim count defined in the queue, if not ,
|
||||||
|
# Save the new max claim count for message
|
||||||
|
claim_count = claim_count + 1
|
||||||
|
else:
|
||||||
|
# if the message's claim count has exceeded the
|
||||||
|
# max claim count defined in the queue, move the
|
||||||
|
# message to the dead letter queue.
|
||||||
|
# NOTE: We're moving message by changing the
|
||||||
|
# project info directly. That means, the queue and dead
|
||||||
|
# letter queue must be created on the same pool.
|
||||||
|
dlq_ttl = queue_meta.get("_dead_letter_queue_messages_ttl")
|
||||||
|
move_to_dlq = True
|
||||||
|
if dlq_ttl:
|
||||||
|
msg_ttl = dlq_ttl
|
||||||
|
|
||||||
content = jsonutils.dumps(
|
content = jsonutils.dumps(
|
||||||
{'body': msg['body'], 'claim_id': claim_id, 'ttl': msg_ttl})
|
{'body': msg['body'], 'claim_id': claim_id,
|
||||||
|
'ttl': msg_ttl,
|
||||||
|
'claim_count': claim_count})
|
||||||
|
|
||||||
|
if move_to_dlq:
|
||||||
|
dead_letter_queue = queue_meta.get("_dead_letter_queue")
|
||||||
|
utils._put_or_create_container(
|
||||||
|
self._client,
|
||||||
|
utils._message_container(dead_letter_queue, project),
|
||||||
|
msg['id'],
|
||||||
|
content,
|
||||||
|
content_type='application/json',
|
||||||
|
headers={'x-object-meta-clientid': msg['client_uuid'],
|
||||||
|
'if-match': md5,
|
||||||
|
'x-object-meta-claimid': claim_id,
|
||||||
|
'x-delete-after': msg_ttl})
|
||||||
|
|
||||||
|
message_ctrl._delete(queue, msg['id'], project)
|
||||||
|
|
||||||
|
else:
|
||||||
try:
|
try:
|
||||||
self._client.put_object(
|
self._client.put_object(
|
||||||
utils._message_container(queue, project),
|
utils._message_container(queue, project),
|
||||||
@ -129,6 +174,7 @@ class ClaimController(storage.Claim):
|
|||||||
else:
|
else:
|
||||||
msg['claim_id'] = claim_id
|
msg['claim_id'] = claim_id
|
||||||
msg['ttl'] = msg_ttl
|
msg['ttl'] = msg_ttl
|
||||||
|
msg['claim_count'] = claim_count
|
||||||
claimed.append(msg)
|
claimed.append(msg)
|
||||||
|
|
||||||
utils._put_or_create_container(
|
utils._put_or_create_container(
|
||||||
|
@ -212,7 +212,8 @@ class MessageController(storage.Message):
|
|||||||
def _create_msg(self, queue, msg, client_uuid, project):
|
def _create_msg(self, queue, msg, client_uuid, project):
|
||||||
slug = str(uuid.uuid1())
|
slug = str(uuid.uuid1())
|
||||||
contents = jsonutils.dumps(
|
contents = jsonutils.dumps(
|
||||||
{'body': msg.get('body', {}), 'claim_id': None, 'ttl': msg['ttl']})
|
{'body': msg.get('body', {}), 'claim_id': None,
|
||||||
|
'ttl': msg['ttl'], 'claim_count': 0})
|
||||||
try:
|
try:
|
||||||
self._client.put_object(
|
self._client.put_object(
|
||||||
utils._message_container(queue, project),
|
utils._message_container(queue, project),
|
||||||
|
@ -56,7 +56,8 @@ def _message_to_json(message_id, msg, headers, now):
|
|||||||
'age': now - float(headers['x-timestamp']),
|
'age': now - float(headers['x-timestamp']),
|
||||||
'ttl': msg['ttl'],
|
'ttl': msg['ttl'],
|
||||||
'body': msg['body'],
|
'body': msg['body'],
|
||||||
'claim_id': msg['claim_id']
|
'claim_id': msg['claim_id'],
|
||||||
|
'claim_count': msg.get('claim_count', 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -105,6 +106,7 @@ def _filter_messages(messages, filters, marker, get_object, list_objects,
|
|||||||
'body': obj['body'],
|
'body': obj['body'],
|
||||||
'age': now - float(headers['x-timestamp']),
|
'age': now - float(headers['x-timestamp']),
|
||||||
'claim_id': obj['claim_id'],
|
'claim_id': obj['claim_id'],
|
||||||
|
'claim_count': obj.get('claim_count', 0),
|
||||||
}
|
}
|
||||||
if limit <= 0:
|
if limit <= 0:
|
||||||
break
|
break
|
||||||
|
Loading…
x
Reference in New Issue
Block a user