diff --git a/cloudbaseinit/tests/metadata/services/test_cloudstack.py b/cloudbaseinit/tests/metadata/services/test_cloudstack.py index 8ebaa004..965d1978 100644 --- a/cloudbaseinit/tests/metadata/services/test_cloudstack.py +++ b/cloudbaseinit/tests/metadata/services/test_cloudstack.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import socket import unittest try: @@ -23,6 +24,7 @@ from six.moves import urllib from cloudbaseinit.metadata.services import base from cloudbaseinit.metadata.services import cloudstack +from cloudbaseinit.tests import testutils CONF = cfg.CONF @@ -49,11 +51,12 @@ class CloudStackTest(unittest.TestCase): msg='Testing 404 Not Found.'), urllib.error.HTTPError(url=url, code=427, hdrs={}, fp=None, msg='Testing 429 Too Many Requests.'), - base.NotExistingMetadataException() + base.NotExistingMetadataException(), + socket.error, ] self.assertTrue(self._service._test_api(url)) - for _ in range(3): + for _ in range(4): self.assertFalse(self._service._test_api(url)) @mock.patch('cloudbaseinit.osutils.factory.get_os_utils') @@ -181,3 +184,21 @@ class CloudStackTest(unittest.TestCase): for _ in range(3): response = self._service.get_public_keys() self.assertEqual([], response) + + @mock.patch('six.moves.urllib.request') + def test__http_request(self, mock_urllib_request): + mock_urllib_request.Request.return_value = mock.sentinel.request + with testutils.LogSnatcher('cloudbaseinit.metadata.services.' + 'cloudstack') as snatcher: + self._service._http_request(mock.sentinel.url) + + expected_logging = [ + 'Getting metadata from: %s' % mock.sentinel.url, + ] + mock_urllib_request.Request.assert_called_once_with( + mock.sentinel.url) + mock_urllib_request.urlopen.assert_called_once_with( + mock.sentinel.request) + mock_urlopen = mock_urllib_request.urlopen.return_value + mock_urlopen.read.assert_called_once_with() + self.assertEqual(expected_logging, snatcher.output) diff --git a/cloudbaseinit/tests/metadata/services/test_maasservice.py b/cloudbaseinit/tests/metadata/services/test_maasservice.py index 46ffce9e..ac62a1fc 100644 --- a/cloudbaseinit/tests/metadata/services/test_maasservice.py +++ b/cloudbaseinit/tests/metadata/services/test_maasservice.py @@ -38,14 +38,24 @@ class MaaSHttpServiceTest(unittest.TestCase): self._maasservice = maasservice.MaaSHttpService() @mock.patch("cloudbaseinit.metadata.services.maasservice.MaaSHttpService" - "._get_data") - def _test_load(self, mock_get_data, ip): + "._get_cache_data") + def _test_load(self, mock_get_cache_data, ip, cache_data_fails=False): + if cache_data_fails: + mock_get_cache_data.side_effect = Exception + with testutils.ConfPatcher('maas_metadata_url', ip): - response = self._maasservice.load() + with testutils.LogSnatcher('cloudbaseinit.metadata.services.' + 'maasservice') as snatcher: + response = self._maasservice.load() + if ip is not None: - mock_get_data.assert_called_once_with( - '%s/meta-data/' % self._maasservice._metadata_version) - self.assertTrue(response) + if not cache_data_fails: + mock_get_cache_data.assert_called_once_with( + '%s/meta-data/' % self._maasservice._metadata_version) + self.assertTrue(response) + else: + expected_logging = 'Metadata not found at URL \'%s\'' % ip + self.assertEqual(expected_logging, snatcher.output[-1]) else: self.assertFalse(response) @@ -55,6 +65,9 @@ class MaaSHttpServiceTest(unittest.TestCase): def test_load_no_ip(self): self._test_load(ip=None) + def test_load_get_cache_data_fails(self): + self._test_load(ip='196.254.196.254', cache_data_fails=True) + @mock.patch('six.moves.urllib.request.urlopen') def _test_get_response(self, mock_urlopen, ret_val): mock_request = mock.MagicMock() diff --git a/cloudbaseinit/tests/osutils/test_base.py b/cloudbaseinit/tests/osutils/test_base.py index 956714c0..1bc3ea54 100755 --- a/cloudbaseinit/tests/osutils/test_base.py +++ b/cloudbaseinit/tests/osutils/test_base.py @@ -56,3 +56,11 @@ class BaseOSUtilsTests(unittest.TestCase): response) else: self.assertEqual((mock_out, mock_err, mock_p.returncode), response) + + @mock.patch('os.urandom') + def test_generate_random_password(self, mock_urandom): + mock_urandom.return_value = b"test" + response = self._base.generate_random_password(20) + + mock_urandom.assert_called_once_with(256) + self.assertEqual("dGVzdA==", response) diff --git a/cloudbaseinit/tests/plugins/common/test_setuserpassword.py b/cloudbaseinit/tests/plugins/common/test_setuserpassword.py index 157e7841..4fcf077e 100644 --- a/cloudbaseinit/tests/plugins/common/test_setuserpassword.py +++ b/cloudbaseinit/tests/plugins/common/test_setuserpassword.py @@ -131,9 +131,16 @@ class SetUserPasswordPluginTests(unittest.TestCase): mock_service.post_password.return_value = 'value' mock_service.can_post_password = True mock_service.is_password_set = False - response = self._setpassword_plugin._set_metadata_password( - fake_passw0rd, mock_service) + with testutils.LogSnatcher('cloudbaseinit.plugins.common.' + 'setuserpassword') as snatcher: + response = self._setpassword_plugin._set_metadata_password( + fake_passw0rd, mock_service) + + expected_logging = [] if ssh_pub_key is None: + expected_logging = [ + 'No SSH public key available for password encryption' + ] self.assertTrue(response) else: mock_get_key.assert_called_once_with(mock_service) @@ -142,6 +149,7 @@ class SetUserPasswordPluginTests(unittest.TestCase): mock_service.post_password.assert_called_with( 'encrypted password') self.assertEqual('value', response) + self.assertEqual(expected_logging, snatcher.output) def test_set_metadata_password_with_ssh_key(self): fake_key = 'fake key' @@ -150,6 +158,20 @@ class SetUserPasswordPluginTests(unittest.TestCase): def test_set_metadata_password_no_ssh_key(self): self._test_set_metadata_password(ssh_pub_key=None) + def test_set_metadata_password_already_set(self): + mock_service = mock.MagicMock() + mock_service.is_password_set = True + with testutils.LogSnatcher('cloudbaseinit.plugins.common.' + 'setuserpassword') as snatcher: + response = self._setpassword_plugin._set_metadata_password( + mock.sentinel.fake_password, mock_service) + + self.assertTrue(response) + expected_logging = ['User\'s password already set in the ' + 'instance metadata and it cannot be ' + 'updated in the instance metadata'] + self.assertEqual(expected_logging, snatcher.output) + @mock.patch('cloudbaseinit.plugins.common.setuserpassword.' 'SetUserPasswordPlugin._get_password') def test_set_password(self, mock_get_password): diff --git a/cloudbaseinit/tests/plugins/common/test_sshpublickeys.py b/cloudbaseinit/tests/plugins/common/test_sshpublickeys.py index 47a9ca0c..83a1cef0 100644 --- a/cloudbaseinit/tests/plugins/common/test_sshpublickeys.py +++ b/cloudbaseinit/tests/plugins/common/test_sshpublickeys.py @@ -72,3 +72,14 @@ class SetUserSSHPublicKeysPluginTests(unittest.TestCase): def test_execute_with_no_user_home(self): self._test_execute(user_home=None) + + def test_no_public_keys(self): + mock_service = mock.Mock() + mock_service.get_public_keys.return_value = None + + with testutils.LogSnatcher('cloudbaseinit.plugins.common.' + 'sshpublickeys') as snatcher: + self._set_ssh_keys_plugin.execute(mock_service, {}) + + expected_logging = ['Public keys not found in metadata'] + self.assertEqual(expected_logging, snatcher.output) diff --git a/cloudbaseinit/tests/plugins/common/userdataplugins/cloudconfigplugins/test_write_files.py b/cloudbaseinit/tests/plugins/common/userdataplugins/cloudconfigplugins/test_write_files.py index 199628dc..5886332d 100644 --- a/cloudbaseinit/tests/plugins/common/userdataplugins/cloudconfigplugins/test_write_files.py +++ b/cloudbaseinit/tests/plugins/common/userdataplugins/cloudconfigplugins/test_write_files.py @@ -24,6 +24,7 @@ except ImportError: import mock from oslo.config import cfg +from cloudbaseinit import exception from cloudbaseinit.plugins.common.userdataplugins import cloudconfig from cloudbaseinit.plugins.common.userdataplugins.cloudconfigplugins import ( write_files @@ -162,6 +163,38 @@ class WriteFilesPluginTests(unittest.TestCase): self.assertTrue(snatcher.output[0].endswith("ValueError")) self.assertFalse(os.path.exists('random_cloudbaseinit_test')) + def test_wrong_gzip_content(self): + tmp = self._get_tempfile() + code = textwrap.dedent(""" + write_files: + - content: lala + encoding: gz + path: {} + """.format(tmp)) + with testutils.LogSnatcher('cloudbaseinit.plugins.common.' + 'userdataplugins.cloudconfigplugins.' + 'write_files') as snatcher: + self.plugin.process_non_multipart(code) + + self.assertTrue(snatcher.output[0].startswith( + "Fail to decompress gzip content")) + + def test_wrong_b64_content(self): + tmp = self._get_tempfile() + code = textwrap.dedent(""" + write_files: + - content: l + encoding: b64 + path: {} + """.format(tmp)) + with testutils.LogSnatcher('cloudbaseinit.plugins.common.' + 'userdataplugins.cloudconfigplugins.' + 'write_files') as snatcher: + self.plugin.process_non_multipart(code) + + self.assertTrue(snatcher.output[0].startswith( + "Fail to decode base64 content.")) + def test_unknown_encoding(self): tmp = self._get_tempfile() code = textwrap.dedent(""" @@ -182,3 +215,10 @@ class WriteFilesPluginTests(unittest.TestCase): self.assertEqual(["Unknown encoding, doing nothing."], snatcher.output) + + def test_invalid_object_passed(self): + with self.assertRaises(exception.CloudbaseInitException) as cm: + write_files.WriteFilesPlugin().process(1) + + expected = "Can't process the type of data %r" % type(1) + self.assertEqual(expected, str(cm.exception)) diff --git a/cloudbaseinit/tests/plugins/common/userdataplugins/test_shellscript.py b/cloudbaseinit/tests/plugins/common/userdataplugins/test_shellscript.py index d684b462..4c6c5e39 100644 --- a/cloudbaseinit/tests/plugins/common/userdataplugins/test_shellscript.py +++ b/cloudbaseinit/tests/plugins/common/userdataplugins/test_shellscript.py @@ -29,12 +29,17 @@ class ShellScriptPluginTests(unittest.TestCase): def setUp(self): self._shellscript = shellscript.ShellScriptPlugin() + @mock.patch('os.path.exists') + @mock.patch('os.remove') @mock.patch('cloudbaseinit.osutils.factory.get_os_utils') @mock.patch('tempfile.gettempdir') @mock.patch('cloudbaseinit.plugins.common.fileexecutils.exec_file') @mock.patch('cloudbaseinit.utils.encoding.write_file') def _test_process(self, mock_write_file, mock_exec_file, mock_gettempdir, - mock_get_os_utils, exception=False): + mock_get_os_utils, mock_os_remove, + mock_path_exists, exception=False): + + mock_path_exists.return_value = True fake_dir_path = os.path.join("fake", "dir") mock_osutils = mock.MagicMock() mock_part = mock.MagicMock() @@ -58,6 +63,8 @@ class ShellScriptPluginTests(unittest.TestCase): mock_gettempdir.assert_called_once_with() if not exception: self.assertEqual('fake response', response) + mock_os_remove.assert_called_once_with(fake_target) + mock_path_exists.assert_called_once_with(fake_target) def test_process(self): self._test_process(exception=False) diff --git a/cloudbaseinit/tests/test_exception.py b/cloudbaseinit/tests/test_exception.py index f5943f7b..ffd0d09c 100644 --- a/cloudbaseinit/tests/test_exception.py +++ b/cloudbaseinit/tests/test_exception.py @@ -12,24 +12,47 @@ # License for the specific language governing permissions and limitations # under the License. -import os -import unittest +try: + import unittest.mock as mock +except ImportError: + import mock from cloudbaseinit import exception from cloudbaseinit.tests import testutils -WINDOWS = os.name == "nt" - - -@unittest.skipUnless(WINDOWS, "This requires the Windows platform.") class TestException(testutils.CloudbaseInitTestBase): + @mock.patch('ctypes.GetLastError', create=True) + @mock.patch('ctypes.FormatError', create=True) + def _test_windows_exception(self, mock_format_error, + mock_get_last_error, message="Test %r", + description="test", + error_code=None): + + mock_format_error.return_value = description + mock_get_last_error.return_value = mock.sentinel.error_code + + with self.assertRaises(exception.CloudbaseInitException) as cm: + raise exception.WindowsCloudbaseInitException(message, error_code) + + if error_code is None: + mock_get_last_error.assert_called_once_with() + error_code = mock.sentinel.error_code + + try: + expected = message % description + except TypeError: + expected = message + + mock_format_error.assert_called_once_with(error_code) + self.assertEqual(expected, str(cm.exception)) + def test_windows_exception_no_error_code_given(self): - with self.assert_raises_windows_message("Test %r", error_code=100): - raise exception.WindowsCloudbaseInitException("Test %r") + self._test_windows_exception() def test_windows_exception_error_code_given(self): - with self.assert_raises_windows_message("Test %r", error_code=100): - raise exception.WindowsCloudbaseInitException("Test %r", - error_code=100) + self._test_windows_exception(error_code=100) + + def test_windows_exception_no_formatting_allowed(self): + self._test_windows_exception(message="Test") diff --git a/cloudbaseinit/tests/utils/test_dhcp.py b/cloudbaseinit/tests/utils/test_dhcp.py index bc5b281a..b83a6622 100644 --- a/cloudbaseinit/tests/utils/test_dhcp.py +++ b/cloudbaseinit/tests/utils/test_dhcp.py @@ -22,6 +22,7 @@ try: except ImportError: import mock +from cloudbaseinit.tests import testutils from cloudbaseinit.utils import dhcp @@ -157,3 +158,27 @@ class DHCPUtilsTests(unittest.TestCase): 'fake int') mock_socket().close.assert_called_once_with() self.assertEqual('fake replied options', response) + + def test__bind_dhcp_client_socket_bind_succeeds(self): + mock_socket = mock.Mock() + dhcp._bind_dhcp_client_socket(mock_socket, 0, 0) + + mock_socket.bind.assert_called_once_with(('', 68)) + + @mock.patch('time.sleep') + def test__bind_dhcp_client_socket(self, mock_time_sleep): + mock_socket = mock.Mock() + exc = socket.error() + exc.errno = 48 + mock_socket.bind = mock.Mock(side_effect=exc) + + with testutils.LogSnatcher('cloudbaseinit.utils.dhcp') as snatcher: + with self.assertRaises(socket.error): + dhcp._bind_dhcp_client_socket( + mock_socket, max_bind_attempts=4, + bind_retry_interval=mock.sentinel.bind_retry_interval) + + expected_occurences = sum( + 1 for item in snatcher.output + if item.startswith("Retrying to bind DHCP client port in ")) + self.assertEqual(3, expected_occurences) diff --git a/cloudbaseinit/tests/utils/test_network.py b/cloudbaseinit/tests/utils/test_network.py index 96af623c..93094e14 100644 --- a/cloudbaseinit/tests/utils/test_network.py +++ b/cloudbaseinit/tests/utils/test_network.py @@ -12,7 +12,6 @@ # License for the specific language governing permissions and limitations # under the License. -import sys import unittest try: @@ -28,13 +27,13 @@ CONF = cfg.CONF class NetworkUtilsTest(unittest.TestCase): + @mock.patch('sys.platform', new='win32') @mock.patch('cloudbaseinit.osutils.factory.get_os_utils') @mock.patch('six.moves.urllib.parse.urlparse') def _test_check_metadata_ip_route(self, mock_urlparse, mock_get_os_utils, side_effect): mock_utils = mock.MagicMock() mock_split = mock.MagicMock() - sys.platform = 'win32' mock_get_os_utils.return_value = mock_utils mock_utils.check_os_version.return_value = True mock_urlparse().netloc.split.return_value = mock_split