From d5877179a5a03839d2ace56fc40b67a59f7e3028 Mon Sep 17 00:00:00 2001 From: Jianjian Huo Date: Thu, 28 Sep 2023 16:48:56 -0700 Subject: [PATCH] Object-server: add periodic greenthread yielding during file read. Currently, when the object-server serves a GET request and the DiskFile reader iterates over disk file chunks, there is no explicit eventlet sleep called. When the network outpaces the slow disk IO, it's possible that one large and slow GET request could cause the eventlet hub not to schedule any other green threads for a long period of time. To improve this, this patch adds a configurable sleep parameter to the DiskFile reader, 'cooperative_period', with a default value of 0 (disabled). Co-Authored-By: Clay Gerrard Change-Id: I80b04bad0601b6cd6caef35498f89d4ba70a4fd4 --- etc/object-server.conf-sample | 6 ++ swift/common/utils/__init__.py | 14 ++-- swift/obj/diskfile.py | 23 ++++-- swift/obj/mem_diskfile.py | 3 +- swift/obj/server.py | 12 ++- test/unit/common/test_utils.py | 19 ++++- test/unit/obj/test_server.py | 137 ++++++++++++++++++++++++++++++--- 7 files changed, 183 insertions(+), 31 deletions(-) diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index ac7d497adb..67c879b1ac 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -148,6 +148,12 @@ use = egg:swift#object # 'keep_cache_private' is false. # keep_cache_slo_manifest = false # +# cooperative_period defines how frequently object server GET requests will +# perform cooperative yielding while iterating over the disk chunks. For +# example, a value of '5' will insert one sleep() after every 5 disk_chunk_size +# chunk reads. A value of '0' (the default) will turn off cooperative yielding. 
+# cooperative_period = 0 +# # on PUTs, sync data every n MB # mb_per_sync = 512 # diff --git a/swift/common/utils/__init__.py b/swift/common/utils/__init__.py index 52d235b5c7..a735e85a63 100644 --- a/swift/common/utils/__init__.py +++ b/swift/common/utils/__init__.py @@ -6506,18 +6506,20 @@ class CooperativeIterator(ClosingIterator): :param iterable: iterator to wrap. :param period: number of items yielded from this iterator between calls to - ``sleep()``. + ``sleep()``; a negative value or 0 means that cooperative sleep will be + disabled. """ __slots__ = ('period', 'count') def __init__(self, iterable, period=5): super(CooperativeIterator, self).__init__(iterable) self.count = 0 - self.period = period + self.period = max(0, period or 0) def _get_next_item(self): - if self.count >= self.period: - self.count = 0 - sleep() - self.count += 1 + if self.period: + if self.count >= self.period: + self.count = 0 + sleep() + self.count += 1 return super(CooperativeIterator, self)._get_next_item() diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index d593c0d686..d59c486252 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -66,7 +66,7 @@ from swift.common.utils import mkdirs, Timestamp, \ MD5_OF_EMPTY_STRING, link_fd_to_path, \ O_TMPFILE, makedirs_count, replace_partition_in_path, remove_directory, \ md5, is_file_older, non_negative_float, config_fallocate_value, \ - fs_has_free_space + fs_has_free_space, CooperativeIterator from swift.common.splice import splice, tee from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \ DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \ @@ -2110,11 +2110,13 @@ class BaseDiskFileReader(object): :param pipe_size: size of pipe buffer used in zero-copy operations :param diskfile: the diskfile creating this DiskFileReader instance :param keep_cache: should resulting reads be kept in the buffer cache + :param cooperative_period: the period parameter for cooperative + yielding 
during file read """ def __init__(self, fp, data_file, obj_size, etag, disk_chunk_size, keep_cache_size, device_path, logger, quarantine_hook, use_splice, pipe_size, diskfile, - keep_cache=False): + keep_cache=False, cooperative_period=0): # Parameter tracking self._fp = fp self._data_file = data_file @@ -2133,6 +2135,7 @@ class BaseDiskFileReader(object): self._keep_cache = obj_size < keep_cache_size else: self._keep_cache = False + self._cooperative_period = cooperative_period # Internal Attributes self._iter_etag = None @@ -2157,6 +2160,10 @@ class BaseDiskFileReader(object): self._iter_etag.update(chunk) def __iter__(self): + return CooperativeIterator( + self._inner_iter(), period=self._cooperative_period) + + def _inner_iter(self): """Returns an iterator over the data file.""" try: dropped_cache = 0 @@ -2975,7 +2982,7 @@ class BaseDiskFile(object): with self.open(current_time=current_time): return self.get_metadata() - def reader(self, keep_cache=False, + def reader(self, keep_cache=False, cooperative_period=0, _quarantine_hook=lambda m: None): """ Return a :class:`swift.common.swob.Response` class compatible @@ -2987,6 +2994,8 @@ class BaseDiskFile(object): :param keep_cache: caller's preference for keeping data read in the OS buffer cache + :param cooperative_period: the period parameter for cooperative + yielding during file read :param _quarantine_hook: 1-arg callable called when obj quarantined; the arg is the reason for quarantine. Default is to ignore it. @@ -2998,7 +3007,8 @@ class BaseDiskFile(object): self._metadata['ETag'], self._disk_chunk_size, self._manager.keep_cache_size, self._device_path, self._logger, use_splice=self._use_splice, quarantine_hook=_quarantine_hook, - pipe_size=self._pipe_size, diskfile=self, keep_cache=keep_cache) + pipe_size=self._pipe_size, diskfile=self, keep_cache=keep_cache, + cooperative_period=cooperative_period) # At this point the reader object is now responsible for closing # the file pointer. 
self._fp = None @@ -3161,11 +3171,12 @@ class ECDiskFileReader(BaseDiskFileReader): def __init__(self, fp, data_file, obj_size, etag, disk_chunk_size, keep_cache_size, device_path, logger, quarantine_hook, use_splice, pipe_size, diskfile, - keep_cache=False): + keep_cache=False, cooperative_period=0): super(ECDiskFileReader, self).__init__( fp, data_file, obj_size, etag, disk_chunk_size, keep_cache_size, device_path, logger, - quarantine_hook, use_splice, pipe_size, diskfile, keep_cache) + quarantine_hook, use_splice, pipe_size, diskfile, keep_cache, + cooperative_period) self.frag_buf = None self.frag_offset = 0 self.frag_size = self._diskfile.policy.fragment_size diff --git a/swift/obj/mem_diskfile.py b/swift/obj/mem_diskfile.py index fa72372fe1..3dee2c1354 100644 --- a/swift/obj/mem_diskfile.py +++ b/swift/obj/mem_diskfile.py @@ -426,13 +426,14 @@ class DiskFile(object): with self.open(current_time=current_time): return self.get_metadata() - def reader(self, keep_cache=False): + def reader(self, keep_cache=False, cooperative_period=0): """ Return a swift.common.swob.Response class compatible "app_iter" object. The responsibility of closing the open file is passed to the DiskFileReader object. 
:param keep_cache: + :param cooperative_period: """ dr = DiskFileReader(self._name, self._fp, int(self._metadata['Content-Length']), diff --git a/swift/obj/server.py b/swift/obj/server.py index 343cf624f8..5853eac9a9 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -152,6 +152,7 @@ class ObjectController(BaseStorageServer): config_true_value(conf.get('keep_cache_private', 'false')) self.keep_cache_slo_manifest = \ config_true_value(conf.get('keep_cache_slo_manifest', 'false')) + self.cooperative_period = int(conf.get("cooperative_period", 0)) default_allowed_headers = ''' content-disposition, @@ -1097,10 +1098,15 @@ class ObjectController(BaseStorageServer): ) ) conditional_etag = resolve_etag_is_at_header(request, metadata) + app_iter = disk_file.reader( + keep_cache=keep_cache, + cooperative_period=self.cooperative_period, + ) response = Response( - app_iter=disk_file.reader(keep_cache=keep_cache), - request=request, conditional_response=True, - conditional_etag=conditional_etag) + app_iter=app_iter, request=request, + conditional_response=True, + conditional_etag=conditional_etag, + ) response.headers['Content-Type'] = metadata.get( 'Content-Type', 'application/octet-stream') for key, value in metadata.items(): diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 0414d92cf1..5742f550e4 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -9821,7 +9821,7 @@ class TestCooperativeIterator(unittest.TestCase): it.close() self.assertTrue(closeable.close.called) - def test_next(self): + def test_sleeps(self): def do_test(it, period): results = [] for i in range(period): @@ -9852,8 +9852,21 @@ class TestCooperativeIterator(unittest.TestCase): self.assertEqual(list(range(7)), actual) actual = do_test(utils.CooperativeIterator(itertools.count(), 1), 1) self.assertEqual(list(range(3)), actual) - actual = do_test(utils.CooperativeIterator(itertools.count(), 0), 0) - self.assertEqual(list(range(2)), 
actual) + + def test_no_sleeps(self): + def do_test(period): + it = utils.CooperativeIterator(itertools.count(), period) + results = [] + with mock.patch('swift.common.utils.sleep') as mock_sleep: + for i in range(100): + results.append(next(it)) + self.assertFalse(mock_sleep.called, i) + self.assertEqual(list(range(100)), results) + + do_test(0) + do_test(-1) + do_test(-111) + do_test(None) class TestContextPool(unittest.TestCase): diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 52ab294585..3c27c0b61a 100644 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -4346,7 +4346,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=True) + reader_mock.assert_called_with( + keep_cache=True, cooperative_period=0) self.assertEqual(resp.status_int, 200) etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest() self.assertEqual(dict(resp.headers), { @@ -4369,7 +4370,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=True) + reader_mock.assert_called_with( + keep_cache=True, cooperative_period=0) self.assertEqual(resp.status_int, 200) # Request headers have 'X-Storage-Token'. @@ -4379,7 +4381,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=True) + reader_mock.assert_called_with( + keep_cache=True, cooperative_period=0) self.assertEqual(resp.status_int, 200) # Request headers have both 'X-Auth-Token' and 'X-Storage-Token'. 
@@ -4390,7 +4393,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=True) + reader_mock.assert_called_with( + keep_cache=True, cooperative_period=0) self.assertEqual(resp.status_int, 200) def test_GET_keep_cache_private_config_false(self): @@ -4418,7 +4422,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=True) + reader_mock.assert_called_with( + keep_cache=True, cooperative_period=0) self.assertEqual(resp.status_int, 200) etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest() self.assertEqual(dict(resp.headers), { @@ -4441,7 +4446,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=False) + reader_mock.assert_called_with( + keep_cache=False, cooperative_period=0) self.assertEqual(resp.status_int, 200) # Request headers have 'X-Storage-Token'. @@ -4451,7 +4457,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=False) + reader_mock.assert_called_with( + keep_cache=False, cooperative_period=0) self.assertEqual(resp.status_int, 200) # Request headers have both 'X-Auth-Token' and 'X-Storage-Token'. 
@@ -4462,7 +4469,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=False) + reader_mock.assert_called_with( + keep_cache=False, cooperative_period=0) self.assertEqual(resp.status_int, 200) def test_GET_keep_cache_slo_manifest_no_config(self): @@ -4492,7 +4500,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=False) + reader_mock.assert_called_with( + keep_cache=False, cooperative_period=0) self.assertEqual(resp.status_int, 200) etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest() self.assertEqual(dict(resp.headers), { @@ -4537,7 +4546,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=False) + reader_mock.assert_called_with( + keep_cache=False, cooperative_period=0) self.assertEqual(resp.status_int, 200) etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest() self.assertEqual(dict(resp.headers), { @@ -4582,7 +4592,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=True) + reader_mock.assert_called_with( + keep_cache=True, cooperative_period=0) self.assertEqual(resp.status_int, 200) etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest() self.assertEqual(dict(resp.headers), { @@ -4626,7 +4637,8 @@ class TestObjectController(BaseTestCase): reader_mock = 
mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=False) + reader_mock.assert_called_with( + keep_cache=False, cooperative_period=0) self.assertEqual(resp.status_int, 200) etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest() self.assertEqual(dict(resp.headers), { @@ -4642,6 +4654,107 @@ class TestObjectController(BaseTestCase): gmtime(math.ceil(float(timestamp)))), }) + def test_GET_cooperative_period_config(self): + # Test config of 'cooperative_period' gets passed to DiskFile reader. + conf = {'devices': self.testdir, 'mount_check': 'false', + 'container_update_timeout': 0.0, + 'cooperative_period': '99'} + obj_controller = object_server.ObjectController( + conf, logger=self.logger) + + timestamp = normalize_timestamp(time()) + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': timestamp, + 'Content-Type': 'application/x-test'}) + req.body = b'7 bytes' + resp = req.get_response(obj_controller) + self.assertEqual(resp.status_int, 201) + + req = Request.blank('/sda1/p/a/c/o', + headers={'Content-Type': 'application/x-test', + 'X-Auth-Token': '2340lsdfhhjl02lxfjj'}) + with mock.patch( + "swift.obj.diskfile.BaseDiskFile.reader" + ) as reader_mock: + resp = req.get_response(obj_controller) + reader_mock.assert_called_with(keep_cache=False, cooperative_period=99) + self.assertEqual(resp.status_int, 200) + + # Test DiskFile reader actually sleeps when reading chunks. When + # cooperative_period is 1, disk reader sleeps once AFTER each next(). 
+ conf['cooperative_period'] = '1' + obj_controller = object_server.ObjectController( + conf, logger=self.logger) + + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=1-6'}) + with mock.patch('swift.common.utils.sleep') as mock_sleep: + resp = req.get_response(obj_controller) + self.assertEqual(resp.status_int, 206) + self.assertEqual('bytes 1-6/7', resp.headers.get('Content-Range')) + self.assertEqual(b' bytes', resp.body) + self.assertEqual(1, mock_sleep.call_count) + + # Test DiskFile reader actually sleeps when reading chunks. And verify + # number of sleeps when 'disk_chunk_size' is set. + conf['cooperative_period'] = '2' + conf['disk_chunk_size'] = 2 + obj_controller = object_server.ObjectController( + conf, logger=self.logger) + + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'GET'}) + with mock.patch('swift.common.utils.sleep') as mock_sleep: + resp = req.get_response(obj_controller) + self.assertEqual(resp.status_int, 200) + self.assertEqual(b'7 bytes', resp.body) + self.assertEqual(2, mock_sleep.call_count) + + conf['cooperative_period'] = '2' + conf['disk_chunk_size'] = 3 + obj_controller = object_server.ObjectController( + conf, logger=self.logger) + + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'GET'}) + with mock.patch('swift.common.utils.sleep') as mock_sleep: + resp = req.get_response(obj_controller) + self.assertEqual(resp.status_int, 200) + self.assertEqual(b'7 bytes', resp.body) + self.assertEqual(1, mock_sleep.call_count) + + # Test DiskFile reader won't sleep with cooperative_period set as 0. 
+ conf['cooperative_period'] = '0' + obj_controller = object_server.ObjectController( + conf, logger=self.logger) + + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=1-6'}) + with mock.patch('swift.common.utils.sleep') as mock_sleep: + resp = req.get_response(obj_controller) + self.assertEqual(resp.status_int, 206) + self.assertEqual('bytes 1-6/7', resp.headers.get('Content-Range')) + self.assertEqual(b' bytes', resp.body) + self.assertFalse(mock_sleep.called) + + # Test DiskFile reader won't sleep with default cooperative_period + # which is also 0. + conf.pop('cooperative_period') + obj_controller = object_server.ObjectController( + conf, logger=self.logger) + + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=1-6'}) + with mock.patch('swift.common.utils.sleep') as mock_sleep: + resp = req.get_response(obj_controller) + self.assertEqual(resp.status_int, 206) + self.assertEqual('bytes 1-6/7', resp.headers.get('Content-Range')) + self.assertEqual(b' bytes', resp.body) + self.assertFalse(mock_sleep.called) + @mock.patch("time.time", mock_time) def test_DELETE(self): # Test swift.obj.server.ObjectController.DELETE