diff --git a/doc/source/overview_wsgi_management.rst b/doc/source/overview_wsgi_management.rst
index 206fe3e02f..779c4c65b7 100644
--- a/doc/source/overview_wsgi_management.rst
+++ b/doc/source/overview_wsgi_management.rst
@@ -58,13 +58,16 @@ is as follows:
    socket. Once all workers have started and can accept new connections,
    the manager notifies the socket-closer via a pipe. The socket-closer
    closes the old worker listen sockets so they stop accepting new
-   connections, then exits.
+   connections, passes the list of old workers to the new manager,
+   then exits.
 
 .. image:: images/reload_process_tree_5.svg
 
 5. Old workers continue servicing any in-progress connections, while new
    connections are picked up by new workers. Once an old worker completes
-   all of its oustanding requests, it exits.
+   all of its outstanding requests, it exits. Beginning with Swift 2.33.0,
+   if any workers persist beyond ``stale_worker_timeout``, the new manager
+   will clean them up with ``KILL`` signals.
 
 .. image:: images/reload_process_tree_6.svg
 
diff --git a/etc/account-server.conf-sample b/etc/account-server.conf-sample
index 241156164c..fc21517a11 100644
--- a/etc/account-server.conf-sample
+++ b/etc/account-server.conf-sample
@@ -117,6 +117,12 @@ use = egg:swift#account
 # will be denied until the disk has more space available. Percentage
 # will be used if the value ends with a '%'.
 # fallocate_reserve = 1%
+#
+# When reloading servers with SIGUSR1, workers running with old config/code
+# are allowed some time to finish serving in-flight requests. Use this to
+# configure the grace period (in seconds), after which the reloaded server
+# will issue SIGKILLs to remaining stale workers.
+# stale_worker_timeout = 86400
 
 [filter:healthcheck]
 use = egg:swift#healthcheck
diff --git a/etc/container-server.conf-sample b/etc/container-server.conf-sample
index a5750db919..fbf5caad07 100644
--- a/etc/container-server.conf-sample
+++ b/etc/container-server.conf-sample
@@ -127,6 +127,12 @@ use = egg:swift#container
 # will be denied until the disk has more space available. Percentage
 # will be used if the value ends with a '%'.
 # fallocate_reserve = 1%
+#
+# When reloading servers with SIGUSR1, workers running with old config/code
+# are allowed some time to finish serving in-flight requests. Use this to
+# configure the grace period (in seconds), after which the reloaded server
+# will issue SIGKILLs to remaining stale workers.
+# stale_worker_timeout = 86400
 
 [filter:healthcheck]
 use = egg:swift#healthcheck
diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample
index 15367c1b57..f7275f6ee0 100644
--- a/etc/object-server.conf-sample
+++ b/etc/object-server.conf-sample
@@ -245,6 +245,12 @@ use = egg:swift#object
 # Work only with ionice_class.
 # ionice_class =
 # ionice_priority =
+#
+# When reloading servers with SIGUSR1, workers running with old config/code
+# are allowed some time to finish serving in-flight requests. Use this to
+# configure the grace period (in seconds), after which the reloaded server
+# will issue SIGKILLs to remaining stale workers.
+# stale_worker_timeout = 86400
 
 [filter:healthcheck]
 use = egg:swift#healthcheck
diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample
index b29d03adc8..f2cd114309 100644
--- a/etc/proxy-server.conf-sample
+++ b/etc/proxy-server.conf-sample
@@ -330,6 +330,12 @@ use = egg:swift#proxy
 # ionice_class =
 # ionice_priority =
 #
+# When reloading servers with SIGUSR1, workers running with old config/code
+# are allowed some time to finish serving in-flight requests. Use this to
+# configure the grace period (in seconds), after which the reloaded server
+# will issue SIGKILLs to remaining stale workers.
+# stale_worker_timeout = 86400
+#
 # When upgrading from liberasurecode<=1.5.0, you may want to continue writing
 # legacy CRCs until all nodes are upgraded and capabale of reading fragments
 # with zlib CRCs. liberasurecode>=1.6.2 checks for the environment variable
diff --git a/swift/common/utils/__init__.py b/swift/common/utils/__init__.py
index 67cd6dbd7a..b5639f0cfe 100644
--- a/swift/common/utils/__init__.py
+++ b/swift/common/utils/__init__.py
@@ -5090,3 +5090,19 @@ class CooperativeIterator(ClosingIterator):
                 sleep()
             self.count += 1
         return super(CooperativeIterator, self)._get_next_item()
+
+
+def get_ppid(pid):
+    """
+    Get the parent process's PID given a child pid.
+
+    :raises OSError: if the child pid cannot be found
+    """
+    try:
+        with open('/proc/%d/stat' % pid) as fp:
+            stats = fp.read().split()
+        return int(stats[3])
+    except IOError as e:
+        if e.errno == errno.ENOENT:
+            raise OSError(errno.ESRCH, 'No such process')
+        raise
diff --git a/swift/common/wsgi.py b/swift/common/wsgi.py
index 34abf26178..45e60f22c8 100644
--- a/swift/common/wsgi.py
+++ b/swift/common/wsgi.py
@@ -18,8 +18,10 @@
 from __future__ import print_function
 
 import errno
+import json
 import os
 import signal
+import struct
 import sys
 from textwrap import dedent
 import time
@@ -45,6 +47,7 @@ from swift.common.utils import capture_stdio, disable_fallocate, \
 SIGNUM_TO_NAME = {getattr(signal, n): n for n in dir(signal)
                   if n.startswith('SIG') and '_' not in n}
 NOTIFY_FD_ENV_KEY = '__SWIFT_SERVER_NOTIFY_FD'
+CHILD_STATE_FD_ENV_KEY = '__SWIFT_SERVER_CHILD_STATE_FD'
 
 # Set maximum line size of message headers to be accepted.
 wsgi.MAX_HEADER_LINE = constraints.MAX_HEADER_SIZE
@@ -474,6 +477,13 @@ class StrategyBase(object):
         # children to easily drop refs to sibling sockets in post_fork_hook().
         self.tracking_data = {}
 
+        # When doing a seamless reload, we inherit a bunch of child processes
+        # that should all clean themselves up fairly quickly; track them here
+        self.reload_pids = dict()
+        # If they don't cleanup quickly, we'll start killing them after this
+        self.stale_worker_timeout = utils.non_negative_float(
+            conf.get('stale_worker_timeout', 86400))
+
     def post_fork_hook(self):
         """
         Called in each forked-off child process, prior to starting the actual
@@ -523,9 +533,21 @@
         # connections. This is used for seamless reloading using SIGUSR1.
         reexec_signal_fd = os.getenv(NOTIFY_FD_ENV_KEY)
         if reexec_signal_fd:
+            if ',' in reexec_signal_fd:
+                reexec_signal_fd, worker_state_fd = reexec_signal_fd.split(',')
             reexec_signal_fd = int(reexec_signal_fd)
             os.write(reexec_signal_fd, str(os.getpid()).encode('utf8'))
             os.close(reexec_signal_fd)
+        worker_state_fd = os.getenv(CHILD_STATE_FD_ENV_KEY)
+        try:
+            self.read_state_from_old_manager(worker_state_fd)
+        except Exception as e:
+            # This was all opportunistic anyway; old swift wouldn't even
+            # *try* to send us any state -- we don't want *new* code to
+            # fail just because *old* code didn't live up to its promise
+            self.logger.warning(
+                'Failed to read state from the old manager: %r', e,
+                exc_info=True)
 
         # Finally, signal systemd (if appropriate) that process started
         # properly.
@@ -533,6 +555,110 @@
         self.signaled_ready = True
 
+    def read_state_from_old_manager(self, worker_state_fd):
+        """
+        Read worker state from the old manager's socket-closer.
+
+        The socket-closing process is the last thing to still have the worker
+        PIDs in its head, so it sends us a JSON dict (prefixed by its length)
+        of the form::
+
+            {
+                "old_pids": {
+                    "<old worker pid>": "<timestamp of the reload>",
+                    ...
+                }
+            }
+
+        More data may be added in the future.
+
+        :param worker_state_fd: The file descriptor that should have the
+                                old worker state. Should be passed to us
+                                via the ``__SWIFT_SERVER_CHILD_STATE_FD``
+                                environment variable.
+        """
+        if not worker_state_fd:
+            return
+        worker_state_fd = int(worker_state_fd)
+        try:
+            # The temporary manager may have up and died while trying to send
+            # state; hopefully its logs will have more about what went wrong
+            # -- let's just log at warning here
+            data_len = os.read(worker_state_fd, 4)
+            if len(data_len) != 4:
+                self.logger.warning(
+                    'Invalid worker state received; expected 4 bytes '
+                    'followed by a payload but only received %d bytes',
+                    len(data_len))
+                return
+
+            data_len = struct.unpack('!I', data_len)[0]
+            data = b''
+            while len(data) < data_len:
+                chunk = os.read(worker_state_fd, data_len - len(data))
+                if not chunk:
+                    break
+                data += chunk
+            if len(data) != data_len:
+                self.logger.warning(
+                    'Incomplete worker state received; expected %d '
+                    'bytes but only received %d', data_len, len(data))
+                return
+
+            # OK, the temporary manager was able to tell us how much it wanted
+            # to send, and sent it; from here on, error seems appropriate.
+            try:
+                old_state = json.loads(data)
+            except ValueError:
+                self.logger.error(
+                    'Invalid worker state received; '
+                    'invalid JSON: %r', data)
+                return
+
+            try:
+                old_pids = {
+                    int(pid): float(reloaded)
+                    for pid, reloaded in old_state["old_pids"].items()}
+            except (KeyError, TypeError) as err:
+                self.logger.error(
+                    'Invalid worker state received; '
+                    'error reading old pids: %s', err)
+                return
+            self.logger.debug('Received old worker pids: %s', old_pids)
+            self.reload_pids.update(old_pids)
+
+            def smother(old_pids=old_pids, timeout=self.stale_worker_timeout):
+                own_pid = os.getpid()
+                kill_times = sorted(((reloaded + timeout, pid)
+                                     for pid, reloaded in old_pids.items()),
+                                    reverse=True)
+                while kill_times:
+                    kill_time, pid = kill_times.pop()
+                    now = time.time()
+                    if kill_time > now:
+                        sleep(kill_time - now)
+                    try:
+                        ppid = utils.get_ppid(pid)
+                    except OSError as e:
+                        if e.errno != errno.ESRCH:
+                            self.logger.error("Could not determine parent "
+                                              "for stale pid %d: %s", pid, e)
+                        continue
+                    if ppid == own_pid:
+                        self.logger.notice("Killing long-running stale worker "
+                                           "%d after %ds", pid, int(timeout))
+                        try:
+                            os.kill(pid, signal.SIGKILL)
+                        except OSError as e:
+                            if e.errno != errno.ESRCH:
+                                self.logger.error(
+                                    "Could not kill stale pid %d: %s", pid, e)
+                            # else, pid got re-used?
+
+            eventlet.spawn_n(smother)
+
+        finally:
+            os.close(worker_state_fd)
+
 
 
 class WorkersStrategy(StrategyBase):
     """
@@ -622,11 +748,17 @@
 
         :param int pid: The PID of the worker that exited.
         """
+        if self.reload_pids.pop(pid, None):
+            self.logger.notice('Removing stale child %d from parent %d',
+                               pid, os.getpid())
+            return
+
         sock = self.tracking_data.pop(pid, None)
         if sock is None:
-            self.logger.info('Ignoring wait() result from unknown PID %s', pid)
+            self.logger.warning('Ignoring wait() result from unknown PID %d',
+                                pid)
         else:
-            self.logger.error('Removing dead child %s from parent %s',
+            self.logger.error('Removing dead child %d from parent %d',
                               pid, os.getpid())
             greenio.shutdown_safe(sock)
             sock.close()
@@ -639,6 +771,9 @@
         for sock in self.tracking_data.values():
             yield sock
 
+    def get_worker_pids(self):
+        return list(self.tracking_data.keys())
+
 
 class ServersPerPortStrategy(StrategyBase):
     """
@@ -786,14 +921,23 @@
 
         :param int pid: The PID of the worker that exited.
         """
+        if self.reload_pids.pop(pid, None):
+            self.logger.notice('Removing stale child %d from parent %d',
+                               pid, os.getpid())
+            return
+
         for port_data in self.tracking_data.values():
             for idx, (child_pid, sock) in enumerate(port_data):
                 if child_pid == pid:
+                    self.logger.error('Removing dead child %d from parent %d',
+                                      pid, os.getpid())
                     port_data[idx] = (None, None)
                     greenio.shutdown_safe(sock)
                     sock.close()
                     return
 
+        self.logger.warning('Ignoring wait() result from unknown PID %d', pid)
+
     def iter_sockets(self):
         """
         Yields all known listen sockets.
@@ -803,6 +947,12 @@
         for _pid, sock in port_data:
             yield sock
 
+    def get_worker_pids(self):
+        return [
+            pid
+            for port_data in self.tracking_data.values()
+            for pid, _sock in port_data]
+
 
 def check_config(conf_path, app_section, *args, **kwargs):
     # Load configuration, Set logger and Load request processor
@@ -1000,24 +1150,29 @@
             # then the old server can't actually ever exit.
             strategy.set_close_on_exec_on_listen_sockets()
             read_fd, write_fd = os.pipe()
+            state_rfd, state_wfd = os.pipe()
             orig_server_pid = os.getpid()
             child_pid = os.fork()
             if child_pid:
                 # parent; set env var for fds and reexec ourselves
                 os.close(read_fd)
+                os.close(state_wfd)
                 os.putenv(NOTIFY_FD_ENV_KEY, str(write_fd))
+                os.putenv(CHILD_STATE_FD_ENV_KEY, str(state_rfd))
                 myself = os.path.realpath(sys.argv[0])
                 logger.info("Old server PID=%d re'execing as: %r",
                             orig_server_pid, [myself] + list(sys.argv))
                 if hasattr(os, 'set_inheritable'):
                     # See https://www.python.org/dev/peps/pep-0446/
                     os.set_inheritable(write_fd, True)
+                    os.set_inheritable(state_rfd, True)
                 os.execv(myself, sys.argv)  # nosec B606
                 logger.error('Somehow lived past os.execv()?!')
                 exit('Somehow lived past os.execv()?!')
             elif child_pid == 0:
                 # child
                 os.close(write_fd)
+                os.close(state_rfd)
                 logger.info('Old server temporary child PID=%d waiting for '
                             "re-exec'ed PID=%d to signal readiness...",
                             os.getpid(), orig_server_pid)
@@ -1032,6 +1187,16 @@
                 logger.info('Old server temporary child PID=%d notified '
                             'to shutdown old listen sockets by PID=%s',
                             os.getpid(), got_pid)
+                # Ensure new process knows about old children
+                stale_pids = dict(strategy.reload_pids)
+                stale_pids[os.getpid()] = now = time.time()
+                stale_pids.update({
+                    pid: now for pid in strategy.get_worker_pids()})
+                data = json.dumps({
+                    "old_pids": stale_pids,
+                }).encode('ascii')
+                os.write(state_wfd, struct.pack('!I', len(data)) + data)
+                os.close(state_wfd)
             else:
                 logger.warning('Old server temporary child PID=%d *NOT* '
                                'notified to shutdown old listen sockets; '
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index 8e2f3b0692..39b2eae43e 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -2826,6 +2826,9 @@ cluster_dfw1 = http://dfw1.host/v1/
         self.assertRaises(
             TypeError, md5, None, usedforsecurity=False)
 
+    def test_get_my_ppid(self):
+        self.assertEqual(os.getppid(), utils.get_ppid(os.getpid()))
+
 
 class TestUnlinkOlder(unittest.TestCase):
 
@@ -4715,6 +4718,29 @@ class TestDistributeEvenly(unittest.TestCase):
         self.assertEqual(out, [[0], [1], [2], [3], [4], [], []])
 
 
+@mock.patch('swift.common.utils.open')
+class TestGetPpid(unittest.TestCase):
+    def test_happy_path(self, mock_open):
+        mock_open.return_value.__enter__().read.return_value = \
+            'pid comm stat 456 see the procfs(5) man page for more info\n'
+        self.assertEqual(utils.get_ppid(123), 456)
+        self.assertIn(mock.call('/proc/123/stat'), mock_open.mock_calls)
+
+    def test_not_found(self, mock_open):
+        mock_open.side_effect = IOError(errno.ENOENT, "Not there")
+        with self.assertRaises(OSError) as caught:
+            utils.get_ppid(123)
+        self.assertEqual(caught.exception.errno, errno.ESRCH)
+        self.assertEqual(mock_open.mock_calls[0], mock.call('/proc/123/stat'))
+
+    def test_not_allowed(self, mock_open):
+        mock_open.side_effect = OSError(errno.EPERM, "Not for you")
+        with self.assertRaises(OSError) as caught:
+            utils.get_ppid(123)
+        self.assertEqual(caught.exception.errno, errno.EPERM)
+        self.assertEqual(mock_open.mock_calls[0], mock.call('/proc/123/stat'))
+
+
 class TestShardName(unittest.TestCase):
     def test(self):
         ts = utils.Timestamp.now()
diff --git a/test/unit/common/test_wsgi.py b/test/unit/common/test_wsgi.py
index e579edd045..b6be222da4 100644
--- a/test/unit/common/test_wsgi.py
+++ b/test/unit/common/test_wsgi.py
@@ -17,10 +17,14 @@
 
 import configparser
 import errno
+import json
 import logging
+import signal
 import socket
+import struct
 import unittest
 import os
+import eventlet
 
 from collections import defaultdict
 from io import BytesIO
@@ -1274,6 +1278,79 @@ class CommonTestMixin(object):
             mock.call(self.logger),
         ], mock_capture.mock_calls)
 
+    def test_stale_pid_loading(self):
+        class FakeTime(object):
+            def __init__(self, step=10):
+                self.patchers = [
+                    mock.patch('swift.common.wsgi.time.time',
+                               side_effect=self.time),
+                    mock.patch('swift.common.wsgi.sleep',
+                               side_effect=self.sleep),
+                ]
+                self.now = 0
+                self.step = step
+                self.sleeps = []
+
+            def time(self):
+                self.now += self.step
+                return self.now
+
+            def sleep(self, delta):
+                if delta < 0:
+                    raise ValueError('cannot sleep negative time: %s' % delta)
+                self.now += delta
+                self.sleeps.append(delta)
+
+            def __enter__(self):
+                for patcher in self.patchers:
+                    patcher.start()
+                return self
+
+            def __exit__(self, *a):
+                for patcher in self.patchers:
+                    patcher.stop()
+
+        notify_rfd, notify_wfd = os.pipe()
+        state_rfd, state_wfd = os.pipe()
+        stale_process_data = {
+            "old_pids": {123: 5, 456: 6, 78: 27, 90: 28},
+        }
+        to_write = json.dumps(stale_process_data).encode('ascii')
+        os.write(state_wfd, struct.pack('!I', len(to_write)) + to_write)
+        os.close(state_wfd)
+        self.assertEqual(self.strategy.reload_pids, {})
+        os.environ['__SWIFT_SERVER_NOTIFY_FD'] = str(notify_wfd)
+        os.environ['__SWIFT_SERVER_CHILD_STATE_FD'] = str(state_rfd)
+        with mock.patch('swift.common.wsgi.capture_stdio'), \
+                mock.patch('swift.common.utils.get_ppid') as mock_ppid, \
+                mock.patch('os.kill') as mock_kill, FakeTime() as fake_time:
+            mock_ppid.side_effect = [
+                os.getpid(),
+                OSError(errno.ENOENT, "Not there"),
+                OSError(errno.EPERM, "Not for you"),
+                os.getpid(),
+            ]
+            self.strategy.signal_ready()
+            self.assertEqual(self.strategy.reload_pids,
+                             stale_process_data['old_pids'])
+
+            # We spawned our child-killer, but it hasn't been scheduled yet
+            self.assertEqual(mock_ppid.mock_calls, [])
+            self.assertEqual(mock_kill.mock_calls, [])
+            self.assertEqual(fake_time.sleeps, [])
+
+            # *Now* we let it run (with mocks still enabled)
+            eventlet.sleep()
+
+        self.assertEqual(str(os.getpid()).encode('ascii'),
+                         os.read(notify_rfd, 30))
+        os.close(notify_rfd)
+
+        self.assertEqual(mock_kill.mock_calls, [
+            mock.call(123, signal.SIGKILL),
+            mock.call(90, signal.SIGKILL)])
+        self.assertEqual(fake_time.sleeps, [86395, 2])
+
 
 class TestServersPerPortStrategy(unittest.TestCase, CommonTestMixin):
     def setUp(self):
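
A note on the worker-state handoff used by this patch: the old manager's socket-closer writes a 4-byte network-order length (struct.pack('!I', ...)) followed by a JSON document of the form {"old_pids": {pid: reload_time}}, and the re-exec'ed manager reads it back in read_state_from_old_manager(). The following self-contained sketch is illustrative only and is not part of the patch; a plain os.pipe() stands in for the inherited __SWIFT_SERVER_CHILD_STATE_FD descriptor:

    import json
    import os
    import struct
    import time

    rfd, wfd = os.pipe()

    # Writer side (what the socket-closer does just before it exits):
    payload = json.dumps({"old_pids": {1234: time.time()}}).encode('ascii')
    os.write(wfd, struct.pack('!I', len(payload)) + payload)
    os.close(wfd)

    # Reader side (what the new manager does in read_state_from_old_manager):
    (length,) = struct.unpack('!I', os.read(rfd, 4))
    data = b''
    while len(data) < length:
        chunk = os.read(rfd, length - len(data))
        if not chunk:
            break
        data += chunk
    os.close(rfd)

    old_pids = {int(pid): float(reloaded)
                for pid, reloaded in json.loads(data)["old_pids"].items()}
    print(old_pids)  # e.g. {1234: 1700000000.0}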
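The stale-worker reaping in smother() rests on two ideas: each inherited worker gets a deadline of reload_time + stale_worker_timeout, and a SIGKILL is only sent if the PID is still parented by the current manager, since the get_ppid() check guards against the PID having been reused by an unrelated process. Below is a rough standalone sketch of that guard under the same /proc-based lookup as the patch; the helper names are hypothetical, it is Linux-only, and it is not the code the patch installs:

    import errno
    import os
    import signal
    import time

    def get_ppid(pid):
        # /proc/<pid>/stat field 4 is the parent PID; ENOENT means no such pid
        try:
            with open('/proc/%d/stat' % pid) as fp:
                return int(fp.read().split()[3])
        except IOError as e:
            if e.errno == errno.ENOENT:
                raise OSError(errno.ESRCH, 'No such process')
            raise

    def reap_stale_workers(old_pids, timeout, now=None):
        """Kill any pid whose grace period has expired and which is still a
        child of this process; return the pids actually killed."""
        now = time.time() if now is None else now
        killed = []
        for pid, reloaded in sorted(old_pids.items(), key=lambda kv: kv[1]):
            if reloaded + timeout > now:
                continue  # still within its grace period
            try:
                if get_ppid(pid) != os.getpid():
                    continue  # pid was reused by some unrelated process
            except OSError:
                continue  # already gone
            os.kill(pid, signal.SIGKILL)
            killed.append(pid)
        return killed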