Ironic config drive support

Search through partitions (containing raw ISO bytes) and volumes when
looking for a configuration drive. This commit implies the following:
    1. New config options are used for choosing the possible config drive
       paths (`config_drive_locations`) and the types the service will search
       for (`config_drive_types`). The old options are still available and
       marked as deprecated.
    2. The configdrive plugin was intensively refactored and size computation,
       parsing and ISO extraction bugs were fixed. The plugin will search in
       locations like cdrom, hard disks or partitions for metadata content or
       raw ISO bytes. Also, is using the `disk` windows utility for reading
       disks and listing partitions.
    3. A new method, `get_volumes`, was added in osutils
       for listing all the volumes.
    4. Removed dead code virtual_disk.py and disk.py (physical_disk.py)
       was remade from scratch.
    5. The ability to handle partitions within a disk for reading purposes
       and related bugs fixed:
        a. Wrong INVALID_HANDLE_VALUE (-1 in Python isn't the unsigned -1 of C)
        b. Erroneous geometry computations in Py3 ("/" lead to float)
        c. Comparing string with bytes in Py3
        d. High risk of IndexErrors because of the insufficient buffer reads
           relying on standard block sector sizes.

Change-Id: Ic3a5ef1ee81c694e41fc7a22abe63b0154f51065
This commit is contained in:
Cosmin Poieana 2015-08-10 19:38:51 +03:00
parent db951648ca
commit 9ed4705fd6
11 changed files with 1305 additions and 1130 deletions

View File

@ -14,23 +14,47 @@
import os
import shutil
import tempfile
import uuid
from oslo_config import cfg
from oslo_log import log as oslo_logging
from cloudbaseinit import exception
from cloudbaseinit.metadata.services import base
from cloudbaseinit.metadata.services import baseopenstackservice
from cloudbaseinit.metadata.services.osconfigdrive import factory
# Config Drive types and possible locations.
CD_TYPES = {
"vfat", # Visible device (with partition table).
"iso", # "Raw" format containing ISO bytes.
}
CD_LOCATIONS = {
# Look into optical units devices. Only an ISO format could
# be used here (vfat ignored).
"cdrom",
# Search through physical disks for raw ISO content or vfat filesystems
# containing configuration drive's content.
"hdd",
# Search through partitions for raw ISO content or through volumes
# containing configuration drive's content.
"partition",
}
opts = [
cfg.BoolOpt('config_drive_raw_hhd', default=True,
help='Look for an ISO config drive in raw HDDs'),
help='Look for an ISO config drive in raw HDDs',
deprecated_for_removal=True),
cfg.BoolOpt('config_drive_cdrom', default=True,
help='Look for a config drive in the attached cdrom drives'),
help='Look for a config drive in the attached cdrom drives',
deprecated_for_removal=True),
cfg.BoolOpt('config_drive_vfat', default=True,
help='Look for a config drive in VFAT filesystems.'),
help='Look for a config drive in VFAT filesystems',
deprecated_for_removal=True),
cfg.ListOpt('config_drive_types', default=list(CD_TYPES),
help='Supported formats of a configuration drive'),
cfg.ListOpt('config_drive_locations', default=list(CD_LOCATIONS),
help='Supported configuration drive locations'),
]
CONF = cfg.CONF
@ -45,33 +69,52 @@ class ConfigDriveService(baseopenstackservice.BaseOpenStackService):
super(ConfigDriveService, self).__init__()
self._metadata_path = None
def _preprocess_options(self):
self._searched_types = set(CONF.config_drive_types)
self._searched_locations = set(CONF.config_drive_locations)
# Deprecation backward compatibility.
if CONF.config_drive_raw_hhd:
self._searched_types.add("iso")
self._searched_locations.add("hdd")
if CONF.config_drive_cdrom:
self._searched_types.add("iso")
self._searched_locations.add("cdrom")
if CONF.config_drive_vfat:
self._searched_types.add("vfat")
self._searched_locations.add("hdd")
# Check for invalid option values.
if self._searched_types | CD_TYPES != CD_TYPES:
raise exception.CloudbaseInitException(
"Invalid Config Drive types %s", self._searched_types)
if self._searched_locations | CD_LOCATIONS != CD_LOCATIONS:
raise exception.CloudbaseInitException(
"Invalid Config Drive locations %s", self._searched_locations)
def load(self):
super(ConfigDriveService, self).load()
target_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4()))
self._preprocess_options()
self._mgr = factory.get_config_drive_manager()
found = self._mgr.get_config_drive_files(
searched_types=self._searched_types,
searched_locations=self._searched_locations)
mgr = factory.get_config_drive_manager()
found = mgr.get_config_drive_files(
target_path,
check_raw_hhd=CONF.config_drive_raw_hhd,
check_cdrom=CONF.config_drive_cdrom,
check_vfat=CONF.config_drive_vfat)
if found:
self._metadata_path = target_path
LOG.debug('Metadata copied to folder: \'%s\'' %
self._metadata_path)
self._metadata_path = self._mgr.target_path
LOG.debug('Metadata copied to folder: %r', self._metadata_path)
return found
def _get_data(self, path):
norm_path = os.path.normpath(os.path.join(self._metadata_path, path))
try:
with open(norm_path, 'rb') as f:
return f.read()
with open(norm_path, 'rb') as stream:
return stream.read()
except IOError:
raise base.NotExistingMetadataException()
def cleanup(self):
if self._metadata_path:
LOG.debug('Deleting metadata folder: \'%s\'' % self._metadata_path)
shutil.rmtree(self._metadata_path, ignore_errors=True)
self._metadata_path = None
LOG.debug('Deleting metadata folder: %r', self._mgr.target_path)
shutil.rmtree(self._mgr.target_path, ignore_errors=True)
self._metadata_path = None

View File

@ -13,6 +13,7 @@
# under the License.
import abc
import tempfile
import six
@ -20,7 +21,9 @@ import six
@six.add_metaclass(abc.ABCMeta)
class BaseConfigDriveManager(object):
def __init__(self):
self.target_path = tempfile.mkdtemp()
@abc.abstractmethod
def get_config_drive_files(self, target_path, check_raw_hhd=True,
check_cdrom=True, check_vfat=True):
def get_config_drive_files(self, check_types=None, check_locations=None):
pass

View File

@ -12,13 +12,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import ctypes
import itertools
import os
import shutil
import struct
import tempfile
import uuid
from ctypes import wintypes
from oslo_config import cfg
from oslo_log import log as oslo_logging
@ -28,6 +29,7 @@ from cloudbaseinit.osutils import factory as osutils_factory
from cloudbaseinit.utils.windows import disk
from cloudbaseinit.utils.windows import vfat
opts = [
cfg.StrOpt('bsdtar_path', default='bsdtar.exe',
help='Path to "bsdtar", used to extract ISO ConfigDrive '
@ -39,83 +41,73 @@ CONF.register_opts(opts)
LOG = oslo_logging.getLogger(__name__)
CONFIG_DRIVE_LABEL = 'config-2'
MAX_SECTOR_SIZE = 4096
# Absolute offset values and the ISO magic string.
OFFSET_BOOT_RECORD = 0x8000
OFFSET_ISO_ID = OFFSET_BOOT_RECORD + 1
ISO_ID = b'CD001'
# Little-endian unsigned short size values.
OFFSET_VOLUME_SIZE = OFFSET_BOOT_RECORD + 80
OFFSET_BLOCK_SIZE = OFFSET_BOOT_RECORD + 128
PEEK_SIZE = 2
class WindowsConfigDriveManager(base.BaseConfigDriveManager):
def _get_config_drive_cdrom_mount_point(self):
osutils = osutils_factory.get_os_utils()
def __init__(self):
super(WindowsConfigDriveManager, self).__init__()
self._osutils = osutils_factory.get_os_utils()
for drive in osutils.get_cdrom_drives():
label = osutils.get_volume_label(drive)
if label == "config-2" and \
os.path.exists(os.path.join(drive,
'openstack\\latest\\'
'meta_data.json')):
return drive
return None
def _check_for_config_drive(self, drive):
label = self._osutils.get_volume_label(drive)
if label and label.lower() == CONFIG_DRIVE_LABEL and \
os.path.exists(os.path.join(drive,
'openstack\\latest\\'
'meta_data.json')):
LOG.info('Config Drive found on %s', drive)
return True
return False
def _c_char_array_to_c_ushort(self, buf, offset):
low = ctypes.cast(buf[offset],
ctypes.POINTER(wintypes.WORD)).contents
high = ctypes.cast(buf[offset + 1],
ctypes.POINTER(wintypes.WORD)).contents
return (high.value << 8) + low.value
def _get_iso_disk_size(self, phys_disk):
geom = phys_disk.get_geometry()
if geom.MediaType != disk.Win32_DiskGeometry.FixedMedia:
def _get_iso_file_size(self, device):
if not device.fixed:
return None
disk_size = geom.Cylinders * geom.TracksPerCylinder * \
geom.SectorsPerTrack * geom.BytesPerSector
boot_record_off = 0x8000
id_off = 1
volume_size_off = 80
block_size_off = 128
iso_id = b'CD001'
offset = boot_record_off // geom.BytesPerSector * geom.BytesPerSector
bytes_to_read = geom.BytesPerSector
if disk_size <= offset + bytes_to_read:
if not device.size > (OFFSET_BLOCK_SIZE + PEEK_SIZE):
return None
phys_disk.seek(offset)
(buf, bytes_read) = phys_disk.read(bytes_to_read)
buf_off = boot_record_off - offset + id_off
if iso_id != buf[buf_off: buf_off + len(iso_id)]:
off = device.seek(OFFSET_ISO_ID)
magic = device.read(len(ISO_ID), skip=OFFSET_ISO_ID - off)
if ISO_ID != magic:
return None
buf_off = boot_record_off - offset + volume_size_off
num_blocks = self._c_char_array_to_c_ushort(buf, buf_off)
off = device.seek(OFFSET_VOLUME_SIZE)
volume_size_bytes = device.read(PEEK_SIZE,
skip=OFFSET_VOLUME_SIZE - off)
off = device.seek(OFFSET_BLOCK_SIZE)
block_size_bytes = device.read(PEEK_SIZE,
skip=OFFSET_BLOCK_SIZE - off)
volume_size = struct.unpack("<H", volume_size_bytes)[0]
block_size = struct.unpack("<H", block_size_bytes)[0]
buf_off = boot_record_off - offset + block_size_off
block_size = self._c_char_array_to_c_ushort(buf, buf_off)
return volume_size * block_size
return num_blocks * block_size
def _write_iso_file(self, phys_disk, path, iso_file_size):
with open(path, 'wb') as f:
geom = phys_disk.get_geometry()
def _write_iso_file(self, device, iso_file_path, iso_file_size):
with open(iso_file_path, 'wb') as stream:
offset = 0
# Get a multiple of the sector byte size
bytes_to_read = 4096 // geom.BytesPerSector * geom.BytesPerSector
# Read multiples of the sector size bytes
# until the entire ISO content is written.
while offset < iso_file_size:
phys_disk.seek(offset)
bytes_to_read = min(bytes_to_read, iso_file_size - offset)
(buf, bytes_read) = phys_disk.read(bytes_to_read)
f.write(buf)
offset += bytes_read
real_offset = device.seek(offset)
bytes_to_read = min(MAX_SECTOR_SIZE, iso_file_size - offset)
data = device.read(bytes_to_read, skip=offset - real_offset)
stream.write(data)
offset += bytes_to_read
def _extract_iso_files(self, osutils, iso_file_path, target_path):
os.makedirs(target_path)
args = [CONF.bsdtar_path, '-xf', iso_file_path, '-C', target_path]
(out, err, exit_code) = osutils.execute_process(args, False)
def _extract_files_from_iso(self, iso_file_path):
args = [CONF.bsdtar_path, '-xf', iso_file_path,
'-C', self.target_path]
(out, err, exit_code) = self._osutils.execute_process(args, False)
if exit_code:
raise exception.CloudbaseInitException(
@ -125,72 +117,104 @@ class WindowsConfigDriveManager(base.BaseConfigDriveManager):
'exit_code': exit_code,
'out': out, 'err': err})
def _extract_iso_disk_file(self, osutils, iso_file_path):
iso_disk_found = False
for path in osutils.get_physical_disks():
phys_disk = disk.PhysicalDisk(path)
try:
phys_disk.open()
iso_file_size = self._get_iso_disk_size(phys_disk)
if iso_file_size:
LOG.debug('ISO9660 disk found on raw HDD: %s' % path)
self._write_iso_file(phys_disk, iso_file_path,
iso_file_size)
iso_disk_found = True
break
except Exception:
# Ignore exception
pass
finally:
phys_disk.close()
return iso_disk_found
def _get_conf_drive_from_vfat(self, target_path):
osutils = osutils_factory.get_os_utils()
for drive_path in osutils.get_physical_disks():
if vfat.is_vfat_drive(osutils, drive_path):
LOG.info('Config Drive found on disk %r', drive_path)
os.makedirs(target_path)
vfat.copy_from_vfat_drive(osutils, drive_path, target_path)
return True
def get_config_drive_files(self, target_path, check_raw_hhd=True,
check_cdrom=True, check_vfat=True):
config_drive_found = False
if check_vfat:
LOG.debug('Looking for Config Drive in VFAT filesystems')
config_drive_found = self._get_conf_drive_from_vfat(target_path)
if not config_drive_found and check_raw_hhd:
LOG.debug('Looking for Config Drive in raw HDDs')
config_drive_found = self._get_conf_drive_from_raw_hdd(
target_path)
if not config_drive_found and check_cdrom:
LOG.debug('Looking for Config Drive in cdrom drives')
config_drive_found = self._get_conf_drive_from_cdrom_drive(
target_path)
return config_drive_found
def _get_conf_drive_from_cdrom_drive(self, target_path):
cdrom_mount_point = self._get_config_drive_cdrom_mount_point()
if cdrom_mount_point:
shutil.copytree(cdrom_mount_point, target_path)
return True
return False
def _get_conf_drive_from_raw_hdd(self, target_path):
config_drive_found = False
def _extract_iso_from_devices(self, devices):
"""Search across multiple devices for a raw ISO."""
extracted = False
iso_file_path = os.path.join(tempfile.gettempdir(),
str(uuid.uuid4()) + '.iso')
try:
osutils = osutils_factory.get_os_utils()
if self._extract_iso_disk_file(osutils, iso_file_path):
self._extract_iso_files(osutils, iso_file_path, target_path)
config_drive_found = True
finally:
if os.path.exists(iso_file_path):
os.remove(iso_file_path)
return config_drive_found
for device in devices:
try:
with device:
iso_file_size = self._get_iso_file_size(device)
if iso_file_size:
LOG.info('ISO9660 disk found on %s', device)
self._write_iso_file(device, iso_file_path,
iso_file_size)
self._extract_files_from_iso(iso_file_path)
extracted = True
break
except Exception as exc:
LOG.warning('ISO extraction failed on %(device)s with '
'%(error)r', {"device": device, "error": exc})
if os.path.isfile(iso_file_path):
os.remove(iso_file_path)
return extracted
def _get_config_drive_from_cdrom_drive(self):
for drive_letter in self._osutils.get_cdrom_drives():
if self._check_for_config_drive(drive_letter):
os.rmdir(self.target_path)
shutil.copytree(drive_letter, self.target_path)
return True
return False
def _get_config_drive_from_raw_hdd(self):
disks = map(disk.Disk, self._osutils.get_physical_disks())
return self._extract_iso_from_devices(disks)
def _get_config_drive_from_vfat(self):
for drive_path in self._osutils.get_physical_disks():
if vfat.is_vfat_drive(self._osutils, drive_path):
LOG.info('Config Drive found on disk %r', drive_path)
vfat.copy_from_vfat_drive(self._osutils, drive_path,
self.target_path)
return True
return False
def _get_config_drive_from_partition(self):
for disk_path in self._osutils.get_physical_disks():
physical_drive = disk.Disk(disk_path)
with physical_drive:
partitions = physical_drive.partitions()
extracted = self._extract_iso_from_devices(partitions)
if extracted:
return True
return False
def _get_config_drive_from_volume(self):
"""Look through all the volumes for config drive."""
volumes = self._osutils.get_volumes()
for volume in volumes:
if self._check_for_config_drive(volume):
os.rmdir(self.target_path)
shutil.copytree(volume, self.target_path)
return True
return False
def _get_config_drive_files(self, cd_type, cd_location):
get_config_drive = self.config_drive_type_location.get(
"{}_{}".format(cd_location, cd_type))
if get_config_drive:
return get_config_drive()
else:
LOG.debug("Irrelevant type %(type)s in %(location)s location; "
"skip",
{"type": cd_type, "location": cd_location})
return False
def get_config_drive_files(self, searched_types=None,
searched_locations=None):
searched_types = searched_types or []
searched_locations = searched_locations or []
for cd_type, cd_location in itertools.product(searched_types,
searched_locations):
LOG.debug('Looking for Config Drive %(type)s in %(location)s',
{"type": cd_type, "location": cd_location})
if self._get_config_drive_files(cd_type, cd_location):
return True
return False
@property
def config_drive_type_location(self):
return {
"cdrom_iso": self._get_config_drive_from_cdrom_drive,
"hdd_iso": self._get_config_drive_from_raw_hdd,
"hdd_vfat": self._get_config_drive_from_vfat,
"partition_iso": self._get_config_drive_from_partition,
"partition_vfat": self._get_config_drive_from_volume,
}

View File

@ -33,6 +33,7 @@ import wmi
from cloudbaseinit import exception
from cloudbaseinit.osutils import base
from cloudbaseinit.utils.windows import disk
from cloudbaseinit.utils.windows import network
from cloudbaseinit.utils.windows import privilege
from cloudbaseinit.utils.windows import timezone
@ -116,31 +117,10 @@ class Win32_OSVERSIONINFOEX_W(ctypes.Structure):
]
class GUID(ctypes.Structure):
_fields_ = [
("data1", ctypes.wintypes.DWORD),
("data2", ctypes.wintypes.WORD),
("data3", ctypes.wintypes.WORD),
("data4", ctypes.c_byte * 8)]
def __init__(self, l, w1, w2, b1, b2, b3, b4, b5, b6, b7, b8):
self.data1 = l
self.data2 = w1
self.data3 = w2
self.data4[0] = b1
self.data4[1] = b2
self.data4[2] = b3
self.data4[3] = b4
self.data4[4] = b5
self.data4[5] = b6
self.data4[6] = b7
self.data4[7] = b8
class Win32_SP_DEVICE_INTERFACE_DATA(ctypes.Structure):
_fields_ = [
('cbSize', wintypes.DWORD),
('InterfaceClassGuid', GUID),
('InterfaceClassGuid', disk.GUID),
('Flags', wintypes.DWORD),
('Reserved', ctypes.POINTER(wintypes.ULONG))
]
@ -218,7 +198,7 @@ iphlpapi.GetIpForwardTable.restype = wintypes.DWORD
Ws2_32.inet_ntoa.restype = ctypes.c_char_p
setupapi.SetupDiGetClassDevsW.argtypes = [ctypes.POINTER(GUID),
setupapi.SetupDiGetClassDevsW.argtypes = [ctypes.POINTER(disk.GUID),
wintypes.LPCWSTR,
wintypes.HANDLE,
wintypes.DWORD]
@ -227,7 +207,7 @@ setupapi.SetupDiGetClassDevsW.restype = wintypes.HANDLE
setupapi.SetupDiEnumDeviceInterfaces.argtypes = [
wintypes.HANDLE,
wintypes.LPVOID,
ctypes.POINTER(GUID),
ctypes.POINTER(disk.GUID),
wintypes.DWORD,
ctypes.POINTER(Win32_SP_DEVICE_INTERFACE_DATA)]
setupapi.SetupDiEnumDeviceInterfaces.restype = wintypes.BOOL
@ -250,8 +230,8 @@ VER_BUILDNUMBER = 4
VER_GREATER_EQUAL = 3
GUID_DEVINTERFACE_DISK = GUID(0x53f56307, 0xb6bf, 0x11d0, 0x94, 0xf2,
0x00, 0xa0, 0xc9, 0x1e, 0xfb, 0x8b)
GUID_DEVINTERFACE_DISK = disk.GUID(0x53f56307, 0xb6bf, 0x11d0, 0x94, 0xf2,
0x00, 0xa0, 0xc9, 0x1e, 0xfb, 0x8b)
class WindowsUtils(base.BaseOSUtils):
@ -466,7 +446,7 @@ class WindowsUtils(base.BaseOSUtils):
'Microsoft\\Windows NT\\CurrentVersion\\'
'ProfileList\\%s' % user_sid) as key:
return winreg.QueryValueEx(key, 'ProfileImagePath')[0]
LOG.debug('Home directory not found for user \'%s\'' % username)
LOG.debug('Home directory not found for user %r', username)
return None
def sanitize_shell_input(self, value):
@ -686,7 +666,7 @@ class WindowsUtils(base.BaseOSUtils):
break
time.sleep(1)
LOG.info('Waiting for sysprep completion. '
'GeneralizationState: %d' % gen_state)
'GeneralizationState: %d', gen_state)
except WindowsError as ex:
if ex.winerror == 2:
LOG.debug('Sysprep data not found in the registry, '
@ -722,7 +702,7 @@ class WindowsUtils(base.BaseOSUtils):
'ret_val': ret_val})
def start_service(self, service_name):
LOG.debug('Starting service %s' % service_name)
LOG.debug('Starting service %s', service_name)
service = self._get_service(service_name)
(ret_val,) = service.StartService()
if ret_val != 0:
@ -732,7 +712,7 @@ class WindowsUtils(base.BaseOSUtils):
'ret_val': ret_val})
def stop_service(self, service_name):
LOG.debug('Stopping service %s' % service_name)
LOG.debug('Stopping service %s', service_name)
service = self._get_service(service_name)
(ret_val,) = service.StopService()
if ret_val != 0:
@ -989,6 +969,33 @@ class WindowsUtils(base.BaseOSUtils):
return physical_disks
def get_volumes(self):
"""Retrieve a list with all the volumes found on all disks."""
volumes = []
volume = ctypes.create_unicode_buffer(chr(0) * self.MAX_PATH)
handle_volumes = kernel32.FindFirstVolumeW(volume, self.MAX_PATH)
if handle_volumes == self.INVALID_HANDLE_VALUE:
raise exception.WindowsCloudbaseInitException(
"FindFirstVolumeW failed: %r")
try:
while True:
volumes.append(volume.value)
found = kernel32.FindNextVolumeW(handle_volumes, volume,
self.MAX_PATH)
if not found:
errno = ctypes.GetLastError()
if errno == self.ERROR_NO_MORE_FILES:
break
else:
raise exception.WindowsCloudbaseInitException(
"FindNextVolumeW failed: %r")
finally:
kernel32.FindVolumeClose(handle_volumes)
return volumes
def _get_fw_protocol(self, protocol):
if protocol == self.PROTOCOL_TCP:
fw_protocol = self._FW_IP_PROTOCOL_TCP

View File

@ -12,7 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import importlib
import itertools
import os
import unittest
@ -28,380 +30,266 @@ from cloudbaseinit.tests import testutils
CONF = cfg.CONF
OPEN = mock.mock_open()
class TestWindowsConfigDriveManager(unittest.TestCase):
def setUp(self):
self._ctypes_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict('sys.modules',
{'ctypes': self._ctypes_mock})
module_path = "cloudbaseinit.metadata.services.osconfigdrive.windows"
mock_ctypes = mock.MagicMock()
mock_ctypes.wintypes = mock.MagicMock()
self._module_patcher = mock.patch.dict(
'sys.modules',
{'disk': mock.Mock(),
'ctypes': mock_ctypes,
'winioctlcon': mock.Mock()})
self._module_patcher.start()
self.addCleanup(self._module_patcher.stop)
self.conf_module = importlib.import_module(module_path)
self.windows = importlib.import_module(
"cloudbaseinit.metadata.services.osconfigdrive.windows")
self.physical_disk = importlib.import_module(
"cloudbaseinit.utils.windows.disk")
self.conf_module.osutils_factory = mock.Mock()
self.conf_module.disk.Disk = mock.MagicMock()
self.conf_module.tempfile = mock.Mock()
self.mock_gettempdir = self.conf_module.tempfile.gettempdir
self.mock_gettempdir.return_value = "tempdir"
self.conf_module.uuid = mock.Mock()
self.mock_uuid4 = self.conf_module.uuid.uuid4
self.mock_uuid4.return_value = "uuid"
self._config_manager = self.conf_module.WindowsConfigDriveManager()
self.addCleanup(os.rmdir, self._config_manager.target_path)
self.osutils = mock.Mock()
self._config_manager._osutils = self.osutils
self.snatcher = testutils.LogSnatcher(module_path)
self.physical_disk.Win32_DiskGeometry = mock.MagicMock()
self.windows.disk.PhysicalDisk = mock.MagicMock()
self._config_manager = self.windows.WindowsConfigDriveManager()
def tearDown(self):
self._module_patcher.stop()
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
@mock.patch('os.path.exists')
def _test_get_config_drive_cdrom_mount_point(self, mock_join,
mock_get_os_utils, exists):
mock_osutils = mock.MagicMock()
mock_get_os_utils.return_value = mock_osutils
mock_osutils.get_cdrom_drives.return_value = ['fake drive']
mock_osutils.get_volume_label.return_value = 'config-2'
mock_join.return_value = exists
def _test_check_for_config_drive(self, mock_exists, exists=True,
label="config-2", fail=False):
drive = "C:\\"
self.osutils.get_volume_label.return_value = label
mock_exists.return_value = exists
response = self._config_manager._get_config_drive_cdrom_mount_point()
with self.snatcher:
response = self._config_manager._check_for_config_drive(drive)
mock_osutils.get_cdrom_drives.assert_called_once_with()
mock_osutils.get_volume_label.assert_called_once_with('fake drive')
self.osutils.get_volume_label.assert_called_once_with(drive)
if exists and not fail:
self.assertEqual(["Config Drive found on C:\\"],
self.snatcher.output)
self.assertEqual(not fail, response)
if exists:
self.assertEqual('fake drive', response)
else:
def test_check_for_config_drive_exists(self):
self._test_check_for_config_drive()
def test_check_for_config_drive_exists_upper_label(self):
self._test_check_for_config_drive(label="CONFIG-2")
def test_check_for_config_drive_missing(self):
self._test_check_for_config_drive(exists=False, fail=True)
def test_check_for_config_drive_wrong_label(self):
self._test_check_for_config_drive(label="config-3", fail=True)
def _test_get_iso_file_size(self, fixed=True, small=False,
found_iso=True):
device = mock.Mock()
device.fixed = fixed
device.size = (self.conf_module.OFFSET_BLOCK_SIZE +
self.conf_module.PEEK_SIZE + int(not small))
iso_id = self.conf_module.ISO_ID
if not found_iso:
iso_id = b"pwned"
iso_off = self.conf_module.OFFSET_ISO_ID - 1
volume_off = self.conf_module.OFFSET_VOLUME_SIZE - 1
block_off = self.conf_module.OFFSET_BLOCK_SIZE - 1
volume_bytes = b'd\x00' # 100
block_bytes = b'\x00\x02' # 512
device.seek.side_effect = [iso_off, volume_off, block_off]
device.read.side_effect = [iso_id, volume_bytes, block_bytes]
response = self._config_manager._get_iso_file_size(device)
if not fixed or small or not found_iso:
self.assertIsNone(response)
return
def test_get_config_drive_cdrom_mount_point_exists_true(self):
self._test_get_config_drive_cdrom_mount_point(exists=True)
seek_calls = [
mock.call(self.conf_module.OFFSET_ISO_ID),
mock.call(self.conf_module.OFFSET_VOLUME_SIZE),
mock.call(self.conf_module.OFFSET_BLOCK_SIZE)]
read_calls = [
mock.call(len(iso_id),
skip=self.conf_module.OFFSET_ISO_ID - iso_off),
mock.call(self.conf_module.PEEK_SIZE,
skip=self.conf_module.OFFSET_VOLUME_SIZE - volume_off),
mock.call(self.conf_module.PEEK_SIZE,
skip=self.conf_module.OFFSET_BLOCK_SIZE - block_off)]
device.seek.assert_has_calls(seek_calls)
device.read.assert_has_calls(read_calls)
self.assertEqual(100 * 512, response)
def test_get_config_drive_cdrom_mount_point_exists_false(self):
self._test_get_config_drive_cdrom_mount_point(exists=False)
def test_get_iso_file_size_not_fixed(self):
self._test_get_iso_file_size(fixed=False)
def test_c_char_array_to_c_ushort(self):
mock_buf = mock.MagicMock()
contents = self._ctypes_mock.cast.return_value.contents
def test_get_iso_file_size_small(self):
self._test_get_iso_file_size(small=True)
response = self._config_manager._c_char_array_to_c_ushort(mock_buf, 1)
def test_get_iso_file_size_not_found(self):
self._test_get_iso_file_size(found_iso=False)
self.assertEqual(2, self._ctypes_mock.cast.call_count)
self._ctypes_mock.POINTER.assert_called_with(
self._ctypes_mock.wintypes.WORD)
def test_get_iso_file_size(self):
self._test_get_iso_file_size()
self._ctypes_mock.cast.assert_called_with(
mock_buf.__getitem__(), self._ctypes_mock.POINTER.return_value)
@mock.patch("six.moves.builtins.open", new=OPEN)
def test_write_iso_file(self):
file_path = "fake\\path"
file_size = 100 * 512
sector_size = self.conf_module.MAX_SECTOR_SIZE
offsets = list(range(0, file_size, sector_size))
remain = file_size % sector_size
reads = ([b"\x00" * sector_size] *
(len(offsets) - int(bool(remain))) +
([b"\x00" * remain] if remain else []))
self.assertEqual(contents.value.__lshift__().__add__(), response)
device = mock.Mock()
device_seek_calls = [mock.call(off) for off in offsets]
device_read_calls = [
mock.call(min(sector_size, file_size - off), skip=0)
for off in offsets]
stream_write_calls = [mock.call(read) for read in reads]
device.seek.side_effect = offsets
device.read.side_effect = reads
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._c_char_array_to_c_ushort')
def _test_get_iso_disk_size(self, mock_c_char_array_to_c_ushort,
media_type, value, iso_id,
bytes_per_sector=2):
if media_type == "fixed":
media_type = self.physical_disk.Win32_DiskGeometry.FixedMedia
self._config_manager._write_iso_file(device, file_path, file_size)
device.seek.assert_has_calls(device_seek_calls)
device.read.assert_has_calls(device_read_calls)
OPEN.return_value.write.assert_has_calls(stream_write_calls)
boot_record_off = 0x8000
volume_size_off = 80
block_size_off = 128
mock_phys_disk = mock.MagicMock()
mock_buff = mock.MagicMock()
mock_geom = mock.MagicMock()
mock_phys_disk.get_geometry.return_value = mock_geom
mock_geom.MediaType = media_type
mock_geom.Cylinders = value
mock_geom.TracksPerCylinder = 2
mock_geom.SectorsPerTrack = 2
mock_geom.BytesPerSector = bytes_per_sector
mock_phys_disk.read.return_value = (mock_buff, 'fake value')
mock_buff.__getitem__.return_value = iso_id
mock_c_char_array_to_c_ushort.return_value = 100
disk_size = mock_geom.Cylinders * mock_geom.TracksPerCylinder * \
mock_geom.SectorsPerTrack * mock_geom.BytesPerSector
offset = boot_record_off // mock_geom.BytesPerSector * \
mock_geom.BytesPerSector
buf_off_volume = boot_record_off - offset + volume_size_off
buf_off_block = boot_record_off - offset + block_size_off
response = self._config_manager._get_iso_disk_size(mock_phys_disk)
mock_phys_disk.get_geometry.assert_called_once_with()
if media_type != self.physical_disk.Win32_DiskGeometry.FixedMedia:
self.assertIsNone(response)
elif disk_size <= offset + mock_geom.BytesPerSector:
self.assertIsNone(response)
else:
mock_phys_disk.seek.assert_called_once_with(offset)
mock_phys_disk.read.assert_called_once_with(
mock_geom.BytesPerSector)
if iso_id != 'CD001':
self.assertIsNone(response)
else:
mock_c_char_array_to_c_ushort.assert_has_calls(
mock.call(mock_buff, buf_off_volume),
mock.call(mock_buff, buf_off_block))
self.assertEqual(10000, response)
def test_test_get_iso_disk_size(self):
self._test_get_iso_disk_size(
media_type="fixed",
value=100, iso_id='CD001')
def test_test_get_iso_disk_size_other_media_type(self):
self._test_get_iso_disk_size(media_type="other", value=100,
iso_id='CD001')
def test_test_get_iso_disk_size_other_disk_size_too_small(self):
self._test_get_iso_disk_size(
media_type="fixed",
value=0, iso_id='CD001')
def test_test_get_iso_disk_size_other_id(self):
self._test_get_iso_disk_size(
media_type="fixed",
value=100, iso_id='other id')
def test_test_get_iso_disk_size_bigger_offset(self):
self._test_get_iso_disk_size(
media_type="other media", value=100, iso_id='other id',
bytes_per_sector=2000)
def test_write_iso_file(self, bytes_per_sector=40):
mock_buff = mock.MagicMock()
mock_geom = mock.MagicMock()
mock_geom.BytesPerSector = 2
mock_phys_disk = mock.MagicMock()
mock_phys_disk.read.return_value = (mock_buff, 10)
fake_path = os.path.join('fake', 'path')
mock_phys_disk.get_geometry.return_value = mock_geom
with mock.patch('six.moves.builtins.open', mock.mock_open(),
create=True) as f:
self._config_manager._write_iso_file(mock_phys_disk, fake_path,
4098)
f().write.assert_called_with(mock_buff)
seek_calls = [mock.call(value) for value in range(0, 4098, 10)]
read_calls = (
[mock.call(4096)] +
[mock.call(value) for value in range(4088, 0, -10)]
)
self.assertEqual(seek_calls, mock_phys_disk.seek.mock_calls)
self.assertEqual(read_calls, mock_phys_disk.read.mock_calls)
@mock.patch('os.makedirs')
def _test_extract_iso_files(self, mock_makedirs, exit_code):
def _test_extract_files_from_iso(self, exit_code):
fake_path = os.path.join('fake', 'path')
fake_target_path = os.path.join(fake_path, 'target')
self._config_manager.target_path = fake_target_path
args = [CONF.bsdtar_path, '-xf', fake_path, '-C', fake_target_path]
mock_os_utils = mock.MagicMock()
mock_os_utils.execute_process.return_value = ('fake out', 'fake err',
exit_code)
self.osutils.execute_process.return_value = ('fake out', 'fake err',
exit_code)
if exit_code:
self.assertRaises(exception.CloudbaseInitException,
self._config_manager._extract_iso_files,
mock_os_utils, fake_path, fake_target_path)
self._config_manager._extract_files_from_iso,
fake_path)
else:
self._config_manager._extract_iso_files(mock_os_utils, fake_path,
fake_target_path)
self._config_manager._extract_files_from_iso(fake_path)
mock_os_utils.execute_process.assert_called_once_with(args, False)
mock_makedirs.assert_called_once_with(fake_target_path)
self.osutils.execute_process.assert_called_once_with(args, False)
def test_extract_iso_files(self):
self._test_extract_iso_files(exit_code=None)
def test_extract_files_from_iso(self):
self._test_extract_files_from_iso(exit_code=0)
def test_extract_iso_files_exception(self):
self._test_extract_iso_files(exit_code=1)
def test_extract_files_from_iso_fail(self):
self._test_extract_files_from_iso(exit_code=1)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._get_iso_disk_size')
'WindowsConfigDriveManager._extract_files_from_iso')
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._write_iso_file')
def _test_extract_iso_disk_file(self, mock_write_iso_file,
mock_get_iso_disk_size, exception):
mock_osutils = mock.MagicMock()
fake_path = os.path.join('fake', 'path')
fake_path_physical = os.path.join(fake_path, 'physical')
mock_osutils.get_physical_disks.return_value = [fake_path_physical]
mock_get_iso_disk_size.return_value = 'fake iso size'
mock_PhysDisk = self.windows.disk.PhysicalDisk.return_value
if exception:
mock_PhysDisk.open.side_effect = [Exception]
response = self._config_manager._extract_iso_disk_file(
osutils=mock_osutils, iso_file_path=fake_path)
if not exception:
mock_get_iso_disk_size.assert_called_once_with(
mock_PhysDisk)
mock_write_iso_file.assert_called_once_with(
mock_PhysDisk, fake_path, 'fake iso size')
self.windows.disk.PhysicalDisk.assert_called_once_with(
fake_path_physical)
mock_osutils.get_physical_disks.assert_called_once_with()
mock_PhysDisk.open.assert_called_once_with()
mock_PhysDisk.close.assert_called_once_with()
self.assertTrue(response)
else:
self.assertFalse(response)
def test_extract_iso_disk_file_disk_found(self):
self._test_extract_iso_disk_file(exception=False)
def test_extract_iso_disk_file_disk_not_found(self):
self._test_extract_iso_disk_file(exception=True)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._get_conf_drive_from_raw_hdd')
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._get_conf_drive_from_cdrom_drive')
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._get_conf_drive_from_vfat')
def _test_get_config_drive_files(self,
mock_get_conf_drive_from_vfat,
mock_get_conf_drive_from_cdrom_drive,
mock_get_conf_drive_from_raw_hdd,
raw_hdd_found=False,
cdrom_drive_found=False,
vfat_found=False):
'WindowsConfigDriveManager._get_iso_file_size')
def _test_extract_iso_from_devices(self, mock_get_iso_file_size,
mock_write_iso_file,
mock_extract_files_from_iso,
found=True):
# For every device (mock) in the list of available devices:
# first - skip (no size)
# second - error (throws Exception)
# third - extract (is ok)
# fourth - unreachable (already found ok device)
size = 100 * 512
devices = [mock.MagicMock() for _ in range(4)]
devices[1].__enter__.side_effect = [Exception]
rest = [size] if found else [None]
mock_get_iso_file_size.side_effect = [None] + rest * 2
file_path = os.path.join("tempdir", "uuid.iso")
fake_path = os.path.join('fake', 'path')
mock_get_conf_drive_from_raw_hdd.return_value = raw_hdd_found
mock_get_conf_drive_from_cdrom_drive.return_value = cdrom_drive_found
mock_get_conf_drive_from_vfat.return_value = vfat_found
with self.snatcher:
response = self._config_manager._extract_iso_from_devices(devices)
self.mock_gettempdir.assert_called_once_with()
mock_get_iso_file_size.assert_has_calls([
mock.call(devices[0]), mock.call(devices[2])])
expected_log = [
"ISO extraction failed on %(device)s with %(error)r" %
{"device": devices[1], "error": Exception()}]
if found:
mock_write_iso_file.assert_called_once_with(devices[2],
file_path, size)
mock_extract_files_from_iso.assert_called_once_with(file_path)
expected_log.append("ISO9660 disk found on %s" % devices[2])
self.assertEqual(expected_log, self.snatcher.output)
self.assertEqual(found, response)
response = self._config_manager.get_config_drive_files(
target_path=fake_path)
def test_extract_iso_from_devices_not_found(self):
self._test_extract_iso_from_devices(found=False)
if vfat_found:
mock_get_conf_drive_from_vfat.assert_called_once_with(fake_path)
self.assertFalse(mock_get_conf_drive_from_raw_hdd.called)
self.assertFalse(mock_get_conf_drive_from_cdrom_drive.called)
elif cdrom_drive_found:
mock_get_conf_drive_from_vfat.assert_called_once_with(fake_path)
mock_get_conf_drive_from_cdrom_drive.assert_called_once_with(
fake_path)
mock_get_conf_drive_from_raw_hdd.assert_called_once_with(
fake_path)
elif raw_hdd_found:
mock_get_conf_drive_from_vfat.assert_called_once_with(fake_path)
mock_get_conf_drive_from_raw_hdd.assert_called_once_with(
fake_path)
self.assertFalse(mock_get_conf_drive_from_cdrom_drive.called)
self.assertTrue(response)
def test_get_config_drive_files(self):
self._test_get_config_drive_files(raw_hdd_found=True)
self._test_get_config_drive_files(cdrom_drive_found=True)
self._test_get_config_drive_files(vfat_found=True)
def test_extract_iso_from_devices(self):
self._test_extract_iso_from_devices()
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.'
'_get_config_drive_cdrom_mount_point')
'_check_for_config_drive')
@mock.patch('shutil.copytree')
def _test_get_conf_drive_from_cdrom_drive(self, mock_copytree,
mock_get_config_cdrom_mount,
mount_point):
fake_path = os.path.join('fake', 'path')
mock_get_config_cdrom_mount.return_value = mount_point
@mock.patch('os.rmdir')
def _test_get_config_drive_from_cdrom_drive(self, mock_os_rmdir,
mock_copytree,
mock_check_for_config_drive,
found=True):
drives = ["C:\\", "M:\\", "I:\\", "N:\\"]
self.osutils.get_cdrom_drives.return_value = drives
checks = [False, False, True, False]
if not found:
checks[2] = False
mock_check_for_config_drive.side_effect = checks
response = self._config_manager._get_conf_drive_from_cdrom_drive(
fake_path)
response = self._config_manager._get_config_drive_from_cdrom_drive()
mock_get_config_cdrom_mount.assert_called_once_with()
self.osutils.get_cdrom_drives.assert_called_once_with()
idx = 3 if found else 4
check_calls = [mock.call(drive) for drive in drives[:idx]]
mock_check_for_config_drive.assert_has_calls(check_calls)
if found:
mock_os_rmdir.assert_called_once_with(
self._config_manager.target_path)
mock_copytree.assert_called_once_with(
drives[2], self._config_manager.target_path)
if mount_point:
mock_copytree.assert_called_once_with(mount_point, fake_path)
self.assertTrue(response)
else:
self.assertFalse(response)
self.assertEqual(found, response)
def test_get_conf_drive_from_cdrom_drive_with_mountpoint(self):
self._test_get_conf_drive_from_cdrom_drive(
mount_point='fake mount point')
def test_get_config_drive_from_cdrom_drive_not_found(self):
self._test_get_config_drive_from_cdrom_drive(found=False)
def test_get_conf_drive_from_cdrom_drive_without_mountpoint(self):
self._test_get_conf_drive_from_cdrom_drive(
mount_point=None)
def test_get_config_drive_from_cdrom_drive(self):
self._test_get_config_drive_from_cdrom_drive()
@mock.patch('os.remove')
@mock.patch('os.path.exists')
@mock.patch('tempfile.gettempdir')
@mock.patch('uuid.uuid4')
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._extract_iso_disk_file')
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._extract_iso_files')
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
def _test_get_conf_drive_from_raw_hdd(self, mock_get_os_utils,
mock_extract_iso_files,
mock_extract_iso_disk_file,
mock_uuid4, mock_gettempdir,
mock_exists, mock_remove,
found_drive):
fake_target_path = os.path.join('fake', 'path')
fake_iso_path = os.path.join('fake_dir', 'fake_id' + '.iso')
'WindowsConfigDriveManager.'
'_extract_iso_from_devices')
@mock.patch("six.moves.builtins.map")
def test_get_config_drive_from_raw_hdd(self, mock_map,
mock_extract_iso_from_devices):
Disk = self.conf_module.disk.Disk
paths = [mock.Mock() for _ in range(3)]
self.osutils.get_physical_disks.return_value = paths
mock_extract_iso_from_devices.return_value = True
mock_uuid4.return_value = 'fake_id'
mock_gettempdir.return_value = 'fake_dir'
mock_extract_iso_disk_file.return_value = found_drive
mock_exists.return_value = found_drive
response = self._config_manager._get_config_drive_from_raw_hdd()
mock_map.assert_called_once_with(Disk, paths)
self.osutils.get_physical_disks.assert_called_once_with()
mock_extract_iso_from_devices.assert_called_once_with(
mock_map.return_value)
self.assertTrue(response)
response = self._config_manager._get_conf_drive_from_raw_hdd(
fake_target_path)
mock_get_os_utils.assert_called_once_with()
mock_gettempdir.assert_called_once_with()
mock_extract_iso_disk_file.assert_called_once_with(
mock_get_os_utils(), fake_iso_path)
if found_drive:
mock_extract_iso_files.assert_called_once_with(
mock_get_os_utils(), fake_iso_path, fake_target_path)
mock_exists.assert_called_once_with(fake_iso_path)
mock_remove.assert_called_once_with(fake_iso_path)
self.assertTrue(response)
else:
self.assertFalse(response)
def test_get_conf_drive_from_raw_hdd_found_drive(self):
self._test_get_conf_drive_from_raw_hdd(found_drive=True)
def test_get_conf_drive_from_raw_hdd_no_drive_found(self):
self._test_get_conf_drive_from_raw_hdd(found_drive=False)
@mock.patch('os.makedirs')
@mock.patch('cloudbaseinit.utils.windows.vfat.copy_from_vfat_drive')
@mock.patch('cloudbaseinit.utils.windows.vfat.is_vfat_drive')
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
def test_get_conf_drive_from_vfat(self, mock_get_os_utils,
mock_is_vfat_drive,
mock_copy_from_vfat_drive,
mock_os_makedirs):
mock_osutils = mock_get_os_utils.return_value
mock_osutils.get_physical_disks.return_value = (
def test_get_config_drive_from_vfat(self, mock_is_vfat_drive,
mock_copy_from_vfat_drive):
self.osutils.get_physical_disks.return_value = (
mock.sentinel.drive1,
mock.sentinel.drive2,
)
@ -409,24 +297,160 @@ class TestWindowsConfigDriveManager(unittest.TestCase):
with testutils.LogSnatcher('cloudbaseinit.metadata.services.'
'osconfigdrive.windows') as snatcher:
response = self._config_manager._get_conf_drive_from_vfat(
mock.sentinel.target_path)
response = self._config_manager._get_config_drive_from_vfat()
self.assertTrue(response)
mock_osutils.get_physical_disks.assert_called_once_with()
self.osutils.get_physical_disks.assert_called_once_with()
expected_is_vfat_calls = [
mock.call(mock_osutils, mock.sentinel.drive1),
mock.call(mock_osutils, mock.sentinel.drive2),
mock.call(self.osutils, mock.sentinel.drive1),
mock.call(self.osutils, mock.sentinel.drive2),
]
self.assertEqual(expected_is_vfat_calls, mock_is_vfat_drive.mock_calls)
mock_copy_from_vfat_drive.assert_called_once_with(
mock_osutils,
self.osutils,
mock.sentinel.drive2,
mock.sentinel.target_path)
self._config_manager.target_path)
expected_logging = [
'Config Drive found on disk %r' % mock.sentinel.drive2,
]
self.assertEqual(expected_logging, snatcher.output)
mock_os_makedirs.assert_called_once_with(mock.sentinel.target_path)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.'
'_extract_iso_from_devices')
def _test_get_config_drive_from_partition(self,
mock_extract_iso_from_devices,
found=True):
paths = [mock.Mock() for _ in range(3)]
self.osutils.get_physical_disks.return_value = paths
disks = list(map(self.conf_module.disk.Disk, paths))
mock_extract_iso_from_devices.side_effect = [False, found, found]
idx = 3 - int(found)
extract_calls = [mock.call(disk.partitions())
for disk in disks[:idx]]
response = self._config_manager._get_config_drive_from_partition()
self.osutils.get_physical_disks.assert_called_once_with()
mock_extract_iso_from_devices.assert_has_calls(extract_calls)
self.assertEqual(found, response)
def test_get_config_drive_from_partition_not_found(self):
self._test_get_config_drive_from_partition(found=False)
def test_get_config_drive_from_partition(self):
self._test_get_config_drive_from_partition()
@mock.patch('os.rmdir')
@mock.patch('shutil.copytree')
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.'
'_check_for_config_drive')
def _test_get_config_drive_from_volume(self, mock_check_for_config_drive,
mock_copytree, mock_os_rmdir,
found=True):
volumes = [mock.Mock() for _ in range(3)]
self.osutils.get_volumes.return_value = volumes
checks = [False, found, found]
mock_check_for_config_drive.side_effect = checks
idx = 3 - int(found)
check_calls = [mock.call(volume) for volume in volumes[:idx]]
response = self._config_manager._get_config_drive_from_volume()
self.osutils.get_volumes.assert_called_once_with()
mock_check_for_config_drive.assert_has_calls(check_calls)
if found:
mock_os_rmdir.assert_called_once_with(
self._config_manager.target_path)
mock_copytree.assert_called_once_with(
volumes[1], self._config_manager.target_path)
self.assertEqual(found, response)
def test_get_config_drive_from_volume_not_found(self):
self._test_get_config_drive_from_volume(found=False)
def test_get_config_drive_from_volume(self):
self._test_get_config_drive_from_volume()
def _test__get_config_drive_files(self, cd_type, cd_location,
func, found=True):
response = self._config_manager._get_config_drive_files(cd_type,
cd_location)
if found:
if func:
func.assert_called_once_with()
self.assertEqual(func.return_value, response)
else:
self.assertFalse(response)
def test__get_config_drive_files_not_found(self):
self._test__get_config_drive_files(None, None, None, found=False)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.'
'_get_config_drive_from_cdrom_drive')
def test__get_config_drive_files_cdrom_iso(self, func):
self._test__get_config_drive_files(
"iso", "cdrom", func)
def test__get_config_drive_files_cdrom_vfat(self):
self._test__get_config_drive_files(
"vfat", "cdrom", None)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.'
'_get_config_drive_from_raw_hdd')
def test__get_config_drive_files_hdd_iso(self, func):
self._test__get_config_drive_files(
"iso", "hdd", func)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.'
'_get_config_drive_from_vfat')
def test__get_config_drive_files_hdd_vfat(self, func):
self._test__get_config_drive_files(
"vfat", "hdd", func)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.'
'_get_config_drive_from_partition')
def test__get_config_drive_files_partition_iso(self, func):
self._test__get_config_drive_files(
"iso", "partition", func)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.'
'_get_config_drive_from_volume')
def test__get_config_drive_files_partition_vfat(self, func):
self._test__get_config_drive_files(
"vfat", "partition", func)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.'
'_get_config_drive_files')
def _test_get_config_drive_files(self, mock_get_config_drive_files,
found=True):
check_types = ["iso", "vfat"] if found else []
check_locations = ["cdrom", "hdd", "partition"]
product = list(itertools.product(check_types, check_locations))
product_calls = [mock.call(cd_type, cd_location)
for cd_type, cd_location in product]
mock_get_config_drive_files.side_effect = \
[False] * (len(product_calls) - 1) + [True]
expected_log = ["Looking for Config Drive %(type)s in %(location)s" %
{"type": cd_type, "location": cd_location}
for cd_type, cd_location in product]
with self.snatcher:
response = self._config_manager.get_config_drive_files(
check_types, check_locations)
mock_get_config_drive_files.assert_has_calls(product_calls)
self.assertEqual(expected_log, self.snatcher.output)
self.assertEqual(found, response)
def test_get_config_drive_files_not_found(self):
self._test_get_config_drive_files(found=False)
def test_get_config_drive_files(self):
self._test_get_config_drive_files()

View File

@ -12,23 +12,24 @@
# License for the specific language governing permissions and limitations
# under the License.
import importlib
import os
import unittest
import uuid
try:
import unittest.mock as mock
except ImportError:
import mock
from oslo_config import cfg
CONF = cfg.CONF
from cloudbaseinit import exception
from cloudbaseinit.tests import testutils
class ConfigDriveServiceTest(unittest.TestCase):
class TestConfigDriveService(unittest.TestCase):
def setUp(self):
module_path = "cloudbaseinit.metadata.services.configdrive"
self._win32com_mock = mock.MagicMock()
self._ctypes_mock = mock.MagicMock()
self._ctypes_util_mock = mock.MagicMock()
@ -43,35 +44,66 @@ class ConfigDriveServiceTest(unittest.TestCase):
'win32com.client': self._win32com_client_mock,
'pywintypes': self._pywintypes_mock})
self._module_patcher.start()
self.addCleanup(self._module_patcher.stop)
configdrive = importlib.import_module('cloudbaseinit.metadata.services'
'.configdrive')
self._config_drive = configdrive.ConfigDriveService()
self.configdrive_module = importlib.import_module(module_path)
self._config_drive = self.configdrive_module.ConfigDriveService()
self.snatcher = testutils.LogSnatcher(module_path)
def tearDown(self):
self._module_patcher.stop()
def _test_preprocess_options(self, fail=False):
if fail:
with testutils.ConfPatcher("config_drive_types",
["vfat", "ntfs"]):
with self.assertRaises(exception.CloudbaseInitException):
self._config_drive._preprocess_options()
with testutils.ConfPatcher("config_drive_locations",
["device"]):
with self.assertRaises(exception.CloudbaseInitException):
self._config_drive._preprocess_options()
return
options = {
"config_drive_raw_hhd": False,
"config_drive_cdrom": False,
"config_drive_vfat": True,
# Deprecated options above.
"config_drive_types": ["vfat", "iso"],
"config_drive_locations": ["partition"]
}
contexts = [testutils.ConfPatcher(key, value)
for key, value in options.items()]
with contexts[0], contexts[1], contexts[2], \
contexts[3], contexts[4]:
self._config_drive._preprocess_options()
self.assertEqual({"vfat", "iso"},
self._config_drive._searched_types)
self.assertEqual({"hdd", "partition"},
self._config_drive._searched_locations)
def test_preprocess_options_fail(self):
self._test_preprocess_options(fail=True)
def test_preprocess_options(self):
self._test_preprocess_options()
@mock.patch('tempfile.gettempdir')
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.factory.'
'get_config_drive_manager')
def test_load(self, mock_get_config_drive_manager,
mock_gettempdir):
def test_load(self, mock_get_config_drive_manager):
mock_manager = mock.MagicMock()
mock_manager.get_config_drive_files.return_value = True
fake_path = "fake\\fake_id"
mock_manager.target_path = fake_path
mock_get_config_drive_manager.return_value = mock_manager
mock_gettempdir.return_value = 'fake'
uuid.uuid4 = mock.MagicMock(return_value='fake_id')
fake_path = os.path.join('fake', str('fake_id'))
expected_log = [
"Metadata copied to folder: %r" % fake_path]
response = self._config_drive.load()
with self.snatcher:
response = self._config_drive.load()
mock_gettempdir.assert_called_once_with()
mock_get_config_drive_manager.assert_called_once_with()
mock_manager.get_config_drive_files.assert_called_once_with(
fake_path,
check_raw_hhd=CONF.config_drive_raw_hhd,
check_cdrom=CONF.config_drive_cdrom,
check_vfat=CONF.config_drive_vfat)
searched_types=self.configdrive_module.CD_TYPES,
searched_locations=self.configdrive_module.CD_LOCATIONS)
self.assertEqual(expected_log, self.snatcher.output)
self.assertTrue(response)
self.assertEqual(fake_path, self._config_drive._metadata_path)
@ -85,10 +117,19 @@ class ConfigDriveServiceTest(unittest.TestCase):
self.assertEqual('fake data', response)
mock_join.assert_called_with(
self._config_drive._metadata_path, fake_path)
mock_normpath.assert_called_once_with(mock_join.return_value)
@mock.patch('shutil.rmtree')
def test_cleanup(self, mock_rmtree):
fake_path = os.path.join('fake', 'path')
self._config_drive._metadata_path = fake_path
self._config_drive.cleanup()
mock_mgr = mock.Mock()
self._config_drive._mgr = mock_mgr
mock_mgr.target_path = fake_path
with self.snatcher:
self._config_drive.cleanup()
self.assertEqual(["Deleting metadata folder: %r" % fake_path],
self.snatcher.output)
mock_rmtree.assert_called_once_with(fake_path,
ignore_errors=True)
self.assertEqual(None, self._config_drive._metadata_path)

View File

@ -66,7 +66,7 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
self._win32net_mock.error = Exception
module_path = "cloudbaseinit.osutils.windows"
self._module_patcher = mock.patch.dict(
_module_patcher = mock.patch.dict(
'sys.modules',
{'win32com': self._win32com_mock,
'win32process': self._win32process_mock,
@ -78,10 +78,15 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
'six.moves.xmlrpc_client': self._xmlrpc_client_mock,
'ctypes': self._ctypes_mock,
'pywintypes': self._pywintypes_mock,
'tzlocal': self._tzlocal_mock})
'tzlocal': self._tzlocal_mock,
'winioctlcon': mock.MagicMock()})
_module_patcher.start()
self.addCleanup(_module_patcher.stop)
self._module_patcher.start()
self.windows_utils = importlib.import_module(module_path)
exception.ctypes.GetLastError = mock.MagicMock()
exception.ctypes.FormatError = mock.MagicMock()
with mock.patch("cloudbaseinit.utils.windows.disk.GUID"):
self.windows_utils = importlib.import_module(module_path)
self._winreg_mock = self._moves_mock.winreg
self._windll_mock = self._ctypes_mock.windll
@ -96,9 +101,6 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
self.snatcher = testutils.LogSnatcher(module_path)
def tearDown(self):
self._module_patcher.stop()
@mock.patch('cloudbaseinit.osutils.windows.privilege')
def _test_reboot(self, mock_privilege_module, ret_value,
expected_ret_value=None):
@ -523,7 +525,8 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
def _test_get_user_home(self, user_sid, mock_get_user_sid):
mock_get_user_sid.return_value = user_sid
response = self._winutils.get_user_home(self._USERNAME)
with self.snatcher:
response = self._winutils.get_user_home(self._USERNAME)
if user_sid:
mock_get_user_sid.assert_called_with(self._USERNAME)
@ -536,6 +539,9 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
self._winreg_mock.OpenKey.return_value.__enter__.return_value,
'ProfileImagePath')
else:
self.assertEqual(
["Home directory not found for user %r" % self._USERNAME],
self.snatcher.output)
self.assertTrue(response is None)
def test_get_user_home(self):
@ -593,6 +599,13 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
)
return
expected_log = []
for (ret_val,), msg in ((static_val, "Setting static IP address"),
(gateway_val, "Setting static gateways"),
(dns_val, "Setting static DNS servers")):
if ret_val in (0, 1):
expected_log.append(msg)
conn.return_value.query.return_value = adapter
adapter_config = adapter[0].associators.return_value[0]
adapter_config.EnableStatic.return_value = static_val
@ -604,11 +617,13 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
exception.CloudbaseInitException,
set_static_call)
else:
response = set_static_call()
with self.snatcher:
response = set_static_call()
if static_val[0] or gateway_val[0] or dns_val[0]:
self.assertTrue(response)
else:
self.assertFalse(response)
self.assertEqual(expected_log, self.snatcher.output)
select = ("SELECT * FROM Win32_NetworkAdapter WHERE "
"MACAddress = '{}'".format(mac_address))
@ -655,13 +670,19 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
set_static_call = functools.partial(
self._winutils.set_static_network_config_v6,
mac_address, address6, netmask6, gateway6)
expected_log = []
if not mock_check_os_version.return_value:
expected_log.append("Setting IPv6 info not available "
"on this system")
if not v6adapters or v6error:
self.assertRaises(
exception.CloudbaseInitException,
set_static_call)
else:
set_static_call()
expected_log.append("Setting IPv6 info for %s" % friendly_name)
with self.snatcher:
set_static_call()
mock_get_adapter_addresses.assert_called_once_with()
select = ("SELECT * FROM MSFT_NetIPAddress "
"WHERE InterfaceAlias = '{}'".format(friendly_name))
@ -686,6 +707,7 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
"PassThru": False,
}
netip.Create.assert_called_once_with(**params)
self.assertEqual(expected_log, self.snatcher.output)
def test_set_static_network_config(self):
ret_val1 = (1,)
@ -820,10 +842,11 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
self._test_get_config_value(None)
@mock.patch('time.sleep')
def _test_wait_for_boot_completion(self, ret_val, mock_sleep):
self._winreg_mock.QueryValueEx.side_effect = [ret_val]
def _test_wait_for_boot_completion(self, _, ret_vals=None):
self._winreg_mock.QueryValueEx.side_effect = ret_vals
self._winutils.wait_for_boot_completion()
with self.snatcher:
self._winutils.wait_for_boot_completion()
key = self._winreg_mock.OpenKey.return_value.__enter__.return_value
self._winreg_mock.OpenKey.assert_called_with(
@ -831,12 +854,24 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
"SYSTEM\\Setup\\Status\\SysprepStatus", 0,
self._winreg_mock.KEY_READ)
expected_log = []
for gen_states in ret_vals:
gen_state = gen_states[0]
if gen_state == 7:
break
expected_log.append('Waiting for sysprep completion. '
'GeneralizationState: %d' % gen_state)
self._winreg_mock.QueryValueEx.assert_called_with(
key, "GeneralizationState")
self.assertEqual(expected_log, self.snatcher.output)
def test_wait_for_boot_completion(self):
ret_val = [7]
self._test_wait_for_boot_completion(ret_val)
ret_vals = [[7]]
self._test_wait_for_boot_completion(ret_vals=ret_vals)
def test_wait_for_boot_completion_wait(self):
ret_vals = [[1], [7]]
self._test_wait_for_boot_completion(ret_vals=ret_vals)
def test_get_service(self):
conn = self._wmi_mock.WMI
@ -911,7 +946,10 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
self._winutils.start_service,
'fake name')
else:
self._winutils.start_service('fake name')
with self.snatcher:
self._winutils.start_service('fake name')
self.assertEqual(["Starting service fake name"],
self.snatcher.output)
mock_service.StartService.assert_called_once_with()
@ -933,7 +971,10 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
self._winutils.stop_service,
'fake name')
else:
self._winutils.stop_service('fake name')
with self.snatcher:
self._winutils.stop_service('fake name')
self.assertEqual(["Stopping service fake name"],
self.snatcher.output)
mock_service.StopService.assert_called_once_with()
@ -1302,6 +1343,49 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
interface_detail='fake interface detail',
disk_handle=mock_disk_handle, io_control=True)
def _test_get_volumes(self, find_first=True, find_next=True):
count = 3
expected_volumes = ["ID_{}".format(idx) for idx in range(count)]
volume_values = mock.PropertyMock(side_effect=expected_volumes)
mock_volume = mock.Mock()
type(mock_volume).value = volume_values
self._ctypes_mock.create_unicode_buffer.return_value = mock_volume
if not find_first:
self._kernel32.FindFirstVolumeW.return_value = self._winutils\
.INVALID_HANDLE_VALUE
side_effects = [1] * (count - 1)
side_effects.append(0)
self._kernel32.FindNextVolumeW.side_effect = side_effects
if find_next:
self._ctypes_mock.GetLastError.return_value = self._winutils\
.ERROR_NO_MORE_FILES
if not (find_first and find_next):
with self.assertRaises(exception.WindowsCloudbaseInitException):
self._winutils.get_volumes()
return
volumes = self._winutils.get_volumes()
self._kernel32.FindFirstVolumeW.assert_called_once_with(
mock_volume, self._winutils.MAX_PATH)
find_next_calls = [
mock.call(self._kernel32.FindFirstVolumeW.return_value,
mock_volume, self._winutils.MAX_PATH)] * count
self._kernel32.FindNextVolumeW.assert_has_calls(find_next_calls)
self._kernel32.FindVolumeClose.assert_called_once_with(
self._kernel32.FindFirstVolumeW.return_value)
self.assertEqual(expected_volumes, volumes)
def test_get_volumes(self):
self._test_get_volumes()
def test_get_volumes_first_fail(self):
self._test_get_volumes(find_first=False)
def test_get_volumes_next_fail(self):
self._test_get_volumes(find_next=False)
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils._get_fw_protocol')
def test_firewall_create_rule(self, mock_get_fw_protocol):
expected = [mock.call("HNetCfg.FWOpenPort"),

View File

@ -12,201 +12,323 @@
# License for the specific language governing permissions and limitations
# under the License.
import importlib
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from cloudbaseinit import exception as cbinit_exception
from cloudbaseinit import exception
from cloudbaseinit.tests import testutils
class WindowsPhysicalDiskUtilsTests(testutils.CloudbaseInitTestBase):
class BaseTestDevice(unittest.TestCase):
def setUp(self):
self._ctypes_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict(
_module_patcher = mock.patch.dict(
'sys.modules',
{'ctypes': self._ctypes_mock})
{'ctypes': self._ctypes_mock,
'ctypes.windll': mock.MagicMock(),
'ctypes.wintypes': mock.MagicMock(),
'winioctlcon': mock.MagicMock()})
_module_patcher.start()
self.addCleanup(_module_patcher.stop)
self._module_patcher.start()
self.physical_disk = importlib.import_module(
self.disk = importlib.import_module(
"cloudbaseinit.utils.windows.disk")
self.mock_dword = mock.Mock()
self._ctypes_mock.wintypes.DWORD = self.mock_dword
self.disk.kernel32 = mock.MagicMock()
self.geom = mock.Mock()
self.geom.MediaType = self.disk.Win32_DiskGeometry.FixedMedia = 12
self.geom.Cylinders = 2
self.geom.TracksPerCylinder = 5
self.geom.SectorsPerTrack = 512
self.geom.BytesPerSector = 512
class TestBaseDevice(BaseTestDevice, testutils.CloudbaseInitTestBase):
def setUp(self):
super(TestBaseDevice, self).setUp()
class FinalBaseDevice(self.disk.BaseDevice):
def size(self):
return 0
self.fake_path = mock.sentinel.fake_path
self._phys_disk_class = self.physical_disk.PhysicalDisk(
self._device_class = FinalBaseDevice(
path=self.fake_path)
self._device_class._sector_size = self.geom.BytesPerSector
self._device_class._disk_size = 2 * 5 * 512 * 512
self.physical_disk.kernel32 = mock.MagicMock()
def tearDown(self):
self._module_patcher.stop()
@mock.patch('cloudbaseinit.utils.windows.disk'
'.PhysicalDisk.close')
def _test_open(self, mock_close, _handle, exception):
self._phys_disk_class._handle = _handle
if exception:
self.physical_disk.kernel32.CreateFileW.return_value = \
self._phys_disk_class.INVALID_HANDLE_VALUE
self.assertRaises(cbinit_exception.CloudbaseInitException,
self._phys_disk_class.open)
def _test_open(self, exc):
if exc:
self.disk.kernel32.CreateFileW.return_value = \
self._device_class.INVALID_HANDLE_VALUE
with self.assert_raises_windows_message(
"Cannot open file: %r", 100):
self._device_class.open()
else:
self._phys_disk_class.open()
self._device_class.open()
self.physical_disk.kernel32.CreateFileW.assert_called_once_with(
self.disk.kernel32.CreateFileW.assert_called_once_with(
self._ctypes_mock.c_wchar_p.return_value,
self._phys_disk_class.GENERIC_READ,
self._phys_disk_class.FILE_SHARE_READ,
0, self._phys_disk_class.OPEN_EXISTING,
self._phys_disk_class.FILE_ATTRIBUTE_READONLY, 0
self._device_class.GENERIC_READ,
self._device_class.FILE_SHARE_READ,
0, self._device_class.OPEN_EXISTING,
self._device_class.FILE_ATTRIBUTE_READONLY, 0
)
self._ctypes_mock.c_wchar_p.assert_called_once_with(self.fake_path)
self.assertEqual(
self.physical_disk.kernel32.CreateFileW.return_value,
self._phys_disk_class._handle)
if _handle:
mock_close.assert_called_once_with()
self.disk.kernel32.CreateFileW.return_value,
self._device_class._handle)
def test_open(self):
self._test_open(_handle=None, exception=None)
self._test_open(exc=False)
def test_open_exeption(self):
self._test_open(_handle=None, exception=True)
def test_open_with_close(self):
self._test_open(_handle=mock.sentinel._handle, exception=True)
def test_open_exception(self):
self._test_open(exc=True)
def test_close(self):
self._phys_disk_class._handle = mock.sentinel._handle
self._phys_disk_class._geom = mock.sentinel._geom
self._device_class._handle = mock.sentinel._handle
self._phys_disk_class.close()
self.physical_disk.kernel32.CloseHandle.assert_called_once_with(
self._device_class.close()
self.disk.kernel32.CloseHandle.assert_called_once_with(
mock.sentinel._handle)
self.assertEqual(None, self._device_class._handle)
self.assertEqual(0, self._phys_disk_class._handle)
self.assertEqual(None, self._phys_disk_class._geom)
def _test_get_geometry(self, ret_val, last_error=None):
mock_disk_geom = self.disk.Win32_DiskGeometry
mock_disk_geom.side_effect = None
mock_disk_geom.return_value = self.geom
@mock.patch('cloudbaseinit.utils.windows.disk'
'.Win32_DiskGeometry')
def _test_get_geometry(self, mock_Win32_DiskGeometry, _geom, ret_val,
last_error=None):
mock_DeviceIoControl = self.physical_disk.kernel32.DeviceIoControl
expect_byref = [mock.call(mock_Win32_DiskGeometry.return_value),
mock_DeviceIoControl = self.disk.kernel32.DeviceIoControl
expect_byref = [mock.call(self.geom),
mock.call(
self._ctypes_mock.wintypes.DWORD.return_value)]
self._phys_disk_class._geom = _geom
self.physical_disk.kernel32.DeviceIoControl.return_value = ret_val
self.mock_dword.return_value)]
self.disk.kernel32.DeviceIoControl.return_value = ret_val
if not ret_val:
with self.assert_raises_windows_message(
"Cannot get disk geometry: %r", last_error):
self._phys_disk_class.get_geometry()
elif _geom:
response = self._phys_disk_class.get_geometry()
self.assertEqual(_geom, response)
self._device_class._get_geometry()
else:
response = self._phys_disk_class.get_geometry()
mock_Win32_DiskGeometry.assert_called_once_with()
self._ctypes_mock.wintypes.DWORD.assert_called_once_with()
response = self._device_class._get_geometry()
self.mock_dword.assert_called_once_with()
mock_DeviceIoControl.assert_called_once_with(
self._phys_disk_class._handle,
self._phys_disk_class.IOCTL_DISK_GET_DRIVE_GEOMETRY, 0, 0,
self._device_class._handle,
self.disk.winioctlcon.IOCTL_DISK_GET_DRIVE_GEOMETRY, 0, 0,
self._ctypes_mock.byref.return_value,
self._ctypes_mock.sizeof.return_value,
self._ctypes_mock.byref.return_value, 0)
self.assertEqual(expect_byref,
self._ctypes_mock.byref.call_args_list)
self.assertEqual(mock_Win32_DiskGeometry.return_value,
self._phys_disk_class._geom)
self.assertEqual(self._phys_disk_class._geom, response)
self.assertEqual((self._device_class._sector_size,
self._device_class._disk_size, True),
response)
def test_get_geometry(self):
self._test_get_geometry(_geom=mock.sentinel._geom,
ret_val=mock.sentinel.ret_val)
self._test_get_geometry(ret_val=mock.sentinel.ret_val)
def test_get_geometry_no_geom(self):
self._test_get_geometry(_geom=None,
ret_val=mock.sentinel.ret_val,
last_error=100)
def test_get_geometry_exception(self):
self._test_get_geometry(ret_val=0, last_error=100)
def test_get_geometry_no_geom_exception(self):
self._test_get_geometry(_geom=None, ret_val=None,
last_error=100)
def _test_seek(self, exception):
def _test__seek(self, exc):
expect_DWORD = [mock.call(0), mock.call(1)]
if exception:
self.physical_disk.kernel32.SetFilePointer.return_value = \
self._phys_disk_class.INVALID_SET_FILE_POINTER
self.assertRaises(cbinit_exception.CloudbaseInitException,
self._phys_disk_class.seek, 1)
if exc:
self.disk.kernel32.SetFilePointer.return_value = \
self._device_class.INVALID_SET_FILE_POINTER
with self.assert_raises_windows_message(
"Seek error: %r", 100):
self._device_class._seek(1)
else:
self._phys_disk_class.seek(1)
self.physical_disk.kernel32.SetFilePointer.assert_called_once_with(
self._phys_disk_class._handle,
self._ctypes_mock.wintypes.DWORD.return_value,
self._device_class._seek(1)
self.disk.kernel32.SetFilePointer.assert_called_once_with(
self._device_class._handle,
self.mock_dword.return_value,
self._ctypes_mock.byref.return_value,
self._phys_disk_class.FILE_BEGIN)
self._device_class.FILE_BEGIN)
self._ctypes_mock.byref.assert_called_once_with(
self._ctypes_mock.wintypes.DWORD.return_value)
self.mock_dword.return_value)
self.assertEqual(expect_DWORD,
self._ctypes_mock.wintypes.DWORD.call_args_list)
self.mock_dword.call_args_list)
def test__seek(self):
self._test__seek(exc=False)
def test__seek_exception(self):
self._test__seek(exc=True)
def test_seek(self):
self._test_seek(exception=False)
offset = self._device_class.seek(1025)
self.assertEqual(1024, offset)
def test_seek_exception(self):
self._test_seek(exception=True)
def _test_read(self, ret_val, last_error=None):
def _test__read(self, ret_val, last_error=None):
bytes_to_read = mock.sentinel.bytes_to_read
self.physical_disk.kernel32.ReadFile.return_value = ret_val
self.disk.kernel32.ReadFile.return_value = ret_val
if not ret_val:
with self.assert_raises_windows_message(
"Read exception: %r", last_error):
self._phys_disk_class.read(bytes_to_read)
self._device_class._read(bytes_to_read)
else:
response = self._phys_disk_class.read(bytes_to_read)
response = self._device_class._read(bytes_to_read)
mock_buffer = self._ctypes_mock.create_string_buffer
self._ctypes_mock.create_string_buffer.assert_called_once_with(
bytes_to_read)
self._ctypes_mock.wintypes.DWORD.assert_called_once_with()
self.physical_disk.kernel32.ReadFile.assert_called_once_with(
self._phys_disk_class._handle,
self._ctypes_mock.create_string_buffer.return_value,
mock_buffer.assert_called_once_with(bytes_to_read)
self.mock_dword.assert_called_once_with()
self.disk.kernel32.ReadFile.assert_called_once_with(
self._device_class._handle,
mock_buffer.return_value,
bytes_to_read, self._ctypes_mock.byref.return_value, 0)
self._ctypes_mock.byref.assert_called_once_with(
self._ctypes_mock.wintypes.DWORD.return_value)
self.mock_dword.return_value)
self.assertEqual(
(self._ctypes_mock.create_string_buffer.return_value,
self._ctypes_mock.wintypes.DWORD.return_value.value),
response)
mock_buffer.return_value.raw[
:self.mock_dword.return_value.value], response)
def test__read(self):
self._test__read(ret_val=mock.sentinel.ret_val)
def test__read_exception(self):
self._test__read(ret_val=None, last_error=100)
def test_read(self):
self._test_read(ret_val=mock.sentinel.ret_val)
_read_func = mock.Mock()
mock_content = mock.MagicMock()
_read_func.return_value = mock_content
self._device_class._read = _read_func
response = self._device_class.read(512, 10)
self._device_class._read.assert_called_once_with(1024)
self.assertEqual(response, mock_content[10, 522])
def test_read_exception(self):
self._test_read(ret_val=None, last_error=100)
class TestDisk(BaseTestDevice, testutils.CloudbaseInitTestBase):
def setUp(self):
super(TestDisk, self).setUp()
self.fake_disk_path = mock.sentinel.fake_disk_path
self._disk_class = self.disk.Disk(
path=self.fake_disk_path)
self._disk_class._disk_size = 2 * 5 * 512 * 512
@mock.patch("cloudbaseinit.utils.windows.disk"
".Win32_DRIVE_LAYOUT_INFORMATION_EX")
def _test_get_layout(self, mock_layout_struct, fail=False):
mock_layout = mock.Mock()
mock_layout_struct.return_value = mock_layout
mock_devio = self.disk.kernel32.DeviceIoControl
if fail:
mock_devio.return_value = 0
with self.assert_raises_windows_message(
"Cannot get disk layout: %r", 100):
self._disk_class._get_layout()
return
mock_devio.return_value = 1
response = self._disk_class._get_layout()
mock_devio.assert_called_once_with(
self._disk_class._handle,
self.disk.winioctlcon.IOCTL_DISK_GET_DRIVE_LAYOUT_EX,
0,
0,
self._ctypes_mock.byref(mock_layout),
self._ctypes_mock.sizeof(mock_layout),
self._ctypes_mock.byref(self.mock_dword.return_value),
0)
self.assertEqual(mock_layout, response)
def test_get_layout_fail(self):
self._test_get_layout(fail=True)
def test_get_layout(self):
self._test_get_layout()
def _test_get_partition_indexes(self, fail=False, gpt=True):
layout = mock.MagicMock()
if fail:
with self.assertRaises(exception.CloudbaseInitException):
self._disk_class._get_partition_indexes(layout)
return
part_style = (self._disk_class.PARTITION_STYLE_GPT if gpt
else self._disk_class.PARTITION_STYLE_MBR)
layout.PartitionStyle = part_style
count = 8
layout.PartitionCount = count
if gpt:
expected = list(range(count))
else:
layout.PartitionEntry = [mock.Mock() for _ in range(count)]
layout.PartitionEntry[-1].Mbr.PartitionType = \
self._disk_class.PARTITION_ENTRY_UNUSED
expected = list(range(count - 1))
response = self._disk_class._get_partition_indexes(layout)
self.assertEqual(expected, response)
def test_get_partition_indexes_fail(self):
self._test_get_partition_indexes(fail=True)
def test_get_partition_indexes_gpt(self):
self._test_get_partition_indexes()
def test_get_partition_indexes_mbr(self):
self._test_get_partition_indexes(gpt=False)
@mock.patch("cloudbaseinit.utils.windows.disk"
".Partition")
@mock.patch("cloudbaseinit.utils.windows.disk"
".Disk._get_partition_indexes")
@mock.patch("cloudbaseinit.utils.windows.disk"
".Disk._get_layout")
def test_partitions(self, mock_get_layout, mock_get_partition_indexes,
mock_partition):
size = 512
layout = mock.MagicMock()
layout.PartitionEntry[0].PartitionLength = size
indexes = [0, 1, 2]
mock_get_layout.return_value = layout
mock_get_partition_indexes.return_value = indexes
self._disk_class._path = r"\\?\GLOBALROOT\Device\Harddisk0"
response = self._disk_class.partitions()
mock_get_layout.assert_called_once_with()
mock_get_partition_indexes.assert_called_once_with(layout)
paths = [r"\\?\GLOBALROOT\Device\Harddisk{}\Partition{}".format(
0, idx + 1) for idx in indexes]
calls = [mock.call(path, size) for path in paths]
mock_partition.assert_has_calls(calls)
expected = [mock_partition(path, size) for path in paths]
self.assertEqual(expected, response)
class TestPartition(unittest.TestCase):
def setUp(self):
_module_patcher = mock.patch.dict(
'sys.modules',
{'ctypes': mock.MagicMock(),
'winioctlcon': mock.MagicMock()})
_module_patcher.start()
self.addCleanup(_module_patcher.stop)
self.disk = importlib.import_module(
"cloudbaseinit.utils.windows.disk")
def test_size(self):
size = mock.sentinel.size
partition = self.disk.Partition(mock.Mock(), size)
self.assertEqual(size, partition.size)

View File

@ -1,263 +0,0 @@
# Copyright 2014 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import importlib
try:
import unittest.mock as mock
except ImportError:
import mock
from cloudbaseinit.tests import testutils
class WindowsVirtualDiskUtilsTests(testutils.CloudbaseInitTestBase):
def setUp(self):
self._ctypes_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict(
'sys.modules',
{'ctypes': self._ctypes_mock})
self._module_patcher.start()
self.virtual_disk = importlib.import_module(
"cloudbaseinit.utils.windows.virtual_disk")
self.fake_path = mock.sentinel.fake_path
self._vdisk_class = self.virtual_disk.VirtualDisk(path=self.fake_path)
self.virtual_disk.virtdisk = None
self.virtual_disk.kernel32 = mock.MagicMock()
def tearDown(self):
self._module_patcher.stop()
def test_load_virtdisk_dll(self):
self._vdisk_class._load_virtdisk_dll()
self.assertEqual(self._ctypes_mock.windll.virtdisk,
self.virtual_disk.virtdisk)
@mock.patch('cloudbaseinit.utils.windows.virtual_disk'
'.Win32_VIRTUAL_STORAGE_TYPE')
@mock.patch('cloudbaseinit.utils.windows.virtual_disk'
'.get_WIN32_VIRTUAL_STORAGE_TYPE_VENDOR_MICROSOFT')
@mock.patch('cloudbaseinit.utils.windows.virtual_disk'
'.VirtualDisk._load_virtdisk_dll')
@mock.patch('cloudbaseinit.utils.windows.virtual_disk'
'.VirtualDisk.close')
def _test_open(self, mock_close, mock_load_virtdisk_dll,
mock_get_virtual_storage_type_vendor,
mock_Win32_VIRTUAL_STORAGE_TYPE, handle, ret_val):
virtdisk = self._ctypes_mock.windll.virtdisk
virtdisk.OpenVirtualDisk.return_value = ret_val
self.virtual_disk.virtdisk = virtdisk
self._vdisk_class._handle = None
if handle:
self._vdisk_class._handle = handle
if ret_val:
with self.assert_raises_windows_message(
"Cannot open virtual disk: %r",
ret_val):
self._vdisk_class.open()
else:
self._vdisk_class.open()
if handle:
mock_close.assert_called_once_with()
mock_load_virtdisk_dll.assert_called_once_with()
mock_Win32_VIRTUAL_STORAGE_TYPE.assert_called_once_with()
mock_get_virtual_storage_type_vendor.assert_called_once_with()
self.assertEqual(
self._vdisk_class.VIRTUAL_STORAGE_TYPE_DEVICE_ISO,
mock_Win32_VIRTUAL_STORAGE_TYPE.return_value.DeviceId)
self.assertEqual(self._ctypes_mock.wintypes.HANDLE.return_value,
self._vdisk_class._handle)
def test_open(self):
self._test_open(handle=None, ret_val=None)
def test_open_exception(self):
self._test_open(handle=None, ret_val=100)
def test_open_handle_exists(self):
self._test_open(handle=None, ret_val=None)
def _test_attach(self, ret_val):
virtdisk = self._ctypes_mock.windll.virtdisk
self.virtual_disk.virtdisk = virtdisk
virtdisk.AttachVirtualDisk.return_value = ret_val
if ret_val:
with self.assert_raises_windows_message(
"Cannot attach virtual disk: %r",
ret_val):
self._vdisk_class.attach()
else:
self._vdisk_class.attach()
virtdisk.AttachVirtualDisk.assert_called_once_with(
self._vdisk_class._handle, 0,
self._vdisk_class.ATTACH_VIRTUAL_DISK_FLAG_READ_ONLY, 0, 0, 0)
def test_attach(self):
self._test_attach(ret_val=None)
def test_attach_exception(self):
self._test_attach(ret_val=100)
def _test_detach(self, ret_val):
virtdisk = self._ctypes_mock.windll.virtdisk
self.virtual_disk.virtdisk = virtdisk
virtdisk.DetachVirtualDisk.return_value = ret_val
if ret_val:
with self.assert_raises_windows_message(
"Cannot detach virtual disk: %r", ret_val):
self._vdisk_class.detach()
else:
self._vdisk_class.detach()
virtdisk.DetachVirtualDisk.assert_called_once_with(
self._vdisk_class._handle,
self._vdisk_class.DETACH_VIRTUAL_DISK_FLAG_NONE, 0)
def test_detach(self):
self._test_detach(ret_val=None)
def test_detach_exception(self):
self._test_detach(ret_val=100)
def _test_get_physical_path(self, ret_val):
virtdisk = self._ctypes_mock.windll.virtdisk
self.virtual_disk.virtdisk = virtdisk
virtdisk.GetVirtualDiskPhysicalPath.return_value = ret_val
buf = self._ctypes_mock.create_unicode_buffer.return_value
if ret_val:
with self.assert_raises_windows_message(
"Cannot get virtual disk physical path: %r", ret_val):
self._vdisk_class.get_physical_path()
else:
response = self._vdisk_class.get_physical_path()
self.assertEqual(buf.value, response)
self._ctypes_mock.create_unicode_buffer.assert_called_once_with(1024)
self._ctypes_mock.wintypes.DWORD.assert_called_once_with(
self._ctypes_mock.sizeof.return_value)
self._ctypes_mock.sizeof.assert_called_once_with(
buf)
virtdisk.GetVirtualDiskPhysicalPath.assert_called_once_with(
self._vdisk_class._handle,
self._ctypes_mock.byref.return_value,
self._ctypes_mock.create_unicode_buffer.return_value)
self._ctypes_mock.byref.assert_called_once_with(
self._ctypes_mock.wintypes.DWORD.return_value)
self._ctypes_mock.create_unicode_buffer.assert_called_once_with(1024)
def test_get_physical_path(self):
self._test_get_physical_path(ret_val=None)
def test_get_physical_path_fails(self):
self._test_get_physical_path(ret_val=100)
@mock.patch('cloudbaseinit.utils.windows.virtual_disk'
'.VirtualDisk.get_physical_path')
def _test_get_cdrom_drive_mount_point(self, mock_get_physical_path,
buf_len, ret_val, last_error=None):
buf = self._ctypes_mock.create_unicode_buffer.return_value
kernel32 = self.virtual_disk.kernel32
kernel32.GetLogicalDriveStringsW.return_value = buf_len
kernel32.QueryDosDeviceW.return_value = ret_val
self._ctypes_mock.wstring_at.return_value = [mock.sentinel.value1,
mock.sentinel.value2]
dev = self._ctypes_mock.create_unicode_buffer.return_value
dev.value = mock_get_physical_path.return_value
self._ctypes_mock.sizeof.return_value = 1
expected_sizeof = [mock.call(buf),
mock.call(self._ctypes_mock.wintypes.WCHAR)]
expected_create_unicode_buffer = [mock.call(2048)]
if not buf_len:
with self.assert_raises_windows_message(
"Cannot enumerate logical devices: %r", last_error):
self._vdisk_class.get_cdrom_drive_mount_point()
elif not ret_val:
with self.assert_raises_windows_message(
"Cannot query NT device: %r", last_error):
self._vdisk_class.get_cdrom_drive_mount_point()
expected_create_unicode_buffer.append(mock.call(2048))
expected_sizeof.append(mock.call(self._ctypes_mock.wintypes.WCHAR))
expected_sizeof.append(
mock.call(
self._ctypes_mock.create_unicode_buffer.return_value))
expected_sizeof.append(mock.call(self._ctypes_mock.wintypes.WCHAR))
else:
response = self._vdisk_class.get_cdrom_drive_mount_point()
mock_get_physical_path.assert_called_once_with()
self._ctypes_mock.wstring_at.assert_called_once_with(
self._ctypes_mock.addressof.return_value + 0 * 1)
self._ctypes_mock.addressof.assert_called_once_with(buf)
kernel32.QueryDosDeviceW.assert_called_once_with(
[mock.sentinel.value1],
self._ctypes_mock.create_unicode_buffer.return_value, 1)
expected_sizeof.append(mock.call(self._ctypes_mock.wintypes.WCHAR))
expected_sizeof.append(
mock.call(
self._ctypes_mock.create_unicode_buffer.return_value))
expected_sizeof.append(mock.call(self._ctypes_mock.wintypes.WCHAR))
expected_create_unicode_buffer.append(mock.call(2048))
self.assertEqual(self._ctypes_mock.wstring_at.return_value[:-1],
response)
self.assertEqual(
expected_create_unicode_buffer,
self._ctypes_mock.create_unicode_buffer.call_args_list)
self.assertEqual(expected_sizeof,
self._ctypes_mock.sizeof.call_args_list)
kernel32.GetLogicalDriveStringsW.assert_called_once_with(1, buf)
def test_get_cdrom_drive_mount_point_exception_buf_len(self):
self._test_get_cdrom_drive_mount_point(buf_len=0, ret_val=1,
last_error=100)
def test_get_cdrom_drive_mount_point_exception_query(self):
self._test_get_cdrom_drive_mount_point(buf_len=1, ret_val=0,
last_error=100)
def test_get_cdrom_drive_mount_point(self):
self._test_get_cdrom_drive_mount_point(buf_len=1, ret_val=1)
def test_close(self):
self._vdisk_class.close()
self.virtual_disk.kernel32.CloseHandle.assert_called_once_with(
self._vdisk_class._handle)
self.assertEqual(0, self._vdisk_class._handle)

View File

@ -12,17 +12,24 @@
# License for the specific language governing permissions and limitations
# under the License.
import ctypes
import abc
import ctypes
from ctypes import windll
from ctypes import wintypes
import re
import six
import winioctlcon
from cloudbaseinit import exception
kernel32 = windll.kernel32
class Win32_DiskGeometry(ctypes.Structure):
FixedMedia = 12
_fields_ = [
@ -30,29 +37,193 @@ class Win32_DiskGeometry(ctypes.Structure):
('MediaType', wintypes.DWORD),
('TracksPerCylinder', wintypes.DWORD),
('SectorsPerTrack', wintypes.DWORD),
('BytesPerSector', wintypes.DWORD),
('BytesPerSector', wintypes.DWORD)
]
class PhysicalDisk(object):
class Win32_DRIVE_LAYOUT_INFORMATION_MBR(ctypes.Structure):
_fields_ = [
('Signature', wintypes.ULONG)
]
class GUID(ctypes.Structure):
_fields_ = [
("data1", wintypes.DWORD),
("data2", wintypes.WORD),
("data3", wintypes.WORD),
("data4", wintypes.BYTE * 8)
]
def __init__(self, l, w1, w2, b1, b2, b3, b4, b5, b6, b7, b8):
self.data1 = l
self.data2 = w1
self.data3 = w2
self.data4[0] = b1
self.data4[1] = b2
self.data4[2] = b3
self.data4[3] = b4
self.data4[4] = b5
self.data4[5] = b6
self.data4[6] = b7
self.data4[7] = b8
class Win32_DRIVE_LAYOUT_INFORMATION_GPT(ctypes.Structure):
_fields_ = [
('DiskId', GUID),
('StartingUsableOffset', wintypes.LARGE_INTEGER),
('UsableLength', wintypes.LARGE_INTEGER),
('MaxPartitionCount', wintypes.ULONG)
]
class DRIVE_FORMAT(ctypes.Union):
_fields_ = [
('Mbr', Win32_DRIVE_LAYOUT_INFORMATION_MBR),
('Gpt', Win32_DRIVE_LAYOUT_INFORMATION_GPT)
]
class Win32_PARTITION_INFORMATION_MBR(ctypes.Structure):
_fields_ = [
('PartitionType', wintypes.BYTE),
('BootIndicator', wintypes.BOOLEAN),
('RecognizedPartition', wintypes.BOOLEAN),
('HiddenSectors', wintypes.DWORD)
]
class Win32_PARTITION_INFORMATION_GPT(ctypes.Structure):
_fields_ = [
('PartitionType', GUID),
('PartitionId', GUID),
('Attributes', wintypes.ULARGE_INTEGER),
('Name', wintypes.WCHAR * 36)
]
class PARTITION_INFORMATION(ctypes.Union):
_fields_ = [
('Mbr', Win32_PARTITION_INFORMATION_MBR),
('Gpt', Win32_PARTITION_INFORMATION_GPT)
]
class Win32_PARTITION_INFORMATION_EX(ctypes.Structure):
_anonymous_ = ('PartitionInformation',)
_fields_ = [
('PartitionStyle', wintypes.DWORD),
('StartingOffset', wintypes.LARGE_INTEGER),
('PartitionLength', wintypes.LARGE_INTEGER),
('PartitionNumber', wintypes.DWORD),
('RewritePartition', wintypes.BOOLEAN),
('PartitionInformation', PARTITION_INFORMATION)
]
class Win32_DRIVE_LAYOUT_INFORMATION_EX(ctypes.Structure):
_anonymous_ = ('DriveFormat',)
_fields_ = [
('PartitionStyle', wintypes.DWORD),
('PartitionCount', wintypes.DWORD),
('DriveFormat', DRIVE_FORMAT),
('PartitionEntry', Win32_PARTITION_INFORMATION_EX * 128)
]
@six.add_metaclass(abc.ABCMeta)
class BaseDevice(object):
"""Base class for devices like disks and partitions.
It has common methods for getting physical disk geometry,
opening/closing the device and also seeking through it
for reading certain amounts of bytes.
"""
GENERIC_READ = 0x80000000
FILE_SHARE_READ = 1
OPEN_EXISTING = 3
FILE_ATTRIBUTE_READONLY = 1
INVALID_HANDLE_VALUE = -1
IOCTL_DISK_GET_DRIVE_GEOMETRY = 0x70000
INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
FILE_BEGIN = 0
INVALID_SET_FILE_POINTER = 0xFFFFFFFF
def __init__(self, path):
self._path = path
self._handle = 0
self._geom = None
self._handle = None
self._sector_size = None
self._disk_size = None
self.fixed = None
def __repr__(self):
return "<{}: {}>".format(self.__class__.__name__, self._path)
def __enter__(self):
self.open()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def _get_geometry(self):
"""Get details about the disk size bounds."""
geom = Win32_DiskGeometry()
bytes_returned = wintypes.DWORD()
ret_val = kernel32.DeviceIoControl(
self._handle,
winioctlcon.IOCTL_DISK_GET_DRIVE_GEOMETRY,
0,
0,
ctypes.byref(geom),
ctypes.sizeof(geom),
ctypes.byref(bytes_returned),
0)
if not ret_val:
raise exception.WindowsCloudbaseInitException(
"Cannot get disk geometry: %r")
_sector_size = geom.BytesPerSector
_disk_size = (geom.Cylinders * geom.TracksPerCylinder *
geom.SectorsPerTrack * geom.BytesPerSector)
fixed = geom.MediaType == Win32_DiskGeometry.FixedMedia
return _sector_size, _disk_size, fixed
def _seek(self, offset):
high = wintypes.DWORD(offset >> 32)
low = wintypes.DWORD(offset & 0xFFFFFFFF)
ret_val = kernel32.SetFilePointer(self._handle, low,
ctypes.byref(high),
self.FILE_BEGIN)
if ret_val == self.INVALID_SET_FILE_POINTER:
raise exception.WindowsCloudbaseInitException(
"Seek error: %r")
def _read(self, size):
buff = ctypes.create_string_buffer(size)
bytes_read = wintypes.DWORD()
ret_val = kernel32.ReadFile(self._handle, buff, size,
ctypes.byref(bytes_read), 0)
if not ret_val:
raise exception.WindowsCloudbaseInitException(
"Read exception: %r")
return buff.raw[:bytes_read.value] # all bytes without the null byte
def open(self):
if self._handle:
self.close()
handle = kernel32.CreateFileW(
ctypes.c_wchar_p(self._path),
self.GENERIC_READ,
@ -62,49 +233,119 @@ class PhysicalDisk(object):
self.FILE_ATTRIBUTE_READONLY,
0)
if handle == self.INVALID_HANDLE_VALUE:
raise exception.CloudbaseInitException('Cannot open file')
raise exception.WindowsCloudbaseInitException(
'Cannot open file: %r')
self._handle = handle
self._sector_size, self._disk_size, self.fixed =\
self._get_geometry()
def close(self):
kernel32.CloseHandle(self._handle)
self._handle = 0
self._geom = None
def get_geometry(self):
if not self._geom:
geom = Win32_DiskGeometry()
bytes_returned = wintypes.DWORD()
ret_val = kernel32.DeviceIoControl(
self._handle,
self.IOCTL_DISK_GET_DRIVE_GEOMETRY,
0,
0,
ctypes.byref(geom),
ctypes.sizeof(geom),
ctypes.byref(bytes_returned),
0)
if not ret_val:
raise exception.WindowsCloudbaseInitException(
"Cannot get disk geometry: %r")
self._geom = geom
return self._geom
if self._handle:
kernel32.CloseHandle(self._handle)
self._handle = None
def seek(self, offset):
high = wintypes.DWORD(offset >> 32)
low = wintypes.DWORD(offset & 0xFFFFFFFF)
"""Drive geometry safe seek.
ret_val = kernel32.SetFilePointer(self._handle, low,
ctypes.byref(high),
self.FILE_BEGIN)
if ret_val == self.INVALID_SET_FILE_POINTER:
raise exception.CloudbaseInitException("Seek error")
Seek for a given offset and return the valid set one.
"""
safe_offset = int(offset / self._sector_size) * self._sector_size
self._seek(safe_offset)
return safe_offset
def read(self, size, skip=0):
"""Drive geometry safe read.
Read and extract exactly the requested content.
"""
# Compute a size to fit both of the bytes we need to skip and
# also the minimum read size.
total = size + skip
safe_size = ((int(total / self._sector_size) +
bool(total % self._sector_size)) * self._sector_size)
content = self._read(safe_size)
return content[skip:total]
@abc.abstractmethod
def size(self):
"""Returns the size in bytes of the actual opened device."""
class Disk(BaseDevice):
"""Disk class with seek/read support.
It also has the capability of obtaining partition objects.
"""
PARTITION_ENTRY_UNUSED = 0
PARTITION_STYLE_MBR = 0
PARTITION_STYLE_GPT = 1
def _get_layout(self):
layout = Win32_DRIVE_LAYOUT_INFORMATION_EX()
bytes_returned = wintypes.DWORD()
ret_val = kernel32.DeviceIoControl(
self._handle,
winioctlcon.IOCTL_DISK_GET_DRIVE_LAYOUT_EX,
0,
0,
ctypes.byref(layout),
ctypes.sizeof(layout),
ctypes.byref(bytes_returned),
0)
def read(self, bytes_to_read):
buf = ctypes.create_string_buffer(bytes_to_read)
bytes_read = wintypes.DWORD()
ret_val = kernel32.ReadFile(self._handle, buf, bytes_to_read,
ctypes.byref(bytes_read), 0)
if not ret_val:
raise exception.WindowsCloudbaseInitException(
"Read exception: %r")
return buf, bytes_read.value
"Cannot get disk layout: %r")
return layout
def _get_partition_indexes(self, layout):
partition_style = layout.PartitionStyle
if partition_style not in (self.PARTITION_STYLE_MBR,
self.PARTITION_STYLE_GPT):
raise exception.CloudbaseInitException(
"Invalid partition style %r" % partition_style)
# If is GPT, then the count reflects the actual number of partitions
# but if is MBR, then the number of partitions is a multiple of 4
# and just the indexes for the used partitions must be saved.
partition_indexes = []
if partition_style == self.PARTITION_STYLE_GPT:
partition_indexes.extend(range(layout.PartitionCount))
else:
for idx in range(layout.PartitionCount):
if (layout.PartitionEntry[idx].Mbr.PartitionType !=
self.PARTITION_ENTRY_UNUSED):
partition_indexes.append(idx)
return partition_indexes
def partitions(self):
"""Return a list of partition objects available on disk."""
layout = self._get_layout()
partition_indexes = self._get_partition_indexes(layout)
# Create and return the partition objects containing their sizes.
partitions = []
disk_index = re.search(r"(disk|drive)(\d+)", self._path,
re.I | re.M).group(2)
for partition_index in partition_indexes:
path = r'\\?\GLOBALROOT\Device\Harddisk{}\Partition{}'.format(
disk_index, partition_index + 1)
size = layout.PartitionEntry[partition_index].PartitionLength
partition = Partition(path, size)
partitions.append(partition)
return partitions
@property
def size(self):
return self._disk_size
class Partition(BaseDevice):
"""Partition class with seek/read support."""
def __init__(self, path, size):
super(Partition, self).__init__(path)
self._partition_size = size
@property
def size(self):
return self._partition_size

View File

@ -1,151 +0,0 @@
# Copyright 2012 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ctypes
from ctypes import windll
from ctypes import wintypes
from cloudbaseinit import exception
kernel32 = windll.kernel32
# VirtDisk.dll is available starting from Windows Server 2008 R2 / Windows7
virtdisk = None
class Win32_GUID(ctypes.Structure):
_fields_ = [("Data1", wintypes.DWORD),
("Data2", wintypes.WORD),
("Data3", wintypes.WORD),
("Data4", wintypes.BYTE * 8)]
def get_WIN32_VIRTUAL_STORAGE_TYPE_VENDOR_MICROSOFT():
guid = Win32_GUID()
guid.Data1 = 0xec984aec
guid.Data2 = 0xa0f9
guid.Data3 = 0x47e9
ByteArray8 = wintypes.BYTE * 8
guid.Data4 = ByteArray8(0x90, 0x1f, 0x71, 0x41, 0x5a, 0x66, 0x34, 0x5b)
return guid
class Win32_VIRTUAL_STORAGE_TYPE(ctypes.Structure):
_fields_ = [
('DeviceId', wintypes.DWORD),
('VendorId', Win32_GUID)
]
class VirtualDisk(object):
VIRTUAL_STORAGE_TYPE_DEVICE_ISO = 1
VIRTUAL_DISK_ACCESS_ATTACH_RO = 0x10000
VIRTUAL_DISK_ACCESS_READ = 0xd0000
OPEN_VIRTUAL_DISK_FLAG_NONE = 0
DETACH_VIRTUAL_DISK_FLAG_NONE = 0
ATTACH_VIRTUAL_DISK_FLAG_READ_ONLY = 1
ATTACH_VIRTUAL_DISK_FLAG_NO_DRIVE_LETTER = 2
def __init__(self, path):
self._path = path
self._handle = 0
def _load_virtdisk_dll(self):
global virtdisk
if not virtdisk:
virtdisk = windll.virtdisk
def open(self):
if self._handle:
self.close()
self._load_virtdisk_dll()
vst = Win32_VIRTUAL_STORAGE_TYPE()
vst.DeviceId = self.VIRTUAL_STORAGE_TYPE_DEVICE_ISO
vst.VendorId = get_WIN32_VIRTUAL_STORAGE_TYPE_VENDOR_MICROSOFT()
handle = wintypes.HANDLE()
ret_val = virtdisk.OpenVirtualDisk(ctypes.byref(vst),
ctypes.c_wchar_p(self._path),
self.VIRTUAL_DISK_ACCESS_ATTACH_RO |
self.VIRTUAL_DISK_ACCESS_READ,
self.OPEN_VIRTUAL_DISK_FLAG_NONE, 0,
ctypes.byref(handle))
if ret_val:
raise exception.WindowsCloudbaseInitException(
"Cannot open virtual disk: %r", ret_val)
self._handle = handle
def attach(self):
ret_val = virtdisk.AttachVirtualDisk(
self._handle, 0, self.ATTACH_VIRTUAL_DISK_FLAG_READ_ONLY, 0, 0, 0)
if ret_val:
raise exception.WindowsCloudbaseInitException(
"Cannot attach virtual disk: %r", ret_val)
def detach(self):
ret_val = virtdisk.DetachVirtualDisk(
self._handle, self.DETACH_VIRTUAL_DISK_FLAG_NONE, 0)
if ret_val:
raise exception.WindowsCloudbaseInitException(
"Cannot detach virtual disk: %r", ret_val)
def get_physical_path(self):
buf = ctypes.create_unicode_buffer(1024)
bufLen = wintypes.DWORD(ctypes.sizeof(buf))
ret_val = virtdisk.GetVirtualDiskPhysicalPath(self._handle,
ctypes.byref(bufLen),
buf)
if ret_val:
raise exception.WindowsCloudbaseInitException(
"Cannot get virtual disk physical path: %r", ret_val)
return buf.value
def get_cdrom_drive_mount_point(self):
mount_point = None
buf = ctypes.create_unicode_buffer(2048)
buf_len = kernel32.GetLogicalDriveStringsW(
ctypes.sizeof(buf) / ctypes.sizeof(wintypes.WCHAR), buf)
if not buf_len:
raise exception.WindowsCloudbaseInitException(
"Cannot enumerate logical devices: %r")
cdrom_dev = self.get_physical_path().rsplit('\\')[-1].upper()
i = 0
while not mount_point and i < buf_len:
curr_drive = ctypes.wstring_at(ctypes.addressof(buf) + i *
ctypes.sizeof(wintypes.WCHAR))[:-1]
dev = ctypes.create_unicode_buffer(2048)
ret_val = kernel32.QueryDosDeviceW(curr_drive, dev,
ctypes.sizeof(dev) /
ctypes.sizeof(wintypes.WCHAR))
if not ret_val:
raise exception.WindowsCloudbaseInitException(
"Cannot query NT device: %r")
if dev.value.rsplit('\\')[-1].upper() == cdrom_dev:
mount_point = curr_drive
else:
i += len(curr_drive) + 2
return mount_point
def close(self):
kernel32.CloseHandle(self._handle)
self._handle = 0