Import recent rootwrap features in local rootwrap
Import features developed in oslo-rootwrap during the Grizzly cycle (and recently in havana) into quantum-rootwrap. This is the first step toward using the common oslo-rootwrap into Quantum: the goal being to make both implementations converge, we should avoid making local changes (and push any required change into oslo-rootwrap instead from now on). New features include: - Optional logging (use_syslog) - Searching for executables in a specified binary path (exec_dirs) - New path-based PathFilter Those features required a refactoring in the way executables are matched and in configuration loading. Implements bp quantum-rootwrap-new-features Change-Id: Ia6a2e91c297ade471448dae0964adfd001a46086
This commit is contained in:
parent
2f3e340df0
commit
4056e8e605
@ -36,40 +36,45 @@
|
||||
node.
|
||||
"""
|
||||
|
||||
import ConfigParser
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
from __future__ import print_function
|
||||
|
||||
from quantum.common import utils
|
||||
import ConfigParser
|
||||
import logging
|
||||
import os
|
||||
import pwd
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
|
||||
RC_UNAUTHORIZED = 99
|
||||
RC_NOCOMMAND = 98
|
||||
RC_BADCONFIG = 97
|
||||
RC_NOEXECFOUND = 96
|
||||
|
||||
|
||||
def _subprocess_setup():
|
||||
# Python installs a SIGPIPE handler by default. This is usually not what
|
||||
# non-Python subprocesses expect.
|
||||
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
|
||||
|
||||
|
||||
def _exit_error(execname, message, errorcode, log=True):
|
||||
print("%s: %s" % (execname, message))
|
||||
if log:
|
||||
logging.error(message)
|
||||
sys.exit(errorcode)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Split arguments, require at least a command
|
||||
execname = sys.argv.pop(0)
|
||||
# argv[0] required; path to conf file
|
||||
if len(sys.argv) < 2:
|
||||
print "%s: %s" % (execname, "No command specified")
|
||||
sys.exit(RC_NOCOMMAND)
|
||||
_exit_error(execname, "No command specified", RC_NOCOMMAND, log=False)
|
||||
|
||||
configfile = sys.argv.pop(0)
|
||||
userargs = sys.argv[:]
|
||||
|
||||
# Load configuration
|
||||
config = ConfigParser.RawConfigParser()
|
||||
config.read(configfile)
|
||||
try:
|
||||
filters_path = config.get("DEFAULT", "filters_path").split(",")
|
||||
filters = None
|
||||
except ConfigParser.Error:
|
||||
print "%s: Incorrect configuration file: %s" % (execname, configfile)
|
||||
sys.exit(RC_BADCONFIG)
|
||||
|
||||
# Add ../ to sys.path to allow running from branch
|
||||
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(execname),
|
||||
os.pardir, os.pardir))
|
||||
@ -78,17 +83,51 @@ if __name__ == '__main__':
|
||||
|
||||
from quantum.rootwrap import wrapper
|
||||
|
||||
# Execute command if it matches any of the loaded filters
|
||||
filters = wrapper.load_filters(filters_path)
|
||||
filtermatch = wrapper.match_filter(filters, userargs)
|
||||
if filtermatch:
|
||||
obj = utils.subprocess_popen(filtermatch.get_command(userargs),
|
||||
stdin=sys.stdin,
|
||||
stdout=sys.stdout,
|
||||
stderr=sys.stderr,
|
||||
env=filtermatch.get_environment(userargs))
|
||||
obj.wait()
|
||||
sys.exit(obj.returncode)
|
||||
# Load configuration
|
||||
try:
|
||||
rawconfig = ConfigParser.RawConfigParser()
|
||||
rawconfig.read(configfile)
|
||||
config = wrapper.RootwrapConfig(rawconfig)
|
||||
except ValueError as exc:
|
||||
msg = "Incorrect value in %s: %s" % (configfile, exc.message)
|
||||
_exit_error(execname, msg, RC_BADCONFIG, log=False)
|
||||
except ConfigParser.Error:
|
||||
_exit_error(execname, "Incorrect configuration file: %s" % configfile,
|
||||
RC_BADCONFIG, log=False)
|
||||
|
||||
print "Unauthorized command: %s" % ' '.join(userargs)
|
||||
sys.exit(RC_UNAUTHORIZED)
|
||||
if config.use_syslog:
|
||||
wrapper.setup_syslog(execname,
|
||||
config.syslog_log_facility,
|
||||
config.syslog_log_level)
|
||||
|
||||
# Execute command if it matches any of the loaded filters
|
||||
filters = wrapper.load_filters(config.filters_path)
|
||||
try:
|
||||
filtermatch = wrapper.match_filter(filters, userargs,
|
||||
exec_dirs=config.exec_dirs)
|
||||
if filtermatch:
|
||||
command = filtermatch.get_command(userargs,
|
||||
exec_dirs=config.exec_dirs)
|
||||
if config.use_syslog:
|
||||
logging.info("(%s > %s) Executing %s (filter match = %s)" % (
|
||||
os.getlogin(), pwd.getpwuid(os.getuid())[0],
|
||||
command, filtermatch.name))
|
||||
|
||||
obj = subprocess.Popen(command,
|
||||
stdin=sys.stdin,
|
||||
stdout=sys.stdout,
|
||||
stderr=sys.stderr,
|
||||
preexec_fn=_subprocess_setup,
|
||||
env=filtermatch.get_environment(userargs))
|
||||
obj.wait()
|
||||
sys.exit(obj.returncode)
|
||||
|
||||
except wrapper.FilterMatchNotExecutable as exc:
|
||||
msg = ("Executable not found: %s (filter match = %s)"
|
||||
% (exc.match.exec_path, exc.match.name))
|
||||
_exit_error(execname, msg, RC_NOEXECFOUND, log=config.use_syslog)
|
||||
|
||||
except wrapper.NoFilterMatched:
|
||||
msg = ("Unauthorized command: %s (no filter matched)"
|
||||
% ' '.join(userargs))
|
||||
_exit_error(execname, msg, RC_UNAUTHORIZED, log=config.use_syslog)
|
||||
|
@ -15,7 +15,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import os
|
||||
import re
|
||||
|
||||
@ -24,20 +23,39 @@ class CommandFilter(object):
|
||||
"""Command filter only checking that the 1st argument matches exec_path."""
|
||||
|
||||
def __init__(self, exec_path, run_as, *args):
|
||||
self.name = ''
|
||||
self.exec_path = exec_path
|
||||
self.run_as = run_as
|
||||
self.args = args
|
||||
self.real_exec = None
|
||||
|
||||
def get_exec(self, exec_dirs=[]):
|
||||
"""Returns existing executable, or empty string if none found."""
|
||||
if self.real_exec is not None:
|
||||
return self.real_exec
|
||||
self.real_exec = ""
|
||||
if self.exec_path.startswith('/'):
|
||||
if os.access(self.exec_path, os.X_OK):
|
||||
self.real_exec = self.exec_path
|
||||
else:
|
||||
for binary_path in exec_dirs:
|
||||
expanded_path = os.path.join(binary_path, self.exec_path)
|
||||
if os.access(expanded_path, os.X_OK):
|
||||
self.real_exec = expanded_path
|
||||
break
|
||||
return self.real_exec
|
||||
|
||||
def match(self, userargs):
|
||||
"""Only check that the first argument (command) matches exec_path."""
|
||||
return os.path.basename(self.exec_path) == userargs[0]
|
||||
|
||||
def get_command(self, userargs):
|
||||
def get_command(self, userargs, exec_dirs=[]):
|
||||
"""Returns command to execute (with sudo -u if run_as != root)."""
|
||||
to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path
|
||||
if (self.run_as != 'root'):
|
||||
# Used to run commands at lesser privileges
|
||||
return ['sudo', '-u', self.run_as, self.exec_path] + userargs[1:]
|
||||
return [self.exec_path] + userargs[1:]
|
||||
return ['sudo', '-u', self.run_as, to_exec] + userargs[1:]
|
||||
return [to_exec] + userargs[1:]
|
||||
|
||||
def get_environment(self, userargs):
|
||||
"""Returns specific environment to set, None if none."""
|
||||
@ -73,6 +91,52 @@ class RegExpFilter(CommandFilter):
|
||||
return False
|
||||
|
||||
|
||||
class PathFilter(CommandFilter):
|
||||
"""Command filter checking that path arguments are within given dirs
|
||||
|
||||
One can specify the following constraints for command arguments:
|
||||
1) pass - pass an argument as is to the resulting command
|
||||
2) some_str - check if an argument is equal to the given string
|
||||
3) abs path - check if a path argument is within the given base dir
|
||||
|
||||
A typical rootwrapper filter entry looks like this:
|
||||
# cmdname: filter name, raw command, user, arg_i_constraint [, ...]
|
||||
chown: PathFilter, /bin/chown, root, nova, /var/lib/images
|
||||
|
||||
"""
|
||||
|
||||
def match(self, userargs):
|
||||
command, arguments = userargs[0], userargs[1:]
|
||||
|
||||
equal_args_num = len(self.args) == len(arguments)
|
||||
exec_is_valid = super(PathFilter, self).match(userargs)
|
||||
args_equal_or_pass = all(
|
||||
arg == 'pass' or arg == value
|
||||
for arg, value in zip(self.args, arguments)
|
||||
if not os.path.isabs(arg) # arguments not specifying abs paths
|
||||
)
|
||||
paths_are_within_base_dirs = all(
|
||||
os.path.commonprefix([arg, os.path.realpath(value)]) == arg
|
||||
for arg, value in zip(self.args, arguments)
|
||||
if os.path.isabs(arg) # arguments specifying abs paths
|
||||
)
|
||||
|
||||
return (equal_args_num and
|
||||
exec_is_valid and
|
||||
args_equal_or_pass and
|
||||
paths_are_within_base_dirs)
|
||||
|
||||
def get_command(self, userargs, exec_dirs=[]):
|
||||
command, arguments = userargs[0], userargs[1:]
|
||||
|
||||
# convert path values to canonical ones; copy other args as is
|
||||
args = [os.path.realpath(value) if os.path.isabs(arg) else value
|
||||
for arg, value in zip(self.args, arguments)]
|
||||
|
||||
return super(PathFilter, self).get_command([command] + args,
|
||||
exec_dirs)
|
||||
|
||||
|
||||
class DnsmasqFilter(CommandFilter):
|
||||
"""Specific filter for the dnsmasq call (which includes env)."""
|
||||
|
||||
@ -96,8 +160,9 @@ class DnsmasqFilter(CommandFilter):
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_command(self, userargs):
|
||||
return [self.exec_path] + userargs[3:]
|
||||
def get_command(self, userargs, exec_dirs=[]):
|
||||
to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path
|
||||
return [to_exec] + userargs[3:]
|
||||
|
||||
def get_environment(self, userargs):
|
||||
env = os.environ.copy()
|
||||
@ -145,7 +210,7 @@ class KillFilter(CommandFilter):
|
||||
return False
|
||||
args = list(userargs)
|
||||
if len(args) == 3:
|
||||
# this means we're asking for a specific signal
|
||||
# A specific signal is requested
|
||||
signal = args.pop(1)
|
||||
if signal not in self.args[1:]:
|
||||
# Requested signal not in accepted list
|
||||
@ -157,7 +222,6 @@ class KillFilter(CommandFilter):
|
||||
if len(self.args) > 1:
|
||||
# No signal requested, but filter requires specific signal
|
||||
return False
|
||||
|
||||
try:
|
||||
command = os.readlink("/proc/%d/exe" % int(args[1]))
|
||||
# NOTE(dprince): /proc/PID/exe may have ' (deleted)' on
|
||||
@ -165,7 +229,7 @@ class KillFilter(CommandFilter):
|
||||
if command.endswith(" (deleted)"):
|
||||
command = command[:command.rindex(" ")]
|
||||
if command != self.args[0]:
|
||||
# Affected executable doesn't match
|
||||
# Affected executable does not match
|
||||
return False
|
||||
except (ValueError, OSError):
|
||||
# Incorrect PID
|
||||
|
@ -17,19 +17,86 @@
|
||||
|
||||
|
||||
import ConfigParser
|
||||
import logging
|
||||
import logging.handlers
|
||||
import os
|
||||
import string
|
||||
|
||||
# this import has the effect of defining global var "filters",
|
||||
# referenced by build_filter(), below. It gets set up by
|
||||
# quantum-rootwrap, when we load_filters().
|
||||
from quantum.rootwrap import filters
|
||||
|
||||
|
||||
class NoFilterMatched(Exception):
|
||||
"""This exception is raised when no filter matched."""
|
||||
pass
|
||||
|
||||
|
||||
class FilterMatchNotExecutable(Exception):
|
||||
"""
|
||||
This exception is raised when a filter matched but no executable was
|
||||
found.
|
||||
"""
|
||||
def __init__(self, match=None, **kwargs):
|
||||
self.match = match
|
||||
|
||||
|
||||
class RootwrapConfig(object):
|
||||
|
||||
def __init__(self, config):
|
||||
# filters_path
|
||||
self.filters_path = config.get("DEFAULT", "filters_path").split(",")
|
||||
|
||||
# exec_dirs
|
||||
if config.has_option("DEFAULT", "exec_dirs"):
|
||||
self.exec_dirs = config.get("DEFAULT", "exec_dirs").split(",")
|
||||
else:
|
||||
# Use system PATH if exec_dirs is not specified
|
||||
self.exec_dirs = os.environ["PATH"].split(':')
|
||||
|
||||
# syslog_log_facility
|
||||
if config.has_option("DEFAULT", "syslog_log_facility"):
|
||||
v = config.get("DEFAULT", "syslog_log_facility")
|
||||
facility_names = logging.handlers.SysLogHandler.facility_names
|
||||
self.syslog_log_facility = getattr(logging.handlers.SysLogHandler,
|
||||
v, None)
|
||||
if self.syslog_log_facility is None and v in facility_names:
|
||||
self.syslog_log_facility = facility_names.get(v)
|
||||
if self.syslog_log_facility is None:
|
||||
raise ValueError('Unexpected syslog_log_facility: %s' % v)
|
||||
else:
|
||||
default_facility = logging.handlers.SysLogHandler.LOG_SYSLOG
|
||||
self.syslog_log_facility = default_facility
|
||||
|
||||
# syslog_log_level
|
||||
if config.has_option("DEFAULT", "syslog_log_level"):
|
||||
v = config.get("DEFAULT", "syslog_log_level")
|
||||
self.syslog_log_level = logging.getLevelName(v.upper())
|
||||
if (self.syslog_log_level == "Level %s" % v.upper()):
|
||||
raise ValueError('Unexepected syslog_log_level: %s' % v)
|
||||
else:
|
||||
self.syslog_log_level = logging.ERROR
|
||||
|
||||
# use_syslog
|
||||
if config.has_option("DEFAULT", "use_syslog"):
|
||||
self.use_syslog = config.getboolean("DEFAULT", "use_syslog")
|
||||
else:
|
||||
self.use_syslog = False
|
||||
|
||||
|
||||
def setup_syslog(execname, facility, level):
|
||||
rootwrap_logger = logging.getLogger()
|
||||
rootwrap_logger.setLevel(level)
|
||||
handler = logging.handlers.SysLogHandler(address='/dev/log',
|
||||
facility=facility)
|
||||
handler.setFormatter(logging.Formatter(
|
||||
os.path.basename(execname) + ': %(message)s'))
|
||||
rootwrap_logger.addHandler(handler)
|
||||
|
||||
|
||||
def build_filter(class_name, *args):
|
||||
"""Returns a filter object of class class_name."""
|
||||
if not hasattr(filters, class_name):
|
||||
# TODO(jrd): Log the error (whenever quantum-rootwrap has a log file)
|
||||
logging.warning("Skipping unknown filter class (%s) specified "
|
||||
"in filter definitions" % class_name)
|
||||
return None
|
||||
filterclass = getattr(filters, class_name)
|
||||
return filterclass(*args)
|
||||
@ -49,17 +116,20 @@ def load_filters(filters_path):
|
||||
newfilter = build_filter(*filterdefinition)
|
||||
if newfilter is None:
|
||||
continue
|
||||
newfilter.name = name
|
||||
filterlist.append(newfilter)
|
||||
return filterlist
|
||||
|
||||
|
||||
def match_filter(filter_list, userargs):
|
||||
def match_filter(filter_list, userargs, exec_dirs=[]):
|
||||
"""
|
||||
Checks user command and arguments through command filters and
|
||||
returns the first matching filter, or None is none matched.
|
||||
returns the first matching filter.
|
||||
Raises NoFilterMatched if no filter matched.
|
||||
Raises FilterMatchNotExecutable if no executable was found for the
|
||||
best filter match.
|
||||
"""
|
||||
|
||||
found_filter = None
|
||||
first_not_executable_filter = None
|
||||
|
||||
for f in filter_list:
|
||||
if f.match(userargs):
|
||||
@ -70,16 +140,21 @@ def match_filter(filter_list, userargs):
|
||||
if not isinstance(fltr,
|
||||
filters.ExecCommandFilter)]
|
||||
args = f.exec_args(userargs)
|
||||
if not args or not match_filter(leaf_filters, args):
|
||||
if (not args or not
|
||||
match_filter(leaf_filters, args, exec_dirs=exec_dirs)):
|
||||
continue
|
||||
|
||||
# Try other filters if executable is absent
|
||||
if not os.access(f.exec_path, os.X_OK):
|
||||
if not found_filter:
|
||||
found_filter = f
|
||||
if not f.get_exec(exec_dirs=exec_dirs):
|
||||
if not first_not_executable_filter:
|
||||
first_not_executable_filter = f
|
||||
continue
|
||||
# Otherwise return matching filter for execution
|
||||
return f
|
||||
|
||||
# No filter matched or first missing executable
|
||||
return found_filter
|
||||
if first_not_executable_filter:
|
||||
# A filter matched, but no executable was found for it
|
||||
raise FilterMatchNotExecutable(match=first_not_executable_filter)
|
||||
|
||||
# No filter matched
|
||||
raise NoFilterMatched()
|
||||
|
@ -14,11 +14,15 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import ConfigParser
|
||||
import logging
|
||||
import logging.handlers
|
||||
import os
|
||||
import subprocess
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
import fixtures
|
||||
|
||||
from quantum.common import utils
|
||||
from quantum.rootwrap import filters
|
||||
from quantum.rootwrap import wrapper
|
||||
from quantum.tests import base
|
||||
@ -32,11 +36,9 @@ class RootwrapTestCase(base.BaseTestCase):
|
||||
filters.RegExpFilter("/bin/ls", "root", 'ls', '/[a-z]+'),
|
||||
filters.CommandFilter("/usr/bin/foo_bar_not_exist", "root"),
|
||||
filters.RegExpFilter("/bin/cat", "root", 'cat', '/[a-z]+'),
|
||||
filters.CommandFilter("/nonexistant/cat", "root"),
|
||||
filters.CommandFilter("/bin/cat", "root")] # Keep this one last
|
||||
|
||||
def tearDown(self):
|
||||
super(RootwrapTestCase, self).tearDown()
|
||||
filters.CommandFilter("/nonexistent/cat", "root"),
|
||||
filters.CommandFilter("/bin/cat", "root") # Keep this one last
|
||||
]
|
||||
|
||||
def test_RegExpFilter_match(self):
|
||||
usercmd = ["ls", "/root"]
|
||||
@ -47,16 +49,17 @@ class RootwrapTestCase(base.BaseTestCase):
|
||||
|
||||
def test_RegExpFilter_reject(self):
|
||||
usercmd = ["ls", "root"]
|
||||
filtermatch = wrapper.match_filter(self.filters, usercmd)
|
||||
self.assertTrue(filtermatch is None)
|
||||
self.assertRaises(wrapper.NoFilterMatched,
|
||||
wrapper.match_filter, self.filters, usercmd)
|
||||
|
||||
def test_missing_command(self):
|
||||
valid_but_missing = ["foo_bar_not_exist"]
|
||||
invalid = ["foo_bar_not_exist_and_not_matched"]
|
||||
filtermatch = wrapper.match_filter(self.filters, valid_but_missing)
|
||||
self.assertTrue(filtermatch is not None)
|
||||
filtermatch = wrapper.match_filter(self.filters, invalid)
|
||||
self.assertTrue(filtermatch is None)
|
||||
self.assertRaises(wrapper.FilterMatchNotExecutable,
|
||||
wrapper.match_filter,
|
||||
self.filters, valid_but_missing)
|
||||
self.assertRaises(wrapper.NoFilterMatched,
|
||||
wrapper.match_filter, self.filters, invalid)
|
||||
|
||||
def test_DnsmasqFilter(self):
|
||||
usercmd = ['QUANTUM_RELAY_SOCKET_PATH=A', 'QUANTUM_NETWORK_ID=foobar',
|
||||
@ -80,30 +83,39 @@ class RootwrapTestCase(base.BaseTestCase):
|
||||
self.assertEqual(env.get('QUANTUM_NETWORK_ID'), 'foobar')
|
||||
|
||||
def test_KillFilter(self):
|
||||
p = utils.subprocess_popen(["/bin/sleep", "5"])
|
||||
f = filters.KillFilter("root", "/bin/sleep", "-9", "-HUP")
|
||||
f2 = filters.KillFilter("root", "/usr/bin/sleep", "-9", "-HUP")
|
||||
usercmd = ['kill', '-ALRM', p.pid]
|
||||
# Incorrect signal should fail
|
||||
self.assertFalse(f.match(usercmd) or f2.match(usercmd))
|
||||
usercmd = ['kill', p.pid]
|
||||
# Providing no signal should fail
|
||||
self.assertFalse(f.match(usercmd) or f2.match(usercmd))
|
||||
# Providing matching signal should be allowed
|
||||
usercmd = ['kill', '-9', p.pid]
|
||||
self.assertTrue(f.match(usercmd) or f2.match(usercmd))
|
||||
if not os.path.exists("/proc/%d" % os.getpid()):
|
||||
self.skipTest("Test requires /proc filesystem (procfs)")
|
||||
p = subprocess.Popen(["cat"], stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT)
|
||||
try:
|
||||
f = filters.KillFilter("root", "/bin/cat", "-9", "-HUP")
|
||||
f2 = filters.KillFilter("root", "/usr/bin/cat", "-9", "-HUP")
|
||||
usercmd = ['kill', '-ALRM', p.pid]
|
||||
# Incorrect signal should fail
|
||||
self.assertFalse(f.match(usercmd) or f2.match(usercmd))
|
||||
usercmd = ['kill', p.pid]
|
||||
# Providing no signal should fail
|
||||
self.assertFalse(f.match(usercmd) or f2.match(usercmd))
|
||||
# Providing matching signal should be allowed
|
||||
usercmd = ['kill', '-9', p.pid]
|
||||
self.assertTrue(f.match(usercmd) or f2.match(usercmd))
|
||||
|
||||
f = filters.KillFilter("root", "/bin/sleep")
|
||||
f2 = filters.KillFilter("root", "/usr/bin/sleep")
|
||||
usercmd = ['kill', os.getpid()]
|
||||
# Our own PID does not match /bin/sleep, so it should fail
|
||||
self.assertFalse(f.match(usercmd) or f2.match(usercmd))
|
||||
usercmd = ['kill', 999999]
|
||||
# Nonexistant PID should fail
|
||||
self.assertFalse(f.match(usercmd) or f2.match(usercmd))
|
||||
usercmd = ['kill', p.pid]
|
||||
# Providing no signal should work
|
||||
self.assertTrue(f.match(usercmd) or f2.match(usercmd))
|
||||
f = filters.KillFilter("root", "/bin/cat")
|
||||
f2 = filters.KillFilter("root", "/usr/bin/cat")
|
||||
usercmd = ['kill', os.getpid()]
|
||||
# Our own PID does not match /bin/sleep, so it should fail
|
||||
self.assertFalse(f.match(usercmd) or f2.match(usercmd))
|
||||
usercmd = ['kill', 999999]
|
||||
# Nonexistent PID should fail
|
||||
self.assertFalse(f.match(usercmd) or f2.match(usercmd))
|
||||
usercmd = ['kill', p.pid]
|
||||
# Providing no signal should work
|
||||
self.assertTrue(f.match(usercmd) or f2.match(usercmd))
|
||||
finally:
|
||||
# Terminate the "cat" process and wait for it to finish
|
||||
p.terminate()
|
||||
p.wait()
|
||||
|
||||
def test_KillFilter_no_raise(self):
|
||||
"""Makes sure ValueError from bug 926412 is gone."""
|
||||
@ -117,13 +129,15 @@ class RootwrapTestCase(base.BaseTestCase):
|
||||
|
||||
def test_KillFilter_deleted_exe(self):
|
||||
"""Makes sure deleted exe's are killed correctly."""
|
||||
# See bug #1073768.
|
||||
with mock.patch('os.readlink') as mock_readlink:
|
||||
mock_readlink.return_value = '/bin/commandddddd (deleted)'
|
||||
f = filters.KillFilter("root", "/bin/commandddddd")
|
||||
usercmd = ['kill', 1234]
|
||||
self.assertTrue(f.match(usercmd))
|
||||
mock_readlink.assert_called_once_with("/proc/1234/exe")
|
||||
# See bug #967931.
|
||||
def fake_readlink(blah):
|
||||
return '/bin/commandddddd (deleted)'
|
||||
|
||||
f = filters.KillFilter("root", "/bin/commandddddd")
|
||||
usercmd = ['kill', 1234]
|
||||
# Providing no signal should work
|
||||
self.stubs.Set(os, 'readlink', fake_readlink)
|
||||
self.assertTrue(f.match(usercmd))
|
||||
|
||||
def test_ReadFileFilter(self):
|
||||
goodfn = '/good/file.name'
|
||||
@ -173,10 +187,180 @@ class RootwrapTestCase(base.BaseTestCase):
|
||||
args = ['ip', 'netns', 'exec', 'foo', 'ip', 'netns', 'exec', 'bar',
|
||||
'ip', 'link', 'list']
|
||||
|
||||
self.assertIsNone(wrapper.match_filter(filter_list, args))
|
||||
self.assertRaises(wrapper.NoFilterMatched,
|
||||
wrapper.match_filter, filter_list, args)
|
||||
|
||||
def test_exec_dirs_search(self):
|
||||
# This test supposes you have /bin/cat or /usr/bin/cat locally
|
||||
f = filters.CommandFilter("cat", "root")
|
||||
usercmd = ['cat', '/f']
|
||||
self.assertTrue(f.match(usercmd))
|
||||
self.assertTrue(f.get_command(usercmd,
|
||||
exec_dirs=['/bin', '/usr/bin'])
|
||||
in (['/bin/cat', '/f'], ['/usr/bin/cat', '/f']))
|
||||
|
||||
def test_skips(self):
|
||||
# Check that all filters are skipped and that the last matches
|
||||
usercmd = ["cat", "/"]
|
||||
filtermatch = wrapper.match_filter(self.filters, usercmd)
|
||||
self.assertTrue(filtermatch is self.filters[-1])
|
||||
|
||||
def test_RootwrapConfig(self):
|
||||
raw = ConfigParser.RawConfigParser()
|
||||
|
||||
# Empty config should raise ConfigParser.Error
|
||||
self.assertRaises(ConfigParser.Error, wrapper.RootwrapConfig, raw)
|
||||
|
||||
# Check default values
|
||||
raw.set('DEFAULT', 'filters_path', '/a,/b')
|
||||
config = wrapper.RootwrapConfig(raw)
|
||||
self.assertEqual(config.filters_path, ['/a', '/b'])
|
||||
self.assertEqual(config.exec_dirs, os.environ["PATH"].split(':'))
|
||||
self.assertFalse(config.use_syslog)
|
||||
self.assertEqual(config.syslog_log_facility,
|
||||
logging.handlers.SysLogHandler.LOG_SYSLOG)
|
||||
self.assertEqual(config.syslog_log_level, logging.ERROR)
|
||||
|
||||
# Check general values
|
||||
raw.set('DEFAULT', 'exec_dirs', '/a,/x')
|
||||
config = wrapper.RootwrapConfig(raw)
|
||||
self.assertEqual(config.exec_dirs, ['/a', '/x'])
|
||||
|
||||
raw.set('DEFAULT', 'use_syslog', 'oui')
|
||||
self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
|
||||
raw.set('DEFAULT', 'use_syslog', 'true')
|
||||
config = wrapper.RootwrapConfig(raw)
|
||||
self.assertTrue(config.use_syslog)
|
||||
|
||||
raw.set('DEFAULT', 'syslog_log_facility', 'moo')
|
||||
self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
|
||||
raw.set('DEFAULT', 'syslog_log_facility', 'local0')
|
||||
config = wrapper.RootwrapConfig(raw)
|
||||
self.assertEqual(config.syslog_log_facility,
|
||||
logging.handlers.SysLogHandler.LOG_LOCAL0)
|
||||
raw.set('DEFAULT', 'syslog_log_facility', 'LOG_AUTH')
|
||||
config = wrapper.RootwrapConfig(raw)
|
||||
self.assertEqual(config.syslog_log_facility,
|
||||
logging.handlers.SysLogHandler.LOG_AUTH)
|
||||
|
||||
raw.set('DEFAULT', 'syslog_log_level', 'bar')
|
||||
self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
|
||||
raw.set('DEFAULT', 'syslog_log_level', 'INFO')
|
||||
config = wrapper.RootwrapConfig(raw)
|
||||
self.assertEqual(config.syslog_log_level, logging.INFO)
|
||||
|
||||
|
||||
class PathFilterTestCase(base.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(PathFilterTestCase, self).setUp()
|
||||
|
||||
tmpdir = fixtures.TempDir('/tmp')
|
||||
self.useFixture(tmpdir)
|
||||
|
||||
self.f = filters.PathFilter('/bin/chown', 'root', 'nova', tmpdir.path)
|
||||
|
||||
gen_name = lambda: str(uuid.uuid4())
|
||||
|
||||
self.SIMPLE_FILE_WITHIN_DIR = os.path.join(tmpdir.path, 'some')
|
||||
self.SIMPLE_FILE_OUTSIDE_DIR = os.path.join('/tmp', 'some')
|
||||
self.TRAVERSAL_WITHIN_DIR = os.path.join(tmpdir.path, 'a', '..',
|
||||
'some')
|
||||
self.TRAVERSAL_OUTSIDE_DIR = os.path.join(tmpdir.path, '..', 'some')
|
||||
|
||||
self.TRAVERSAL_SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path,
|
||||
gen_name())
|
||||
os.symlink(os.path.join(tmpdir.path, 'a', '..', 'a'),
|
||||
self.TRAVERSAL_SYMLINK_WITHIN_DIR)
|
||||
|
||||
self.TRAVERSAL_SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path,
|
||||
gen_name())
|
||||
os.symlink(os.path.join(tmpdir.path, 'a', '..', '..', '..', 'etc'),
|
||||
self.TRAVERSAL_SYMLINK_OUTSIDE_DIR)
|
||||
|
||||
self.SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path, gen_name())
|
||||
os.symlink(os.path.join(tmpdir.path, 'a'), self.SYMLINK_WITHIN_DIR)
|
||||
|
||||
self.SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path, gen_name())
|
||||
os.symlink(os.path.join('/tmp', 'some_file'), self.SYMLINK_OUTSIDE_DIR)
|
||||
|
||||
def test_argument_pass_constraint(self):
|
||||
f = filters.PathFilter('/bin/chown', 'root', 'pass', 'pass')
|
||||
|
||||
args = ['chown', 'something', self.SIMPLE_FILE_OUTSIDE_DIR]
|
||||
self.assertTrue(f.match(args))
|
||||
|
||||
def test_argument_equality_constraint(self):
|
||||
f = filters.PathFilter('/bin/chown', 'root', 'nova', '/tmp/spam/eggs')
|
||||
|
||||
args = ['chown', 'nova', '/tmp/spam/eggs']
|
||||
self.assertTrue(f.match(args))
|
||||
|
||||
args = ['chown', 'quantum', '/tmp/spam/eggs']
|
||||
self.assertFalse(f.match(args))
|
||||
|
||||
def test_wrong_arguments_number(self):
|
||||
args = ['chown', '-c', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
|
||||
self.assertFalse(self.f.match(args))
|
||||
|
||||
def test_wrong_exec_command(self):
|
||||
args = ['wrong_exec', self.SIMPLE_FILE_WITHIN_DIR]
|
||||
self.assertFalse(self.f.match(args))
|
||||
|
||||
def test_match(self):
|
||||
args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
|
||||
self.assertTrue(self.f.match(args))
|
||||
|
||||
def test_match_traversal(self):
|
||||
args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR]
|
||||
self.assertTrue(self.f.match(args))
|
||||
|
||||
def test_match_symlink(self):
|
||||
args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR]
|
||||
self.assertTrue(self.f.match(args))
|
||||
|
||||
def test_match_traversal_symlink(self):
|
||||
args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR]
|
||||
self.assertTrue(self.f.match(args))
|
||||
|
||||
def test_reject(self):
|
||||
args = ['chown', 'nova', self.SIMPLE_FILE_OUTSIDE_DIR]
|
||||
self.assertFalse(self.f.match(args))
|
||||
|
||||
def test_reject_traversal(self):
|
||||
args = ['chown', 'nova', self.TRAVERSAL_OUTSIDE_DIR]
|
||||
self.assertFalse(self.f.match(args))
|
||||
|
||||
def test_reject_symlink(self):
|
||||
args = ['chown', 'nova', self.SYMLINK_OUTSIDE_DIR]
|
||||
self.assertFalse(self.f.match(args))
|
||||
|
||||
def test_reject_traversal_symlink(self):
|
||||
args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_OUTSIDE_DIR]
|
||||
self.assertFalse(self.f.match(args))
|
||||
|
||||
def test_get_command(self):
|
||||
args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
|
||||
expected = ['/bin/chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
|
||||
|
||||
self.assertEqual(expected, self.f.get_command(args))
|
||||
|
||||
def test_get_command_traversal(self):
|
||||
args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR]
|
||||
expected = ['/bin/chown', 'nova',
|
||||
os.path.realpath(self.TRAVERSAL_WITHIN_DIR)]
|
||||
|
||||
self.assertEqual(expected, self.f.get_command(args))
|
||||
|
||||
def test_get_command_symlink(self):
|
||||
args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR]
|
||||
expected = ['/bin/chown', 'nova',
|
||||
os.path.realpath(self.SYMLINK_WITHIN_DIR)]
|
||||
|
||||
self.assertEqual(expected, self.f.get_command(args))
|
||||
|
||||
def test_get_command_traversal_symlink(self):
|
||||
args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR]
|
||||
expected = ['/bin/chown', 'nova',
|
||||
os.path.realpath(self.TRAVERSAL_SYMLINK_WITHIN_DIR)]
|
||||
|
||||
self.assertEqual(expected, self.f.get_command(args))
|
||||
|
Loading…
Reference in New Issue
Block a user