Modiffied tests for refactored userdata handling

Also modiffied the shellscript file where all extentions except for ".other"
were called with ".cmd"
This commit is contained in:
trobert2 2014-01-20 13:49:52 +02:00
parent e725b82913
commit 0559e4a130
11 changed files with 496 additions and 357 deletions

View File

@ -15,12 +15,11 @@
# under the License. # under the License.
import mock import mock
import tempfile
import uuid
import unittest import unittest
from cloudbaseinit.metadata.services import base as base_metadata from cloudbaseinit.metadata.services import base as metadata_services_base
from cloudbaseinit.openstack.common import cfg from cloudbaseinit.openstack.common import cfg
from cloudbaseinit.plugins import base
from cloudbaseinit.plugins.windows import userdata from cloudbaseinit.plugins.windows import userdata
from cloudbaseinit.tests.metadata import fake_json_response from cloudbaseinit.tests.metadata import fake_json_response
@ -34,230 +33,199 @@ class UserDataPluginTest(unittest.TestCase):
self.fake_data = fake_json_response.get_fake_metadata_json( self.fake_data = fake_json_response.get_fake_metadata_json(
'2013-04-04') '2013-04-04')
@mock.patch('os.path')
def test_get_plugin_path(self, mock_ospath):
mock_ospath.join.return_value = 'fake path'
response = self._userdata._get_plugin_path()
mock_ospath.join.assert_called_with(
mock_ospath.dirname(mock_ospath.dirname(mock_ospath.realpath())),
"windows/userdata-plugins")
self.assertEqual(response, 'fake path')
@mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin' @mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin'
'._process_userdata') '._process_user_data')
def _test_execute(self, mock_process_userdata, user_data, exception): def _test_execute(self, mock_process_user_data, ret_val):
mock_service = mock.MagicMock() mock_service = mock.MagicMock()
fake_shared_data = 'fake data' mock_service.get_user_data.side_effect = [ret_val]
if exception: response = self._userdata.execute(service=mock_service,
e = base_metadata.NotExistingMetadataException() shared_data=None)
mock_service.side_effect = e mock_service.get_user_data.assert_called_once_with('openstack')
if ret_val is metadata_services_base.NotExistingMetadataException:
self.assertEqual(response, (base.PLUGIN_EXECUTION_DONE, False))
elif ret_val is None:
self.assertEqual(response, (base.PLUGIN_EXECUTION_DONE, False))
else: else:
mock_service.get_user_data.return_value = user_data mock_process_user_data.assert_called_once_with(ret_val)
response = self._userdata.execute(mock_service, fake_shared_data) self.assertEqual(response, mock_process_user_data())
mock_service.get_user_data.assert_called_with('openstack')
if user_data:
mock_process_userdata.assert_called_with(user_data)
self.assertEqual(response, (1, False))
def test_execute(self): def test_execute(self):
self._test_execute(user_data=self.fake_data, exception=False) self._test_execute(ret_val='fake data')
def test_execute_no_data(self): def test_execute_NotExistingMetadataException(self):
self._test_execute(user_data=None, exception=False) self._test_execute(
ret_val=metadata_services_base.NotExistingMetadataException)
def test_execute_exception(self): def test_execute_not_user_data(self):
self._test_execute(user_data=None, exception=True) self._test_execute(ret_val=None)
@mock.patch('cloudbaseinit.plugins.windows.userdata.handle') @mock.patch('email.message_from_string')
def test_parse_mime(self, mock_message_from_string):
fake_user_data = 'fake data'
response = self._userdata._parse_mime(user_data=fake_user_data)
mock_message_from_string.assert_called_once_with(fake_user_data)
self.assertEqual(response, mock_message_from_string().walk())
@mock.patch('cloudbaseinit.plugins.windows.userdataplugins.factory.'
'UserDataPluginsFactory.load_plugins')
@mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin' @mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin'
'._parse_mime') '._parse_mime')
@mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin' @mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin'
'._process_part') '._process_part')
def _test_process_userdata(self, mock_process_part, mock_parse_mime,
mock_handle, user_data):
mock_process_part().__iter__.side_effect = ['fake']
self._userdata._process_userdata(user_data)
print mock_parse_mime.mock_calls
print mock_process_part.mock_calls
if user_data.startswith('Content-Type: multipart'):
mock_parse_mime.assert_called_once_with(user_data)
self.assertEqual(mock_process_part.call_count, 1)
else:
mock_handle.assert_called_once_with(user_data)
def test_process_userdata_multipart(self):
user_data = 'Content-Type: multipart fake data'
self._test_process_userdata(user_data=user_data)
def test_process_userdata(self):
user_data = 'fake data'
self._test_process_userdata(user_data=user_data)
@mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin'
'._get_part_handler')
@mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin'
'._begin_part_process_event')
@mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin' @mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin'
'._end_part_process_event') '._end_part_process_event')
def test_process_part(self, mock_end_part_process_event,
mock_begin_part_process_event,
mock_get_part_handler):
mock_part = mock.MagicMock()
mock_part_handler = mock.MagicMock()
mock_get_part_handler.return_value = mock_part_handler
self._userdata._process_part(mock_part)
mock_get_part_handler.assert_called_once_with(mock_part)
mock_begin_part_process_event.assert_called_once_with(mock_part)
mock_part.get_content_type.assert_called_once_with()
mock_part.get_filename.assert_called_once_with()
mock_part_handler.process.assert_called_with(mock_part)
mock_end_part_process_event.assert_called_with(mock_part)
@mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin' @mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin'
'._get_custom_handler') '._process_non_multi_part')
def test_begin_part_process_event(self, mock_get_custom_handler): def _test_process_user_data(self, mock_process_non_multi_part,
mock_end_part_process_event,
mock_process_part, mock_parse_mime,
mock_load_plugins, user_data, reboot):
mock_part = mock.MagicMock() mock_part = mock.MagicMock()
mock_handler = mock.MagicMock() mock_parse_mime.return_value = [mock_part]
mock_get_custom_handler.return_value = mock_handler mock_process_part.return_value = (base.PLUGIN_EXECUTION_DONE, reboot)
self._userdata._begin_part_process_event(mock_part)
mock_part.get_filename.assert_called_with()
mock_part.get_payload.assert_called_with()
mock_handler.assert_called_with("", "__begin__",
mock_part.get_filename(),
mock_part.get_payload())
@mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin' response = self._userdata._process_user_data(user_data=user_data)
'._get_custom_handler') if user_data.startswith('Content-Type: multipart'):
def test_end_part_process_event(self, mock_get_custom_handler): mock_load_plugins.assert_called_once_with()
mock_part = mock.MagicMock() mock_parse_mime.assert_called_once_with(user_data)
mock_handler = mock.MagicMock() mock_process_part.assert_called_once_with(mock_part,
mock_get_custom_handler.return_value = mock_handler mock_load_plugins(), {})
self._userdata._end_part_process_event(mock_part) self.assertEqual(response, (base.PLUGIN_EXECUTION_DONE, reboot))
mock_part.get_payload.assert_called_with()
mock_handler.assert_called_with("", "__end__",
mock_part.get_filename(),
mock_part.get_payload())
def test_get_custom_handler(self):
mock_part = mock.MagicMock()
mock_part.get_content_type.return_value = 0
self._userdata.plugin_set.has_custom_handlers = True
self._userdata.plugin_set.custom_handlers = [0]
response = self._userdata._get_custom_handler(mock_part)
mock_part.get_content_type.assert_called_with()
self.assertEqual(response, 0)
def test_get_part_handler(self):
mock_part = mock.MagicMock()
mock_part.get_content_type.return_value = 0
self._userdata.plugin_set.set = {0: 'fake value'}
response = self._userdata._get_part_handler(mock_part)
mock_part.get_content_type.assert_called_with()
self.assertEqual(response, 'fake value')
@mock.patch('email.message_from_string')
def test_parse_mime(self, mock_message_from_string):
mock_msg = mock.MagicMock()
mock_message_from_string.return_value = mock_msg
response = self._userdata._parse_mime(self.fake_data)
mock_message_from_string.assert_called_once_with(self.fake_data)
mock_msg.walk.assert_called_once_with()
self.assertEqual(response, mock_msg.walk())
@mock.patch('re.search')
@mock.patch('tempfile.gettempdir')
@mock.patch('os.remove')
@mock.patch('os.path.isdir')
@mock.patch('os.path.join')
@mock.patch('os.path.exists')
@mock.patch('os.path.expandvars')
@mock.patch('cloudbaseinit.osutils.factory.OSUtilsFactory.get_os_utils')
def _test_handle(self, mock_get_os_utils, mock_path_expandvars,
mock_path_exists, mock_path_join, mock_path_isdir,
mock_os_remove, mock_gettempdir, mock_re_search,
fake_user_data, directory_exists):
#TODO: recheck, these are old!
mock_osutils = mock.MagicMock()
uuid.uuid4 = mock.MagicMock(return_value='randomID')
mock_path_join.return_value = 'fake_temp\\randomID'
match_instance = mock.MagicMock()
path = 'fake_temp\\randomID'
args = None
mock_get_os_utils.return_value = mock_osutils
if fake_user_data == '^rem cmd\s':
side_effect = [match_instance]
number_of_calls = 1
extension = '.cmd'
args = [path+extension]
shell = True
elif fake_user_data == '#!':
side_effect = [None, match_instance]
number_of_calls = 2
extension = '.sh'
args = ['bash.exe', path+extension]
shell = False
elif fake_user_data == '#ps1\s':
side_effect = [None, None, match_instance]
number_of_calls = 3
extension = '.ps1'
args = ['powershell.exe', '-ExecutionPolicy', 'RemoteSigned',
'-NonInteractive', path+extension]
shell = False
else: else:
side_effect = [None, None, None, match_instance] mock_process_non_multi_part.assert_called_once_with(user_data)
number_of_calls = 4 self.assertEqual(response, mock_process_non_multi_part())
extension = '.ps1'
shell = False
if directory_exists:
args = [mock_path_expandvars('%windir%\\sysnative\\'
'WindowsPowerShell\\v1.0\\'
'powershell.exe'),
'-ExecutionPolicy',
'RemoteSigned', '-NonInteractive', path+extension]
mock_path_isdir.return_value = True
else:
mock_path_isdir.return_value = False
mock_re_search.side_effect = side_effect def test_process_user_data_multipart_reboot_true(self):
self._test_process_user_data(user_data='Content-Type: multipart',
reboot=True)
with mock.patch('cloudbaseinit.plugins.windows.userdata.open', def test_process_user_data_multipart_reboot_false(self):
mock.mock_open(), create=True): self._test_process_user_data(user_data='Content-Type: multipart',
response = userdata.handle(fake_user_data) reboot=False)
tempfile.gettempdir.assert_called_once_with() def test_process_user_data_non_multipart(self):
self._test_process_user_data(user_data='Content-Type: non-multipart',
reboot=False)
mock_path_join.assert_called_once_with(mock_gettempdir(), @mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin'
str(uuid.uuid4())) '._add_part_handlers')
assert mock_re_search.call_count == number_of_calls @mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin'
if args: '._get_plugin_return_value')
mock_osutils.execute_process.assert_called_with(args, shell) def _test_process_part(self, mock_get_plugin_return_value,
mock_add_part_handlers,
handler_func, user_data_plugin, content_type):
mock_part = mock.MagicMock()
mock_user_data_plugins = mock.MagicMock()
mock_user_handlers = mock.MagicMock()
mock_user_handlers.get.side_effect = [handler_func]
mock_user_data_plugins.get.side_effect = [user_data_plugin]
if content_type:
_content_type = self._userdata._part_handler_content_type
mock_part.get_content_type.return_value = _content_type
else:
_content_type = 'other content type'
mock_part.get_content_type.return_value = _content_type
self.assertEqual(response, (1, False)) response = self._userdata._process_part(
part=mock_part, user_data_plugins=mock_user_data_plugins,
user_handlers=mock_user_handlers)
mock_part.get_content_type.assert_called_once_with()
mock_user_handlers.get.assert_called_once_with(
_content_type)
if handler_func and handler_func is Exception:
self.assertEqual(mock_part.get_content_type.call_count, 2)
self.assertEqual(mock_part.get_filename.call_count, 2)
elif handler_func:
handler_func.assert_called_once_with(None, _content_type,
mock_part.get_filename(),
mock_part.get_payload())
def test_handle_batch(self): self.assertEqual(mock_part.get_content_type.call_count, 1)
fake_user_data = '^rem cmd\s' self.assertEqual(mock_part.get_filename.call_count, 2)
self._test_handle(fake_user_data=fake_user_data, else:
directory_exists=True) mock_user_data_plugins.get.assert_called_once_with(_content_type)
if user_data_plugin and content_type:
user_data_plugin.process.assert_called_with(mock_part)
mock_add_part_handlers.assert_called_with(
mock_user_data_plugins, mock_user_handlers,
user_data_plugin.process())
elif user_data_plugin and not content_type:
mock_get_plugin_return_value.assert_called_once_with(
user_data_plugin.process())
self.assertEqual(response, mock_get_plugin_return_value())
def test_handle_shell(self): def test_process_part(self):
fake_user_data = '^#!' handler_func = mock.MagicMock()
self._test_handle(fake_user_data=fake_user_data, self._test_process_part(handler_func=handler_func,
directory_exists=True) user_data_plugin=None, content_type=False)
def test_handle_powershell(self): def test_process_part_no_handler_func(self):
fake_user_data = '^#ps1\s' user_data_plugin = mock.MagicMock()
self._test_handle(fake_user_data=fake_user_data, self._test_process_part(handler_func=None,
directory_exists=True) user_data_plugin=user_data_plugin,
content_type=True)
def test_handle_powershell_sysnative(self): def test_process_part_not_content_type(self):
fake_user_data = '#ps1_sysnative\s' user_data_plugin = mock.MagicMock()
self._test_handle(fake_user_data=fake_user_data, self._test_process_part(handler_func=None,
directory_exists=True) user_data_plugin=user_data_plugin,
content_type=False)
def test_handle_powershell_sysnative_no_sysnative(self): @mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin'
fake_user_data = '#ps1_sysnative\s' '._begin_part_process_event')
self._test_handle(fake_user_data=fake_user_data, def _test_add_part_handlers(self, mock_begin_part_process_event, ret_val):
directory_exists=False) mock_user_data_plugins = mock.MagicMock(spec=dict)
mock_new_user_handlers = mock.MagicMock(spec=dict)
mock_user_handlers = mock.MagicMock(spec=dict)
mock_handler_func = mock.MagicMock()
mock_new_user_handlers.items.return_value = [('fake content type',
mock_handler_func)]
if ret_val:
mock_user_data_plugins.get.return_value = mock_handler_func
else:
mock_user_data_plugins.get.return_value = None
self._userdata._add_part_handlers(
user_data_plugins=mock_user_data_plugins,
user_handlers=mock_user_handlers,
new_user_handlers=mock_new_user_handlers)
mock_user_data_plugins.get.assert_called_with('fake content type')
if ret_val is None:
mock_user_handlers.__setitem__.assert_called_once_with(
'fake content type', mock_handler_func)
mock_begin_part_process_event.assert_called_with(mock_handler_func)
def test_add_part_handlers(self):
self._test_add_part_handlers(ret_val=None)
def test_add_part_handlers_skip_part_handler(self):
mock_func = mock.MagicMock()
self._test_add_part_handlers(ret_val=mock_func)
def test_begin_part_process_event(self):
mock_handler_func = mock.MagicMock()
self._userdata._begin_part_process_event(
handler_func=mock_handler_func)
mock_handler_func.assert_called_once_with(None, "__begin__", None,
None)
def test_end_part_process_event(self):
mock_handler_func = mock.MagicMock()
self._userdata._end_part_process_event(
handler_func=mock_handler_func)
mock_handler_func.assert_called_once_with(None, "__end__", None,
None)
@mock.patch('cloudbaseinit.plugins.windows.userdatautils'
'.execute_user_data_script')
@mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin'
'._get_plugin_return_value')
def test_process_non_multi_part(self, mock_get_plugin_return_value,
mock_execute_user_data_script):
user_data = 'fake'
response = self._userdata._process_non_multi_part(user_data=user_data)
mock_execute_user_data_script.assert_called_once_with(user_data)
mock_get_plugin_return_value.assert_called_once_with(
mock_execute_user_data_script())
self.assertEqual(response, mock_get_plugin_return_value())

View File

@ -1,43 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import unittest
from cloudbaseinit.plugins.windows import userdata_plugins
class MultipartUserDataPluginTest(unittest.TestCase):
def setUp(self):
fake_path = 'fake path'
self._userdata = userdata_plugins.PluginSet(fake_path)
@mock.patch('glob.glob')
@mock.patch('cloudbaseinit.plugins.windows.userdata_plugins.'
'load_from_file')
def test_load(self, mock_load_from_file, mock_glob):
fake_files = ['fake_file.py']
mock_plugin = mock.MagicMock()
mock_glob.return_value = fake_files
mock_load_from_file.return_value = mock_plugin
self._userdata.load()
mock_glob.assert_called_once_with(self._userdata.path + '/*.py')
mock_load_from_file.assert_called_once_with('fake_file.py',
self._userdata)
self.assertEqual(self._userdata.set[mock_plugin.type], mock_plugin)

View File

@ -0,0 +1,135 @@
# Copyright 2014 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import os
import uuid
import unittest
from cloudbaseinit.openstack.common import cfg
from cloudbaseinit.plugins.windows import userdatautils
from cloudbaseinit.tests.metadata import fake_json_response
CONF = cfg.CONF
class UserDataUtilsTest(unittest.TestCase):
def setUp(self):
self.fake_data = fake_json_response.get_fake_metadata_json(
'2013-04-04')
@mock.patch('re.search')
@mock.patch('tempfile.gettempdir')
@mock.patch('os.remove')
@mock.patch('os.path.isdir')
@mock.patch('os.path.exists')
@mock.patch('os.path.expandvars')
@mock.patch('cloudbaseinit.osutils.factory.OSUtilsFactory.get_os_utils')
def _test_execute_user_data_script(self, mock_get_os_utils,
mock_path_expandvars,
mock_path_exists, mock_path_isdir,
mock_os_remove, mock_gettempdir,
mock_re_search, fake_user_data,
directory_exists):
mock_osutils = mock.MagicMock()
mock_gettempdir.return_value = 'fake_temp'
uuid.uuid4 = mock.MagicMock(return_value='randomID')
match_instance = mock.MagicMock()
path = os.path.join('fake_temp', 'randomID')
args = None
mock_get_os_utils.return_value = mock_osutils
if fake_user_data == '^rem cmd\s':
side_effect = [match_instance]
number_of_calls = 1
extension = '.cmd'
args = [path+extension]
shell = True
elif fake_user_data == '^#!/usr/bin/env\spython\s':
side_effect = [None, match_instance]
number_of_calls = 2
extension = '.py'
args = ['python.exe', path+extension]
shell = False
elif fake_user_data == '#!':
side_effect = [None, None, match_instance]
number_of_calls = 3
extension = '.sh'
args = ['bash.exe', path+extension]
shell = False
elif fake_user_data == '#ps1\s':
side_effect = [None, None, None, match_instance]
number_of_calls = 4
extension = '.ps1'
args = ['powershell.exe', '-ExecutionPolicy', 'RemoteSigned',
'-NonInteractive', path+extension]
shell = False
else:
side_effect = [None, None, None, None, match_instance]
number_of_calls = 5
extension = '.ps1'
shell = False
if directory_exists:
args = [mock_path_expandvars('%windir%\\sysnative\\'
'WindowsPowerShell\\v1.0\\'
'powershell.exe'),
'-ExecutionPolicy',
'RemoteSigned', '-NonInteractive', path+extension]
mock_path_isdir.return_value = True
else:
mock_path_isdir.return_value = False
mock_re_search.side_effect = side_effect
with mock.patch('cloudbaseinit.plugins.windows.userdatautils.open',
mock.mock_open(), create=True):
response = userdatautils.execute_user_data_script(fake_user_data)
mock_gettempdir.assert_called_once_with()
self.assertEqual(mock_re_search.call_count, number_of_calls)
if args:
mock_osutils.execute_process.assert_called_with(args, shell)
if not directory_exists:
self.assertEqual(response, 0)
else:
self.assertEqual(response, None)
def test_handle_batch(self):
fake_user_data = '^rem cmd\s'
self._test_execute_user_data_script(fake_user_data=fake_user_data,
directory_exists=True)
def test_handle_python(self):
fake_user_data = '^#!/usr/bin/env\spython\s'
self._test_execute_user_data_script(fake_user_data=fake_user_data,
directory_exists=True)
def test_handle_shell(self):
fake_user_data = '^#!'
self._test_execute_user_data_script(fake_user_data=fake_user_data,
directory_exists=True)
def test_handle_powershell(self):
fake_user_data = '^#ps1\s'
self._test_execute_user_data_script(fake_user_data=fake_user_data,
directory_exists=True)
def test_handle_powershell_sysnative(self):
fake_user_data = '#ps1_sysnative\s'
self._test_execute_user_data_script(fake_user_data=fake_user_data,
directory_exists=True)
def test_handle_powershell_sysnative_no_sysnative(self):
fake_user_data = '#ps1_sysnative\s'
self._test_execute_user_data_script(fake_user_data=fake_user_data,
directory_exists=False)

View File

@ -1,87 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import importlib
import mock
import os
import unittest
from cloudbaseinit.openstack.common import cfg
from cloudbaseinit.plugins.windows import userdata_plugins
#the name of the module includes "-", importlib.import_module is needed:
parthandler = importlib.import_module("cloudbaseinit.plugins.windows"
".userdata-plugins.parthandler")
CONF = cfg.CONF
class PartHandlerScriptHandlerTests(unittest.TestCase):
def setUp(self):
parent_set = userdata_plugins.PluginSet('fake_path')
self._parthandler = parthandler.PartHandlerScriptHandler(parent_set)
@mock.patch('imp.load_source')
@mock.patch('imp.load_compiled')
@mock.patch('cloudbaseinit.plugins.windows.userdata-plugins.parthandler'
'.__import__', create=True)
def _test_load_from_file(self, mock__import__, mock_load_compiled,
mock_load_source, filepath):
mock_module = mock.MagicMock()
mock__import__.return_value = mock_module
mod_name, file_ext = os.path.splitext(os.path.split(filepath)[-1])
response = parthandler.load_from_file(filepath, 'fake_function')
print response
if file_ext.lower() == '.py':
mock_load_source.assert_called_with('path', filepath)
elif file_ext.lower() == '.pyc':
mock_load_compiled.assert_called_with('path', filepath)
mock__import__.assert_called_once_with('path')
self.assertEqual(response, mock_module.fake_function)
def test_load_from_file_py(self):
fake_file_path = os.path.join(os.path.join('fake', 'file'), 'path')
self._test_load_from_file(filepath=fake_file_path + '.py')
def test_load_from_file_pyc(self):
fake_file_path = os.path.join(os.path.join('fake', 'file'), 'path')
self._test_load_from_file(filepath=fake_file_path + '.pyc')
@mock.patch('cloudbaseinit.plugins.windows.userdata-plugins.parthandler.'
'load_from_file')
def test_process(self, mock_load_from_file):
mock_part = mock.MagicMock()
mock_part.get_filename.return_value = 'fake_name'
handler_path = self._parthandler.parent_set.path + "/part-handler/"
handler_path += 'fake_name'
expected = [mock.call(),
mock.call(handler_path, "list_types"),
mock.call(handler_path, "handle_part")]
mock_load_from_file().return_value = ['fake part']
with mock.patch("cloudbaseinit.plugins.windows.userdata-plugins."
"parthandler.open", mock.mock_open(), create=True):
self._parthandler.process(mock_part)
print mock_load_from_file.mock_calls
print self._parthandler.parent_set.custom_handlers
mock_part.get_filename.assert_called_once_with()
mock_part.get_payload.assert_called_once_with()
self.assertEqual(mock_load_from_file.call_args_list, expected)
self.assertEqual(self._parthandler.parent_set.has_custom_handlers,
True)
self.assertEqual(self._parthandler.parent_set.custom_handlers,
{'fake part': mock_load_from_file()})

View File

@ -0,0 +1,36 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import unittest
from cloudbaseinit.openstack.common import cfg
from cloudbaseinit.plugins.windows.userdataplugins import cloudboothook
CONF = cfg.CONF
class CloudBootHookPluginTests(unittest.TestCase):
def setUp(self):
self._cloud_hook = cloudboothook.CloudBootHookPlugin()
@mock.patch('cloudbaseinit.plugins.windows.userdataplugins.base'
'.BaseUserDataPlugin.get_mime_type')
def test_process(self, mock_get_mime_type):
mock_part = mock.MagicMock()
self._cloud_hook.process(mock_part)
mock_get_mime_type.assert_called_once_with()

View File

@ -0,0 +1,36 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import unittest
from cloudbaseinit.openstack.common import cfg
from cloudbaseinit.plugins.windows.userdataplugins import cloudconfig
CONF = cfg.CONF
class CloudConfigPluginTests(unittest.TestCase):
def setUp(self):
self._cloudconfig = cloudconfig.CloudConfigPlugin()
@mock.patch('cloudbaseinit.plugins.windows.userdataplugins.base'
'.BaseUserDataPlugin.get_mime_type')
def test_process(self, mock_get_mime_type):
mock_part = mock.MagicMock()
self._cloudconfig.process(mock_part)
mock_get_mime_type.assert_called_once_with()

View File

@ -0,0 +1,34 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import unittest
from cloudbaseinit.openstack.common import cfg
from cloudbaseinit.plugins.windows.userdataplugins import factory
CONF = cfg.CONF
class UserDataPluginsFactoryTests(unittest.TestCase):
def setUp(self):
self._factory = factory.UserDataPluginsFactory()
@mock.patch('cloudbaseinit.utils.classloader.ClassLoader.load_class')
def test_process(self, mock_load_class):
response = self._factory.load_plugins()
self.assertTrue(response is not None)

View File

@ -14,15 +14,11 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import importlib
import mock import mock
import unittest import unittest
from cloudbaseinit.openstack.common import cfg from cloudbaseinit.openstack.common import cfg
from cloudbaseinit.plugins.windows import userdata_plugins from cloudbaseinit.plugins.windows.userdataplugins import heat
#the name of the module includes "-", importlib.import_module is needed:
heathandler = importlib.import_module("cloudbaseinit.plugins.windows"
".userdata-plugins.heathandler")
CONF = cfg.CONF CONF = cfg.CONF
@ -30,13 +26,24 @@ CONF = cfg.CONF
class HeatUserDataHandlerTests(unittest.TestCase): class HeatUserDataHandlerTests(unittest.TestCase):
def setUp(self): def setUp(self):
parent_set = userdata_plugins.PluginSet self._heat = heat.HeatPlugin()
self._heathandler = heathandler.HeatUserDataHandler(parent_set)
@mock.patch('cloudbaseinit.plugins.windows.userdata.handle') @mock.patch('cloudbaseinit.plugins.windows.userdatautils'
def test_process(self, mock_handle): '.execute_user_data_script')
def _test_process(self, mock_execute_user_data_script, filename):
mock_part = mock.MagicMock() mock_part = mock.MagicMock()
mock_part.get_filename.return_value = "cfn-userdata" mock_part.get_filename.return_value = filename
self._heathandler.process(mock_part) response = self._heat.process(mock_part)
mock_part.get_filename.assert_called_once_with() mock_part.get_filename.assert_called_with()
mock_handle.assert_called_once_with(mock_part.get_payload()) if filename:
mock_execute_user_data_script.assert_called_with(
mock_part.get_payload())
self.assertEqual(response, mock_execute_user_data_script())
else:
self.assertTrue(response is None)
def test_process(self):
self._test_process(filename='cfn-userdata')
def test_process_content_not_supported(self):
self._test_process(filename=None)

View File

@ -0,0 +1,52 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import os
import unittest
from cloudbaseinit.openstack.common import cfg
from cloudbaseinit.plugins.windows.userdataplugins import parthandler
CONF = cfg.CONF
class PartHandlerPluginTests(unittest.TestCase):
def setUp(self):
self._parthandler = parthandler.PartHandlerPlugin()
@mock.patch('tempfile.gettempdir')
@mock.patch('cloudbaseinit.utils.classloader.ClassLoader.load_module')
def test_process(self, mock_load_module, mock_gettempdir):
mock_part = mock.MagicMock()
mock_part_handler = mock.MagicMock()
mock_part.get_filename.return_value = 'fake_name'
mock_gettempdir.return_value = 'fake_directory'
mock_load_module.return_value = mock_part_handler
mock_part_handler.list_types.return_value = ['fake part']
with mock.patch('cloudbaseinit.plugins.windows.userdataplugins.'
'parthandler.open',
mock.mock_open(read_data='fake data'), create=True):
response = self._parthandler.process(mock_part)
mock_part.get_filename.assert_called_once_with()
mock_load_module.assert_called_once_with(os.path.join(
'fake_directory', 'fake_name'))
mock_part_handler.list_types.assert_called_once_with()
self.assertEqual(response, {'fake part':
mock_part_handler.handle_part})

View File

@ -14,25 +14,20 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import importlib
import mock import mock
import os import os
import unittest import unittest
from cloudbaseinit.openstack.common import cfg from cloudbaseinit.openstack.common import cfg
from cloudbaseinit.plugins.windows import userdata_plugins from cloudbaseinit.plugins.windows.userdataplugins import shellscript
#the name of the module includes "-", importlib.import_module is needed:
shellscript = importlib.import_module("cloudbaseinit.plugins.windows"
".userdata-plugins.shellscript")
CONF = cfg.CONF CONF = cfg.CONF
class ShellScriptHandlerTests(unittest.TestCase): class ShellScriptPluginTests(unittest.TestCase):
def setUp(self): def setUp(self):
parent_set = userdata_plugins.PluginSet('fake_path') self._shellscript = shellscript.ShellScriptPlugin()
self._shellscript = shellscript.ShellScriptHandler(parent_set)
@mock.patch('cloudbaseinit.osutils.factory.OSUtilsFactory.get_os_utils') @mock.patch('cloudbaseinit.osutils.factory.OSUtilsFactory.get_os_utils')
@mock.patch('tempfile.gettempdir') @mock.patch('tempfile.gettempdir')
@ -49,7 +44,7 @@ class ShellScriptHandlerTests(unittest.TestCase):
if exception: if exception:
mock_osutils.execute_process.side_effect = [Exception] mock_osutils.execute_process.side_effect = [Exception]
with mock.patch("cloudbaseinit.plugins.windows.userdata-plugins." with mock.patch("cloudbaseinit.plugins.windows.userdataplugins."
"shellscript.open", mock.mock_open(), create=True): "shellscript.open", mock.mock_open(), create=True):
response = self._shellscript.process(mock_part) response = self._shellscript.process(mock_part)
@ -61,21 +56,27 @@ class ShellScriptHandlerTests(unittest.TestCase):
elif filename.endswith(".sh"): elif filename.endswith(".sh"):
mock_osutils.execute_process.assert_called_with( mock_osutils.execute_process.assert_called_with(
['bash.exe', os.path.join(fake_dir_path, filename)], False) ['bash.exe', os.path.join(fake_dir_path, filename)], False)
elif filename.endswith(".py"):
mock_osutils.execute_process.assert_called_with(
['python.exe', os.path.join(fake_dir_path, filename)], False)
elif filename.endswith(".ps1"): elif filename.endswith(".ps1"):
mock_osutils.execute_process.assert_called_with( mock_osutils.execute_process.assert_called_with(
['powershell.exe', '-ExecutionPolicy', 'RemoteSigned', ['powershell.exe', '-ExecutionPolicy', 'RemoteSigned',
'-NonInteractive', os.path.join(fake_dir_path, filename)], '-NonInteractive', os.path.join(fake_dir_path, filename)],
False) False)
self.assertFalse(response) self.assertFalse(response)
def test_process_cmd(self): def test_process_cmd(self):
self._test_process(filename='fake.cmd') self._test_process(filename='fake.cmd')
def test_process_sh(self): def test_process_sh(self):
self._test_process(filename='fake.cmd') self._test_process(filename='fake.sh')
def test_process_py(self):
self._test_process(filename='fake.py')
def test_process_ps1(self): def test_process_ps1(self):
self._test_process(filename='fake.cmd') self._test_process(filename='fake.ps1')
def test_process_other(self): def test_process_other(self):
self._test_process(filename='fake.other') self._test_process(filename='fake.other')