oslo.rootwrap/oslo_rootwrap/tests/test_rootwrap.py
Hervé Beraud 43ab5ad7a7 Add coreutils as valid value to fix KillFilter test
In containerized RHEL 9 environment the KillFilter test started to fail
because `os.readlink("/proc/<pid-of-cat/exe>")` return /usr/bin/coreutils.

Indeed cat is part of the coreutils package and the symlink of
this file point toward coreutils. Fixing this bug by adding
coreutils as a possible returned value for our tests.

Closes-Bug: #2037383
Change-Id: I141eb15efa30c7df3ca5419b594097f4b683663c
2023-10-16 09:29:07 +02:00

682 lines
28 KiB
Python

# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import configparser
import logging
import logging.handlers
import os
import tempfile
from unittest import mock
import uuid
import fixtures
import testtools
from oslo_rootwrap import cmd
from oslo_rootwrap import daemon
from oslo_rootwrap import filters
from oslo_rootwrap import subprocess
from oslo_rootwrap import wrapper
class RootwrapLoaderTestCase(testtools.TestCase):
def test_privsep_in_loader(self):
privsep = ["privsep-helper", "--context", "foo"]
filterlist = wrapper.load_filters([])
# mock out get_exec because
with mock.patch.object(filters.CommandFilter, 'get_exec') as ge:
ge.return_value = "/fake/privsep-helper"
filtermatch = wrapper.match_filter(filterlist, privsep)
self.assertIsNotNone(filtermatch)
self.assertEqual(["/fake/privsep-helper", "--context", "foo"],
filtermatch.get_command(privsep))
def test_strict_switched_off_in_configparser(self):
temp_dir = self.useFixture(fixtures.TempDir()).path
os.mkdir(os.path.join(temp_dir, 'nested'))
temp_file = os.path.join(temp_dir, 'test.conf')
f = open(temp_file, 'w')
f.write("""[Filters]
privsep: PathFilter, privsep-helper, root
privsep: PathFilter, privsep-helper, root
""")
f.close()
filterlist = wrapper.load_filters([temp_dir])
self.assertIsNotNone(filterlist)
class RootwrapTestCase(testtools.TestCase):
if os.path.exists('/sbin/ip'):
_ip = '/sbin/ip'
else:
_ip = '/bin/ip'
def setUp(self):
super(RootwrapTestCase, self).setUp()
self.filters = [
filters.RegExpFilter("/bin/ls", "root", 'ls', '/[a-z]+'),
filters.CommandFilter("/usr/bin/foo_bar_not_exist", "root"),
filters.RegExpFilter("/bin/cat", "root", 'cat', '/[a-z]+'),
filters.CommandFilter("/nonexistent/cat", "root"),
filters.CommandFilter("/bin/cat", "root") # Keep this one last
]
def test_CommandFilter(self):
f = filters.CommandFilter("sleep", 'root', '10')
self.assertFalse(f.match(["sleep2"]))
# verify that any arguments are accepted
self.assertTrue(f.match(["sleep"]))
self.assertTrue(f.match(["sleep", "anything"]))
self.assertTrue(f.match(["sleep", "10"]))
f = filters.CommandFilter("sleep", 'root')
self.assertTrue(f.match(["sleep", "10"]))
def test_empty_commandfilter(self):
f = filters.CommandFilter("sleep", "root")
self.assertFalse(f.match([]))
self.assertFalse(f.match(None))
def test_empty_regexpfilter(self):
f = filters.RegExpFilter("sleep", "root", "sleep")
self.assertFalse(f.match([]))
self.assertFalse(f.match(None))
def test_empty_invalid_regexpfilter(self):
f = filters.RegExpFilter("sleep", "root")
self.assertFalse(f.match(["anything"]))
self.assertFalse(f.match([]))
def test_RegExpFilter_match(self):
usercmd = ["ls", "/root"]
filtermatch = wrapper.match_filter(self.filters, usercmd)
self.assertFalse(filtermatch is None)
self.assertEqual(["/bin/ls", "/root"],
filtermatch.get_command(usercmd))
def test_RegExpFilter_reject(self):
usercmd = ["ls", "root"]
self.assertRaises(wrapper.NoFilterMatched,
wrapper.match_filter, self.filters, usercmd)
def test_missing_command(self):
valid_but_missing = ["foo_bar_not_exist"]
invalid = ["foo_bar_not_exist_and_not_matched"]
self.assertRaises(wrapper.FilterMatchNotExecutable,
wrapper.match_filter,
self.filters, valid_but_missing)
self.assertRaises(wrapper.NoFilterMatched,
wrapper.match_filter, self.filters, invalid)
def _test_EnvFilter_as_DnsMasq(self, config_file_arg):
usercmd = ['env', config_file_arg + '=A', 'NETWORK_ID=foobar',
'dnsmasq', 'foo']
f = filters.EnvFilter("env", "root", config_file_arg + '=A',
'NETWORK_ID=', "/usr/bin/dnsmasq")
self.assertTrue(f.match(usercmd))
self.assertEqual(['/usr/bin/dnsmasq', 'foo'], f.get_command(usercmd))
env = f.get_environment(usercmd)
self.assertEqual('A', env.get(config_file_arg))
self.assertEqual('foobar', env.get('NETWORK_ID'))
def test_EnvFilter(self):
envset = ['A=/some/thing', 'B=somethingelse']
envcmd = ['env'] + envset
realcmd = ['sleep', '10']
usercmd = envcmd + realcmd
f = filters.EnvFilter("env", "root", "A=", "B=ignored", "sleep")
# accept with leading env
self.assertTrue(f.match(envcmd + ["sleep"]))
# accept without leading env
self.assertTrue(f.match(envset + ["sleep"]))
# any other command does not match
self.assertFalse(f.match(envcmd + ["sleep2"]))
self.assertFalse(f.match(envset + ["sleep2"]))
# accept any trailing arguments
self.assertTrue(f.match(usercmd))
# require given environment variables to match
self.assertFalse(f.match([envcmd, 'C=ELSE']))
self.assertFalse(f.match(['env', 'C=xx']))
self.assertFalse(f.match(['env', 'A=xx']))
# require env command to be given
# (otherwise CommandFilters should match
self.assertFalse(f.match(realcmd))
# require command to match
self.assertFalse(f.match(envcmd))
self.assertFalse(f.match(envcmd[1:]))
# ensure that the env command is stripped when executing
self.assertEqual(realcmd, f.exec_args(usercmd))
env = f.get_environment(usercmd)
# check that environment variables are set
self.assertEqual('/some/thing', env.get('A'))
self.assertEqual('somethingelse', env.get('B'))
self.assertNotIn('sleep', env.keys())
def test_EnvFilter_without_leading_env(self):
envset = ['A=/some/thing', 'B=somethingelse']
envcmd = ['env'] + envset
realcmd = ['sleep', '10']
f = filters.EnvFilter("sleep", "root", "A=", "B=ignored")
# accept without leading env
self.assertTrue(f.match(envset + ["sleep"]))
self.assertEqual(realcmd, f.get_command(envcmd + realcmd))
self.assertEqual(realcmd, f.get_command(envset + realcmd))
env = f.get_environment(envset + realcmd)
# check that environment variables are set
self.assertEqual('/some/thing', env.get('A'))
self.assertEqual('somethingelse', env.get('B'))
self.assertNotIn('sleep', env.keys())
def test_KillFilter(self):
if not os.path.exists("/proc/%d" % os.getpid()):
self.skipTest("Test requires /proc filesystem (procfs)")
p = subprocess.Popen(["cat"], stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
try:
f = filters.KillFilter("root", "/bin/cat", "-9", "-HUP")
f2 = filters.KillFilter("root", "/usr/bin/cat", "-9", "-HUP")
f3 = filters.KillFilter("root", "/usr/bin/coreutils", "-9", "-HUP")
usercmd = ['kill', '-ALRM', p.pid]
# Incorrect signal should fail
self.assertFalse(f.match(usercmd) or f2.match(usercmd))
usercmd = ['kill', p.pid]
# Providing no signal should fail
self.assertFalse(f.match(usercmd) or f2.match(usercmd))
# Providing matching signal should be allowed
usercmd = ['kill', '-9', p.pid]
self.assertTrue(f.match(usercmd) or
f2.match(usercmd) or
f3.match(usercmd))
f = filters.KillFilter("root", "/bin/cat")
f2 = filters.KillFilter("root", "/usr/bin/cat")
f3 = filters.KillFilter("root", "/usr/bin/coreutils")
usercmd = ['kill', os.getpid()]
# Our own PID does not match /bin/sleep, so it should fail
self.assertFalse(f.match(usercmd) or f2.match(usercmd))
usercmd = ['kill', 999999]
# Nonexistent PID should fail
self.assertFalse(f.match(usercmd) or f2.match(usercmd))
usercmd = ['kill', p.pid]
# Providing no signal should work
self.assertTrue(f.match(usercmd) or
f2.match(usercmd) or
f3.match(usercmd))
# verify that relative paths are matched against $PATH
f = filters.KillFilter("root", "cat")
f2 = filters.KillFilter("root", "coreutils")
# Our own PID does not match so it should fail
usercmd = ['kill', os.getpid()]
self.assertFalse(f.match(usercmd))
# Filter should find cat in /bin or /usr/bin
usercmd = ['kill', p.pid]
self.assertTrue(f.match(usercmd) or f2.match(usercmd))
# Filter shouldn't be able to find binary in $PATH, so fail
with fixtures.EnvironmentVariable("PATH", "/foo:/bar"):
self.assertFalse(f.match(usercmd))
# ensure that unset $PATH is not causing an exception
with fixtures.EnvironmentVariable("PATH"):
self.assertFalse(f.match(usercmd))
finally:
# Terminate the "cat" process and wait for it to finish
p.terminate()
p.wait()
def test_KillFilter_no_raise(self):
"""Makes sure ValueError from bug 926412 is gone."""
f = filters.KillFilter("root", "")
# Providing anything other than kill should be False
usercmd = ['notkill', 999999]
self.assertFalse(f.match(usercmd))
# Providing something that is not a pid should be False
usercmd = ['kill', 'notapid']
self.assertFalse(f.match(usercmd))
# no arguments should also be fine
self.assertFalse(f.match([]))
self.assertFalse(f.match(None))
def test_KillFilter_deleted_exe(self):
"""Makes sure deleted exe's are killed correctly."""
command = "/bin/commandddddd"
f = filters.KillFilter("root", command)
usercmd = ['kill', 1234]
# Providing no signal should work
with mock.patch('os.readlink') as readlink:
readlink.return_value = command + ' (deleted)'
with mock.patch('os.path.isfile') as exists:
def fake_exists(path):
return path == command
exists.side_effect = fake_exists
self.assertTrue(f.match(usercmd))
@mock.patch('os.readlink')
@mock.patch('os.path.isfile')
def test_KillFilter_upgraded_exe(self, mock_isfile, mock_readlink):
"""Makes sure upgraded exe's are killed correctly."""
f = filters.KillFilter("root", "/bin/commandddddd")
command = "/bin/commandddddd"
usercmd = ['kill', 1234]
def fake_exists(path):
return path == command
mock_readlink.return_value = command + '\0\05190bfb2 (deleted)'
mock_isfile.side_effect = fake_exists
self.assertTrue(f.match(usercmd))
@mock.patch('os.readlink')
@mock.patch('os.path.isfile')
@mock.patch('os.path.exists')
@mock.patch('os.access')
def test_KillFilter_renamed_exe(self, mock_access, mock_exists,
mock_isfile, mock_readlink):
"""Makes sure renamed exe's are killed correctly."""
command = "/bin/commandddddd"
f = filters.KillFilter("root", command)
usercmd = ['kill', 1234]
def fake_os_func(path, *args):
return path == command
mock_readlink.return_value = command + ';90bfb2 (deleted)'
m = mock.mock_open(read_data=command)
with mock.patch("builtins.open", m, create=True):
mock_isfile.side_effect = fake_os_func
mock_exists.side_effect = fake_os_func
mock_access.side_effect = fake_os_func
self.assertTrue(f.match(usercmd))
def test_ReadFileFilter(self):
goodfn = '/good/file.name'
f = filters.ReadFileFilter(goodfn)
usercmd = ['cat', '/bad/file']
self.assertFalse(f.match(['cat', '/bad/file']))
usercmd = ['cat', goodfn]
self.assertEqual(['/bin/cat', goodfn], f.get_command(usercmd))
self.assertTrue(f.match(usercmd))
def test_IpFilter_non_netns(self):
f = filters.IpFilter(self._ip, 'root')
self.assertTrue(f.match(['ip', 'link', 'list']))
self.assertTrue(f.match(['ip', '-s', 'link', 'list']))
self.assertTrue(f.match(['ip', '-s', '-v', 'netns', 'add']))
self.assertTrue(f.match(['ip', 'link', 'set', 'interface',
'netns', 'somens']))
def test_IpFilter_netns(self):
f = filters.IpFilter(self._ip, 'root')
self.assertFalse(f.match(['ip', 'netns', 'exec', 'foo']))
self.assertFalse(f.match(['ip', 'netns', 'exec']))
self.assertFalse(f.match(['ip', '-s', 'netns', 'exec']))
self.assertFalse(f.match(['ip', '-l', '42', 'netns', 'exec']))
self.assertFalse(f.match(['ip', 'net', 'exec', 'foo']))
self.assertFalse(f.match(['ip', 'netns', 'e', 'foo']))
def _test_IpFilter_netns_helper(self, action):
f = filters.IpFilter(self._ip, 'root')
self.assertTrue(f.match(['ip', 'link', action]))
def test_IpFilter_netns_add(self):
self._test_IpFilter_netns_helper('add')
def test_IpFilter_netns_delete(self):
self._test_IpFilter_netns_helper('delete')
def test_IpFilter_netns_list(self):
self._test_IpFilter_netns_helper('list')
def test_IpNetnsExecFilter_match(self):
f = filters.IpNetnsExecFilter(self._ip, 'root')
self.assertTrue(
f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']))
self.assertTrue(f.match(['ip', 'net', 'exec', 'foo', 'bar']))
self.assertTrue(f.match(['ip', 'netn', 'e', 'foo', 'bar']))
self.assertTrue(f.match(['ip', 'net', 'e', 'foo', 'bar']))
self.assertTrue(f.match(['ip', 'net', 'exe', 'foo', 'bar']))
def test_IpNetnsExecFilter_nomatch(self):
f = filters.IpNetnsExecFilter(self._ip, 'root')
self.assertFalse(f.match(['ip', 'link', 'list']))
self.assertFalse(f.match(['ip', 'foo', 'bar', 'netns']))
self.assertFalse(f.match(['ip', '-s', 'netns', 'exec']))
self.assertFalse(f.match(['ip', '-l', '42', 'netns', 'exec']))
self.assertFalse(f.match(['ip', 'netns exec', 'foo', 'bar', 'baz']))
self.assertFalse(f.match([]))
# verify that at least a NS is given
self.assertFalse(f.match(['ip', 'netns', 'exec']))
def test_IpNetnsExecFilter_nomatch_nonroot(self):
f = filters.IpNetnsExecFilter(self._ip, 'user')
self.assertFalse(
f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']))
def test_match_filter_recurses_exec_command_filter_matches(self):
filter_list = [filters.IpNetnsExecFilter(self._ip, 'root'),
filters.IpFilter(self._ip, 'root')]
args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']
self.assertIsNotNone(wrapper.match_filter(filter_list, args))
def test_match_filter_recurses_exec_command_matches_user(self):
filter_list = [filters.IpNetnsExecFilter(self._ip, 'root'),
filters.IpFilter(self._ip, 'user')]
args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']
# Currently ip netns exec requires root, so verify that
# no non-root filter is matched, as that would escalate privileges
self.assertRaises(wrapper.NoFilterMatched,
wrapper.match_filter, filter_list, args)
def test_match_filter_recurses_exec_command_filter_does_not_match(self):
filter_list = [filters.IpNetnsExecFilter(self._ip, 'root'),
filters.IpFilter(self._ip, 'root')]
args = ['ip', 'netns', 'exec', 'foo', 'ip', 'netns', 'exec', 'bar',
'ip', 'link', 'list']
self.assertRaises(wrapper.NoFilterMatched,
wrapper.match_filter, filter_list, args)
def test_ChainingRegExpFilter_match(self):
filter_list = [filters.ChainingRegExpFilter('nice', 'root',
'nice', r'-?\d+'),
filters.CommandFilter('cat', 'root')]
args = ['nice', '5', 'cat', '/a']
dirs = ['/bin', '/usr/bin']
self.assertIsNotNone(wrapper.match_filter(filter_list, args, dirs))
def test_ChainingRegExpFilter_not_match(self):
filter_list = [filters.ChainingRegExpFilter('nice', 'root',
'nice', r'-?\d+'),
filters.CommandFilter('cat', 'root')]
args_invalid = (['nice', '5', 'ls', '/a'],
['nice', '--5', 'cat', '/a'],
['nice2', '5', 'cat', '/a'],
['nice', 'cat', '/a'],
['nice', '5'])
dirs = ['/bin', '/usr/bin']
for args in args_invalid:
self.assertRaises(wrapper.NoFilterMatched,
wrapper.match_filter, filter_list, args, dirs)
def test_ChainingRegExpFilter_multiple(self):
filter_list = [filters.ChainingRegExpFilter('ionice', 'root', 'ionice',
'-c[0-3]'),
filters.ChainingRegExpFilter('ionice', 'root', 'ionice',
'-c[0-3]', '-n[0-7]'),
filters.CommandFilter('cat', 'root')]
# both filters match to ['ionice', '-c2'], but only the second accepts
args = ['ionice', '-c2', '-n7', 'cat', '/a']
dirs = ['/bin', '/usr/bin']
self.assertIsNotNone(wrapper.match_filter(filter_list, args, dirs))
def test_ReadFileFilter_empty_args(self):
goodfn = '/good/file.name'
f = filters.ReadFileFilter(goodfn)
self.assertFalse(f.match([]))
self.assertFalse(f.match(None))
def test_exec_dirs_search(self):
# This test supposes you have /bin/cat or /usr/bin/cat locally
f = filters.CommandFilter("cat", "root")
usercmd = ['cat', '/f']
self.assertTrue(f.match(usercmd))
self.assertTrue(f.get_command(usercmd,
exec_dirs=['/bin', '/usr/bin'])
in (['/bin/cat', '/f'], ['/usr/bin/cat', '/f']))
def test_skips(self):
# Check that all filters are skipped and that the last matches
usercmd = ["cat", "/"]
filtermatch = wrapper.match_filter(self.filters, usercmd)
self.assertTrue(filtermatch is self.filters[-1])
def test_RootwrapConfig(self):
raw = configparser.RawConfigParser()
# Empty config should raise configparser.Error
self.assertRaises(configparser.Error,
wrapper.RootwrapConfig, raw)
# Check default values
raw.set('DEFAULT', 'filters_path', '/a,/b')
config = wrapper.RootwrapConfig(raw)
self.assertEqual(['/a', '/b'], config.filters_path)
self.assertEqual(os.environ["PATH"].split(':'), config.exec_dirs)
with fixtures.EnvironmentVariable("PATH"):
c = wrapper.RootwrapConfig(raw)
self.assertEqual([], c.exec_dirs)
self.assertFalse(config.use_syslog)
self.assertEqual(logging.handlers.SysLogHandler.LOG_SYSLOG,
config.syslog_log_facility)
self.assertEqual(logging.ERROR, config.syslog_log_level)
# Check general values
raw.set('DEFAULT', 'exec_dirs', '/a,/x')
config = wrapper.RootwrapConfig(raw)
self.assertEqual(['/a', '/x'], config.exec_dirs)
raw.set('DEFAULT', 'use_syslog', 'oui')
self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
raw.set('DEFAULT', 'use_syslog', 'true')
config = wrapper.RootwrapConfig(raw)
self.assertTrue(config.use_syslog)
raw.set('DEFAULT', 'syslog_log_facility', 'moo')
self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
raw.set('DEFAULT', 'syslog_log_facility', 'local0')
config = wrapper.RootwrapConfig(raw)
self.assertEqual(logging.handlers.SysLogHandler.LOG_LOCAL0,
config.syslog_log_facility)
raw.set('DEFAULT', 'syslog_log_facility', 'LOG_AUTH')
config = wrapper.RootwrapConfig(raw)
self.assertEqual(logging.handlers.SysLogHandler.LOG_AUTH,
config.syslog_log_facility)
raw.set('DEFAULT', 'syslog_log_level', 'bar')
self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
raw.set('DEFAULT', 'syslog_log_level', 'INFO')
config = wrapper.RootwrapConfig(raw)
self.assertEqual(logging.INFO, config.syslog_log_level)
def test_getlogin(self):
with mock.patch('os.getlogin') as os_getlogin:
os_getlogin.return_value = 'foo'
self.assertEqual('foo', wrapper._getlogin())
def test_getlogin_bad(self):
with mock.patch('os.getenv') as os_getenv:
with mock.patch('os.getlogin') as os_getlogin:
os_getenv.side_effect = [None, None, 'bar']
os_getlogin.side_effect = OSError(
'[Errno 22] Invalid argument')
self.assertEqual('bar', wrapper._getlogin())
os_getlogin.assert_called_once_with()
self.assertEqual(3, os_getenv.call_count)
class PathFilterTestCase(testtools.TestCase):
def setUp(self):
super(PathFilterTestCase, self).setUp()
self.tmp_root_dir = tempfile.mkdtemp()
tmpdir = fixtures.TempDir(self.tmp_root_dir)
self.useFixture(tmpdir)
self.f = filters.PathFilter('/bin/chown', 'root', 'nova', tmpdir.path)
gen_name = lambda: str(uuid.uuid4())
self.SIMPLE_FILE_WITHIN_DIR = os.path.join(tmpdir.path, 'some')
self.SIMPLE_FILE_OUTSIDE_DIR = os.path.join(self.tmp_root_dir, 'some')
self.TRAVERSAL_WITHIN_DIR = os.path.join(tmpdir.path, 'a', '..',
'some')
self.TRAVERSAL_OUTSIDE_DIR = os.path.join(tmpdir.path, '..', 'some')
self.TRAVERSAL_SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path,
gen_name())
os.symlink(os.path.join(tmpdir.path, 'a', '..', 'a'),
self.TRAVERSAL_SYMLINK_WITHIN_DIR)
self.TRAVERSAL_SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path,
gen_name())
os.symlink(os.path.join(tmpdir.path, 'a', '..', '..', '..', 'etc'),
self.TRAVERSAL_SYMLINK_OUTSIDE_DIR)
self.SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path, gen_name())
os.symlink(os.path.join(tmpdir.path, 'a'), self.SYMLINK_WITHIN_DIR)
self.SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path, gen_name())
os.symlink(os.path.join(self.tmp_root_dir, 'some_file'),
self.SYMLINK_OUTSIDE_DIR)
def test_empty_args(self):
self.assertFalse(self.f.match([]))
self.assertFalse(self.f.match(None))
def test_argument_pass_constraint(self):
f = filters.PathFilter('/bin/chown', 'root', 'pass', 'pass')
args = ['chown', 'something', self.SIMPLE_FILE_OUTSIDE_DIR]
self.assertTrue(f.match(args))
def test_argument_equality_constraint(self):
temp_file_path = os.path.join(self.tmp_root_dir, 'spam/eggs')
f = filters.PathFilter('/bin/chown', 'root', 'nova', temp_file_path)
args = ['chown', 'nova', temp_file_path]
self.assertTrue(f.match(args))
args = ['chown', 'quantum', temp_file_path]
self.assertFalse(f.match(args))
def test_wrong_arguments_number(self):
args = ['chown', '-c', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
self.assertFalse(self.f.match(args))
def test_wrong_exec_command(self):
args = ['wrong_exec', self.SIMPLE_FILE_WITHIN_DIR]
self.assertFalse(self.f.match(args))
def test_match(self):
args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
self.assertTrue(self.f.match(args))
def test_match_traversal(self):
args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR]
self.assertTrue(self.f.match(args))
def test_match_symlink(self):
args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR]
self.assertTrue(self.f.match(args))
def test_match_traversal_symlink(self):
args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR]
self.assertTrue(self.f.match(args))
def test_reject(self):
args = ['chown', 'nova', self.SIMPLE_FILE_OUTSIDE_DIR]
self.assertFalse(self.f.match(args))
def test_reject_traversal(self):
args = ['chown', 'nova', self.TRAVERSAL_OUTSIDE_DIR]
self.assertFalse(self.f.match(args))
def test_reject_symlink(self):
args = ['chown', 'nova', self.SYMLINK_OUTSIDE_DIR]
self.assertFalse(self.f.match(args))
def test_reject_traversal_symlink(self):
args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_OUTSIDE_DIR]
self.assertFalse(self.f.match(args))
def test_get_command(self):
args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
expected = ['/bin/chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
self.assertEqual(expected, self.f.get_command(args))
def test_get_command_traversal(self):
args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR]
expected = ['/bin/chown', 'nova',
os.path.realpath(self.TRAVERSAL_WITHIN_DIR)]
self.assertEqual(expected, self.f.get_command(args))
def test_get_command_symlink(self):
args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR]
expected = ['/bin/chown', 'nova',
os.path.realpath(self.SYMLINK_WITHIN_DIR)]
self.assertEqual(expected, self.f.get_command(args))
def test_get_command_traversal_symlink(self):
args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR]
expected = ['/bin/chown', 'nova',
os.path.realpath(self.TRAVERSAL_SYMLINK_WITHIN_DIR)]
self.assertEqual(expected, self.f.get_command(args))
class RunOneCommandTestCase(testtools.TestCase):
def _test_returncode_helper(self, returncode, expected):
with mock.patch.object(wrapper, 'start_subprocess') as mock_start:
with mock.patch('sys.exit') as mock_exit:
mock_start.return_value.wait.return_value = returncode
cmd.run_one_command(None, mock.Mock(), None, None)
mock_exit.assert_called_once_with(expected)
def test_positive_returncode(self):
self._test_returncode_helper(1, 1)
def test_negative_returncode(self):
self._test_returncode_helper(-1, 129)
class DaemonCleanupException(Exception):
pass
class DaemonCleanupTestCase(testtools.TestCase):
@mock.patch('os.chmod')
@mock.patch('shutil.rmtree')
@mock.patch('tempfile.mkdtemp')
@mock.patch('multiprocessing.managers.BaseManager.get_server',
side_effect=DaemonCleanupException)
def test_daemon_no_cleanup_for_uninitialized_server(self, gs, mkd, *args):
mkd.return_value = '/just_dir/123'
self.assertRaises(DaemonCleanupException, daemon.daemon_start,
config=None, filters=None)