Merge "Remove oslo namespace package"
This commit is contained in:
commit
293def2d23
@ -1,13 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
__import__('pkg_resources').declare_namespace(__name__)
|
@ -1,26 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import warnings
|
||||
|
||||
|
||||
def deprecated():
|
||||
new_name = __name__.replace('.', '_')
|
||||
warnings.warn(
|
||||
('The oslo namespace package is deprecated. Please use %s instead.' %
|
||||
new_name),
|
||||
DeprecationWarning,
|
||||
stacklevel=3,
|
||||
)
|
||||
|
||||
|
||||
deprecated()
|
@ -1,13 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_rootwrap.client import * # noqa
|
@ -1,13 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_rootwrap.cmd import * # noqa
|
@ -1,13 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_rootwrap.daemon import * # noqa
|
@ -1,13 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_rootwrap.filters import * # noqa
|
@ -1,13 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_rootwrap.jsonrpc import * # noqa
|
@ -1,13 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_rootwrap.wrapper import * # noqa
|
@ -21,11 +21,7 @@ classifier =
|
||||
|
||||
[files]
|
||||
packages =
|
||||
oslo
|
||||
oslo.rootwrap
|
||||
oslo_rootwrap
|
||||
namespace_packages =
|
||||
oslo
|
||||
|
||||
[entry_points]
|
||||
console_scripts =
|
||||
|
@ -1,57 +0,0 @@
|
||||
# Copyright (c) 2014 Mirantis Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
|
||||
from oslo.rootwrap import cmd
|
||||
|
||||
|
||||
def forward_stream(fr, to):
|
||||
while True:
|
||||
line = fr.readline()
|
||||
if not line:
|
||||
break
|
||||
to.write(line)
|
||||
|
||||
|
||||
def forwarding_popen(f, old_popen=subprocess.Popen):
|
||||
def popen(*args, **kwargs):
|
||||
p = old_popen(*args, **kwargs)
|
||||
t = threading.Thread(target=forward_stream, args=(p.stderr, f))
|
||||
t.daemon = True
|
||||
t.start()
|
||||
return p
|
||||
return popen
|
||||
|
||||
|
||||
class nonclosing(object):
|
||||
def __init__(self, f):
|
||||
self._f = f
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self._f, name)
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
log_format = ("%(asctime)s | [%(process)5s]+%(levelname)5s | "
|
||||
"%(message)s")
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=logging.DEBUG, format=log_format)
|
||||
sys.stderr = nonclosing(sys.stderr)
|
||||
cmd.daemon()
|
@ -1,239 +0,0 @@
|
||||
# Copyright (c) 2014 Mirantis Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
|
||||
try:
|
||||
import StringIO
|
||||
except ImportError:
|
||||
StringIO = io.StringIO
|
||||
else:
|
||||
StringIO = StringIO.StringIO
|
||||
|
||||
try:
|
||||
import eventlet
|
||||
except ImportError:
|
||||
eventlet = None
|
||||
|
||||
import fixtures
|
||||
import mock
|
||||
import testtools
|
||||
from testtools import content
|
||||
|
||||
from oslo.rootwrap import client
|
||||
from oslo.rootwrap import wrapper
|
||||
from tests import run_daemon
|
||||
|
||||
|
||||
class _FunctionalBase(object):
|
||||
def setUp(self):
|
||||
super(_FunctionalBase, self).setUp()
|
||||
tmpdir = self.useFixture(fixtures.TempDir()).path
|
||||
self.config_file = os.path.join(tmpdir, 'rootwrap.conf')
|
||||
filters_dir = os.path.join(tmpdir, 'filters.d')
|
||||
filters_file = os.path.join(tmpdir, 'filters.d', 'test.filters')
|
||||
os.mkdir(filters_dir)
|
||||
with open(self.config_file, 'w') as f:
|
||||
f.write("""[DEFAULT]
|
||||
filters_path=%s
|
||||
exec_dirs=/bin""" % (filters_dir,))
|
||||
with open(filters_file, 'w') as f:
|
||||
f.write("""[Filters]
|
||||
echo: CommandFilter, /bin/echo, root
|
||||
cat: CommandFilter, /bin/cat, root
|
||||
sh: CommandFilter, /bin/sh, root
|
||||
""")
|
||||
|
||||
def test_run_once(self):
|
||||
code, out, err = self.execute(['echo', 'teststr'])
|
||||
self.assertEqual(0, code)
|
||||
self.assertEqual(b'teststr\n', out)
|
||||
self.assertEqual(b'', err)
|
||||
|
||||
def test_run_with_stdin(self):
|
||||
code, out, err = self.execute(['cat'], stdin=b'teststr')
|
||||
self.assertEqual(0, code)
|
||||
self.assertEqual(b'teststr', out)
|
||||
self.assertEqual(b'', err)
|
||||
|
||||
|
||||
class RootwrapTest(_FunctionalBase, testtools.TestCase):
|
||||
def setUp(self):
|
||||
super(RootwrapTest, self).setUp()
|
||||
self.cmd = [
|
||||
# We need to explicitly ignore the DeprecationWarning
|
||||
# generated by importing oslo.rootwrap instead of
|
||||
# oslo_rootwrap under python 2.6 but it is going to be
|
||||
# ignored by default for versions after 2.7.
|
||||
sys.executable, '-W', 'ignore::DeprecationWarning', '-c',
|
||||
'from oslo.rootwrap import cmd; cmd.main()',
|
||||
self.config_file]
|
||||
|
||||
def execute(self, cmd, stdin=None):
|
||||
proc = subprocess.Popen(
|
||||
self.cmd + cmd,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
out, err = proc.communicate(stdin)
|
||||
self.addDetail('stdout',
|
||||
content.text_content(out.decode('utf-8', 'replace')))
|
||||
self.addDetail('stderr',
|
||||
content.text_content(err.decode('utf-8', 'replace')))
|
||||
return proc.returncode, out, err
|
||||
|
||||
|
||||
class RootwrapDaemonTest(_FunctionalBase, testtools.TestCase):
|
||||
def assert_unpatched(self):
|
||||
# We need to verify that these tests are run without eventlet patching
|
||||
if eventlet and eventlet.patcher.is_monkey_patched('socket'):
|
||||
self.fail("Standard library should not be patched by eventlet"
|
||||
" for this test")
|
||||
|
||||
def setUp(self):
|
||||
self.assert_unpatched()
|
||||
|
||||
super(RootwrapDaemonTest, self).setUp()
|
||||
|
||||
# Collect daemon logs
|
||||
daemon_log = io.BytesIO()
|
||||
p = mock.patch('subprocess.Popen',
|
||||
run_daemon.forwarding_popen(daemon_log))
|
||||
p.start()
|
||||
self.addCleanup(p.stop)
|
||||
|
||||
# Collect client logs
|
||||
client_log = StringIO()
|
||||
handler = logging.StreamHandler(client_log)
|
||||
log_format = run_daemon.log_format.replace('+', ' ')
|
||||
handler.setFormatter(logging.Formatter(log_format))
|
||||
logger = logging.getLogger('oslo.rootwrap')
|
||||
logger.addHandler(handler)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
self.addCleanup(logger.removeHandler, handler)
|
||||
|
||||
# Add all logs as details
|
||||
@self.addCleanup
|
||||
def add_logs():
|
||||
self.addDetail('daemon_log', content.Content(
|
||||
content.UTF8_TEXT,
|
||||
lambda: [daemon_log.getvalue()]))
|
||||
self.addDetail('client_log', content.Content(
|
||||
content.UTF8_TEXT,
|
||||
lambda: [client_log.getvalue().encode('utf-8')]))
|
||||
|
||||
# Create client
|
||||
self.client = client.Client([
|
||||
sys.executable, run_daemon.__file__,
|
||||
self.config_file])
|
||||
|
||||
# _finalize is set during Client.execute()
|
||||
@self.addCleanup
|
||||
def finalize_client():
|
||||
if self.client._initialized:
|
||||
self.client._finalize()
|
||||
|
||||
self.execute = self.client.execute
|
||||
|
||||
def test_error_propagation(self):
|
||||
self.assertRaises(wrapper.NoFilterMatched, self.execute, ['other'])
|
||||
|
||||
def test_daemon_ressurection(self):
|
||||
# Let the client start a daemon
|
||||
self.execute(['cat'])
|
||||
# Make daemon go away
|
||||
os.kill(self.client._process.pid, signal.SIGTERM)
|
||||
# Expect client to succesfully restart daemon and run simple request
|
||||
self.test_run_once()
|
||||
|
||||
def _exec_thread(self, fifo_path):
|
||||
try:
|
||||
# Run a shell script that signals calling process through FIFO and
|
||||
# then hangs around for 1 sec
|
||||
self._thread_res = self.execute([
|
||||
'sh', '-c', 'echo > "%s"; sleep 1; echo OK' % fifo_path])
|
||||
except Exception as e:
|
||||
self._thread_res = e
|
||||
|
||||
def test_graceful_death(self):
|
||||
# Create a fifo in a temporary dir
|
||||
tmpdir = self.useFixture(fixtures.TempDir()).path
|
||||
fifo_path = os.path.join(tmpdir, 'fifo')
|
||||
os.mkfifo(fifo_path)
|
||||
# Start daemon
|
||||
self.execute(['cat'])
|
||||
# Begin executing shell script
|
||||
t = threading.Thread(target=self._exec_thread, args=(fifo_path,))
|
||||
t.start()
|
||||
# Wait for shell script to actually start
|
||||
with open(fifo_path) as f:
|
||||
f.readline()
|
||||
# Gracefully kill daemon process
|
||||
os.kill(self.client._process.pid, signal.SIGTERM)
|
||||
# Expect daemon to wait for our request to finish
|
||||
t.join()
|
||||
if isinstance(self._thread_res, Exception):
|
||||
raise self._thread_res # Python 3 will even provide nice traceback
|
||||
code, out, err = self._thread_res
|
||||
self.assertEqual(0, code)
|
||||
self.assertEqual(b'OK\n', out)
|
||||
self.assertEqual(b'', err)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _test_daemon_cleanup(self):
|
||||
# Start a daemon
|
||||
self.execute(['cat'])
|
||||
socket_path = self.client._manager._address
|
||||
# Stop it one way or another
|
||||
yield
|
||||
process = self.client._process
|
||||
stop = threading.Event()
|
||||
|
||||
# Start background thread that would kill process in 1 second if it
|
||||
# doesn't die by then
|
||||
def sleep_kill():
|
||||
stop.wait(1)
|
||||
if not stop.is_set():
|
||||
os.kill(process.pid, signal.SIGKILL)
|
||||
threading.Thread(target=sleep_kill).start()
|
||||
# Wait for process to finish one way or another
|
||||
self.client._process.wait()
|
||||
# Notify background thread that process is dead (no need to kill it)
|
||||
stop.set()
|
||||
# Fail if the process got killed by the background thread
|
||||
self.assertNotEqual(-signal.SIGKILL, process.returncode,
|
||||
"Server haven't stopped in one second")
|
||||
# Verify that socket is deleted
|
||||
self.assertFalse(os.path.exists(socket_path),
|
||||
"Server didn't remove its temporary directory")
|
||||
|
||||
def test_daemon_cleanup_client(self):
|
||||
# Run _test_daemon_cleanup stopping daemon as Client instance would
|
||||
# normally do
|
||||
with self._test_daemon_cleanup():
|
||||
self.client._finalize()
|
||||
|
||||
def test_daemon_cleanup_signal(self):
|
||||
# Run _test_daemon_cleanup stopping daemon with SIGTERM signal
|
||||
with self._test_daemon_cleanup():
|
||||
os.kill(self.client._process.pid, signal.SIGTERM)
|
@ -1,31 +0,0 @@
|
||||
# Copyright (c) 2014 Mirantis Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
|
||||
if os.environ.get('TEST_EVENTLET', False):
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
|
||||
from tests import test_functional
|
||||
|
||||
class RootwrapDaemonTest(test_functional.RootwrapDaemonTest):
|
||||
def assert_unpatched(self):
|
||||
# This test case is specifically for eventlet testing
|
||||
pass
|
||||
|
||||
def test_graceful_death(self):
|
||||
# This test fails with eventlet on Python 2.6.6 on CentOS
|
||||
self.skip("Eventlet doesn't like FIFOs")
|
@ -1,571 +0,0 @@
|
||||
# Copyright 2011 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import logging.handlers
|
||||
import os
|
||||
import subprocess
|
||||
import uuid
|
||||
|
||||
import fixtures
|
||||
import mock
|
||||
from six import moves
|
||||
import testtools
|
||||
|
||||
from oslo.rootwrap import cmd
|
||||
from oslo.rootwrap import filters
|
||||
from oslo.rootwrap import wrapper
|
||||
|
||||
|
||||
class RootwrapTestCase(testtools.TestCase):
|
||||
if os.path.exists('/sbin/ip'):
|
||||
_ip = '/sbin/ip'
|
||||
else:
|
||||
_ip = '/bin/ip'
|
||||
|
||||
def setUp(self):
|
||||
super(RootwrapTestCase, self).setUp()
|
||||
self.filters = [
|
||||
filters.RegExpFilter("/bin/ls", "root", 'ls', '/[a-z]+'),
|
||||
filters.CommandFilter("/usr/bin/foo_bar_not_exist", "root"),
|
||||
filters.RegExpFilter("/bin/cat", "root", 'cat', '/[a-z]+'),
|
||||
filters.CommandFilter("/nonexistent/cat", "root"),
|
||||
filters.CommandFilter("/bin/cat", "root") # Keep this one last
|
||||
]
|
||||
|
||||
def test_CommandFilter(self):
|
||||
f = filters.CommandFilter("sleep", 'root', '10')
|
||||
self.assertFalse(f.match(["sleep2"]))
|
||||
|
||||
# verify that any arguments are accepted
|
||||
self.assertTrue(f.match(["sleep"]))
|
||||
self.assertTrue(f.match(["sleep", "anything"]))
|
||||
self.assertTrue(f.match(["sleep", "10"]))
|
||||
f = filters.CommandFilter("sleep", 'root')
|
||||
self.assertTrue(f.match(["sleep", "10"]))
|
||||
|
||||
def test_empty_commandfilter(self):
|
||||
f = filters.CommandFilter("sleep", "root")
|
||||
self.assertFalse(f.match([]))
|
||||
self.assertFalse(f.match(None))
|
||||
|
||||
def test_empty_regexpfilter(self):
|
||||
f = filters.RegExpFilter("sleep", "root", "sleep")
|
||||
self.assertFalse(f.match([]))
|
||||
self.assertFalse(f.match(None))
|
||||
|
||||
def test_empty_invalid_regexpfilter(self):
|
||||
f = filters.RegExpFilter("sleep", "root")
|
||||
self.assertFalse(f.match(["anything"]))
|
||||
self.assertFalse(f.match([]))
|
||||
|
||||
def test_RegExpFilter_match(self):
|
||||
usercmd = ["ls", "/root"]
|
||||
filtermatch = wrapper.match_filter(self.filters, usercmd)
|
||||
self.assertFalse(filtermatch is None)
|
||||
self.assertEqual(filtermatch.get_command(usercmd),
|
||||
["/bin/ls", "/root"])
|
||||
|
||||
def test_RegExpFilter_reject(self):
|
||||
usercmd = ["ls", "root"]
|
||||
self.assertRaises(wrapper.NoFilterMatched,
|
||||
wrapper.match_filter, self.filters, usercmd)
|
||||
|
||||
def test_missing_command(self):
|
||||
valid_but_missing = ["foo_bar_not_exist"]
|
||||
invalid = ["foo_bar_not_exist_and_not_matched"]
|
||||
self.assertRaises(wrapper.FilterMatchNotExecutable,
|
||||
wrapper.match_filter,
|
||||
self.filters, valid_but_missing)
|
||||
self.assertRaises(wrapper.NoFilterMatched,
|
||||
wrapper.match_filter, self.filters, invalid)
|
||||
|
||||
def _test_EnvFilter_as_DnsMasq(self, config_file_arg):
|
||||
usercmd = ['env', config_file_arg + '=A', 'NETWORK_ID=foobar',
|
||||
'dnsmasq', 'foo']
|
||||
f = filters.EnvFilter("env", "root", config_file_arg + '=A',
|
||||
'NETWORK_ID=', "/usr/bin/dnsmasq")
|
||||
self.assertTrue(f.match(usercmd))
|
||||
self.assertEqual(f.get_command(usercmd), ['/usr/bin/dnsmasq', 'foo'])
|
||||
env = f.get_environment(usercmd)
|
||||
self.assertEqual(env.get(config_file_arg), 'A')
|
||||
self.assertEqual(env.get('NETWORK_ID'), 'foobar')
|
||||
|
||||
def test_EnvFilter(self):
|
||||
envset = ['A=/some/thing', 'B=somethingelse']
|
||||
envcmd = ['env'] + envset
|
||||
realcmd = ['sleep', '10']
|
||||
usercmd = envcmd + realcmd
|
||||
|
||||
f = filters.EnvFilter("env", "root", "A=", "B=ignored", "sleep")
|
||||
# accept with leading env
|
||||
self.assertTrue(f.match(envcmd + ["sleep"]))
|
||||
# accept without leading env
|
||||
self.assertTrue(f.match(envset + ["sleep"]))
|
||||
|
||||
# any other command does not match
|
||||
self.assertFalse(f.match(envcmd + ["sleep2"]))
|
||||
self.assertFalse(f.match(envset + ["sleep2"]))
|
||||
|
||||
# accept any trailing arguments
|
||||
self.assertTrue(f.match(usercmd))
|
||||
|
||||
# require given environment variables to match
|
||||
self.assertFalse(f.match([envcmd, 'C=ELSE']))
|
||||
self.assertFalse(f.match(['env', 'C=xx']))
|
||||
self.assertFalse(f.match(['env', 'A=xx']))
|
||||
|
||||
# require env command to be given
|
||||
# (otherwise CommandFilters should match
|
||||
self.assertFalse(f.match(realcmd))
|
||||
# require command to match
|
||||
self.assertFalse(f.match(envcmd))
|
||||
self.assertFalse(f.match(envcmd[1:]))
|
||||
|
||||
# ensure that the env command is stripped when executing
|
||||
self.assertEqual(f.exec_args(usercmd), realcmd)
|
||||
env = f.get_environment(usercmd)
|
||||
# check that environment variables are set
|
||||
self.assertEqual(env.get('A'), '/some/thing')
|
||||
self.assertEqual(env.get('B'), 'somethingelse')
|
||||
self.assertFalse('sleep' in env.keys())
|
||||
|
||||
def test_EnvFilter_without_leading_env(self):
|
||||
envset = ['A=/some/thing', 'B=somethingelse']
|
||||
envcmd = ['env'] + envset
|
||||
realcmd = ['sleep', '10']
|
||||
|
||||
f = filters.EnvFilter("sleep", "root", "A=", "B=ignored")
|
||||
|
||||
# accept without leading env
|
||||
self.assertTrue(f.match(envset + ["sleep"]))
|
||||
|
||||
self.assertEqual(f.get_command(envcmd + realcmd), realcmd)
|
||||
self.assertEqual(f.get_command(envset + realcmd), realcmd)
|
||||
|
||||
env = f.get_environment(envset + realcmd)
|
||||
# check that environment variables are set
|
||||
self.assertEqual(env.get('A'), '/some/thing')
|
||||
self.assertEqual(env.get('B'), 'somethingelse')
|
||||
self.assertFalse('sleep' in env.keys())
|
||||
|
||||
def test_KillFilter(self):
|
||||
if not os.path.exists("/proc/%d" % os.getpid()):
|
||||
self.skipTest("Test requires /proc filesystem (procfs)")
|
||||
p = subprocess.Popen(["cat"], stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT)
|
||||
try:
|
||||
f = filters.KillFilter("root", "/bin/cat", "-9", "-HUP")
|
||||
f2 = filters.KillFilter("root", "/usr/bin/cat", "-9", "-HUP")
|
||||
usercmd = ['kill', '-ALRM', p.pid]
|
||||
# Incorrect signal should fail
|
||||
self.assertFalse(f.match(usercmd) or f2.match(usercmd))
|
||||
usercmd = ['kill', p.pid]
|
||||
# Providing no signal should fail
|
||||
self.assertFalse(f.match(usercmd) or f2.match(usercmd))
|
||||
# Providing matching signal should be allowed
|
||||
usercmd = ['kill', '-9', p.pid]
|
||||
self.assertTrue(f.match(usercmd) or f2.match(usercmd))
|
||||
|
||||
f = filters.KillFilter("root", "/bin/cat")
|
||||
f2 = filters.KillFilter("root", "/usr/bin/cat")
|
||||
usercmd = ['kill', os.getpid()]
|
||||
# Our own PID does not match /bin/sleep, so it should fail
|
||||
self.assertFalse(f.match(usercmd) or f2.match(usercmd))
|
||||
usercmd = ['kill', 999999]
|
||||
# Nonexistent PID should fail
|
||||
self.assertFalse(f.match(usercmd) or f2.match(usercmd))
|
||||
usercmd = ['kill', p.pid]
|
||||
# Providing no signal should work
|
||||
self.assertTrue(f.match(usercmd) or f2.match(usercmd))
|
||||
|
||||
# verify that relative paths are matched against $PATH
|
||||
f = filters.KillFilter("root", "cat")
|
||||
# Our own PID does not match so it should fail
|
||||
usercmd = ['kill', os.getpid()]
|
||||
self.assertFalse(f.match(usercmd))
|
||||
# Filter should find cat in /bin or /usr/bin
|
||||
usercmd = ['kill', p.pid]
|
||||
self.assertTrue(f.match(usercmd))
|
||||
# Filter shouldn't be able to find binary in $PATH, so fail
|
||||
with fixtures.EnvironmentVariable("PATH", "/foo:/bar"):
|
||||
self.assertFalse(f.match(usercmd))
|
||||
# ensure that unset $PATH is not causing an exception
|
||||
with fixtures.EnvironmentVariable("PATH"):
|
||||
self.assertFalse(f.match(usercmd))
|
||||
finally:
|
||||
# Terminate the "cat" process and wait for it to finish
|
||||
p.terminate()
|
||||
p.wait()
|
||||
|
||||
def test_KillFilter_no_raise(self):
|
||||
"""Makes sure ValueError from bug 926412 is gone."""
|
||||
f = filters.KillFilter("root", "")
|
||||
# Providing anything other than kill should be False
|
||||
usercmd = ['notkill', 999999]
|
||||
self.assertFalse(f.match(usercmd))
|
||||
# Providing something that is not a pid should be False
|
||||
usercmd = ['kill', 'notapid']
|
||||
self.assertFalse(f.match(usercmd))
|
||||
# no arguments should also be fine
|
||||
self.assertFalse(f.match([]))
|
||||
self.assertFalse(f.match(None))
|
||||
|
||||
def test_KillFilter_deleted_exe(self):
|
||||
"""Makes sure deleted exe's are killed correctly."""
|
||||
f = filters.KillFilter("root", "/bin/commandddddd")
|
||||
usercmd = ['kill', 1234]
|
||||
# Providing no signal should work
|
||||
with mock.patch('os.readlink') as readlink:
|
||||
readlink.return_value = '/bin/commandddddd (deleted)'
|
||||
self.assertTrue(f.match(usercmd))
|
||||
|
||||
def test_KillFilter_upgraded_exe(self):
|
||||
"""Makes sure upgraded exe's are killed correctly."""
|
||||
f = filters.KillFilter("root", "/bin/commandddddd")
|
||||
usercmd = ['kill', 1234]
|
||||
with mock.patch('os.readlink') as readlink:
|
||||
readlink.return_value = '/bin/commandddddd\0\05190bfb2 (deleted)'
|
||||
self.assertTrue(f.match(usercmd))
|
||||
|
||||
def test_ReadFileFilter(self):
|
||||
goodfn = '/good/file.name'
|
||||
f = filters.ReadFileFilter(goodfn)
|
||||
usercmd = ['cat', '/bad/file']
|
||||
self.assertFalse(f.match(['cat', '/bad/file']))
|
||||
usercmd = ['cat', goodfn]
|
||||
self.assertEqual(f.get_command(usercmd), ['/bin/cat', goodfn])
|
||||
self.assertTrue(f.match(usercmd))
|
||||
|
||||
def test_IpFilter_non_netns(self):
|
||||
f = filters.IpFilter(self._ip, 'root')
|
||||
self.assertTrue(f.match(['ip', 'link', 'list']))
|
||||
self.assertTrue(f.match(['ip', '-s', 'link', 'list']))
|
||||
self.assertTrue(f.match(['ip', '-s', '-v', 'netns', 'add']))
|
||||
self.assertTrue(f.match(['ip', 'link', 'set', 'interface',
|
||||
'netns', 'somens']))
|
||||
|
||||
def test_IpFilter_netns(self):
|
||||
f = filters.IpFilter(self._ip, 'root')
|
||||
self.assertFalse(f.match(['ip', 'netns', 'exec', 'foo']))
|
||||
self.assertFalse(f.match(['ip', 'netns', 'exec']))
|
||||
self.assertFalse(f.match(['ip', '-s', 'netns', 'exec']))
|
||||
self.assertFalse(f.match(['ip', '-l', '42', 'netns', 'exec']))
|
||||
|
||||
def _test_IpFilter_netns_helper(self, action):
|
||||
f = filters.IpFilter(self._ip, 'root')
|
||||
self.assertTrue(f.match(['ip', 'link', action]))
|
||||
|
||||
def test_IpFilter_netns_add(self):
|
||||
self._test_IpFilter_netns_helper('add')
|
||||
|
||||
def test_IpFilter_netns_delete(self):
|
||||
self._test_IpFilter_netns_helper('delete')
|
||||
|
||||
def test_IpFilter_netns_list(self):
|
||||
self._test_IpFilter_netns_helper('list')
|
||||
|
||||
def test_IpNetnsExecFilter_match(self):
|
||||
f = filters.IpNetnsExecFilter(self._ip, 'root')
|
||||
self.assertTrue(
|
||||
f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']))
|
||||
|
||||
def test_IpNetnsExecFilter_nomatch(self):
|
||||
f = filters.IpNetnsExecFilter(self._ip, 'root')
|
||||
self.assertFalse(f.match(['ip', 'link', 'list']))
|
||||
|
||||
# verify that at least a NS is given
|
||||
self.assertFalse(f.match(['ip', 'netns', 'exec']))
|
||||
|
||||
def test_IpNetnsExecFilter_nomatch_nonroot(self):
|
||||
f = filters.IpNetnsExecFilter(self._ip, 'user')
|
||||
self.assertFalse(
|
||||
f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']))
|
||||
|
||||
def test_match_filter_recurses_exec_command_filter_matches(self):
|
||||
filter_list = [filters.IpNetnsExecFilter(self._ip, 'root'),
|
||||
filters.IpFilter(self._ip, 'root')]
|
||||
args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']
|
||||
|
||||
self.assertIsNotNone(wrapper.match_filter(filter_list, args))
|
||||
|
||||
def test_match_filter_recurses_exec_command_matches_user(self):
|
||||
filter_list = [filters.IpNetnsExecFilter(self._ip, 'root'),
|
||||
filters.IpFilter(self._ip, 'user')]
|
||||
args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']
|
||||
|
||||
# Currently ip netns exec requires root, so verify that
|
||||
# no non-root filter is matched, as that would escalate privileges
|
||||
self.assertRaises(wrapper.NoFilterMatched,
|
||||
wrapper.match_filter, filter_list, args)
|
||||
|
||||
def test_match_filter_recurses_exec_command_filter_does_not_match(self):
|
||||
filter_list = [filters.IpNetnsExecFilter(self._ip, 'root'),
|
||||
filters.IpFilter(self._ip, 'root')]
|
||||
args = ['ip', 'netns', 'exec', 'foo', 'ip', 'netns', 'exec', 'bar',
|
||||
'ip', 'link', 'list']
|
||||
|
||||
self.assertRaises(wrapper.NoFilterMatched,
|
||||
wrapper.match_filter, filter_list, args)
|
||||
|
||||
def test_ChainingRegExpFilter_match(self):
|
||||
filter_list = [filters.ChainingRegExpFilter('nice', 'root',
|
||||
'nice', '-?\d+'),
|
||||
filters.CommandFilter('cat', 'root')]
|
||||
args = ['nice', '5', 'cat', '/a']
|
||||
dirs = ['/bin', '/usr/bin']
|
||||
|
||||
self.assertIsNotNone(wrapper.match_filter(filter_list, args, dirs))
|
||||
|
||||
def test_ChainingRegExpFilter_not_match(self):
|
||||
filter_list = [filters.ChainingRegExpFilter('nice', 'root',
|
||||
'nice', '-?\d+'),
|
||||
filters.CommandFilter('cat', 'root')]
|
||||
args_invalid = (['nice', '5', 'ls', '/a'],
|
||||
['nice', '--5', 'cat', '/a'],
|
||||
['nice2', '5', 'cat', '/a'],
|
||||
['nice', 'cat', '/a'],
|
||||
['nice', '5'])
|
||||
dirs = ['/bin', '/usr/bin']
|
||||
|
||||
for args in args_invalid:
|
||||
self.assertRaises(wrapper.NoFilterMatched,
|
||||
wrapper.match_filter, filter_list, args, dirs)
|
||||
|
||||
def test_ChainingRegExpFilter_multiple(self):
|
||||
filter_list = [filters.ChainingRegExpFilter('ionice', 'root', 'ionice',
|
||||
'-c[0-3]'),
|
||||
filters.ChainingRegExpFilter('ionice', 'root', 'ionice',
|
||||
'-c[0-3]', '-n[0-7]'),
|
||||
filters.CommandFilter('cat', 'root')]
|
||||
# both filters match to ['ionice', '-c2'], but only the second accepts
|
||||
args = ['ionice', '-c2', '-n7', 'cat', '/a']
|
||||
dirs = ['/bin', '/usr/bin']
|
||||
|
||||
self.assertIsNotNone(wrapper.match_filter(filter_list, args, dirs))
|
||||
|
||||
def test_ReadFileFilter_empty_args(self):
|
||||
goodfn = '/good/file.name'
|
||||
f = filters.ReadFileFilter(goodfn)
|
||||
self.assertFalse(f.match([]))
|
||||
self.assertFalse(f.match(None))
|
||||
|
||||
def test_exec_dirs_search(self):
|
||||
# This test supposes you have /bin/cat or /usr/bin/cat locally
|
||||
f = filters.CommandFilter("cat", "root")
|
||||
usercmd = ['cat', '/f']
|
||||
self.assertTrue(f.match(usercmd))
|
||||
self.assertTrue(f.get_command(usercmd,
|
||||
exec_dirs=['/bin', '/usr/bin'])
|
||||
in (['/bin/cat', '/f'], ['/usr/bin/cat', '/f']))
|
||||
|
||||
def test_skips(self):
|
||||
# Check that all filters are skipped and that the last matches
|
||||
usercmd = ["cat", "/"]
|
||||
filtermatch = wrapper.match_filter(self.filters, usercmd)
|
||||
self.assertTrue(filtermatch is self.filters[-1])
|
||||
|
||||
def test_RootwrapConfig(self):
|
||||
raw = moves.configparser.RawConfigParser()
|
||||
|
||||
# Empty config should raise configparser.Error
|
||||
self.assertRaises(moves.configparser.Error,
|
||||
wrapper.RootwrapConfig, raw)
|
||||
|
||||
# Check default values
|
||||
raw.set('DEFAULT', 'filters_path', '/a,/b')
|
||||
config = wrapper.RootwrapConfig(raw)
|
||||
self.assertEqual(config.filters_path, ['/a', '/b'])
|
||||
self.assertEqual(config.exec_dirs, os.environ["PATH"].split(':'))
|
||||
|
||||
with fixtures.EnvironmentVariable("PATH"):
|
||||
c = wrapper.RootwrapConfig(raw)
|
||||
self.assertEqual(c.exec_dirs, [])
|
||||
|
||||
self.assertFalse(config.use_syslog)
|
||||
self.assertFalse(config.use_syslog_rfc_format)
|
||||
self.assertEqual(config.syslog_log_facility,
|
||||
logging.handlers.SysLogHandler.LOG_SYSLOG)
|
||||
self.assertEqual(config.syslog_log_level, logging.ERROR)
|
||||
|
||||
# Check general values
|
||||
raw.set('DEFAULT', 'exec_dirs', '/a,/x')
|
||||
config = wrapper.RootwrapConfig(raw)
|
||||
self.assertEqual(config.exec_dirs, ['/a', '/x'])
|
||||
|
||||
raw.set('DEFAULT', 'use_syslog', 'oui')
|
||||
self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
|
||||
raw.set('DEFAULT', 'use_syslog', 'true')
|
||||
config = wrapper.RootwrapConfig(raw)
|
||||
self.assertTrue(config.use_syslog)
|
||||
|
||||
raw.set('DEFAULT', 'use_syslog_rfc_format', 'oui')
|
||||
self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
|
||||
raw.set('DEFAULT', 'use_syslog_rfc_format', 'true')
|
||||
config = wrapper.RootwrapConfig(raw)
|
||||
self.assertTrue(config.use_syslog_rfc_format)
|
||||
|
||||
raw.set('DEFAULT', 'syslog_log_facility', 'moo')
|
||||
self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
|
||||
raw.set('DEFAULT', 'syslog_log_facility', 'local0')
|
||||
config = wrapper.RootwrapConfig(raw)
|
||||
self.assertEqual(config.syslog_log_facility,
|
||||
logging.handlers.SysLogHandler.LOG_LOCAL0)
|
||||
raw.set('DEFAULT', 'syslog_log_facility', 'LOG_AUTH')
|
||||
config = wrapper.RootwrapConfig(raw)
|
||||
self.assertEqual(config.syslog_log_facility,
|
||||
logging.handlers.SysLogHandler.LOG_AUTH)
|
||||
|
||||
raw.set('DEFAULT', 'syslog_log_level', 'bar')
|
||||
self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
|
||||
raw.set('DEFAULT', 'syslog_log_level', 'INFO')
|
||||
config = wrapper.RootwrapConfig(raw)
|
||||
self.assertEqual(config.syslog_log_level, logging.INFO)
|
||||
|
||||
|
||||
class PathFilterTestCase(testtools.TestCase):
|
||||
def setUp(self):
|
||||
super(PathFilterTestCase, self).setUp()
|
||||
|
||||
tmpdir = fixtures.TempDir('/tmp')
|
||||
self.useFixture(tmpdir)
|
||||
|
||||
self.f = filters.PathFilter('/bin/chown', 'root', 'nova', tmpdir.path)
|
||||
|
||||
gen_name = lambda: str(uuid.uuid4())
|
||||
|
||||
self.SIMPLE_FILE_WITHIN_DIR = os.path.join(tmpdir.path, 'some')
|
||||
self.SIMPLE_FILE_OUTSIDE_DIR = os.path.join('/tmp', 'some')
|
||||
self.TRAVERSAL_WITHIN_DIR = os.path.join(tmpdir.path, 'a', '..',
|
||||
'some')
|
||||
self.TRAVERSAL_OUTSIDE_DIR = os.path.join(tmpdir.path, '..', 'some')
|
||||
|
||||
self.TRAVERSAL_SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path,
|
||||
gen_name())
|
||||
os.symlink(os.path.join(tmpdir.path, 'a', '..', 'a'),
|
||||
self.TRAVERSAL_SYMLINK_WITHIN_DIR)
|
||||
|
||||
self.TRAVERSAL_SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path,
|
||||
gen_name())
|
||||
os.symlink(os.path.join(tmpdir.path, 'a', '..', '..', '..', 'etc'),
|
||||
self.TRAVERSAL_SYMLINK_OUTSIDE_DIR)
|
||||
|
||||
self.SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path, gen_name())
|
||||
os.symlink(os.path.join(tmpdir.path, 'a'), self.SYMLINK_WITHIN_DIR)
|
||||
|
||||
self.SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path, gen_name())
|
||||
os.symlink(os.path.join('/tmp', 'some_file'), self.SYMLINK_OUTSIDE_DIR)
|
||||
|
||||
def test_empty_args(self):
|
||||
self.assertFalse(self.f.match([]))
|
||||
self.assertFalse(self.f.match(None))
|
||||
|
||||
def test_argument_pass_constraint(self):
|
||||
f = filters.PathFilter('/bin/chown', 'root', 'pass', 'pass')
|
||||
|
||||
args = ['chown', 'something', self.SIMPLE_FILE_OUTSIDE_DIR]
|
||||
self.assertTrue(f.match(args))
|
||||
|
||||
def test_argument_equality_constraint(self):
|
||||
f = filters.PathFilter('/bin/chown', 'root', 'nova', '/tmp/spam/eggs')
|
||||
|
||||
args = ['chown', 'nova', '/tmp/spam/eggs']
|
||||
self.assertTrue(f.match(args))
|
||||
|
||||
args = ['chown', 'quantum', '/tmp/spam/eggs']
|
||||
self.assertFalse(f.match(args))
|
||||
|
||||
def test_wrong_arguments_number(self):
|
||||
args = ['chown', '-c', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
|
||||
self.assertFalse(self.f.match(args))
|
||||
|
||||
def test_wrong_exec_command(self):
|
||||
args = ['wrong_exec', self.SIMPLE_FILE_WITHIN_DIR]
|
||||
self.assertFalse(self.f.match(args))
|
||||
|
||||
def test_match(self):
|
||||
args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
|
||||
self.assertTrue(self.f.match(args))
|
||||
|
||||
def test_match_traversal(self):
|
||||
args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR]
|
||||
self.assertTrue(self.f.match(args))
|
||||
|
||||
def test_match_symlink(self):
|
||||
args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR]
|
||||
self.assertTrue(self.f.match(args))
|
||||
|
||||
def test_match_traversal_symlink(self):
|
||||
args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR]
|
||||
self.assertTrue(self.f.match(args))
|
||||
|
||||
def test_reject(self):
|
||||
args = ['chown', 'nova', self.SIMPLE_FILE_OUTSIDE_DIR]
|
||||
self.assertFalse(self.f.match(args))
|
||||
|
||||
def test_reject_traversal(self):
|
||||
args = ['chown', 'nova', self.TRAVERSAL_OUTSIDE_DIR]
|
||||
self.assertFalse(self.f.match(args))
|
||||
|
||||
def test_reject_symlink(self):
|
||||
args = ['chown', 'nova', self.SYMLINK_OUTSIDE_DIR]
|
||||
self.assertFalse(self.f.match(args))
|
||||
|
||||
def test_reject_traversal_symlink(self):
|
||||
args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_OUTSIDE_DIR]
|
||||
self.assertFalse(self.f.match(args))
|
||||
|
||||
def test_get_command(self):
|
||||
args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
|
||||
expected = ['/bin/chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
|
||||
|
||||
self.assertEqual(expected, self.f.get_command(args))
|
||||
|
||||
def test_get_command_traversal(self):
|
||||
args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR]
|
||||
expected = ['/bin/chown', 'nova',
|
||||
os.path.realpath(self.TRAVERSAL_WITHIN_DIR)]
|
||||
|
||||
self.assertEqual(expected, self.f.get_command(args))
|
||||
|
||||
def test_get_command_symlink(self):
|
||||
args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR]
|
||||
expected = ['/bin/chown', 'nova',
|
||||
os.path.realpath(self.SYMLINK_WITHIN_DIR)]
|
||||
|
||||
self.assertEqual(expected, self.f.get_command(args))
|
||||
|
||||
def test_get_command_traversal_symlink(self):
|
||||
args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR]
|
||||
expected = ['/bin/chown', 'nova',
|
||||
os.path.realpath(self.TRAVERSAL_SYMLINK_WITHIN_DIR)]
|
||||
|
||||
self.assertEqual(expected, self.f.get_command(args))
|
||||
|
||||
|
||||
class RunOneCommandTestCase(testtools.TestCase):
|
||||
def _test_returncode_helper(self, returncode, expected):
|
||||
start_name = 'oslo_rootwrap.wrapper.start_subprocess'
|
||||
with mock.patch(start_name) as mock_start:
|
||||
with mock.patch('sys.exit') as mock_exit:
|
||||
mock_start.return_value.wait.return_value = returncode
|
||||
cmd.run_one_command(None, mock.Mock(), None, None)
|
||||
mock_exit.assert_called_once_with(expected)
|
||||
|
||||
def test_positive_returncode(self):
|
||||
self._test_returncode_helper(1, 1)
|
||||
|
||||
def test_negative_returncode(self):
|
||||
self._test_returncode_helper(-1, 129)
|
@ -1,61 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import imp
|
||||
import os
|
||||
import warnings
|
||||
|
||||
import mock
|
||||
from oslotest import base as test_base
|
||||
import six
|
||||
|
||||
|
||||
class DeprecationWarningTest(test_base.BaseTestCase):
|
||||
|
||||
@mock.patch('warnings.warn')
|
||||
def test_warning(self, mock_warn):
|
||||
import oslo.rootwrap
|
||||
imp.reload(oslo.rootwrap)
|
||||
self.assertTrue(mock_warn.called)
|
||||
args = mock_warn.call_args
|
||||
self.assertIn('oslo_rootwrap', args[0][0])
|
||||
self.assertIn('deprecated', args[0][0])
|
||||
self.assertTrue(issubclass(args[0][1], DeprecationWarning))
|
||||
|
||||
def test_real_warning(self):
|
||||
with warnings.catch_warnings(record=True) as warning_msgs:
|
||||
warnings.resetwarnings()
|
||||
warnings.simplefilter('always', DeprecationWarning)
|
||||
import oslo.rootwrap
|
||||
|
||||
# Use a separate function to get the stack level correct
|
||||
# so we know the message points back to this file. This
|
||||
# corresponds to an import or reload, which isn't working
|
||||
# inside the test under Python 3.3. That may be due to a
|
||||
# difference in the import implementation not triggering
|
||||
# warnings properly when the module is reloaded, or
|
||||
# because the warnings module is mostly implemented in C
|
||||
# and something isn't cleanly resetting the global state
|
||||
# used to track whether a warning needs to be
|
||||
# emitted. Whatever the cause, we definitely see the
|
||||
# warnings.warn() being invoked on a reload (see the test
|
||||
# above) and warnings are reported on the console when we
|
||||
# run the tests. A simpler test script run outside of
|
||||
# testr does correctly report the warnings.
|
||||
def foo():
|
||||
oslo.rootwrap.deprecated()
|
||||
|
||||
foo()
|
||||
self.assertEqual(1, len(warning_msgs))
|
||||
msg = warning_msgs[0]
|
||||
self.assertIn('oslo_rootwrap', six.text_type(msg.message))
|
||||
self.assertEqual('test_warning.py', os.path.basename(msg.filename))
|
Loading…
Reference in New Issue
Block a user