diff --git a/marconi/queues/storage/mongodb/claims.py b/marconi/queues/storage/mongodb/claims.py index 073ebf29c..5c4a1f433 100644 --- a/marconi/queues/storage/mongodb/claims.py +++ b/marconi/queues/storage/mongodb/claims.py @@ -151,15 +151,19 @@ class ClaimController(storage.ClaimBase): now = timeutils.utcnow_ts() - # Set claim field for messages in ids + # NOTE(kgriffs): Set the claim field for + # the active message batch, while also + # filtering out any messages that happened + # to get claimed just now by one or more + # parallel requests. + # + # Filtering by just 'c.e' works because + # new messages have that field initialized + # to the current time when the message is + # posted. There is no need to check whether + # 'c' exists or 'c.id' is None. updated = msg_ctrl._col.update({'_id': {'$in': ids}, - '$or': [ - {'c.id': None}, - { - 'c.id': {'$ne': None}, - 'c.e': {'$lte': now} - } - ]}, + 'c.e': {'$lte': now}}, {'$set': {'c': meta}}, upsert=False, multi=True)['n']