From 8d544d643dfa79634813052292bfeacf9a268be9 Mon Sep 17 00:00:00 2001 From: kgriffs Date: Thu, 5 Sep 2013 13:02:01 -0500 Subject: [PATCH] fix(storage.mongodb): Race condition when creating a claim This patch forces MessageController.claimed to always query the primary in order to resolve a race condition in ClaimController.create. The latter method tags a batch of messages with a claim ID, then immediately calles claimed() to get a list of messages that were actually updated. MessageController.claimed is also used by ClaimController.update, so that request will no longer read from secondaries either. This shouldn't put much more load on the primary, and may even be desired behavior in the case that a client creates a claim and immediately attempts to update it, in which case they could get a "not found" if the new claim hasn't propagated yet to the secondary being queried. Change-Id: I8b5cfee1cc6f185ebf53e98fad95a5fc30e021da Partial-Bug: #1218990 --- marconi/storage/mongodb/claims.py | 6 ++++++ marconi/storage/mongodb/messages.py | 8 +++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/marconi/storage/mongodb/claims.py b/marconi/storage/mongodb/claims.py index d24c56d76..04090e89e 100644 --- a/marconi/storage/mongodb/claims.py +++ b/marconi/storage/mongodb/claims.py @@ -174,6 +174,12 @@ class ClaimController(storage.ClaimBase): upsert=False, multi=True) if updated != 0: + # NOTE(kgriffs): This extra step is necessary because + # in between having gotten a list of active messages + # and updating them, some of them may have been + # claimed by a parallel request. Therefore, we need + # to find out which messages were actually tagged + # with the claim ID successfully. claim, messages = self.get(queue, oid, project=project) return (str(oid), messages) diff --git a/marconi/storage/mongodb/messages.py b/marconi/storage/mongodb/messages.py index b85dfb82c..31d2bfb54 100644 --- a/marconi/storage/mongodb/messages.py +++ b/marconi/storage/mongodb/messages.py @@ -25,6 +25,7 @@ import datetime import time import pymongo.errors +import pymongo.read_preferences import marconi.openstack.common.log as logging from marconi.openstack.common import timeutils @@ -339,7 +340,12 @@ class MessageController(storage.MessageBase): 'p': project, } - msgs = self._col.find(query, sort=[('k', 1)]) + # NOTE(kgriffs): Claimed messages bust be queried from + # the primary to avoid a race condition caused by the + # multi-phased "create claim" algorithm. + preference = pymongo.read_preferences.ReadPreference.PRIMARY + msgs = self._col.find(query, sort=[('k', 1)], + read_preference=preference) if limit: msgs = msgs.limit(limit)