Buffer message documents when enqueuing them, in lieu of streaming

This patch modifies the pipeline for the message post operation
to use list comprehensions in place of generators. This simplifies
some existing logic (e.g., retry handling) and sets us up nicely
for input validation. It should also be a little bit faster than
the previous approach, at the expense of increased memory usage.

Note that the SQLite driver was not modified (YAGNI).

Change-Id: Iae5f7ceaf09f775afc4a4944e0c626d2f40a6554
Implements: blueprint buffer-messages
kgriffs 2013-08-09 13:25:59 -05:00 committed by Zhihao Yuan
parent c548e10a48
commit 2248203b9e
4 changed files with 57 additions and 47 deletions
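In rough outline, the change swaps a cached generator for an up-front list. The following is a minimal sketch of the before/after shapes, not the driver code itself; build_doc is a hypothetical stand-in for the per-message document construction done in the MongoDB driver:

    def build_doc(index, message):
        # Hypothetical stand-in for the driver's field mapping
        return {'t': message['ttl'], 'k': index}

    messages = [{'ttl': 300}, {'ttl': 60}]

    # Before: documents were produced lazily and mirrored into a side
    # cache (utils.cached_gen) so that a failed insert could be retried.
    message_gen = (build_doc(i, m) for i, m in enumerate(messages))

    # After: documents are buffered up front; retry logic can slice
    # the list directly, with no caching shim or deque conversion.
    prepared_messages = [build_doc(i, m) for i, m in enumerate(messages)]

    assert prepared_messages == list(message_gen)

With the list, slicing for retries (see the hunks below) works directly, at the cost of holding every prepared document in memory at once.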

View File

@@ -21,7 +21,6 @@ Field Mappings:
     letter of their long name.
 """

-import collections
 import datetime
 import time
@@ -483,17 +482,7 @@ class MessageController(storage.MessageBase):
         # Set the next basis marker for the first attempt.
         next_marker = self._next_marker(queue_name, project)

-        # Results are aggregated across all attempts
-        # NOTE(kgriffs): lazy instantiation
-        aggregated_results = None
-
-        # NOTE(kgriffs): This avoids iterating over messages twice,
-        # since pymongo internally will iterate over them all to
-        # encode as bson before submitting to mongod. By using a
-        # generator, we can produce each message only once,
-        # as needed by pymongo. At the same time, each message is
-        # cached in case we need to retry any of them.
-        message_gen = (
+        prepared_messages = [
             {
                 't': message['ttl'],
                 'q': queue_name,
@@ -506,9 +495,11 @@ class MessageController(storage.MessageBase):
             }
             for index, message in enumerate(messages)
-        )
-        prepared_messages, cached_messages = utils.cached_gen(message_gen)
+        ]
+
+        # Results are aggregated across all attempts
+        # NOTE(kgriffs): Using lazy instantiation...
+        aggregated_results = None

         # Use a retry range for sanity, although we expect
         # to rarely, if ever, reach the maximum number of
@@ -562,13 +553,8 @@ class MessageController(storage.MessageBase):
                     duplicate_marker = utils.dup_marker_from_error(str(ex))
                     failed_index = duplicate_marker - next_marker

-                    # First time here, convert the deque to a list
-                    # to support slicing.
-                    if isinstance(cached_messages, collections.deque):
-                        cached_messages = list(cached_messages)
-
                     # Put the successful one's IDs into aggregated_results.
-                    succeeded_messages = cached_messages[:failed_index]
+                    succeeded_messages = prepared_messages[:failed_index]
                     succeeded_ids = [msg['_id'] for msg in succeeded_messages]

                     # Results are aggregated across all attempts
@@ -579,19 +565,18 @@ class MessageController(storage.MessageBase):
                     # Retry the remaining messages with a new sequence
                     # of markers.
-                    prepared_messages = cached_messages[failed_index:]
+                    prepared_messages = prepared_messages[failed_index:]
                     next_marker = self._next_marker(queue_name, project)
                     for index, message in enumerate(prepared_messages):
                         message['k'] = next_marker + index

-                    # Chill out to avoid thrashing/thundering
+                    # Chill out for a moment to mitigate thrashing/thundering
                     self._backoff_sleep(attempt)

             except Exception as ex:
                 # TODO(kgriffs): Query the DB to get the last marker that
                 # made it, and extrapolate from there to figure out what
-                # needs to be retried. Definitely retry on AutoReconnect;
-                # other types of errors TBD.
+                # needs to be retried.

                 LOG.exception(ex)
                 raise
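The retry path above reduces to simple slice arithmetic once the documents are buffered. A self-contained sketch, with hypothetical marker values standing in for what utils.dup_marker_from_error would parse out of the DuplicateKeyError:

    # Five prepared documents, markers starting at the basis marker 100
    prepared_messages = [{'k': 100 + i, '_id': i} for i in range(5)]
    next_marker = 100

    duplicate_marker = 103                         # hypothetical collision
    failed_index = duplicate_marker - next_marker  # == 3

    # Everything before the failure was inserted successfully
    succeeded_ids = [m['_id'] for m in prepared_messages[:failed_index]]

    # Retry the remainder under a fresh sequence of markers
    prepared_messages = prepared_messages[failed_index:]
    next_marker = 200                              # hypothetical new basis
    for index, message in enumerate(prepared_messages):
        message['k'] = next_marker + index

    assert succeeded_ids == [0, 1, 2]
    assert [m['k'] for m in prepared_messages] == [200, 201]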

View File

@@ -98,6 +98,28 @@ class TestWSGIutils(testtools.TestCase):
         filtered = utils.filter(doc, spec)
         self.assertEqual(filtered, doc)

+    def test_no_spec(self):
+        obj = {u'body': {'event': 'start_backup'}, 'ttl': 300}
+        document = json.dumps(obj, ensure_ascii=False)
+        doc_stream = io.StringIO(document)
+
+        filtered = utils.filter_stream(doc_stream, len(document), spec=None)
+        self.assertEqual(filtered[0], obj)
+
+        # NOTE(kgriffs): Ensure default value for *spec* is None
+        doc_stream.seek(0)
+        filtered2 = utils.filter_stream(doc_stream, len(document))
+        self.assertEqual(filtered2, filtered)
+
+    def test_no_spec_array(self):
+        things = [{u'body': {'event': 'start_backup'}, 'ttl': 300}]
+        document = json.dumps(things, ensure_ascii=False)
+        doc_stream = io.StringIO(document)
+
+        filtered = utils.filter_stream(doc_stream, len(document),
+                                       doctype=utils.JSONArray, spec=None)
+        self.assertEqual(filtered, things)
+
     def test_filter_star(self):
         doc = {'ttl': 300, 'body': {'event': 'start_backup'}}

View File

@@ -13,8 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-import itertools
-
 import falcon

 from marconi.common import config
@@ -153,16 +151,12 @@ class CollectionResource(object):
                                                doctype=wsgi_utils.JSONArray)

         # Verify that at least one message was provided.
-        try:
-            first_message = next(messages)
-        except StopIteration:
+        # NOTE(kgriffs): This check assumes messages is a
+        # collection (not a generator).
+        if not messages:
             description = _('No messages were provided.')
             raise wsgi_exceptions.HTTPBadRequestBody(description)

-        # Hack to make message_controller oblivious to the
-        # fact that we just popped the first message.
-        messages = itertools.chain((first_message,), messages)
-
         # Enqueue the messages
         partial = False
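The truthiness check works here only because filter_stream now returns a list; a generator is always truthy, which is why the old code had to pop the first message and chain it back on. A minimal sketch of both styles:

    import itertools

    # Old style: detect emptiness by consuming an item, then
    # re-attach it so downstream code sees the full sequence.
    messages_gen = (m for m in [])
    try:
        first_message = next(messages_gen)
        messages_gen = itertools.chain((first_message,), messages_gen)
    except StopIteration:
        print('No messages were provided.')

    # New style: a buffered list supports a plain truthiness test.
    messages = []
    if not messages:
        print('No messages were provided.')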

View File

@@ -30,27 +30,35 @@ LOG = logging.getLogger(__name__)

 # TODO(kgriffs): Consider moving this to Falcon and/or Oslo
-def filter_stream(stream, len, spec, doctype=JSONObject):
+def filter_stream(stream, len, spec=None, doctype=JSONObject):
     """Reads, deserializes, and validates a document from a stream.

     :param stream: file-like object from which to read an object or
         array of objects.
     :param len: number of bytes to read from stream
-    :param spec: iterable describing expected fields, yielding
-        tuples with the form of: (field_name, value_type). Note that
-        value_type may either be a Python type, or the special
-        string '*' to accept any type.
+    :param spec: (Default None) Iterable describing expected fields,
+        yielding tuples with the form of:
+
+            (field_name, value_type).
+
+        Note that value_type may either be a Python type, or the
+        special string '*' to accept any type. If spec is None, the
+        incoming documents will not be validated.
     :param doctype: type of document to expect; must be either
         JSONObject or JSONArray.
     :raises: HTTPBadRequest, HTTPServiceUnavailable
-    :returns: A sanitized, filtered version of the document read
+    :returns: A sanitized, filtered version of the document list read
         from the stream. If the document contains a list of objects,
-        each object will be filtered and yielded in turn. If, on
-        the other hand, the document is expected to contain a
+        each object will be filtered and returned in a new list. If,
+        on the other hand, the document is expected to contain a
         single object, that object will be filtered and returned as
         a single-element iterable.
     """
+    if len is None:
+        description = _('Request body can not be empty')
+        raise exceptions.HTTPBadRequestBody(description)
+
     try:
         # TODO(kgriffs): read_json should stream the resulting list
         # of messages, returning a generator rather than buffering
@@ -59,7 +67,7 @@ def filter_stream(stream, len, spec, doctype=JSONObject):
     except utils.MalformedJSON as ex:
         LOG.exception(ex)

-        description = _('Body could not be parsed.')
+        description = _('Request body could not be parsed.')
         raise exceptions.HTTPBadRequestBody(description)

     except utils.OverflowedJSONInteger as ex:
@@ -77,17 +85,18 @@ def filter_stream(stream, len, spec=None, doctype=JSONObject):
         if not isinstance(document, JSONObject):
             raise exceptions.HTTPDocumentTypeNotSupported()

-        return (filter(document, spec),)
+        return (document,) if spec is None else (filter(document, spec),)

     if doctype is JSONArray:
         if not isinstance(document, JSONArray):
             raise exceptions.HTTPDocumentTypeNotSupported()

-        # Return as a generator since we plan on doing a
-        # streaming JSON deserializer (see above)
-        return (filter(obj, spec) for obj in document)
+        if spec is None:
+            return document
+
+        return [filter(obj, spec) for obj in document]

-    raise ValueError('doctype not in (JSONObject, JSONArray)')
+    raise TypeError('doctype must be either a JSONObject or JSONArray')


 # TODO(kgriffs): Consider moving this to Falcon and/or Oslo
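For illustration, here is a rough sketch of the spec contract described in the docstring above. filter_doc is a hypothetical stand-in for this module's filter() helper, and the exact error behavior of the real helper may differ:

    def filter_doc(document, spec):
        # Keep only the fields named in spec, checking each value's
        # type; '*' accepts any type.
        filtered = {}
        for field_name, value_type in spec:
            value = document[field_name]
            if value_type != '*' and not isinstance(value, value_type):
                raise TypeError(field_name + ' has the wrong type')
            filtered[field_name] = value
        return filtered

    doc = {'ttl': 300, 'body': {'event': 'start_backup'}, 'junk': 1}
    spec = (('ttl', int), ('body', '*'))

    assert filter_doc(doc, spec) == {
        'ttl': 300, 'body': {'event': 'start_backup'}}

With spec=None, filter_stream now skips this filtering step entirely and returns the parsed document(s) unvalidated.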