diff --git a/marconi/storage/mongodb/messages.py b/marconi/storage/mongodb/messages.py
index a24e344ec..48d52c1bf 100644
--- a/marconi/storage/mongodb/messages.py
+++ b/marconi/storage/mongodb/messages.py
@@ -21,7 +21,6 @@ Field Mappings:
     letter of their long name.
 """
 
-import collections
 import datetime
 import time
 
@@ -483,17 +482,7 @@ class MessageController(storage.MessageBase):
         # Set the next basis marker for the first attempt.
         next_marker = self._next_marker(queue_name, project)
 
-        # Results are aggregated across all attempts
-        # NOTE(kgriffs): lazy instantiation
-        aggregated_results = None
-
-        # NOTE(kgriffs): This avoids iterating over messages twice,
-        # since pymongo internally will iterate over them all to
-        # encode as bson before submitting to mongod. By using a
-        # generator, we can produce each message only once,
-        # as needed by pymongo. At the same time, each message is
-        # cached in case we need to retry any of them.
-        message_gen = (
+        prepared_messages = [
             {
                 't': message['ttl'],
                 'q': queue_name,
@@ -506,9 +495,11 @@ class MessageController(storage.MessageBase):
             }
 
             for index, message in enumerate(messages)
-        )
+        ]
 
-        prepared_messages, cached_messages = utils.cached_gen(message_gen)
+        # Results are aggregated across all attempts
+        # NOTE(kgriffs): Using lazy instantiation...
+        aggregated_results = None
 
         # Use a retry range for sanity, although we expect
         # to rarely, if ever, reach the maximum number of
@@ -562,13 +553,8 @@ class MessageController(storage.MessageBase):
                 duplicate_marker = utils.dup_marker_from_error(str(ex))
                 failed_index = duplicate_marker - next_marker
 
-                # First time here, convert the deque to a list
-                # to support slicing.
-                if isinstance(cached_messages, collections.deque):
-                    cached_messages = list(cached_messages)
-
                 # Put the successful one's IDs into aggregated_results.
-                succeeded_messages = cached_messages[:failed_index]
+                succeeded_messages = prepared_messages[:failed_index]
                 succeeded_ids = [msg['_id'] for msg in succeeded_messages]
 
                 # Results are aggregated across all attempts
@@ -579,19 +565,18 @@ class MessageController(storage.MessageBase):
 
                 # Retry the remaining messages with a new sequence
                 # of markers.
-                prepared_messages = cached_messages[failed_index:]
+                prepared_messages = prepared_messages[failed_index:]
                 next_marker = self._next_marker(queue_name, project)
                 for index, message in enumerate(prepared_messages):
                     message['k'] = next_marker + index
 
-                # Chill out to avoid thrashing/thundering
+                # Chill out for a moment to mitigate thrashing/thundering
                 self._backoff_sleep(attempt)
 
             except Exception as ex:
                 # TODO(kgriffs): Query the DB to get the last marker that
                 # made it, and extrapolate from there to figure out what
-                # needs to be retried. Definitely retry on AutoReconnect;
-                # other types of errors TBD.
+                # needs to be retried.
 
                 LOG.exception(ex)
                 raise
diff --git a/marconi/tests/transport/wsgi/test_utils.py b/marconi/tests/transport/wsgi/test_utils.py
index 293347115..8db0fb596 100644
--- a/marconi/tests/transport/wsgi/test_utils.py
+++ b/marconi/tests/transport/wsgi/test_utils.py
@@ -98,6 +98,28 @@ class TestWSGIutils(testtools.TestCase):
         filtered = utils.filter(doc, spec)
         self.assertEqual(filtered, doc)
 
+    def test_no_spec(self):
+        obj = {u'body': {'event': 'start_backup'}, 'ttl': 300}
+        document = json.dumps(obj, ensure_ascii=False)
+        doc_stream = io.StringIO(document)
+
+        filtered = utils.filter_stream(doc_stream, len(document), spec=None)
+        self.assertEqual(filtered[0], obj)
+
+        # NOTE(kgriffs): Ensure default value for *spec* is None
+        doc_stream.seek(0)
+        filtered2 = utils.filter_stream(doc_stream, len(document))
+        self.assertEqual(filtered2, filtered)
+
+    def test_no_spec_array(self):
+        things = [{u'body': {'event': 'start_backup'}, 'ttl': 300}]
+        document = json.dumps(things, ensure_ascii=False)
+        doc_stream = io.StringIO(document)
+
+        filtered = utils.filter_stream(doc_stream, len(document),
+                                       doctype=utils.JSONArray, spec=None)
+        self.assertEqual(filtered, things)
+
     def test_filter_star(self):
         doc = {'ttl': 300, 'body': {'event': 'start_backup'}}
 
diff --git a/marconi/transport/wsgi/messages.py b/marconi/transport/wsgi/messages.py
index 40d6600c2..11281a650 100644
--- a/marconi/transport/wsgi/messages.py
+++ b/marconi/transport/wsgi/messages.py
@@ -13,8 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import itertools
-
 import falcon
 
 from marconi.common import config
@@ -153,16 +151,12 @@ class CollectionResource(object):
                                             doctype=wsgi_utils.JSONArray)
 
         # Verify that at least one message was provided.
-        try:
-            first_message = next(messages)
-        except StopIteration:
+        # NOTE(kgriffs): This check assumes messages is a
+        # collection (not a generator).
+        if not messages:
             description = _('No messages were provided.')
             raise wsgi_exceptions.HTTPBadRequestBody(description)
 
-        # Hack to make message_controller oblivious to the
-        # fact that we just popped the first message.
-        messages = itertools.chain((first_message,), messages)
-
         # Enqueue the messages
         partial = False
 
diff --git a/marconi/transport/wsgi/utils.py b/marconi/transport/wsgi/utils.py
index 92746aea0..7107be9f1 100644
--- a/marconi/transport/wsgi/utils.py
+++ b/marconi/transport/wsgi/utils.py
@@ -30,27 +30,35 @@
 LOG = logging.getLogger(__name__)
 
 
 # TODO(kgriffs): Consider moving this to Falcon and/or Oslo
-def filter_stream(stream, len, spec, doctype=JSONObject):
+def filter_stream(stream, len, spec=None, doctype=JSONObject):
     """Reads, deserializes, and validates a document from a stream.
 
     :param stream: file-like object from which to read an object or
         array of objects.
     :param len: number of bytes to read from stream
-    :param spec: iterable describing expected fields, yielding
-        tuples with the form of: (field_name, value_type). Note that
-        value_type may either be a Python type, or the special
-        string '*' to accept any type.
+    :param spec: (Default None) Iterable describing expected fields,
+        yielding tuples with the form of:
+
+            (field_name, value_type).
+
+        Note that value_type may either be a Python type, or the
+        special string '*' to accept any type. If spec is None, the
+        incoming documents will not be validated.
     :param doctype: type of document to expect; must be either
         JSONObject or JSONArray.
     :raises: HTTPBadRequest, HTTPServiceUnavailable
-    :returns: A sanitized, filtered version of the document read
+    :returns: A sanitized, filtered version of the document list read
         from the stream. If the document contains a list of objects,
-        each object will be filtered and yielded in turn. If, on
-        the other hand, the document is expected to contain a
+        each object will be filtered and returned in a new list. If,
+        on the other hand, the document is expected to contain a
         single object, that object will be filtered and returned as
         a single-element iterable.
     """
+    if len is None:
+        description = _('Request body cannot be empty')
+        raise exceptions.HTTPBadRequestBody(description)
+
     try:
         # TODO(kgriffs): read_json should stream the resulting list
         # of messages, returning a generator rather than buffering
@@ -59,7 +67,7 @@ def filter_stream(stream, len, spec=None, doctype=JSONObject):
     except utils.MalformedJSON as ex:
         LOG.exception(ex)
 
-        description = _('Body could not be parsed.')
+        description = _('Request body could not be parsed.')
         raise exceptions.HTTPBadRequestBody(description)
 
     except utils.OverflowedJSONInteger as ex:
@@ -77,17 +85,18 @@ def filter_stream(stream, len, spec=None, doctype=JSONObject):
         if not isinstance(document, JSONObject):
             raise exceptions.HTTPDocumentTypeNotSupported()
 
-        return (filter(document, spec),)
+        return (document,) if spec is None else (filter(document, spec),)
 
     if doctype is JSONArray:
         if not isinstance(document, JSONArray):
             raise exceptions.HTTPDocumentTypeNotSupported()
 
-        # Return as a generator since we plan on doing a
-        # streaming JSON deserializer (see above)
-        return (filter(obj, spec) for obj in document)
+        if spec is None:
+            return document
 
-    raise ValueError('doctype not in (JSONObject, JSONArray)')
+        return [filter(obj, spec) for obj in document]
+
+    raise TypeError('doctype must be either a JSONObject or JSONArray')
 
 
 # TODO(kgriffs): Consider moving this to Falcon and/or Oslo
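
The simplified `if not messages:` check in wsgi/messages.py is only valid because filter_stream() now returns a list rather than a generator. A minimal sketch, not part of the patch, of why the old code had to pop the first item to detect an empty body:

    # A generator object is always truthy, even when it would yield
    # nothing, so truthiness alone could not detect an empty body.
    def make_gen():
        return (item for item in [])  # yields nothing

    assert bool(make_gen()) is True  # empty generator: still truthy
    assert bool([]) is False         # empty list: falsy, so the new check works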
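And a usage sketch of the revised filter_stream() signature, mirroring the new tests; the spec value shown here is illustrative, assuming the module paths in the diff:

    import io
    import json

    from marconi.transport.wsgi import utils

    doc = json.dumps([{'body': {'event': 'start_backup'}, 'ttl': 300}])

    # With spec=None the parsed array is returned unvalidated; passing a
    # spec such as [('body', dict), ('ttl', int)] would instead filter
    # each object down to the expected fields.
    messages = utils.filter_stream(io.StringIO(doc), len(doc),
                                   doctype=utils.JSONArray, spec=None)

    assert messages == [{'body': {'event': 'start_backup'}, 'ttl': 300}]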