Object-server: add periodic greenthread yielding during file read.

Currently, when the object-server serves a GET request and the
DiskFile reader iterates over disk file chunks, no explicit
eventlet sleep is called. When the network outpaces slow disk IO,
one large and slow GET request can prevent the eventlet hub from
scheduling any other green threads for a long period of time. To
improve this, this patch adds a configurable parameter to the
DiskFile reader, 'cooperative_period', with a default value of 0
(disabled).

Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
Change-Id: I80b04bad0601b6cd6caef35498f89d4ba70a4fd4
This commit is contained in:
Jianjian Huo 2023-09-28 16:48:56 -07:00 committed by Matthew Oliver
parent 50336c5098
commit d5877179a5
7 changed files with 183 additions and 31 deletions

View File

@ -148,6 +148,12 @@ use = egg:swift#object
# 'keep_cache_private' is false.
# keep_cache_slo_manifest = false
#
# cooperative_period defines how frequently an object server GET request will
# perform cooperative yielding while iterating over the disk chunks. For
# example, a value of '5' will insert one sleep() after every 5 disk_chunk_size
# chunk reads. A value of '0' (the default) will turn off cooperative yielding.
# cooperative_period = 0
#
# on PUTs, sync data every n MB
# mb_per_sync = 512
#

View File

@ -6506,18 +6506,20 @@ class CooperativeIterator(ClosingIterator):
:param iterable: iterator to wrap.
:param period: number of items yielded from this iterator between calls to
``sleep()``.
``sleep()``; a negative value or 0 means that cooperative sleep will be
disabled.
"""
__slots__ = ('period', 'count')
def __init__(self, iterable, period=5):
super(CooperativeIterator, self).__init__(iterable)
self.count = 0
self.period = period
self.period = max(0, period or 0)
def _get_next_item(self):
if self.count >= self.period:
self.count = 0
sleep()
self.count += 1
if self.period:
if self.count >= self.period:
self.count = 0
sleep()
self.count += 1
return super(CooperativeIterator, self)._get_next_item()

View File

@ -66,7 +66,7 @@ from swift.common.utils import mkdirs, Timestamp, \
MD5_OF_EMPTY_STRING, link_fd_to_path, \
O_TMPFILE, makedirs_count, replace_partition_in_path, remove_directory, \
md5, is_file_older, non_negative_float, config_fallocate_value, \
fs_has_free_space
fs_has_free_space, CooperativeIterator
from swift.common.splice import splice, tee
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
@ -2110,11 +2110,13 @@ class BaseDiskFileReader(object):
:param pipe_size: size of pipe buffer used in zero-copy operations
:param diskfile: the diskfile creating this DiskFileReader instance
:param keep_cache: should resulting reads be kept in the buffer cache
:param cooperative_period: the period parameter used for cooperative
yielding during file read
"""
def __init__(self, fp, data_file, obj_size, etag,
disk_chunk_size, keep_cache_size, device_path, logger,
quarantine_hook, use_splice, pipe_size, diskfile,
keep_cache=False):
keep_cache=False, cooperative_period=0):
# Parameter tracking
self._fp = fp
self._data_file = data_file
@ -2133,6 +2135,7 @@ class BaseDiskFileReader(object):
self._keep_cache = obj_size < keep_cache_size
else:
self._keep_cache = False
self._cooperative_period = cooperative_period
# Internal Attributes
self._iter_etag = None
@ -2157,6 +2160,10 @@ class BaseDiskFileReader(object):
self._iter_etag.update(chunk)
def __iter__(self):
return CooperativeIterator(
self._inner_iter(), period=self._cooperative_period)
def _inner_iter(self):
"""Returns an iterator over the data file."""
try:
dropped_cache = 0
@ -2975,7 +2982,7 @@ class BaseDiskFile(object):
with self.open(current_time=current_time):
return self.get_metadata()
def reader(self, keep_cache=False,
def reader(self, keep_cache=False, cooperative_period=0,
_quarantine_hook=lambda m: None):
"""
Return a :class:`swift.common.swob.Response` class compatible
@ -2987,6 +2994,8 @@ class BaseDiskFile(object):
:param keep_cache: caller's preference for keeping data read in the
OS buffer cache
:param cooperative_period: the period parameter for cooperative
yielding during file read
:param _quarantine_hook: 1-arg callable called when obj quarantined;
the arg is the reason for quarantine.
Default is to ignore it.
@ -2998,7 +3007,8 @@ class BaseDiskFile(object):
self._metadata['ETag'], self._disk_chunk_size,
self._manager.keep_cache_size, self._device_path, self._logger,
use_splice=self._use_splice, quarantine_hook=_quarantine_hook,
pipe_size=self._pipe_size, diskfile=self, keep_cache=keep_cache)
pipe_size=self._pipe_size, diskfile=self, keep_cache=keep_cache,
cooperative_period=cooperative_period)
# At this point the reader object is now responsible for closing
# the file pointer.
self._fp = None
@ -3161,11 +3171,12 @@ class ECDiskFileReader(BaseDiskFileReader):
def __init__(self, fp, data_file, obj_size, etag,
disk_chunk_size, keep_cache_size, device_path, logger,
quarantine_hook, use_splice, pipe_size, diskfile,
keep_cache=False):
keep_cache=False, cooperative_period=0):
super(ECDiskFileReader, self).__init__(
fp, data_file, obj_size, etag,
disk_chunk_size, keep_cache_size, device_path, logger,
quarantine_hook, use_splice, pipe_size, diskfile, keep_cache)
quarantine_hook, use_splice, pipe_size, diskfile, keep_cache,
cooperative_period)
self.frag_buf = None
self.frag_offset = 0
self.frag_size = self._diskfile.policy.fragment_size

View File

@ -426,13 +426,14 @@ class DiskFile(object):
with self.open(current_time=current_time):
return self.get_metadata()
def reader(self, keep_cache=False):
def reader(self, keep_cache=False, cooperative_period=0):
"""
Return a swift.common.swob.Response class compatible "app_iter"
object. The responsibility of closing the open file is passed to the
DiskFileReader object.
:param keep_cache:
:param cooperative_period:
"""
dr = DiskFileReader(self._name, self._fp,
int(self._metadata['Content-Length']),

View File

@ -152,6 +152,7 @@ class ObjectController(BaseStorageServer):
config_true_value(conf.get('keep_cache_private', 'false'))
self.keep_cache_slo_manifest = \
config_true_value(conf.get('keep_cache_slo_manifest', 'false'))
self.cooperative_period = int(conf.get("cooperative_period", 0))
default_allowed_headers = '''
content-disposition,
@ -1097,10 +1098,15 @@ class ObjectController(BaseStorageServer):
)
)
conditional_etag = resolve_etag_is_at_header(request, metadata)
app_iter = disk_file.reader(
keep_cache=keep_cache,
cooperative_period=self.cooperative_period,
)
response = Response(
app_iter=disk_file.reader(keep_cache=keep_cache),
request=request, conditional_response=True,
conditional_etag=conditional_etag)
app_iter=app_iter, request=request,
conditional_response=True,
conditional_etag=conditional_etag,
)
response.headers['Content-Type'] = metadata.get(
'Content-Type', 'application/octet-stream')
for key, value in metadata.items():

View File

@ -9821,7 +9821,7 @@ class TestCooperativeIterator(unittest.TestCase):
it.close()
self.assertTrue(closeable.close.called)
def test_next(self):
def test_sleeps(self):
def do_test(it, period):
results = []
for i in range(period):
@ -9852,8 +9852,21 @@ class TestCooperativeIterator(unittest.TestCase):
self.assertEqual(list(range(7)), actual)
actual = do_test(utils.CooperativeIterator(itertools.count(), 1), 1)
self.assertEqual(list(range(3)), actual)
actual = do_test(utils.CooperativeIterator(itertools.count(), 0), 0)
self.assertEqual(list(range(2)), actual)
def test_no_sleeps(self):
def do_test(period):
it = utils.CooperativeIterator(itertools.count(), period)
results = []
with mock.patch('swift.common.utils.sleep') as mock_sleep:
for i in range(100):
results.append(next(it))
self.assertFalse(mock_sleep.called, i)
self.assertEqual(list(range(100)), results)
do_test(0)
do_test(-1)
do_test(-111)
do_test(None)
class TestContextPool(unittest.TestCase):

View File

@ -4346,7 +4346,8 @@ class TestObjectController(BaseTestCase):
reader_mock = mock.Mock(keep_cache=False)
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
resp = req.get_response(obj_controller)
reader_mock.assert_called_with(keep_cache=True)
reader_mock.assert_called_with(
keep_cache=True, cooperative_period=0)
self.assertEqual(resp.status_int, 200)
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
self.assertEqual(dict(resp.headers), {
@ -4369,7 +4370,8 @@ class TestObjectController(BaseTestCase):
reader_mock = mock.Mock(keep_cache=False)
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
resp = req.get_response(obj_controller)
reader_mock.assert_called_with(keep_cache=True)
reader_mock.assert_called_with(
keep_cache=True, cooperative_period=0)
self.assertEqual(resp.status_int, 200)
# Request headers have 'X-Storage-Token'.
@ -4379,7 +4381,8 @@ class TestObjectController(BaseTestCase):
reader_mock = mock.Mock(keep_cache=False)
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
resp = req.get_response(obj_controller)
reader_mock.assert_called_with(keep_cache=True)
reader_mock.assert_called_with(
keep_cache=True, cooperative_period=0)
self.assertEqual(resp.status_int, 200)
# Request headers have both 'X-Auth-Token' and 'X-Storage-Token'.
@ -4390,7 +4393,8 @@ class TestObjectController(BaseTestCase):
reader_mock = mock.Mock(keep_cache=False)
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
resp = req.get_response(obj_controller)
reader_mock.assert_called_with(keep_cache=True)
reader_mock.assert_called_with(
keep_cache=True, cooperative_period=0)
self.assertEqual(resp.status_int, 200)
def test_GET_keep_cache_private_config_false(self):
@ -4418,7 +4422,8 @@ class TestObjectController(BaseTestCase):
reader_mock = mock.Mock(keep_cache=False)
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
resp = req.get_response(obj_controller)
reader_mock.assert_called_with(keep_cache=True)
reader_mock.assert_called_with(
keep_cache=True, cooperative_period=0)
self.assertEqual(resp.status_int, 200)
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
self.assertEqual(dict(resp.headers), {
@ -4441,7 +4446,8 @@ class TestObjectController(BaseTestCase):
reader_mock = mock.Mock(keep_cache=False)
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
resp = req.get_response(obj_controller)
reader_mock.assert_called_with(keep_cache=False)
reader_mock.assert_called_with(
keep_cache=False, cooperative_period=0)
self.assertEqual(resp.status_int, 200)
# Request headers have 'X-Storage-Token'.
@ -4451,7 +4457,8 @@ class TestObjectController(BaseTestCase):
reader_mock = mock.Mock(keep_cache=False)
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
resp = req.get_response(obj_controller)
reader_mock.assert_called_with(keep_cache=False)
reader_mock.assert_called_with(
keep_cache=False, cooperative_period=0)
self.assertEqual(resp.status_int, 200)
# Request headers have both 'X-Auth-Token' and 'X-Storage-Token'.
@ -4462,7 +4469,8 @@ class TestObjectController(BaseTestCase):
reader_mock = mock.Mock(keep_cache=False)
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
resp = req.get_response(obj_controller)
reader_mock.assert_called_with(keep_cache=False)
reader_mock.assert_called_with(
keep_cache=False, cooperative_period=0)
self.assertEqual(resp.status_int, 200)
def test_GET_keep_cache_slo_manifest_no_config(self):
@ -4492,7 +4500,8 @@ class TestObjectController(BaseTestCase):
reader_mock = mock.Mock(keep_cache=False)
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
resp = req.get_response(obj_controller)
reader_mock.assert_called_with(keep_cache=False)
reader_mock.assert_called_with(
keep_cache=False, cooperative_period=0)
self.assertEqual(resp.status_int, 200)
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
self.assertEqual(dict(resp.headers), {
@ -4537,7 +4546,8 @@ class TestObjectController(BaseTestCase):
reader_mock = mock.Mock(keep_cache=False)
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
resp = req.get_response(obj_controller)
reader_mock.assert_called_with(keep_cache=False)
reader_mock.assert_called_with(
keep_cache=False, cooperative_period=0)
self.assertEqual(resp.status_int, 200)
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
self.assertEqual(dict(resp.headers), {
@ -4582,7 +4592,8 @@ class TestObjectController(BaseTestCase):
reader_mock = mock.Mock(keep_cache=False)
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
resp = req.get_response(obj_controller)
reader_mock.assert_called_with(keep_cache=True)
reader_mock.assert_called_with(
keep_cache=True, cooperative_period=0)
self.assertEqual(resp.status_int, 200)
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
self.assertEqual(dict(resp.headers), {
@ -4626,7 +4637,8 @@ class TestObjectController(BaseTestCase):
reader_mock = mock.Mock(keep_cache=False)
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
resp = req.get_response(obj_controller)
reader_mock.assert_called_with(keep_cache=False)
reader_mock.assert_called_with(
keep_cache=False, cooperative_period=0)
self.assertEqual(resp.status_int, 200)
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
self.assertEqual(dict(resp.headers), {
@ -4642,6 +4654,107 @@ class TestObjectController(BaseTestCase):
gmtime(math.ceil(float(timestamp)))),
})
def test_GET_cooperative_period_config(self):
# Test config of 'cooperative_period' gets passed to DiskFile reader.
conf = {'devices': self.testdir, 'mount_check': 'false',
'container_update_timeout': 0.0,
'cooperative_period': '99'}
obj_controller = object_server.ObjectController(
conf, logger=self.logger)
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': timestamp,
'Content-Type': 'application/x-test'})
req.body = b'7 bytes'
resp = req.get_response(obj_controller)
self.assertEqual(resp.status_int, 201)
req = Request.blank('/sda1/p/a/c/o',
headers={'Content-Type': 'application/x-test',
'X-Auth-Token': '2340lsdfhhjl02lxfjj'})
with mock.patch(
"swift.obj.diskfile.BaseDiskFile.reader"
) as reader_mock:
resp = req.get_response(obj_controller)
reader_mock.assert_called_with(keep_cache=False, cooperative_period=99)
self.assertEqual(resp.status_int, 200)
# Test DiskFile reader actually sleeps when reading chunks. When
# cooperative_period is 1, disk reader sleeps once AFTER each next().
conf['cooperative_period'] = '1'
obj_controller = object_server.ObjectController(
conf, logger=self.logger)
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'GET'},
headers={'Range': 'bytes=1-6'})
with mock.patch('swift.common.utils.sleep') as mock_sleep:
resp = req.get_response(obj_controller)
self.assertEqual(resp.status_int, 206)
self.assertEqual('bytes 1-6/7', resp.headers.get('Content-Range'))
self.assertEqual(b' bytes', resp.body)
self.assertEqual(1, mock_sleep.call_count)
# Test DiskFile reader actually sleeps when reading chunks. And verify
# number of sleeps when 'disk_chunk_size' is set.
conf['cooperative_period'] = '2'
conf['disk_chunk_size'] = 2
obj_controller = object_server.ObjectController(
conf, logger=self.logger)
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'GET'})
with mock.patch('swift.common.utils.sleep') as mock_sleep:
resp = req.get_response(obj_controller)
self.assertEqual(resp.status_int, 200)
self.assertEqual(b'7 bytes', resp.body)
self.assertEqual(2, mock_sleep.call_count)
conf['cooperative_period'] = '2'
conf['disk_chunk_size'] = 3
obj_controller = object_server.ObjectController(
conf, logger=self.logger)
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'GET'})
with mock.patch('swift.common.utils.sleep') as mock_sleep:
resp = req.get_response(obj_controller)
self.assertEqual(resp.status_int, 200)
self.assertEqual(b'7 bytes', resp.body)
self.assertEqual(1, mock_sleep.call_count)
# Test DiskFile reader won't sleep with cooperative_period set as 0.
conf['cooperative_period'] = '0'
obj_controller = object_server.ObjectController(
conf, logger=self.logger)
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'GET'},
headers={'Range': 'bytes=1-6'})
with mock.patch('swift.common.utils.sleep') as mock_sleep:
resp = req.get_response(obj_controller)
self.assertEqual(resp.status_int, 206)
self.assertEqual('bytes 1-6/7', resp.headers.get('Content-Range'))
self.assertEqual(b' bytes', resp.body)
self.assertFalse(mock_sleep.called)
# Test DiskFile reader won't sleep with default cooperative_period
# which is also 0.
conf.pop('cooperative_period')
obj_controller = object_server.ObjectController(
conf, logger=self.logger)
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'GET'},
headers={'Range': 'bytes=1-6'})
with mock.patch('swift.common.utils.sleep') as mock_sleep:
resp = req.get_response(obj_controller)
self.assertEqual(resp.status_int, 206)
self.assertEqual('bytes 1-6/7', resp.headers.get('Content-Range'))
self.assertEqual(b' bytes', resp.body)
self.assertFalse(mock_sleep.called)
@mock.patch("time.time", mock_time)
def test_DELETE(self):
# Test swift.obj.server.ObjectController.DELETE