diff --git a/zaqar/storage/mongodb/claims.py b/zaqar/storage/mongodb/claims.py index 26ed1a15d..975c15b45 100644 --- a/zaqar/storage/mongodb/claims.py +++ b/zaqar/storage/mongodb/claims.py @@ -245,9 +245,13 @@ class ClaimController(storage.Claim): u"for queue %(dlq_name)s", {"dlq_name": dlq_name}) return None, iter([]) - result = dlq_collection.insert_one(msg) - if result.inserted_id: - collection.delete_one({'_id': _id}) + # NOTE(flwang): If dead letter queue and queue are in the + # same partition, the message has been already + # modified. + if collection != dlq_collection: + result = dlq_collection.insert_one(msg) + if result.inserted_id: + collection.delete_one({'_id': _id}) LOG.debug(u"Message %(id)s has met the max claim count " u"%(count)d, now it has been moved to dead " u"letter queue %(dlq_name)s.", diff --git a/zaqar/tests/tempest_plugin/tests/v2/test_claims.py b/zaqar/tests/tempest_plugin/tests/v2/test_claims.py index 1fb568b04..422b94fa1 100644 --- a/zaqar/tests/tempest_plugin/tests/v2/test_claims.py +++ b/zaqar/tests/tempest_plugin/tests/v2/test_claims.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time + from six.moves.urllib import parse as urlparse from tempest import config from tempest.lib.common.utils import data_utils @@ -108,6 +110,44 @@ class TestClaims(base.BaseV2MessagingTest): message_uri = urlparse.urlparse(claim_uri).path self.client.delete_messages(message_uri) + @decorators.idempotent_id('c1975970-66e7-11e7-a771-fa163e40e1ff') + def test_dead_letter_queue(self): + # Post Messages + QueueName = "QueueWithDLQ" + DLQ_name = "DLQ" + meta = {'ttl': 60, 'grace': 60} + # Set dead letter queeu metadata + op1 = {"op": "add", + "path": "/metadata/_max_claim_count", "value": 2} + op2 = {"op": "add", + "path": "/metadata/_dead_letter_queue", "value": DLQ_name} + op3 = {"op": "add", + "path": "/metadata/_dead_letter_queue_messages_ttl", + "value": 7799} + metadata = [op1, op2, op3] + self.client.create_queue(QueueName) + self.client.create_queue(DLQ_name) + self.set_queue_metadata(QueueName, metadata) + message_body = self.generate_message_body(repeat=1) + self.client.post_messages(queue_name=QueueName, + rbody=message_body) + + for i in range(3): + resp, body = self.client.post_claims( + queue_name=QueueName, + rbody=meta) + if(i == 2): + self.assertEqual('204', resp['status']) + else: + self.assertEqual('201', resp['status']) + self.assertEqual(1, len(body["messages"])) + time.sleep(70) + + resp, body = self.client.list_messages(DLQ_name) + self.assertEqual('200', resp['status']) + self.client.delete_queue(DLQ_name) + self.client.delete_queue(QueueName) + @classmethod def resource_cleanup(cls): cls.delete_queue(cls.queue_name)