Merge "Seamlessly reload servers with SIGUSR1"
commit 9fa0b211a9
@@ -87,6 +87,7 @@ allows one to use the keywords such as "all", "main" and "rest" for the <server>
 .IP "\fIno-wait\fR: \t\t\t spawn server and return immediately"
 .IP "\fIonce\fR: \t\t\t start server and run one pass on supporting daemons"
 .IP "\fIreload\fR: \t\t\t graceful shutdown then restart on supporting servers"
+.IP "\fIreload-seamless\fR: \t\t reload supporting servers with no downtime"
 .IP "\fIrestart\fR: \t\t\t stops then restarts server"
 .IP "\fIshutdown\fR: \t\t allow current requests to finish on supporting servers"
 .IP "\fIstart\fR: \t\t\t starts a server"
@@ -1362,20 +1362,29 @@ Swift services are generally managed with ``swift-init``. the general usage is
 ``swift-init <service> <command>``, where service is the Swift service to
 manage (for example object, container, account, proxy) and command is one of:
 
-========== ===============================================
+=============== ===============================================
 Command         Description
----------- -----------------------------------------------
+--------------- -----------------------------------------------
 start           Start the service
 stop            Stop the service
 restart         Restart the service
 shutdown        Attempt to gracefully shutdown the service
 reload          Attempt to gracefully restart the service
-========== ===============================================
+reload-seamless Attempt to seamlessly restart the service
+=============== ===============================================
 
-A graceful shutdown or reload will finish any current requests before
-completely stopping the old service. There is also a special case of
-``swift-init all <command>``, which will run the command for all swift
-services.
+A graceful shutdown or reload will allow all server workers to finish any
+current requests before exiting. The parent server process exits immediately.
+
+A seamless reload will make new configuration settings active, with no window
+where client requests fail due to there being no active listen socket.
+The parent server process will re-exec itself, retaining its existing PID.
+After the re-exec'ed parent server process binds its listen sockets, the old
+listen sockets are closed and old server workers finish any current requests
+before exiting.
+
+There is also a special case of ``swift-init all <command>``, which will run
+the command for all swift services.
 
 In cases where there are multiple configs for a service, a specific config
 can be managed with ``swift-init <service>.<config> <command>``.
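The same operations are available programmatically through ``swift.common.manager``.
A minimal sketch, assuming a node with a running proxy-server and the standard
pid-file layout, of driving both reload styles from Python (the equivalent of
``swift-init proxy-server reload`` and ``swift-init proxy-server reload-seamless``)::

    # Sketch: programmatic equivalent of `swift-init proxy-server reload[-seamless]`.
    from swift.common.manager import Manager

    manager = Manager(['proxy-server'])

    # Graceful reload: workers finish in-flight requests, the parent server
    # process exits and is replaced by a freshly started one.
    manager.reload()

    # Seamless reload: the parent re-execs itself in place (same PID), so there
    # is never a moment without a bound listen socket.
    manager.reload_seamless()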
@@ -132,6 +132,7 @@ class DaemonStrategy(object):
     def setup(self, **kwargs):
         utils.validate_configuration()
         utils.drop_privileges(self.daemon.conf.get('user', 'swift'))
+        utils.clean_up_daemon_hygiene()
         utils.capture_stdio(self.logger, **kwargs)
 
         def kill_children(*args):
@@ -46,6 +46,7 @@ REST_SERVERS = [s for s in ALL_SERVERS if s not in MAIN_SERVERS]
 # aliases mapping
 ALIASES = {'all': ALL_SERVERS, 'main': MAIN_SERVERS, 'rest': REST_SERVERS}
 GRACEFUL_SHUTDOWN_SERVERS = MAIN_SERVERS
+SEAMLESS_SHUTDOWN_SERVERS = MAIN_SERVERS
 START_ONCE_SERVERS = REST_SERVERS
 # These are servers that match a type (account-*, container-*, object-*) but
 # don't use that type-server.conf file and instead use their own.
@@ -365,6 +366,21 @@ class Manager(object):
             status += m.start(**kwargs)
         return status
 
+    @command
+    def reload_seamless(self, **kwargs):
+        """seamlessly re-exec, then shutdown of old listen sockets on
+        supporting servers
+        """
+        kwargs.pop('graceful', None)
+        kwargs['seamless'] = True
+        status = 0
+        for server in self.servers:
+            signaled_pids = server.stop(**kwargs)
+            if not signaled_pids:
+                print(_('No %s running') % server)
+                status += 1
+        return status
+
     @command
     def force_reload(self, **kwargs):
         """alias for reload
@@ -628,13 +644,17 @@ class Server(object):
         """Kill running pids
 
         :param graceful: if True, attempt SIGHUP on supporting servers
+        :param seamless: if True, attempt SIGUSR1 on supporting servers
 
         :returns: a dict mapping pids (ints) to pid_files (paths)
 
         """
         graceful = kwargs.get('graceful')
+        seamless = kwargs.get('seamless')
         if graceful and self.server in GRACEFUL_SHUTDOWN_SERVERS:
             sig = signal.SIGHUP
+        elif seamless and self.server in SEAMLESS_SHUTDOWN_SERVERS:
+            sig = signal.SIGUSR1
         else:
             sig = signal.SIGTERM
         return self.signal_pids(sig, **kwargs)
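Under the hood the manager only signals the parent server process: SIGHUP for a
graceful reload, SIGUSR1 for a seamless one. A hedged sketch of doing the same by
hand, assuming swift-init's default pid-file location (/var/run/swift/<server>.pid):

    # Sketch: send the seamless-reload signal directly to a server's parent
    # process. The pid-file path below is an assumption (default run directory).
    import os
    import signal

    PID_FILE = '/var/run/swift/proxy-server.pid'

    with open(PID_FILE) as fh:
        pid = int(fh.read().strip())

    os.kill(pid, signal.SIGUSR1)  # SIGHUP here would request a graceful reload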
@@ -2452,7 +2452,7 @@ def get_hub():
     return None
 
 
-def drop_privileges(user, call_setsid=True):
+def drop_privileges(user):
     """
     Sets the userid/groupid of the current process, get session leader, etc.
 
@@ -2465,7 +2465,9 @@ def drop_privileges(user, call_setsid=True):
     os.setgid(user[3])
     os.setuid(user[2])
     os.environ['HOME'] = user[5]
-    if call_setsid:
+
+
+def clean_up_daemon_hygiene():
     try:
         os.setsid()
     except OSError:
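The daemon-hygiene actions that used to live at the tail of drop_privileges() are
collected in the new helper. Judging from this hunk, the code removed from
ServersPerPortStrategy.do_bind_ports() below, and the unit tests further down, the
helper amounts to roughly the following (a sketch, not the verbatim function body):

    import os

    def clean_up_daemon_hygiene():
        try:
            # Become a session leader; ignore the error raised when the
            # process already leads its own session.
            os.setsid()
        except OSError:
            pass
        os.chdir('/')   # in case you need to rmdir where the daemon started
        os.umask(0o22)  # ensure files are created with the expected permissions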
@@ -18,15 +18,18 @@
 from __future__ import print_function
 
 import errno
+import fcntl
 import os
 import signal
-import time
 from swift import gettext_ as _
+import sys
 from textwrap import dedent
+import time
 
 import eventlet
 import eventlet.debug
-from eventlet import greenio, GreenPool, sleep, wsgi, listen, Timeout
+from eventlet import greenio, GreenPool, sleep, wsgi, listen, Timeout, \
+    websocket
 from paste.deploy import loadwsgi
 from eventlet.green import socket, ssl, os as green_os
 from io import BytesIO
@@ -42,10 +45,11 @@ from swift.common.swob import Request, wsgi_quote, wsgi_unquote, \
 from swift.common.utils import capture_stdio, disable_fallocate, \
     drop_privileges, get_logger, NullLogger, config_true_value, \
     validate_configuration, get_hub, config_auto_int_value, \
-    reiterate
+    reiterate, clean_up_daemon_hygiene
 
 SIGNUM_TO_NAME = {getattr(signal, n): n for n in dir(signal)
                   if n.startswith('SIG') and '_' not in n}
+NOTIFY_FD_ENV_KEY = '__SWIFT_SERVER_NOTIFY_FD'
 
 # Set maximum line size of message headers to be accepted.
 wsgi.MAX_HEADER_LINE = constraints.MAX_HEADER_SIZE
@@ -422,6 +426,13 @@ def load_app_config(conf_file):
 class SwiftHttpProtocol(wsgi.HttpProtocol):
     default_request_version = "HTTP/1.0"
 
+    def __init__(self, *args, **kwargs):
+        # See https://github.com/eventlet/eventlet/pull/590
+        self.pre_shutdown_bugfix_eventlet = not getattr(
+            websocket.WebSocketWSGI, '_WSGI_APP_ALWAYS_IDLE', None)
+        # Note this is not a new-style class, so super() won't work
+        wsgi.HttpProtocol.__init__(self, *args, **kwargs)
+
     def log_request(self, *a):
         """
         Turn off logging requests by the underlying WSGI software.
@@ -528,6 +539,23 @@ class SwiftHttpProtocol(wsgi.HttpProtocol):
                 b'HTTP/1.1 100 Continue\r\n'
         return environ
 
+    def _read_request_line(self):
+        # Note this is not a new-style class, so super() won't work
+        got = wsgi.HttpProtocol._read_request_line(self)
+        # See https://github.com/eventlet/eventlet/pull/590
+        if self.pre_shutdown_bugfix_eventlet:
+            self.conn_state[2] = wsgi.STATE_REQUEST
+        return got
+
+    def handle_one_request(self):
+        # Note this is not a new-style class, so super() won't work
+        got = wsgi.HttpProtocol.handle_one_request(self)
+        # See https://github.com/eventlet/eventlet/pull/590
+        if self.pre_shutdown_bugfix_eventlet:
+            if self.conn_state[2] != wsgi.STATE_CLOSE:
+                self.conn_state[2] = wsgi.STATE_IDLE
+        return got
+
 
 class SwiftHttpProxiedProtocol(SwiftHttpProtocol):
     """
@@ -662,7 +690,36 @@ def run_server(conf, logger, sock, global_conf=None):
     pool.waitall()
 
 
-class WorkersStrategy(object):
+class StrategyBase(object):
+    """
+    Some operations common to all strategy classes.
+    """
+
+    def shutdown_sockets(self):
+        """
+        Shutdown any listen sockets.
+        """
+
+        for sock in self.iter_sockets():
+            greenio.shutdown_safe(sock)
+            sock.close()
+
+    def set_close_on_exec_on_listen_sockets(self):
+        """
+        Set the close-on-exec flag on any listen sockets.
+        """
+
+        for sock in self.iter_sockets():
+            if six.PY2:
+                fcntl.fcntl(sock.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC)
+            else:
+                # Python 3.4 and later default to sockets having close-on-exec
+                # set (what PEP 0446 calls "non-inheritable"). This new method
+                # on socket objects is provided to toggle it.
+                sock.set_inheritable(False)
+
+
+class WorkersStrategy(StrategyBase):
     """
     WSGI server management strategy object for a single bind port and listen
     socket shared by a configured number of forked-off workers.
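The close-on-exec handling above is what lets the re-exec'ed server eventually
release the old listen sockets: any descriptor left inheritable across os.execv()
would stay open in the new process forever. A small illustration of the flag on a
plain TCP listen socket, mirroring the Python 2/3 split used above (six is assumed
to be available, as it is in Swift's environment):

    # Sketch: mark a listen socket close-on-exec so it does not survive execv().
    import fcntl
    import socket
    import six

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(('127.0.0.1', 0))
    sock.listen(5)

    if six.PY2:
        # Python 2: toggle FD_CLOEXEC through fcntl.
        flags = fcntl.fcntl(sock.fileno(), fcntl.F_GETFD)
        fcntl.fcntl(sock.fileno(), fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
    else:
        # Python 3.4+ sockets are non-inheritable by default (PEP 446);
        # calling set_inheritable(False) just makes the intent explicit.
        sock.set_inheritable(False)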
@@ -695,8 +752,7 @@ class WorkersStrategy(object):
 
     def do_bind_ports(self):
         """
-        Bind the one listen socket for this strategy and drop privileges
-        (since the parent process will never need to bind again).
+        Bind the one listen socket for this strategy.
         """
 
         try:
@@ -705,7 +761,6 @@ class WorkersStrategy(object):
             msg = 'bind_port wasn\'t properly set in the config file. ' \
                 'It must be explicitly set to a valid port number.'
             return msg
-        drop_privileges(self.conf.get('user', 'swift'))
 
     def no_fork_sock(self):
         """
@@ -766,20 +821,27 @@ class WorkersStrategy(object):
         """
         Called when a worker has exited.
 
+        NOTE: a re-exec'ed server can reap the dead worker PIDs from the old
+        server process that is being replaced as part of a service reload
+        (SIGUSR1). So we need to be robust to getting some unknown PID here.
+
         :param int pid: The PID of the worker that exited.
         """
 
+        if pid in self.children:
             self.logger.error('Removing dead child %s from parent %s',
                               pid, os.getpid())
             self.children.remove(pid)
+        else:
+            self.logger.info('Ignoring wait() result from unknown PID %s', pid)
 
-    def shutdown_sockets(self):
+    def iter_sockets(self):
         """
-        Shutdown any listen sockets.
+        Yields all known listen sockets.
         """
 
-        greenio.shutdown_safe(self.sock)
-        self.sock.close()
+        if self.sock:
+            yield self.sock
 
 
 class PortPidState(object):
@@ -901,7 +963,7 @@ class PortPidState(object):
         self.sock_data_by_port[dead_port]['pids'][server_idx] = None
 
 
-class ServersPerPortStrategy(object):
+class ServersPerPortStrategy(StrategyBase):
     """
     WSGI server management strategy object for an object-server with one listen
     port per unique local port in the storage policy rings. The
@@ -948,28 +1010,13 @@ class ServersPerPortStrategy(object):
 
     def do_bind_ports(self):
         """
-        Bind one listen socket per unique local storage policy ring port. Then
-        do all the work of drop_privileges except the actual dropping of
-        privileges (each forked-off worker will do that post-fork in
-        :py:meth:`post_fork_hook`).
+        Bind one listen socket per unique local storage policy ring port.
         """
 
         self._reload_bind_ports()
         for port in self.bind_ports:
             self._bind_port(port)
-
-        # The workers strategy drops privileges here, which we obviously cannot
-        # do if we want to support binding to low ports. But we do want some
-        # of the actions that drop_privileges did.
-        try:
-            os.setsid()
-        except OSError:
-            pass
-        # In case you need to rmdir where you started the daemon:
-        os.chdir('/')
-        # Ensure files are created with the correct privileges:
-        os.umask(0o22)
 
     def no_fork_sock(self):
         """
         This strategy does not support running in the foreground.
@@ -1030,7 +1077,7 @@ class ServersPerPortStrategy(object):
         to drop privileges.
         """
 
-        drop_privileges(self.conf.get('user', 'swift'), call_setsid=False)
+        drop_privileges(self.conf.get('user', 'swift'))
 
     def log_sock_exit(self, sock, server_idx):
         """
@@ -1050,6 +1097,7 @@ class ServersPerPortStrategy(object):
             :py:meth:`new_worker_socks`.
         :param int pid: The new worker process' PID
         """
+
         port = self.port_pid_state.port_for_sock(sock)
         self.logger.notice('Started child %d (PID %d) for port %d',
                            server_idx, pid, port)
@@ -1064,14 +1112,13 @@ class ServersPerPortStrategy(object):
 
         self.port_pid_state.forget_pid(pid)
 
-    def shutdown_sockets(self):
+    def iter_sockets(self):
         """
-        Shutdown any listen sockets.
+        Yields all known listen sockets.
         """
 
         for sock in self.port_pid_state.all_socks():
-            greenio.shutdown_safe(sock)
-            sock.close()
+            yield sock
 
 
 def run_wsgi(conf_path, app_section, *args, **kwargs):
@@ -1127,10 +1174,22 @@ def run_wsgi(conf_path, app_section, *args, **kwargs):
         print(error_msg)
         return 1
 
+    # Do some daemonization process hygene before we fork any children or run a
+    # server without forking.
+    clean_up_daemon_hygiene()
+
     # Redirect errors to logger and close stdio. Do this *after* binding ports;
     # we use this to signal that the service is ready to accept connections.
     capture_stdio(logger)
 
+    # If necessary, signal an old copy of us that it's okay to shutdown its
+    # listen sockets now because ours are up and ready to receive connections.
+    reexec_signal_fd = os.getenv(NOTIFY_FD_ENV_KEY)
+    if reexec_signal_fd:
+        reexec_signal_fd = int(reexec_signal_fd)
+        os.write(reexec_signal_fd, str(os.getpid()).encode('utf8'))
+        os.close(reexec_signal_fd)
+
     no_fork_sock = strategy.no_fork_sock()
     if no_fork_sock:
        run_server(conf, logger, no_fork_sock, global_conf=global_conf)
@@ -1145,6 +1204,7 @@ def run_wsgi(conf_path, app_section, *args, **kwargs):
     running_context = [True, None]
     signal.signal(signal.SIGTERM, stop_with_signal)
     signal.signal(signal.SIGHUP, stop_with_signal)
+    signal.signal(signal.SIGUSR1, stop_with_signal)
 
     while running_context[0]:
         for sock, sock_info in strategy.new_worker_socks():
@@ -1152,6 +1212,7 @@ def run_wsgi(conf_path, app_section, *args, **kwargs):
             if pid == 0:
                 signal.signal(signal.SIGHUP, signal.SIG_DFL)
                 signal.signal(signal.SIGTERM, signal.SIG_DFL)
+                signal.signal(signal.SIGUSR1, signal.SIG_DFL)
                 strategy.post_fork_hook()
                 run_server(conf, logger, sock)
                 strategy.log_sock_exit(sock, sock_info)
@@ -1196,9 +1257,58 @@ def run_wsgi(conf_path, app_section, *args, **kwargs):
             logger.error('Stopping with unexpected signal %r' %
                          running_context[1])
         else:
-            logger.error('%s received', signame)
+            logger.error('%s received (%s)', signame, os.getpid())
     if running_context[1] == signal.SIGTERM:
         os.killpg(0, signal.SIGTERM)
+    elif running_context[1] == signal.SIGUSR1:
+        # set up a pipe, fork off a child to handle cleanup later,
+        # and rexec ourselves with an environment variable set which will
+        # indicate which fd (one of the pipe ends) to write a byte to
+        # to indicate listen socket setup is complete. That will signal
+        # the forked-off child to complete its listen socket shutdown.
+        #
+        # NOTE: all strategies will now require the parent process to retain
+        # superuser privileges so that the re'execd process can bind a new
+        # socket to the configured IP & port(s). We can't just reuse existing
+        # listen sockets because then the bind IP couldn't be changed.
+        #
+        # NOTE: we need to set all our listen sockets close-on-exec so the only
+        # open reference to those file descriptors will be in the forked-off
+        # child here who waits to shutdown the old server's listen sockets. If
+        # the re-exec'ed server's old listen sockets aren't closed-on-exec,
+        # then the old server can't actually ever exit.
+        strategy.set_close_on_exec_on_listen_sockets()
+        read_fd, write_fd = os.pipe()
+        orig_server_pid = os.getpid()
+        child_pid = os.fork()
+        if child_pid:
+            # parent; set env var for fds and reexec ourselves
+            os.close(read_fd)
+            os.putenv(NOTIFY_FD_ENV_KEY, str(write_fd))
+            myself = os.path.realpath(sys.argv[0])
+            logger.info("Old server PID=%d re'execing as: %r",
+                        orig_server_pid, [myself] + list(sys.argv))
+            os.execv(myself, sys.argv)
+            logger.error('Somehow lived past os.execv()?!')
+            exit('Somehow lived past os.execv()?!')
+        elif child_pid == 0:
+            # child
+            os.close(write_fd)
+            logger.info('Old server temporary child PID=%d waiting for '
+                        "re-exec'ed PID=%d to signal readiness...",
+                        os.getpid(), orig_server_pid)
+            try:
+                got_pid = os.read(read_fd, 30)
+                logger.info('Old server temporary child PID=%d notified '
+                            'to shutdown old listen sockets by PID=%s',
+                            os.getpid(), got_pid)
+            except Exception as e:
+                logger.warning('Unexpected exception while reading from '
+                               'pipe:', exc_info=True)
+            try:
+                os.close(read_fd)
+            except Exception:
+                pass
 
     strategy.shutdown_sockets()
     signal.signal(signal.SIGTERM, signal.SIG_IGN)
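Taken together with the notify-fd block added to run_wsgi() earlier, the SIGUSR1
path is a small fd-passing handshake between the old server, a short-lived waiter
child, and the re-exec'ed server. A condensed sketch of the sequence (illustration
only; old_server_reexec and new_server_notify are hypothetical names, and the real
code is the hunk above):

    import os
    import sys

    NOTIFY_FD_ENV_KEY = '__SWIFT_SERVER_NOTIFY_FD'

    def old_server_reexec():
        # Old server, on SIGUSR1: pipe, fork a waiter, then re-exec in place.
        read_fd, write_fd = os.pipe()
        if hasattr(os, 'set_inheritable'):
            # On Python 3 pipe fds are non-inheritable by default (PEP 446);
            # the write end must survive the exec below.
            os.set_inheritable(write_fd, True)
        if os.fork():
            # old parent: hand the write end to the re-exec'ed copy of ourselves
            os.close(read_fd)
            os.putenv(NOTIFY_FD_ENV_KEY, str(write_fd))
            os.execv(os.path.realpath(sys.argv[0]), sys.argv)
        else:
            # waiter child: keeps the old listen sockets open until the new
            # server says it is ready
            os.close(write_fd)
            os.read(read_fd, 30)  # blocks until the new server writes its PID
            # ... shut down the old listen sockets and let old workers drain ...

    def new_server_notify():
        # Re-exec'ed server, early in run_wsgi(), after binding its sockets.
        fd = os.getenv(NOTIFY_FD_ENV_KEY)
        if fd:
            os.write(int(fd), str(os.getpid()).encode('utf8'))
            os.close(int(fd))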
@@ -16,10 +16,14 @@
 
 import unittest
 
-import random
 from contextlib import contextmanager
 
 import eventlet
+import json
+import os
+import random
+import shutil
+import time
+from uuid import uuid4
 
 from six.moves import http_client as httplib
 
@@ -27,7 +31,7 @@ from swift.common.storage_policy import POLICIES
 from swift.common.ring import Ring
 from swift.common.manager import Manager
 
-from test.probe.common import resetswift
+from test.probe.common import resetswift, ReplProbeTest, client
 
 
 def putrequest(conn, method, path, headers):
@@ -39,77 +43,311 @@ def putrequest(conn, method, path, headers):
     conn.endheaders()
 
 
-class TestWSGIServerProcessHandling(unittest.TestCase):
+def get_server_and_worker_pids(manager, old_workers=None):
+    # Gets all the server parent pids, as well as the set of all worker PIDs
+    # (i.e. any PID whose PPID is in the set of parent pids).
+    server_pid_set = {pid for server in manager.servers
+                      for (_, pid) in server.iter_pid_files()}
+    children_pid_set = set()
+    old_worker_pid_set = set(old_workers or [])
+    all_pids = [int(f) for f in os.listdir('/proc') if f.isdigit()]
+    for pid in all_pids:
+        try:
+            with open('/proc/%d/status' % pid, 'r') as fh:
+                for line in fh:
+                    if line.startswith('PPid:\t'):
+                        ppid = int(line[6:])
+                        if ppid in server_pid_set or pid in old_worker_pid_set:
+                            children_pid_set.add(pid)
+                        break
+        except Exception:
+            # No big deal, a process could have exited since we listed /proc,
+            # so we just ignore errors
+            pass
+    return {'server': server_pid_set, 'worker': children_pid_set}
+
+
+def wait_for_pids(manager, callback, timeout=15, old_workers=None):
+    # Waits up to `timeout` seconds for the supplied callback to return True
+    # when passed in the manager's pid set.
+    start_time = time.time()
+
+    pid_sets = get_server_and_worker_pids(manager, old_workers=old_workers)
+    got = callback(pid_sets)
+    while not got and time.time() - start_time < timeout:
+        time.sleep(0.1)
+        pid_sets = get_server_and_worker_pids(manager, old_workers=old_workers)
+        got = callback(pid_sets)
+    if time.time() - start_time >= timeout:
+        raise AssertionError('timed out waiting for PID state; got %r' % (
+            pid_sets))
+    return pid_sets
+
+
+class TestWSGIServerProcessHandling(ReplProbeTest):
+    # Subclasses need to define SERVER_NAME
+    HAS_INFO = False
+    PID_TIMEOUT = 25
 
     def setUp(self):
-        resetswift()
+        super(TestWSGIServerProcessHandling, self).setUp()
+        self.container = 'container-%s' % uuid4()
+        client.put_container(self.url, self.token, self.container,
+                             headers={'X-Storage-Policy':
+                                      self.policy.name})
+        self.manager = Manager([self.SERVER_NAME])
+        for server in self.manager.servers:
+            self.assertTrue(server.get_running_pids,
+                            'No running PIDs for %s' % server.cmd)
+        self.starting_pids = get_server_and_worker_pids(self.manager)
 
-    def _check_reload(self, server_name, ip, port):
-        manager = Manager([server_name])
-        manager.start()
+    def assert4xx(self, resp):
+        self.assertEqual(resp.status // 100, 4)
+        got_body = resp.read()
+        try:
+            self.assertIn('resource could not be found', got_body)
+        except AssertionError:
+            self.assertIn('Invalid path: blah', got_body)
 
-        starting_pids = {pid for server in manager.servers
-                         for (_, pid) in server.iter_pid_files()}
+    def get_conn(self):
+        ip, port = self.get_ip_port()
+        return httplib.HTTPConnection('%s:%s' % (ip, port))
 
-        body = b'test' * 10
-        conn = httplib.HTTPConnection('%s:%s' % (ip, port))
+    def _check_reload(self):
+        conn = self.get_conn()
+        self.addCleanup(conn.close)
 
         # sanity request
-        putrequest(conn, 'PUT', 'blah',
-                   headers={'Content-Length': len(body)})
-        conn.send(body)
-        resp = conn.getresponse()
-        self.assertEqual(resp.status // 100, 4)
-        resp.read()
+        self.start_write_req(conn, 'sanity')
+        resp = self.finish_write_req(conn)
+        self.check_write_resp(resp)
 
-        # Start the request before reloading...
-        putrequest(conn, 'PUT', 'blah',
-                   headers={'Content-Length': len(body)})
+        if self.HAS_INFO:
+            self.check_info_value(8192)
 
-        manager.reload()
+        # Start another write request before reloading...
+        self.start_write_req(conn, 'across-reload')
 
-        post_reload_pids = {pid for server in manager.servers
-                            for (_, pid) in server.iter_pid_files()}
+        if self.HAS_INFO:
+            self.swap_configs()  # new server's max_header_size == 8191
 
-        # none of the pids we started with are being tracked after reload
-        msg = 'expected all pids from %r to have died, but found %r' % (
-            starting_pids, post_reload_pids)
-        self.assertFalse(starting_pids & post_reload_pids, msg)
+        self.do_reload()
 
-        # ... and make sure we can finish what we were doing, and even
-        # start part of a new request
-        conn.send(body)
-        resp = conn.getresponse()
-        self.assertEqual(resp.status // 100, 4)
-        # We can even read the body
-        self.assertTrue(resp.read())
+        wait_for_pids(self.manager, self.make_post_reload_pid_cb(),
+                      old_workers=self.starting_pids['worker'],
+                      timeout=self.PID_TIMEOUT)
+
+        # ... and make sure we can finish what we were doing
+        resp = self.finish_write_req(conn)
+        self.check_write_resp(resp)
 
         # After this, we're in a funny spot. With eventlet 0.22.0, the
         # connection's now closed, but with prior versions we could keep
         # going indefinitely. See https://bugs.launchpad.net/swift/+bug/1792615
 
-        # Close our connection, to make sure old eventlet shuts down
+        # Close our connections, to make sure old eventlet shuts down
         conn.close()
 
         # sanity
-        post_close_pids = {pid for server in manager.servers
-                           for (_, pid) in server.iter_pid_files()}
-        self.assertEqual(post_reload_pids, post_close_pids)
+        wait_for_pids(self.manager, self.make_post_close_pid_cb(),
+                      old_workers=self.starting_pids['worker'],
+                      timeout=self.PID_TIMEOUT)
 
-    def test_proxy_reload(self):
-        self._check_reload('proxy-server', 'localhost', 8080)
+        if self.HAS_INFO:
+            self.check_info_value(8191)
 
-    def test_object_reload(self):
+
+class OldReloadMixin(object):
+    def make_post_reload_pid_cb(self):
+        def _cb(post_reload_pids):
+            # We expect all old server PIDs to be gone, a new server present,
+            # and for there to be exactly 1 old worker PID plus additional new
+            # worker PIDs.
+            old_servers_dead = not (self.starting_pids['server'] &
+                                    post_reload_pids['server'])
+            one_old_worker = 1 == len(self.starting_pids['worker'] &
+                                      post_reload_pids['worker'])
+            new_workers_present = (post_reload_pids['worker'] -
+                                   self.starting_pids['worker'])
+            return (post_reload_pids['server'] and old_servers_dead and
+                    one_old_worker and new_workers_present)
+        return _cb
+
+    def make_post_close_pid_cb(self):
+        def _cb(post_close_pids):
+            # We expect all old server PIDs to be gone, a new server present,
+            # no old worker PIDs, and additional new worker PIDs.
+            old_servers_dead = not (self.starting_pids['server'] &
+                                    post_close_pids['server'])
+            old_workers_dead = not (self.starting_pids['worker'] &
+                                    post_close_pids['worker'])
+            new_workers_present = (post_close_pids['worker'] -
+                                   self.starting_pids['worker'])
+            return (post_close_pids['server'] and old_servers_dead and
+                    old_workers_dead and new_workers_present)
+        return _cb
+
+    def do_reload(self):
+        self.manager.reload()
+
+
+class SeamlessReloadMixin(object):
+    def make_post_reload_pid_cb(self):
+        def _cb(post_reload_pids):
+            # We expect all orig server PIDs to STILL BE PRESENT, no new server
+            # present, and for there to be exactly 1 old worker PID plus
+            # additional new worker PIDs.
+            same_servers = (self.starting_pids['server'] ==
+                            post_reload_pids['server'])
+            one_old_worker = 1 == len(self.starting_pids['worker'] &
+                                      post_reload_pids['worker'])
+            new_workers_present = (post_reload_pids['worker'] -
+                                   self.starting_pids['worker'])
+            return (post_reload_pids['server'] and same_servers and
+                    one_old_worker and new_workers_present)
+        return _cb
+
+    def make_post_close_pid_cb(self):
+        def _cb(post_close_pids):
+            # We expect all orig server PIDs to STILL BE PRESENT, no new server
+            # present, no old worker PIDs, and additional new worker PIDs.
+            same_servers = (self.starting_pids['server'] ==
+                            post_close_pids['server'])
+            old_workers_dead = not (self.starting_pids['worker'] &
+                                    post_close_pids['worker'])
+            new_workers_present = (post_close_pids['worker'] -
+                                   self.starting_pids['worker'])
+            return (post_close_pids['server'] and same_servers and
+                    old_workers_dead and new_workers_present)
+        return _cb
+
+    def do_reload(self):
+        self.manager.reload_seamless()
+
+
+class TestObjectServerReloadBase(TestWSGIServerProcessHandling):
+    SERVER_NAME = 'object'
+    PID_TIMEOUT = 35
+
+    def get_ip_port(self):
         policy = random.choice(list(POLICIES))
         policy.load_ring('/etc/swift')
-        node = random.choice(policy.object_ring.get_part_nodes(1))
-        self._check_reload('object', node['ip'], node['port'])
+        self.ring_node = random.choice(policy.object_ring.get_part_nodes(1))
+        return self.ring_node['ip'], self.ring_node['port']
 
-    def test_account_container_reload(self):
-        for server in ('account', 'container'):
-            ring = Ring('/etc/swift', ring_name=server)
-            node = random.choice(ring.get_part_nodes(1))
-            self._check_reload(server, node['ip'], node['port'])
+    def start_write_req(self, conn, suffix):
+        putrequest(conn, 'PUT', '/%s/123/%s/%s/blah-%s' % (
+            self.ring_node['device'], self.account, self.container, suffix),
+            headers={'X-Timestamp': str(time.time()),
+                     'Content-Type': 'application/octet-string',
+                     'Content-Length': len(self.BODY)})
+
+    def finish_write_req(self, conn):
+        conn.send(self.BODY)
+        return conn.getresponse()
+
+    def check_write_resp(self, resp):
+        got_body = resp.read()
+        self.assertEqual(resp.status // 100, 2, 'Got status %d; %r' %
+                         (resp.status, got_body))
+        self.assertEqual('', got_body)
+        return resp
+
+
+class TestObjectServerReload(OldReloadMixin, TestObjectServerReloadBase):
+    BODY = 'test-object' * 10
+
+    def test_object_reload(self):
+        self._check_reload()
+
+
+class TestObjectServerReloadSeamless(SeamlessReloadMixin,
+                                     TestObjectServerReloadBase):
+    BODY = 'test-object' * 10
+
+    def test_object_reload_seamless(self):
+        self._check_reload()
+
+
+class TestProxyServerReloadBase(TestWSGIServerProcessHandling):
+    SERVER_NAME = 'proxy-server'
+    HAS_INFO = True
+
+    def setUp(self):
+        super(TestProxyServerReloadBase, self).setUp()
+        self.swift_conf_path = '/etc/swift/swift.conf'
+        self.new_swift_conf_path = self.swift_conf_path + '.new'
+        self.saved_swift_conf_path = self.swift_conf_path + '.orig'
+        shutil.copy(self.swift_conf_path, self.saved_swift_conf_path)
+        shutil.copy(self.swift_conf_path, self.new_swift_conf_path)
+        with open(self.new_swift_conf_path, 'a+') as fh:
+            fh.seek(0, os.SEEK_END)
+            fh.write('\n[swift-constraints]\nmax_header_size = 8191\n')
+            fh.flush()
+
+    def tearDown(self):
+        shutil.move(self.saved_swift_conf_path, self.swift_conf_path)
+        try:
+            os.unlink(self.new_swift_conf_path)
+        except OSError:
+            pass
+        super(TestProxyServerReloadBase, self).tearDown()
+
+    def swap_configs(self):
+        shutil.copy(self.new_swift_conf_path, self.swift_conf_path)
+
+    def get_ip_port(self):
+        return 'localhost', 8080
+
+    def assertMaxHeaderSize(self, resp, exp_max_header_size):
+        self.assertEqual(resp.status // 100, 2)
+        info_dict = json.loads(resp.read())
+        self.assertEqual(exp_max_header_size,
+                         info_dict['swift']['max_header_size'])
+
+    def check_info_value(self, expected_value):
+        # show that we're talking to the original server with the default
+        # max_header_size == 8192
+        conn2 = self.get_conn()
+        putrequest(conn2, 'GET', '/info',
+                   headers={'Content-Length': '0',
+                            'Accept': 'application/json'})
+        conn2.send('')
+        resp = conn2.getresponse()
+        self.assertMaxHeaderSize(resp, expected_value)
+        conn2.close()
+
+    def start_write_req(self, conn, suffix):
+        putrequest(conn, 'PUT', '/v1/%s/%s/blah-%s' % (
+            self.account, self.container, suffix),
+            headers={'X-Auth-Token': self.token,
+                     'Content-Length': len(self.BODY)})
+
+    def finish_write_req(self, conn):
+        conn.send(self.BODY)
+        return conn.getresponse()
+
+    def check_write_resp(self, resp):
+        got_body = resp.read()
+        self.assertEqual(resp.status // 100, 2, 'Got status %d; %r' %
+                         (resp.status, got_body))
+        self.assertEqual('', got_body)
+        return resp
+
+
+class TestProxyServerReload(OldReloadMixin, TestProxyServerReloadBase):
+    BODY = 'proxy' * 10
+
+    def test_proxy_reload(self):
+        self._check_reload()
+
+
+class TestProxyServerReloadSeamless(SeamlessReloadMixin,
+                                    TestProxyServerReloadBase):
+    BODY = 'proxy-seamless' * 10
+
+    def test_proxy_reload_seamless(self):
+        self._check_reload()
 
 
 @contextmanager
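The assertions encoded in the two mixins can also be spot-checked by hand on a dev
node: after an old-style reload the parent PIDs recorded in the pid files change,
after a seamless reload they do not. A rough sketch (parent_pids is a hypothetical
helper; a real check should poll for a few seconds, as wait_for_pids above does):

    from swift.common.manager import Manager

    def parent_pids(manager):
        return {pid for server in manager.servers
                for (_, pid) in server.iter_pid_files()}

    manager = Manager(['object'])
    before = parent_pids(manager)

    manager.reload_seamless()

    after = parent_pids(manager)
    assert before == after   # seamless: the same parent PIDs survive the re-exec
    # manager.reload() instead would end with: assert not (before & after)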
@@ -2285,15 +2285,16 @@ log_name = %(yarr)s'''
         }
         self.assertEqual(conf, expected)
 
-    def _check_drop_privileges(self, mock_os, required_func_calls,
-                               call_setsid=True):
+    def test_drop_privileges(self):
+        required_func_calls = ('setgroups', 'setgid', 'setuid')
+        mock_os = MockOs(called_funcs=required_func_calls)
         user = getuser()
         user_data = pwd.getpwnam(user)
         self.assertFalse(mock_os.called_funcs)  # sanity check
         # over-ride os with mock
         with mock.patch('swift.common.utils.os', mock_os):
             # exercise the code
-            utils.drop_privileges(user, call_setsid=call_setsid)
+            utils.drop_privileges(user)
 
         for func in required_func_calls:
             self.assertIn(func, mock_os.called_funcs)
@@ -2302,34 +2303,41 @@ log_name = %(yarr)s'''
         self.assertEqual(groups, set(mock_os.called_funcs['setgroups'][0]))
         self.assertEqual(user_data[3], mock_os.called_funcs['setgid'][0])
         self.assertEqual(user_data[2], mock_os.called_funcs['setuid'][0])
-        self.assertEqual('/', mock_os.called_funcs['chdir'][0])
-        self.assertEqual(0o22, mock_os.called_funcs['umask'][0])
 
-    def test_drop_privileges(self):
-        required_func_calls = ('setgroups', 'setgid', 'setuid', 'setsid',
-                               'chdir', 'umask')
+    def test_drop_privileges_no_setgroups(self):
+        required_func_calls = ('geteuid', 'setgid', 'setuid')
         mock_os = MockOs(called_funcs=required_func_calls)
-        self._check_drop_privileges(mock_os, required_func_calls)
+        user = getuser()
+        user_data = pwd.getpwnam(user)
+        self.assertFalse(mock_os.called_funcs)  # sanity check
+        # over-ride os with mock
+        with mock.patch('swift.common.utils.os', mock_os):
+            # exercise the code
+            utils.drop_privileges(user)
 
-    def test_drop_privileges_setsid_error(self):
-        # OSError trying to get session leader
-        required_func_calls = ('setgroups', 'setgid', 'setuid', 'setsid',
-                               'chdir', 'umask')
-        mock_os = MockOs(called_funcs=required_func_calls,
-                         raise_funcs=('setsid',))
-        self._check_drop_privileges(mock_os, required_func_calls)
+        for func in required_func_calls:
+            self.assertIn(func, mock_os.called_funcs)
+        self.assertNotIn('setgroups', mock_os.called_funcs)
+        self.assertEqual(user_data[5], mock_os.environ['HOME'])
+        self.assertEqual(user_data[3], mock_os.called_funcs['setgid'][0])
+        self.assertEqual(user_data[2], mock_os.called_funcs['setuid'][0])
 
-    def test_drop_privileges_no_call_setsid(self):
-        required_func_calls = ('setgroups', 'setgid', 'setuid', 'chdir',
-                               'umask')
-        # OSError if trying to get session leader, but it shouldn't be called
+    def test_clean_up_daemon_hygene(self):
+        required_func_calls = ('chdir', 'umask')
+        # OSError if trying to get session leader, but setsid() OSError is
+        # ignored by the code under test.
         bad_func_calls = ('setsid',)
         mock_os = MockOs(called_funcs=required_func_calls,
                          raise_funcs=bad_func_calls)
-        self._check_drop_privileges(mock_os, required_func_calls,
-                                    call_setsid=False)
+        with mock.patch('swift.common.utils.os', mock_os):
+            # exercise the code
+            utils.clean_up_daemon_hygiene()
+        for func in required_func_calls:
+            self.assertIn(func, mock_os.called_funcs)
         for func in bad_func_calls:
-            self.assertNotIn(func, mock_os.called_funcs)
+            self.assertIn(func, mock_os.called_funcs)
+        self.assertEqual('/', mock_os.called_funcs['chdir'][0])
+        self.assertEqual(0o22, mock_os.called_funcs['umask'][0])
 
     @reset_logger_state
     def test_capture_stdio(self):
@@ -836,7 +836,8 @@ class TestWSGI(unittest.TestCase):
 
         with mock.patch.object(wsgi, '_initrp', _initrp), \
                 mock.patch.object(wsgi, 'get_socket'), \
-                mock.patch.object(wsgi, 'drop_privileges'), \
+                mock.patch.object(wsgi, 'drop_privileges') as _d_privs, \
+                mock.patch.object(wsgi, 'clean_up_daemon_hygiene') as _c_hyg, \
                 mock.patch.object(wsgi, 'loadapp', _loadapp), \
                 mock.patch.object(wsgi, 'capture_stdio'), \
                 mock.patch.object(wsgi, 'run_server'), \
@@ -849,6 +850,10 @@ class TestWSGI(unittest.TestCase):
             socket=True,
             select=True,
             thread=True)
+        # run_wsgi() no longer calls drop_privileges() in the parent process,
+        # just clean_up_deemon_hygene()
+        self.assertEqual([], _d_privs.mock_calls)
+        self.assertEqual([mock.call()], _c_hyg.mock_calls)
 
     @mock.patch('swift.common.wsgi.run_server')
     @mock.patch('swift.common.wsgi.WorkersStrategy')
@@ -1353,36 +1358,11 @@ class TestServersPerPortStrategy(unittest.TestCase):
             6006, self.strategy.port_pid_state.port_for_sock(self.s1))
         self.assertEqual(
             6007, self.strategy.port_pid_state.port_for_sock(self.s2))
-        self.assertEqual([mock.call()], self.mock_setsid.mock_calls)
-        self.assertEqual([mock.call('/')], self.mock_chdir.mock_calls)
-        self.assertEqual([mock.call(0o22)], self.mock_umask.mock_calls)
-
-    def test_bind_ports_ignores_setsid_errors(self):
-        self.mock_setsid.side_effect = OSError()
-        self.strategy.do_bind_ports()
-
-        self.assertEqual(set((6006, 6007)), self.strategy.bind_ports)
-        self.assertEqual([
-            mock.call({'workers': 100,  # ignored
-                       'user': 'bob',
-                       'swift_dir': '/jim/cricket',
-                       'ring_check_interval': '76',
-                       'bind_ip': '2.3.4.5',
-                       'bind_port': 6006}),
-            mock.call({'workers': 100,  # ignored
-                       'user': 'bob',
-                       'swift_dir': '/jim/cricket',
-                       'ring_check_interval': '76',
-                       'bind_ip': '2.3.4.5',
-                       'bind_port': 6007}),
-        ], self.mock_get_socket.mock_calls)
-        self.assertEqual(
-            6006, self.strategy.port_pid_state.port_for_sock(self.s1))
-        self.assertEqual(
-            6007, self.strategy.port_pid_state.port_for_sock(self.s2))
-        self.assertEqual([mock.call()], self.mock_setsid.mock_calls)
-        self.assertEqual([mock.call('/')], self.mock_chdir.mock_calls)
-        self.assertEqual([mock.call(0o22)], self.mock_umask.mock_calls)
+        # strategy binding no longer does clean_up_deemon_hygene() actions, the
+        # user of the strategy does.
+        self.assertEqual([], self.mock_setsid.mock_calls)
+        self.assertEqual([], self.mock_chdir.mock_calls)
+        self.assertEqual([], self.mock_umask.mock_calls)
 
     def test_no_fork_sock(self):
         self.assertIsNone(self.strategy.no_fork_sock())
@@ -1519,7 +1499,7 @@ class TestServersPerPortStrategy(unittest.TestCase):
         self.strategy.post_fork_hook()
 
         self.assertEqual([
-            mock.call('bob', call_setsid=False),
+            mock.call('bob'),
         ], self.mock_drop_privileges.mock_calls)
 
     def test_shutdown_sockets(self):
|
|||||||
patcher = mock.patch('swift.common.wsgi.drop_privileges')
|
patcher = mock.patch('swift.common.wsgi.drop_privileges')
|
||||||
self.mock_drop_privileges = patcher.start()
|
self.mock_drop_privileges = patcher.start()
|
||||||
self.addCleanup(patcher.stop)
|
self.addCleanup(patcher.stop)
|
||||||
|
patcher = mock.patch('swift.common.wsgi.clean_up_daemon_hygiene')
|
||||||
|
self.mock_clean_up_daemon_hygene = patcher.start()
|
||||||
|
self.addCleanup(patcher.stop)
|
||||||
|
|
||||||
def test_loop_timeout(self):
|
def test_loop_timeout(self):
|
||||||
# This strategy should sit in the green.os.wait() for a bit (to avoid
|
# This strategy should sit in the green.os.wait() for a bit (to avoid
|
||||||
@ -1569,9 +1552,10 @@ class TestWorkersStrategy(unittest.TestCase):
|
|||||||
self.assertEqual([
|
self.assertEqual([
|
||||||
mock.call(self.conf),
|
mock.call(self.conf),
|
||||||
], self.mock_get_socket.mock_calls)
|
], self.mock_get_socket.mock_calls)
|
||||||
self.assertEqual([
|
# strategy binding no longer drops privileges nor does
|
||||||
mock.call('bob'),
|
# clean_up_deemon_hygene() actions.
|
||||||
], self.mock_drop_privileges.mock_calls)
|
self.assertEqual([], self.mock_drop_privileges.mock_calls)
|
||||||
|
self.assertEqual([], self.mock_clean_up_daemon_hygene.mock_calls)
|
||||||
|
|
||||||
self.mock_get_socket.side_effect = wsgi.ConfigFilePortError()
|
self.mock_get_socket.side_effect = wsgi.ConfigFilePortError()
|
||||||
|
|
||||||
@@ -1643,7 +1627,14 @@ class TestWorkersStrategy(unittest.TestCase):
         self.assertEqual([
             mock.call.shutdown_safe(self.mock_get_socket.return_value),
         ], mock_greenio.mock_calls)
+        if six.PY2:
             self.assertEqual([
+                mock.call.__nonzero__(),
+                mock.call.close(),
+            ], self.mock_get_socket.return_value.mock_calls)
+        else:
+            self.assertEqual([
+                mock.call.__bool__(),
                 mock.call.close(),
             ], self.mock_get_socket.return_value.mock_calls)
 
tox.ini
@@ -9,12 +9,13 @@ install_command = pip install -U {opts} {packages}
 setenv = VIRTUAL_ENV={envdir}
          NOSE_WITH_COVERAGE=1
          NOSE_COVER_BRANCHES=1
+         NOSE_COVER_HTML_DIR={toxinidir}/cover
 deps =
   -c{env:UPPER_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/master}
   -r{toxinidir}/requirements.txt
   -r{toxinidir}/test-requirements.txt
-commands = find . ( -type f -o -type l ) -name "*.py[co]" -delete
-           find . -type d -name "__pycache__" -delete
+commands = find {envdir} ( -type f -o -type l ) -name "*.py[co]" -delete
+           find {envdir} -type d -name "__pycache__" -delete
            nosetests {posargs:test/unit}
 whitelist_externals = find
                       rm