# Copyright 2014 Cloudbase Solutions Srl # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import importlib import itertools import os import unittest try: import unittest.mock as mock except ImportError: import mock from cloudbaseinit import conf as cloudbaseinit_conf from cloudbaseinit import exception from cloudbaseinit.tests import testutils CONF = cloudbaseinit_conf.CONF OPEN = mock.mock_open() class TestWindowsConfigDriveManager(unittest.TestCase): def setUp(self): module_path = "cloudbaseinit.metadata.services.osconfigdrive.windows" mock_ctypes = mock.MagicMock() mock_ctypes.wintypes = mock.MagicMock() self._module_patcher = mock.patch.dict( 'sys.modules', {'disk': mock.Mock(), 'ctypes': mock_ctypes, 'winioctlcon': mock.Mock()}) self._module_patcher.start() self.addCleanup(self._module_patcher.stop) self.conf_module = importlib.import_module(module_path) self.conf_module.osutils_factory = mock.Mock() self.conf_module.disk.Disk = mock.MagicMock() self.conf_module.tempfile = mock.Mock() self.mock_gettempdir = self.conf_module.tempfile.gettempdir self.mock_gettempdir.return_value = "tempdir" self.conf_module.uuid = mock.Mock() self.mock_uuid4 = self.conf_module.uuid.uuid4 self.mock_uuid4.return_value = "uuid" self._config_manager = self.conf_module.WindowsConfigDriveManager() self.addCleanup(os.rmdir, self._config_manager.target_path) self.osutils = mock.Mock() self._config_manager._osutils = self.osutils self.snatcher = testutils.LogSnatcher(module_path) @mock.patch('os.path.exists') def _test_check_for_config_drive(self, mock_exists, exists=True, label="config-2", fail=False): drive = "C:\\" self.osutils.get_volume_label.return_value = label mock_exists.return_value = exists with self.snatcher: response = self._config_manager._check_for_config_drive(drive) self.osutils.get_volume_label.assert_called_once_with(drive) if exists and not fail: self.assertEqual(["Config Drive found on C:\\"], self.snatcher.output) self.assertEqual(not fail, response) def test_check_for_config_drive_exists(self): self._test_check_for_config_drive() def test_check_for_config_drive_exists_upper_label(self): self._test_check_for_config_drive(label="CONFIG-2") def test_check_for_config_drive_missing(self): self._test_check_for_config_drive(exists=False, fail=True) def test_check_for_config_drive_wrong_label(self): self._test_check_for_config_drive(label="config-3", fail=True) def _test_get_iso_file_size(self, fixed=True, small=False, found_iso=True): device = mock.Mock() device.fixed = fixed device.size = (self.conf_module.OFFSET_BLOCK_SIZE + self.conf_module.PEEK_SIZE + int(not small)) iso_id = self.conf_module.ISO_ID if not found_iso: iso_id = b"pwned" iso_off = self.conf_module.OFFSET_ISO_ID - 1 volume_off = self.conf_module.OFFSET_VOLUME_SIZE - 1 block_off = self.conf_module.OFFSET_BLOCK_SIZE - 1 volume_bytes = b'd\x00' # 100 block_bytes = b'\x00\x02' # 512 device.seek.side_effect = [iso_off, volume_off, block_off] device.read.side_effect = [iso_id, volume_bytes, block_bytes] response = self._config_manager._get_iso_file_size(device) if not fixed or small or not found_iso: self.assertIsNone(response) return seek_calls = [ mock.call(self.conf_module.OFFSET_ISO_ID), mock.call(self.conf_module.OFFSET_VOLUME_SIZE), mock.call(self.conf_module.OFFSET_BLOCK_SIZE)] read_calls = [ mock.call(len(iso_id), skip=self.conf_module.OFFSET_ISO_ID - iso_off), mock.call(self.conf_module.PEEK_SIZE, skip=self.conf_module.OFFSET_VOLUME_SIZE - volume_off), mock.call(self.conf_module.PEEK_SIZE, skip=self.conf_module.OFFSET_BLOCK_SIZE - block_off)] device.seek.assert_has_calls(seek_calls) device.read.assert_has_calls(read_calls) self.assertEqual(100 * 512, response) def test_get_iso_file_size_not_fixed(self): self._test_get_iso_file_size(fixed=False) def test_get_iso_file_size_small(self): self._test_get_iso_file_size(small=True) def test_get_iso_file_size_not_found(self): self._test_get_iso_file_size(found_iso=False) def test_get_iso_file_size(self): self._test_get_iso_file_size() @mock.patch("six.moves.builtins.open", new=OPEN) def test_write_iso_file(self): file_path = "fake\\path" file_size = 100 * 512 sector_size = self.conf_module.MAX_SECTOR_SIZE offsets = list(range(0, file_size, sector_size)) remain = file_size % sector_size reads = ([b"\x00" * sector_size] * (len(offsets) - int(bool(remain))) + ([b"\x00" * remain] if remain else [])) device = mock.Mock() device_seek_calls = [mock.call(off) for off in offsets] device_read_calls = [ mock.call(min(sector_size, file_size - off), skip=0) for off in offsets] stream_write_calls = [mock.call(read) for read in reads] device.seek.side_effect = offsets device.read.side_effect = reads self._config_manager._write_iso_file(device, file_path, file_size) device.seek.assert_has_calls(device_seek_calls) device.read.assert_has_calls(device_read_calls) OPEN.return_value.write.assert_has_calls(stream_write_calls) def _test_extract_files_from_iso(self, exit_code): fake_path = os.path.join('fake', 'path') fake_target_path = os.path.join(fake_path, 'target') self._config_manager.target_path = fake_target_path args = [CONF.bsdtar_path, '-xf', fake_path, '-C', fake_target_path] self.osutils.execute_process.return_value = ('fake out', 'fake err', exit_code) if exit_code: self.assertRaises(exception.CloudbaseInitException, self._config_manager._extract_files_from_iso, fake_path) else: self._config_manager._extract_files_from_iso(fake_path) self.osutils.execute_process.assert_called_once_with(args, False) def test_extract_files_from_iso(self): self._test_extract_files_from_iso(exit_code=0) def test_extract_files_from_iso_fail(self): self._test_extract_files_from_iso(exit_code=1) @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' 'WindowsConfigDriveManager._extract_files_from_iso') @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' 'WindowsConfigDriveManager._write_iso_file') @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' 'WindowsConfigDriveManager._get_iso_file_size') def _test_extract_iso_from_devices(self, mock_get_iso_file_size, mock_write_iso_file, mock_extract_files_from_iso, found=True): # For every device (mock) in the list of available devices: # first - skip (no size) # second - error (throws Exception) # third - extract (is ok) # fourth - unreachable (already found ok device) size = 100 * 512 devices = [mock.MagicMock() for _ in range(4)] devices[1].__enter__.side_effect = [Exception] rest = [size] if found else [None] mock_get_iso_file_size.side_effect = [None] + rest * 2 file_path = os.path.join("tempdir", "uuid.iso") with self.snatcher: response = self._config_manager._extract_iso_from_devices(devices) self.mock_gettempdir.assert_called_once_with() mock_get_iso_file_size.assert_has_calls([ mock.call(devices[0]), mock.call(devices[2])]) expected_log = [ "ISO extraction failed on %(device)s with %(error)r" % {"device": devices[1], "error": Exception()}] if found: mock_write_iso_file.assert_called_once_with(devices[2], file_path, size) mock_extract_files_from_iso.assert_called_once_with(file_path) expected_log.append("ISO9660 disk found on %s" % devices[2]) self.assertEqual(expected_log, self.snatcher.output) self.assertEqual(found, response) def test_extract_iso_from_devices_not_found(self): self._test_extract_iso_from_devices(found=False) def test_extract_iso_from_devices(self): self._test_extract_iso_from_devices() @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' 'WindowsConfigDriveManager.' '_check_for_config_drive') @mock.patch('shutil.copytree') @mock.patch('os.rmdir') def _test_get_config_drive_from_cdrom_drive(self, mock_os_rmdir, mock_copytree, mock_check_for_config_drive, found=True): drives = ["C:\\", "M:\\", "I:\\", "N:\\"] self.osutils.get_cdrom_drives.return_value = drives checks = [False, False, True, False] if not found: checks[2] = False mock_check_for_config_drive.side_effect = checks response = self._config_manager._get_config_drive_from_cdrom_drive() self.osutils.get_cdrom_drives.assert_called_once_with() idx = 3 if found else 4 check_calls = [mock.call(drive) for drive in drives[:idx]] mock_check_for_config_drive.assert_has_calls(check_calls) if found: mock_os_rmdir.assert_called_once_with( self._config_manager.target_path) mock_copytree.assert_called_once_with( drives[2], self._config_manager.target_path) self.assertEqual(found, response) def test_get_config_drive_from_cdrom_drive_not_found(self): self._test_get_config_drive_from_cdrom_drive(found=False) def test_get_config_drive_from_cdrom_drive(self): self._test_get_config_drive_from_cdrom_drive() @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' 'WindowsConfigDriveManager.' '_extract_iso_from_devices') @mock.patch("six.moves.builtins.map") def test_get_config_drive_from_raw_hdd(self, mock_map, mock_extract_iso_from_devices): Disk = self.conf_module.disk.Disk paths = [mock.Mock() for _ in range(3)] self.osutils.get_physical_disks.return_value = paths mock_extract_iso_from_devices.return_value = True response = self._config_manager._get_config_drive_from_raw_hdd() mock_map.assert_called_once_with(Disk, paths) self.osutils.get_physical_disks.assert_called_once_with() mock_extract_iso_from_devices.assert_called_once_with( mock_map.return_value) self.assertTrue(response) @mock.patch('cloudbaseinit.utils.windows.vfat.copy_from_vfat_drive') @mock.patch('cloudbaseinit.utils.windows.vfat.is_vfat_drive') def test_get_config_drive_from_vfat(self, mock_is_vfat_drive, mock_copy_from_vfat_drive): self.osutils.get_physical_disks.return_value = ( mock.sentinel.drive1, mock.sentinel.drive2, ) mock_is_vfat_drive.side_effect = (None, True) with testutils.LogSnatcher('cloudbaseinit.metadata.services.' 'osconfigdrive.windows') as snatcher: response = self._config_manager._get_config_drive_from_vfat() self.assertTrue(response) self.osutils.get_physical_disks.assert_called_once_with() expected_is_vfat_calls = [ mock.call(self.osutils, mock.sentinel.drive1), mock.call(self.osutils, mock.sentinel.drive2), ] self.assertEqual(expected_is_vfat_calls, mock_is_vfat_drive.mock_calls) mock_copy_from_vfat_drive.assert_called_once_with( self.osutils, mock.sentinel.drive2, self._config_manager.target_path) expected_logging = [ 'Config Drive found on disk %r' % mock.sentinel.drive2, ] self.assertEqual(expected_logging, snatcher.output) @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' 'WindowsConfigDriveManager.' '_extract_iso_from_devices') def _test_get_config_drive_from_partition(self, mock_extract_iso_from_devices, found=True): paths = [mock.Mock() for _ in range(3)] self.osutils.get_physical_disks.return_value = paths disks = list(map(self.conf_module.disk.Disk, paths)) mock_extract_iso_from_devices.side_effect = [False, found, found] idx = 3 - int(found) extract_calls = [mock.call(disk.partitions()) for disk in disks[:idx]] response = self._config_manager._get_config_drive_from_partition() self.osutils.get_physical_disks.assert_called_once_with() mock_extract_iso_from_devices.assert_has_calls(extract_calls) self.assertEqual(found, response) def test_get_config_drive_from_partition_not_found(self): self._test_get_config_drive_from_partition(found=False) def test_get_config_drive_from_partition(self): self._test_get_config_drive_from_partition() @mock.patch('os.rmdir') @mock.patch('shutil.copytree') @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' 'WindowsConfigDriveManager.' '_check_for_config_drive') def _test_get_config_drive_from_volume(self, mock_check_for_config_drive, mock_copytree, mock_os_rmdir, found=True): volumes = [mock.Mock() for _ in range(3)] self.osutils.get_volumes.return_value = volumes checks = [False, found, found] mock_check_for_config_drive.side_effect = checks idx = 3 - int(found) check_calls = [mock.call(volume) for volume in volumes[:idx]] response = self._config_manager._get_config_drive_from_volume() self.osutils.get_volumes.assert_called_once_with() mock_check_for_config_drive.assert_has_calls(check_calls) if found: mock_os_rmdir.assert_called_once_with( self._config_manager.target_path) mock_copytree.assert_called_once_with( volumes[1], self._config_manager.target_path) self.assertEqual(found, response) def test_get_config_drive_from_volume_not_found(self): self._test_get_config_drive_from_volume(found=False) def test_get_config_drive_from_volume(self): self._test_get_config_drive_from_volume() def _test__get_config_drive_files(self, cd_type, cd_location, func, found=True): response = self._config_manager._get_config_drive_files(cd_type, cd_location) if found: if func: func.assert_called_once_with() self.assertEqual(func.return_value, response) else: self.assertFalse(response) def test__get_config_drive_files_not_found(self): self._test__get_config_drive_files(None, None, None, found=False) @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' 'WindowsConfigDriveManager.' '_get_config_drive_from_cdrom_drive') def test__get_config_drive_files_cdrom_iso(self, func): self._test__get_config_drive_files( "iso", "cdrom", func) @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' 'WindowsConfigDriveManager.' '_get_config_drive_from_cdrom_drive') def test__get_config_drive_files_cdrom_iso_failed(self, func): func.side_effect = Exception self._test__get_config_drive_files( "iso", "cdrom", func, found=False) def test__get_config_drive_files_cdrom_vfat(self): self._test__get_config_drive_files( "vfat", "cdrom", None) @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' 'WindowsConfigDriveManager.' '_get_config_drive_from_raw_hdd') def test__get_config_drive_files_hdd_iso(self, func): self._test__get_config_drive_files( "iso", "hdd", func) @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' 'WindowsConfigDriveManager.' '_get_config_drive_from_vfat') def test__get_config_drive_files_hdd_vfat(self, func): self._test__get_config_drive_files( "vfat", "hdd", func) @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' 'WindowsConfigDriveManager.' '_get_config_drive_from_partition') def test__get_config_drive_files_partition_iso(self, func): self._test__get_config_drive_files( "iso", "partition", func) @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' 'WindowsConfigDriveManager.' '_get_config_drive_from_volume') def test__get_config_drive_files_partition_vfat(self, func): self._test__get_config_drive_files( "vfat", "partition", func) @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' 'WindowsConfigDriveManager.' '_get_config_drive_files') def _test_get_config_drive_files(self, mock_get_config_drive_files, found=True): check_types = ["iso", "vfat"] if found else [] check_locations = ["cdrom", "hdd", "partition"] product = list(itertools.product(check_types, check_locations)) product_calls = [mock.call(cd_type, cd_location) for cd_type, cd_location in product] mock_get_config_drive_files.side_effect = \ [False] * (len(product_calls) - 1) + [True] expected_log = ["Looking for Config Drive %(type)s in %(location)s" % {"type": cd_type, "location": cd_location} for cd_type, cd_location in product] with self.snatcher: response = self._config_manager.get_config_drive_files( check_types, check_locations) mock_get_config_drive_files.assert_has_calls(product_calls) self.assertEqual(expected_log, self.snatcher.output) self.assertEqual(found, response) def test_get_config_drive_files_not_found(self): self._test_get_config_drive_files(found=False) def test_get_config_drive_files(self): self._test_get_config_drive_files()