Avoid calling sudo just to change users
When running a command as a user != root, the previous code would wrap the final command with an additional `sudo -u $user`. Apart from being heavyweight, using sudo here also resets permissions, etc preventing related efforts to use more advanced privilege mechanisms (like Linux capabilities or SELinux policies) This change simply calls os.setuid() in the child process instead. Change-Id: I3c4c21258d724a25c41a680ec7939869649269ed
This commit is contained in:
parent
fc41323e22
commit
37b8d376fc
@ -14,9 +14,15 @@
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import pwd
|
||||
import re
|
||||
|
||||
|
||||
def _getuid(user):
|
||||
"""Return uid for user."""
|
||||
return pwd.getpwnam(user).pw_uid
|
||||
|
||||
|
||||
class CommandFilter(object):
|
||||
"""Command filter only checking that the 1st argument matches exec_path."""
|
||||
|
||||
@ -48,13 +54,15 @@ class CommandFilter(object):
|
||||
"""Only check that the first argument (command) matches exec_path."""
|
||||
return userargs and os.path.basename(self.exec_path) == userargs[0]
|
||||
|
||||
def preexec(self):
|
||||
"""Setuid in subprocess right before command is invoked."""
|
||||
if self.run_as != 'root':
|
||||
os.setuid(_getuid(self.run_as))
|
||||
|
||||
def get_command(self, userargs, exec_dirs=None):
|
||||
"""Returns command to execute (with sudo -u if run_as != root)."""
|
||||
"""Returns command to execute."""
|
||||
exec_dirs = exec_dirs or []
|
||||
to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path
|
||||
if (self.run_as != 'root'):
|
||||
# Used to run commands at lesser privileges
|
||||
return ['sudo', '-u', self.run_as, to_exec] + userargs[1:]
|
||||
return [to_exec] + userargs[1:]
|
||||
|
||||
def get_environment(self, userargs):
|
||||
|
@ -17,6 +17,7 @@ import contextlib
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import pwd
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
@ -61,6 +62,7 @@ exec_dirs=/bin""" % (filters_dir,))
|
||||
echo: CommandFilter, /bin/echo, root
|
||||
cat: CommandFilter, /bin/cat, root
|
||||
sh: CommandFilter, /bin/sh, root
|
||||
id: CommandFilter, /usr/bin/id, nobody
|
||||
""")
|
||||
|
||||
def test_run_once(self):
|
||||
@ -75,6 +77,18 @@ sh: CommandFilter, /bin/sh, root
|
||||
self.assertEqual(b'teststr', out)
|
||||
self.assertEqual(b'', err)
|
||||
|
||||
def test_run_as(self):
|
||||
if os.getuid() != 0:
|
||||
self.skip('Test requires root (for setuid)')
|
||||
|
||||
# Should run as 'nobody'
|
||||
code, out, err = self.execute(['id', '-u'])
|
||||
self.assertEqual(out, '%s\n' % pwd.getpwnam('nobody').pw_uid)
|
||||
|
||||
# Should run as 'root'
|
||||
code, out, err = self.execute(['sh', '-c', 'id -u'])
|
||||
self.assertEqual(out, '0\n')
|
||||
|
||||
|
||||
class RootwrapTest(_FunctionalBase, testtools.TestCase):
|
||||
def setUp(self):
|
||||
|
@ -175,12 +175,6 @@ def match_filter(filter_list, userargs, exec_dirs=None):
|
||||
raise NoFilterMatched()
|
||||
|
||||
|
||||
def _subprocess_setup():
|
||||
# Python installs a SIGPIPE handler by default. This is usually not what
|
||||
# non-Python subprocesses expect.
|
||||
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
|
||||
|
||||
|
||||
def _getlogin():
|
||||
try:
|
||||
return os.getlogin()
|
||||
@ -199,8 +193,14 @@ def start_subprocess(filter_list, userargs, exec_dirs=[], log=False, **kwargs):
|
||||
_getlogin(), pwd.getpwuid(os.getuid())[0],
|
||||
command, filtermatch.name))
|
||||
|
||||
def preexec():
|
||||
# Python installs a SIGPIPE handler by default. This is
|
||||
# usually not what non-Python subprocesses expect.
|
||||
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
|
||||
filtermatch.preexec()
|
||||
|
||||
obj = subprocess.Popen(command,
|
||||
preexec_fn=_subprocess_setup,
|
||||
preexec_fn=preexec,
|
||||
env=filtermatch.get_environment(userargs),
|
||||
**kwargs)
|
||||
return obj
|
||||
|
Loading…
x
Reference in New Issue
Block a user