diff --git a/cloudbaseinit/tests/osutils/test_windows.py b/cloudbaseinit/tests/osutils/test_windows.py index 9cb19577..bbcccdf0 100644 --- a/cloudbaseinit/tests/osutils/test_windows.py +++ b/cloudbaseinit/tests/osutils/test_windows.py @@ -988,7 +988,7 @@ class WindowsUtilsTest(unittest.TestCase): sizeof_calls = [mock.call( windows_utils.Win32_SP_DEVICE_INTERFACE_DATA), - mock.call(mock_sdn())] + mock.call(mock_sdn())] device_interfaces_calls = [mock.call(handle_disks, None, mock_byref(), 0, mock_byref()), mock.call(handle_disks, None, mock_byref(), @@ -1008,11 +1008,11 @@ class WindowsUtilsTest(unittest.TestCase): mock_setupapi.SetupDiEnumDeviceInterfaces.side_effect = [True, False] - if handle_disks == self._winutils.INVALID_HANDLE_VALUE \ - or last_error != self._winutils.ERROR_INSUFFICIENT_BUFFER \ - and not interface_detail \ - or disk_handle == self._winutils.INVALID_HANDLE_VALUE \ - or not io_control: + if handle_disks == self._winutils.INVALID_HANDLE_VALUE or ( + last_error != self._winutils.ERROR_INSUFFICIENT_BUFFER) and not ( + interface_detail) or ( + disk_handle == self._winutils.INVALID_HANDLE_VALUE) or ( + not io_control): self.assertRaises(Exception, self._winutils.get_physical_disks) @@ -1032,7 +1032,7 @@ class WindowsUtilsTest(unittest.TestCase): self.assertEqual(mock_cast.call_args_list, cast_calls) mock_setup_interface.assert_called_with(handle_disks, mock_byref(), - mock_cast(),mock_DWORD(), + mock_cast(), mock_DWORD(), None, None) mock_kernel32.CreateFileW.assert_called_with( mock_cast().value, 0, self._winutils.FILE_SHARE_READ, None, @@ -1050,8 +1050,6 @@ class WindowsUtilsTest(unittest.TestCase): mock_byref(), None, None, self._winutils.DIGCF_PRESENT | self._winutils.DIGCF_DEVICEINTERFACE) - - def test_get_physical_disks(self): mock_handle_disks = mock.MagicMock() mock_disk_handle = mock.MagicMock() @@ -1089,7 +1087,7 @@ class WindowsUtilsTest(unittest.TestCase): def test_get_physical_disks_handle_disks_invalid(self): mock_disk_handle = mock.MagicMock() self._test_get_physical_disks( - handle_disks=self._winutils.INVALID_HANDLE_VALUE , + handle_disks=self._winutils.INVALID_HANDLE_VALUE, last_error=self._winutils.ERROR_INSUFFICIENT_BUFFER, interface_detail='fake interface detail', disk_handle=mock_disk_handle, io_control=True) @@ -1193,3 +1191,59 @@ class WindowsUtilsTest(unittest.TestCase): def test_execute_powershell_script_system32(self): self._test_execute_powershell_script(ret_val=False) + + @mock.patch('wmi.WMI') + def test_get_dhcp_hosts_in_use(self, mock_WMI): + mock_net_cfg = mock.MagicMock() + mock_net_cfg.DHCPServer = 'fake dhcp server' + mock_WMI().Win32_NetworkAdapterConfiguration.return_value = [ + mock_net_cfg] + response = self._winutils.get_dhcp_hosts_in_use() + mock_WMI.assert_called_with(moniker='//./root/cimv2') + mock_WMI().Win32_NetworkAdapterConfiguration.assert_called_with( + DHCPEnabled=True) + self.assertEqual(response, ['fake dhcp server']) + + @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' + '.check_sysnative_dir_exists') + @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' + '.get_sysnative_dir') + @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' + '.get_system32_dir') + @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' + '.execute_process') + def _test_set_ntp_client_config(self, mock_execute_process, + mock_get_system32_dir, + mock_get_sysnative_dir, + mock_check_sysnative_dir_exists, + sysnative, ret_val): + mock_check_sysnative_dir_exists.return_value = sysnative + fake_base_dir = 'fake_dir' + mock_execute_process.return_value = (None, None, ret_val) + w32tm_path = os.path.join(fake_base_dir, "w32tm.exe") + mock_get_sysnative_dir.return_value = fake_base_dir + mock_get_system32_dir.return_value = fake_base_dir + args = [w32tm_path, '/config', '/manualpeerlist:%s' % 'fake ntp host', + '/syncfromflags:manual', '/update'] + + if ret_val: + self.assertRaises(Exception, self._winutils.set_ntp_client_config, + args, False) + else: + self._winutils.set_ntp_client_config(ntp_host='fake ntp host') + + if sysnative: + mock_get_sysnative_dir.assert_called_once_with() + else: + mock_get_system32_dir.assert_called_once_with() + mock_execute_process.assert_called_once_with(args, False) + + def test_set_ntp_client_config_sysnative_true(self): + self._test_set_ntp_client_config(sysnative=True, ret_val=None) + + def test_set_ntp_client_config_sysnative_false(self): + self._test_set_ntp_client_config(sysnative=False, ret_val=None) + + def test_set_ntp_client_config_sysnative_exception(self): + self._test_set_ntp_client_config(sysnative=False, + ret_val='fake return value') diff --git a/cloudbaseinit/tests/plugins/windows/test_ntpclient.py b/cloudbaseinit/tests/plugins/windows/test_ntpclient.py new file mode 100644 index 00000000..cdb31e10 --- /dev/null +++ b/cloudbaseinit/tests/plugins/windows/test_ntpclient.py @@ -0,0 +1,113 @@ +# Copyright 2014 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import unittest + +from oslo.config import cfg + +from cloudbaseinit.plugins import base +from cloudbaseinit.plugins.windows import ntpclient +from cloudbaseinit.utils import dhcp + +CONF = cfg.CONF + + +class NTPClientPluginTests(unittest.TestCase): + + def setUp(self): + self._ntpclient = ntpclient.NTPClientPlugin() + + def _test_check_w32time_svc_status(self, start_mode, fail_service_start): + # TODO(rtingirica): use _W32TIME_SERVICE when it will be moved outside + # of method declaration + mock_osutils = mock.MagicMock() + mock_osutils.SERVICE_START_MODE_AUTOMATIC = "automatic" + mock_osutils.SERVICE_STATUS_RUNNING = "running" + mock_osutils.SERVICE_STATUS_STOPPED = "stopped" + mock_osutils.get_service_start_mode.return_value = start_mode + + if fail_service_start: + mock_osutils.get_service_status.return_value = "stopped" + self.assertRaises(Exception, + self._ntpclient._check_w32time_svc_status, + mock_osutils) + + else: + mock_osutils.get_service_status.side_effect = [ + "stopped", mock_osutils.SERVICE_STATUS_RUNNING] + self._ntpclient._check_w32time_svc_status(osutils=mock_osutils) + mock_osutils.get_service_start_mode.assert_called_once_with( + "w32time") + + if start_mode != mock_osutils.SERVICE_START_MODE_AUTOMATIC: + mock_osutils.set_service_start_mode.assert_called_once_with( + "w32time", mock_osutils.SERVICE_START_MODE_AUTOMATIC) + + mock_osutils.start_service.assert_called_once_with("w32time") + + mock_osutils.get_service_status.assert_called_with("w32time") + + def test_check_w32time_svc_status_other_start_mode(self): + self._test_check_w32time_svc_status(start_mode="not automatic", + fail_service_start=False) + + def test_check_w32time_svc_status_start_automatic(self): + self._test_check_w32time_svc_status(start_mode="automatic", + fail_service_start=False) + + def test_check_w32time_svc_status_exception(self): + self._test_check_w32time_svc_status(start_mode="automatic", + fail_service_start=True) + + @mock.patch('cloudbaseinit.osutils.factory.get_os_utils') + @mock.patch('cloudbaseinit.utils.dhcp.get_dhcp_options') + @mock.patch('socket.inet_ntoa') + @mock.patch('cloudbaseinit.plugins.windows.ntpclient.NTPClientPlugin.' + '_check_w32time_svc_status') + def _test_execute(self, mock_check_w32time_svc_status, mock_inet_ntoa, + mock_get_dhcp_options, mock_get_os_utils, ntp_data): + CONF.set_override('ntp_use_dhcp_config', True) + mock_service = mock.MagicMock() + mock_osutils = mock.MagicMock() + mock_options_data = mock.MagicMock() + + mock_get_os_utils.return_value = mock_osutils + mock_osutils.get_dhcp_hosts_in_use.return_value = ['fake dhcp host'] + mock_get_dhcp_options.return_value = mock_options_data + mock_options_data.get.return_value = ntp_data + mock_inet_ntoa.return_value = 'fake host' + + response = self._ntpclient.execute(service=mock_service, + shared_data='fake data') + + mock_osutils.get_dhcp_hosts_in_use.assert_called_once_with() + mock_get_dhcp_options.assert_called_once_with( + 'fake dhcp host', [dhcp.OPTION_NTP_SERVERS]) + mock_options_data.get.assert_called_once_with(dhcp.OPTION_NTP_SERVERS) + if ntp_data: + mock_inet_ntoa.assert_called_once_with(ntp_data[:4]) + self.assertEqual(response, (base.PLUGIN_EXECUTION_DONE, False)) + mock_check_w32time_svc_status.assert_called_once_with(mock_osutils) + mock_osutils.set_ntp_client_config.assert_called_once_with( + 'fake host') + else: + self.assertEqual(response, + (base.PLUGIN_EXECUTE_ON_NEXT_BOOT, False)) + + def test_execute_no_ntp_options_data(self): + self._test_execute(ntp_data=None) + + def test_execute(self): + self._test_execute(ntp_data='ntp:fake server') diff --git a/cloudbaseinit/tests/utils/test_dhcp.py b/cloudbaseinit/tests/utils/test_dhcp.py new file mode 100644 index 00000000..11d9284a --- /dev/null +++ b/cloudbaseinit/tests/utils/test_dhcp.py @@ -0,0 +1,155 @@ +# Copyright 2014 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import netifaces +import struct +import socket +import unittest + +from cloudbaseinit.utils import dhcp +from oslo.config import cfg + +CONF = cfg.CONF + + +class DHCPUtilsTests(unittest.TestCase): + + def test_get_dhcp_request_data(self): + data = b'\x01' + data += b'\x01' + data += b'\x06' + data += b'\x00' + data += struct.pack('!L', 9999) + data += b'\x00\x00' + data += b'\x00\x00' + data += b'\x00\x00\x00\x00' + data += b'\x00\x00\x00\x00' + data += b'\x00\x00\x00\x00' + data += b'\x00\x00\x00\x00' + data += 'fake mac address' + data += b'\x00' * 10 + data += b'\x00' * 64 + data += b'\x00' * 128 + data += dhcp._DHCP_COOKIE + data += b'\x35\x01\x01' + data += b'\x3c' + struct.pack('b', + len('fake id')) + 'fake id'.encode( + 'ascii') + data += b'\x3d\x07\x01' + 'fake mac address' + data += b'\x37' + struct.pack('b', len([100])) + data += struct.pack('b', 100) + data += dhcp._OPTION_END + + response = dhcp._get_dhcp_request_data( + id_req=9999, mac_address_b='fake mac address', + requested_options=[100], vendor_id='fake id') + self.assertEqual(response, data) + + @mock.patch('struct.unpack') + def _test_parse_dhcp_reply(self, mock_unpack, message_type, + id_reply, equals_cookie): + fake_data = 236 * "1" + if equals_cookie: + fake_data += dhcp._DHCP_COOKIE + '11' + else: + fake_data += '111111' + fake_data += 'fake' + fake_data += dhcp._OPTION_END + + mock_unpack.side_effect = [(message_type, None), (id_reply, None), + (100, None), (4, None)] + + response = dhcp._parse_dhcp_reply(data=fake_data, id_req=9999) + print mock_unpack.mock_calls + + if message_type != 2: + self.assertEqual(response, (False, {})) + elif id_reply != 9999: + self.assertEqual(response, (False, {})) + elif fake_data[236:240] != dhcp._DHCP_COOKIE: + self.assertEqual(response, (False, {})) + else: + self.assertEqual(response, (True, {100: 'fake'})) + + def test_parse_dhcp_reply(self): + self._test_parse_dhcp_reply(message_type=2, id_reply=9999, + equals_cookie=True) + + def test_parse_dhcp_reply_other_message_type(self): + self._test_parse_dhcp_reply(message_type=3, id_reply=9999, + equals_cookie=True) + + def test_parse_dhcp_reply_other_reply(self): + self._test_parse_dhcp_reply(message_type=3, id_reply=111, + equals_cookie=True) + + def test_parse_dhcp_reply_other_than_cookie(self): + self._test_parse_dhcp_reply(message_type=3, id_reply=111, + equals_cookie=False) + + @mock.patch('netifaces.ifaddresses') + @mock.patch('netifaces.interfaces') + def test_get_mac_address_by_local_ip(self, mock_interfaces, + mock_ifaddresses): + fake_addresses = {} + fake_addresses[netifaces.AF_INET] = [{'addr': 'fake address'}] + fake_addresses[netifaces.AF_LINK] = [{'addr': 'fake mac'}] + + mock_interfaces.return_value = ['fake interface'] + mock_ifaddresses.return_value = fake_addresses + + response = dhcp._get_mac_address_by_local_ip('fake address') + + mock_interfaces.assert_called_once_with() + mock_ifaddresses.assert_called_once_with('fake interface') + self.assertEqual(response, + fake_addresses[netifaces.AF_LINK][0]['addr']) + + @mock.patch('random.randint') + @mock.patch('socket.socket') + @mock.patch('cloudbaseinit.utils.dhcp._get_mac_address_by_local_ip') + @mock.patch('cloudbaseinit.utils.dhcp._get_dhcp_request_data') + @mock.patch('cloudbaseinit.utils.dhcp._parse_dhcp_reply') + def test_get_dhcp_options(self, mock_parse_dhcp_reply, + mock_get_dhcp_request_data, + mock_get_mac_address_by_local_ip, mock_socket, + mock_randint): + mock_randint.return_value = 'fake int' + mock_socket().getsockname.return_value = ['fake local ip'] + mock_get_mac_address_by_local_ip.return_value = 'fake mac' + mock_get_dhcp_request_data.return_value = 'fake data' + mock_parse_dhcp_reply.return_value = (True, 'fake replied options') + + response = dhcp.get_dhcp_options( + dhcp_host='fake host', requested_options=['fake option']) + + mock_randint.assert_called_once_with(0, 2 ** 32 - 1) + mock_socket.assert_called_with(socket.AF_INET, socket.SOCK_DGRAM) + mock_socket().bind.assert_called_once_with(('', 68)) + mock_socket().settimeout.assert_called_once_with(5) + mock_socket().connect.assert_called_once_with(('fake host', 67)) + mock_socket().getsockname.assert_called_once_with() + mock_get_mac_address_by_local_ip.assert_called_once_with( + 'fake local ip') + mock_get_dhcp_request_data.assert_called_once_with('fake int', + 'fake mac', + ['fake option'], + 'cloudbase-init') + mock_socket().send.assert_called_once_with('fake data') + mock_socket().recv.assert_called_once_with(1024) + mock_parse_dhcp_reply.assert_called_once_with(mock_socket().recv(), + 'fake int') + mock_socket().close.assert_called_once_with() + self.assertEqual(response, 'fake replied options')