Rewrite exec utilies to be generic
This patch adds executil.BaseCommand, which contains some scaffolding for building generic and simple CLI commands. Also, executil.py contains a couple of predefined commands, such as Python, Shell, Powershell etc. fileexecutils, as well as userdatautils, were rewritten to use the new command framework instead, which greatly increases usability and simplifies cross platform issues. Change-Id: Ifcd91da95a272ef68d56491f90bc213826eb7679
This commit is contained in:
parent
b0aa3b56b8
commit
6d5d3994ea
0
cloudbaseinit/plugins/common/__init__.py
Normal file
0
cloudbaseinit/plugins/common/__init__.py
Normal file
145
cloudbaseinit/plugins/common/executil.py
Normal file
145
cloudbaseinit/plugins/common/executil.py
Normal file
@ -0,0 +1,145 @@
|
||||
# Copyright 2014 Cloudbase Solutions Srl
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import functools
|
||||
import os
|
||||
import tempfile
|
||||
import uuid
|
||||
|
||||
from cloudbaseinit.osutils import factory as osutils_factory
|
||||
|
||||
|
||||
__all__ = (
|
||||
'BaseCommand',
|
||||
'Shell',
|
||||
'Python',
|
||||
'Bash',
|
||||
'Powershell',
|
||||
'PowershellSysnative',
|
||||
)
|
||||
|
||||
|
||||
class BaseCommand(object):
|
||||
"""Implements logic for executing an user command.
|
||||
|
||||
This is intended to be subclassed and each subclass should change the
|
||||
attributes which controls the behaviour of the execution.
|
||||
It must be instantiated with a file.
|
||||
It can also execute string commands, by using the alternate
|
||||
constructor :meth:`~from_data`.
|
||||
|
||||
The following attributes can control the behaviour of the command:
|
||||
|
||||
* shell: Run the command as a shell command.
|
||||
* extension:
|
||||
|
||||
A string, which will be appended to a generated script file.
|
||||
This is important for certain commands, e.g. Powershell,
|
||||
which can't execute something without the `.ps1` extension.
|
||||
|
||||
* command:
|
||||
|
||||
A program which will execute the underlying command,
|
||||
e.g. `python`, `bash` etc.
|
||||
|
||||
"""
|
||||
shell = False
|
||||
extension = None
|
||||
command = None
|
||||
|
||||
def __init__(self, target_path, cleanup=None):
|
||||
"""Instantiate the command.
|
||||
|
||||
The parameter *target_path* represents the file which will be
|
||||
executed. The optional parameter *cleanup* can be a callable,
|
||||
which will be called after executing a command, no matter if the
|
||||
execution was succesful or not.
|
||||
"""
|
||||
|
||||
self._target_path = target_path
|
||||
self._cleanup = cleanup
|
||||
self._osutils = osutils_factory.get_os_utils()
|
||||
|
||||
@property
|
||||
def args(self):
|
||||
"""Return a list of commands.
|
||||
|
||||
The list will be passed to :meth:`~execute_process`.
|
||||
"""
|
||||
if not self.command:
|
||||
# Then we can assume it's a shell command.
|
||||
return [self._target_path]
|
||||
else:
|
||||
return [self.command, self._target_path]
|
||||
|
||||
def get_execute_method(self):
|
||||
"""Return a callable, which will be called by :meth:`~execute`."""
|
||||
return functools.partial(self._osutils.execute_process,
|
||||
self.args, shell=self.shell)
|
||||
|
||||
def execute(self):
|
||||
"""Execute the underlying command."""
|
||||
try:
|
||||
return self.get_execute_method()()
|
||||
finally:
|
||||
if self._cleanup:
|
||||
self._cleanup()
|
||||
|
||||
__call__ = execute
|
||||
|
||||
@classmethod
|
||||
def from_data(cls, command):
|
||||
"""Create a new command class from the given command data."""
|
||||
def safe_remove(target_path):
|
||||
try:
|
||||
os.remove(target_path)
|
||||
except OSError: # pragma: no cover
|
||||
pass
|
||||
|
||||
tmp = os.path.join(tempfile.gettempdir(), str(uuid.uuid4()))
|
||||
if cls.extension:
|
||||
tmp += cls.extension
|
||||
with open(tmp, 'wb') as stream:
|
||||
stream.write(command)
|
||||
return cls(tmp, cleanup=functools.partial(safe_remove, tmp))
|
||||
|
||||
|
||||
class Shell(BaseCommand):
|
||||
shell = True
|
||||
extension = '.cmd'
|
||||
|
||||
|
||||
class Python(BaseCommand):
|
||||
extension = '.py'
|
||||
command = 'python'
|
||||
|
||||
|
||||
class Bash(BaseCommand):
|
||||
extension = '.sh'
|
||||
command = 'bash'
|
||||
|
||||
|
||||
class PowershellSysnative(BaseCommand):
|
||||
extension = '.ps1'
|
||||
sysnative = True
|
||||
|
||||
def get_execute_method(self):
|
||||
return functools.partial(
|
||||
self._osutils.execute_powershell_script,
|
||||
self._target_path,
|
||||
self.sysnative)
|
||||
|
||||
|
||||
class Powershell(PowershellSysnative):
|
||||
sysnative = False
|
@ -15,47 +15,37 @@
|
||||
import os
|
||||
|
||||
from cloudbaseinit.openstack.common import log as logging
|
||||
from cloudbaseinit.osutils import factory as osutils_factory
|
||||
from cloudbaseinit.plugins.common import executil
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
FORMATS = {
|
||||
"cmd": executil.Shell,
|
||||
"exe": executil.Shell,
|
||||
"sh": executil.Bash,
|
||||
"py": executil.Python,
|
||||
"ps1": executil.PowershellSysnative,
|
||||
}
|
||||
|
||||
|
||||
def exec_file(file_path):
|
||||
shell = False
|
||||
powershell = False
|
||||
|
||||
ret_val = 0
|
||||
out = err = None
|
||||
ext = os.path.splitext(file_path)[1][1:].lower()
|
||||
|
||||
if ext == "cmd":
|
||||
args = [file_path]
|
||||
shell = True
|
||||
elif ext == "exe":
|
||||
args = [file_path]
|
||||
elif ext == "sh":
|
||||
args = ["bash.exe", file_path]
|
||||
elif ext == "py":
|
||||
args = ["python.exe", file_path]
|
||||
elif ext == "ps1":
|
||||
powershell = True
|
||||
else:
|
||||
command = FORMATS.get(ext)
|
||||
if not command:
|
||||
# Unsupported
|
||||
LOG.warning('Unsupported script file type: %s' % ext)
|
||||
return 0
|
||||
|
||||
osutils = osutils_factory.get_os_utils()
|
||||
LOG.warning('Unsupported script file type: %s', ext)
|
||||
return ret_val
|
||||
|
||||
try:
|
||||
if powershell:
|
||||
(out, err,
|
||||
ret_val) = osutils.execute_powershell_script(file_path)
|
||||
else:
|
||||
(out, err, ret_val) = osutils.execute_process(args, shell)
|
||||
|
||||
LOG.info('Script "%(file_path)s" ended with exit code: %(ret_val)d' %
|
||||
{"file_path": file_path, "ret_val": ret_val})
|
||||
LOG.debug('User_data stdout:\n%s' % out)
|
||||
LOG.debug('User_data stderr:\n%s' % err)
|
||||
|
||||
return ret_val
|
||||
out, err, ret_val = command(file_path).execute()
|
||||
except Exception as ex:
|
||||
LOG.warning('An error occurred during file execution: \'%s\'' % ex)
|
||||
LOG.warning('An error occurred during file execution: \'%s\'', ex)
|
||||
else:
|
||||
LOG.debug('User_data stdout:\n%s', out)
|
||||
LOG.debug('User_data stderr:\n%s', err)
|
||||
|
||||
LOG.info('Script "%(file_path)s" ended with exit code: %(ret_val)d',
|
||||
{"file_path": file_path, "ret_val": ret_val})
|
||||
return ret_val
|
||||
|
@ -12,66 +12,51 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import functools
|
||||
import re
|
||||
import tempfile
|
||||
import uuid
|
||||
|
||||
from cloudbaseinit.openstack.common import log as logging
|
||||
from cloudbaseinit.osutils import factory as osutils_factory
|
||||
from cloudbaseinit.utils import encoding
|
||||
from cloudbaseinit.plugins.common import executil
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
# Avoid 80+ length by using a local variable, which
|
||||
# is deleted afterwards.
|
||||
_compile = functools.partial(re.compile, flags=re.I)
|
||||
FORMATS = (
|
||||
(_compile(br'^rem cmd\s'), executil.Shell),
|
||||
(_compile(br'^#!/usr/bin/env\spython\s'), executil.Python),
|
||||
(_compile(br'^#!'), executil.Bash),
|
||||
(_compile(br'^#(ps1|ps1_sysnative)\s'), executil.PowershellSysnative),
|
||||
(_compile(br'^#ps1_x86\s'), executil.Powershell),
|
||||
)
|
||||
del _compile
|
||||
|
||||
|
||||
def _get_command(data):
|
||||
# Get the command which should process the given data.
|
||||
for pattern, command_class in FORMATS:
|
||||
if pattern.search(data):
|
||||
return command_class.from_data(data)
|
||||
|
||||
|
||||
def execute_user_data_script(user_data):
|
||||
osutils = osutils_factory.get_os_utils()
|
||||
|
||||
shell = False
|
||||
powershell = False
|
||||
sysnative = True
|
||||
|
||||
target_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4()))
|
||||
if re.search(r'^rem cmd\s', user_data, re.I):
|
||||
target_path += '.cmd'
|
||||
args = [target_path]
|
||||
shell = True
|
||||
elif re.search(r'^#!/usr/bin/env\spython\s', user_data, re.I):
|
||||
target_path += '.py'
|
||||
args = ['python.exe', target_path]
|
||||
elif re.search(r'^#!', user_data, re.I):
|
||||
target_path += '.sh'
|
||||
args = ['bash.exe', target_path]
|
||||
elif re.search(r'^#(ps1|ps1_sysnative)\s', user_data, re.I):
|
||||
target_path += '.ps1'
|
||||
powershell = True
|
||||
elif re.search(r'^#ps1_x86\s', user_data, re.I):
|
||||
target_path += '.ps1'
|
||||
powershell = True
|
||||
sysnative = False
|
||||
else:
|
||||
ret_val = 0
|
||||
out = err = None
|
||||
command = _get_command(user_data)
|
||||
if not command:
|
||||
# Unsupported
|
||||
LOG.warning('Unsupported user_data format')
|
||||
return 0
|
||||
return ret_val
|
||||
|
||||
try:
|
||||
encoding.write_file(target_path, user_data)
|
||||
|
||||
if powershell:
|
||||
(out, err,
|
||||
ret_val) = osutils.execute_powershell_script(target_path,
|
||||
sysnative)
|
||||
else:
|
||||
(out, err, ret_val) = osutils.execute_process(args, shell)
|
||||
|
||||
LOG.info('User_data script ended with return code: %d' % ret_val)
|
||||
LOG.debug('User_data stdout:\n%s' % out)
|
||||
LOG.debug('User_data stderr:\n%s' % err)
|
||||
|
||||
return ret_val
|
||||
out, err, ret_val = command()
|
||||
except Exception as ex:
|
||||
LOG.warning('An error occurred during user_data execution: \'%s\''
|
||||
% ex)
|
||||
finally:
|
||||
if os.path.exists(target_path):
|
||||
os.remove(target_path)
|
||||
LOG.warning('An error occurred during user_data execution: \'%s\'',
|
||||
ex)
|
||||
else:
|
||||
LOG.debug('User_data stdout:\n%s', out)
|
||||
LOG.debug('User_data stderr:\n%s', err)
|
||||
|
||||
LOG.info('User_data script ended with return code: %d', ret_val)
|
||||
return ret_val
|
||||
|
0
cloudbaseinit/tests/plugins/common/__init__.py
Normal file
0
cloudbaseinit/tests/plugins/common/__init__.py
Normal file
110
cloudbaseinit/tests/plugins/common/test_executil.py
Normal file
110
cloudbaseinit/tests/plugins/common/test_executil.py
Normal file
@ -0,0 +1,110 @@
|
||||
# Copyright 2014 Cloudbase Solutions Srl
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
|
||||
from cloudbaseinit.plugins.common import executil
|
||||
from cloudbaseinit.tests import testutils
|
||||
|
||||
|
||||
def _remove_file(filepath):
|
||||
try:
|
||||
os.remove(filepath)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
|
||||
class ExecUtilTest(unittest.TestCase):
|
||||
|
||||
def test_from_data(self, _):
|
||||
command = executil.BaseCommand.from_data(b"test")
|
||||
|
||||
self.assertIsInstance(command, executil.BaseCommand)
|
||||
|
||||
# Not public API, though.
|
||||
self.assertTrue(os.path.exists(command._target_path),
|
||||
command._target_path)
|
||||
self.addCleanup(_remove_file, command._target_path)
|
||||
|
||||
with open(command._target_path) as stream:
|
||||
data = stream.read()
|
||||
|
||||
self.assertEqual("test", data)
|
||||
command._cleanup()
|
||||
self.assertFalse(os.path.exists(command._target_path),
|
||||
command._target_path)
|
||||
|
||||
def test_args(self, _):
|
||||
class FakeCommand(executil.BaseCommand):
|
||||
command = mock.sentinel.command
|
||||
|
||||
with testutils.create_tempfile() as tmp:
|
||||
fake_command = FakeCommand(tmp)
|
||||
self.assertEqual([mock.sentinel.command, tmp],
|
||||
fake_command.args)
|
||||
|
||||
fake_command = executil.BaseCommand(tmp)
|
||||
self.assertEqual([tmp], fake_command.args)
|
||||
|
||||
def test_from_data_extension(self, _):
|
||||
class FakeCommand(executil.BaseCommand):
|
||||
command = mock.sentinel.command
|
||||
extension = ".test"
|
||||
|
||||
command = FakeCommand.from_data(b"test")
|
||||
self.assertIsInstance(command, FakeCommand)
|
||||
|
||||
self.addCleanup(os.remove, command._target_path)
|
||||
self.assertTrue(command._target_path.endswith(".test"))
|
||||
|
||||
def test_execute_normal_command(self, mock_get_os_utils):
|
||||
mock_osutils = mock_get_os_utils()
|
||||
|
||||
with testutils.create_tempfile() as tmp:
|
||||
command = executil.BaseCommand(tmp)
|
||||
command.execute()
|
||||
|
||||
mock_osutils.execute_process.assert_called_once_with(
|
||||
[command._target_path],
|
||||
shell=command.shell)
|
||||
|
||||
# test __call__ API.
|
||||
mock_osutils.execute_process.reset_mock()
|
||||
command()
|
||||
|
||||
mock_osutils.execute_process.assert_called_once_with(
|
||||
[command._target_path],
|
||||
shell=command.shell)
|
||||
|
||||
def test_execute_powershell_command(self, mock_get_os_utils):
|
||||
mock_osutils = mock_get_os_utils()
|
||||
|
||||
with testutils.create_tempfile() as tmp:
|
||||
command = executil.Powershell(tmp)
|
||||
command.execute()
|
||||
|
||||
mock_osutils.execute_powershell_script.assert_called_once_with(
|
||||
command._target_path, command.sysnative)
|
||||
|
||||
def test_execute_cleanup(self, _):
|
||||
with testutils.create_tempfile() as tmp:
|
||||
cleanup = mock.Mock()
|
||||
command = executil.BaseCommand(tmp, cleanup=cleanup)
|
||||
command.execute()
|
||||
|
||||
cleanup.assert_called_once_with()
|
@ -12,60 +12,49 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
|
||||
from cloudbaseinit.plugins.common import executil
|
||||
from cloudbaseinit.plugins.windows import fileexecutils
|
||||
|
||||
|
||||
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
|
||||
class TestFileExecutilsPlugin(unittest.TestCase):
|
||||
|
||||
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
|
||||
def _test_exec_file(self, mock_get_os_utils, filename, exception=False):
|
||||
mock_osutils = mock.MagicMock()
|
||||
mock_part = mock.MagicMock()
|
||||
mock_part.get_filename.return_value = filename
|
||||
mock_get_os_utils.return_value = mock_osutils
|
||||
if exception:
|
||||
mock_osutils.execute_process.side_effect = [Exception]
|
||||
with mock.patch("cloudbaseinit.plugins.windows.userdataplugins."
|
||||
"shellscript.open", mock.mock_open(), create=True):
|
||||
response = fileexecutils.exec_file(filename)
|
||||
if filename.endswith(".cmd"):
|
||||
mock_osutils.execute_process.assert_called_once_with(
|
||||
[filename], True)
|
||||
elif filename.endswith(".sh"):
|
||||
mock_osutils.execute_process.assert_called_once_with(
|
||||
['bash.exe', filename], False)
|
||||
elif filename.endswith(".py"):
|
||||
mock_osutils.execute_process.assert_called_once_with(
|
||||
['python.exe', filename], False)
|
||||
elif filename.endswith(".exe"):
|
||||
mock_osutils.execute_process.assert_called_once_with(
|
||||
[filename], False)
|
||||
elif filename.endswith(".ps1"):
|
||||
mock_osutils.execute_powershell_script.assert_called_once_with(
|
||||
filename)
|
||||
else:
|
||||
self.assertEqual(0, response)
|
||||
def test_exec_file_no_executor(self, _):
|
||||
retval = fileexecutils.exec_file("fake.fake")
|
||||
self.assertEqual(0, retval)
|
||||
|
||||
def test_process_cmd(self):
|
||||
self._test_exec_file(filename='fake.cmd')
|
||||
def test_executors_mapping(self, _):
|
||||
self.assertEqual(fileexecutils.FORMATS["cmd"],
|
||||
executil.Shell)
|
||||
self.assertEqual(fileexecutils.FORMATS["exe"],
|
||||
executil.Shell)
|
||||
self.assertEqual(fileexecutils.FORMATS["sh"],
|
||||
executil.Bash)
|
||||
self.assertEqual(fileexecutils.FORMATS["py"],
|
||||
executil.Python)
|
||||
self.assertEqual(fileexecutils.FORMATS["ps1"],
|
||||
executil.PowershellSysnative)
|
||||
|
||||
def test_process_sh(self):
|
||||
self._test_exec_file(filename='fake.sh')
|
||||
@mock.patch('cloudbaseinit.plugins.common.executil.'
|
||||
'BaseCommand.execute')
|
||||
def test_exec_file_fails(self, mock_execute, _):
|
||||
mock_execute.side_effect = ValueError
|
||||
retval = fileexecutils.exec_file("fake.py")
|
||||
mock_execute.assert_called_once_with()
|
||||
self.assertEqual(0, retval)
|
||||
|
||||
def test_process_py(self):
|
||||
self._test_exec_file(filename='fake.py')
|
||||
|
||||
def test_process_ps1(self):
|
||||
self._test_exec_file(filename='fake.ps1')
|
||||
|
||||
def test_process_other(self):
|
||||
self._test_exec_file(filename='fake.other')
|
||||
|
||||
def test_process_exe(self):
|
||||
self._test_exec_file(filename='fake.exe')
|
||||
|
||||
def test_process_exception(self):
|
||||
self._test_exec_file(filename='fake.exe', exception=True)
|
||||
@mock.patch('cloudbaseinit.plugins.common.executil.'
|
||||
'BaseCommand.execute')
|
||||
def test_exec_file_(self, mock_execute, _):
|
||||
mock_execute.return_value = (
|
||||
mock.sentinel.out,
|
||||
mock.sentinel.error,
|
||||
0,
|
||||
)
|
||||
retval = fileexecutils.exec_file("fake.py")
|
||||
mock_execute.assert_called_once_with()
|
||||
self.assertEqual(0, retval)
|
||||
|
@ -12,123 +12,74 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
import os
|
||||
import unittest
|
||||
|
||||
from oslo.config import cfg
|
||||
import mock
|
||||
|
||||
from cloudbaseinit.plugins.common import executil
|
||||
from cloudbaseinit.plugins.windows import userdatautils
|
||||
from cloudbaseinit.tests.metadata import fake_json_response
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
def _safe_remove(filepath):
|
||||
try:
|
||||
os.remove(filepath)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
|
||||
class UserDataUtilsTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.fake_data = fake_json_response.get_fake_metadata_json(
|
||||
'2013-04-04')
|
||||
def _get_command(self, data):
|
||||
"""Get a command from the given data.
|
||||
|
||||
@mock.patch('re.search')
|
||||
@mock.patch('tempfile.gettempdir')
|
||||
@mock.patch('os.remove')
|
||||
@mock.patch('os.path.exists')
|
||||
@mock.patch('os.path.expandvars')
|
||||
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
|
||||
@mock.patch('uuid.uuid4')
|
||||
@mock.patch('cloudbaseinit.utils.encoding.write_file')
|
||||
def _test_execute_user_data_script(self, mock_write_file, mock_uuid4,
|
||||
mock_get_os_utils, mock_path_expandvars,
|
||||
mock_path_exists, mock_os_remove,
|
||||
mock_gettempdir, mock_re_search,
|
||||
fake_user_data):
|
||||
mock_osutils = mock.MagicMock()
|
||||
mock_gettempdir.return_value = 'fake_temp'
|
||||
mock_uuid4.return_value = 'randomID'
|
||||
match_instance = mock.MagicMock()
|
||||
path = os.path.join('fake_temp', 'randomID')
|
||||
args = None
|
||||
powershell = False
|
||||
mock_get_os_utils.return_value = mock_osutils
|
||||
mock_path_exists.return_value = True
|
||||
extension = ''
|
||||
If a command was obtained, then a cleanup will be added in order
|
||||
to remove the underlying target path of the command.
|
||||
"""
|
||||
command = userdatautils._get_command(data)
|
||||
if command:
|
||||
self.addCleanup(_safe_remove, command._target_path)
|
||||
return command
|
||||
|
||||
if fake_user_data == '^rem cmd\s':
|
||||
side_effect = [match_instance]
|
||||
number_of_calls = 1
|
||||
extension = '.cmd'
|
||||
args = [path + extension]
|
||||
shell = True
|
||||
elif fake_user_data == '^#!/usr/bin/env\spython\s':
|
||||
side_effect = [None, match_instance]
|
||||
number_of_calls = 2
|
||||
extension = '.py'
|
||||
args = ['python.exe', path + extension]
|
||||
shell = False
|
||||
elif fake_user_data == '#!':
|
||||
side_effect = [None, None, match_instance]
|
||||
number_of_calls = 3
|
||||
extension = '.sh'
|
||||
args = ['bash.exe', path + extension]
|
||||
shell = False
|
||||
elif fake_user_data == '#ps1_sysnative\s':
|
||||
side_effect = [None, None, None, match_instance]
|
||||
number_of_calls = 4
|
||||
extension = '.ps1'
|
||||
sysnative = True
|
||||
powershell = True
|
||||
elif fake_user_data == '#ps1_x86\s':
|
||||
side_effect = [None, None, None, None, match_instance]
|
||||
number_of_calls = 5
|
||||
extension = '.ps1'
|
||||
shell = False
|
||||
sysnative = False
|
||||
powershell = True
|
||||
else:
|
||||
side_effect = [None, None, None, None, None]
|
||||
number_of_calls = 5
|
||||
def test__get_command(self, _):
|
||||
command = self._get_command(b'rem cmd test')
|
||||
self.assertIsInstance(command, executil.Shell)
|
||||
|
||||
mock_re_search.side_effect = side_effect
|
||||
command = self._get_command(b'#!/usr/bin/env python\ntest')
|
||||
self.assertIsInstance(command, executil.Python)
|
||||
|
||||
response = userdatautils.execute_user_data_script(fake_user_data)
|
||||
command = self._get_command(b'#!/bin/bash')
|
||||
self.assertIsInstance(command, executil.Bash)
|
||||
|
||||
mock_gettempdir.assert_called_once_with()
|
||||
command = self._get_command(b'#ps1_sysnative\n')
|
||||
self.assertIsInstance(command, executil.PowershellSysnative)
|
||||
|
||||
self.assertEqual(number_of_calls, mock_re_search.call_count)
|
||||
if args:
|
||||
mock_write_file.assert_called_once_with(path + extension,
|
||||
fake_user_data)
|
||||
mock_osutils.execute_process.assert_called_with(args, shell)
|
||||
mock_os_remove.assert_called_once_with(path + extension)
|
||||
self.assertEqual(None, response)
|
||||
elif powershell:
|
||||
mock_osutils.execute_powershell_script.assert_called_with(
|
||||
path + extension, sysnative)
|
||||
mock_os_remove.assert_called_once_with(path + extension)
|
||||
self.assertEqual(None, response)
|
||||
else:
|
||||
self.assertEqual(0, response)
|
||||
command = self._get_command(b'#ps1_x86\n')
|
||||
self.assertIsInstance(command, executil.Powershell)
|
||||
|
||||
def test_handle_batch(self):
|
||||
fake_user_data = b'^rem cmd\s'
|
||||
self._test_execute_user_data_script(fake_user_data=fake_user_data)
|
||||
command = self._get_command(b'unknown')
|
||||
self.assertIsNone(command)
|
||||
|
||||
def test_handle_python(self):
|
||||
self._test_execute_user_data_script(
|
||||
fake_user_data=b'^#!/usr/bin/env\spython\s')
|
||||
def test_execute_user_data_script_no_commands(self, _):
|
||||
retval = userdatautils.execute_user_data_script(b"unknown")
|
||||
self.assertEqual(0, retval)
|
||||
|
||||
def test_handle_shell(self):
|
||||
self._test_execute_user_data_script(fake_user_data=b'^#!')
|
||||
@mock.patch('cloudbaseinit.plugins.windows.userdatautils.'
|
||||
'_get_command')
|
||||
def test_execute_user_data_script_fails(self, mock_get_command, _):
|
||||
mock_get_command.return_value.side_effect = ValueError
|
||||
retval = userdatautils.execute_user_data_script(
|
||||
mock.sentinel.user_data)
|
||||
|
||||
def test_handle_powershell(self):
|
||||
self._test_execute_user_data_script(fake_user_data=b'^#ps1\s')
|
||||
self.assertEqual(0, retval)
|
||||
|
||||
def test_handle_powershell_sysnative(self):
|
||||
self._test_execute_user_data_script(fake_user_data=b'#ps1_sysnative\s')
|
||||
|
||||
def test_handle_powershell_sysnative_no_sysnative(self):
|
||||
self._test_execute_user_data_script(fake_user_data=b'#ps1_x86\s')
|
||||
|
||||
def test_handle_unsupported_format(self):
|
||||
self._test_execute_user_data_script(fake_user_data=b'unsupported')
|
||||
@mock.patch('cloudbaseinit.plugins.windows.userdatautils.'
|
||||
'_get_command')
|
||||
def test_execute_user_data_script(self, mock_get_command, _):
|
||||
mock_get_command.return_value.return_value = (
|
||||
mock.sentinel.output, mock.sentinel.error, -1
|
||||
)
|
||||
retval = userdatautils.execute_user_data_script(
|
||||
mock.sentinel.user_data)
|
||||
self.assertEqual(-1, retval)
|
||||
|
59
cloudbaseinit/tests/testutils.py
Normal file
59
cloudbaseinit/tests/testutils.py
Normal file
@ -0,0 +1,59 @@
|
||||
# Copyright 2014 Cloudbase Solutions Srl
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
|
||||
__all__ = (
|
||||
'create_tempfile',
|
||||
'create_tempdir',
|
||||
)
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def create_tempdir():
|
||||
"""Create a temporary directory.
|
||||
|
||||
This is a context manager, which creates a new temporary
|
||||
directory and removes it when exiting from the context manager
|
||||
block.
|
||||
"""
|
||||
tempdir = tempfile.mkdtemp(prefix="cloudbaseinit-tests")
|
||||
try:
|
||||
yield tempdir
|
||||
finally:
|
||||
shutil.rmtree(tempdir)
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def create_tempfile(content=None):
|
||||
"""Create a temporary file.
|
||||
|
||||
This is a context manager, which uses `create_tempdir` to obtain a
|
||||
temporary directory, where the file will be placed.
|
||||
|
||||
:param content:
|
||||
Additionally, a string which will be written
|
||||
in the new file.
|
||||
"""
|
||||
with create_tempdir() as temp:
|
||||
fd, path = tempfile.mkstemp(dir=temp)
|
||||
os.close(fd)
|
||||
if content:
|
||||
with open(path, 'w') as stream:
|
||||
stream.write(content)
|
||||
yield path
|
Loading…
x
Reference in New Issue
Block a user