wangzihao 81352d08e9 Remove six.PY3
Python 2.7 support has been dropped since Ussuri,
so remove the hacking rules for compatibility between Python 2 and 3.

Change-Id: Idd6a1efd1857a70fc7d960b365bfc58d6cbe00d3
2020-09-30 16:34:20 +08:00

202 lines
7.1 KiB
Python

# Copyright (c) 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import logging
from multiprocessing import managers
import os
import shutil
import signal
import stat
import sys
import tempfile
import threading
import time
from oslo_rootwrap import cmd
from oslo_rootwrap import jsonrpc
from oslo_rootwrap import subprocess
from oslo_rootwrap import wrapper
# Module-wide logger, named after this module.
LOG = logging.getLogger(__name__)

# multiprocessing only ships the pickle and xmlrpclib serializers for its RPC
# requests and responses, so an additional 'jsonrpc' entry is registered here,
# mapping to the JSON listener/client pair from oslo_rootwrap.jsonrpc.
managers.listener_client['jsonrpc'] = (
    jsonrpc.JsonListener,
    jsonrpc.JsonClient,
)
class RootwrapClass(object):
    """Object served by the daemon that runs filtered commands for clients.

    Each instance keeps the ``config`` and ``filters`` it was built with.
    The inactivity bookkeeping (``last_called``, ``daemon_timeout`` and the
    ``timeout`` timer) is stored on the class itself, so it is shared by all
    instances living in the process.
    """

    def __init__(self, config, filters):
        """Remember ``config``/``filters`` and make sure the timer is armed."""
        self.config = config
        self.filters = filters
        self.reset_timer()
        self.prepare_timer(config)

    def run_one_command(self, userargs, stdin=None):
        """Run one command through the filters and collect its output.

        :param userargs: the command and its arguments, as a sequence of
                         strings (they are joined with spaces for logging).
        :param stdin: optional text to feed to the process; it is encoded
                      with ``os.fsencode`` before being sent.
        :returns: a ``(returncode, stdout, stderr)`` tuple, with both streams
                  decoded by ``os.fsdecode``. When a filter matches but the
                  executable is not found the result is
                  ``(cmd.RC_NOEXECFOUND, "", "")``; when no filter matches it
                  is ``(cmd.RC_UNAUTHORIZED, "", "")``.
        """
        # Every request counts as activity for the inactivity timeout.
        self.reset_timer()
        try:
            obj = wrapper.start_subprocess(
                self.filters, userargs,
                exec_dirs=self.config.exec_dirs,
                log=self.config.use_syslog,
                close_fds=True,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        except wrapper.FilterMatchNotExecutable:
            LOG.warning("Executable not found for: %s",
                        ' '.join(userargs))
            return cmd.RC_NOEXECFOUND, "", ""
        except wrapper.NoFilterMatched:
            LOG.warning("Unauthorized command: %s (no filter matched)",
                        ' '.join(userargs))
            return cmd.RC_UNAUTHORIZED, "", ""

        if stdin is not None:
            stdin = os.fsencode(stdin)
        out, err = obj.communicate(stdin)
        out = os.fsdecode(out)
        err = os.fsdecode(err)
        return obj.returncode, out, err

    @classmethod
    def reset_timer(cls):
        """Record the current time as the moment of the last activity."""
        cls.last_called = time.time()

    @classmethod
    def cancel_timer(cls):
        """Cancel the pending inactivity timer, if there is one.

        This is safe to call when no timer was ever created (for example
        when the daemon is stopped before any client instantiated this
        class); in that case it does nothing.
        """
        timer = getattr(cls, 'timeout', None)
        if timer is None:
            return
        try:
            timer.cancel()
        except RuntimeError:
            # Cancelling is best effort, as it always has been here.
            pass

    @classmethod
    def prepare_timer(cls, config=None):
        """Arm the inactivity timer unless one is already set.

        :param config: when given, its ``daemon_timeout`` replaces the
                       class-wide timeout value before the delay is computed.
        """
        if config is not None:
            cls.daemon_timeout = config.daemon_timeout
        # Wait a bit longer to avoid rounding errors
        timeout = max(
            cls.last_called + cls.daemon_timeout - time.time(),
            0) + 1
        if getattr(cls, 'timeout', None):
            # Another timer is already initialized
            return
        cls.timeout = threading.Timer(timeout, cls.handle_timeout)
        cls.timeout.start()

    @classmethod
    def handle_timeout(cls):
        """Timer callback: shut down when idle, otherwise schedule a re-check.

        If the last activity is older than ``daemon_timeout`` seconds,
        ``shutdown()`` is called and no further timer is scheduled. Otherwise
        the expired timer is forgotten so that ``prepare_timer()`` arms a new
        one; without that step the "already initialized" guard in
        ``prepare_timer()`` would skip re-arming and the inactivity check
        would run only once.
        """
        if cls.last_called < time.time() - cls.daemon_timeout:
            cls.shutdown()
            return

        # The timer that just fired is spent; drop it before re-arming.
        cls.timeout = None
        cls.prepare_timer()

    @staticmethod
    def shutdown():
        """Send SIGINT to the current process."""
        # Signal ourselves to force the main thread out of its loop
        os.kill(os.getpid(), signal.SIGINT)
def get_manager_class(config=None, filters=None):
    """Create a new manager class with the 'rootwrap' type registered.

    A fresh ``BaseManager`` subclass is defined on every call. When
    ``config`` is given, 'rootwrap' is registered with a callable that
    builds ``RootwrapClass(config, filters)``; when it is ``None`` the type
    is registered without any callable.

    :param config: configuration handed to ``RootwrapClass``, or ``None``.
    :param filters: filters handed to ``RootwrapClass`` alongside ``config``.
    :returns: the newly created manager class.
    """
    class RootwrapManager(managers.BaseManager):
        def __init__(self, address=None, authkey=None):
            # jsonrpc is forced: neither pickle nor xmlrpclib is secure
            super().__init__(address, authkey, serializer='jsonrpc')

    factory = None
    if config is not None:
        factory = functools.partial(RootwrapClass, config, filters)
    RootwrapManager.register('rootwrap', factory)
    return RootwrapManager
def daemon_start(config, filters):
    """Run the rootwrap daemon until its server loop ends, then clean up.

    A temporary directory holding a "rootwrap.sock" socket is created, a
    manager server is bound to that socket, and the socket path followed by
    a newline and the server's authkey is written to standard output. The
    standard streams are then closed, SIGTERM/SIGINT handlers are installed
    and the server loop runs. On the way out the listener and the accepted
    connections are closed, the inactivity timer is cancelled, daemon
    threads are joined and the temporary directory is removed.

    :param config: configuration passed on to get_manager_class().
    :param filters: filters passed on to get_manager_class().
    """
    temp_dir = tempfile.mkdtemp(prefix='rootwrap-')
    LOG.debug("Created temporary directory %s", temp_dir)
    try:
        # allow everybody to find the socket (mode 0755 on the directory)
        rwxr_xr_x = (stat.S_IRWXU |
                     stat.S_IRGRP | stat.S_IXGRP |
                     stat.S_IROTH | stat.S_IXOTH)
        os.chmod(temp_dir, rwxr_xr_x)
        socket_path = os.path.join(temp_dir, "rootwrap.sock")
        LOG.debug("Will listen on socket %s", socket_path)
        manager_cls = get_manager_class(config, filters)
        manager = manager_cls(address=socket_path)
        server = manager.get_server()
        try:
            # allow everybody to connect to the socket (mode 0666)
            rw_rw_rw_ = (stat.S_IRUSR | stat.S_IWUSR |
                         stat.S_IRGRP | stat.S_IWGRP |
                         stat.S_IROTH | stat.S_IWOTH)
            os.chmod(socket_path, rw_rw_rw_)
            try:
                # In Python 3 we have to use buffer to push in bytes directly
                stdout = sys.stdout.buffer
            except AttributeError:
                # Fall back to the stream itself when it has no .buffer
                stdout = sys.stdout
            # Publish the connection details on stdout: socket path, a
            # newline, then the raw authkey.
            # NOTE(review): presumably read by the process that spawned the
            # daemon -- confirm against the client side.
            stdout.write(socket_path.encode('utf-8'))
            stdout.write(b'\n')
            stdout.write(bytes(server.authkey))
            # Nothing else is exchanged over the standard streams.
            sys.stdin.close()
            sys.stdout.close()
            sys.stderr.close()
            # Gracefully shutdown on INT or TERM signals
            stop = functools.partial(daemon_stop, server)
            signal.signal(signal.SIGTERM, stop)
            signal.signal(signal.SIGINT, stop)
            LOG.info("Starting rootwrap daemon main loop")
            server.serve_forever()
        finally:
            conn = server.listener
            # This will break accept() loop with EOFError if it was not in the
            # main thread (as in Python 3.x)
            conn.close()
            # Closing all currently connected client sockets for reading to
            # break worker threads blocked on recv()
            for cl_conn in conn.get_accepted():
                try:
                    cl_conn.half_close()
                except Exception:
                    # Most likely the socket has already been closed
                    LOG.debug("Failed to close connection")
            RootwrapClass.cancel_timer()
            LOG.info("Waiting for all client threads to finish.")
            # Only daemon threads are joined here.
            for thread in threading.enumerate():
                if thread.daemon:
                    LOG.debug("Joining thread %s", thread)
                    thread.join()
    finally:
        # Runs on every exit path so the socket directory is not left behind.
        LOG.debug("Removing temporary directory %s", temp_dir)
        shutil.rmtree(temp_dir)
def daemon_stop(server, signal, frame):
    """Signal handler that asks ``server`` to leave its main loop.

    :param server: the server whose ``stop_event`` is set.
    :param signal: the received signal number (only logged).
    :param frame: the interrupted stack frame (unused).
    :raises KeyboardInterrupt: when setting ``server.stop_event`` fails with
                               an AttributeError.
    """
    LOG.info("Got signal %s. Shutting down server", signal)
    # Handlers run in the main thread, i.e. in the middle of the
    # serve_forever() loop. Setting stop_event is the Python 3 way to end
    # that loop; otherwise the exception raised below interrupts it, and the
    # loop is expected to catch it and return. The threads created by
    # serve_forever() are daemonic, so they have to be joined afterwards.
    try:
        stop_event = server.stop_event
        stop_event.set()
    except AttributeError:
        raise KeyboardInterrupt()