diff --git a/marconi/storage/mongodb/claims.py b/marconi/storage/mongodb/claims.py index d24c56d76..04090e89e 100644 --- a/marconi/storage/mongodb/claims.py +++ b/marconi/storage/mongodb/claims.py @@ -174,6 +174,12 @@ class ClaimController(storage.ClaimBase): upsert=False, multi=True) if updated != 0: + # NOTE(kgriffs): This extra step is necessary because + # in between having gotten a list of active messages + # and updating them, some of them may have been + # claimed by a parallel request. Therefore, we need + # to find out which messages were actually tagged + # with the claim ID successfully. claim, messages = self.get(queue, oid, project=project) return (str(oid), messages) diff --git a/marconi/storage/mongodb/messages.py b/marconi/storage/mongodb/messages.py index b85dfb82c..31d2bfb54 100644 --- a/marconi/storage/mongodb/messages.py +++ b/marconi/storage/mongodb/messages.py @@ -25,6 +25,7 @@ import datetime import time import pymongo.errors +import pymongo.read_preferences import marconi.openstack.common.log as logging from marconi.openstack.common import timeutils @@ -339,7 +340,12 @@ class MessageController(storage.MessageBase): 'p': project, } - msgs = self._col.find(query, sort=[('k', 1)]) + # NOTE(kgriffs): Claimed messages bust be queried from + # the primary to avoid a race condition caused by the + # multi-phased "create claim" algorithm. + preference = pymongo.read_preferences.ReadPreference.PRIMARY + msgs = self._col.find(query, sort=[('k', 1)], + read_preference=preference) if limit: msgs = msgs.limit(limit)