fix(mongodb): Ensure batch message posts are atomic (all or nothing)

In the case of a network failure, it is not possible to discover
which, if any, of the messages in a given batch were inserted before
the outage occurred.

This patch works around the problem by inserting messages in two
phases:

1. Assign all messages a shared transaction ID, 'tx', and insert them.
2. Once it has been confirmed that *all* messages were inserted
   correctly, finalize the transaction by clearing the transaction
   ID, 'tx', from the messages just inserted.

Message listing and counting were also modified to ignore any
non-finalized messages, i.e., those messages that have a non-null
'tx' value.

Orphaned messages will eventually expire and be cleaned up as
usual via the TTL index.
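
For illustration, the two phases above boil down to something like
the following sketch (pymongo 2.x style, matching the patch; the
collection handle and the post_batch() helper are hypothetical, not
part of this change):

    from bson import objectid

    def post_batch(collection, docs):
        # Phase 1: stamp every document with a shared transaction ID
        # and insert the whole batch.
        tx = objectid.ObjectId()
        for doc in docs:
            doc['tx'] = tx
        ids = collection.insert(docs)

        # Phase 2: only once *all* inserts are known to have
        # succeeded, finalize the batch by clearing the transaction
        # ID. Readers filter on {'tx': None}, so a half-inserted
        # batch is never visible and is eventually reclaimed by the
        # TTL index.
        collection.update({'tx': tx},
                          {'$set': {'tx': None}},
                          upsert=False, multi=True)

        return ids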

Closes-Bug: #1270263
Change-Id: I9de61cdfdf6dfcbe07ef76bf63a293091f087663
Author: kgriffs, 2014-01-16 17:01:28 -06:00 (committed by Gerrit Code Review)
parent b14c7e07b7, commit fc71f6a17e
2 changed files with 70 additions and 21 deletions


@@ -24,6 +24,7 @@ Field Mappings:
 import datetime
 import time
 
+from bson import objectid
 import pymongo.errors
 import pymongo.read_preferences
@@ -66,6 +67,20 @@ ACTIVE_INDEX_FIELDS = [
     (PROJ_QUEUE, 1), # Project will be unique, so put first
     ('k', 1), # Used for sorting and paging, must come before range queries
     ('c.e', 1), # Used for filtering out claimed messages
+
+    # NOTE(kgriffs): We do not include 'u' and 'tx' here on
+    # purpose. It was found experimentally that adding 'u' did
+    # not improve performance, and so it was left out in order
+    # to reduce index size and make updating the index
+    # faster. When 'tx' was added, it was assumed that it would
+    # follow a similar performance pattern to 'u', since by
+    # the time you traverse the index down past the fields
+    # listed above, there is very little left to scan, esp.
+    # considering all queries are limited (limit=) to a fairly
+    # small number.
+    #
+    # TODO(kgriffs): The extrapolation wrt 'tx' needs to be
+    # proven empirically.
 ]
 
 # For counting
@@ -88,6 +103,10 @@ MARKER_INDEX_FIELDS = [
     ('k', 1),
 ]
 
+TRANSACTION_INDEX_FIELDS = [
+    ('tx', 1),
+]
+
 
 class MessageController(storage.MessageBase):
     """Implements message resource operations using MongoDB.
@@ -95,14 +114,16 @@ class MessageController(storage.MessageBase):
     Messages are scoped by project + queue.
 
     Messages:
         Name         Field
-        -----------------
-        scope     -> p_q
-        expires   -> e
-        ttl       -> t
-        uuid      -> u
-        claim     -> c
-        marker    -> k
+        -------------------------
+        scope       -> p_q
+        ttl         -> t
+        expires     -> e
+        marker      -> k
+        body        -> b
+        claim       -> c
+        client uuid -> u
+        transaction -> tx
     """
 
     def __init__(self, *args, **kwargs):
@@ -163,6 +184,10 @@ class MessageController(storage.MessageBase):
                                 unique=True,
                                 background=True)
 
+        collection.ensure_index(TRANSACTION_INDEX_FIELDS,
+                                name='transaction',
+                                background=True)
+
     def _collection(self, queue_name, project=None):
         """Get a partitioned collection instance."""
         return self._collections[utils.get_partition(self._num_partitions,
@@ -237,9 +262,15 @@ class MessageController(storage.MessageBase):
         now = timeutils.utcnow_ts()
 
         query = {
-            # Messages must belong to this
-            # queue and project
+            # Messages must belong to this queue and project.
             PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
+
+            # NOTE(kgriffs): Messages must be finalized (i.e., must not
+            # be part of an unfinalized transaction).
+            #
+            # See also the note wrt 'tx' within the definition
+            # of ACTIVE_INDEX_FIELDS.
+            'tx': None,
         }
 
         if not echo:
@@ -279,8 +310,15 @@ class MessageController(storage.MessageBase):
         they haven't been GC'd yet. This is done for performance.
         """
         query = {
-            # Messages must belong to this queue
+            # Messages must belong to this queue and project.
            PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
+
+            # NOTE(kgriffs): Messages must be finalized (i.e., must not
+            # be part of an unfinalized transaction).
+            #
+            # See also the note wrt 'tx' within the definition
+            # of ACTIVE_INDEX_FIELDS.
+            'tx': None,
         }
 
         if not include_claimed:
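
To make the reader-side contract concrete, a hypothetical count of
visible messages would filter on a null 'tx' (the scope string below
is illustrative, not from this patch):

    query = {
        'p_q': 'project-x/my-queue',  # scoped queue name (illustrative)
        'tx': None,  # ignore messages from unfinalized transactions
    }
    visible = collection.find(query).count()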
@@ -458,20 +496,30 @@ class MessageController(storage.MessageBase):
         # Set the next basis marker for the first attempt.
         next_marker = self._queue_ctrl._get_counter(queue_name, project)
 
+        # Unique transaction ID to facilitate atomic batch inserts
+        transaction = objectid.ObjectId()
+
         prepared_messages = [
             {
-                't': message['ttl'],
                 PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
+                't': message['ttl'],
                 'e': now_dt + datetime.timedelta(seconds=message['ttl']),
                 'u': client_uuid,
                 'c': {'id': None, 'e': now},
                 'b': message['body'] if 'body' in message else {},
                 'k': next_marker + index,
+                'tx': transaction,
             }
 
             for index, message in enumerate(messages)
         ]
 
+        # NOTE(kgriffs): Don't take the time to do a 2-phase insert
+        # if there is no way for it to partially succeed.
+        if len(prepared_messages) == 1:
+            transaction = None
+            prepared_messages[0]['tx'] = None
+
         # Use a retry range for sanity, although we expect
         # to rarely, if ever, reach the maximum number of
         # retries.
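
For reference, a single element of prepared_messages might look like
this before finalization (all values below are hypothetical):

    {
        'p_q': 'project-x/my-queue',  # scoped queue name (PROJ_QUEUE)
        't': 300,                     # TTL, in seconds
        'e': now_dt + datetime.timedelta(seconds=300),  # expiration
        'u': client_uuid,             # posting client's UUID
        'c': {'id': None, 'e': now},  # claim stub; not yet claimed
        'b': {'event': 'hello'},      # message body
        'k': 42,                      # marker used for sorting/paging
        'tx': transaction,            # cleared to None on finalization
    }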
@@ -506,11 +554,18 @@ class MessageController(storage.MessageBase):
                 self._queue_ctrl._inc_counter(queue_name, project,
                                               amount=len(ids))
 
+                # NOTE(kgriffs): Finalize the insert once we can say that
+                # all the messages made it. This makes bulk inserts
+                # atomic, assuming queries filter out any non-finalized
+                # messages.
+                if transaction is not None:
+                    collection.update({'tx': transaction},
+                                      {'$set': {'tx': None}},
+                                      upsert=False, multi=True)
 
                 return map(str, ids)
 
             except pymongo.errors.DuplicateKeyError as ex:
-                # Try again with the remaining messages
                 # TODO(kgriffs): Record stats of how often retries happen,
                 # and how many attempts, on average, are required to insert
                 # messages.
@@ -593,10 +648,6 @@ class MessageController(storage.MessageBase):
                     message['k'] = next_marker + index
 
         except Exception as ex:
-            # TODO(kgriffs): Query the DB to get the last marker that
-            # made it, and extrapolate from there to figure out what
-            # needs to be retried.
-
             LOG.exception(ex)
             raise


@@ -249,10 +249,8 @@ def raises_conn_error(func):
         try:
             return func(*args, **kwargs)
         except errors.ConnectionFailure as ex:
-            # NOTE(flaper87): Raise the error
             LOG.exception(ex)
-            msg = u'ConnectionFailure caught'
-            raise storage_errors.ConnectionError(msg)
+            raise storage_errors.ConnectionError()
 
     return wrapper
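
For context, the decorator around this except clause is shaped
roughly as follows (a sketch; only the except handling shown in the
hunk above comes from this patch, and the surrounding module-level
names, LOG and storage_errors, are assumed):

    import functools

    from pymongo import errors

    def raises_conn_error(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except errors.ConnectionFailure as ex:
                LOG.exception(ex)
                # Translate the driver-level failure into the storage
                # layer's own error type.
                raise storage_errors.ConnectionError()

        return wrapper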