add rootwrap filters to wrap ip netns exec
fixes bug 1044083 This patch adds specific filters for the ip command. The first filter matches ip with any subcomand except netns exec. The second filter matches "ip netns exec" and then relies on the caller (match_filter) to verify the sub-command against the other filters. Matching the subcommand separately allows for a single set of filter definitions that work with and without namespaces. Change-Id: Ifd0378dc3461f84867efb3cb60396d9cfa9e582d
This commit is contained in:
parent
66d76b75d7
commit
195a176337
@ -20,3 +20,9 @@ kill_dnsmasq_usr: KillFilter, root, /usr/sbin/dnsmasq, -9, -HUP
|
||||
|
||||
# dhcp-agent uses cat
|
||||
cat: RegExpFilter, /bin/cat, root, cat, /proc/\d+/cmdline
|
||||
|
||||
# ip_lib
|
||||
ip: IpFilter, /sbin/ip, root
|
||||
ip_usr: IpFilter, /usr/sbin/ip, root
|
||||
ip_exec: IpNetnsExecFilter, /sbin/ip, root
|
||||
ip_exec_usr: IpNetnsExecFilter, /usr/sbin/ip, root
|
||||
|
@ -12,8 +12,10 @@
|
||||
sysctl: CommandFilter, /sbin/sysctl, root
|
||||
|
||||
# ip_lib
|
||||
ip: CommandFilter, /sbin/ip, root
|
||||
ip_usr: CommandFilter, /usr/sbin/ip, root
|
||||
ip: IpFilter, /sbin/ip, root
|
||||
ip_usr: IpFilter, /usr/sbin/ip, root
|
||||
ip_exec: IpNetnsExecFilter, /sbin/ip, root
|
||||
ip_exec_usr: IpNetnsExecFilter, /usr/sbin/ip, root
|
||||
|
||||
# ovs_lib (if OVSInterfaceDriver is used)
|
||||
ovs-vsctl: CommandFilter, /bin/ovs-vsctl, root
|
||||
|
@ -13,5 +13,9 @@
|
||||
# from the old mechanism
|
||||
brctl: CommandFilter, /sbin/brctl, root
|
||||
brctl_usr: CommandFilter, /usr/sbin/brctl, root
|
||||
ip: CommandFilter, /sbin/ip, root
|
||||
ip_usr: CommandFilter, /usr/sbin/ip, root
|
||||
|
||||
# ip_lib
|
||||
ip: IpFilter, /sbin/ip, root
|
||||
ip_usr: IpFilter, /usr/sbin/ip, root
|
||||
ip_exec: IpNetnsExecFilter, /sbin/ip, root
|
||||
ip_exec_usr: IpNetnsExecFilter, /usr/sbin/ip, root
|
||||
|
@ -44,6 +44,11 @@ class CommandFilter(object):
|
||||
return None
|
||||
|
||||
|
||||
class ExecCommandFilter(CommandFilter):
|
||||
def exec_args(self, userargs):
|
||||
return []
|
||||
|
||||
|
||||
class RegExpFilter(CommandFilter):
|
||||
"""Command filter doing regexp matching for every argument"""
|
||||
|
||||
@ -163,3 +168,29 @@ class ReadFileFilter(CommandFilter):
|
||||
if len(userargs) != 2:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class IpFilter(CommandFilter):
|
||||
"""Specific filter for the ip utility to that does not match exec."""
|
||||
|
||||
def match(self, userargs):
|
||||
if userargs[0] == 'ip':
|
||||
if userargs[1] == 'netns':
|
||||
if userargs[2] in ('list', 'add', 'delete'):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
class IpNetnsExecFilter(ExecCommandFilter):
|
||||
"""Specific filter for the ip utility to that does match exec."""
|
||||
def match(self, userargs):
|
||||
if userargs[:3] == ['ip', 'netns', 'exec']:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def exec_args(self, userargs):
|
||||
return userargs[4:]
|
||||
|
@ -54,7 +54,7 @@ def load_filters(filters_path):
|
||||
return filterlist
|
||||
|
||||
|
||||
def match_filter(filters, userargs):
|
||||
def match_filter(filter_list, userargs):
|
||||
"""
|
||||
Checks user command and arguments through command filters and
|
||||
returns the first matching filter, or None is none matched.
|
||||
@ -62,8 +62,18 @@ def match_filter(filters, userargs):
|
||||
|
||||
found_filter = None
|
||||
|
||||
for f in filters:
|
||||
for f in filter_list:
|
||||
if f.match(userargs):
|
||||
if isinstance(f, filters.ExecCommandFilter):
|
||||
# This command calls exec verify that remaining args
|
||||
# matches another filter.
|
||||
leaf_filters = [fltr for fltr in filter_list
|
||||
if not isinstance(fltr,
|
||||
filters.ExecCommandFilter)]
|
||||
args = f.exec_args(userargs)
|
||||
if not args or not match_filter(leaf_filters, args):
|
||||
continue
|
||||
|
||||
# Try other filters if executable is absent
|
||||
if not os.access(f.exec_path, os.X_OK):
|
||||
if not found_filter:
|
||||
|
@ -17,9 +17,10 @@
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
import unittest2 as unittest
|
||||
|
||||
from quantum.rootwrap import filters
|
||||
from quantum.rootwrap import wrapper
|
||||
import unittest
|
||||
|
||||
|
||||
class RootwrapTestCase(unittest.TestCase):
|
||||
@ -108,6 +109,47 @@ class RootwrapTestCase(unittest.TestCase):
|
||||
self.assertEqual(f.get_command(usercmd), ['/bin/cat', goodfn])
|
||||
self.assertTrue(f.match(usercmd))
|
||||
|
||||
def test_IpFilter_non_netns(self):
|
||||
f = filters.IpFilter('/sbin/ip', 'root')
|
||||
self.assertTrue(f.match(['ip', 'link', 'list']))
|
||||
|
||||
def _test_IpFilter_netns_helper(self, action):
|
||||
f = filters.IpFilter('/sbin/ip', 'root')
|
||||
self.assertTrue(f.match(['ip', 'link', action]))
|
||||
|
||||
def test_IpFilter_netns_add(self):
|
||||
self._test_IpFilter_netns_helper('add')
|
||||
|
||||
def test_IpFilter_netns_delete(self):
|
||||
self._test_IpFilter_netns_helper('delete')
|
||||
|
||||
def test_IpFilter_netns_list(self):
|
||||
self._test_IpFilter_netns_helper('list')
|
||||
|
||||
def test_IpNetnsExecFilter_match(self):
|
||||
f = filters.IpNetnsExecFilter('/sbin/ip', 'root')
|
||||
self.assertTrue(
|
||||
f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']))
|
||||
|
||||
def test_IpNetnsExecFilter_nomatch(self):
|
||||
f = filters.IpNetnsExecFilter('/sbin/ip', 'root')
|
||||
self.assertFalse(f.match(['ip', 'link', 'list']))
|
||||
|
||||
def test_match_filter_recurses_exec_command_filter(self):
|
||||
filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'),
|
||||
filters.IpFilter('/sbin/ip', 'root')]
|
||||
args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']
|
||||
|
||||
self.assertIsNotNone(wrapper.match_filter(filter_list, args))
|
||||
|
||||
def test_match_filter_recurses_exec_command_filter(self):
|
||||
filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'),
|
||||
filters.IpFilter('/sbin/ip', 'root')]
|
||||
args = ['ip', 'netns', 'exec', 'foo', 'ip', 'netns', 'exec', 'bar',
|
||||
'ip', 'link', 'list']
|
||||
|
||||
self.assertIsNone(wrapper.match_filter(filter_list, args))
|
||||
|
||||
def test_skips(self):
|
||||
# Check that all filters are skipped and that the last matches
|
||||
usercmd = ["cat", "/"]
|
||||
|
Loading…
x
Reference in New Issue
Block a user