From 4056e8e605bb5f24db1816eafd84d30719264ebe Mon Sep 17 00:00:00 2001 From: Thierry Carrez Date: Mon, 6 May 2013 17:02:39 +0200 Subject: [PATCH] Import recent rootwrap features in local rootwrap Import features developed in oslo-rootwrap during the Grizzly cycle (and recently in havana) into quantum-rootwrap. This is the first step toward using the common oslo-rootwrap into Quantum: the goal being to make both implementations converge, we should avoid making local changes (and push any required change into oslo-rootwrap instead from now on). New features include: - Optional logging (use_syslog) - Searching for executables in a specified binary path (exec_dirs) - New path-based PathFilter Those features required a refactoring in the way executables are matched and in configuration loading. Implements bp quantum-rootwrap-new-features Change-Id: Ia6a2e91c297ade471448dae0964adfd001a46086 --- bin/quantum-rootwrap | 101 +++++++---- quantum/rootwrap/filters.py | 82 ++++++++- quantum/rootwrap/wrapper.py | 103 +++++++++-- quantum/tests/unit/test_rootwrap.py | 272 +++++++++++++++++++++++----- 4 files changed, 460 insertions(+), 98 deletions(-) diff --git a/bin/quantum-rootwrap b/bin/quantum-rootwrap index 0e3383fe9e..8f0158c71a 100755 --- a/bin/quantum-rootwrap +++ b/bin/quantum-rootwrap @@ -36,40 +36,45 @@ node. """ -import ConfigParser -import os -import signal -import sys +from __future__ import print_function -from quantum.common import utils +import ConfigParser +import logging +import os +import pwd +import signal +import subprocess +import sys RC_UNAUTHORIZED = 99 RC_NOCOMMAND = 98 RC_BADCONFIG = 97 +RC_NOEXECFOUND = 96 + + +def _subprocess_setup(): + # Python installs a SIGPIPE handler by default. This is usually not what + # non-Python subprocesses expect. + signal.signal(signal.SIGPIPE, signal.SIG_DFL) + + +def _exit_error(execname, message, errorcode, log=True): + print("%s: %s" % (execname, message)) + if log: + logging.error(message) + sys.exit(errorcode) if __name__ == '__main__': # Split arguments, require at least a command execname = sys.argv.pop(0) - # argv[0] required; path to conf file if len(sys.argv) < 2: - print "%s: %s" % (execname, "No command specified") - sys.exit(RC_NOCOMMAND) + _exit_error(execname, "No command specified", RC_NOCOMMAND, log=False) configfile = sys.argv.pop(0) userargs = sys.argv[:] - # Load configuration - config = ConfigParser.RawConfigParser() - config.read(configfile) - try: - filters_path = config.get("DEFAULT", "filters_path").split(",") - filters = None - except ConfigParser.Error: - print "%s: Incorrect configuration file: %s" % (execname, configfile) - sys.exit(RC_BADCONFIG) - # Add ../ to sys.path to allow running from branch possible_topdir = os.path.normpath(os.path.join(os.path.abspath(execname), os.pardir, os.pardir)) @@ -78,17 +83,51 @@ if __name__ == '__main__': from quantum.rootwrap import wrapper - # Execute command if it matches any of the loaded filters - filters = wrapper.load_filters(filters_path) - filtermatch = wrapper.match_filter(filters, userargs) - if filtermatch: - obj = utils.subprocess_popen(filtermatch.get_command(userargs), - stdin=sys.stdin, - stdout=sys.stdout, - stderr=sys.stderr, - env=filtermatch.get_environment(userargs)) - obj.wait() - sys.exit(obj.returncode) + # Load configuration + try: + rawconfig = ConfigParser.RawConfigParser() + rawconfig.read(configfile) + config = wrapper.RootwrapConfig(rawconfig) + except ValueError as exc: + msg = "Incorrect value in %s: %s" % (configfile, exc.message) + _exit_error(execname, msg, RC_BADCONFIG, log=False) + except ConfigParser.Error: + _exit_error(execname, "Incorrect configuration file: %s" % configfile, + RC_BADCONFIG, log=False) - print "Unauthorized command: %s" % ' '.join(userargs) - sys.exit(RC_UNAUTHORIZED) + if config.use_syslog: + wrapper.setup_syslog(execname, + config.syslog_log_facility, + config.syslog_log_level) + + # Execute command if it matches any of the loaded filters + filters = wrapper.load_filters(config.filters_path) + try: + filtermatch = wrapper.match_filter(filters, userargs, + exec_dirs=config.exec_dirs) + if filtermatch: + command = filtermatch.get_command(userargs, + exec_dirs=config.exec_dirs) + if config.use_syslog: + logging.info("(%s > %s) Executing %s (filter match = %s)" % ( + os.getlogin(), pwd.getpwuid(os.getuid())[0], + command, filtermatch.name)) + + obj = subprocess.Popen(command, + stdin=sys.stdin, + stdout=sys.stdout, + stderr=sys.stderr, + preexec_fn=_subprocess_setup, + env=filtermatch.get_environment(userargs)) + obj.wait() + sys.exit(obj.returncode) + + except wrapper.FilterMatchNotExecutable as exc: + msg = ("Executable not found: %s (filter match = %s)" + % (exc.match.exec_path, exc.match.name)) + _exit_error(execname, msg, RC_NOEXECFOUND, log=config.use_syslog) + + except wrapper.NoFilterMatched: + msg = ("Unauthorized command: %s (no filter matched)" + % ' '.join(userargs)) + _exit_error(execname, msg, RC_UNAUTHORIZED, log=config.use_syslog) diff --git a/quantum/rootwrap/filters.py b/quantum/rootwrap/filters.py index 28b5e1bd34..30472452ca 100644 --- a/quantum/rootwrap/filters.py +++ b/quantum/rootwrap/filters.py @@ -15,7 +15,6 @@ # License for the specific language governing permissions and limitations # under the License. - import os import re @@ -24,20 +23,39 @@ class CommandFilter(object): """Command filter only checking that the 1st argument matches exec_path.""" def __init__(self, exec_path, run_as, *args): + self.name = '' self.exec_path = exec_path self.run_as = run_as self.args = args + self.real_exec = None + + def get_exec(self, exec_dirs=[]): + """Returns existing executable, or empty string if none found.""" + if self.real_exec is not None: + return self.real_exec + self.real_exec = "" + if self.exec_path.startswith('/'): + if os.access(self.exec_path, os.X_OK): + self.real_exec = self.exec_path + else: + for binary_path in exec_dirs: + expanded_path = os.path.join(binary_path, self.exec_path) + if os.access(expanded_path, os.X_OK): + self.real_exec = expanded_path + break + return self.real_exec def match(self, userargs): """Only check that the first argument (command) matches exec_path.""" return os.path.basename(self.exec_path) == userargs[0] - def get_command(self, userargs): + def get_command(self, userargs, exec_dirs=[]): """Returns command to execute (with sudo -u if run_as != root).""" + to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path if (self.run_as != 'root'): # Used to run commands at lesser privileges - return ['sudo', '-u', self.run_as, self.exec_path] + userargs[1:] - return [self.exec_path] + userargs[1:] + return ['sudo', '-u', self.run_as, to_exec] + userargs[1:] + return [to_exec] + userargs[1:] def get_environment(self, userargs): """Returns specific environment to set, None if none.""" @@ -73,6 +91,52 @@ class RegExpFilter(CommandFilter): return False +class PathFilter(CommandFilter): + """Command filter checking that path arguments are within given dirs + + One can specify the following constraints for command arguments: + 1) pass - pass an argument as is to the resulting command + 2) some_str - check if an argument is equal to the given string + 3) abs path - check if a path argument is within the given base dir + + A typical rootwrapper filter entry looks like this: + # cmdname: filter name, raw command, user, arg_i_constraint [, ...] + chown: PathFilter, /bin/chown, root, nova, /var/lib/images + + """ + + def match(self, userargs): + command, arguments = userargs[0], userargs[1:] + + equal_args_num = len(self.args) == len(arguments) + exec_is_valid = super(PathFilter, self).match(userargs) + args_equal_or_pass = all( + arg == 'pass' or arg == value + for arg, value in zip(self.args, arguments) + if not os.path.isabs(arg) # arguments not specifying abs paths + ) + paths_are_within_base_dirs = all( + os.path.commonprefix([arg, os.path.realpath(value)]) == arg + for arg, value in zip(self.args, arguments) + if os.path.isabs(arg) # arguments specifying abs paths + ) + + return (equal_args_num and + exec_is_valid and + args_equal_or_pass and + paths_are_within_base_dirs) + + def get_command(self, userargs, exec_dirs=[]): + command, arguments = userargs[0], userargs[1:] + + # convert path values to canonical ones; copy other args as is + args = [os.path.realpath(value) if os.path.isabs(arg) else value + for arg, value in zip(self.args, arguments)] + + return super(PathFilter, self).get_command([command] + args, + exec_dirs) + + class DnsmasqFilter(CommandFilter): """Specific filter for the dnsmasq call (which includes env).""" @@ -96,8 +160,9 @@ class DnsmasqFilter(CommandFilter): return True return False - def get_command(self, userargs): - return [self.exec_path] + userargs[3:] + def get_command(self, userargs, exec_dirs=[]): + to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path + return [to_exec] + userargs[3:] def get_environment(self, userargs): env = os.environ.copy() @@ -145,7 +210,7 @@ class KillFilter(CommandFilter): return False args = list(userargs) if len(args) == 3: - # this means we're asking for a specific signal + # A specific signal is requested signal = args.pop(1) if signal not in self.args[1:]: # Requested signal not in accepted list @@ -157,7 +222,6 @@ class KillFilter(CommandFilter): if len(self.args) > 1: # No signal requested, but filter requires specific signal return False - try: command = os.readlink("/proc/%d/exe" % int(args[1])) # NOTE(dprince): /proc/PID/exe may have ' (deleted)' on @@ -165,7 +229,7 @@ class KillFilter(CommandFilter): if command.endswith(" (deleted)"): command = command[:command.rindex(" ")] if command != self.args[0]: - # Affected executable doesn't match + # Affected executable does not match return False except (ValueError, OSError): # Incorrect PID diff --git a/quantum/rootwrap/wrapper.py b/quantum/rootwrap/wrapper.py index 92198c49fd..c7d0999dd1 100644 --- a/quantum/rootwrap/wrapper.py +++ b/quantum/rootwrap/wrapper.py @@ -17,19 +17,86 @@ import ConfigParser +import logging +import logging.handlers import os import string -# this import has the effect of defining global var "filters", -# referenced by build_filter(), below. It gets set up by -# quantum-rootwrap, when we load_filters(). from quantum.rootwrap import filters +class NoFilterMatched(Exception): + """This exception is raised when no filter matched.""" + pass + + +class FilterMatchNotExecutable(Exception): + """ + This exception is raised when a filter matched but no executable was + found. + """ + def __init__(self, match=None, **kwargs): + self.match = match + + +class RootwrapConfig(object): + + def __init__(self, config): + # filters_path + self.filters_path = config.get("DEFAULT", "filters_path").split(",") + + # exec_dirs + if config.has_option("DEFAULT", "exec_dirs"): + self.exec_dirs = config.get("DEFAULT", "exec_dirs").split(",") + else: + # Use system PATH if exec_dirs is not specified + self.exec_dirs = os.environ["PATH"].split(':') + + # syslog_log_facility + if config.has_option("DEFAULT", "syslog_log_facility"): + v = config.get("DEFAULT", "syslog_log_facility") + facility_names = logging.handlers.SysLogHandler.facility_names + self.syslog_log_facility = getattr(logging.handlers.SysLogHandler, + v, None) + if self.syslog_log_facility is None and v in facility_names: + self.syslog_log_facility = facility_names.get(v) + if self.syslog_log_facility is None: + raise ValueError('Unexpected syslog_log_facility: %s' % v) + else: + default_facility = logging.handlers.SysLogHandler.LOG_SYSLOG + self.syslog_log_facility = default_facility + + # syslog_log_level + if config.has_option("DEFAULT", "syslog_log_level"): + v = config.get("DEFAULT", "syslog_log_level") + self.syslog_log_level = logging.getLevelName(v.upper()) + if (self.syslog_log_level == "Level %s" % v.upper()): + raise ValueError('Unexepected syslog_log_level: %s' % v) + else: + self.syslog_log_level = logging.ERROR + + # use_syslog + if config.has_option("DEFAULT", "use_syslog"): + self.use_syslog = config.getboolean("DEFAULT", "use_syslog") + else: + self.use_syslog = False + + +def setup_syslog(execname, facility, level): + rootwrap_logger = logging.getLogger() + rootwrap_logger.setLevel(level) + handler = logging.handlers.SysLogHandler(address='/dev/log', + facility=facility) + handler.setFormatter(logging.Formatter( + os.path.basename(execname) + ': %(message)s')) + rootwrap_logger.addHandler(handler) + + def build_filter(class_name, *args): """Returns a filter object of class class_name.""" if not hasattr(filters, class_name): - # TODO(jrd): Log the error (whenever quantum-rootwrap has a log file) + logging.warning("Skipping unknown filter class (%s) specified " + "in filter definitions" % class_name) return None filterclass = getattr(filters, class_name) return filterclass(*args) @@ -49,17 +116,20 @@ def load_filters(filters_path): newfilter = build_filter(*filterdefinition) if newfilter is None: continue + newfilter.name = name filterlist.append(newfilter) return filterlist -def match_filter(filter_list, userargs): +def match_filter(filter_list, userargs, exec_dirs=[]): """ Checks user command and arguments through command filters and - returns the first matching filter, or None is none matched. + returns the first matching filter. + Raises NoFilterMatched if no filter matched. + Raises FilterMatchNotExecutable if no executable was found for the + best filter match. """ - - found_filter = None + first_not_executable_filter = None for f in filter_list: if f.match(userargs): @@ -70,16 +140,21 @@ def match_filter(filter_list, userargs): if not isinstance(fltr, filters.ExecCommandFilter)] args = f.exec_args(userargs) - if not args or not match_filter(leaf_filters, args): + if (not args or not + match_filter(leaf_filters, args, exec_dirs=exec_dirs)): continue # Try other filters if executable is absent - if not os.access(f.exec_path, os.X_OK): - if not found_filter: - found_filter = f + if not f.get_exec(exec_dirs=exec_dirs): + if not first_not_executable_filter: + first_not_executable_filter = f continue # Otherwise return matching filter for execution return f - # No filter matched or first missing executable - return found_filter + if first_not_executable_filter: + # A filter matched, but no executable was found for it + raise FilterMatchNotExecutable(match=first_not_executable_filter) + + # No filter matched + raise NoFilterMatched() diff --git a/quantum/tests/unit/test_rootwrap.py b/quantum/tests/unit/test_rootwrap.py index 4933f44674..ea476cd05b 100644 --- a/quantum/tests/unit/test_rootwrap.py +++ b/quantum/tests/unit/test_rootwrap.py @@ -14,11 +14,15 @@ # License for the specific language governing permissions and limitations # under the License. +import ConfigParser +import logging +import logging.handlers import os +import subprocess +import uuid -import mock +import fixtures -from quantum.common import utils from quantum.rootwrap import filters from quantum.rootwrap import wrapper from quantum.tests import base @@ -32,11 +36,9 @@ class RootwrapTestCase(base.BaseTestCase): filters.RegExpFilter("/bin/ls", "root", 'ls', '/[a-z]+'), filters.CommandFilter("/usr/bin/foo_bar_not_exist", "root"), filters.RegExpFilter("/bin/cat", "root", 'cat', '/[a-z]+'), - filters.CommandFilter("/nonexistant/cat", "root"), - filters.CommandFilter("/bin/cat", "root")] # Keep this one last - - def tearDown(self): - super(RootwrapTestCase, self).tearDown() + filters.CommandFilter("/nonexistent/cat", "root"), + filters.CommandFilter("/bin/cat", "root") # Keep this one last + ] def test_RegExpFilter_match(self): usercmd = ["ls", "/root"] @@ -47,16 +49,17 @@ class RootwrapTestCase(base.BaseTestCase): def test_RegExpFilter_reject(self): usercmd = ["ls", "root"] - filtermatch = wrapper.match_filter(self.filters, usercmd) - self.assertTrue(filtermatch is None) + self.assertRaises(wrapper.NoFilterMatched, + wrapper.match_filter, self.filters, usercmd) def test_missing_command(self): valid_but_missing = ["foo_bar_not_exist"] invalid = ["foo_bar_not_exist_and_not_matched"] - filtermatch = wrapper.match_filter(self.filters, valid_but_missing) - self.assertTrue(filtermatch is not None) - filtermatch = wrapper.match_filter(self.filters, invalid) - self.assertTrue(filtermatch is None) + self.assertRaises(wrapper.FilterMatchNotExecutable, + wrapper.match_filter, + self.filters, valid_but_missing) + self.assertRaises(wrapper.NoFilterMatched, + wrapper.match_filter, self.filters, invalid) def test_DnsmasqFilter(self): usercmd = ['QUANTUM_RELAY_SOCKET_PATH=A', 'QUANTUM_NETWORK_ID=foobar', @@ -80,30 +83,39 @@ class RootwrapTestCase(base.BaseTestCase): self.assertEqual(env.get('QUANTUM_NETWORK_ID'), 'foobar') def test_KillFilter(self): - p = utils.subprocess_popen(["/bin/sleep", "5"]) - f = filters.KillFilter("root", "/bin/sleep", "-9", "-HUP") - f2 = filters.KillFilter("root", "/usr/bin/sleep", "-9", "-HUP") - usercmd = ['kill', '-ALRM', p.pid] - # Incorrect signal should fail - self.assertFalse(f.match(usercmd) or f2.match(usercmd)) - usercmd = ['kill', p.pid] - # Providing no signal should fail - self.assertFalse(f.match(usercmd) or f2.match(usercmd)) - # Providing matching signal should be allowed - usercmd = ['kill', '-9', p.pid] - self.assertTrue(f.match(usercmd) or f2.match(usercmd)) + if not os.path.exists("/proc/%d" % os.getpid()): + self.skipTest("Test requires /proc filesystem (procfs)") + p = subprocess.Popen(["cat"], stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + try: + f = filters.KillFilter("root", "/bin/cat", "-9", "-HUP") + f2 = filters.KillFilter("root", "/usr/bin/cat", "-9", "-HUP") + usercmd = ['kill', '-ALRM', p.pid] + # Incorrect signal should fail + self.assertFalse(f.match(usercmd) or f2.match(usercmd)) + usercmd = ['kill', p.pid] + # Providing no signal should fail + self.assertFalse(f.match(usercmd) or f2.match(usercmd)) + # Providing matching signal should be allowed + usercmd = ['kill', '-9', p.pid] + self.assertTrue(f.match(usercmd) or f2.match(usercmd)) - f = filters.KillFilter("root", "/bin/sleep") - f2 = filters.KillFilter("root", "/usr/bin/sleep") - usercmd = ['kill', os.getpid()] - # Our own PID does not match /bin/sleep, so it should fail - self.assertFalse(f.match(usercmd) or f2.match(usercmd)) - usercmd = ['kill', 999999] - # Nonexistant PID should fail - self.assertFalse(f.match(usercmd) or f2.match(usercmd)) - usercmd = ['kill', p.pid] - # Providing no signal should work - self.assertTrue(f.match(usercmd) or f2.match(usercmd)) + f = filters.KillFilter("root", "/bin/cat") + f2 = filters.KillFilter("root", "/usr/bin/cat") + usercmd = ['kill', os.getpid()] + # Our own PID does not match /bin/sleep, so it should fail + self.assertFalse(f.match(usercmd) or f2.match(usercmd)) + usercmd = ['kill', 999999] + # Nonexistent PID should fail + self.assertFalse(f.match(usercmd) or f2.match(usercmd)) + usercmd = ['kill', p.pid] + # Providing no signal should work + self.assertTrue(f.match(usercmd) or f2.match(usercmd)) + finally: + # Terminate the "cat" process and wait for it to finish + p.terminate() + p.wait() def test_KillFilter_no_raise(self): """Makes sure ValueError from bug 926412 is gone.""" @@ -117,13 +129,15 @@ class RootwrapTestCase(base.BaseTestCase): def test_KillFilter_deleted_exe(self): """Makes sure deleted exe's are killed correctly.""" - # See bug #1073768. - with mock.patch('os.readlink') as mock_readlink: - mock_readlink.return_value = '/bin/commandddddd (deleted)' - f = filters.KillFilter("root", "/bin/commandddddd") - usercmd = ['kill', 1234] - self.assertTrue(f.match(usercmd)) - mock_readlink.assert_called_once_with("/proc/1234/exe") + # See bug #967931. + def fake_readlink(blah): + return '/bin/commandddddd (deleted)' + + f = filters.KillFilter("root", "/bin/commandddddd") + usercmd = ['kill', 1234] + # Providing no signal should work + self.stubs.Set(os, 'readlink', fake_readlink) + self.assertTrue(f.match(usercmd)) def test_ReadFileFilter(self): goodfn = '/good/file.name' @@ -173,10 +187,180 @@ class RootwrapTestCase(base.BaseTestCase): args = ['ip', 'netns', 'exec', 'foo', 'ip', 'netns', 'exec', 'bar', 'ip', 'link', 'list'] - self.assertIsNone(wrapper.match_filter(filter_list, args)) + self.assertRaises(wrapper.NoFilterMatched, + wrapper.match_filter, filter_list, args) + + def test_exec_dirs_search(self): + # This test supposes you have /bin/cat or /usr/bin/cat locally + f = filters.CommandFilter("cat", "root") + usercmd = ['cat', '/f'] + self.assertTrue(f.match(usercmd)) + self.assertTrue(f.get_command(usercmd, + exec_dirs=['/bin', '/usr/bin']) + in (['/bin/cat', '/f'], ['/usr/bin/cat', '/f'])) def test_skips(self): # Check that all filters are skipped and that the last matches usercmd = ["cat", "/"] filtermatch = wrapper.match_filter(self.filters, usercmd) self.assertTrue(filtermatch is self.filters[-1]) + + def test_RootwrapConfig(self): + raw = ConfigParser.RawConfigParser() + + # Empty config should raise ConfigParser.Error + self.assertRaises(ConfigParser.Error, wrapper.RootwrapConfig, raw) + + # Check default values + raw.set('DEFAULT', 'filters_path', '/a,/b') + config = wrapper.RootwrapConfig(raw) + self.assertEqual(config.filters_path, ['/a', '/b']) + self.assertEqual(config.exec_dirs, os.environ["PATH"].split(':')) + self.assertFalse(config.use_syslog) + self.assertEqual(config.syslog_log_facility, + logging.handlers.SysLogHandler.LOG_SYSLOG) + self.assertEqual(config.syslog_log_level, logging.ERROR) + + # Check general values + raw.set('DEFAULT', 'exec_dirs', '/a,/x') + config = wrapper.RootwrapConfig(raw) + self.assertEqual(config.exec_dirs, ['/a', '/x']) + + raw.set('DEFAULT', 'use_syslog', 'oui') + self.assertRaises(ValueError, wrapper.RootwrapConfig, raw) + raw.set('DEFAULT', 'use_syslog', 'true') + config = wrapper.RootwrapConfig(raw) + self.assertTrue(config.use_syslog) + + raw.set('DEFAULT', 'syslog_log_facility', 'moo') + self.assertRaises(ValueError, wrapper.RootwrapConfig, raw) + raw.set('DEFAULT', 'syslog_log_facility', 'local0') + config = wrapper.RootwrapConfig(raw) + self.assertEqual(config.syslog_log_facility, + logging.handlers.SysLogHandler.LOG_LOCAL0) + raw.set('DEFAULT', 'syslog_log_facility', 'LOG_AUTH') + config = wrapper.RootwrapConfig(raw) + self.assertEqual(config.syslog_log_facility, + logging.handlers.SysLogHandler.LOG_AUTH) + + raw.set('DEFAULT', 'syslog_log_level', 'bar') + self.assertRaises(ValueError, wrapper.RootwrapConfig, raw) + raw.set('DEFAULT', 'syslog_log_level', 'INFO') + config = wrapper.RootwrapConfig(raw) + self.assertEqual(config.syslog_log_level, logging.INFO) + + +class PathFilterTestCase(base.BaseTestCase): + def setUp(self): + super(PathFilterTestCase, self).setUp() + + tmpdir = fixtures.TempDir('/tmp') + self.useFixture(tmpdir) + + self.f = filters.PathFilter('/bin/chown', 'root', 'nova', tmpdir.path) + + gen_name = lambda: str(uuid.uuid4()) + + self.SIMPLE_FILE_WITHIN_DIR = os.path.join(tmpdir.path, 'some') + self.SIMPLE_FILE_OUTSIDE_DIR = os.path.join('/tmp', 'some') + self.TRAVERSAL_WITHIN_DIR = os.path.join(tmpdir.path, 'a', '..', + 'some') + self.TRAVERSAL_OUTSIDE_DIR = os.path.join(tmpdir.path, '..', 'some') + + self.TRAVERSAL_SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path, + gen_name()) + os.symlink(os.path.join(tmpdir.path, 'a', '..', 'a'), + self.TRAVERSAL_SYMLINK_WITHIN_DIR) + + self.TRAVERSAL_SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path, + gen_name()) + os.symlink(os.path.join(tmpdir.path, 'a', '..', '..', '..', 'etc'), + self.TRAVERSAL_SYMLINK_OUTSIDE_DIR) + + self.SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path, gen_name()) + os.symlink(os.path.join(tmpdir.path, 'a'), self.SYMLINK_WITHIN_DIR) + + self.SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path, gen_name()) + os.symlink(os.path.join('/tmp', 'some_file'), self.SYMLINK_OUTSIDE_DIR) + + def test_argument_pass_constraint(self): + f = filters.PathFilter('/bin/chown', 'root', 'pass', 'pass') + + args = ['chown', 'something', self.SIMPLE_FILE_OUTSIDE_DIR] + self.assertTrue(f.match(args)) + + def test_argument_equality_constraint(self): + f = filters.PathFilter('/bin/chown', 'root', 'nova', '/tmp/spam/eggs') + + args = ['chown', 'nova', '/tmp/spam/eggs'] + self.assertTrue(f.match(args)) + + args = ['chown', 'quantum', '/tmp/spam/eggs'] + self.assertFalse(f.match(args)) + + def test_wrong_arguments_number(self): + args = ['chown', '-c', 'nova', self.SIMPLE_FILE_WITHIN_DIR] + self.assertFalse(self.f.match(args)) + + def test_wrong_exec_command(self): + args = ['wrong_exec', self.SIMPLE_FILE_WITHIN_DIR] + self.assertFalse(self.f.match(args)) + + def test_match(self): + args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR] + self.assertTrue(self.f.match(args)) + + def test_match_traversal(self): + args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR] + self.assertTrue(self.f.match(args)) + + def test_match_symlink(self): + args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR] + self.assertTrue(self.f.match(args)) + + def test_match_traversal_symlink(self): + args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR] + self.assertTrue(self.f.match(args)) + + def test_reject(self): + args = ['chown', 'nova', self.SIMPLE_FILE_OUTSIDE_DIR] + self.assertFalse(self.f.match(args)) + + def test_reject_traversal(self): + args = ['chown', 'nova', self.TRAVERSAL_OUTSIDE_DIR] + self.assertFalse(self.f.match(args)) + + def test_reject_symlink(self): + args = ['chown', 'nova', self.SYMLINK_OUTSIDE_DIR] + self.assertFalse(self.f.match(args)) + + def test_reject_traversal_symlink(self): + args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_OUTSIDE_DIR] + self.assertFalse(self.f.match(args)) + + def test_get_command(self): + args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR] + expected = ['/bin/chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR] + + self.assertEqual(expected, self.f.get_command(args)) + + def test_get_command_traversal(self): + args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR] + expected = ['/bin/chown', 'nova', + os.path.realpath(self.TRAVERSAL_WITHIN_DIR)] + + self.assertEqual(expected, self.f.get_command(args)) + + def test_get_command_symlink(self): + args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR] + expected = ['/bin/chown', 'nova', + os.path.realpath(self.SYMLINK_WITHIN_DIR)] + + self.assertEqual(expected, self.f.get_command(args)) + + def test_get_command_traversal_symlink(self): + args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR] + expected = ['/bin/chown', 'nova', + os.path.realpath(self.TRAVERSAL_SYMLINK_WITHIN_DIR)] + + self.assertEqual(expected, self.f.get_command(args))