Import Oslo's common rootwrap to Neutron
Use the common oslo-incubator rootwrap rather than maintain a specific fork within Neutron. - Migrated DnsmasqFilter use in dhcp.filters to the new EnvFilter - Changed environment passing in ip_lib's netns.execute so that it can be properly matched using IpNetNsExecFilter + EnvFilter. It now calls "ip netns exec ns env A=B C=D command" instead of "A=B C=D ip netns exec ns command". Adjusted tests accordingly. All the other changes are coming directly from the Oslo "rootwrap" module sync. Notes: - Neutron locates its rootwrap.conf in etc/ rather than in etc/neutron - Neutron maintains a specific bin/quantum-rootwrap-xen-dom0 which requires additional config in rootwrap.conf Both behaviors were preserved in this commit, but this may need to be addressed in the future to simplify future oslo-rootwrap updates. Implements bp: quantum-common-rootwrap Change-Id: I02879942a9d1169a71aa4d684c1b9ec109a6de32
This commit is contained in:
parent
e4f4b9b84a
commit
7f66d9e0b9
@ -16,118 +16,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Root wrapper for Neutron
|
||||
from neutron.openstack.common.rootwrap import cmd
|
||||
|
||||
Filters which commands neutron is allowed to run as another user.
|
||||
|
||||
To use this, you should set the following in neutron.conf and the
|
||||
various .ini files for the agent plugins:
|
||||
root_helper=sudo neutron-rootwrap /etc/neutron/rootwrap.conf
|
||||
|
||||
You also need to let the neutron user run neutron-rootwrap as root in
|
||||
/etc/sudoers:
|
||||
neutron ALL = (root) NOPASSWD: /usr/bin/neutron-rootwrap
|
||||
/etc/neutron/rootwrap.conf *
|
||||
|
||||
Filter specs live in /etc/neutron/rootwrap.d/*.filters, or
|
||||
other locations pointed to by /etc/neutron/rootwrap.conf.
|
||||
To make allowed commands node-specific, your packaging should only
|
||||
install apropriate .filters for commands which are needed on each
|
||||
node.
|
||||
"""
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import ConfigParser
|
||||
import logging
|
||||
import os
|
||||
import pwd
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
|
||||
RC_UNAUTHORIZED = 99
|
||||
RC_NOCOMMAND = 98
|
||||
RC_BADCONFIG = 97
|
||||
RC_NOEXECFOUND = 96
|
||||
|
||||
|
||||
def _subprocess_setup():
|
||||
# Python installs a SIGPIPE handler by default. This is usually not what
|
||||
# non-Python subprocesses expect.
|
||||
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
|
||||
|
||||
|
||||
def _exit_error(execname, message, errorcode, log=True):
|
||||
print("%s: %s" % (execname, message))
|
||||
if log:
|
||||
logging.error(message)
|
||||
sys.exit(errorcode)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Split arguments, require at least a command
|
||||
execname = sys.argv.pop(0)
|
||||
if len(sys.argv) < 2:
|
||||
_exit_error(execname, "No command specified", RC_NOCOMMAND, log=False)
|
||||
|
||||
configfile = sys.argv.pop(0)
|
||||
userargs = sys.argv[:]
|
||||
|
||||
# Add ../ to sys.path to allow running from branch
|
||||
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(execname),
|
||||
os.pardir, os.pardir))
|
||||
if os.path.exists(os.path.join(possible_topdir, "neutron", "__init__.py")):
|
||||
sys.path.insert(0, possible_topdir)
|
||||
|
||||
from neutron.rootwrap import wrapper
|
||||
|
||||
# Load configuration
|
||||
try:
|
||||
rawconfig = ConfigParser.RawConfigParser()
|
||||
rawconfig.read(configfile)
|
||||
config = wrapper.RootwrapConfig(rawconfig)
|
||||
except ValueError as exc:
|
||||
msg = "Incorrect value in %s: %s" % (configfile, exc.message)
|
||||
_exit_error(execname, msg, RC_BADCONFIG, log=False)
|
||||
except ConfigParser.Error:
|
||||
_exit_error(execname, "Incorrect configuration file: %s" % configfile,
|
||||
RC_BADCONFIG, log=False)
|
||||
|
||||
if config.use_syslog:
|
||||
wrapper.setup_syslog(execname,
|
||||
config.syslog_log_facility,
|
||||
config.syslog_log_level)
|
||||
|
||||
# Execute command if it matches any of the loaded filters
|
||||
filters = wrapper.load_filters(config.filters_path)
|
||||
try:
|
||||
filtermatch = wrapper.match_filter(filters, userargs,
|
||||
exec_dirs=config.exec_dirs)
|
||||
if filtermatch:
|
||||
command = filtermatch.get_command(userargs,
|
||||
exec_dirs=config.exec_dirs)
|
||||
if config.use_syslog:
|
||||
logging.info("(%s > %s) Executing %s (filter match = %s)" % (
|
||||
os.getlogin(), pwd.getpwuid(os.getuid())[0],
|
||||
command, filtermatch.name))
|
||||
|
||||
obj = subprocess.Popen(command,
|
||||
stdin=sys.stdin,
|
||||
stdout=sys.stdout,
|
||||
stderr=sys.stderr,
|
||||
preexec_fn=_subprocess_setup,
|
||||
env=filtermatch.get_environment(userargs))
|
||||
obj.wait()
|
||||
sys.exit(obj.returncode)
|
||||
|
||||
except wrapper.FilterMatchNotExecutable as exc:
|
||||
msg = ("Executable not found: %s (filter match = %s)"
|
||||
% (exc.match.exec_path, exc.match.name))
|
||||
_exit_error(execname, msg, RC_NOEXECFOUND, log=config.use_syslog)
|
||||
|
||||
except wrapper.NoFilterMatched:
|
||||
msg = ("Unauthorized command: %s (no filter matched)"
|
||||
% ' '.join(userargs))
|
||||
_exit_error(execname, msg, RC_UNAUTHORIZED, log=config.use_syslog)
|
||||
cmd.main()
|
||||
|
@ -16,118 +16,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Root wrapper for Neutron
|
||||
from neutron.openstack.common.rootwrap import cmd
|
||||
|
||||
Filters which commands neutron is allowed to run as another user.
|
||||
|
||||
To use this, you should set the following in neutron.conf and the
|
||||
various .ini files for the agent plugins:
|
||||
root_helper=sudo neutron-rootwrap /etc/neutron/rootwrap.conf
|
||||
|
||||
You also need to let the neutron user run neutron-rootwrap as root in
|
||||
/etc/sudoers:
|
||||
neutron ALL = (root) NOPASSWD: /usr/bin/neutron-rootwrap
|
||||
/etc/neutron/rootwrap.conf *
|
||||
|
||||
Filter specs live in /etc/neutron/rootwrap.d/*.filters, or
|
||||
other locations pointed to by /etc/neutron/rootwrap.conf.
|
||||
To make allowed commands node-specific, your packaging should only
|
||||
install apropriate .filters for commands which are needed on each
|
||||
node.
|
||||
"""
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import ConfigParser
|
||||
import logging
|
||||
import os
|
||||
import pwd
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
|
||||
RC_UNAUTHORIZED = 99
|
||||
RC_NOCOMMAND = 98
|
||||
RC_BADCONFIG = 97
|
||||
RC_NOEXECFOUND = 96
|
||||
|
||||
|
||||
def _subprocess_setup():
|
||||
# Python installs a SIGPIPE handler by default. This is usually not what
|
||||
# non-Python subprocesses expect.
|
||||
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
|
||||
|
||||
|
||||
def _exit_error(execname, message, errorcode, log=True):
|
||||
print("%s: %s" % (execname, message))
|
||||
if log:
|
||||
logging.error(message)
|
||||
sys.exit(errorcode)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Split arguments, require at least a command
|
||||
execname = sys.argv.pop(0)
|
||||
if len(sys.argv) < 2:
|
||||
_exit_error(execname, "No command specified", RC_NOCOMMAND, log=False)
|
||||
|
||||
configfile = sys.argv.pop(0)
|
||||
userargs = sys.argv[:]
|
||||
|
||||
# Add ../ to sys.path to allow running from branch
|
||||
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(execname),
|
||||
os.pardir, os.pardir))
|
||||
if os.path.exists(os.path.join(possible_topdir, "neutron", "__init__.py")):
|
||||
sys.path.insert(0, possible_topdir)
|
||||
|
||||
from neutron.rootwrap import wrapper
|
||||
|
||||
# Load configuration
|
||||
try:
|
||||
rawconfig = ConfigParser.RawConfigParser()
|
||||
rawconfig.read(configfile)
|
||||
config = wrapper.RootwrapConfig(rawconfig)
|
||||
except ValueError as exc:
|
||||
msg = "Incorrect value in %s: %s" % (configfile, exc.message)
|
||||
_exit_error(execname, msg, RC_BADCONFIG, log=False)
|
||||
except ConfigParser.Error:
|
||||
_exit_error(execname, "Incorrect configuration file: %s" % configfile,
|
||||
RC_BADCONFIG, log=False)
|
||||
|
||||
if config.use_syslog:
|
||||
wrapper.setup_syslog(execname,
|
||||
config.syslog_log_facility,
|
||||
config.syslog_log_level)
|
||||
|
||||
# Execute command if it matches any of the loaded filters
|
||||
filters = wrapper.load_filters(config.filters_path)
|
||||
try:
|
||||
filtermatch = wrapper.match_filter(filters, userargs,
|
||||
exec_dirs=config.exec_dirs)
|
||||
if filtermatch:
|
||||
command = filtermatch.get_command(userargs,
|
||||
exec_dirs=config.exec_dirs)
|
||||
if config.use_syslog:
|
||||
logging.info("(%s > %s) Executing %s (filter match = %s)" % (
|
||||
os.getlogin(), pwd.getpwuid(os.getuid())[0],
|
||||
command, filtermatch.name))
|
||||
|
||||
obj = subprocess.Popen(command,
|
||||
stdin=sys.stdin,
|
||||
stdout=sys.stdout,
|
||||
stderr=sys.stderr,
|
||||
preexec_fn=_subprocess_setup,
|
||||
env=filtermatch.get_environment(userargs))
|
||||
obj.wait()
|
||||
sys.exit(obj.returncode)
|
||||
|
||||
except wrapper.FilterMatchNotExecutable as exc:
|
||||
msg = ("Executable not found: %s (filter match = %s)"
|
||||
% (exc.match.exec_path, exc.match.name))
|
||||
_exit_error(execname, msg, RC_NOEXECFOUND, log=config.use_syslog)
|
||||
|
||||
except wrapper.NoFilterMatched:
|
||||
msg = ("Unauthorized command: %s (no filter matched)"
|
||||
% ' '.join(userargs))
|
||||
_exit_error(execname, msg, RC_UNAUTHORIZED, log=config.use_syslog)
|
||||
cmd.main()
|
||||
|
@ -95,7 +95,7 @@ def filter_command(exec_name, filters_path, user_args, exec_dirs):
|
||||
if os.path.exists(os.path.join(possible_topdir, "quantum", "__init__.py")):
|
||||
sys.path.insert(0, possible_topdir)
|
||||
|
||||
from quantum.rootwrap import wrapper
|
||||
from neutron.openstack.common.rootwrap import wrapper
|
||||
|
||||
# Execute command if it matches any of the loaded filters
|
||||
filters = wrapper.load_filters(filters_path)
|
||||
|
@ -9,9 +9,7 @@
|
||||
[Filters]
|
||||
|
||||
# dhcp-agent
|
||||
ip_exec_dnsmasq: DnsmasqNetnsFilter, ip, root
|
||||
dnsmasq: DnsmasqFilter, /sbin/dnsmasq, root
|
||||
dnsmasq_usr: DnsmasqFilter, /usr/sbin/dnsmasq, root
|
||||
dnsmasq: EnvFilter, dnsmasq, root, NEUTRON_RELAY_SOCKET_PATH=, NEUTRON_NETWORK_ID=
|
||||
# dhcp-agent uses kill as well, that's handled by the generic KillFilter
|
||||
# it looks like these are the only signals needed, per
|
||||
# neutron/agent/linux/dhcp.py
|
||||
|
@ -1,3 +1,6 @@
|
||||
# Configuration for neutron-rootwrap
|
||||
# This file should be owned by (and only-writeable by) the root user
|
||||
|
||||
[DEFAULT]
|
||||
# List of directories to load filter definitions from (separated by ',').
|
||||
# These directories MUST all be only writeable by root !
|
||||
@ -9,6 +12,20 @@ filters_path=/etc/quantum/rootwrap.d,/usr/share/quantum/rootwrap
|
||||
# These directories MUST all be only writeable by root !
|
||||
exec_dirs=/sbin,/usr/sbin,/bin,/usr/bin
|
||||
|
||||
# Enable logging to syslog
|
||||
# Default value is False
|
||||
use_syslog=False
|
||||
|
||||
# Which syslog facility to use.
|
||||
# Valid values include auth, authpriv, syslog, user0, user1...
|
||||
# Default value is 'syslog'
|
||||
syslog_log_facility=syslog
|
||||
|
||||
# Which messages to log.
|
||||
# INFO means log all usage
|
||||
# ERROR means only log unsuccessful attempts
|
||||
syslog_log_level=ERROR
|
||||
|
||||
[xenapi]
|
||||
# XenAPI configuration is only required by the L2 agent if it is to
|
||||
# target a XenServer/XCP compute host's dom0.
|
||||
|
@ -424,9 +424,13 @@ class IpNetnsCommand(IpCommandBase):
|
||||
elif not self._parent.namespace:
|
||||
raise Exception(_('No namespace defined for parent'))
|
||||
else:
|
||||
env_params = []
|
||||
if addl_env:
|
||||
env_params = (['env'] +
|
||||
['%s=%s' % pair for pair in addl_env.items()])
|
||||
return utils.execute(
|
||||
['%s=%s' % pair for pair in addl_env.items()] +
|
||||
['ip', 'netns', 'exec', self._parent.namespace] + list(cmds),
|
||||
['ip', 'netns', 'exec', self._parent.namespace] +
|
||||
env_params + list(cmds),
|
||||
root_helper=self._parent.root_helper,
|
||||
check_exit_code=check_exit_code)
|
||||
|
||||
|
@ -1,6 +1,6 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2012 OpenStack Foundation.
|
||||
# Copyright (c) 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
130
neutron/openstack/common/rootwrap/cmd.py
Executable file
130
neutron/openstack/common/rootwrap/cmd.py
Executable file
@ -0,0 +1,130 @@
|
||||
#!/usr/bin/env python
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Root wrapper for OpenStack services
|
||||
|
||||
Filters which commands a service is allowed to run as another user.
|
||||
|
||||
To use this with neutron, you should set the following in
|
||||
neutron.conf:
|
||||
rootwrap_config=/etc/neutron/rootwrap.conf
|
||||
|
||||
You also need to let the neutron user run neutron-rootwrap
|
||||
as root in sudoers:
|
||||
neutron ALL = (root) NOPASSWD: /usr/bin/neutron-rootwrap
|
||||
/etc/neutron/rootwrap.conf *
|
||||
|
||||
Service packaging should deploy .filters files only on nodes where
|
||||
they are needed, to avoid allowing more than is necessary.
|
||||
"""
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import ConfigParser
|
||||
import logging
|
||||
import os
|
||||
import pwd
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
|
||||
RC_UNAUTHORIZED = 99
|
||||
RC_NOCOMMAND = 98
|
||||
RC_BADCONFIG = 97
|
||||
RC_NOEXECFOUND = 96
|
||||
|
||||
|
||||
def _subprocess_setup():
|
||||
# Python installs a SIGPIPE handler by default. This is usually not what
|
||||
# non-Python subprocesses expect.
|
||||
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
|
||||
|
||||
|
||||
def _exit_error(execname, message, errorcode, log=True):
|
||||
print("%s: %s" % (execname, message))
|
||||
if log:
|
||||
logging.error(message)
|
||||
sys.exit(errorcode)
|
||||
|
||||
|
||||
def main():
|
||||
# Split arguments, require at least a command
|
||||
execname = sys.argv.pop(0)
|
||||
if len(sys.argv) < 2:
|
||||
_exit_error(execname, "No command specified", RC_NOCOMMAND, log=False)
|
||||
|
||||
configfile = sys.argv.pop(0)
|
||||
userargs = sys.argv[:]
|
||||
|
||||
# Add ../ to sys.path to allow running from branch
|
||||
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(execname),
|
||||
os.pardir, os.pardir))
|
||||
if os.path.exists(os.path.join(possible_topdir, "neutron", "__init__.py")):
|
||||
sys.path.insert(0, possible_topdir)
|
||||
|
||||
from neutron.openstack.common.rootwrap import wrapper
|
||||
|
||||
# Load configuration
|
||||
try:
|
||||
rawconfig = ConfigParser.RawConfigParser()
|
||||
rawconfig.read(configfile)
|
||||
config = wrapper.RootwrapConfig(rawconfig)
|
||||
except ValueError as exc:
|
||||
msg = "Incorrect value in %s: %s" % (configfile, exc.message)
|
||||
_exit_error(execname, msg, RC_BADCONFIG, log=False)
|
||||
except ConfigParser.Error:
|
||||
_exit_error(execname, "Incorrect configuration file: %s" % configfile,
|
||||
RC_BADCONFIG, log=False)
|
||||
|
||||
if config.use_syslog:
|
||||
wrapper.setup_syslog(execname,
|
||||
config.syslog_log_facility,
|
||||
config.syslog_log_level)
|
||||
|
||||
# Execute command if it matches any of the loaded filters
|
||||
filters = wrapper.load_filters(config.filters_path)
|
||||
try:
|
||||
filtermatch = wrapper.match_filter(filters, userargs,
|
||||
exec_dirs=config.exec_dirs)
|
||||
if filtermatch:
|
||||
command = filtermatch.get_command(userargs,
|
||||
exec_dirs=config.exec_dirs)
|
||||
if config.use_syslog:
|
||||
logging.info("(%s > %s) Executing %s (filter match = %s)" % (
|
||||
os.getlogin(), pwd.getpwuid(os.getuid())[0],
|
||||
command, filtermatch.name))
|
||||
|
||||
obj = subprocess.Popen(command,
|
||||
stdin=sys.stdin,
|
||||
stdout=sys.stdout,
|
||||
stderr=sys.stderr,
|
||||
preexec_fn=_subprocess_setup,
|
||||
env=filtermatch.get_environment(userargs))
|
||||
obj.wait()
|
||||
sys.exit(obj.returncode)
|
||||
|
||||
except wrapper.FilterMatchNotExecutable as exc:
|
||||
msg = ("Executable not found: %s (filter match = %s)"
|
||||
% (exc.match.exec_path, exc.match.name))
|
||||
_exit_error(execname, msg, RC_NOEXECFOUND, log=config.use_syslog)
|
||||
|
||||
except wrapper.NoFilterMatched:
|
||||
msg = ("Unauthorized command: %s (no filter matched)"
|
||||
% ' '.join(userargs))
|
||||
_exit_error(execname, msg, RC_UNAUTHORIZED, log=config.use_syslog)
|
@ -1,6 +1,6 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2012 OpenStack Foundation.
|
||||
# Copyright (c) 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
@ -34,7 +34,7 @@ class CommandFilter(object):
|
||||
if self.real_exec is not None:
|
||||
return self.real_exec
|
||||
self.real_exec = ""
|
||||
if self.exec_path.startswith('/'):
|
||||
if os.path.isabs(self.exec_path):
|
||||
if os.access(self.exec_path, os.X_OK):
|
||||
self.real_exec = self.exec_path
|
||||
else:
|
||||
@ -62,11 +62,6 @@ class CommandFilter(object):
|
||||
return None
|
||||
|
||||
|
||||
class ExecCommandFilter(CommandFilter):
|
||||
def exec_args(self, userargs):
|
||||
return []
|
||||
|
||||
|
||||
class RegExpFilter(CommandFilter):
|
||||
"""Command filter doing regexp matching for every argument."""
|
||||
|
||||
@ -140,62 +135,39 @@ class PathFilter(CommandFilter):
|
||||
class DnsmasqFilter(CommandFilter):
|
||||
"""Specific filter for the dnsmasq call (which includes env)."""
|
||||
|
||||
def is_dnsmasq_cmd(self, argv):
|
||||
if (argv[0] == "dnsmasq"):
|
||||
return True
|
||||
return False
|
||||
|
||||
def is_dnsmasq_env_vars(self, argv):
|
||||
if (argv[0].startswith("NEUTRON_RELAY_SOCKET_PATH=") and
|
||||
argv[1].startswith("NEUTRON_NETWORK_ID=")):
|
||||
return True
|
||||
return False
|
||||
CONFIG_FILE_ARG = 'CONFIG_FILE'
|
||||
|
||||
def match(self, userargs):
|
||||
"""This matches the combination of the leading env
|
||||
vars plus "dnsmasq"
|
||||
"""
|
||||
if (self.is_dnsmasq_env_vars(userargs) and
|
||||
self.is_dnsmasq_cmd(userargs[2:])):
|
||||
if (userargs[0] == 'env' and
|
||||
userargs[1].startswith(self.CONFIG_FILE_ARG) and
|
||||
userargs[2].startswith('NETWORK_ID=') and
|
||||
userargs[3] == 'dnsmasq'):
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_command(self, userargs, exec_dirs=[]):
|
||||
to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path
|
||||
return [to_exec] + userargs[3:]
|
||||
dnsmasq_pos = userargs.index('dnsmasq')
|
||||
return [to_exec] + userargs[dnsmasq_pos + 1:]
|
||||
|
||||
def get_environment(self, userargs):
|
||||
env = os.environ.copy()
|
||||
env['NEUTRON_RELAY_SOCKET_PATH'] = userargs[0].split('=')[-1]
|
||||
env['NEUTRON_NETWORK_ID'] = userargs[1].split('=')[-1]
|
||||
env[self.CONFIG_FILE_ARG] = userargs[1].split('=')[-1]
|
||||
env['NETWORK_ID'] = userargs[2].split('=')[-1]
|
||||
return env
|
||||
|
||||
|
||||
class DnsmasqNetnsFilter(DnsmasqFilter):
|
||||
"""Specific filter for the dnsmasq call (which includes env)."""
|
||||
|
||||
def is_ip_netns_cmd(self, argv):
|
||||
if ((argv[0] == "ip") and
|
||||
(argv[1] == "netns") and
|
||||
(argv[2] == "exec")):
|
||||
return True
|
||||
return False
|
||||
|
||||
def match(self, userargs):
|
||||
"""This matches the combination of the leading env
|
||||
vars plus "ip" "netns" "exec" <foo> "dnsmasq"
|
||||
"""
|
||||
if (self.is_dnsmasq_env_vars(userargs) and
|
||||
self.is_ip_netns_cmd(userargs[2:]) and
|
||||
self.is_dnsmasq_cmd(userargs[6:])):
|
||||
return True
|
||||
return False
|
||||
class DeprecatedDnsmasqFilter(DnsmasqFilter):
|
||||
"""Variant of dnsmasq filter to support old-style FLAGFILE."""
|
||||
CONFIG_FILE_ARG = 'FLAGFILE'
|
||||
|
||||
|
||||
class KillFilter(CommandFilter):
|
||||
"""Specific filter for the kill calls.
|
||||
|
||||
1st argument is the user to run /bin/kill under
|
||||
2nd argument is the location of the affected executable
|
||||
if the argument is not absolute, it is checked against $PATH
|
||||
Subsequent arguments list the accepted signals (if any)
|
||||
|
||||
This filter relies on /proc to accurately determine affected
|
||||
@ -224,17 +196,28 @@ class KillFilter(CommandFilter):
|
||||
return False
|
||||
try:
|
||||
command = os.readlink("/proc/%d/exe" % int(args[1]))
|
||||
# NOTE(dprince): /proc/PID/exe may have ' (deleted)' on
|
||||
# the end if an executable is updated or deleted
|
||||
if command.endswith(" (deleted)"):
|
||||
command = command[:command.rindex(" ")]
|
||||
if command != self.args[0]:
|
||||
# Affected executable does not match
|
||||
return False
|
||||
except (ValueError, OSError):
|
||||
# Incorrect PID
|
||||
return False
|
||||
return True
|
||||
|
||||
# NOTE(yufang521247): /proc/PID/exe may have '\0' on the
|
||||
# end, because python doen't stop at '\0' when read the
|
||||
# target path.
|
||||
command = command.partition('\0')[0]
|
||||
|
||||
# NOTE(dprince): /proc/PID/exe may have ' (deleted)' on
|
||||
# the end if an executable is updated or deleted
|
||||
if command.endswith(" (deleted)"):
|
||||
command = command[:-len(" (deleted)")]
|
||||
|
||||
kill_command = self.args[0]
|
||||
|
||||
if os.path.isabs(kill_command):
|
||||
return kill_command == command
|
||||
|
||||
return (os.path.isabs(command) and
|
||||
kill_command == os.path.basename(command) and
|
||||
os.path.dirname(command) in os.environ['PATH'].split(':'))
|
||||
|
||||
|
||||
class ReadFileFilter(CommandFilter):
|
||||
@ -260,22 +243,106 @@ class IpFilter(CommandFilter):
|
||||
def match(self, userargs):
|
||||
if userargs[0] == 'ip':
|
||||
if userargs[1] == 'netns':
|
||||
if userargs[2] in ('list', 'add', 'delete'):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
return (userargs[2] in ('list', 'add', 'delete'))
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
class IpNetnsExecFilter(ExecCommandFilter):
|
||||
"""Specific filter for the ip utility to that does match exec."""
|
||||
class EnvFilter(CommandFilter):
|
||||
"""Specific filter for the env utility.
|
||||
|
||||
Behaves like CommandFilter, except that it handles
|
||||
leading env A=B.. strings appropriately.
|
||||
"""
|
||||
|
||||
def _extract_env(self, arglist):
|
||||
"""Extract all leading NAME=VALUE arguments from arglist."""
|
||||
|
||||
envs = set()
|
||||
for arg in arglist:
|
||||
if '=' not in arg:
|
||||
break
|
||||
envs.add(arg.partition('=')[0])
|
||||
return envs
|
||||
|
||||
def __init__(self, exec_path, run_as, *args):
|
||||
super(EnvFilter, self).__init__(exec_path, run_as, *args)
|
||||
|
||||
env_list = self._extract_env(self.args)
|
||||
# Set exec_path to X when args are in the form of
|
||||
# env A=a B=b C=c X Y Z
|
||||
if "env" in exec_path and len(env_list) < len(self.args):
|
||||
self.exec_path = self.args[len(env_list)]
|
||||
|
||||
def match(self, userargs):
|
||||
if userargs[:3] == ['ip', 'netns', 'exec']:
|
||||
return True
|
||||
else:
|
||||
# ignore leading 'env'
|
||||
if userargs[0] == 'env':
|
||||
userargs.pop(0)
|
||||
|
||||
# require one additional argument after configured ones
|
||||
if len(userargs) < len(self.args):
|
||||
return False
|
||||
|
||||
# extract all env args
|
||||
user_envs = self._extract_env(userargs)
|
||||
filter_envs = self._extract_env(self.args)
|
||||
user_command = userargs[len(user_envs):len(user_envs) + 1]
|
||||
|
||||
# match first non-env argument with CommandFilter
|
||||
return (super(EnvFilter, self).match(user_command)
|
||||
and len(filter_envs) and user_envs == filter_envs)
|
||||
|
||||
def exec_args(self, userargs):
|
||||
args = userargs[:]
|
||||
|
||||
# ignore leading 'env'
|
||||
if args[0] == 'env':
|
||||
args.pop(0)
|
||||
|
||||
# Throw away leading NAME=VALUE arguments
|
||||
while args and '=' in args[0]:
|
||||
args.pop(0)
|
||||
|
||||
return args
|
||||
|
||||
def get_command(self, userargs, exec_dirs=[]):
|
||||
to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path
|
||||
return [to_exec] + self.exec_args(userargs)[1:]
|
||||
|
||||
def get_environment(self, userargs):
|
||||
env = os.environ.copy()
|
||||
|
||||
# ignore leading 'env'
|
||||
if userargs[0] == 'env':
|
||||
userargs.pop(0)
|
||||
|
||||
# Handle leading NAME=VALUE pairs
|
||||
for a in userargs:
|
||||
env_name, equals, env_value = a.partition('=')
|
||||
if not equals:
|
||||
break
|
||||
if env_name and env_value:
|
||||
env[env_name] = env_value
|
||||
|
||||
return env
|
||||
|
||||
|
||||
class ChainingFilter(CommandFilter):
|
||||
def exec_args(self, userargs):
|
||||
return []
|
||||
|
||||
|
||||
class IpNetnsExecFilter(ChainingFilter):
|
||||
"""Specific filter for the ip utility to that does match exec."""
|
||||
|
||||
def match(self, userargs):
|
||||
# Network namespaces currently require root
|
||||
# require <ns> argument
|
||||
if self.run_as != "root" or len(userargs) < 4:
|
||||
return False
|
||||
|
||||
return (userargs[:3] == ['ip', 'netns', 'exec'])
|
||||
|
||||
def exec_args(self, userargs):
|
||||
args = userargs[4:]
|
||||
if args:
|
@ -1,6 +1,6 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2012 OpenStack Foundation.
|
||||
# Copyright (c) 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
@ -22,7 +22,7 @@ import logging.handlers
|
||||
import os
|
||||
import string
|
||||
|
||||
from neutron.rootwrap import filters
|
||||
from neutron.openstack.common.rootwrap import filters
|
||||
|
||||
|
||||
class NoFilterMatched(Exception):
|
||||
@ -119,10 +119,10 @@ def load_filters(filters_path):
|
||||
|
||||
|
||||
def match_filter(filter_list, userargs, exec_dirs=[]):
|
||||
"""Return first matched filter from command filters.
|
||||
"""Checks user command and arguments through command filters.
|
||||
|
||||
Returns the first matching filter.
|
||||
|
||||
Checks user command and arguments through command filters and
|
||||
returns the first matching filter.
|
||||
Raises NoFilterMatched if no filter matched.
|
||||
Raises FilterMatchNotExecutable if no executable was found for the
|
||||
best filter match.
|
||||
@ -131,15 +131,18 @@ def match_filter(filter_list, userargs, exec_dirs=[]):
|
||||
|
||||
for f in filter_list:
|
||||
if f.match(userargs):
|
||||
if isinstance(f, filters.ExecCommandFilter):
|
||||
if isinstance(f, filters.ChainingFilter):
|
||||
# This command calls exec verify that remaining args
|
||||
# matches another filter.
|
||||
def non_chain_filter(fltr):
|
||||
return (fltr.run_as == f.run_as
|
||||
and not isinstance(fltr, filters.ChainingFilter))
|
||||
|
||||
leaf_filters = [fltr for fltr in filter_list
|
||||
if not isinstance(fltr,
|
||||
filters.ExecCommandFilter)]
|
||||
if non_chain_filter(fltr)]
|
||||
args = f.exec_args(userargs)
|
||||
if (not args or not
|
||||
match_filter(leaf_filters, args, exec_dirs=exec_dirs)):
|
||||
if (not args or not match_filter(leaf_filters,
|
||||
args, exec_dirs=exec_dirs)):
|
||||
continue
|
||||
|
||||
# Try other filters if executable is absent
|
@ -428,12 +428,13 @@ class TestDnsmasq(TestBase):
|
||||
raise IndexError
|
||||
|
||||
expected = [
|
||||
'NEUTRON_RELAY_SOCKET_PATH=/dhcp/lease_relay',
|
||||
'NEUTRON_NETWORK_ID=cccccccc-cccc-cccc-cccc-cccccccccccc',
|
||||
'ip',
|
||||
'netns',
|
||||
'exec',
|
||||
'qdhcp-ns',
|
||||
'env',
|
||||
'NEUTRON_RELAY_SOCKET_PATH=/dhcp/lease_relay',
|
||||
'NEUTRON_NETWORK_ID=cccccccc-cccc-cccc-cccc-cccccccccccc',
|
||||
'dnsmasq',
|
||||
'--no-hosts',
|
||||
'--no-resolv',
|
||||
|
@ -670,8 +670,8 @@ class TestIpNetnsCommand(TestIPCmdBase):
|
||||
env = dict(FOO=1, BAR=2)
|
||||
self.netns_cmd.execute(['ip', 'link', 'list'], env)
|
||||
execute.assert_called_once_with(
|
||||
['FOO=1', 'BAR=2', 'ip', 'netns', 'exec', 'ns', 'ip', 'link',
|
||||
'list'],
|
||||
['ip', 'netns', 'exec', 'ns', 'env', 'FOO=1', 'BAR=2',
|
||||
'ip', 'link', 'list'],
|
||||
root_helper='sudo', check_exit_code=True)
|
||||
|
||||
|
||||
|
@ -1,366 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import ConfigParser
|
||||
import logging
|
||||
import logging.handlers
|
||||
import os
|
||||
import subprocess
|
||||
import uuid
|
||||
|
||||
import fixtures
|
||||
|
||||
from neutron.rootwrap import filters
|
||||
from neutron.rootwrap import wrapper
|
||||
from neutron.tests import base
|
||||
|
||||
|
||||
class RootwrapTestCase(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(RootwrapTestCase, self).setUp()
|
||||
self.filters = [
|
||||
filters.RegExpFilter("/bin/ls", "root", 'ls', '/[a-z]+'),
|
||||
filters.CommandFilter("/usr/bin/foo_bar_not_exist", "root"),
|
||||
filters.RegExpFilter("/bin/cat", "root", 'cat', '/[a-z]+'),
|
||||
filters.CommandFilter("/nonexistent/cat", "root"),
|
||||
filters.CommandFilter("/bin/cat", "root") # Keep this one last
|
||||
]
|
||||
|
||||
def test_RegExpFilter_match(self):
|
||||
usercmd = ["ls", "/root"]
|
||||
filtermatch = wrapper.match_filter(self.filters, usercmd)
|
||||
self.assertFalse(filtermatch is None)
|
||||
self.assertEqual(filtermatch.get_command(usercmd),
|
||||
["/bin/ls", "/root"])
|
||||
|
||||
def test_RegExpFilter_reject(self):
|
||||
usercmd = ["ls", "root"]
|
||||
self.assertRaises(wrapper.NoFilterMatched,
|
||||
wrapper.match_filter, self.filters, usercmd)
|
||||
|
||||
def test_missing_command(self):
|
||||
valid_but_missing = ["foo_bar_not_exist"]
|
||||
invalid = ["foo_bar_not_exist_and_not_matched"]
|
||||
self.assertRaises(wrapper.FilterMatchNotExecutable,
|
||||
wrapper.match_filter,
|
||||
self.filters, valid_but_missing)
|
||||
self.assertRaises(wrapper.NoFilterMatched,
|
||||
wrapper.match_filter, self.filters, invalid)
|
||||
|
||||
def test_DnsmasqFilter(self):
|
||||
usercmd = ['NEUTRON_RELAY_SOCKET_PATH=A', 'NEUTRON_NETWORK_ID=foobar',
|
||||
'dnsmasq', 'foo']
|
||||
f = filters.DnsmasqFilter("/usr/bin/dnsmasq", "root")
|
||||
self.assertTrue(f.match(usercmd))
|
||||
self.assertEqual(f.get_command(usercmd), ['/usr/bin/dnsmasq', 'foo'])
|
||||
env = f.get_environment(usercmd)
|
||||
self.assertEqual(env.get('NEUTRON_RELAY_SOCKET_PATH'), 'A')
|
||||
self.assertEqual(env.get('NEUTRON_NETWORK_ID'), 'foobar')
|
||||
|
||||
def test_DnsmasqNetnsFilter(self):
|
||||
usercmd = ['NEUTRON_RELAY_SOCKET_PATH=A', 'NEUTRON_NETWORK_ID=foobar',
|
||||
'ip', 'netns', 'exec', 'foo', 'dnsmasq', 'foo']
|
||||
f = filters.DnsmasqNetnsFilter("/sbin/ip", "root")
|
||||
self.assertTrue(f.match(usercmd))
|
||||
self.assertEqual(f.get_command(usercmd), ['/sbin/ip', 'netns', 'exec',
|
||||
'foo', 'dnsmasq', 'foo'])
|
||||
env = f.get_environment(usercmd)
|
||||
self.assertEqual(env.get('NEUTRON_RELAY_SOCKET_PATH'), 'A')
|
||||
self.assertEqual(env.get('NEUTRON_NETWORK_ID'), 'foobar')
|
||||
|
||||
def test_KillFilter(self):
|
||||
if not os.path.exists("/proc/%d" % os.getpid()):
|
||||
self.skipTest("Test requires /proc filesystem (procfs)")
|
||||
p = subprocess.Popen(["cat"], stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT)
|
||||
try:
|
||||
f = filters.KillFilter("root", "/bin/cat", "-9", "-HUP")
|
||||
f2 = filters.KillFilter("root", "/usr/bin/cat", "-9", "-HUP")
|
||||
usercmd = ['kill', '-ALRM', p.pid]
|
||||
# Incorrect signal should fail
|
||||
self.assertFalse(f.match(usercmd) or f2.match(usercmd))
|
||||
usercmd = ['kill', p.pid]
|
||||
# Providing no signal should fail
|
||||
self.assertFalse(f.match(usercmd) or f2.match(usercmd))
|
||||
# Providing matching signal should be allowed
|
||||
usercmd = ['kill', '-9', p.pid]
|
||||
self.assertTrue(f.match(usercmd) or f2.match(usercmd))
|
||||
|
||||
f = filters.KillFilter("root", "/bin/cat")
|
||||
f2 = filters.KillFilter("root", "/usr/bin/cat")
|
||||
usercmd = ['kill', os.getpid()]
|
||||
# Our own PID does not match /bin/sleep, so it should fail
|
||||
self.assertFalse(f.match(usercmd) or f2.match(usercmd))
|
||||
usercmd = ['kill', 999999]
|
||||
# Nonexistent PID should fail
|
||||
self.assertFalse(f.match(usercmd) or f2.match(usercmd))
|
||||
usercmd = ['kill', p.pid]
|
||||
# Providing no signal should work
|
||||
self.assertTrue(f.match(usercmd) or f2.match(usercmd))
|
||||
finally:
|
||||
# Terminate the "cat" process and wait for it to finish
|
||||
p.terminate()
|
||||
p.wait()
|
||||
|
||||
def test_KillFilter_no_raise(self):
|
||||
"""Makes sure ValueError from bug 926412 is gone."""
|
||||
f = filters.KillFilter("root", "")
|
||||
# Providing anything other than kill should be False
|
||||
usercmd = ['notkill', 999999]
|
||||
self.assertFalse(f.match(usercmd))
|
||||
# Providing something that is not a pid should be False
|
||||
usercmd = ['kill', 'notapid']
|
||||
self.assertFalse(f.match(usercmd))
|
||||
|
||||
def test_KillFilter_deleted_exe(self):
|
||||
"""Makes sure deleted exe's are killed correctly."""
|
||||
# See bug #967931.
|
||||
def fake_readlink(blah):
|
||||
return '/bin/commandddddd (deleted)'
|
||||
|
||||
f = filters.KillFilter("root", "/bin/commandddddd")
|
||||
usercmd = ['kill', 1234]
|
||||
# Providing no signal should work
|
||||
self.stubs.Set(os, 'readlink', fake_readlink)
|
||||
self.assertTrue(f.match(usercmd))
|
||||
|
||||
def test_ReadFileFilter(self):
|
||||
goodfn = '/good/file.name'
|
||||
f = filters.ReadFileFilter(goodfn)
|
||||
usercmd = ['cat', '/bad/file']
|
||||
self.assertFalse(f.match(['cat', '/bad/file']))
|
||||
usercmd = ['cat', goodfn]
|
||||
self.assertEqual(f.get_command(usercmd), ['/bin/cat', goodfn])
|
||||
self.assertTrue(f.match(usercmd))
|
||||
|
||||
def test_IpFilter_non_netns(self):
|
||||
f = filters.IpFilter('/sbin/ip', 'root')
|
||||
self.assertTrue(f.match(['ip', 'link', 'list']))
|
||||
|
||||
def _test_IpFilter_netns_helper(self, action):
|
||||
f = filters.IpFilter('/sbin/ip', 'root')
|
||||
self.assertTrue(f.match(['ip', 'link', action]))
|
||||
|
||||
def test_IpFilter_netns_add(self):
|
||||
self._test_IpFilter_netns_helper('add')
|
||||
|
||||
def test_IpFilter_netns_delete(self):
|
||||
self._test_IpFilter_netns_helper('delete')
|
||||
|
||||
def test_IpFilter_netns_list(self):
|
||||
self._test_IpFilter_netns_helper('list')
|
||||
|
||||
def test_IpNetnsExecFilter_match(self):
|
||||
f = filters.IpNetnsExecFilter('/sbin/ip', 'root')
|
||||
self.assertTrue(
|
||||
f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']))
|
||||
|
||||
def test_IpNetnsExecFilter_nomatch(self):
|
||||
f = filters.IpNetnsExecFilter('/sbin/ip', 'root')
|
||||
self.assertFalse(f.match(['ip', 'link', 'list']))
|
||||
|
||||
def test_match_filter_recurses_exec_command_filter_matches(self):
|
||||
filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'),
|
||||
filters.IpFilter('/sbin/ip', 'root')]
|
||||
args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']
|
||||
|
||||
self.assertIsNotNone(wrapper.match_filter(filter_list, args))
|
||||
|
||||
def test_match_filter_recurses_exec_command_filter_does_not_match(self):
|
||||
filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'),
|
||||
filters.IpFilter('/sbin/ip', 'root')]
|
||||
args = ['ip', 'netns', 'exec', 'foo', 'ip', 'netns', 'exec', 'bar',
|
||||
'ip', 'link', 'list']
|
||||
|
||||
self.assertRaises(wrapper.NoFilterMatched,
|
||||
wrapper.match_filter, filter_list, args)
|
||||
|
||||
def test_exec_dirs_search(self):
|
||||
# This test supposes you have /bin/cat or /usr/bin/cat locally
|
||||
f = filters.CommandFilter("cat", "root")
|
||||
usercmd = ['cat', '/f']
|
||||
self.assertTrue(f.match(usercmd))
|
||||
self.assertTrue(f.get_command(usercmd,
|
||||
exec_dirs=['/bin', '/usr/bin'])
|
||||
in (['/bin/cat', '/f'], ['/usr/bin/cat', '/f']))
|
||||
|
||||
def test_skips(self):
|
||||
# Check that all filters are skipped and that the last matches
|
||||
usercmd = ["cat", "/"]
|
||||
filtermatch = wrapper.match_filter(self.filters, usercmd)
|
||||
self.assertTrue(filtermatch is self.filters[-1])
|
||||
|
||||
def test_RootwrapConfig(self):
|
||||
raw = ConfigParser.RawConfigParser()
|
||||
|
||||
# Empty config should raise ConfigParser.Error
|
||||
self.assertRaises(ConfigParser.Error, wrapper.RootwrapConfig, raw)
|
||||
|
||||
# Check default values
|
||||
raw.set('DEFAULT', 'filters_path', '/a,/b')
|
||||
config = wrapper.RootwrapConfig(raw)
|
||||
self.assertEqual(config.filters_path, ['/a', '/b'])
|
||||
self.assertEqual(config.exec_dirs, os.environ["PATH"].split(':'))
|
||||
self.assertFalse(config.use_syslog)
|
||||
self.assertEqual(config.syslog_log_facility,
|
||||
logging.handlers.SysLogHandler.LOG_SYSLOG)
|
||||
self.assertEqual(config.syslog_log_level, logging.ERROR)
|
||||
|
||||
# Check general values
|
||||
raw.set('DEFAULT', 'exec_dirs', '/a,/x')
|
||||
config = wrapper.RootwrapConfig(raw)
|
||||
self.assertEqual(config.exec_dirs, ['/a', '/x'])
|
||||
|
||||
raw.set('DEFAULT', 'use_syslog', 'oui')
|
||||
self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
|
||||
raw.set('DEFAULT', 'use_syslog', 'true')
|
||||
config = wrapper.RootwrapConfig(raw)
|
||||
self.assertTrue(config.use_syslog)
|
||||
|
||||
raw.set('DEFAULT', 'syslog_log_facility', 'moo')
|
||||
self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
|
||||
raw.set('DEFAULT', 'syslog_log_facility', 'local0')
|
||||
config = wrapper.RootwrapConfig(raw)
|
||||
self.assertEqual(config.syslog_log_facility,
|
||||
logging.handlers.SysLogHandler.LOG_LOCAL0)
|
||||
raw.set('DEFAULT', 'syslog_log_facility', 'LOG_AUTH')
|
||||
config = wrapper.RootwrapConfig(raw)
|
||||
self.assertEqual(config.syslog_log_facility,
|
||||
logging.handlers.SysLogHandler.LOG_AUTH)
|
||||
|
||||
raw.set('DEFAULT', 'syslog_log_level', 'bar')
|
||||
self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
|
||||
raw.set('DEFAULT', 'syslog_log_level', 'INFO')
|
||||
config = wrapper.RootwrapConfig(raw)
|
||||
self.assertEqual(config.syslog_log_level, logging.INFO)
|
||||
|
||||
|
||||
class PathFilterTestCase(base.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(PathFilterTestCase, self).setUp()
|
||||
|
||||
tmpdir = fixtures.TempDir('/tmp')
|
||||
self.useFixture(tmpdir)
|
||||
|
||||
self.f = filters.PathFilter('/bin/chown', 'root', 'nova', tmpdir.path)
|
||||
|
||||
gen_name = lambda: str(uuid.uuid4())
|
||||
|
||||
self.SIMPLE_FILE_WITHIN_DIR = os.path.join(tmpdir.path, 'some')
|
||||
self.SIMPLE_FILE_OUTSIDE_DIR = os.path.join('/tmp', 'some')
|
||||
self.TRAVERSAL_WITHIN_DIR = os.path.join(tmpdir.path, 'a', '..',
|
||||
'some')
|
||||
self.TRAVERSAL_OUTSIDE_DIR = os.path.join(tmpdir.path, '..', 'some')
|
||||
|
||||
self.TRAVERSAL_SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path,
|
||||
gen_name())
|
||||
os.symlink(os.path.join(tmpdir.path, 'a', '..', 'a'),
|
||||
self.TRAVERSAL_SYMLINK_WITHIN_DIR)
|
||||
|
||||
self.TRAVERSAL_SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path,
|
||||
gen_name())
|
||||
os.symlink(os.path.join(tmpdir.path, 'a', '..', '..', '..', 'etc'),
|
||||
self.TRAVERSAL_SYMLINK_OUTSIDE_DIR)
|
||||
|
||||
self.SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path, gen_name())
|
||||
os.symlink(os.path.join(tmpdir.path, 'a'), self.SYMLINK_WITHIN_DIR)
|
||||
|
||||
self.SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path, gen_name())
|
||||
os.symlink(os.path.join('/tmp', 'some_file'), self.SYMLINK_OUTSIDE_DIR)
|
||||
|
||||
def test_argument_pass_constraint(self):
|
||||
f = filters.PathFilter('/bin/chown', 'root', 'pass', 'pass')
|
||||
|
||||
args = ['chown', 'something', self.SIMPLE_FILE_OUTSIDE_DIR]
|
||||
self.assertTrue(f.match(args))
|
||||
|
||||
def test_argument_equality_constraint(self):
|
||||
f = filters.PathFilter('/bin/chown', 'root', 'nova', '/tmp/spam/eggs')
|
||||
|
||||
args = ['chown', 'nova', '/tmp/spam/eggs']
|
||||
self.assertTrue(f.match(args))
|
||||
|
||||
args = ['chown', 'neutron', '/tmp/spam/eggs']
|
||||
self.assertFalse(f.match(args))
|
||||
|
||||
def test_wrong_arguments_number(self):
|
||||
args = ['chown', '-c', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
|
||||
self.assertFalse(self.f.match(args))
|
||||
|
||||
def test_wrong_exec_command(self):
|
||||
args = ['wrong_exec', self.SIMPLE_FILE_WITHIN_DIR]
|
||||
self.assertFalse(self.f.match(args))
|
||||
|
||||
def test_match(self):
|
||||
args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
|
||||
self.assertTrue(self.f.match(args))
|
||||
|
||||
def test_match_traversal(self):
|
||||
args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR]
|
||||
self.assertTrue(self.f.match(args))
|
||||
|
||||
def test_match_symlink(self):
|
||||
args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR]
|
||||
self.assertTrue(self.f.match(args))
|
||||
|
||||
def test_match_traversal_symlink(self):
|
||||
args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR]
|
||||
self.assertTrue(self.f.match(args))
|
||||
|
||||
def test_reject(self):
|
||||
args = ['chown', 'nova', self.SIMPLE_FILE_OUTSIDE_DIR]
|
||||
self.assertFalse(self.f.match(args))
|
||||
|
||||
def test_reject_traversal(self):
|
||||
args = ['chown', 'nova', self.TRAVERSAL_OUTSIDE_DIR]
|
||||
self.assertFalse(self.f.match(args))
|
||||
|
||||
def test_reject_symlink(self):
|
||||
args = ['chown', 'nova', self.SYMLINK_OUTSIDE_DIR]
|
||||
self.assertFalse(self.f.match(args))
|
||||
|
||||
def test_reject_traversal_symlink(self):
|
||||
args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_OUTSIDE_DIR]
|
||||
self.assertFalse(self.f.match(args))
|
||||
|
||||
def test_get_command(self):
|
||||
args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
|
||||
expected = ['/bin/chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
|
||||
|
||||
self.assertEqual(expected, self.f.get_command(args))
|
||||
|
||||
def test_get_command_traversal(self):
|
||||
args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR]
|
||||
expected = ['/bin/chown', 'nova',
|
||||
os.path.realpath(self.TRAVERSAL_WITHIN_DIR)]
|
||||
|
||||
self.assertEqual(expected, self.f.get_command(args))
|
||||
|
||||
def test_get_command_symlink(self):
|
||||
args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR]
|
||||
expected = ['/bin/chown', 'nova',
|
||||
os.path.realpath(self.SYMLINK_WITHIN_DIR)]
|
||||
|
||||
self.assertEqual(expected, self.f.get_command(args))
|
||||
|
||||
def test_get_command_traversal_symlink(self):
|
||||
args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR]
|
||||
expected = ['/bin/chown', 'nova',
|
||||
os.path.realpath(self.TRAVERSAL_SYMLINK_WITHIN_DIR)]
|
||||
|
||||
self.assertEqual(expected, self.f.get_command(args))
|
Loading…
Reference in New Issue
Block a user