From ed45e5aa6dbfa6d956a67e0fa0dd2fa6557f0225 Mon Sep 17 00:00:00 2001 From: Yuriy Taraday Date: Thu, 20 Mar 2014 16:34:50 +0400 Subject: [PATCH] Add an option to run rootwrap as a daemon Benchmark for daemon mode is to be added in next commit. Current results on my machine for 'ip a': method : min avg max dev ip a : 3.701ms 4.966ms 12.720ms 1.206ms sudo ip a : 11.531ms 13.538ms 19.685ms 1.525ms sudo rootwrap conf ip a : 92.253ms 119.431ms 211.236ms 17.948ms daemon.run('ip a') : 6.718ms 9.914ms 189.873ms 18.201ms Implements: blueprint rootwrap-daemon-mode Change-Id: Iace26738f910a18a5d1d3479fad949027e5a3816 --- oslo/rootwrap/client.py | 144 +++++++++++++++++++++ oslo/rootwrap/cmd.py | 24 +++- oslo/rootwrap/daemon.py | 151 ++++++++++++++++++++++ oslo/rootwrap/filters.py | 10 +- oslo/rootwrap/jsonrpc.py | 208 ++++++++++++++++++++++++++++++ oslo/rootwrap/wrapper.py | 5 +- test-requirements-py3.txt | 20 +++ test-requirements.txt | 3 + tests/run_daemon.py | 57 ++++++++ tests/test_functional.py | 166 ++++++++++++++++++++++++ tests/test_functional_eventlet.py | 31 +++++ tox.ini | 14 +- 12 files changed, 821 insertions(+), 12 deletions(-) create mode 100644 oslo/rootwrap/client.py create mode 100644 oslo/rootwrap/daemon.py create mode 100644 oslo/rootwrap/jsonrpc.py create mode 100644 test-requirements-py3.txt create mode 100644 tests/run_daemon.py create mode 100644 tests/test_functional_eventlet.py diff --git a/oslo/rootwrap/client.py b/oslo/rootwrap/client.py new file mode 100644 index 0000000..e9930a0 --- /dev/null +++ b/oslo/rootwrap/client.py @@ -0,0 +1,144 @@ +# Copyright (c) 2014 Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +from multiprocessing import managers +from multiprocessing import util as mp_util +import os +import subprocess +import threading +import weakref + +try: + import eventlet.patcher +except ImportError: + patched_socket = False +else: + # In tests patching happens later, so we'll rely on environment variable + patched_socket = (eventlet.patcher.is_monkey_patched('socket') or + os.environ.get('TEST_EVENTLET', False)) + +from oslo.rootwrap import daemon +from oslo.rootwrap import jsonrpc + +if patched_socket: + # We have to use slow version of recvall with eventlet because of a bug in + # GreenSocket.recv_into: + # https://bitbucket.org/eventlet/eventlet/pull-request/41 + # This check happens here instead of jsonrpc to avoid importing eventlet + # from daemon code that is run with root priviledges. + jsonrpc.JsonConnection.recvall = jsonrpc.JsonConnection._recvall_slow + +try: + finalize = weakref.finalize +except AttributeError: + def finalize(obj, func, *args, **kwargs): + return mp_util.Finalize(obj, func, args=args, kwargs=kwargs, + exitpriority=0) + +ClientManager = daemon.get_manager_class() +LOG = logging.getLogger(__name__) + + +class Client(object): + def __init__(self, rootwrap_daemon_cmd): + self._start_command = rootwrap_daemon_cmd + self._initialized = False + self._mutex = threading.Lock() + self._manager = None + self._proxy = None + self._process = None + self._finalize = None + + def _initialize(self): + if self._process is not None and self._process.poll() is not None: + LOG.warning("Leaving behind already spawned process with pid %d, " + "root should kill it if it's still there (I can't)", + self._process.pid) + + process_obj = subprocess.Popen(self._start_command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + LOG.info("Spawned new rootwrap daemon process with pid=%d", + process_obj.pid) + + self._process = process_obj + socket_path = process_obj.stdout.readline()[:-1] + # For Python 3 we need to convert bytes to str here + if not isinstance(socket_path, str): + socket_path = socket_path.decode('utf-8') + authkey = process_obj.stdout.read(32) + if process_obj.poll() is not None: + stderr = process_obj.stderr.read() + # NOTE(yorik-sar): don't expose stdout here + raise Exception("Failed to spawn rootwrap process.\nstderr:\n%s" % + (stderr,)) + self._manager = ClientManager(socket_path, authkey) + self._manager.connect() + self._proxy = self._manager.rootwrap() + self._finalize = finalize(self, self._shutdown, self._process, + self._manager) + self._initialized = True + + @staticmethod + def _shutdown(process, manager, JsonClient=jsonrpc.JsonClient): + # Storing JsonClient in arguments because globals are set to None + # before executing atexit routines in Python 2.x + if process.poll() is None: + LOG.info('Stopping rootwrap daemon process with pid=%s', + process.pid) + try: + manager.rootwrap().shutdown() + except (EOFError, IOError): + pass # assume it is dead already + # We might want to wait for process to exit or kill it, but we + # can't provide sane timeout on 2.x and we most likely don't have + # permisions to do so + # Invalidate manager's state so that proxy won't try to do decref + manager._state.value = managers.State.SHUTDOWN + + def _ensure_initialized(self): + with self._mutex: + if not self._initialized: + self._initialize() + + def _restart(self, proxy): + with self._mutex: + assert self._initialized + # Verify if someone has already restarted this. + if self._proxy is proxy: + self._finalize() + self._manager = None + self._proxy = None + self._initialized = False + self._initialize() + return self._proxy + + def execute(self, cmd, env=None, stdin=None): + self._ensure_initialized() + proxy = self._proxy + retry = False + try: + res = proxy.run_one_command(cmd, env, stdin) + except (EOFError, IOError): + retry = True + # res can be None if we received final None sent by dying server thread + # instead of response to our request. Process is most likely to be dead + # at this point. + if retry or res is None: + proxy = self._restart(proxy) + res = proxy.run_one_command(cmd, env, stdin) + return res diff --git a/oslo/rootwrap/cmd.py b/oslo/rootwrap/cmd.py index 5578e6d..fe92054 100644 --- a/oslo/rootwrap/cmd.py +++ b/oslo/rootwrap/cmd.py @@ -37,6 +37,7 @@ import sys from six import moves +from oslo.rootwrap import daemon as daemon_mod from oslo.rootwrap import wrapper RC_UNAUTHORIZED = 99 @@ -52,14 +53,23 @@ def _exit_error(execname, message, errorcode, log=True): sys.exit(errorcode) -def main(): +def daemon(): + return main(run_daemon=True) + + +def main(run_daemon=False): # Split arguments, require at least a command execname = sys.argv.pop(0) - if len(sys.argv) < 2: - _exit_error(execname, "No command specified", RC_NOCOMMAND, log=False) + if run_daemon: + if len(sys.argv) != 1: + _exit_error(execname, "Extra arguments to daemon", RC_NOCOMMAND, + log=False) + else: + if len(sys.argv) < 2: + _exit_error(execname, "No command specified", RC_NOCOMMAND, + log=False) configfile = sys.argv.pop(0) - userargs = sys.argv[:] # Load configuration try: @@ -79,7 +89,11 @@ def main(): config.syslog_log_level) filters = wrapper.load_filters(config.filters_path) - run_one_command(execname, config, filters, userargs) + + if run_daemon: + daemon_mod.daemon_start(config, filters) + else: + run_one_command(execname, config, filters, sys.argv) def run_one_command(execname, config, filters, userargs): diff --git a/oslo/rootwrap/daemon.py b/oslo/rootwrap/daemon.py new file mode 100644 index 0000000..fbb4086 --- /dev/null +++ b/oslo/rootwrap/daemon.py @@ -0,0 +1,151 @@ +# Copyright (c) 2014 Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from __future__ import print_function + +import functools +import logging +from multiprocessing import managers +import os +import shutil +import signal +import stat +import subprocess +import sys +import tempfile +import threading + +from oslo.rootwrap import jsonrpc +from oslo.rootwrap import wrapper + +LOG = logging.getLogger(__name__) + +# Since multiprocessing supports only pickle and xmlrpclib for serialization of +# RPC requests and responses, we declare another 'jsonrpc' serializer + +managers.listener_client['jsonrpc'] = jsonrpc.JsonListener, jsonrpc.JsonClient + + +class RootwrapClass(object): + def __init__(self, config, filters): + self.config = config + self.filters = filters + + def run_one_command(self, userargs, env=None, stdin=None): + if env is None: + env = {} + + obj = wrapper.start_subprocess( + self.filters, userargs, + exec_dirs=self.config.exec_dirs, + log=self.config.use_syslog, + close_fds=True, + env=env, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + out, err = obj.communicate(stdin) + return obj.returncode, out, err + + def shutdown(self): + # Suicide to force break of the main thread + os.kill(os.getpid(), signal.SIGINT) + + +def get_manager_class(config=None, filters=None): + class RootwrapManager(managers.BaseManager): + def __init__(self, address=None, authkey=None): + # Force jsonrpc because neither pickle nor xmlrpclib is secure + super(RootwrapManager, self).__init__(address, authkey, + serializer='jsonrpc') + + if config is not None: + partial_class = functools.partial(RootwrapClass, config, filters) + RootwrapManager.register('rootwrap', partial_class) + else: + RootwrapManager.register('rootwrap') + + return RootwrapManager + + +def daemon_start(config, filters): + temp_dir = tempfile.mkdtemp(prefix='rootwrap-') + LOG.debug("Created temporary directory %s", temp_dir) + try: + # allow everybody to find the socket + rwxr_xr_x = (stat.S_IRWXU | + stat.S_IRGRP | stat.S_IXGRP | + stat.S_IROTH | stat.S_IXOTH) + os.chmod(temp_dir, rwxr_xr_x) + socket_path = os.path.join(temp_dir, "rootwrap.sock") + LOG.debug("Will listen on socket %s", socket_path) + manager_cls = get_manager_class(config, filters) + manager = manager_cls(address=socket_path) + server = manager.get_server() + # allow everybody to connect to the socket + rw_rw_rw_ = (stat.S_IRUSR | stat.S_IWUSR | + stat.S_IRGRP | stat.S_IWGRP | + stat.S_IROTH | stat.S_IWOTH) + os.chmod(socket_path, rw_rw_rw_) + try: + # In Python 3 we have to use buffer to push in bytes directly + stdout = sys.stdout.buffer + except AttributeError: + stdout = sys.stdout + stdout.write(socket_path.encode('utf-8')) + stdout.write(b'\n') + stdout.write(bytes(server.authkey)) + sys.stdin.close() + sys.stdout.close() + sys.stderr.close() + # Gracefully shutdown on INT or TERM signals + stop = functools.partial(daemon_stop, server) + signal.signal(signal.SIGTERM, stop) + signal.signal(signal.SIGINT, stop) + LOG.info("Starting rootwrap daemon main loop") + server.serve_forever() + finally: + conn = server.listener + # This will break accept() loop with EOFError if it was not in the main + # thread (as in Python 3.x) + conn.close() + # Closing all currently connected client sockets for reading to break + # worker threads blocked on recv() + for cl_conn in conn.get_accepted(): + try: + cl_conn.half_close() + except Exception: + # Most likely the socket have already been closed + LOG.debug("Failed to close connection") + LOG.info("Waiting for all client threads to finish.") + for thread in threading.enumerate(): + if thread.daemon: + LOG.debug("Joining thread %s", thread) + thread.join() + LOG.debug("Removing temporary directory %s", temp_dir) + shutil.rmtree(temp_dir) + + +def daemon_stop(server, signal, frame): + LOG.info("Got signal %s. Shutting down server", signal) + # Signals are caught in the main thread which means this handler will run + # in the middle of serve_forever() loop. It will catch this exception and + # properly return. Since all threads created by server_forever are + # daemonic, we need to join them afterwards. In Python 3 we can just hit + # stop_event instead. + try: + server.stop_event.set() + except AttributeError: + raise KeyboardInterrupt diff --git a/oslo/rootwrap/filters.py b/oslo/rootwrap/filters.py index 1d5da26..b8747ae 100644 --- a/oslo/rootwrap/filters.py +++ b/oslo/rootwrap/filters.py @@ -57,9 +57,9 @@ class CommandFilter(object): return ['sudo', '-u', self.run_as, to_exec] + userargs[1:] return [to_exec] + userargs[1:] - def get_environment(self, userargs): + def get_environment(self, userargs, env=None): """Returns specific environment to set, None if none.""" - return None + return env class RegExpFilter(CommandFilter): @@ -277,8 +277,10 @@ class EnvFilter(CommandFilter): to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path return [to_exec] + self.exec_args(userargs)[1:] - def get_environment(self, userargs): - env = os.environ.copy() + def get_environment(self, userargs, env=None): + if env is None: + env = os.environ + env = env.copy() # ignore leading 'env' if userargs[0] == 'env': diff --git a/oslo/rootwrap/jsonrpc.py b/oslo/rootwrap/jsonrpc.py new file mode 100644 index 0000000..9c81a1d --- /dev/null +++ b/oslo/rootwrap/jsonrpc.py @@ -0,0 +1,208 @@ +# Copyright (c) 2014 Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import base64 +import errno +import json +from multiprocessing import connection +from multiprocessing import managers +import socket +import struct +import weakref + +from oslo.rootwrap import wrapper + + +class RpcJSONEncoder(json.JSONEncoder): + def default(self, o): + # We need to pass bytes unchanged as they are expected in arguments for + # and are result of Popen.communicate() + if isinstance(o, bytes): + return {"__bytes__": base64.b64encode(o).decode('ascii')} + # Handle two exception types relevant to command execution + if isinstance(o, wrapper.NoFilterMatched): + return {"__exception__": "NoFilterMatched"} + elif isinstance(o, wrapper.FilterMatchNotExecutable): + return {"__exception__": "FilterMatchNotExecutable", + "match": o.match} + # Other errors will fail to pass JSON encoding and will be visible on + # client side + else: + return super(RpcJSONEncoder, self).default(o) + + +# Parse whatever RpcJSONEncoder supplied us with +def rpc_object_hook(obj): + if "__exception__" in obj: + type_name = obj.pop("__exception__") + if type_name not in ("NoFilterMatched", "FilterMatchNotExecutable"): + return obj + exc_type = getattr(wrapper, type_name) + return exc_type(**obj) + elif "__bytes__" in obj: + return base64.b64decode(obj["__bytes__"].encode('ascii')) + else: + return obj + + +class JsonListener(object): + def __init__(self, address, backlog=1): + self.address = address + self._socket = socket.socket(socket.AF_UNIX) + try: + self._socket.setblocking(True) + self._socket.bind(address) + self._socket.listen(backlog) + except socket.error: + self._socket.close() + raise + self.closed = False + # Python 2.6 doesn't have WeakSet + self._accepted = weakref.WeakKeyDictionary() + + def accept(self): + while True: + try: + s, _ = self._socket.accept() + except socket.error as e: + if e.errno in (errno.EINVAL, errno.EBADF): + raise EOFError + elif e.errno != errno.EINTR: + raise + else: + break + s.setblocking(True) + conn = JsonConnection(s) + self._accepted[conn] = None + return conn + + def close(self): + if not self.closed: + self._socket.shutdown(socket.SHUT_RDWR) + self._socket.close() + self.closed = True + + def get_accepted(self): + return list(self._accepted) + +if hasattr(managers.Server, 'accepter'): + # In Python 3 accepter() thread has infinite loop. We break it with + # EOFError, so we should silence this error here. + def silent_accepter(self): + try: + old_accepter(self) + except EOFError: + pass + old_accepter = managers.Server.accepter + managers.Server.accepter = silent_accepter + +try: + memoryview +except NameError: + has_memoryview = False +else: + has_memoryview = True + + +class JsonConnection(object): + def __init__(self, sock): + sock.setblocking(True) + self._socket = sock + + def send_bytes(self, s): + self._socket.sendall(struct.pack('!Q', len(s))) + self._socket.sendall(s) + + def recv_bytes(self, maxsize=None): + l = struct.unpack('!Q', self.recvall(8))[0] + if maxsize is not None and l > maxsize: + raise RuntimeError("Too big message received") + s = self.recvall(l) + return s + + def send(self, obj): + s = self.dumps(obj) + self.send_bytes(s) + + def recv(self): + s = self.recv_bytes() + return self.loads(s) + + def close(self): + self._socket.close() + + def half_close(self): + self._socket.shutdown(socket.SHUT_RD) + + # Unfortunatelly Python 2.6 doesn't support memoryview, so we'll have + # to do it the slow way. + def _recvall_slow(self, size): + remaining = size + res = [] + while remaining: + piece = self._socket.recv(remaining) + if not piece: + raise EOFError + res.append(piece) + remaining -= len(piece) + return b''.join(res) + + # For all later versions we can do it almost like in C + def _recvall_fast(self, size): + buf = bytearray(size) + mem = memoryview(buf) + got = 0 + while got < size: + piece_size = self._socket.recv_into(mem[got:]) + if not piece_size: + raise EOFError + got += piece_size + # bytearray is mostly compatible with bytes and we could avoid copying + # data here, but hmac doesn't like it in Python 3.3 (not in 2.7 or 3.4) + return bytes(buf) + + if has_memoryview: + recvall = _recvall_fast + else: + recvall = _recvall_slow + + @staticmethod + def dumps(obj): + return json.dumps(obj, cls=RpcJSONEncoder).encode('utf-8') + + @staticmethod + def loads(s): + res = json.loads(s.decode('utf-8'), object_hook=rpc_object_hook) + try: + kind = res[0] + except (IndexError, TypeError): + pass + else: + # In Python 2 json returns unicode while multiprocessing needs str + if (kind in ("#TRACEBACK", "#UNSERIALIZABLE") and + not isinstance(res[1], str)): + res[1] = res[1].encode('utf-8', 'replace') + return res + + +class JsonClient(JsonConnection): + def __init__(self, address, authkey=None): + sock = socket.socket(socket.AF_UNIX) + sock.setblocking(True) + sock.connect(address) + super(JsonClient, self).__init__(sock) + if authkey is not None: + connection.answer_challenge(self, authkey) + connection.deliver_challenge(self, authkey) diff --git a/oslo/rootwrap/wrapper.py b/oslo/rootwrap/wrapper.py index 425a022..553eaa9 100644 --- a/oslo/rootwrap/wrapper.py +++ b/oslo/rootwrap/wrapper.py @@ -190,7 +190,8 @@ def _getlogin(): os.getenv('LOGNAME')) -def start_subprocess(filter_list, userargs, exec_dirs=[], log=False, **kwargs): +def start_subprocess(filter_list, userargs, exec_dirs=[], log=False, + env=None, **kwargs): filtermatch = match_filter(filter_list, userargs, exec_dirs) command = filtermatch.get_command(userargs, exec_dirs) @@ -201,6 +202,6 @@ def start_subprocess(filter_list, userargs, exec_dirs=[], log=False, **kwargs): obj = subprocess.Popen(command, preexec_fn=_subprocess_setup, - env=filtermatch.get_environment(userargs), + env=filtermatch.get_environment(userargs, env=env), **kwargs) return obj diff --git a/test-requirements-py3.txt b/test-requirements-py3.txt new file mode 100644 index 0000000..82ecdd0 --- /dev/null +++ b/test-requirements-py3.txt @@ -0,0 +1,20 @@ +hacking>=0.9.2,<0.10 + +discover +fixtures>=0.3.14 +python-subunit +testrepository>=0.0.17 +testscenarios>=0.4 +testtools>=0.9.32 + +# when we can require tox>= 1.4, this can go into tox.ini: +# [testenv:cover] +# deps = {[testenv]deps} coverage +coverage>=3.6 + +# mocking framework +mock>=1.0 + +# rootwrap daemon's client should be verified to run in eventlet +# not available for Python 3.x +# eventlet>=0.13.0 diff --git a/test-requirements.txt b/test-requirements.txt index b639ae4..77122fc 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -14,3 +14,6 @@ coverage>=3.6 # mocking framework mock>=1.0 + +# rootwrap daemon's client should be verified to run in eventlet +eventlet>=0.13.0 diff --git a/tests/run_daemon.py b/tests/run_daemon.py new file mode 100644 index 0000000..fa1f3df --- /dev/null +++ b/tests/run_daemon.py @@ -0,0 +1,57 @@ +# Copyright (c) 2014 Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import subprocess +import sys +import threading + +from oslo.rootwrap import cmd + + +def forward_stream(fr, to): + while True: + line = fr.readline() + if not line: + break + to.write(line) + + +def forwarding_popen(f, old_popen=subprocess.Popen): + def popen(*args, **kwargs): + p = old_popen(*args, **kwargs) + t = threading.Thread(target=forward_stream, args=(p.stderr, f)) + t.daemon = True + t.start() + return p + return popen + + +class nonclosing(object): + def __init__(self, f): + self._f = f + + def __getattr__(self, name): + return getattr(self._f, name) + + def close(self): + pass + +log_format = ("%(asctime)s | [%(process)5s]+%(levelname)5s | " + "%(message)s") +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG, format=log_format) + sys.stderr = nonclosing(sys.stderr) + cmd.daemon() diff --git a/tests/test_functional.py b/tests/test_functional.py index e9d3419..e3d17b0 100644 --- a/tests/test_functional.py +++ b/tests/test_functional.py @@ -13,14 +13,36 @@ # License for the specific language governing permissions and limitations # under the License. +import contextlib +import io +import logging import os +import signal import subprocess import sys +import threading + +try: + import StringIO +except ImportError: + StringIO = io.StringIO +else: + StringIO = StringIO.StringIO + +try: + import eventlet +except ImportError: + eventlet = None import fixtures +import mock import testtools from testtools import content +from oslo.rootwrap import client +from oslo.rootwrap import wrapper +from tests import run_daemon + class _FunctionalBase(object): def setUp(self): @@ -38,6 +60,7 @@ exec_dirs=/bin""" % (filters_dir,)) f.write("""[Filters] echo: CommandFilter, /bin/echo, root cat: CommandFilter, /bin/cat, root +sh: CommandFilter, /bin/sh, root """) def test_run_once(self): @@ -74,3 +97,146 @@ class RootwrapTest(_FunctionalBase, testtools.TestCase): self.addDetail('stderr', content.text_content(err.decode('utf-8', 'replace'))) return proc.returncode, out, err + + +class RootwrapDaemonTest(_FunctionalBase, testtools.TestCase): + def assert_unpatched(self): + # We need to verify that these tests are run without eventlet patching + if eventlet and eventlet.patcher.is_monkey_patched('socket'): + self.fail("Standard library should not be patched by eventlet" + " for this test") + + def setUp(self): + self.assert_unpatched() + + super(RootwrapDaemonTest, self).setUp() + + # Collect daemon logs + daemon_log = io.BytesIO() + p = mock.patch('subprocess.Popen', + run_daemon.forwarding_popen(daemon_log)) + p.start() + self.addCleanup(p.stop) + + # Collect client logs + client_log = StringIO() + handler = logging.StreamHandler(client_log) + log_format = run_daemon.log_format.replace('+', ' ') + handler.setFormatter(logging.Formatter(log_format)) + logger = logging.getLogger('oslo.rootwrap') + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + self.addCleanup(logger.removeHandler, handler) + + # Add all logs as details + @self.addCleanup + def add_logs(): + self.addDetail('daemon_log', content.Content( + content.UTF8_TEXT, + lambda: [daemon_log.getvalue()])) + self.addDetail('client_log', content.Content( + content.UTF8_TEXT, + lambda: [client_log.getvalue().encode('utf-8')])) + + # Create client + self.client = client.Client([ + sys.executable, run_daemon.__file__, + self.config_file]) + + # _finalize is set during Client.execute() + @self.addCleanup + def finalize_client(): + if self.client._initialized: + self.client._finalize() + + self.execute = self.client.execute + + def test_error_propagation(self): + self.assertRaises(wrapper.NoFilterMatched, self.execute, ['other']) + + def test_daemon_ressurection(self): + # Let the client start a daemon + self.execute(['cat']) + # Make daemon go away + os.kill(self.client._process.pid, signal.SIGTERM) + # Expect client to succesfully restart daemon and run simple request + self.test_run_once() + + def test_env_setting(self): + code, out, err = self.execute(['sh', '-c', 'echo $SOMEVAR'], + env={'SOMEVAR': 'teststr'}) + self.assertEqual(0, code) + self.assertEqual(b'teststr\n', out) + self.assertEqual(b'', err) + + def _exec_thread(self, fifo_path): + try: + # Run a shell script that signals calling process through FIFO and + # then hangs around for 1 sec + self._thread_res = self.execute([ + 'sh', '-c', 'echo > "%s"; sleep 1; echo OK' % fifo_path]) + except Exception as e: + self._thread_res = e + + def test_graceful_death(self): + # Create a fifo in a temporary dir + tmpdir = self.useFixture(fixtures.TempDir()).path + fifo_path = os.path.join(tmpdir, 'fifo') + os.mkfifo(fifo_path) + # Start daemon + self.execute(['cat']) + # Begin executing shell script + t = threading.Thread(target=self._exec_thread, args=(fifo_path,)) + t.start() + # Wait for shell script to actually start + with open(fifo_path) as f: + f.readline() + # Gracefully kill daemon process + os.kill(self.client._process.pid, signal.SIGTERM) + # Expect daemon to wait for our request to finish + t.join() + if isinstance(self._thread_res, Exception): + raise self._thread_res # Python 3 will even provide nice traceback + code, out, err = self._thread_res + self.assertEqual(0, code) + self.assertEqual(b'OK\n', out) + self.assertEqual(b'', err) + + @contextlib.contextmanager + def _test_daemon_cleanup(self): + # Start a daemon + self.execute(['cat']) + socket_path = self.client._manager._address + # Stop it one way or another + yield + process = self.client._process + stop = threading.Event() + + # Start background thread that would kill process in 1 second if it + # doesn't die by then + def sleep_kill(): + stop.wait(1) + if not stop.is_set(): + os.kill(process.pid, signal.SIGKILL) + threading.Thread(target=sleep_kill).start() + # Wait for process to finish one way or another + self.client._process.wait() + # Notify background thread that process is dead (no need to kill it) + stop.set() + # Fail if the process got killed by the background thread + self.assertNotEqual(-signal.SIGKILL, process.returncode, + "Server haven't stopped in one second") + # Verify that socket is deleted + self.assertFalse(os.path.exists(socket_path), + "Server didn't remove its temporary directory") + + def test_daemon_cleanup_client(self): + # Run _test_daemon_cleanup stopping daemon as Client instance would + # normally do + with self._test_daemon_cleanup(): + self.client._finalize() + + def test_daemon_cleanup_signal(self): + # Run _test_daemon_cleanup stopping daemon with SIGTERM signal + with self._test_daemon_cleanup(): + os.kill(self.client._process.pid, signal.SIGTERM) diff --git a/tests/test_functional_eventlet.py b/tests/test_functional_eventlet.py new file mode 100644 index 0000000..c94bc69 --- /dev/null +++ b/tests/test_functional_eventlet.py @@ -0,0 +1,31 @@ +# Copyright (c) 2014 Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os + +if os.environ.get('TEST_EVENTLET', False): + import eventlet + eventlet.monkey_patch() + + from tests import test_functional + + class RootwrapDaemonTest(test_functional.RootwrapDaemonTest): + def assert_unpatched(self): + # This test case is specifically for eventlet testing + pass + + def test_graceful_death(self): + # This test fails with eventlet on Python 2.6.6 on CentOS + self.skip("Eventlet doesn't like FIFOs") diff --git a/tox.ini b/tox.ini index 39b563c..b45b38e 100644 --- a/tox.ini +++ b/tox.ini @@ -9,7 +9,19 @@ install_command = pip install -U {opts} {packages} setenv = VIRTUAL_ENV={envdir} deps = -r{toxinidir}/requirements.txt -r{toxinidir}/test-requirements.txt -commands = python setup.py testr --slowest --testr-args='{posargs}' +# Functional tests with Eventlet involve monkeypatching, so force them to be +# run in a separate process +whitelist_externals = env +commands = + python setup.py testr --slowest --testr-args='(?!tests.test_functional_eventlet)tests {posargs}' + env TEST_EVENTLET=1 python setup.py testr --slowest --testr-args='tests.test_functional_eventlet' + +[testenv:py33] +deps = -r{toxinidir}/requirements.txt + -r{toxinidir}/test-requirements-py3.txt +# Eventlet is not available here +commands = + python setup.py testr --slowest --testr-args='{posargs}' [testenv:pep8] commands = flake8