Merge "Fix bug in mongodb backend for dead letter queue"
This commit is contained in:
commit
f722430fd4
@ -245,9 +245,13 @@ class ClaimController(storage.Claim):
|
||||
u"for queue %(dlq_name)s", {"dlq_name":
|
||||
dlq_name})
|
||||
return None, iter([])
|
||||
result = dlq_collection.insert_one(msg)
|
||||
if result.inserted_id:
|
||||
collection.delete_one({'_id': _id})
|
||||
# NOTE(flwang): If dead letter queue and queue are in the
|
||||
# same partition, the message has been already
|
||||
# modified.
|
||||
if collection != dlq_collection:
|
||||
result = dlq_collection.insert_one(msg)
|
||||
if result.inserted_id:
|
||||
collection.delete_one({'_id': _id})
|
||||
LOG.debug(u"Message %(id)s has met the max claim count "
|
||||
u"%(count)d, now it has been moved to dead "
|
||||
u"letter queue %(dlq_name)s.",
|
||||
|
@ -13,6 +13,8 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import time
|
||||
|
||||
from six.moves.urllib import parse as urlparse
|
||||
from tempest import config
|
||||
from tempest.lib.common.utils import data_utils
|
||||
@ -108,6 +110,44 @@ class TestClaims(base.BaseV2MessagingTest):
|
||||
message_uri = urlparse.urlparse(claim_uri).path
|
||||
self.client.delete_messages(message_uri)
|
||||
|
||||
@decorators.idempotent_id('c1975970-66e7-11e7-a771-fa163e40e1ff')
|
||||
def test_dead_letter_queue(self):
|
||||
# Post Messages
|
||||
QueueName = "QueueWithDLQ"
|
||||
DLQ_name = "DLQ"
|
||||
meta = {'ttl': 60, 'grace': 60}
|
||||
# Set dead letter queeu metadata
|
||||
op1 = {"op": "add",
|
||||
"path": "/metadata/_max_claim_count", "value": 2}
|
||||
op2 = {"op": "add",
|
||||
"path": "/metadata/_dead_letter_queue", "value": DLQ_name}
|
||||
op3 = {"op": "add",
|
||||
"path": "/metadata/_dead_letter_queue_messages_ttl",
|
||||
"value": 7799}
|
||||
metadata = [op1, op2, op3]
|
||||
self.client.create_queue(QueueName)
|
||||
self.client.create_queue(DLQ_name)
|
||||
self.set_queue_metadata(QueueName, metadata)
|
||||
message_body = self.generate_message_body(repeat=1)
|
||||
self.client.post_messages(queue_name=QueueName,
|
||||
rbody=message_body)
|
||||
|
||||
for i in range(3):
|
||||
resp, body = self.client.post_claims(
|
||||
queue_name=QueueName,
|
||||
rbody=meta)
|
||||
if(i == 2):
|
||||
self.assertEqual('204', resp['status'])
|
||||
else:
|
||||
self.assertEqual('201', resp['status'])
|
||||
self.assertEqual(1, len(body["messages"]))
|
||||
time.sleep(70)
|
||||
|
||||
resp, body = self.client.list_messages(DLQ_name)
|
||||
self.assertEqual('200', resp['status'])
|
||||
self.client.delete_queue(DLQ_name)
|
||||
self.client.delete_queue(QueueName)
|
||||
|
||||
@classmethod
|
||||
def resource_cleanup(cls):
|
||||
cls.delete_queue(cls.queue_name)
|
||||
|
Loading…
x
Reference in New Issue
Block a user