Don't stream by default

Current `_Iterator` streams data from the server by default. This
behavior, although useful, shouldn't be considered a sane default. A
good example is when using `limit` on listing operations. If the default
is to stream, the limit will be ignored because the server client will
get everything from the server until there's anything left to consume.
This behavior can also be harmful if not used carefully.

This patch adds a `stream` method to the iterator and makes using this
behavior an explicit operation. The user has to opt-in for data
streaming.

Change-Id: Ib1af24960dff97cb956990d4caf88705f2f7a0d5
This commit is contained in:
Flavio Percoco 2014-04-16 13:13:33 +02:00
parent 24971970d2
commit ddd41adab1
2 changed files with 51 additions and 2 deletions

View File

@ -43,6 +43,7 @@ class _Iterator(object):
self._create_function = create_function
self._links = []
self._stream = False
self._listing_response = listing_response
# NOTE(flaper87): Simple hack to
@ -59,6 +60,22 @@ class _Iterator(object):
self._links = iterables['links']
self._listing_response = iterables[self._iter_key]
def stream(self, enabled=True):
"""Make this `_Iterator` a stream iterator.
Since `_Iterator`'s default is to *not* stream,
this method's default value is to *stream* data
from the server. That is, unless explicitly specified
this method will enable make this iterator a stream
iterator.
:param enabled: Whether streaming should be
enabled or not.
:type enabled: bool
"""
self._stream = enabled
return self
def _next_page(self):
for link in self._links:
if link['rel'] == 'next':
@ -80,8 +97,12 @@ class _Iterator(object):
try:
args = self._listing_response.pop(0)
except IndexError:
if not self._stream:
raise StopIteration
self._next_page()
return self.next()
return self._create_function(args)
# NOTE(flaper87): Py2K support

View File

@ -44,7 +44,7 @@ class TestMessageIterator(base.QueuesTestBase):
iterated = [msg for msg in iterator]
self.assertEqual(len(iterated), 1)
def test_next_page(self):
def test_stream(self):
messages = {'links': [],
'messages': [{
'href': '/v1/queues/mine/messages/123123423',
@ -72,9 +72,37 @@ class TestMessageIterator(base.QueuesTestBase):
messages,
'messages',
message.create_object(self.queue))
iterated = [msg for msg in iterator]
iterated = [msg for msg in iterator.stream()]
self.assertEqual(len(iterated), 2)
def test_iterator_respect_paging(self):
messages = {'links': [],
'messages': [{
'href': '/v1/queues/mine/messages/123123423',
'ttl': 800,
'age': 790,
'body': {'event': 'ActivateAccount',
'mode': 'active'}
}]
}
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, json.dumps(messages))
send_method.return_value = resp
link = {'rel': 'next',
'href': "/v1/queues/mine/messages?marker=6244-244224-783"}
messages['links'].append(link)
iterator = iterate._Iterator(self.queue.client,
messages,
'messages',
message.create_object(self.queue))
iterated = [msg for msg in iterator]
self.assertEqual(len(iterated), 1)
class QueuesV1MessageHttpUnitTest(test_message.QueuesV1MessageUnitTest):