Merge "Object-server: add periodic greenthread yielding during file read."
This commit is contained in:
commit
07c8e8bcdc
@ -148,6 +148,12 @@ use = egg:swift#object
|
||||
# 'keep_cache_private' is false.
|
||||
# keep_cache_slo_manifest = false
|
||||
#
|
||||
# cooperative_period defines how frequent object server GET request will
|
||||
# perform the cooperative yielding during iterating the disk chunks. For
|
||||
# example, value of '5' will insert one sleep() after every 5 disk_chunk_size
|
||||
# chunk reads. A value of '0' (the default) will turn off cooperative yielding.
|
||||
# cooperative_period = 0
|
||||
#
|
||||
# on PUTs, sync data every n MB
|
||||
# mb_per_sync = 512
|
||||
#
|
||||
|
@ -6506,16 +6506,18 @@ class CooperativeIterator(ClosingIterator):
|
||||
|
||||
:param iterable: iterator to wrap.
|
||||
:param period: number of items yielded from this iterator between calls to
|
||||
``sleep()``.
|
||||
``sleep()``; a negative value or 0 mean that cooperative sleep will be
|
||||
disabled.
|
||||
"""
|
||||
__slots__ = ('period', 'count')
|
||||
|
||||
def __init__(self, iterable, period=5):
|
||||
super(CooperativeIterator, self).__init__(iterable)
|
||||
self.count = 0
|
||||
self.period = period
|
||||
self.period = max(0, period or 0)
|
||||
|
||||
def _get_next_item(self):
|
||||
if self.period:
|
||||
if self.count >= self.period:
|
||||
self.count = 0
|
||||
sleep()
|
||||
|
@ -66,7 +66,7 @@ from swift.common.utils import mkdirs, Timestamp, \
|
||||
MD5_OF_EMPTY_STRING, link_fd_to_path, \
|
||||
O_TMPFILE, makedirs_count, replace_partition_in_path, remove_directory, \
|
||||
md5, is_file_older, non_negative_float, config_fallocate_value, \
|
||||
fs_has_free_space
|
||||
fs_has_free_space, CooperativeIterator
|
||||
from swift.common.splice import splice, tee
|
||||
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
|
||||
DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
|
||||
@ -2110,11 +2110,13 @@ class BaseDiskFileReader(object):
|
||||
:param pipe_size: size of pipe buffer used in zero-copy operations
|
||||
:param diskfile: the diskfile creating this DiskFileReader instance
|
||||
:param keep_cache: should resulting reads be kept in the buffer cache
|
||||
:param cooperative_period: the period parameter when does cooperative
|
||||
yielding during file read
|
||||
"""
|
||||
def __init__(self, fp, data_file, obj_size, etag,
|
||||
disk_chunk_size, keep_cache_size, device_path, logger,
|
||||
quarantine_hook, use_splice, pipe_size, diskfile,
|
||||
keep_cache=False):
|
||||
keep_cache=False, cooperative_period=0):
|
||||
# Parameter tracking
|
||||
self._fp = fp
|
||||
self._data_file = data_file
|
||||
@ -2133,6 +2135,7 @@ class BaseDiskFileReader(object):
|
||||
self._keep_cache = obj_size < keep_cache_size
|
||||
else:
|
||||
self._keep_cache = False
|
||||
self._cooperative_period = cooperative_period
|
||||
|
||||
# Internal Attributes
|
||||
self._iter_etag = None
|
||||
@ -2157,6 +2160,10 @@ class BaseDiskFileReader(object):
|
||||
self._iter_etag.update(chunk)
|
||||
|
||||
def __iter__(self):
|
||||
return CooperativeIterator(
|
||||
self._inner_iter(), period=self._cooperative_period)
|
||||
|
||||
def _inner_iter(self):
|
||||
"""Returns an iterator over the data file."""
|
||||
try:
|
||||
dropped_cache = 0
|
||||
@ -2975,7 +2982,7 @@ class BaseDiskFile(object):
|
||||
with self.open(current_time=current_time):
|
||||
return self.get_metadata()
|
||||
|
||||
def reader(self, keep_cache=False,
|
||||
def reader(self, keep_cache=False, cooperative_period=0,
|
||||
_quarantine_hook=lambda m: None):
|
||||
"""
|
||||
Return a :class:`swift.common.swob.Response` class compatible
|
||||
@ -2987,6 +2994,8 @@ class BaseDiskFile(object):
|
||||
|
||||
:param keep_cache: caller's preference for keeping data read in the
|
||||
OS buffer cache
|
||||
:param cooperative_period: the period parameter for cooperative
|
||||
yielding during file read
|
||||
:param _quarantine_hook: 1-arg callable called when obj quarantined;
|
||||
the arg is the reason for quarantine.
|
||||
Default is to ignore it.
|
||||
@ -2998,7 +3007,8 @@ class BaseDiskFile(object):
|
||||
self._metadata['ETag'], self._disk_chunk_size,
|
||||
self._manager.keep_cache_size, self._device_path, self._logger,
|
||||
use_splice=self._use_splice, quarantine_hook=_quarantine_hook,
|
||||
pipe_size=self._pipe_size, diskfile=self, keep_cache=keep_cache)
|
||||
pipe_size=self._pipe_size, diskfile=self, keep_cache=keep_cache,
|
||||
cooperative_period=cooperative_period)
|
||||
# At this point the reader object is now responsible for closing
|
||||
# the file pointer.
|
||||
self._fp = None
|
||||
@ -3161,11 +3171,12 @@ class ECDiskFileReader(BaseDiskFileReader):
|
||||
def __init__(self, fp, data_file, obj_size, etag,
|
||||
disk_chunk_size, keep_cache_size, device_path, logger,
|
||||
quarantine_hook, use_splice, pipe_size, diskfile,
|
||||
keep_cache=False):
|
||||
keep_cache=False, cooperative_period=0):
|
||||
super(ECDiskFileReader, self).__init__(
|
||||
fp, data_file, obj_size, etag,
|
||||
disk_chunk_size, keep_cache_size, device_path, logger,
|
||||
quarantine_hook, use_splice, pipe_size, diskfile, keep_cache)
|
||||
quarantine_hook, use_splice, pipe_size, diskfile, keep_cache,
|
||||
cooperative_period)
|
||||
self.frag_buf = None
|
||||
self.frag_offset = 0
|
||||
self.frag_size = self._diskfile.policy.fragment_size
|
||||
|
@ -426,13 +426,14 @@ class DiskFile(object):
|
||||
with self.open(current_time=current_time):
|
||||
return self.get_metadata()
|
||||
|
||||
def reader(self, keep_cache=False):
|
||||
def reader(self, keep_cache=False, cooperative_period=0):
|
||||
"""
|
||||
Return a swift.common.swob.Response class compatible "app_iter"
|
||||
object. The responsibility of closing the open file is passed to the
|
||||
DiskFileReader object.
|
||||
|
||||
:param keep_cache:
|
||||
:param cooperative_period:
|
||||
"""
|
||||
dr = DiskFileReader(self._name, self._fp,
|
||||
int(self._metadata['Content-Length']),
|
||||
|
@ -152,6 +152,7 @@ class ObjectController(BaseStorageServer):
|
||||
config_true_value(conf.get('keep_cache_private', 'false'))
|
||||
self.keep_cache_slo_manifest = \
|
||||
config_true_value(conf.get('keep_cache_slo_manifest', 'false'))
|
||||
self.cooperative_period = int(conf.get("cooperative_period", 0))
|
||||
|
||||
default_allowed_headers = '''
|
||||
content-disposition,
|
||||
@ -1097,10 +1098,15 @@ class ObjectController(BaseStorageServer):
|
||||
)
|
||||
)
|
||||
conditional_etag = resolve_etag_is_at_header(request, metadata)
|
||||
app_iter = disk_file.reader(
|
||||
keep_cache=keep_cache,
|
||||
cooperative_period=self.cooperative_period,
|
||||
)
|
||||
response = Response(
|
||||
app_iter=disk_file.reader(keep_cache=keep_cache),
|
||||
request=request, conditional_response=True,
|
||||
conditional_etag=conditional_etag)
|
||||
app_iter=app_iter, request=request,
|
||||
conditional_response=True,
|
||||
conditional_etag=conditional_etag,
|
||||
)
|
||||
response.headers['Content-Type'] = metadata.get(
|
||||
'Content-Type', 'application/octet-stream')
|
||||
for key, value in metadata.items():
|
||||
|
@ -9821,7 +9821,7 @@ class TestCooperativeIterator(unittest.TestCase):
|
||||
it.close()
|
||||
self.assertTrue(closeable.close.called)
|
||||
|
||||
def test_next(self):
|
||||
def test_sleeps(self):
|
||||
def do_test(it, period):
|
||||
results = []
|
||||
for i in range(period):
|
||||
@ -9852,8 +9852,21 @@ class TestCooperativeIterator(unittest.TestCase):
|
||||
self.assertEqual(list(range(7)), actual)
|
||||
actual = do_test(utils.CooperativeIterator(itertools.count(), 1), 1)
|
||||
self.assertEqual(list(range(3)), actual)
|
||||
actual = do_test(utils.CooperativeIterator(itertools.count(), 0), 0)
|
||||
self.assertEqual(list(range(2)), actual)
|
||||
|
||||
def test_no_sleeps(self):
|
||||
def do_test(period):
|
||||
it = utils.CooperativeIterator(itertools.count(), period)
|
||||
results = []
|
||||
with mock.patch('swift.common.utils.sleep') as mock_sleep:
|
||||
for i in range(100):
|
||||
results.append(next(it))
|
||||
self.assertFalse(mock_sleep.called, i)
|
||||
self.assertEqual(list(range(100)), results)
|
||||
|
||||
do_test(0)
|
||||
do_test(-1)
|
||||
do_test(-111)
|
||||
do_test(None)
|
||||
|
||||
|
||||
class TestContextPool(unittest.TestCase):
|
||||
|
@ -4346,7 +4346,8 @@ class TestObjectController(BaseTestCase):
|
||||
reader_mock = mock.Mock(keep_cache=False)
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(keep_cache=True)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=True, cooperative_period=0)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
|
||||
self.assertEqual(dict(resp.headers), {
|
||||
@ -4369,7 +4370,8 @@ class TestObjectController(BaseTestCase):
|
||||
reader_mock = mock.Mock(keep_cache=False)
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(keep_cache=True)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=True, cooperative_period=0)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
# Request headers have 'X-Storage-Token'.
|
||||
@ -4379,7 +4381,8 @@ class TestObjectController(BaseTestCase):
|
||||
reader_mock = mock.Mock(keep_cache=False)
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(keep_cache=True)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=True, cooperative_period=0)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
# Request headers have both 'X-Auth-Token' and 'X-Storage-Token'.
|
||||
@ -4390,7 +4393,8 @@ class TestObjectController(BaseTestCase):
|
||||
reader_mock = mock.Mock(keep_cache=False)
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(keep_cache=True)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=True, cooperative_period=0)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
def test_GET_keep_cache_private_config_false(self):
|
||||
@ -4418,7 +4422,8 @@ class TestObjectController(BaseTestCase):
|
||||
reader_mock = mock.Mock(keep_cache=False)
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(keep_cache=True)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=True, cooperative_period=0)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
|
||||
self.assertEqual(dict(resp.headers), {
|
||||
@ -4441,7 +4446,8 @@ class TestObjectController(BaseTestCase):
|
||||
reader_mock = mock.Mock(keep_cache=False)
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(keep_cache=False)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=False, cooperative_period=0)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
# Request headers have 'X-Storage-Token'.
|
||||
@ -4451,7 +4457,8 @@ class TestObjectController(BaseTestCase):
|
||||
reader_mock = mock.Mock(keep_cache=False)
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(keep_cache=False)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=False, cooperative_period=0)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
# Request headers have both 'X-Auth-Token' and 'X-Storage-Token'.
|
||||
@ -4462,7 +4469,8 @@ class TestObjectController(BaseTestCase):
|
||||
reader_mock = mock.Mock(keep_cache=False)
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(keep_cache=False)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=False, cooperative_period=0)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
def test_GET_keep_cache_slo_manifest_no_config(self):
|
||||
@ -4492,7 +4500,8 @@ class TestObjectController(BaseTestCase):
|
||||
reader_mock = mock.Mock(keep_cache=False)
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(keep_cache=False)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=False, cooperative_period=0)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
|
||||
self.assertEqual(dict(resp.headers), {
|
||||
@ -4537,7 +4546,8 @@ class TestObjectController(BaseTestCase):
|
||||
reader_mock = mock.Mock(keep_cache=False)
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(keep_cache=False)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=False, cooperative_period=0)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
|
||||
self.assertEqual(dict(resp.headers), {
|
||||
@ -4582,7 +4592,8 @@ class TestObjectController(BaseTestCase):
|
||||
reader_mock = mock.Mock(keep_cache=False)
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(keep_cache=True)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=True, cooperative_period=0)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
|
||||
self.assertEqual(dict(resp.headers), {
|
||||
@ -4626,7 +4637,8 @@ class TestObjectController(BaseTestCase):
|
||||
reader_mock = mock.Mock(keep_cache=False)
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(keep_cache=False)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=False, cooperative_period=0)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
|
||||
self.assertEqual(dict(resp.headers), {
|
||||
@ -4642,6 +4654,107 @@ class TestObjectController(BaseTestCase):
|
||||
gmtime(math.ceil(float(timestamp)))),
|
||||
})
|
||||
|
||||
def test_GET_cooperative_period_config(self):
|
||||
# Test config of 'cooperative_period' gets passed to DiskFile reader.
|
||||
conf = {'devices': self.testdir, 'mount_check': 'false',
|
||||
'container_update_timeout': 0.0,
|
||||
'cooperative_period': '99'}
|
||||
obj_controller = object_server.ObjectController(
|
||||
conf, logger=self.logger)
|
||||
|
||||
timestamp = normalize_timestamp(time())
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': timestamp,
|
||||
'Content-Type': 'application/x-test'})
|
||||
req.body = b'7 bytes'
|
||||
resp = req.get_response(obj_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
headers={'Content-Type': 'application/x-test',
|
||||
'X-Auth-Token': '2340lsdfhhjl02lxfjj'})
|
||||
with mock.patch(
|
||||
"swift.obj.diskfile.BaseDiskFile.reader"
|
||||
) as reader_mock:
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(keep_cache=False, cooperative_period=99)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
# Test DiskFile reader actually sleeps when reading chunks. When
|
||||
# cooperative_period is 1, disk reader sleeps once AFTER each next().
|
||||
conf['cooperative_period'] = '1'
|
||||
obj_controller = object_server.ObjectController(
|
||||
conf, logger=self.logger)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'Range': 'bytes=1-6'})
|
||||
with mock.patch('swift.common.utils.sleep') as mock_sleep:
|
||||
resp = req.get_response(obj_controller)
|
||||
self.assertEqual(resp.status_int, 206)
|
||||
self.assertEqual('bytes 1-6/7', resp.headers.get('Content-Range'))
|
||||
self.assertEqual(b' bytes', resp.body)
|
||||
self.assertEqual(1, mock_sleep.call_count)
|
||||
|
||||
# Test DiskFile reader actually sleeps when reading chunks. And verify
|
||||
# number of sleeps when 'disk_chunk_size' is set.
|
||||
conf['cooperative_period'] = '2'
|
||||
conf['disk_chunk_size'] = 2
|
||||
obj_controller = object_server.ObjectController(
|
||||
conf, logger=self.logger)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'GET'})
|
||||
with mock.patch('swift.common.utils.sleep') as mock_sleep:
|
||||
resp = req.get_response(obj_controller)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(b'7 bytes', resp.body)
|
||||
self.assertEqual(2, mock_sleep.call_count)
|
||||
|
||||
conf['cooperative_period'] = '2'
|
||||
conf['disk_chunk_size'] = 3
|
||||
obj_controller = object_server.ObjectController(
|
||||
conf, logger=self.logger)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'GET'})
|
||||
with mock.patch('swift.common.utils.sleep') as mock_sleep:
|
||||
resp = req.get_response(obj_controller)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(b'7 bytes', resp.body)
|
||||
self.assertEqual(1, mock_sleep.call_count)
|
||||
|
||||
# Test DiskFile reader won't sleep with cooperative_period set as 0.
|
||||
conf['cooperative_period'] = '0'
|
||||
obj_controller = object_server.ObjectController(
|
||||
conf, logger=self.logger)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'Range': 'bytes=1-6'})
|
||||
with mock.patch('swift.common.utils.sleep') as mock_sleep:
|
||||
resp = req.get_response(obj_controller)
|
||||
self.assertEqual(resp.status_int, 206)
|
||||
self.assertEqual('bytes 1-6/7', resp.headers.get('Content-Range'))
|
||||
self.assertEqual(b' bytes', resp.body)
|
||||
self.assertFalse(mock_sleep.called)
|
||||
|
||||
# Test DiskFile reader won't sleep with default cooperative_period
|
||||
# which is also 0.
|
||||
conf.pop('cooperative_period')
|
||||
obj_controller = object_server.ObjectController(
|
||||
conf, logger=self.logger)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'Range': 'bytes=1-6'})
|
||||
with mock.patch('swift.common.utils.sleep') as mock_sleep:
|
||||
resp = req.get_response(obj_controller)
|
||||
self.assertEqual(resp.status_int, 206)
|
||||
self.assertEqual('bytes 1-6/7', resp.headers.get('Content-Range'))
|
||||
self.assertEqual(b' bytes', resp.body)
|
||||
self.assertFalse(mock_sleep.called)
|
||||
|
||||
@mock.patch("time.time", mock_time)
|
||||
def test_DELETE(self):
|
||||
# Test swift.obj.server.ObjectController.DELETE
|
||||
|
Loading…
Reference in New Issue
Block a user