Merge "ECFragGetter: simplify iter_bytes_from_response_part"
This commit is contained in:
commit
b5a2453ed7
@ -2500,7 +2500,7 @@ class ECFragGetter(object):
|
|||||||
self.client_chunk_size = policy.fragment_size
|
self.client_chunk_size = policy.fragment_size
|
||||||
self.skip_bytes = 0
|
self.skip_bytes = 0
|
||||||
self.bytes_used_from_backend = 0
|
self.bytes_used_from_backend = 0
|
||||||
self.source = None
|
self.source = self.node = None
|
||||||
self.logger_thread_locals = logger_thread_locals
|
self.logger_thread_locals = logger_thread_locals
|
||||||
self.logger = logger
|
self.logger = logger
|
||||||
|
|
||||||
@ -2660,14 +2660,13 @@ class ECFragGetter(object):
|
|||||||
read_chunk_size=self.app.object_chunk_size)
|
read_chunk_size=self.app.object_chunk_size)
|
||||||
|
|
||||||
def iter_bytes_from_response_part(self, part_file, nbytes):
|
def iter_bytes_from_response_part(self, part_file, nbytes):
|
||||||
client_chunk_size = self.client_chunk_size
|
|
||||||
node_timeout = self.app.recoverable_node_timeout
|
|
||||||
nchunks = 0
|
nchunks = 0
|
||||||
buf = b''
|
buf = b''
|
||||||
part_file = ByteCountEnforcer(part_file, nbytes)
|
part_file = ByteCountEnforcer(part_file, nbytes)
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
with WatchdogTimeout(self.app.watchdog, node_timeout,
|
with WatchdogTimeout(self.app.watchdog,
|
||||||
|
self.app.recoverable_node_timeout,
|
||||||
ChunkReadTimeout):
|
ChunkReadTimeout):
|
||||||
chunk = part_file.read(self.app.object_chunk_size)
|
chunk = part_file.read(self.app.object_chunk_size)
|
||||||
nchunks += 1
|
nchunks += 1
|
||||||
@ -2726,33 +2725,18 @@ class ECFragGetter(object):
|
|||||||
self.bytes_used_from_backend += len(buf)
|
self.bytes_used_from_backend += len(buf)
|
||||||
buf = b''
|
buf = b''
|
||||||
|
|
||||||
if not chunk:
|
client_chunk_size = self.client_chunk_size or len(buf)
|
||||||
if buf:
|
while buf and (len(buf) >= client_chunk_size or not chunk):
|
||||||
with WatchdogTimeout(self.app.watchdog,
|
client_chunk = buf[:client_chunk_size]
|
||||||
self.app.client_timeout,
|
buf = buf[client_chunk_size:]
|
||||||
ChunkWriteTimeout):
|
|
||||||
self.bytes_used_from_backend += len(buf)
|
|
||||||
yield buf
|
|
||||||
buf = b''
|
|
||||||
break
|
|
||||||
|
|
||||||
if client_chunk_size is not None:
|
|
||||||
while len(buf) >= client_chunk_size:
|
|
||||||
client_chunk = buf[:client_chunk_size]
|
|
||||||
buf = buf[client_chunk_size:]
|
|
||||||
with WatchdogTimeout(self.app.watchdog,
|
|
||||||
self.app.client_timeout,
|
|
||||||
ChunkWriteTimeout):
|
|
||||||
self.bytes_used_from_backend += \
|
|
||||||
len(client_chunk)
|
|
||||||
yield client_chunk
|
|
||||||
else:
|
|
||||||
with WatchdogTimeout(self.app.watchdog,
|
with WatchdogTimeout(self.app.watchdog,
|
||||||
self.app.client_timeout,
|
self.app.client_timeout,
|
||||||
ChunkWriteTimeout):
|
ChunkWriteTimeout):
|
||||||
self.bytes_used_from_backend += len(buf)
|
self.bytes_used_from_backend += len(client_chunk)
|
||||||
yield buf
|
yield client_chunk
|
||||||
buf = b''
|
|
||||||
|
if not chunk:
|
||||||
|
break
|
||||||
|
|
||||||
# This is for fairness; if the network is outpacing
|
# This is for fairness; if the network is outpacing
|
||||||
# the CPU, we'll always be able to read and write
|
# the CPU, we'll always be able to read and write
|
||||||
|
@ -39,8 +39,9 @@ else:
|
|||||||
|
|
||||||
import swift
|
import swift
|
||||||
from swift.common import utils, swob, exceptions
|
from swift.common import utils, swob, exceptions
|
||||||
from swift.common.exceptions import ChunkWriteTimeout
|
from swift.common.exceptions import ChunkWriteTimeout, ShortReadError, \
|
||||||
from swift.common.utils import Timestamp, list_from_csv, md5
|
ChunkReadTimeout
|
||||||
|
from swift.common.utils import Timestamp, list_from_csv, md5, FileLikeIter
|
||||||
from swift.proxy import server as proxy_server
|
from swift.proxy import server as proxy_server
|
||||||
from swift.proxy.controllers import obj
|
from swift.proxy.controllers import obj
|
||||||
from swift.proxy.controllers.base import \
|
from swift.proxy.controllers.base import \
|
||||||
@ -6676,5 +6677,69 @@ class TestNumContainerUpdates(unittest.TestCase):
|
|||||||
c_replica, o_replica, o_quorum))
|
c_replica, o_replica, o_quorum))
|
||||||
|
|
||||||
|
|
||||||
|
@patch_policies(with_ec_default=True)
|
||||||
|
class TestECFragGetter(BaseObjectControllerMixin, unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
super(TestECFragGetter, self).setUp()
|
||||||
|
req = Request.blank(path='/a/c/o')
|
||||||
|
self.getter = obj.ECFragGetter(
|
||||||
|
self.app, req, None, None, self.policy, 'a/c/o',
|
||||||
|
{}, None, self.logger.thread_locals,
|
||||||
|
self.logger)
|
||||||
|
|
||||||
|
def test_iter_bytes_from_response_part(self):
|
||||||
|
part = FileLikeIter([b'some', b'thing'])
|
||||||
|
it = self.getter.iter_bytes_from_response_part(part, nbytes=None)
|
||||||
|
self.assertEqual(b'something', b''.join(it))
|
||||||
|
|
||||||
|
def test_iter_bytes_from_response_part_insufficient_bytes(self):
|
||||||
|
part = FileLikeIter([b'some', b'thing'])
|
||||||
|
it = self.getter.iter_bytes_from_response_part(part, nbytes=100)
|
||||||
|
with mock.patch.object(self.getter, '_dig_for_source_and_node',
|
||||||
|
return_value=(None, None)):
|
||||||
|
with self.assertRaises(ShortReadError) as cm:
|
||||||
|
b''.join(it)
|
||||||
|
self.assertEqual('Too few bytes; read 9, expecting 100',
|
||||||
|
str(cm.exception))
|
||||||
|
|
||||||
|
def test_iter_bytes_from_response_part_read_timeout(self):
|
||||||
|
part = FileLikeIter([b'some', b'thing'])
|
||||||
|
self.app.recoverable_node_timeout = 0.05
|
||||||
|
self.app.client_timeout = 0.8
|
||||||
|
it = self.getter.iter_bytes_from_response_part(part, nbytes=9)
|
||||||
|
with mock.patch.object(self.getter, '_dig_for_source_and_node',
|
||||||
|
return_value=(None, None)):
|
||||||
|
with mock.patch.object(part, 'read',
|
||||||
|
side_effect=[b'some', ChunkReadTimeout(9)]):
|
||||||
|
with self.assertRaises(ChunkReadTimeout) as cm:
|
||||||
|
b''.join(it)
|
||||||
|
self.assertEqual('9 seconds', str(cm.exception))
|
||||||
|
|
||||||
|
def test_iter_bytes_from_response_part_null_chunk_size(self):
|
||||||
|
# we don't expect a policy to have fragment_size None or zero but
|
||||||
|
# verify that the getter is defensive
|
||||||
|
self.getter.client_chunk_size = None
|
||||||
|
part = FileLikeIter([b'some', b'thing', b''])
|
||||||
|
it = self.getter.iter_bytes_from_response_part(part, nbytes=None)
|
||||||
|
self.assertEqual(b'something', b''.join(it))
|
||||||
|
|
||||||
|
self.getter.client_chunk_size = 0
|
||||||
|
part = FileLikeIter([b'some', b'thing', b''])
|
||||||
|
it = self.getter.iter_bytes_from_response_part(part, nbytes=None)
|
||||||
|
self.assertEqual(b'something', b''.join(it))
|
||||||
|
|
||||||
|
def test_iter_bytes_from_response_part_small_chunk_size(self):
|
||||||
|
# we don't expect a policy to have fragment_size None or zero but
|
||||||
|
# verify that the getter is defensive
|
||||||
|
self.getter.client_chunk_size = 4
|
||||||
|
part = FileLikeIter([b'some', b'thing', b''])
|
||||||
|
it = self.getter.iter_bytes_from_response_part(part, nbytes=None)
|
||||||
|
self.assertEqual([b'some', b'thin', b'g'], [ch for ch in it])
|
||||||
|
self.getter.client_chunk_size = 1
|
||||||
|
part = FileLikeIter([b'some', b'thing', b''])
|
||||||
|
it = self.getter.iter_bytes_from_response_part(part, nbytes=None)
|
||||||
|
self.assertEqual([c.encode() for c in 'something'], [ch for ch in it])
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user