From ddd41adab1cc18f8d31e669422e1505293175f3b Mon Sep 17 00:00:00 2001 From: Flavio Percoco Date: Wed, 16 Apr 2014 13:13:33 +0200 Subject: [PATCH] Don't stream by default Current `_Iterator` streams data from the server by default. This behavior, although useful, shouldn't be considered a sane default. A good example is when using `limit` on listing operations. If the default is to stream, the limit will be ignored because the server client will get everything from the server until there's anything left to consume. This behavior can also be harmful if not used carefully. This patch adds a `stream` method to the iterator and makes using this behavior an explicit operation. The user has to opt-in for data streaming. Change-Id: Ib1af24960dff97cb956990d4caf88705f2f7a0d5 --- marconiclient/queues/v1/iterator.py | 21 ++++++++++++++++++ tests/unit/queues/v1/test_message.py | 32 ++++++++++++++++++++++++++-- 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/marconiclient/queues/v1/iterator.py b/marconiclient/queues/v1/iterator.py index d9ef5f82..f0c6bba2 100644 --- a/marconiclient/queues/v1/iterator.py +++ b/marconiclient/queues/v1/iterator.py @@ -43,6 +43,7 @@ class _Iterator(object): self._create_function = create_function self._links = [] + self._stream = False self._listing_response = listing_response # NOTE(flaper87): Simple hack to @@ -59,6 +60,22 @@ class _Iterator(object): self._links = iterables['links'] self._listing_response = iterables[self._iter_key] + def stream(self, enabled=True): + """Make this `_Iterator` a stream iterator. + + Since `_Iterator`'s default is to *not* stream, + this method's default value is to *stream* data + from the server. That is, unless explicitly specified + this method will enable make this iterator a stream + iterator. + + :param enabled: Whether streaming should be + enabled or not. + :type enabled: bool + """ + self._stream = enabled + return self + def _next_page(self): for link in self._links: if link['rel'] == 'next': @@ -80,8 +97,12 @@ class _Iterator(object): try: args = self._listing_response.pop(0) except IndexError: + if not self._stream: + raise StopIteration + self._next_page() return self.next() + return self._create_function(args) # NOTE(flaper87): Py2K support diff --git a/tests/unit/queues/v1/test_message.py b/tests/unit/queues/v1/test_message.py index 3d9700a1..caee0e5f 100644 --- a/tests/unit/queues/v1/test_message.py +++ b/tests/unit/queues/v1/test_message.py @@ -44,7 +44,7 @@ class TestMessageIterator(base.QueuesTestBase): iterated = [msg for msg in iterator] self.assertEqual(len(iterated), 1) - def test_next_page(self): + def test_stream(self): messages = {'links': [], 'messages': [{ 'href': '/v1/queues/mine/messages/123123423', @@ -72,9 +72,37 @@ class TestMessageIterator(base.QueuesTestBase): messages, 'messages', message.create_object(self.queue)) - iterated = [msg for msg in iterator] + iterated = [msg for msg in iterator.stream()] self.assertEqual(len(iterated), 2) + def test_iterator_respect_paging(self): + messages = {'links': [], + 'messages': [{ + 'href': '/v1/queues/mine/messages/123123423', + 'ttl': 800, + 'age': 790, + 'body': {'event': 'ActivateAccount', + 'mode': 'active'} + }] + } + + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + + resp = response.Response(None, json.dumps(messages)) + send_method.return_value = resp + + link = {'rel': 'next', + 'href': "/v1/queues/mine/messages?marker=6244-244224-783"} + messages['links'].append(link) + + iterator = iterate._Iterator(self.queue.client, + messages, + 'messages', + message.create_object(self.queue)) + iterated = [msg for msg in iterator] + self.assertEqual(len(iterated), 1) + class QueuesV1MessageHttpUnitTest(test_message.QueuesV1MessageUnitTest):