Use threads to process target function
This patch takes advantages of threads to make privsep daemon process concurrently. Change-Id: Ib20b27d5ea07bd0af61891c7d8c0d352a393aa21
This commit is contained in:
parent
b033057372
commit
f368430f13
@ -43,6 +43,7 @@ The privsep daemon exits when the communication channel is closed,
|
||||
|
||||
'''
|
||||
|
||||
from concurrent import futures
|
||||
import enum
|
||||
import errno
|
||||
import io
|
||||
@ -64,6 +65,7 @@ import eventlet
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import importutils
|
||||
import six
|
||||
|
||||
from oslo_privsep._i18n import _
|
||||
from oslo_privsep import capabilities
|
||||
@ -352,6 +354,9 @@ class Daemon(object):
|
||||
self.user = context.conf.user
|
||||
self.group = context.conf.group
|
||||
self.caps = set(context.conf.capabilities)
|
||||
self.thread_pool = futures.ThreadPoolExecutor(
|
||||
context.conf.thread_pool_size)
|
||||
self.communication_error = None
|
||||
|
||||
def run(self):
|
||||
"""Run request loop. Sets up environment, then calls loop()"""
|
||||
@ -413,22 +418,75 @@ class Daemon(object):
|
||||
'inh': fmt_caps(inh),
|
||||
})
|
||||
|
||||
def _process_cmd(self, cmd, *args):
|
||||
def _process_cmd(self, msgid, cmd, *args):
|
||||
"""Executes the requested command in an execution thread.
|
||||
|
||||
This executes a call within a thread executor and returns the results
|
||||
of the execution.
|
||||
|
||||
:param msgid: The message identifier.
|
||||
:param cmd: The `Message` type indicating the command type.
|
||||
:param args: The function, args, and kwargs if a Message.CALL type.
|
||||
:return: A tuple of the return status, optional call output, and
|
||||
optional error information.
|
||||
"""
|
||||
if cmd == Message.PING:
|
||||
return (Message.PONG.value,)
|
||||
|
||||
elif cmd == Message.CALL:
|
||||
try:
|
||||
if cmd != Message.CALL:
|
||||
raise ProtocolError(_('Unknown privsep cmd: %s') % cmd)
|
||||
|
||||
# Extract the callable and arguments
|
||||
name, f_args, f_kwargs = args
|
||||
func = importutils.import_class(name)
|
||||
|
||||
if not self.context.is_entrypoint(func):
|
||||
msg = _('Invalid privsep function: %s not exported') % name
|
||||
raise NameError(msg)
|
||||
|
||||
ret = func(*f_args, **f_kwargs)
|
||||
return (Message.RET.value, ret)
|
||||
except Exception as e:
|
||||
LOG.debug(
|
||||
'privsep: Exception during request[%(msgid)s]: '
|
||||
'%(err)s', {'msgid': msgid, 'err': e}, exc_info=True)
|
||||
cls = e.__class__
|
||||
cls_name = '%s.%s' % (cls.__module__, cls.__name__)
|
||||
return (Message.ERR.value, cls_name, e.args)
|
||||
|
||||
raise ProtocolError(_('Unknown privsep cmd: %s') % cmd)
|
||||
def _create_done_callback(self, msgid):
|
||||
"""Creates a future callback to receive command execution results.
|
||||
|
||||
:param msgid: The message identifier.
|
||||
:return: A future reply callback.
|
||||
"""
|
||||
channel = self.channel
|
||||
|
||||
def _call_back(result):
|
||||
"""Future execution callback.
|
||||
|
||||
:param result: The `future` execution and its results.
|
||||
"""
|
||||
try:
|
||||
reply = result.result()
|
||||
LOG.debug('privsep: reply[%(msgid)s]: %(reply)s',
|
||||
{'msgid': msgid, 'reply': reply})
|
||||
channel.send((msgid, reply))
|
||||
except IOError:
|
||||
self.communication_error = sys.exc_info()
|
||||
except Exception as e:
|
||||
LOG.debug(
|
||||
'privsep: Exception during request[%(msgid)s]: '
|
||||
'%(err)s', {'msgid': msgid, 'err': e}, exc_info=True)
|
||||
cls = e.__class__
|
||||
cls_name = '%s.%s' % (cls.__module__, cls.__name__)
|
||||
reply = (Message.ERR.value, cls_name, e.args)
|
||||
try:
|
||||
channel.send((msgid, reply))
|
||||
except IOError:
|
||||
self.communication_error = sys.exc_info()
|
||||
|
||||
return _call_back
|
||||
|
||||
def loop(self):
|
||||
"""Main body of daemon request loop"""
|
||||
@ -439,27 +497,16 @@ class Daemon(object):
|
||||
self.context.set_client_mode(False)
|
||||
|
||||
for msgid, msg in self.channel:
|
||||
LOG.debug('privsep: request[%(msgid)s]: %(req)s',
|
||||
{'msgid': msgid, 'req': msg})
|
||||
try:
|
||||
reply = self._process_cmd(*msg)
|
||||
except Exception as e:
|
||||
LOG.debug(
|
||||
'privsep: Exception during request[%(msgid)s]: %(err)s',
|
||||
{'msgid': msgid, 'err': e}, exc_info=True)
|
||||
cls = e.__class__
|
||||
cls_name = '%s.%s' % (cls.__module__, cls.__name__)
|
||||
reply = (Message.ERR.value, cls_name, e.args)
|
||||
|
||||
try:
|
||||
LOG.debug('privsep: reply[%(msgid)s]: %(reply)s',
|
||||
{'msgid': msgid, 'reply': reply})
|
||||
self.channel.send((msgid, reply))
|
||||
except IOError as e:
|
||||
if e.errno == errno.EPIPE:
|
||||
error = self.communication_error
|
||||
if error:
|
||||
if error[1].errno == errno.EPIPE:
|
||||
# Write stream closed, exit loop
|
||||
break
|
||||
raise
|
||||
six.reraise(error)
|
||||
|
||||
# Submit the command for execution
|
||||
future = self.thread_pool.submit(self._process_cmd, msgid, *msg)
|
||||
future.add_done_callback(self._create_done_callback(msgid))
|
||||
|
||||
LOG.debug('Socket closed, shutting down privsep daemon')
|
||||
|
||||
|
@ -16,6 +16,7 @@
|
||||
import enum
|
||||
import functools
|
||||
import logging
|
||||
import multiprocessing
|
||||
import shlex
|
||||
import sys
|
||||
|
||||
@ -48,6 +49,12 @@ OPTS = [
|
||||
type=types.List(CapNameOrInt), default=[],
|
||||
help=_('List of Linux capabilities retained by the privsep '
|
||||
'daemon.')),
|
||||
cfg.IntOpt('thread_pool_size',
|
||||
min=1,
|
||||
help=_("The number of threads available for privsep to "
|
||||
"concurrently run processes. Defaults to the number of "
|
||||
"CPU cores in the system."),
|
||||
default=multiprocessing.cpu_count()),
|
||||
cfg.StrOpt('helper_command',
|
||||
help=_('Command to invoke to start the privsep daemon if '
|
||||
'not using the "fork" method. '
|
||||
|
@ -149,6 +149,7 @@ class DaemonTest(base.BaseTestCase):
|
||||
context = mock.NonCallableMock()
|
||||
context.conf.user = 42
|
||||
context.conf.group = 84
|
||||
context.conf.thread_pool_size = 10
|
||||
context.conf.capabilities = [
|
||||
capabilities.CAP_SYS_ADMIN, capabilities.CAP_NET_ADMIN]
|
||||
|
||||
|
@ -0,0 +1,7 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Privsep now uses multithreading to allow concurrency in executing
|
||||
privileged commands. The number of concurrent threads defaults to the
|
||||
available CPU cores, but can be adjusted by the new ``thread_pool_size``
|
||||
config option.
|
Loading…
Reference in New Issue
Block a user