Make sudo check in ip_lib.IpNetnsCommand.execute optional

If the process runs as root, the root_helper and the sudo check
are not required.

Closes-Bug: #1393184
Change-Id: I7876ca7e4652f8152d1a8a0015cc897b09b31899
Assaf Muller 2014-09-30 14:07:24 +03:00
parent 6b1da897a9
commit 5c21d0306f
2 changed files with 32 additions and 8 deletions


@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import os
import netaddr import netaddr
from oslo.config import cfg from oslo.config import cfg
@ -60,10 +62,13 @@ class SubProcessBase(object):
return self._execute(options, command, args, return self._execute(options, command, args,
log_fail_as_error=self.log_fail_as_error) log_fail_as_error=self.log_fail_as_error)
def _as_root(self, options, command, args, use_root_namespace=False): def enforce_root_helper(self):
if not self.root_helper: if not self.root_helper and os.geteuid() != 0:
raise exceptions.SudoRequired() raise exceptions.SudoRequired()
def _as_root(self, options, command, args, use_root_namespace=False):
self.enforce_root_helper()
namespace = self.namespace if not use_root_namespace else None namespace = self.namespace if not use_root_namespace else None
return self._execute(options, return self._execute(options,
@ -536,8 +541,7 @@ class IpNetnsCommand(IpCommandBase):
extra_ok_codes=None): extra_ok_codes=None):
ns_params = [] ns_params = []
if self._parent.namespace: if self._parent.namespace:
if not self._parent.root_helper: self._parent.enforce_root_helper()
raise exceptions.SudoRequired()
ns_params = ['ip', 'netns', 'exec', self._parent.namespace] ns_params = ['ip', 'netns', 'exec', self._parent.namespace]
env_params = [] env_params = []
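Review bot: The new `enforce_root_helper` in the SubProcessBase hunk treats an effective UID of 0 as proof that no helper is needed, but euid 0 does not guarantee the privileges `ip netns exec` requires (for example in a container or user namespace with reduced capabilities), so a failure there would surface later as a command error rather than SudoRequired. It also means a root-run process with no root_helper executes the command directly, presumably without any command filtering the configured helper would apply. That trade-off deserves a docstring on the method, e.g. `"""Raise SudoRequired unless a root_helper is set or the process runs with euid 0; in the latter case commands run directly, without the helper."""`. The same applies to the call in IpNetnsCommand.execute, whose body starts outside its hunk, and `_as_root` continues past the end of its hunk.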


@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import os
import mock import mock
from neutron.agent.linux import ip_lib from neutron.agent.linux import ip_lib
@ -191,11 +193,29 @@ class TestSubProcessBase(base.BaseTestCase):
root_helper='sudo', root_helper='sudo',
log_fail_as_error=True) log_fail_as_error=True)
def test_as_root_no_root_helper(self): def test_enforce_root_helper_no_root_helper(self):
base = ip_lib.SubProcessBase() base = ip_lib.SubProcessBase()
self.assertRaises(exceptions.SudoRequired, not_root = 42
base._as_root, with mock.patch.object(os, 'geteuid', return_value=not_root):
[], 'link', ('list',)) self.assertRaises(exceptions.SudoRequired,
base.enforce_root_helper)
def test_enforce_root_helper_with_root_helper_supplied(self):
base = ip_lib.SubProcessBase('sudo')
try:
base.enforce_root_helper()
except exceptions.SudoRequired:
self.fail('enforce_root_helper should not raise SudoRequired '
'when a root_helper is supplied.')
def test_enforce_root_helper_with_no_root_helper_but_root(self):
base = ip_lib.SubProcessBase()
with mock.patch.object(os, 'geteuid', return_value=0):
try:
base.enforce_root_helper()
except exceptions.SudoRequired:
self.fail('enforce_root_helper should not require a root '
'helper when run as root.')
class TestIpWrapper(base.BaseTestCase): class TestIpWrapper(base.BaseTestCase):
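Review bot: None of the tests shown here exercises IpNetnsCommand.execute with a namespace set, which is the path the commit title names. The three new tests call only SubProcessBase.enforce_root_helper, and the removed test_as_root_no_root_helper was the one that checked `_as_root` raising. Consider one test that patches `os.geteuid` to return 0 and asserts that execute on a namespaced parent without a root_helper does not raise SudoRequired, and one with a non-zero euid asserting that it does. test_enforce_root_helper_with_root_helper_supplied also leaves `os.geteuid` unpatched; the `not self.root_helper` short-circuit makes that harmless, but patching it to a non-zero value would make the intent explicit and keep the result independent of the user running the suite.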