Fixes encoding issue

Logging fails when using non-ASCII characters. This gives the option
of decoding the output of system calls.

Closes-Bug: #1382047
Change-Id: Ieb43b2ce4765975ee93da528ad3cb95b27d6285a
This commit is contained in:
Robert Tingirica 2014-11-19 17:36:49 +02:00
parent 5344904f27
commit b9c23f8692
4 changed files with 65 additions and 3 deletions

View File

@ -17,6 +17,7 @@
import base64
import os
import subprocess
import sys
class BaseOSUtils(object):
@ -35,12 +36,17 @@ class BaseOSUtils(object):
b64_password = base64.b64encode(os.urandom(256))
return b64_password.replace('/', '').replace('+', '')[:length]
def execute_process(self, args, shell=True):
def execute_process(self, args, shell=True, decode_output=False):
p = subprocess.Popen(args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=shell)
(out, err) = p.communicate()
if decode_output and sys.version_info < (3, 0):
out = out.decode(sys.stdout.encoding)
err = err.decode(sys.stdout.encoding)
return (out, err, p.returncode)
def sanitize_shell_input(self, value):

View File

@ -46,7 +46,7 @@ class WindowsLicensingPlugin(base.BasePlugin):
slmgr_path = os.path.join(slmgr_dir, "slmgr.vbs")
(out, err, exit_code) = osutils.execute_process(
[cscript_path, slmgr_path] + args, False)
[cscript_path, slmgr_path] + args, False, True)
if exit_code:
raise exception.CloudbaseInitException(

View File

@ -0,0 +1,56 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import subprocess
import sys
import unittest
from cloudbaseinit.osutils import base
class BaseOSUtilsTests(unittest.TestCase):
def setUp(self):
self._base = base.BaseOSUtils()
@mock.patch('sys.stdout')
@mock.patch('subprocess.Popen')
@mock.patch('subprocess.PIPE')
def test_execute_process(self, mock_PIPE, mock_Popen, mock_stdout):
args = [mock.sentinel.fake_arg]
mock_p = mock.MagicMock()
mock_out = mock.MagicMock()
mock_err = mock.MagicMock()
mock_Popen.return_value = mock_p
mock_p.communicate.return_value = (mock_out, mock_err)
response = self._base.execute_process(args, shell=True,
decode_output=True)
mock_Popen.assert_called_once_with(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=True)
mock_p.communicate.assert_called_once_with()
if sys.version_info < (3, 0):
mock_out.decode.assert_called_once_with(mock_stdout.encoding)
mock_err.decode.assert_called_once_with(mock_stdout.encoding)
self.assertEqual((mock_out.decode.return_value,
mock_err.decode.return_value, mock_p.returncode),
response)
else:
self.assertEqual((mock_out, mock_err, mock_p.returncode), response)

View File

@ -64,7 +64,7 @@ class WindowsLicensingPluginTests(unittest.TestCase):
get_system32_dir_calls.append(mock.call())
mock_osutils.execute_process.assert_called_once_with(
[cscript_path, slmgr_path, 'fake args'], False)
[cscript_path, slmgr_path, 'fake args'], False, True)
self.assertEqual(get_system32_dir_calls,
mock_osutils.get_system32_dir.call_args_list)