diff --git a/marconi/queues/storage/mongodb/messages.py b/marconi/queues/storage/mongodb/messages.py index 63cad68af..ccb830815 100644 --- a/marconi/queues/storage/mongodb/messages.py +++ b/marconi/queues/storage/mongodb/messages.py @@ -24,6 +24,7 @@ Field Mappings: import datetime import time +from bson import objectid import pymongo.errors import pymongo.read_preferences @@ -66,6 +67,20 @@ ACTIVE_INDEX_FIELDS = [ (PROJ_QUEUE, 1), # Project will be unique, so put first ('k', 1), # Used for sorting and paging, must come before range queries ('c.e', 1), # Used for filtering out claimed messages + + # NOTE(kgriffs): We do not include 'u' and 'tx' here on + # purpose. It was found experimentally that adding 'u' did + # not improve performance, and so it was left out in order + # to reduce index size and make updating the index + # faster. When 'tx' was added, it was assumed that it would + # follow a similar performance pattern to 'u', since by + # the time you traverse the index down past the fields + # listed above, there is very little left to scan, esp. + # considering all queries are limited (limit=) to a fairly + # small number. + # + # TODO(kgriffs): The extrapolation wrt 'tx' needs to be + # proven empirically. ] # For counting @@ -88,6 +103,10 @@ MARKER_INDEX_FIELDS = [ ('k', 1), ] +TRANSACTION_INDEX_FIELDS = [ + ('tx', 1), +] + class MessageController(storage.MessageBase): """Implements message resource operations using MongoDB. @@ -95,14 +114,16 @@ class MessageController(storage.MessageBase): Messages are scoped by project + queue. Messages: - Name Field - ----------------- - scope -> p_q - expires -> e - ttl -> t - uuid -> u - claim -> c - marker -> k + Name Field + ------------------------- + scope -> p_q + ttl -> t + expires -> e + marker -> k + body -> b + claim -> c + client uuid -> u + transaction -> tx """ def __init__(self, *args, **kwargs): @@ -163,6 +184,10 @@ class MessageController(storage.MessageBase): unique=True, background=True) + collection.ensure_index(TRANSACTION_INDEX_FIELDS, + name='transaction', + background=True) + def _collection(self, queue_name, project=None): """Get a partitioned collection instance.""" return self._collections[utils.get_partition(self._num_partitions, @@ -237,9 +262,15 @@ class MessageController(storage.MessageBase): now = timeutils.utcnow_ts() query = { - # Messages must belong to this - # queue and project + # Messages must belong to this queue and project. PROJ_QUEUE: utils.scope_queue_name(queue_name, project), + + # NOTE(kgriffs): Messages must be finalized (i.e., must not + # be part of an unfinalized transaction). + # + # See also the note wrt 'tx' within the definition + # of ACTIVE_INDEX_FIELDS. + 'tx': None, } if not echo: @@ -279,8 +310,15 @@ class MessageController(storage.MessageBase): they haven't been GC'd yet. This is done for performance. """ query = { - # Messages must belong to this queue + # Messages must belong to this queue and project. PROJ_QUEUE: utils.scope_queue_name(queue_name, project), + + # NOTE(kgriffs): Messages must be finalized (i.e., must not + # be part of an unfinalized transaction). + # + # See also the note wrt 'tx' within the definition + # of ACTIVE_INDEX_FIELDS. + 'tx': None, } if not include_claimed: @@ -458,20 +496,30 @@ class MessageController(storage.MessageBase): # Set the next basis marker for the first attempt. next_marker = self._queue_ctrl._get_counter(queue_name, project) + # Unique transaction ID to facilitate atomic batch inserts + transaction = objectid.ObjectId() + prepared_messages = [ { - 't': message['ttl'], PROJ_QUEUE: utils.scope_queue_name(queue_name, project), + 't': message['ttl'], 'e': now_dt + datetime.timedelta(seconds=message['ttl']), 'u': client_uuid, 'c': {'id': None, 'e': now}, 'b': message['body'] if 'body' in message else {}, 'k': next_marker + index, + 'tx': transaction, } for index, message in enumerate(messages) ] + # NOTE(kgriffs): Don't take the time to do a 2-phase insert + # if there is no way for it to partially succeed. + if len(prepared_messages) == 1: + transaction = None + prepared_messages[0]['tx'] = None + # Use a retry range for sanity, although we expect # to rarely, if ever, reach the maximum number of # retries. @@ -506,11 +554,18 @@ class MessageController(storage.MessageBase): self._queue_ctrl._inc_counter(queue_name, project, amount=len(ids)) + # NOTE(kgriffs): Finalize the insert once we can say that + # all the messages made it. This makes bulk inserts + # atomic, assuming queries filter out any non-finalized + # messages. + if transaction is not None: + collection.update({'tx': transaction}, + {'$set': {'tx': None}}, + upsert=False, multi=True) + return map(str, ids) except pymongo.errors.DuplicateKeyError as ex: - # Try again with the remaining messages - # TODO(kgriffs): Record stats of how often retries happen, # and how many attempts, on average, are required to insert # messages. @@ -593,10 +648,6 @@ class MessageController(storage.MessageBase): message['k'] = next_marker + index except Exception as ex: - # TODO(kgriffs): Query the DB to get the last marker that - # made it, and extrapolate from there to figure out what - # needs to be retried. - LOG.exception(ex) raise