From 8378a11d1113386326841ef7065914dd1629a15f Mon Sep 17 00:00:00 2001 From: Romain LE DISEZ Date: Mon, 4 Nov 2019 14:47:35 +0800 Subject: [PATCH] Replace all "with Chunk*Timeout" by a watchdog The contextmanager eventlet.timeout.Timeout is scheduling a call to throw an exception every time is is entered. The swift-proxy uses Chunk(Read|Write)Timeout for every chunk read/written from the client or object-server. For a single upload/download of a big object, it means tens of thousands of scheduling in eventlet, which is very costly. This patch replace the usage of these context managers by a watchdog greenthread that will schedule itself by sleeping until the next timeout expiration. Then, only if a timeout expired, it will schedule a call to throw the appropriate exception. The gain on bandwidth and CPU usage is significant. On a benchmark environment, it gave this result for an upload of 6 Gbpson a replica policy (average of 3 runs): master: 5.66 Gbps / 849 jiffies consumed by the proxy-server this patch: 7.56 Gbps / 618 jiffies consumed by the proxy-server Change-Id: I19fd42908be5a6ac5905ba193967cd860cb27a0b --- swift/common/utils.py | 130 ++++++++++++++++++++++++++++++++ swift/proxy/controllers/base.py | 22 ++++-- swift/proxy/controllers/obj.py | 58 ++++++++------ swift/proxy/server.py | 4 +- test/unit/common/test_utils.py | 83 ++++++++++++++++++++ test/unit/proxy/test_server.py | 6 +- 6 files changed, 267 insertions(+), 36 deletions(-) diff --git a/swift/common/utils.py b/swift/common/utils.py index d5692cada3..8a168393c5 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -62,6 +62,7 @@ import eventlet.patcher import eventlet.semaphore import pkg_resources from eventlet import GreenPool, sleep, Timeout +from eventlet.event import Event from eventlet.green import socket, threading import eventlet.hubs import eventlet.queue @@ -5847,3 +5848,132 @@ def systemd_notify(logger=None): except EnvironmentError: if logger: logger.debug("Systemd notification failed", exc_info=True) + + +class Watchdog(object): + """ + Implements a watchdog to efficiently manage concurrent timeouts. + + Compared to eventlet.timeouts.Timeout, it reduces the number of context + switching in eventlet by avoiding to schedule actions (throw an Exception), + then unschedule them if the timeouts are cancelled. + + 1. at T+0, request timeout(10) + => wathdog greenlet sleeps 10 seconds + 2. at T+1, request timeout(15) + => the timeout will expire after the current, no need to wake up the + watchdog greenlet + 3. at T+2, request timeout(5) + => the timeout will expire before the first timeout, wake up the + watchdog greenlet to calculate a new sleep period + 4. at T+7, the 3rd timeout expires + => the exception is raised, then the greenlet watchdog sleep(3) to + wake up for the 1st timeout expiration + """ + def __init__(self): + # key => (timeout, timeout_at, caller_greenthread, exception) + self._timeouts = dict() + self._evt = Event() + self._next_expiration = None + self._run_gth = None + + def start(self, timeout, exc, timeout_at=None): + """ + Schedule a timeout action + + :param timeout: duration before the timeout expires + :param exc: exception to throw when the timeout expire, must inherit + from eventlet.timeouts.Timeout + :param timeout_at: allow to force the expiration timestamp + :return: id of the scheduled timeout, needed to cancel it + """ + if not timeout_at: + timeout_at = time.time() + timeout + gth = eventlet.greenthread.getcurrent() + timeout_definition = (timeout, timeout_at, gth, exc) + key = id(timeout_definition) + self._timeouts[key] = timeout_definition + + # Wake up the watchdog loop only when there is a new shorter timeout + if (self._next_expiration is None + or self._next_expiration > timeout_at): + # There could be concurrency on .send(), so wrap it in a try + try: + if not self._evt.ready(): + self._evt.send() + except AssertionError: + pass + + return key + + def stop(self, key): + """ + Cancel a scheduled timeout + + :param key: timeout id, as returned by start() + """ + try: + if key in self._timeouts: + del(self._timeouts[key]) + except KeyError: + pass + + def spawn(self): + """ + Start the watchdog greenthread. + """ + if self._run_gth is None: + self._run_gth = eventlet.spawn(self.run) + + def run(self): + while True: + self._run() + + def _run(self): + now = time.time() + self._next_expiration = None + if self._evt.ready(): + self._evt.reset() + for k, (timeout, timeout_at, gth, exc) in list(self._timeouts.items()): + if timeout_at <= now: + try: + if k in self._timeouts: + del(self._timeouts[k]) + except KeyError: + pass + e = exc() + e.seconds = timeout + eventlet.hubs.get_hub().schedule_call_global(0, gth.throw, e) + else: + if (self._next_expiration is None + or self._next_expiration > timeout_at): + self._next_expiration = timeout_at + if self._next_expiration is None: + sleep_duration = self._next_expiration + else: + sleep_duration = self._next_expiration - now + self._evt.wait(sleep_duration) + + +class WatchdogTimeout(object): + """ + Context manager to schedule a timeout in a Watchdog instance + """ + def __init__(self, watchdog, timeout, exc, timeout_at=None): + """ + Schedule a timeout in a Watchdog instance + + :param watchdog: Watchdog instance + :param timeout: duration before the timeout expires + :param exc: exception to throw when the timeout expire, must inherit + from eventlet.timeouts.Timeout + :param timeout_at: allow to force the expiration timestamp + """ + self.watchdog = watchdog + self.key = watchdog.start(timeout, exc, timeout_at=timeout_at) + + def __enter__(self): + pass + + def __exit__(self, type, value, traceback): + self.watchdog.stop(self.key) diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 2d470553cc..6bb5e4257d 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -42,7 +42,7 @@ from eventlet.timeout import Timeout import six from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request -from swift.common.utils import Timestamp, config_true_value, \ +from swift.common.utils import Timestamp, WatchdogTimeout, config_true_value, \ public, split_path, list_from_csv, GreenthreadSafeIterator, \ GreenAsyncPile, quorum_size, parse_content_type, drain_and_close, \ document_iters_to_http_response_body, ShardRange, find_shard_range @@ -1047,7 +1047,8 @@ class ResumingGetter(object): # but just a 200 or a single-range 206, then this # performs no IO, and either just returns source or # raises StopIteration. - with ChunkReadTimeout(node_timeout): + with WatchdogTimeout(self.app.watchdog, node_timeout, + ChunkReadTimeout): # if StopIteration is raised, it escapes and is # handled elsewhere start_byte, end_byte, length, headers, part = next( @@ -1078,7 +1079,8 @@ class ResumingGetter(object): part_file = ByteCountEnforcer(part_file, nbytes) while True: try: - with ChunkReadTimeout(node_timeout): + with WatchdogTimeout(self.app.watchdog, node_timeout, + ChunkReadTimeout): chunk = part_file.read(self.app.object_chunk_size) nchunks += 1 # NB: this append must be *inside* the context @@ -1138,8 +1140,9 @@ class ResumingGetter(object): if not chunk: if buf: - with ChunkWriteTimeout( - self.app.client_timeout): + with WatchdogTimeout(self.app.watchdog, + self.app.client_timeout, + ChunkWriteTimeout): self.bytes_used_from_backend += len(buf) yield buf buf = b'' @@ -1149,13 +1152,16 @@ class ResumingGetter(object): while len(buf) >= client_chunk_size: client_chunk = buf[:client_chunk_size] buf = buf[client_chunk_size:] - with ChunkWriteTimeout( - self.app.client_timeout): + with WatchdogTimeout(self.app.watchdog, + self.app.client_timeout, + ChunkWriteTimeout): self.bytes_used_from_backend += \ len(client_chunk) yield client_chunk else: - with ChunkWriteTimeout(self.app.client_timeout): + with WatchdogTimeout(self.app.watchdog, + self.app.client_timeout, + ChunkWriteTimeout): self.bytes_used_from_backend += len(buf) yield buf buf = b'' diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 163503cbe4..68472b4be3 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -44,7 +44,7 @@ from eventlet.timeout import Timeout from swift.common.utils import ( clean_content_type, config_true_value, ContextPool, csv_append, - GreenAsyncPile, GreenthreadSafeIterator, Timestamp, + GreenAsyncPile, GreenthreadSafeIterator, Timestamp, WatchdogTimeout, normalize_delete_at_timestamp, public, get_expirer_container, document_iters_to_http_response_body, parse_content_range, quorum_size, reiterate, close_if_possible, safe_json_loads) @@ -869,7 +869,7 @@ class ReplicatedObjectController(BaseObjectController): def _make_putter(self, node, part, req, headers): if req.environ.get('swift.callback.update_footers'): putter = MIMEPutter.connect( - node, part, req.swift_entity_path, headers, + node, part, req.swift_entity_path, headers, self.app.watchdog, conn_timeout=self.app.conn_timeout, node_timeout=self.app.node_timeout, write_timeout=self.app.node_timeout, @@ -879,7 +879,7 @@ class ReplicatedObjectController(BaseObjectController): else: te = ',' + headers.get('Transfer-Encoding', '') putter = Putter.connect( - node, part, req.swift_entity_path, headers, + node, part, req.swift_entity_path, headers, self.app.watchdog, conn_timeout=self.app.conn_timeout, node_timeout=self.app.node_timeout, write_timeout=self.app.node_timeout, @@ -897,9 +897,10 @@ class ReplicatedObjectController(BaseObjectController): bytes_transferred = 0 def send_chunk(chunk): + timeout_at = time.time() + self.app.node_timeout for putter in list(putters): if not putter.failed: - putter.send_chunk(chunk) + putter.send_chunk(chunk, timeout_at=timeout_at) else: putter.close() putters.remove(putter) @@ -911,7 +912,9 @@ class ReplicatedObjectController(BaseObjectController): min_conns = quorum_size(len(nodes)) try: while True: - with ChunkReadTimeout(self.app.client_timeout): + with WatchdogTimeout(self.app.watchdog, + self.app.client_timeout, + ChunkReadTimeout): try: chunk = next(data_source) except StopIteration: @@ -1569,13 +1572,14 @@ class Putter(object): :param resp: an HTTPResponse instance if connect() received final response :param path: the object path to send to the storage node :param connect_duration: time taken to initiate the HTTPConnection + :param watchdog: a spawned Watchdog instance that will enforce timeouts :param write_timeout: time limit to write a chunk to the connection socket :param send_exception_handler: callback called when an exception occured writing to the connection socket :param logger: a Logger instance :param chunked: boolean indicating if the request encoding is chunked """ - def __init__(self, conn, node, resp, path, connect_duration, + def __init__(self, conn, node, resp, path, connect_duration, watchdog, write_timeout, send_exception_handler, logger, chunked=False): # Note: you probably want to call Putter.connect() instead of @@ -1585,6 +1589,7 @@ class Putter(object): self.resp = self.final_resp = resp self.path = path self.connect_duration = connect_duration + self.watchdog = watchdog self.write_timeout = write_timeout self.send_exception_handler = send_exception_handler # for handoff nodes node_index is None @@ -1627,7 +1632,7 @@ class Putter(object): # Subclasses may implement custom behaviour pass - def send_chunk(self, chunk): + def send_chunk(self, chunk, timeout_at=None): if not chunk: # If we're not using chunked transfer-encoding, sending a 0-byte # chunk is just wasteful. If we *are* using chunked @@ -1641,7 +1646,7 @@ class Putter(object): self._start_object_data() self.state = SENDING_DATA - self._send_chunk(chunk) + self._send_chunk(chunk, timeout_at=timeout_at) def end_of_object_data(self, **kwargs): """ @@ -1653,14 +1658,15 @@ class Putter(object): self._send_chunk(b'') self.state = DATA_SENT - def _send_chunk(self, chunk): + def _send_chunk(self, chunk, timeout_at=None): if not self.failed: if self.chunked: to_send = b"%x\r\n%s\r\n" % (len(chunk), chunk) else: to_send = chunk try: - with ChunkWriteTimeout(self.write_timeout): + with WatchdogTimeout(self.watchdog, self.write_timeout, + ChunkWriteTimeout, timeout_at=timeout_at): self.conn.send(to_send) except (Exception, ChunkWriteTimeout): self.failed = True @@ -1702,9 +1708,9 @@ class Putter(object): return conn, resp, final_resp, connect_duration @classmethod - def connect(cls, node, part, path, headers, conn_timeout, node_timeout, - write_timeout, send_exception_handler, logger=None, - chunked=False, **kwargs): + def connect(cls, node, part, path, headers, watchdog, conn_timeout, + node_timeout, write_timeout, send_exception_handler, + logger=None, chunked=False, **kwargs): """ Connect to a backend node and send the headers. @@ -1717,7 +1723,7 @@ class Putter(object): """ conn, expect_resp, final_resp, connect_duration = cls._make_connection( node, part, path, headers, conn_timeout, node_timeout) - return cls(conn, node, final_resp, path, connect_duration, + return cls(conn, node, final_resp, path, connect_duration, watchdog, write_timeout, send_exception_handler, logger, chunked=chunked) @@ -1732,12 +1738,13 @@ class MIMEPutter(Putter): An HTTP PUT request that supports streaming. """ - def __init__(self, conn, node, resp, req, connect_duration, + def __init__(self, conn, node, resp, req, connect_duration, watchdog, write_timeout, send_exception_handler, logger, mime_boundary, multiphase=False): super(MIMEPutter, self).__init__(conn, node, resp, req, - connect_duration, write_timeout, - send_exception_handler, logger) + connect_duration, watchdog, + write_timeout, send_exception_handler, + logger) # Note: you probably want to call MimePutter.connect() instead of # instantiating one of these directly. self.chunked = True # MIME requests always send chunked body @@ -1815,9 +1822,9 @@ class MIMEPutter(Putter): self.state = COMMIT_SENT @classmethod - def connect(cls, node, part, req, headers, conn_timeout, node_timeout, - write_timeout, send_exception_handler, logger=None, - need_multiphase=True, **kwargs): + def connect(cls, node, part, req, headers, watchdog, conn_timeout, + node_timeout, write_timeout, send_exception_handler, + logger=None, need_multiphase=True, **kwargs): """ Connect to a backend node and send the headers. @@ -1869,7 +1876,7 @@ class MIMEPutter(Putter): if need_multiphase and not can_handle_multiphase_put: raise MultiphasePUTNotSupported() - return cls(conn, node, final_resp, req, connect_duration, + return cls(conn, node, final_resp, req, connect_duration, watchdog, write_timeout, send_exception_handler, logger, mime_boundary, multiphase=need_multiphase) @@ -2502,7 +2509,7 @@ class ECObjectController(BaseObjectController): def _make_putter(self, node, part, req, headers): return MIMEPutter.connect( - node, part, req.swift_entity_path, headers, + node, part, req.swift_entity_path, headers, self.app.watchdog, conn_timeout=self.app.conn_timeout, node_timeout=self.app.node_timeout, write_timeout=self.app.node_timeout, @@ -2603,6 +2610,7 @@ class ECObjectController(BaseObjectController): return updated_frag_indexes = set() + timeout_at = time.time() + self.app.node_timeout for putter in list(putters): frag_index = putter_to_frag_index[putter] backend_chunk = backend_chunks[frag_index] @@ -2613,7 +2621,7 @@ class ECObjectController(BaseObjectController): if frag_index not in updated_frag_indexes: frag_hashers[frag_index].update(backend_chunk) updated_frag_indexes.add(frag_index) - putter.send_chunk(backend_chunk) + putter.send_chunk(backend_chunk, timeout_at=timeout_at) else: putter.close() putters.remove(putter) @@ -2629,7 +2637,9 @@ class ECObjectController(BaseObjectController): putters, policy) while True: - with ChunkReadTimeout(self.app.client_timeout): + with WatchdogTimeout(self.app.watchdog, + self.app.client_timeout, + ChunkReadTimeout): try: chunk = next(data_source) except StopIteration: diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 56c0afd98c..c79251c69c 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -32,7 +32,7 @@ from swift.common import constraints from swift.common.http import is_server_error from swift.common.storage_policy import POLICIES from swift.common.ring import Ring -from swift.common.utils import cache_from_env, get_logger, \ +from swift.common.utils import Watchdog, cache_from_env, get_logger, \ get_remote_client, split_path, config_true_value, generate_trans_id, \ affinity_key_function, affinity_locality_predicate, list_from_csv, \ register_swift_info, readconf, config_auto_int_value @@ -317,6 +317,8 @@ class Application(object): allow_account_management=self.allow_account_management, account_autocreate=self.account_autocreate, **constraints.EFFECTIVE_CONSTRAINTS) + self.watchdog = Watchdog() + self.watchdog.spawn() def _make_policy_override(self, policy, conf, override_conf): label_for_policy = _label_for_policy(policy) diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 6c4a94f9cc..0b7fd5aff8 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -8381,3 +8381,86 @@ class Test_LibcWrapper(unittest.TestCase): # 0 is SEEK_SET 0) self.assertEqual(tf.read(100), b"defgh") + + +class TestWatchdog(unittest.TestCase): + def test_start_stop(self): + w = utils.Watchdog() + w._evt.send = mock.Mock(side_effect=w._evt.send) + gth = object() + + with patch('eventlet.greenthread.getcurrent', return_value=gth),\ + patch('time.time', return_value=10.0): + # On first call, _next_expiration is None, it should unblock + # greenthread that is blocked for ever + key = w.start(1.0, Timeout) + self.assertIn(key, w._timeouts) + self.assertEqual(w._timeouts[key], (1.0, 11.0, gth, Timeout)) + w._evt.send.assert_called_once() + + w.stop(key) + self.assertNotIn(key, w._timeouts) + + def test_timeout_concurrency(self): + w = utils.Watchdog() + w._evt.send = mock.Mock(side_effect=w._evt.send) + w._evt.wait = mock.Mock() + gth = object() + + w._run() + w._evt.wait.assert_called_once_with(None) + + with patch('eventlet.greenthread.getcurrent', return_value=gth): + w._evt.send.reset_mock() + w._evt.wait.reset_mock() + with patch('time.time', return_value=10.00): + # On first call, _next_expiration is None, it should unblock + # greenthread that is blocked for ever + w.start(5.0, Timeout) # Will end at 15.0 + w._evt.send.assert_called_once() + + with patch('time.time', return_value=10.01): + w._run() + self.assertEqual(15.0, w._next_expiration) + w._evt.wait.assert_called_once_with(15.0 - 10.01) + + w._evt.send.reset_mock() + w._evt.wait.reset_mock() + with patch('time.time', return_value=12.00): + # Now _next_expiration is 15.0, it won't unblock greenthread + # because this expiration is later + w.start(5.0, Timeout) # Will end at 17.0 + w._evt.send.assert_not_called() + + w._evt.send.reset_mock() + w._evt.wait.reset_mock() + with patch('time.time', return_value=14.00): + # Now _next_expiration is still 15.0, it will unblock + # greenthread because this new expiration is 14.5 + w.start(0.5, Timeout) # Will end at 14.5 + w._evt.send.assert_called_once() + + with patch('time.time', return_value=14.01): + w._run() + w._evt.wait.assert_called_once_with(14.5 - 14.01) + self.assertEqual(14.5, w._next_expiration) + # Should wakeup at 14.5 + + def test_timeout_expire(self): + w = utils.Watchdog() + w._evt.send = mock.Mock() # To avoid it to call get_hub() + w._evt.wait = mock.Mock() # To avoid it to call get_hub() + + with patch('eventlet.hubs.get_hub') as m_gh: + with patch('time.time', return_value=10.0): + w.start(5.0, Timeout) # Will end at 15.0 + + with patch('time.time', return_value=16.0): + w._run() + m_gh.assert_called_once() + m_gh.return_value.schedule_call_global.assert_called_once() + exc = m_gh.return_value.schedule_call_global.call_args[0][2] + self.assertIsInstance(exc, Timeout) + self.assertEqual(exc.seconds, 5.0) + self.assertEqual(None, w._next_expiration) + w._evt.wait.assert_called_once_with(None) diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 17b4d7e6f7..1076272bb4 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -67,7 +67,7 @@ from swift.common.middleware import proxy_logging, versioned_writes, \ copy, listing_formats from swift.common.middleware.acl import parse_acl, format_acl from swift.common.exceptions import ChunkReadTimeout, DiskFileNotExist, \ - APIVersionError, ChunkWriteTimeout, ChunkReadError + APIVersionError, ChunkReadError from swift.common import utils, constraints from swift.common.utils import hash_path, storage_directory, \ parse_content_type, parse_mime_headers, \ @@ -7302,7 +7302,7 @@ class BaseTestECObjectController(BaseTestObjectController): exp = b'HTTP/1.1 201' self.assertEqual(headers[:len(exp)], exp) - class WrappedTimeout(ChunkWriteTimeout): + class WrappedTimeout(utils.WatchdogTimeout): def __enter__(self): timeouts[self] = traceback.extract_stack() return super(WrappedTimeout, self).__enter__() @@ -7312,7 +7312,7 @@ class BaseTestECObjectController(BaseTestObjectController): return super(WrappedTimeout, self).__exit__(typ, value, tb) timeouts = {} - with mock.patch('swift.proxy.controllers.base.ChunkWriteTimeout', + with mock.patch('swift.proxy.controllers.base.WatchdogTimeout', WrappedTimeout): with mock.patch.object(_test_servers[0], 'client_timeout', new=5): # get object