From 1107f24179c0c6fdcb58771f3a6e6c025352b5d3 Mon Sep 17 00:00:00 2001 From: Darrell Bishop Date: Fri, 25 Oct 2019 12:48:36 -0700 Subject: [PATCH] Seamlessly reload servers with SIGUSR1 Swift servers can now be seamlessly reloaded by sending them a SIGUSR1 (instead of a SIGHUP). The server forks off a synchronized child to wait to close the old listen socket(s) until the new server has started up and bound its listen socket(s). The new server is exec'ed from the old one so its PID doesn't change. This makes Systemd happier, so a ReloadExec= stanza can now be used. The seamless part means that incoming connections will alwyas get accepted either by the old server or the new one. This eliminates client-perceived "downtime" during server reloads, while allowing the server to fully reload, re-reading configuration, becoming a fresh Python interpreter instance, etc. The SO_REUSEPORT socket option has already been getting used, so nothing had to change there. This patch also includes a non-invasive fix for a current eventlet bug; see https://github.com/eventlet/eventlet/pull/590 That bug prevents a SIGHUP "reload" from properly servicing existing requests before old worker processes close sockets and exit. The existing probtests missed this, but the new ones, in this patch, caught it. New probe tests cover both old SIGHUP "reload" behavior as well as the new SIGUSR1 seamless reload behavior. Change-Id: I3e5229d2fb04be67e53533ff65b0870038accbb7 --- doc/manpages/swift-init.1 | 1 + doc/source/admin_guide.rst | 35 ++-- swift/common/daemon.py | 1 + swift/common/manager.py | 20 ++ swift/common/utils.py | 14 +- swift/common/wsgi.py | 184 ++++++++++++++---- test/probe/test_signals.py | 336 ++++++++++++++++++++++++++++----- test/unit/common/test_utils.py | 54 +++--- test/unit/common/test_wsgi.py | 67 +++---- tox.ini | 5 +- 10 files changed, 549 insertions(+), 168 deletions(-) diff --git a/doc/manpages/swift-init.1 b/doc/manpages/swift-init.1 index 95b67ab5c1..c056e04fea 100644 --- a/doc/manpages/swift-init.1 +++ b/doc/manpages/swift-init.1 @@ -87,6 +87,7 @@ allows one to use the keywords such as "all", "main" and "rest" for the .IP "\fIno-wait\fR: \t\t\t spawn server and return immediately" .IP "\fIonce\fR: \t\t\t start server and run one pass on supporting daemons" .IP "\fIreload\fR: \t\t\t graceful shutdown then restart on supporting servers" +.IP "\fIreload-seamless\fR: \t\t reload supporting servers with no downtime" .IP "\fIrestart\fR: \t\t\t stops then restarts server" .IP "\fIshutdown\fR: \t\t allow current requests to finish on supporting servers" .IP "\fIstart\fR: \t\t\t starts a server" diff --git a/doc/source/admin_guide.rst b/doc/source/admin_guide.rst index d0b74f7b17..747bfdb11c 100644 --- a/doc/source/admin_guide.rst +++ b/doc/source/admin_guide.rst @@ -1362,20 +1362,29 @@ Swift services are generally managed with ``swift-init``. the general usage is ``swift-init ``, where service is the Swift service to manage (for example object, container, account, proxy) and command is one of: -========== =============================================== -Command Description ----------- ----------------------------------------------- -start Start the service -stop Stop the service -restart Restart the service -shutdown Attempt to gracefully shutdown the service -reload Attempt to gracefully restart the service -========== =============================================== +=============== =============================================== +Command Description +--------------- ----------------------------------------------- +start Start the service +stop Stop the service +restart Restart the service +shutdown Attempt to gracefully shutdown the service +reload Attempt to gracefully restart the service +reload-seamless Attempt to seamlessly restart the service +=============== =============================================== -A graceful shutdown or reload will finish any current requests before -completely stopping the old service. There is also a special case of -``swift-init all ``, which will run the command for all swift -services. +A graceful shutdown or reload will allow all server workers to finish any +current requests before exiting. The parent server process exits immediately. + +A seamless reload will make new configuration settings active, with no window +where client requests fail due to there being no active listen socket. +The parent server process will re-exec itself, retaining its existing PID. +After the re-exec'ed parent server process binds its listen sockets, the old +listen sockets are closed and old server workers finish any current requests +before exiting. + +There is also a special case of ``swift-init all ``, which will run +the command for all swift services. In cases where there are multiple configs for a service, a specific config can be managed with ``swift-init . ``. diff --git a/swift/common/daemon.py b/swift/common/daemon.py index 7e7c4cd8fb..53b4099090 100644 --- a/swift/common/daemon.py +++ b/swift/common/daemon.py @@ -132,6 +132,7 @@ class DaemonStrategy(object): def setup(self, **kwargs): utils.validate_configuration() utils.drop_privileges(self.daemon.conf.get('user', 'swift')) + utils.clean_up_daemon_hygiene() utils.capture_stdio(self.logger, **kwargs) def kill_children(*args): diff --git a/swift/common/manager.py b/swift/common/manager.py index 47f47d03b0..698fd4cb1e 100644 --- a/swift/common/manager.py +++ b/swift/common/manager.py @@ -46,6 +46,7 @@ REST_SERVERS = [s for s in ALL_SERVERS if s not in MAIN_SERVERS] # aliases mapping ALIASES = {'all': ALL_SERVERS, 'main': MAIN_SERVERS, 'rest': REST_SERVERS} GRACEFUL_SHUTDOWN_SERVERS = MAIN_SERVERS +SEAMLESS_SHUTDOWN_SERVERS = MAIN_SERVERS START_ONCE_SERVERS = REST_SERVERS # These are servers that match a type (account-*, container-*, object-*) but # don't use that type-server.conf file and instead use their own. @@ -365,6 +366,21 @@ class Manager(object): status += m.start(**kwargs) return status + @command + def reload_seamless(self, **kwargs): + """seamlessly re-exec, then shutdown of old listen sockets on + supporting servers + """ + kwargs.pop('graceful', None) + kwargs['seamless'] = True + status = 0 + for server in self.servers: + signaled_pids = server.stop(**kwargs) + if not signaled_pids: + print(_('No %s running') % server) + status += 1 + return status + @command def force_reload(self, **kwargs): """alias for reload @@ -628,13 +644,17 @@ class Server(object): """Kill running pids :param graceful: if True, attempt SIGHUP on supporting servers + :param seamless: if True, attempt SIGUSR1 on supporting servers :returns: a dict mapping pids (ints) to pid_files (paths) """ graceful = kwargs.get('graceful') + seamless = kwargs.get('seamless') if graceful and self.server in GRACEFUL_SHUTDOWN_SERVERS: sig = signal.SIGHUP + elif seamless and self.server in SEAMLESS_SHUTDOWN_SERVERS: + sig = signal.SIGUSR1 else: sig = signal.SIGTERM return self.signal_pids(sig, **kwargs) diff --git a/swift/common/utils.py b/swift/common/utils.py index c2499821f4..97d78444f8 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -2453,7 +2453,7 @@ def get_hub(): return None -def drop_privileges(user, call_setsid=True): +def drop_privileges(user): """ Sets the userid/groupid of the current process, get session leader, etc. @@ -2466,11 +2466,13 @@ def drop_privileges(user, call_setsid=True): os.setgid(user[3]) os.setuid(user[2]) os.environ['HOME'] = user[5] - if call_setsid: - try: - os.setsid() - except OSError: - pass + + +def clean_up_daemon_hygiene(): + try: + os.setsid() + except OSError: + pass os.chdir('/') # in case you need to rmdir on where you started the daemon os.umask(0o22) # ensure files are created with the correct privileges diff --git a/swift/common/wsgi.py b/swift/common/wsgi.py index 10a8b3e564..c71d412860 100644 --- a/swift/common/wsgi.py +++ b/swift/common/wsgi.py @@ -18,15 +18,18 @@ from __future__ import print_function import errno +import fcntl import os import signal -import time from swift import gettext_ as _ +import sys from textwrap import dedent +import time import eventlet import eventlet.debug -from eventlet import greenio, GreenPool, sleep, wsgi, listen, Timeout +from eventlet import greenio, GreenPool, sleep, wsgi, listen, Timeout, \ + websocket from paste.deploy import loadwsgi from eventlet.green import socket, ssl, os as green_os from io import BytesIO @@ -42,10 +45,11 @@ from swift.common.swob import Request, wsgi_quote, wsgi_unquote, \ from swift.common.utils import capture_stdio, disable_fallocate, \ drop_privileges, get_logger, NullLogger, config_true_value, \ validate_configuration, get_hub, config_auto_int_value, \ - reiterate + reiterate, clean_up_daemon_hygiene SIGNUM_TO_NAME = {getattr(signal, n): n for n in dir(signal) if n.startswith('SIG') and '_' not in n} +NOTIFY_FD_ENV_KEY = '__SWIFT_SERVER_NOTIFY_FD' # Set maximum line size of message headers to be accepted. wsgi.MAX_HEADER_LINE = constraints.MAX_HEADER_SIZE @@ -422,6 +426,13 @@ def load_app_config(conf_file): class SwiftHttpProtocol(wsgi.HttpProtocol): default_request_version = "HTTP/1.0" + def __init__(self, *args, **kwargs): + # See https://github.com/eventlet/eventlet/pull/590 + self.pre_shutdown_bugfix_eventlet = not getattr( + websocket.WebSocketWSGI, '_WSGI_APP_ALWAYS_IDLE', None) + # Note this is not a new-style class, so super() won't work + wsgi.HttpProtocol.__init__(self, *args, **kwargs) + def log_request(self, *a): """ Turn off logging requests by the underlying WSGI software. @@ -528,6 +539,23 @@ class SwiftHttpProtocol(wsgi.HttpProtocol): b'HTTP/1.1 100 Continue\r\n' return environ + def _read_request_line(self): + # Note this is not a new-style class, so super() won't work + got = wsgi.HttpProtocol._read_request_line(self) + # See https://github.com/eventlet/eventlet/pull/590 + if self.pre_shutdown_bugfix_eventlet: + self.conn_state[2] = wsgi.STATE_REQUEST + return got + + def handle_one_request(self): + # Note this is not a new-style class, so super() won't work + got = wsgi.HttpProtocol.handle_one_request(self) + # See https://github.com/eventlet/eventlet/pull/590 + if self.pre_shutdown_bugfix_eventlet: + if self.conn_state[2] != wsgi.STATE_CLOSE: + self.conn_state[2] = wsgi.STATE_IDLE + return got + class SwiftHttpProxiedProtocol(SwiftHttpProtocol): """ @@ -662,7 +690,36 @@ def run_server(conf, logger, sock, global_conf=None): pool.waitall() -class WorkersStrategy(object): +class StrategyBase(object): + """ + Some operations common to all strategy classes. + """ + + def shutdown_sockets(self): + """ + Shutdown any listen sockets. + """ + + for sock in self.iter_sockets(): + greenio.shutdown_safe(sock) + sock.close() + + def set_close_on_exec_on_listen_sockets(self): + """ + Set the close-on-exec flag on any listen sockets. + """ + + for sock in self.iter_sockets(): + if six.PY2: + fcntl.fcntl(sock.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC) + else: + # Python 3.4 and later default to sockets having close-on-exec + # set (what PEP 0446 calls "non-inheritable"). This new method + # on socket objects is provided to toggle it. + sock.set_inheritable(False) + + +class WorkersStrategy(StrategyBase): """ WSGI server management strategy object for a single bind port and listen socket shared by a configured number of forked-off workers. @@ -695,8 +752,7 @@ class WorkersStrategy(object): def do_bind_ports(self): """ - Bind the one listen socket for this strategy and drop privileges - (since the parent process will never need to bind again). + Bind the one listen socket for this strategy. """ try: @@ -705,7 +761,6 @@ class WorkersStrategy(object): msg = 'bind_port wasn\'t properly set in the config file. ' \ 'It must be explicitly set to a valid port number.' return msg - drop_privileges(self.conf.get('user', 'swift')) def no_fork_sock(self): """ @@ -766,20 +821,27 @@ class WorkersStrategy(object): """ Called when a worker has exited. + NOTE: a re-exec'ed server can reap the dead worker PIDs from the old + server process that is being replaced as part of a service reload + (SIGUSR1). So we need to be robust to getting some unknown PID here. + :param int pid: The PID of the worker that exited. """ - self.logger.error('Removing dead child %s from parent %s', - pid, os.getpid()) - self.children.remove(pid) + if pid in self.children: + self.logger.error('Removing dead child %s from parent %s', + pid, os.getpid()) + self.children.remove(pid) + else: + self.logger.info('Ignoring wait() result from unknown PID %s', pid) - def shutdown_sockets(self): + def iter_sockets(self): """ - Shutdown any listen sockets. + Yields all known listen sockets. """ - greenio.shutdown_safe(self.sock) - self.sock.close() + if self.sock: + yield self.sock class PortPidState(object): @@ -901,7 +963,7 @@ class PortPidState(object): self.sock_data_by_port[dead_port]['pids'][server_idx] = None -class ServersPerPortStrategy(object): +class ServersPerPortStrategy(StrategyBase): """ WSGI server management strategy object for an object-server with one listen port per unique local port in the storage policy rings. The @@ -948,28 +1010,13 @@ class ServersPerPortStrategy(object): def do_bind_ports(self): """ - Bind one listen socket per unique local storage policy ring port. Then - do all the work of drop_privileges except the actual dropping of - privileges (each forked-off worker will do that post-fork in - :py:meth:`post_fork_hook`). + Bind one listen socket per unique local storage policy ring port. """ self._reload_bind_ports() for port in self.bind_ports: self._bind_port(port) - # The workers strategy drops privileges here, which we obviously cannot - # do if we want to support binding to low ports. But we do want some - # of the actions that drop_privileges did. - try: - os.setsid() - except OSError: - pass - # In case you need to rmdir where you started the daemon: - os.chdir('/') - # Ensure files are created with the correct privileges: - os.umask(0o22) - def no_fork_sock(self): """ This strategy does not support running in the foreground. @@ -1030,7 +1077,7 @@ class ServersPerPortStrategy(object): to drop privileges. """ - drop_privileges(self.conf.get('user', 'swift'), call_setsid=False) + drop_privileges(self.conf.get('user', 'swift')) def log_sock_exit(self, sock, server_idx): """ @@ -1050,6 +1097,7 @@ class ServersPerPortStrategy(object): :py:meth:`new_worker_socks`. :param int pid: The new worker process' PID """ + port = self.port_pid_state.port_for_sock(sock) self.logger.notice('Started child %d (PID %d) for port %d', server_idx, pid, port) @@ -1064,14 +1112,13 @@ class ServersPerPortStrategy(object): self.port_pid_state.forget_pid(pid) - def shutdown_sockets(self): + def iter_sockets(self): """ - Shutdown any listen sockets. + Yields all known listen sockets. """ for sock in self.port_pid_state.all_socks(): - greenio.shutdown_safe(sock) - sock.close() + yield sock def run_wsgi(conf_path, app_section, *args, **kwargs): @@ -1127,10 +1174,22 @@ def run_wsgi(conf_path, app_section, *args, **kwargs): print(error_msg) return 1 + # Do some daemonization process hygene before we fork any children or run a + # server without forking. + clean_up_daemon_hygiene() + # Redirect errors to logger and close stdio. Do this *after* binding ports; # we use this to signal that the service is ready to accept connections. capture_stdio(logger) + # If necessary, signal an old copy of us that it's okay to shutdown its + # listen sockets now because ours are up and ready to receive connections. + reexec_signal_fd = os.getenv(NOTIFY_FD_ENV_KEY) + if reexec_signal_fd: + reexec_signal_fd = int(reexec_signal_fd) + os.write(reexec_signal_fd, str(os.getpid()).encode('utf8')) + os.close(reexec_signal_fd) + no_fork_sock = strategy.no_fork_sock() if no_fork_sock: run_server(conf, logger, no_fork_sock, global_conf=global_conf) @@ -1145,6 +1204,7 @@ def run_wsgi(conf_path, app_section, *args, **kwargs): running_context = [True, None] signal.signal(signal.SIGTERM, stop_with_signal) signal.signal(signal.SIGHUP, stop_with_signal) + signal.signal(signal.SIGUSR1, stop_with_signal) while running_context[0]: for sock, sock_info in strategy.new_worker_socks(): @@ -1152,6 +1212,7 @@ def run_wsgi(conf_path, app_section, *args, **kwargs): if pid == 0: signal.signal(signal.SIGHUP, signal.SIG_DFL) signal.signal(signal.SIGTERM, signal.SIG_DFL) + signal.signal(signal.SIGUSR1, signal.SIG_DFL) strategy.post_fork_hook() run_server(conf, logger, sock) strategy.log_sock_exit(sock, sock_info) @@ -1196,9 +1257,58 @@ def run_wsgi(conf_path, app_section, *args, **kwargs): logger.error('Stopping with unexpected signal %r' % running_context[1]) else: - logger.error('%s received', signame) + logger.error('%s received (%s)', signame, os.getpid()) if running_context[1] == signal.SIGTERM: os.killpg(0, signal.SIGTERM) + elif running_context[1] == signal.SIGUSR1: + # set up a pipe, fork off a child to handle cleanup later, + # and rexec ourselves with an environment variable set which will + # indicate which fd (one of the pipe ends) to write a byte to + # to indicate listen socket setup is complete. That will signal + # the forked-off child to complete its listen socket shutdown. + # + # NOTE: all strategies will now require the parent process to retain + # superuser privileges so that the re'execd process can bind a new + # socket to the configured IP & port(s). We can't just reuse existing + # listen sockets because then the bind IP couldn't be changed. + # + # NOTE: we need to set all our listen sockets close-on-exec so the only + # open reference to those file descriptors will be in the forked-off + # child here who waits to shutdown the old server's listen sockets. If + # the re-exec'ed server's old listen sockets aren't closed-on-exec, + # then the old server can't actually ever exit. + strategy.set_close_on_exec_on_listen_sockets() + read_fd, write_fd = os.pipe() + orig_server_pid = os.getpid() + child_pid = os.fork() + if child_pid: + # parent; set env var for fds and reexec ourselves + os.close(read_fd) + os.putenv(NOTIFY_FD_ENV_KEY, str(write_fd)) + myself = os.path.realpath(sys.argv[0]) + logger.info("Old server PID=%d re'execing as: %r", + orig_server_pid, [myself] + list(sys.argv)) + os.execv(myself, sys.argv) + logger.error('Somehow lived past os.execv()?!') + exit('Somehow lived past os.execv()?!') + elif child_pid == 0: + # child + os.close(write_fd) + logger.info('Old server temporary child PID=%d waiting for ' + "re-exec'ed PID=%d to signal readiness...", + os.getpid(), orig_server_pid) + try: + got_pid = os.read(read_fd, 30) + logger.info('Old server temporary child PID=%d notified ' + 'to shutdown old listen sockets by PID=%s', + os.getpid(), got_pid) + except Exception as e: + logger.warning('Unexpected exception while reading from ' + 'pipe:', exc_info=True) + try: + os.close(read_fd) + except Exception: + pass strategy.shutdown_sockets() signal.signal(signal.SIGTERM, signal.SIG_IGN) diff --git a/test/probe/test_signals.py b/test/probe/test_signals.py index bfc6e299a3..dbb3b01f45 100644 --- a/test/probe/test_signals.py +++ b/test/probe/test_signals.py @@ -16,10 +16,14 @@ import unittest -import random from contextlib import contextmanager - import eventlet +import json +import os +import random +import shutil +import time +from uuid import uuid4 from six.moves import http_client as httplib @@ -27,7 +31,7 @@ from swift.common.storage_policy import POLICIES from swift.common.ring import Ring from swift.common.manager import Manager -from test.probe.common import resetswift +from test.probe.common import resetswift, ReplProbeTest, client def putrequest(conn, method, path, headers): @@ -39,77 +43,311 @@ def putrequest(conn, method, path, headers): conn.endheaders() -class TestWSGIServerProcessHandling(unittest.TestCase): +def get_server_and_worker_pids(manager, old_workers=None): + # Gets all the server parent pids, as well as the set of all worker PIDs + # (i.e. any PID whose PPID is in the set of parent pids). + server_pid_set = {pid for server in manager.servers + for (_, pid) in server.iter_pid_files()} + children_pid_set = set() + old_worker_pid_set = set(old_workers or []) + all_pids = [int(f) for f in os.listdir('/proc') if f.isdigit()] + for pid in all_pids: + try: + with open('/proc/%d/status' % pid, 'r') as fh: + for line in fh: + if line.startswith('PPid:\t'): + ppid = int(line[6:]) + if ppid in server_pid_set or pid in old_worker_pid_set: + children_pid_set.add(pid) + break + except Exception: + # No big deal, a process could have exited since we listed /proc, + # so we just ignore errors + pass + return {'server': server_pid_set, 'worker': children_pid_set} + + +def wait_for_pids(manager, callback, timeout=15, old_workers=None): + # Waits up to `timeout` seconds for the supplied callback to return True + # when passed in the manager's pid set. + start_time = time.time() + + pid_sets = get_server_and_worker_pids(manager, old_workers=old_workers) + got = callback(pid_sets) + while not got and time.time() - start_time < timeout: + time.sleep(0.1) + pid_sets = get_server_and_worker_pids(manager, old_workers=old_workers) + got = callback(pid_sets) + if time.time() - start_time >= timeout: + raise AssertionError('timed out waiting for PID state; got %r' % ( + pid_sets)) + return pid_sets + + +class TestWSGIServerProcessHandling(ReplProbeTest): + # Subclasses need to define SERVER_NAME + HAS_INFO = False + PID_TIMEOUT = 25 def setUp(self): - resetswift() + super(TestWSGIServerProcessHandling, self).setUp() + self.container = 'container-%s' % uuid4() + client.put_container(self.url, self.token, self.container, + headers={'X-Storage-Policy': + self.policy.name}) + self.manager = Manager([self.SERVER_NAME]) + for server in self.manager.servers: + self.assertTrue(server.get_running_pids, + 'No running PIDs for %s' % server.cmd) + self.starting_pids = get_server_and_worker_pids(self.manager) - def _check_reload(self, server_name, ip, port): - manager = Manager([server_name]) - manager.start() + def assert4xx(self, resp): + self.assertEqual(resp.status // 100, 4) + got_body = resp.read() + try: + self.assertIn('resource could not be found', got_body) + except AssertionError: + self.assertIn('Invalid path: blah', got_body) - starting_pids = {pid for server in manager.servers - for (_, pid) in server.iter_pid_files()} + def get_conn(self): + ip, port = self.get_ip_port() + return httplib.HTTPConnection('%s:%s' % (ip, port)) - body = b'test' * 10 - conn = httplib.HTTPConnection('%s:%s' % (ip, port)) + def _check_reload(self): + conn = self.get_conn() + self.addCleanup(conn.close) # sanity request - putrequest(conn, 'PUT', 'blah', - headers={'Content-Length': len(body)}) - conn.send(body) - resp = conn.getresponse() - self.assertEqual(resp.status // 100, 4) - resp.read() + self.start_write_req(conn, 'sanity') + resp = self.finish_write_req(conn) + self.check_write_resp(resp) - # Start the request before reloading... - putrequest(conn, 'PUT', 'blah', - headers={'Content-Length': len(body)}) + if self.HAS_INFO: + self.check_info_value(8192) - manager.reload() + # Start another write request before reloading... + self.start_write_req(conn, 'across-reload') - post_reload_pids = {pid for server in manager.servers - for (_, pid) in server.iter_pid_files()} + if self.HAS_INFO: + self.swap_configs() # new server's max_header_size == 8191 - # none of the pids we started with are being tracked after reload - msg = 'expected all pids from %r to have died, but found %r' % ( - starting_pids, post_reload_pids) - self.assertFalse(starting_pids & post_reload_pids, msg) + self.do_reload() - # ... and make sure we can finish what we were doing, and even - # start part of a new request - conn.send(body) - resp = conn.getresponse() - self.assertEqual(resp.status // 100, 4) - # We can even read the body - self.assertTrue(resp.read()) + wait_for_pids(self.manager, self.make_post_reload_pid_cb(), + old_workers=self.starting_pids['worker'], + timeout=self.PID_TIMEOUT) + + # ... and make sure we can finish what we were doing + resp = self.finish_write_req(conn) + self.check_write_resp(resp) # After this, we're in a funny spot. With eventlet 0.22.0, the # connection's now closed, but with prior versions we could keep # going indefinitely. See https://bugs.launchpad.net/swift/+bug/1792615 - # Close our connection, to make sure old eventlet shuts down + # Close our connections, to make sure old eventlet shuts down conn.close() # sanity - post_close_pids = {pid for server in manager.servers - for (_, pid) in server.iter_pid_files()} - self.assertEqual(post_reload_pids, post_close_pids) + wait_for_pids(self.manager, self.make_post_close_pid_cb(), + old_workers=self.starting_pids['worker'], + timeout=self.PID_TIMEOUT) - def test_proxy_reload(self): - self._check_reload('proxy-server', 'localhost', 8080) + if self.HAS_INFO: + self.check_info_value(8191) - def test_object_reload(self): + +class OldReloadMixin(object): + def make_post_reload_pid_cb(self): + def _cb(post_reload_pids): + # We expect all old server PIDs to be gone, a new server present, + # and for there to be exactly 1 old worker PID plus additional new + # worker PIDs. + old_servers_dead = not (self.starting_pids['server'] & + post_reload_pids['server']) + one_old_worker = 1 == len(self.starting_pids['worker'] & + post_reload_pids['worker']) + new_workers_present = (post_reload_pids['worker'] - + self.starting_pids['worker']) + return (post_reload_pids['server'] and old_servers_dead and + one_old_worker and new_workers_present) + return _cb + + def make_post_close_pid_cb(self): + def _cb(post_close_pids): + # We expect all old server PIDs to be gone, a new server present, + # no old worker PIDs, and additional new worker PIDs. + old_servers_dead = not (self.starting_pids['server'] & + post_close_pids['server']) + old_workers_dead = not (self.starting_pids['worker'] & + post_close_pids['worker']) + new_workers_present = (post_close_pids['worker'] - + self.starting_pids['worker']) + return (post_close_pids['server'] and old_servers_dead and + old_workers_dead and new_workers_present) + return _cb + + def do_reload(self): + self.manager.reload() + + +class SeamlessReloadMixin(object): + def make_post_reload_pid_cb(self): + def _cb(post_reload_pids): + # We expect all orig server PIDs to STILL BE PRESENT, no new server + # present, and for there to be exactly 1 old worker PID plus + # additional new worker PIDs. + same_servers = (self.starting_pids['server'] == + post_reload_pids['server']) + one_old_worker = 1 == len(self.starting_pids['worker'] & + post_reload_pids['worker']) + new_workers_present = (post_reload_pids['worker'] - + self.starting_pids['worker']) + return (post_reload_pids['server'] and same_servers and + one_old_worker and new_workers_present) + return _cb + + def make_post_close_pid_cb(self): + def _cb(post_close_pids): + # We expect all orig server PIDs to STILL BE PRESENT, no new server + # present, no old worker PIDs, and additional new worker PIDs. + same_servers = (self.starting_pids['server'] == + post_close_pids['server']) + old_workers_dead = not (self.starting_pids['worker'] & + post_close_pids['worker']) + new_workers_present = (post_close_pids['worker'] - + self.starting_pids['worker']) + return (post_close_pids['server'] and same_servers and + old_workers_dead and new_workers_present) + return _cb + + def do_reload(self): + self.manager.reload_seamless() + + +class TestObjectServerReloadBase(TestWSGIServerProcessHandling): + SERVER_NAME = 'object' + PID_TIMEOUT = 35 + + def get_ip_port(self): policy = random.choice(list(POLICIES)) policy.load_ring('/etc/swift') - node = random.choice(policy.object_ring.get_part_nodes(1)) - self._check_reload('object', node['ip'], node['port']) + self.ring_node = random.choice(policy.object_ring.get_part_nodes(1)) + return self.ring_node['ip'], self.ring_node['port'] - def test_account_container_reload(self): - for server in ('account', 'container'): - ring = Ring('/etc/swift', ring_name=server) - node = random.choice(ring.get_part_nodes(1)) - self._check_reload(server, node['ip'], node['port']) + def start_write_req(self, conn, suffix): + putrequest(conn, 'PUT', '/%s/123/%s/%s/blah-%s' % ( + self.ring_node['device'], self.account, self.container, suffix), + headers={'X-Timestamp': str(time.time()), + 'Content-Type': 'application/octet-string', + 'Content-Length': len(self.BODY)}) + + def finish_write_req(self, conn): + conn.send(self.BODY) + return conn.getresponse() + + def check_write_resp(self, resp): + got_body = resp.read() + self.assertEqual(resp.status // 100, 2, 'Got status %d; %r' % + (resp.status, got_body)) + self.assertEqual('', got_body) + return resp + + +class TestObjectServerReload(OldReloadMixin, TestObjectServerReloadBase): + BODY = 'test-object' * 10 + + def test_object_reload(self): + self._check_reload() + + +class TestObjectServerReloadSeamless(SeamlessReloadMixin, + TestObjectServerReloadBase): + BODY = 'test-object' * 10 + + def test_object_reload_seamless(self): + self._check_reload() + + +class TestProxyServerReloadBase(TestWSGIServerProcessHandling): + SERVER_NAME = 'proxy-server' + HAS_INFO = True + + def setUp(self): + super(TestProxyServerReloadBase, self).setUp() + self.swift_conf_path = '/etc/swift/swift.conf' + self.new_swift_conf_path = self.swift_conf_path + '.new' + self.saved_swift_conf_path = self.swift_conf_path + '.orig' + shutil.copy(self.swift_conf_path, self.saved_swift_conf_path) + shutil.copy(self.swift_conf_path, self.new_swift_conf_path) + with open(self.new_swift_conf_path, 'a+') as fh: + fh.seek(0, os.SEEK_END) + fh.write('\n[swift-constraints]\nmax_header_size = 8191\n') + fh.flush() + + def tearDown(self): + shutil.move(self.saved_swift_conf_path, self.swift_conf_path) + try: + os.unlink(self.new_swift_conf_path) + except OSError: + pass + super(TestProxyServerReloadBase, self).tearDown() + + def swap_configs(self): + shutil.copy(self.new_swift_conf_path, self.swift_conf_path) + + def get_ip_port(self): + return 'localhost', 8080 + + def assertMaxHeaderSize(self, resp, exp_max_header_size): + self.assertEqual(resp.status // 100, 2) + info_dict = json.loads(resp.read()) + self.assertEqual(exp_max_header_size, + info_dict['swift']['max_header_size']) + + def check_info_value(self, expected_value): + # show that we're talking to the original server with the default + # max_header_size == 8192 + conn2 = self.get_conn() + putrequest(conn2, 'GET', '/info', + headers={'Content-Length': '0', + 'Accept': 'application/json'}) + conn2.send('') + resp = conn2.getresponse() + self.assertMaxHeaderSize(resp, expected_value) + conn2.close() + + def start_write_req(self, conn, suffix): + putrequest(conn, 'PUT', '/v1/%s/%s/blah-%s' % ( + self.account, self.container, suffix), + headers={'X-Auth-Token': self.token, + 'Content-Length': len(self.BODY)}) + + def finish_write_req(self, conn): + conn.send(self.BODY) + return conn.getresponse() + + def check_write_resp(self, resp): + got_body = resp.read() + self.assertEqual(resp.status // 100, 2, 'Got status %d; %r' % + (resp.status, got_body)) + self.assertEqual('', got_body) + return resp + + +class TestProxyServerReload(OldReloadMixin, TestProxyServerReloadBase): + BODY = 'proxy' * 10 + + def test_proxy_reload(self): + self._check_reload() + + +class TestProxyServerReloadSeamless(SeamlessReloadMixin, + TestProxyServerReloadBase): + BODY = 'proxy-seamless' * 10 + + def test_proxy_reload_seamless(self): + self._check_reload() @contextmanager diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 8f26565e12..fa8db26d5b 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -2285,15 +2285,16 @@ log_name = %(yarr)s''' } self.assertEqual(conf, expected) - def _check_drop_privileges(self, mock_os, required_func_calls, - call_setsid=True): + def test_drop_privileges(self): + required_func_calls = ('setgroups', 'setgid', 'setuid') + mock_os = MockOs(called_funcs=required_func_calls) user = getuser() user_data = pwd.getpwnam(user) self.assertFalse(mock_os.called_funcs) # sanity check # over-ride os with mock with mock.patch('swift.common.utils.os', mock_os): # exercise the code - utils.drop_privileges(user, call_setsid=call_setsid) + utils.drop_privileges(user) for func in required_func_calls: self.assertIn(func, mock_os.called_funcs) @@ -2302,34 +2303,41 @@ log_name = %(yarr)s''' self.assertEqual(groups, set(mock_os.called_funcs['setgroups'][0])) self.assertEqual(user_data[3], mock_os.called_funcs['setgid'][0]) self.assertEqual(user_data[2], mock_os.called_funcs['setuid'][0]) - self.assertEqual('/', mock_os.called_funcs['chdir'][0]) - self.assertEqual(0o22, mock_os.called_funcs['umask'][0]) - def test_drop_privileges(self): - required_func_calls = ('setgroups', 'setgid', 'setuid', 'setsid', - 'chdir', 'umask') + def test_drop_privileges_no_setgroups(self): + required_func_calls = ('geteuid', 'setgid', 'setuid') mock_os = MockOs(called_funcs=required_func_calls) - self._check_drop_privileges(mock_os, required_func_calls) + user = getuser() + user_data = pwd.getpwnam(user) + self.assertFalse(mock_os.called_funcs) # sanity check + # over-ride os with mock + with mock.patch('swift.common.utils.os', mock_os): + # exercise the code + utils.drop_privileges(user) - def test_drop_privileges_setsid_error(self): - # OSError trying to get session leader - required_func_calls = ('setgroups', 'setgid', 'setuid', 'setsid', - 'chdir', 'umask') - mock_os = MockOs(called_funcs=required_func_calls, - raise_funcs=('setsid',)) - self._check_drop_privileges(mock_os, required_func_calls) + for func in required_func_calls: + self.assertIn(func, mock_os.called_funcs) + self.assertNotIn('setgroups', mock_os.called_funcs) + self.assertEqual(user_data[5], mock_os.environ['HOME']) + self.assertEqual(user_data[3], mock_os.called_funcs['setgid'][0]) + self.assertEqual(user_data[2], mock_os.called_funcs['setuid'][0]) - def test_drop_privileges_no_call_setsid(self): - required_func_calls = ('setgroups', 'setgid', 'setuid', 'chdir', - 'umask') - # OSError if trying to get session leader, but it shouldn't be called + def test_clean_up_daemon_hygene(self): + required_func_calls = ('chdir', 'umask') + # OSError if trying to get session leader, but setsid() OSError is + # ignored by the code under test. bad_func_calls = ('setsid',) mock_os = MockOs(called_funcs=required_func_calls, raise_funcs=bad_func_calls) - self._check_drop_privileges(mock_os, required_func_calls, - call_setsid=False) + with mock.patch('swift.common.utils.os', mock_os): + # exercise the code + utils.clean_up_daemon_hygiene() + for func in required_func_calls: + self.assertIn(func, mock_os.called_funcs) for func in bad_func_calls: - self.assertNotIn(func, mock_os.called_funcs) + self.assertIn(func, mock_os.called_funcs) + self.assertEqual('/', mock_os.called_funcs['chdir'][0]) + self.assertEqual(0o22, mock_os.called_funcs['umask'][0]) @reset_logger_state def test_capture_stdio(self): diff --git a/test/unit/common/test_wsgi.py b/test/unit/common/test_wsgi.py index dd3ceff258..b72c415505 100644 --- a/test/unit/common/test_wsgi.py +++ b/test/unit/common/test_wsgi.py @@ -836,7 +836,8 @@ class TestWSGI(unittest.TestCase): with mock.patch.object(wsgi, '_initrp', _initrp), \ mock.patch.object(wsgi, 'get_socket'), \ - mock.patch.object(wsgi, 'drop_privileges'), \ + mock.patch.object(wsgi, 'drop_privileges') as _d_privs, \ + mock.patch.object(wsgi, 'clean_up_daemon_hygiene') as _c_hyg, \ mock.patch.object(wsgi, 'loadapp', _loadapp), \ mock.patch.object(wsgi, 'capture_stdio'), \ mock.patch.object(wsgi, 'run_server'), \ @@ -849,6 +850,10 @@ class TestWSGI(unittest.TestCase): socket=True, select=True, thread=True) + # run_wsgi() no longer calls drop_privileges() in the parent process, + # just clean_up_deemon_hygene() + self.assertEqual([], _d_privs.mock_calls) + self.assertEqual([mock.call()], _c_hyg.mock_calls) @mock.patch('swift.common.wsgi.run_server') @mock.patch('swift.common.wsgi.WorkersStrategy') @@ -1353,36 +1358,11 @@ class TestServersPerPortStrategy(unittest.TestCase): 6006, self.strategy.port_pid_state.port_for_sock(self.s1)) self.assertEqual( 6007, self.strategy.port_pid_state.port_for_sock(self.s2)) - self.assertEqual([mock.call()], self.mock_setsid.mock_calls) - self.assertEqual([mock.call('/')], self.mock_chdir.mock_calls) - self.assertEqual([mock.call(0o22)], self.mock_umask.mock_calls) - - def test_bind_ports_ignores_setsid_errors(self): - self.mock_setsid.side_effect = OSError() - self.strategy.do_bind_ports() - - self.assertEqual(set((6006, 6007)), self.strategy.bind_ports) - self.assertEqual([ - mock.call({'workers': 100, # ignored - 'user': 'bob', - 'swift_dir': '/jim/cricket', - 'ring_check_interval': '76', - 'bind_ip': '2.3.4.5', - 'bind_port': 6006}), - mock.call({'workers': 100, # ignored - 'user': 'bob', - 'swift_dir': '/jim/cricket', - 'ring_check_interval': '76', - 'bind_ip': '2.3.4.5', - 'bind_port': 6007}), - ], self.mock_get_socket.mock_calls) - self.assertEqual( - 6006, self.strategy.port_pid_state.port_for_sock(self.s1)) - self.assertEqual( - 6007, self.strategy.port_pid_state.port_for_sock(self.s2)) - self.assertEqual([mock.call()], self.mock_setsid.mock_calls) - self.assertEqual([mock.call('/')], self.mock_chdir.mock_calls) - self.assertEqual([mock.call(0o22)], self.mock_umask.mock_calls) + # strategy binding no longer does clean_up_deemon_hygene() actions, the + # user of the strategy does. + self.assertEqual([], self.mock_setsid.mock_calls) + self.assertEqual([], self.mock_chdir.mock_calls) + self.assertEqual([], self.mock_umask.mock_calls) def test_no_fork_sock(self): self.assertIsNone(self.strategy.no_fork_sock()) @@ -1519,7 +1499,7 @@ class TestServersPerPortStrategy(unittest.TestCase): self.strategy.post_fork_hook() self.assertEqual([ - mock.call('bob', call_setsid=False), + mock.call('bob'), ], self.mock_drop_privileges.mock_calls) def test_shutdown_sockets(self): @@ -1555,6 +1535,9 @@ class TestWorkersStrategy(unittest.TestCase): patcher = mock.patch('swift.common.wsgi.drop_privileges') self.mock_drop_privileges = patcher.start() self.addCleanup(patcher.stop) + patcher = mock.patch('swift.common.wsgi.clean_up_daemon_hygiene') + self.mock_clean_up_daemon_hygene = patcher.start() + self.addCleanup(patcher.stop) def test_loop_timeout(self): # This strategy should sit in the green.os.wait() for a bit (to avoid @@ -1569,9 +1552,10 @@ class TestWorkersStrategy(unittest.TestCase): self.assertEqual([ mock.call(self.conf), ], self.mock_get_socket.mock_calls) - self.assertEqual([ - mock.call('bob'), - ], self.mock_drop_privileges.mock_calls) + # strategy binding no longer drops privileges nor does + # clean_up_deemon_hygene() actions. + self.assertEqual([], self.mock_drop_privileges.mock_calls) + self.assertEqual([], self.mock_clean_up_daemon_hygene.mock_calls) self.mock_get_socket.side_effect = wsgi.ConfigFilePortError() @@ -1643,9 +1627,16 @@ class TestWorkersStrategy(unittest.TestCase): self.assertEqual([ mock.call.shutdown_safe(self.mock_get_socket.return_value), ], mock_greenio.mock_calls) - self.assertEqual([ - mock.call.close(), - ], self.mock_get_socket.return_value.mock_calls) + if six.PY2: + self.assertEqual([ + mock.call.__nonzero__(), + mock.call.close(), + ], self.mock_get_socket.return_value.mock_calls) + else: + self.assertEqual([ + mock.call.__bool__(), + mock.call.close(), + ], self.mock_get_socket.return_value.mock_calls) def test_log_sock_exit(self): self.strategy.log_sock_exit('blahblah', 'blahblah') diff --git a/tox.ini b/tox.ini index 292f34a83c..6044e7d047 100644 --- a/tox.ini +++ b/tox.ini @@ -9,12 +9,13 @@ install_command = pip install -U {opts} {packages} setenv = VIRTUAL_ENV={envdir} NOSE_WITH_COVERAGE=1 NOSE_COVER_BRANCHES=1 + NOSE_COVER_HTML_DIR={toxinidir}/cover deps = -c{env:UPPER_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/master} -r{toxinidir}/requirements.txt -r{toxinidir}/test-requirements.txt -commands = find . ( -type f -o -type l ) -name "*.py[co]" -delete - find . -type d -name "__pycache__" -delete +commands = find {envdir} ( -type f -o -type l ) -name "*.py[co]" -delete + find {envdir} -type d -name "__pycache__" -delete nosetests {posargs:test/unit} whitelist_externals = find rm