Merge "Ironic config drive support"

This commit is contained in:
Jenkins 2015-10-14 20:56:24 +00:00 committed by Gerrit Code Review
commit 80595d0143
11 changed files with 1305 additions and 1130 deletions

View File

@ -14,23 +14,47 @@
import os
import shutil
import tempfile
import uuid
from oslo_config import cfg
from oslo_log import log as oslo_logging
from cloudbaseinit import exception
from cloudbaseinit.metadata.services import base
from cloudbaseinit.metadata.services import baseopenstackservice
from cloudbaseinit.metadata.services.osconfigdrive import factory
# Config Drive types and possible locations.
CD_TYPES = {
"vfat", # Visible device (with partition table).
"iso", # "Raw" format containing ISO bytes.
}
CD_LOCATIONS = {
# Look into optical units devices. Only an ISO format could
# be used here (vfat ignored).
"cdrom",
# Search through physical disks for raw ISO content or vfat filesystems
# containing configuration drive's content.
"hdd",
# Search through partitions for raw ISO content or through volumes
# containing configuration drive's content.
"partition",
}
opts = [
cfg.BoolOpt('config_drive_raw_hhd', default=True,
help='Look for an ISO config drive in raw HDDs'),
help='Look for an ISO config drive in raw HDDs',
deprecated_for_removal=True),
cfg.BoolOpt('config_drive_cdrom', default=True,
help='Look for a config drive in the attached cdrom drives'),
help='Look for a config drive in the attached cdrom drives',
deprecated_for_removal=True),
cfg.BoolOpt('config_drive_vfat', default=True,
help='Look for a config drive in VFAT filesystems.'),
help='Look for a config drive in VFAT filesystems',
deprecated_for_removal=True),
cfg.ListOpt('config_drive_types', default=list(CD_TYPES),
help='Supported formats of a configuration drive'),
cfg.ListOpt('config_drive_locations', default=list(CD_LOCATIONS),
help='Supported configuration drive locations'),
]
CONF = cfg.CONF
@ -45,33 +69,52 @@ class ConfigDriveService(baseopenstackservice.BaseOpenStackService):
super(ConfigDriveService, self).__init__()
self._metadata_path = None
def _preprocess_options(self):
self._searched_types = set(CONF.config_drive_types)
self._searched_locations = set(CONF.config_drive_locations)
# Deprecation backward compatibility.
if CONF.config_drive_raw_hhd:
self._searched_types.add("iso")
self._searched_locations.add("hdd")
if CONF.config_drive_cdrom:
self._searched_types.add("iso")
self._searched_locations.add("cdrom")
if CONF.config_drive_vfat:
self._searched_types.add("vfat")
self._searched_locations.add("hdd")
# Check for invalid option values.
if self._searched_types | CD_TYPES != CD_TYPES:
raise exception.CloudbaseInitException(
"Invalid Config Drive types %s", self._searched_types)
if self._searched_locations | CD_LOCATIONS != CD_LOCATIONS:
raise exception.CloudbaseInitException(
"Invalid Config Drive locations %s", self._searched_locations)
def load(self):
super(ConfigDriveService, self).load()
target_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4()))
self._preprocess_options()
self._mgr = factory.get_config_drive_manager()
found = self._mgr.get_config_drive_files(
searched_types=self._searched_types,
searched_locations=self._searched_locations)
mgr = factory.get_config_drive_manager()
found = mgr.get_config_drive_files(
target_path,
check_raw_hhd=CONF.config_drive_raw_hhd,
check_cdrom=CONF.config_drive_cdrom,
check_vfat=CONF.config_drive_vfat)
if found:
self._metadata_path = target_path
LOG.debug('Metadata copied to folder: \'%s\'' %
self._metadata_path)
self._metadata_path = self._mgr.target_path
LOG.debug('Metadata copied to folder: %r', self._metadata_path)
return found
def _get_data(self, path):
norm_path = os.path.normpath(os.path.join(self._metadata_path, path))
try:
with open(norm_path, 'rb') as f:
return f.read()
with open(norm_path, 'rb') as stream:
return stream.read()
except IOError:
raise base.NotExistingMetadataException()
def cleanup(self):
if self._metadata_path:
LOG.debug('Deleting metadata folder: \'%s\'' % self._metadata_path)
shutil.rmtree(self._metadata_path, ignore_errors=True)
self._metadata_path = None
LOG.debug('Deleting metadata folder: %r', self._mgr.target_path)
shutil.rmtree(self._mgr.target_path, ignore_errors=True)
self._metadata_path = None

View File

@ -13,6 +13,7 @@
# under the License.
import abc
import tempfile
import six
@ -20,7 +21,9 @@ import six
@six.add_metaclass(abc.ABCMeta)
class BaseConfigDriveManager(object):
def __init__(self):
self.target_path = tempfile.mkdtemp()
@abc.abstractmethod
def get_config_drive_files(self, target_path, check_raw_hhd=True,
check_cdrom=True, check_vfat=True):
def get_config_drive_files(self, check_types=None, check_locations=None):
pass

View File

@ -12,13 +12,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import ctypes
import itertools
import os
import shutil
import struct
import tempfile
import uuid
from ctypes import wintypes
from oslo_config import cfg
from oslo_log import log as oslo_logging
@ -28,6 +29,7 @@ from cloudbaseinit.osutils import factory as osutils_factory
from cloudbaseinit.utils.windows import disk
from cloudbaseinit.utils.windows import vfat
opts = [
cfg.StrOpt('bsdtar_path', default='bsdtar.exe',
help='Path to "bsdtar", used to extract ISO ConfigDrive '
@ -39,83 +41,73 @@ CONF.register_opts(opts)
LOG = oslo_logging.getLogger(__name__)
CONFIG_DRIVE_LABEL = 'config-2'
MAX_SECTOR_SIZE = 4096
# Absolute offset values and the ISO magic string.
OFFSET_BOOT_RECORD = 0x8000
OFFSET_ISO_ID = OFFSET_BOOT_RECORD + 1
ISO_ID = b'CD001'
# Little-endian unsigned short size values.
OFFSET_VOLUME_SIZE = OFFSET_BOOT_RECORD + 80
OFFSET_BLOCK_SIZE = OFFSET_BOOT_RECORD + 128
PEEK_SIZE = 2
class WindowsConfigDriveManager(base.BaseConfigDriveManager):
def _get_config_drive_cdrom_mount_point(self):
osutils = osutils_factory.get_os_utils()
def __init__(self):
super(WindowsConfigDriveManager, self).__init__()
self._osutils = osutils_factory.get_os_utils()
for drive in osutils.get_cdrom_drives():
label = osutils.get_volume_label(drive)
if label == "config-2" and \
os.path.exists(os.path.join(drive,
'openstack\\latest\\'
'meta_data.json')):
return drive
return None
def _check_for_config_drive(self, drive):
label = self._osutils.get_volume_label(drive)
if label and label.lower() == CONFIG_DRIVE_LABEL and \
os.path.exists(os.path.join(drive,
'openstack\\latest\\'
'meta_data.json')):
LOG.info('Config Drive found on %s', drive)
return True
return False
def _c_char_array_to_c_ushort(self, buf, offset):
low = ctypes.cast(buf[offset],
ctypes.POINTER(wintypes.WORD)).contents
high = ctypes.cast(buf[offset + 1],
ctypes.POINTER(wintypes.WORD)).contents
return (high.value << 8) + low.value
def _get_iso_disk_size(self, phys_disk):
geom = phys_disk.get_geometry()
if geom.MediaType != disk.Win32_DiskGeometry.FixedMedia:
def _get_iso_file_size(self, device):
if not device.fixed:
return None
disk_size = geom.Cylinders * geom.TracksPerCylinder * \
geom.SectorsPerTrack * geom.BytesPerSector
boot_record_off = 0x8000
id_off = 1
volume_size_off = 80
block_size_off = 128
iso_id = b'CD001'
offset = boot_record_off // geom.BytesPerSector * geom.BytesPerSector
bytes_to_read = geom.BytesPerSector
if disk_size <= offset + bytes_to_read:
if not device.size > (OFFSET_BLOCK_SIZE + PEEK_SIZE):
return None
phys_disk.seek(offset)
(buf, bytes_read) = phys_disk.read(bytes_to_read)
buf_off = boot_record_off - offset + id_off
if iso_id != buf[buf_off: buf_off + len(iso_id)]:
off = device.seek(OFFSET_ISO_ID)
magic = device.read(len(ISO_ID), skip=OFFSET_ISO_ID - off)
if ISO_ID != magic:
return None
buf_off = boot_record_off - offset + volume_size_off
num_blocks = self._c_char_array_to_c_ushort(buf, buf_off)
off = device.seek(OFFSET_VOLUME_SIZE)
volume_size_bytes = device.read(PEEK_SIZE,
skip=OFFSET_VOLUME_SIZE - off)
off = device.seek(OFFSET_BLOCK_SIZE)
block_size_bytes = device.read(PEEK_SIZE,
skip=OFFSET_BLOCK_SIZE - off)
volume_size = struct.unpack("<H", volume_size_bytes)[0]
block_size = struct.unpack("<H", block_size_bytes)[0]
buf_off = boot_record_off - offset + block_size_off
block_size = self._c_char_array_to_c_ushort(buf, buf_off)
return volume_size * block_size
return num_blocks * block_size
def _write_iso_file(self, phys_disk, path, iso_file_size):
with open(path, 'wb') as f:
geom = phys_disk.get_geometry()
def _write_iso_file(self, device, iso_file_path, iso_file_size):
with open(iso_file_path, 'wb') as stream:
offset = 0
# Get a multiple of the sector byte size
bytes_to_read = 4096 // geom.BytesPerSector * geom.BytesPerSector
# Read multiples of the sector size bytes
# until the entire ISO content is written.
while offset < iso_file_size:
phys_disk.seek(offset)
bytes_to_read = min(bytes_to_read, iso_file_size - offset)
(buf, bytes_read) = phys_disk.read(bytes_to_read)
f.write(buf)
offset += bytes_read
real_offset = device.seek(offset)
bytes_to_read = min(MAX_SECTOR_SIZE, iso_file_size - offset)
data = device.read(bytes_to_read, skip=offset - real_offset)
stream.write(data)
offset += bytes_to_read
def _extract_iso_files(self, osutils, iso_file_path, target_path):
os.makedirs(target_path)
args = [CONF.bsdtar_path, '-xf', iso_file_path, '-C', target_path]
(out, err, exit_code) = osutils.execute_process(args, False)
def _extract_files_from_iso(self, iso_file_path):
args = [CONF.bsdtar_path, '-xf', iso_file_path,
'-C', self.target_path]
(out, err, exit_code) = self._osutils.execute_process(args, False)
if exit_code:
raise exception.CloudbaseInitException(
@ -125,72 +117,104 @@ class WindowsConfigDriveManager(base.BaseConfigDriveManager):
'exit_code': exit_code,
'out': out, 'err': err})
def _extract_iso_disk_file(self, osutils, iso_file_path):
iso_disk_found = False
for path in osutils.get_physical_disks():
phys_disk = disk.PhysicalDisk(path)
try:
phys_disk.open()
iso_file_size = self._get_iso_disk_size(phys_disk)
if iso_file_size:
LOG.debug('ISO9660 disk found on raw HDD: %s' % path)
self._write_iso_file(phys_disk, iso_file_path,
iso_file_size)
iso_disk_found = True
break
except Exception:
# Ignore exception
pass
finally:
phys_disk.close()
return iso_disk_found
def _get_conf_drive_from_vfat(self, target_path):
osutils = osutils_factory.get_os_utils()
for drive_path in osutils.get_physical_disks():
if vfat.is_vfat_drive(osutils, drive_path):
LOG.info('Config Drive found on disk %r', drive_path)
os.makedirs(target_path)
vfat.copy_from_vfat_drive(osutils, drive_path, target_path)
return True
def get_config_drive_files(self, target_path, check_raw_hhd=True,
check_cdrom=True, check_vfat=True):
config_drive_found = False
if check_vfat:
LOG.debug('Looking for Config Drive in VFAT filesystems')
config_drive_found = self._get_conf_drive_from_vfat(target_path)
if not config_drive_found and check_raw_hhd:
LOG.debug('Looking for Config Drive in raw HDDs')
config_drive_found = self._get_conf_drive_from_raw_hdd(
target_path)
if not config_drive_found and check_cdrom:
LOG.debug('Looking for Config Drive in cdrom drives')
config_drive_found = self._get_conf_drive_from_cdrom_drive(
target_path)
return config_drive_found
def _get_conf_drive_from_cdrom_drive(self, target_path):
cdrom_mount_point = self._get_config_drive_cdrom_mount_point()
if cdrom_mount_point:
shutil.copytree(cdrom_mount_point, target_path)
return True
return False
def _get_conf_drive_from_raw_hdd(self, target_path):
config_drive_found = False
def _extract_iso_from_devices(self, devices):
"""Search across multiple devices for a raw ISO."""
extracted = False
iso_file_path = os.path.join(tempfile.gettempdir(),
str(uuid.uuid4()) + '.iso')
try:
osutils = osutils_factory.get_os_utils()
if self._extract_iso_disk_file(osutils, iso_file_path):
self._extract_iso_files(osutils, iso_file_path, target_path)
config_drive_found = True
finally:
if os.path.exists(iso_file_path):
os.remove(iso_file_path)
return config_drive_found
for device in devices:
try:
with device:
iso_file_size = self._get_iso_file_size(device)
if iso_file_size:
LOG.info('ISO9660 disk found on %s', device)
self._write_iso_file(device, iso_file_path,
iso_file_size)
self._extract_files_from_iso(iso_file_path)
extracted = True
break
except Exception as exc:
LOG.warning('ISO extraction failed on %(device)s with '
'%(error)r', {"device": device, "error": exc})
if os.path.isfile(iso_file_path):
os.remove(iso_file_path)
return extracted
def _get_config_drive_from_cdrom_drive(self):
for drive_letter in self._osutils.get_cdrom_drives():
if self._check_for_config_drive(drive_letter):
os.rmdir(self.target_path)
shutil.copytree(drive_letter, self.target_path)
return True
return False
def _get_config_drive_from_raw_hdd(self):
disks = map(disk.Disk, self._osutils.get_physical_disks())
return self._extract_iso_from_devices(disks)
def _get_config_drive_from_vfat(self):
for drive_path in self._osutils.get_physical_disks():
if vfat.is_vfat_drive(self._osutils, drive_path):
LOG.info('Config Drive found on disk %r', drive_path)
vfat.copy_from_vfat_drive(self._osutils, drive_path,
self.target_path)
return True
return False
def _get_config_drive_from_partition(self):
for disk_path in self._osutils.get_physical_disks():
physical_drive = disk.Disk(disk_path)
with physical_drive:
partitions = physical_drive.partitions()
extracted = self._extract_iso_from_devices(partitions)
if extracted:
return True
return False
def _get_config_drive_from_volume(self):
"""Look through all the volumes for config drive."""
volumes = self._osutils.get_volumes()
for volume in volumes:
if self._check_for_config_drive(volume):
os.rmdir(self.target_path)
shutil.copytree(volume, self.target_path)
return True
return False
def _get_config_drive_files(self, cd_type, cd_location):
get_config_drive = self.config_drive_type_location.get(
"{}_{}".format(cd_location, cd_type))
if get_config_drive:
return get_config_drive()
else:
LOG.debug("Irrelevant type %(type)s in %(location)s location; "
"skip",
{"type": cd_type, "location": cd_location})
return False
def get_config_drive_files(self, searched_types=None,
searched_locations=None):
searched_types = searched_types or []
searched_locations = searched_locations or []
for cd_type, cd_location in itertools.product(searched_types,
searched_locations):
LOG.debug('Looking for Config Drive %(type)s in %(location)s',
{"type": cd_type, "location": cd_location})
if self._get_config_drive_files(cd_type, cd_location):
return True
return False
@property
def config_drive_type_location(self):
return {
"cdrom_iso": self._get_config_drive_from_cdrom_drive,
"hdd_iso": self._get_config_drive_from_raw_hdd,
"hdd_vfat": self._get_config_drive_from_vfat,
"partition_iso": self._get_config_drive_from_partition,
"partition_vfat": self._get_config_drive_from_volume,
}

View File

@ -33,6 +33,7 @@ import wmi
from cloudbaseinit import exception
from cloudbaseinit.osutils import base
from cloudbaseinit.utils.windows import disk
from cloudbaseinit.utils.windows import network
from cloudbaseinit.utils.windows import privilege
from cloudbaseinit.utils.windows import timezone
@ -116,31 +117,10 @@ class Win32_OSVERSIONINFOEX_W(ctypes.Structure):
]
class GUID(ctypes.Structure):
_fields_ = [
("data1", ctypes.wintypes.DWORD),
("data2", ctypes.wintypes.WORD),
("data3", ctypes.wintypes.WORD),
("data4", ctypes.c_byte * 8)]
def __init__(self, l, w1, w2, b1, b2, b3, b4, b5, b6, b7, b8):
self.data1 = l
self.data2 = w1
self.data3 = w2
self.data4[0] = b1
self.data4[1] = b2
self.data4[2] = b3
self.data4[3] = b4
self.data4[4] = b5
self.data4[5] = b6
self.data4[6] = b7
self.data4[7] = b8
class Win32_SP_DEVICE_INTERFACE_DATA(ctypes.Structure):
_fields_ = [
('cbSize', wintypes.DWORD),
('InterfaceClassGuid', GUID),
('InterfaceClassGuid', disk.GUID),
('Flags', wintypes.DWORD),
('Reserved', ctypes.POINTER(wintypes.ULONG))
]
@ -218,7 +198,7 @@ iphlpapi.GetIpForwardTable.restype = wintypes.DWORD
Ws2_32.inet_ntoa.restype = ctypes.c_char_p
setupapi.SetupDiGetClassDevsW.argtypes = [ctypes.POINTER(GUID),
setupapi.SetupDiGetClassDevsW.argtypes = [ctypes.POINTER(disk.GUID),
wintypes.LPCWSTR,
wintypes.HANDLE,
wintypes.DWORD]
@ -227,7 +207,7 @@ setupapi.SetupDiGetClassDevsW.restype = wintypes.HANDLE
setupapi.SetupDiEnumDeviceInterfaces.argtypes = [
wintypes.HANDLE,
wintypes.LPVOID,
ctypes.POINTER(GUID),
ctypes.POINTER(disk.GUID),
wintypes.DWORD,
ctypes.POINTER(Win32_SP_DEVICE_INTERFACE_DATA)]
setupapi.SetupDiEnumDeviceInterfaces.restype = wintypes.BOOL
@ -250,8 +230,8 @@ VER_BUILDNUMBER = 4
VER_GREATER_EQUAL = 3
GUID_DEVINTERFACE_DISK = GUID(0x53f56307, 0xb6bf, 0x11d0, 0x94, 0xf2,
0x00, 0xa0, 0xc9, 0x1e, 0xfb, 0x8b)
GUID_DEVINTERFACE_DISK = disk.GUID(0x53f56307, 0xb6bf, 0x11d0, 0x94, 0xf2,
0x00, 0xa0, 0xc9, 0x1e, 0xfb, 0x8b)
class WindowsUtils(base.BaseOSUtils):
@ -466,7 +446,7 @@ class WindowsUtils(base.BaseOSUtils):
'Microsoft\\Windows NT\\CurrentVersion\\'
'ProfileList\\%s' % user_sid) as key:
return winreg.QueryValueEx(key, 'ProfileImagePath')[0]
LOG.debug('Home directory not found for user \'%s\'' % username)
LOG.debug('Home directory not found for user %r', username)
return None
def sanitize_shell_input(self, value):
@ -686,7 +666,7 @@ class WindowsUtils(base.BaseOSUtils):
break
time.sleep(1)
LOG.info('Waiting for sysprep completion. '
'GeneralizationState: %d' % gen_state)
'GeneralizationState: %d', gen_state)
except WindowsError as ex:
if ex.winerror == 2:
LOG.debug('Sysprep data not found in the registry, '
@ -722,7 +702,7 @@ class WindowsUtils(base.BaseOSUtils):
'ret_val': ret_val})
def start_service(self, service_name):
LOG.debug('Starting service %s' % service_name)
LOG.debug('Starting service %s', service_name)
service = self._get_service(service_name)
(ret_val,) = service.StartService()
if ret_val != 0:
@ -732,7 +712,7 @@ class WindowsUtils(base.BaseOSUtils):
'ret_val': ret_val})
def stop_service(self, service_name):
LOG.debug('Stopping service %s' % service_name)
LOG.debug('Stopping service %s', service_name)
service = self._get_service(service_name)
(ret_val,) = service.StopService()
if ret_val != 0:
@ -989,6 +969,33 @@ class WindowsUtils(base.BaseOSUtils):
return physical_disks
def get_volumes(self):
"""Retrieve a list with all the volumes found on all disks."""
volumes = []
volume = ctypes.create_unicode_buffer(chr(0) * self.MAX_PATH)
handle_volumes = kernel32.FindFirstVolumeW(volume, self.MAX_PATH)
if handle_volumes == self.INVALID_HANDLE_VALUE:
raise exception.WindowsCloudbaseInitException(
"FindFirstVolumeW failed: %r")
try:
while True:
volumes.append(volume.value)
found = kernel32.FindNextVolumeW(handle_volumes, volume,
self.MAX_PATH)
if not found:
errno = ctypes.GetLastError()
if errno == self.ERROR_NO_MORE_FILES:
break
else:
raise exception.WindowsCloudbaseInitException(
"FindNextVolumeW failed: %r")
finally:
kernel32.FindVolumeClose(handle_volumes)
return volumes
def _get_fw_protocol(self, protocol):
if protocol == self.PROTOCOL_TCP:
fw_protocol = self._FW_IP_PROTOCOL_TCP

View File

@ -12,7 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import importlib
import itertools
import os
import unittest
@ -28,380 +30,266 @@ from cloudbaseinit.tests import testutils
CONF = cfg.CONF
OPEN = mock.mock_open()
class TestWindowsConfigDriveManager(unittest.TestCase):
def setUp(self):
self._ctypes_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict('sys.modules',
{'ctypes': self._ctypes_mock})
module_path = "cloudbaseinit.metadata.services.osconfigdrive.windows"
mock_ctypes = mock.MagicMock()
mock_ctypes.wintypes = mock.MagicMock()
self._module_patcher = mock.patch.dict(
'sys.modules',
{'disk': mock.Mock(),
'ctypes': mock_ctypes,
'winioctlcon': mock.Mock()})
self._module_patcher.start()
self.addCleanup(self._module_patcher.stop)
self.conf_module = importlib.import_module(module_path)
self.windows = importlib.import_module(
"cloudbaseinit.metadata.services.osconfigdrive.windows")
self.physical_disk = importlib.import_module(
"cloudbaseinit.utils.windows.disk")
self.conf_module.osutils_factory = mock.Mock()
self.conf_module.disk.Disk = mock.MagicMock()
self.conf_module.tempfile = mock.Mock()
self.mock_gettempdir = self.conf_module.tempfile.gettempdir
self.mock_gettempdir.return_value = "tempdir"
self.conf_module.uuid = mock.Mock()
self.mock_uuid4 = self.conf_module.uuid.uuid4
self.mock_uuid4.return_value = "uuid"
self._config_manager = self.conf_module.WindowsConfigDriveManager()
self.addCleanup(os.rmdir, self._config_manager.target_path)
self.osutils = mock.Mock()
self._config_manager._osutils = self.osutils
self.snatcher = testutils.LogSnatcher(module_path)
self.physical_disk.Win32_DiskGeometry = mock.MagicMock()
self.windows.disk.PhysicalDisk = mock.MagicMock()
self._config_manager = self.windows.WindowsConfigDriveManager()
def tearDown(self):
self._module_patcher.stop()
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
@mock.patch('os.path.exists')
def _test_get_config_drive_cdrom_mount_point(self, mock_join,
mock_get_os_utils, exists):
mock_osutils = mock.MagicMock()
mock_get_os_utils.return_value = mock_osutils
mock_osutils.get_cdrom_drives.return_value = ['fake drive']
mock_osutils.get_volume_label.return_value = 'config-2'
mock_join.return_value = exists
def _test_check_for_config_drive(self, mock_exists, exists=True,
label="config-2", fail=False):
drive = "C:\\"
self.osutils.get_volume_label.return_value = label
mock_exists.return_value = exists
response = self._config_manager._get_config_drive_cdrom_mount_point()
with self.snatcher:
response = self._config_manager._check_for_config_drive(drive)
mock_osutils.get_cdrom_drives.assert_called_once_with()
mock_osutils.get_volume_label.assert_called_once_with('fake drive')
self.osutils.get_volume_label.assert_called_once_with(drive)
if exists and not fail:
self.assertEqual(["Config Drive found on C:\\"],
self.snatcher.output)
self.assertEqual(not fail, response)
if exists:
self.assertEqual('fake drive', response)
else:
def test_check_for_config_drive_exists(self):
self._test_check_for_config_drive()
def test_check_for_config_drive_exists_upper_label(self):
self._test_check_for_config_drive(label="CONFIG-2")
def test_check_for_config_drive_missing(self):
self._test_check_for_config_drive(exists=False, fail=True)
def test_check_for_config_drive_wrong_label(self):
self._test_check_for_config_drive(label="config-3", fail=True)
def _test_get_iso_file_size(self, fixed=True, small=False,
found_iso=True):
device = mock.Mock()
device.fixed = fixed
device.size = (self.conf_module.OFFSET_BLOCK_SIZE +
self.conf_module.PEEK_SIZE + int(not small))
iso_id = self.conf_module.ISO_ID
if not found_iso:
iso_id = b"pwned"
iso_off = self.conf_module.OFFSET_ISO_ID - 1
volume_off = self.conf_module.OFFSET_VOLUME_SIZE - 1
block_off = self.conf_module.OFFSET_BLOCK_SIZE - 1
volume_bytes = b'd\x00' # 100
block_bytes = b'\x00\x02' # 512
device.seek.side_effect = [iso_off, volume_off, block_off]
device.read.side_effect = [iso_id, volume_bytes, block_bytes]
response = self._config_manager._get_iso_file_size(device)
if not fixed or small or not found_iso:
self.assertIsNone(response)
return
def test_get_config_drive_cdrom_mount_point_exists_true(self):
self._test_get_config_drive_cdrom_mount_point(exists=True)
seek_calls = [
mock.call(self.conf_module.OFFSET_ISO_ID),
mock.call(self.conf_module.OFFSET_VOLUME_SIZE),
mock.call(self.conf_module.OFFSET_BLOCK_SIZE)]
read_calls = [
mock.call(len(iso_id),
skip=self.conf_module.OFFSET_ISO_ID - iso_off),
mock.call(self.conf_module.PEEK_SIZE,
skip=self.conf_module.OFFSET_VOLUME_SIZE - volume_off),
mock.call(self.conf_module.PEEK_SIZE,
skip=self.conf_module.OFFSET_BLOCK_SIZE - block_off)]
device.seek.assert_has_calls(seek_calls)
device.read.assert_has_calls(read_calls)
self.assertEqual(100 * 512, response)
def test_get_config_drive_cdrom_mount_point_exists_false(self):
self._test_get_config_drive_cdrom_mount_point(exists=False)
def test_get_iso_file_size_not_fixed(self):
self._test_get_iso_file_size(fixed=False)
def test_c_char_array_to_c_ushort(self):
mock_buf = mock.MagicMock()
contents = self._ctypes_mock.cast.return_value.contents
def test_get_iso_file_size_small(self):
self._test_get_iso_file_size(small=True)
response = self._config_manager._c_char_array_to_c_ushort(mock_buf, 1)
def test_get_iso_file_size_not_found(self):
self._test_get_iso_file_size(found_iso=False)
self.assertEqual(2, self._ctypes_mock.cast.call_count)
self._ctypes_mock.POINTER.assert_called_with(
self._ctypes_mock.wintypes.WORD)
def test_get_iso_file_size(self):
self._test_get_iso_file_size()
self._ctypes_mock.cast.assert_called_with(
mock_buf.__getitem__(), self._ctypes_mock.POINTER.return_value)
@mock.patch("six.moves.builtins.open", new=OPEN)
def test_write_iso_file(self):
file_path = "fake\\path"
file_size = 100 * 512
sector_size = self.conf_module.MAX_SECTOR_SIZE
offsets = list(range(0, file_size, sector_size))
remain = file_size % sector_size
reads = ([b"\x00" * sector_size] *
(len(offsets) - int(bool(remain))) +
([b"\x00" * remain] if remain else []))
self.assertEqual(contents.value.__lshift__().__add__(), response)
device = mock.Mock()
device_seek_calls = [mock.call(off) for off in offsets]
device_read_calls = [
mock.call(min(sector_size, file_size - off), skip=0)
for off in offsets]
stream_write_calls = [mock.call(read) for read in reads]
device.seek.side_effect = offsets
device.read.side_effect = reads
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._c_char_array_to_c_ushort')
def _test_get_iso_disk_size(self, mock_c_char_array_to_c_ushort,
media_type, value, iso_id,
bytes_per_sector=2):
if media_type == "fixed":
media_type = self.physical_disk.Win32_DiskGeometry.FixedMedia
self._config_manager._write_iso_file(device, file_path, file_size)
device.seek.assert_has_calls(device_seek_calls)
device.read.assert_has_calls(device_read_calls)
OPEN.return_value.write.assert_has_calls(stream_write_calls)
boot_record_off = 0x8000
volume_size_off = 80
block_size_off = 128
mock_phys_disk = mock.MagicMock()
mock_buff = mock.MagicMock()
mock_geom = mock.MagicMock()
mock_phys_disk.get_geometry.return_value = mock_geom
mock_geom.MediaType = media_type
mock_geom.Cylinders = value
mock_geom.TracksPerCylinder = 2
mock_geom.SectorsPerTrack = 2
mock_geom.BytesPerSector = bytes_per_sector
mock_phys_disk.read.return_value = (mock_buff, 'fake value')
mock_buff.__getitem__.return_value = iso_id
mock_c_char_array_to_c_ushort.return_value = 100
disk_size = mock_geom.Cylinders * mock_geom.TracksPerCylinder * \
mock_geom.SectorsPerTrack * mock_geom.BytesPerSector
offset = boot_record_off // mock_geom.BytesPerSector * \
mock_geom.BytesPerSector
buf_off_volume = boot_record_off - offset + volume_size_off
buf_off_block = boot_record_off - offset + block_size_off
response = self._config_manager._get_iso_disk_size(mock_phys_disk)
mock_phys_disk.get_geometry.assert_called_once_with()
if media_type != self.physical_disk.Win32_DiskGeometry.FixedMedia:
self.assertIsNone(response)
elif disk_size <= offset + mock_geom.BytesPerSector:
self.assertIsNone(response)
else:
mock_phys_disk.seek.assert_called_once_with(offset)
mock_phys_disk.read.assert_called_once_with(
mock_geom.BytesPerSector)
if iso_id != 'CD001':
self.assertIsNone(response)
else:
mock_c_char_array_to_c_ushort.assert_has_calls(
mock.call(mock_buff, buf_off_volume),
mock.call(mock_buff, buf_off_block))
self.assertEqual(10000, response)
def test_test_get_iso_disk_size(self):
self._test_get_iso_disk_size(
media_type="fixed",
value=100, iso_id='CD001')
def test_test_get_iso_disk_size_other_media_type(self):
self._test_get_iso_disk_size(media_type="other", value=100,
iso_id='CD001')
def test_test_get_iso_disk_size_other_disk_size_too_small(self):
self._test_get_iso_disk_size(
media_type="fixed",
value=0, iso_id='CD001')
def test_test_get_iso_disk_size_other_id(self):
self._test_get_iso_disk_size(
media_type="fixed",
value=100, iso_id='other id')
def test_test_get_iso_disk_size_bigger_offset(self):
self._test_get_iso_disk_size(
media_type="other media", value=100, iso_id='other id',
bytes_per_sector=2000)
def test_write_iso_file(self, bytes_per_sector=40):
mock_buff = mock.MagicMock()
mock_geom = mock.MagicMock()
mock_geom.BytesPerSector = 2
mock_phys_disk = mock.MagicMock()
mock_phys_disk.read.return_value = (mock_buff, 10)
fake_path = os.path.join('fake', 'path')
mock_phys_disk.get_geometry.return_value = mock_geom
with mock.patch('six.moves.builtins.open', mock.mock_open(),
create=True) as f:
self._config_manager._write_iso_file(mock_phys_disk, fake_path,
4098)
f().write.assert_called_with(mock_buff)
seek_calls = [mock.call(value) for value in range(0, 4098, 10)]
read_calls = (
[mock.call(4096)] +
[mock.call(value) for value in range(4088, 0, -10)]
)
self.assertEqual(seek_calls, mock_phys_disk.seek.mock_calls)
self.assertEqual(read_calls, mock_phys_disk.read.mock_calls)
@mock.patch('os.makedirs')
def _test_extract_iso_files(self, mock_makedirs, exit_code):
def _test_extract_files_from_iso(self, exit_code):
fake_path = os.path.join('fake', 'path')
fake_target_path = os.path.join(fake_path, 'target')
self._config_manager.target_path = fake_target_path
args = [CONF.bsdtar_path, '-xf', fake_path, '-C', fake_target_path]
mock_os_utils = mock.MagicMock()
mock_os_utils.execute_process.return_value = ('fake out', 'fake err',
exit_code)
self.osutils.execute_process.return_value = ('fake out', 'fake err',
exit_code)
if exit_code:
self.assertRaises(exception.CloudbaseInitException,
self._config_manager._extract_iso_files,
mock_os_utils, fake_path, fake_target_path)
self._config_manager._extract_files_from_iso,
fake_path)
else:
self._config_manager._extract_iso_files(mock_os_utils, fake_path,
fake_target_path)
self._config_manager._extract_files_from_iso(fake_path)
mock_os_utils.execute_process.assert_called_once_with(args, False)
mock_makedirs.assert_called_once_with(fake_target_path)
self.osutils.execute_process.assert_called_once_with(args, False)
def test_extract_iso_files(self):
self._test_extract_iso_files(exit_code=None)
def test_extract_files_from_iso(self):
self._test_extract_files_from_iso(exit_code=0)
def test_extract_iso_files_exception(self):
self._test_extract_iso_files(exit_code=1)
def test_extract_files_from_iso_fail(self):
self._test_extract_files_from_iso(exit_code=1)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._get_iso_disk_size')
'WindowsConfigDriveManager._extract_files_from_iso')
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._write_iso_file')
def _test_extract_iso_disk_file(self, mock_write_iso_file,
mock_get_iso_disk_size, exception):
mock_osutils = mock.MagicMock()
fake_path = os.path.join('fake', 'path')
fake_path_physical = os.path.join(fake_path, 'physical')
mock_osutils.get_physical_disks.return_value = [fake_path_physical]
mock_get_iso_disk_size.return_value = 'fake iso size'
mock_PhysDisk = self.windows.disk.PhysicalDisk.return_value
if exception:
mock_PhysDisk.open.side_effect = [Exception]
response = self._config_manager._extract_iso_disk_file(
osutils=mock_osutils, iso_file_path=fake_path)
if not exception:
mock_get_iso_disk_size.assert_called_once_with(
mock_PhysDisk)
mock_write_iso_file.assert_called_once_with(
mock_PhysDisk, fake_path, 'fake iso size')
self.windows.disk.PhysicalDisk.assert_called_once_with(
fake_path_physical)
mock_osutils.get_physical_disks.assert_called_once_with()
mock_PhysDisk.open.assert_called_once_with()
mock_PhysDisk.close.assert_called_once_with()
self.assertTrue(response)
else:
self.assertFalse(response)
def test_extract_iso_disk_file_disk_found(self):
self._test_extract_iso_disk_file(exception=False)
def test_extract_iso_disk_file_disk_not_found(self):
self._test_extract_iso_disk_file(exception=True)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._get_conf_drive_from_raw_hdd')
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._get_conf_drive_from_cdrom_drive')
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._get_conf_drive_from_vfat')
def _test_get_config_drive_files(self,
mock_get_conf_drive_from_vfat,
mock_get_conf_drive_from_cdrom_drive,
mock_get_conf_drive_from_raw_hdd,
raw_hdd_found=False,
cdrom_drive_found=False,
vfat_found=False):
'WindowsConfigDriveManager._get_iso_file_size')
def _test_extract_iso_from_devices(self, mock_get_iso_file_size,
mock_write_iso_file,
mock_extract_files_from_iso,
found=True):
# For every device (mock) in the list of available devices:
# first - skip (no size)
# second - error (throws Exception)
# third - extract (is ok)
# fourth - unreachable (already found ok device)
size = 100 * 512
devices = [mock.MagicMock() for _ in range(4)]
devices[1].__enter__.side_effect = [Exception]
rest = [size] if found else [None]
mock_get_iso_file_size.side_effect = [None] + rest * 2
file_path = os.path.join("tempdir", "uuid.iso")
fake_path = os.path.join('fake', 'path')
mock_get_conf_drive_from_raw_hdd.return_value = raw_hdd_found
mock_get_conf_drive_from_cdrom_drive.return_value = cdrom_drive_found
mock_get_conf_drive_from_vfat.return_value = vfat_found
with self.snatcher:
response = self._config_manager._extract_iso_from_devices(devices)
self.mock_gettempdir.assert_called_once_with()
mock_get_iso_file_size.assert_has_calls([
mock.call(devices[0]), mock.call(devices[2])])
expected_log = [
"ISO extraction failed on %(device)s with %(error)r" %
{"device": devices[1], "error": Exception()}]
if found:
mock_write_iso_file.assert_called_once_with(devices[2],
file_path, size)
mock_extract_files_from_iso.assert_called_once_with(file_path)
expected_log.append("ISO9660 disk found on %s" % devices[2])
self.assertEqual(expected_log, self.snatcher.output)
self.assertEqual(found, response)
response = self._config_manager.get_config_drive_files(
target_path=fake_path)
def test_extract_iso_from_devices_not_found(self):
self._test_extract_iso_from_devices(found=False)
if vfat_found:
mock_get_conf_drive_from_vfat.assert_called_once_with(fake_path)
self.assertFalse(mock_get_conf_drive_from_raw_hdd.called)
self.assertFalse(mock_get_conf_drive_from_cdrom_drive.called)
elif cdrom_drive_found:
mock_get_conf_drive_from_vfat.assert_called_once_with(fake_path)
mock_get_conf_drive_from_cdrom_drive.assert_called_once_with(
fake_path)
mock_get_conf_drive_from_raw_hdd.assert_called_once_with(
fake_path)
elif raw_hdd_found:
mock_get_conf_drive_from_vfat.assert_called_once_with(fake_path)
mock_get_conf_drive_from_raw_hdd.assert_called_once_with(
fake_path)
self.assertFalse(mock_get_conf_drive_from_cdrom_drive.called)
self.assertTrue(response)
def test_get_config_drive_files(self):
self._test_get_config_drive_files(raw_hdd_found=True)
self._test_get_config_drive_files(cdrom_drive_found=True)
self._test_get_config_drive_files(vfat_found=True)
def test_extract_iso_from_devices(self):
self._test_extract_iso_from_devices()
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.'
'_get_config_drive_cdrom_mount_point')
'_check_for_config_drive')
@mock.patch('shutil.copytree')
def _test_get_conf_drive_from_cdrom_drive(self, mock_copytree,
mock_get_config_cdrom_mount,
mount_point):
fake_path = os.path.join('fake', 'path')
mock_get_config_cdrom_mount.return_value = mount_point
@mock.patch('os.rmdir')
def _test_get_config_drive_from_cdrom_drive(self, mock_os_rmdir,
mock_copytree,
mock_check_for_config_drive,
found=True):
drives = ["C:\\", "M:\\", "I:\\", "N:\\"]
self.osutils.get_cdrom_drives.return_value = drives
checks = [False, False, True, False]
if not found:
checks[2] = False
mock_check_for_config_drive.side_effect = checks
response = self._config_manager._get_conf_drive_from_cdrom_drive(
fake_path)
response = self._config_manager._get_config_drive_from_cdrom_drive()
mock_get_config_cdrom_mount.assert_called_once_with()
self.osutils.get_cdrom_drives.assert_called_once_with()
idx = 3 if found else 4
check_calls = [mock.call(drive) for drive in drives[:idx]]
mock_check_for_config_drive.assert_has_calls(check_calls)
if found:
mock_os_rmdir.assert_called_once_with(
self._config_manager.target_path)
mock_copytree.assert_called_once_with(
drives[2], self._config_manager.target_path)
if mount_point:
mock_copytree.assert_called_once_with(mount_point, fake_path)
self.assertTrue(response)
else:
self.assertFalse(response)
self.assertEqual(found, response)
def test_get_conf_drive_from_cdrom_drive_with_mountpoint(self):
self._test_get_conf_drive_from_cdrom_drive(
mount_point='fake mount point')
def test_get_config_drive_from_cdrom_drive_not_found(self):
self._test_get_config_drive_from_cdrom_drive(found=False)
def test_get_conf_drive_from_cdrom_drive_without_mountpoint(self):
self._test_get_conf_drive_from_cdrom_drive(
mount_point=None)
def test_get_config_drive_from_cdrom_drive(self):
self._test_get_config_drive_from_cdrom_drive()
@mock.patch('os.remove')
@mock.patch('os.path.exists')
@mock.patch('tempfile.gettempdir')
@mock.patch('uuid.uuid4')
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._extract_iso_disk_file')
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._extract_iso_files')
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
def _test_get_conf_drive_from_raw_hdd(self, mock_get_os_utils,
mock_extract_iso_files,
mock_extract_iso_disk_file,
mock_uuid4, mock_gettempdir,
mock_exists, mock_remove,
found_drive):
fake_target_path = os.path.join('fake', 'path')
fake_iso_path = os.path.join('fake_dir', 'fake_id' + '.iso')
'WindowsConfigDriveManager.'
'_extract_iso_from_devices')
@mock.patch("six.moves.builtins.map")
def test_get_config_drive_from_raw_hdd(self, mock_map,
mock_extract_iso_from_devices):
Disk = self.conf_module.disk.Disk
paths = [mock.Mock() for _ in range(3)]
self.osutils.get_physical_disks.return_value = paths
mock_extract_iso_from_devices.return_value = True
mock_uuid4.return_value = 'fake_id'
mock_gettempdir.return_value = 'fake_dir'
mock_extract_iso_disk_file.return_value = found_drive
mock_exists.return_value = found_drive
response = self._config_manager._get_config_drive_from_raw_hdd()
mock_map.assert_called_once_with(Disk, paths)
self.osutils.get_physical_disks.assert_called_once_with()
mock_extract_iso_from_devices.assert_called_once_with(
mock_map.return_value)
self.assertTrue(response)
response = self._config_manager._get_conf_drive_from_raw_hdd(
fake_target_path)
mock_get_os_utils.assert_called_once_with()
mock_gettempdir.assert_called_once_with()
mock_extract_iso_disk_file.assert_called_once_with(
mock_get_os_utils(), fake_iso_path)
if found_drive:
mock_extract_iso_files.assert_called_once_with(
mock_get_os_utils(), fake_iso_path, fake_target_path)
mock_exists.assert_called_once_with(fake_iso_path)
mock_remove.assert_called_once_with(fake_iso_path)
self.assertTrue(response)
else:
self.assertFalse(response)
def test_get_conf_drive_from_raw_hdd_found_drive(self):
self._test_get_conf_drive_from_raw_hdd(found_drive=True)
def test_get_conf_drive_from_raw_hdd_no_drive_found(self):
self._test_get_conf_drive_from_raw_hdd(found_drive=False)
@mock.patch('os.makedirs')
@mock.patch('cloudbaseinit.utils.windows.vfat.copy_from_vfat_drive')
@mock.patch('cloudbaseinit.utils.windows.vfat.is_vfat_drive')
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
def test_get_conf_drive_from_vfat(self, mock_get_os_utils,
mock_is_vfat_drive,
mock_copy_from_vfat_drive,
mock_os_makedirs):
mock_osutils = mock_get_os_utils.return_value
mock_osutils.get_physical_disks.return_value = (
def test_get_config_drive_from_vfat(self, mock_is_vfat_drive,
mock_copy_from_vfat_drive):
self.osutils.get_physical_disks.return_value = (
mock.sentinel.drive1,
mock.sentinel.drive2,
)
@ -409,24 +297,160 @@ class TestWindowsConfigDriveManager(unittest.TestCase):
with testutils.LogSnatcher('cloudbaseinit.metadata.services.'
'osconfigdrive.windows') as snatcher:
response = self._config_manager._get_conf_drive_from_vfat(
mock.sentinel.target_path)
response = self._config_manager._get_config_drive_from_vfat()
self.assertTrue(response)
mock_osutils.get_physical_disks.assert_called_once_with()
self.osutils.get_physical_disks.assert_called_once_with()
expected_is_vfat_calls = [
mock.call(mock_osutils, mock.sentinel.drive1),
mock.call(mock_osutils, mock.sentinel.drive2),
mock.call(self.osutils, mock.sentinel.drive1),
mock.call(self.osutils, mock.sentinel.drive2),
]
self.assertEqual(expected_is_vfat_calls, mock_is_vfat_drive.mock_calls)
mock_copy_from_vfat_drive.assert_called_once_with(
mock_osutils,
self.osutils,
mock.sentinel.drive2,
mock.sentinel.target_path)
self._config_manager.target_path)
expected_logging = [
'Config Drive found on disk %r' % mock.sentinel.drive2,
]
self.assertEqual(expected_logging, snatcher.output)
mock_os_makedirs.assert_called_once_with(mock.sentinel.target_path)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.'
'_extract_iso_from_devices')
def _test_get_config_drive_from_partition(self,
mock_extract_iso_from_devices,
found=True):
paths = [mock.Mock() for _ in range(3)]
self.osutils.get_physical_disks.return_value = paths
disks = list(map(self.conf_module.disk.Disk, paths))
mock_extract_iso_from_devices.side_effect = [False, found, found]
idx = 3 - int(found)
extract_calls = [mock.call(disk.partitions())
for disk in disks[:idx]]
response = self._config_manager._get_config_drive_from_partition()
self.osutils.get_physical_disks.assert_called_once_with()
mock_extract_iso_from_devices.assert_has_calls(extract_calls)
self.assertEqual(found, response)
def test_get_config_drive_from_partition_not_found(self):
self._test_get_config_drive_from_partition(found=False)
def test_get_config_drive_from_partition(self):
self._test_get_config_drive_from_partition()
@mock.patch('os.rmdir')
@mock.patch('shutil.copytree')
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.'
'_check_for_config_drive')
def _test_get_config_drive_from_volume(self, mock_check_for_config_drive,
mock_copytree, mock_os_rmdir,
found=True):
volumes = [mock.Mock() for _ in range(3)]
self.osutils.get_volumes.return_value = volumes
checks = [False, found, found]
mock_check_for_config_drive.side_effect = checks
idx = 3 - int(found)
check_calls = [mock.call(volume) for volume in volumes[:idx]]
response = self._config_manager._get_config_drive_from_volume()
self.osutils.get_volumes.assert_called_once_with()
mock_check_for_config_drive.assert_has_calls(check_calls)
if found:
mock_os_rmdir.assert_called_once_with(
self._config_manager.target_path)
mock_copytree.assert_called_once_with(
volumes[1], self._config_manager.target_path)
self.assertEqual(found, response)
def test_get_config_drive_from_volume_not_found(self):
self._test_get_config_drive_from_volume(found=False)
def test_get_config_drive_from_volume(self):
self._test_get_config_drive_from_volume()
def _test__get_config_drive_files(self, cd_type, cd_location,
func, found=True):
response = self._config_manager._get_config_drive_files(cd_type,
cd_location)
if found:
if func:
func.assert_called_once_with()
self.assertEqual(func.return_value, response)
else:
self.assertFalse(response)
def test__get_config_drive_files_not_found(self):
self._test__get_config_drive_files(None, None, None, found=False)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.'
'_get_config_drive_from_cdrom_drive')
def test__get_config_drive_files_cdrom_iso(self, func):
self._test__get_config_drive_files(
"iso", "cdrom", func)
def test__get_config_drive_files_cdrom_vfat(self):
self._test__get_config_drive_files(
"vfat", "cdrom", None)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.'
'_get_config_drive_from_raw_hdd')
def test__get_config_drive_files_hdd_iso(self, func):
self._test__get_config_drive_files(
"iso", "hdd", func)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.'
'_get_config_drive_from_vfat')
def test__get_config_drive_files_hdd_vfat(self, func):
self._test__get_config_drive_files(
"vfat", "hdd", func)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.'
'_get_config_drive_from_partition')
def test__get_config_drive_files_partition_iso(self, func):
self._test__get_config_drive_files(
"iso", "partition", func)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.'
'_get_config_drive_from_volume')
def test__get_config_drive_files_partition_vfat(self, func):
self._test__get_config_drive_files(
"vfat", "partition", func)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.'
'_get_config_drive_files')
def _test_get_config_drive_files(self, mock_get_config_drive_files,
found=True):
check_types = ["iso", "vfat"] if found else []
check_locations = ["cdrom", "hdd", "partition"]
product = list(itertools.product(check_types, check_locations))
product_calls = [mock.call(cd_type, cd_location)
for cd_type, cd_location in product]
mock_get_config_drive_files.side_effect = \
[False] * (len(product_calls) - 1) + [True]
expected_log = ["Looking for Config Drive %(type)s in %(location)s" %
{"type": cd_type, "location": cd_location}
for cd_type, cd_location in product]
with self.snatcher:
response = self._config_manager.get_config_drive_files(
check_types, check_locations)
mock_get_config_drive_files.assert_has_calls(product_calls)
self.assertEqual(expected_log, self.snatcher.output)
self.assertEqual(found, response)
def test_get_config_drive_files_not_found(self):
self._test_get_config_drive_files(found=False)
def test_get_config_drive_files(self):
self._test_get_config_drive_files()

View File

@ -12,23 +12,24 @@
# License for the specific language governing permissions and limitations
# under the License.
import importlib
import os
import unittest
import uuid
try:
import unittest.mock as mock
except ImportError:
import mock
from oslo_config import cfg
CONF = cfg.CONF
from cloudbaseinit import exception
from cloudbaseinit.tests import testutils
class ConfigDriveServiceTest(unittest.TestCase):
class TestConfigDriveService(unittest.TestCase):
def setUp(self):
module_path = "cloudbaseinit.metadata.services.configdrive"
self._win32com_mock = mock.MagicMock()
self._ctypes_mock = mock.MagicMock()
self._ctypes_util_mock = mock.MagicMock()
@ -43,35 +44,66 @@ class ConfigDriveServiceTest(unittest.TestCase):
'win32com.client': self._win32com_client_mock,
'pywintypes': self._pywintypes_mock})
self._module_patcher.start()
self.addCleanup(self._module_patcher.stop)
configdrive = importlib.import_module('cloudbaseinit.metadata.services'
'.configdrive')
self._config_drive = configdrive.ConfigDriveService()
self.configdrive_module = importlib.import_module(module_path)
self._config_drive = self.configdrive_module.ConfigDriveService()
self.snatcher = testutils.LogSnatcher(module_path)
def tearDown(self):
self._module_patcher.stop()
def _test_preprocess_options(self, fail=False):
if fail:
with testutils.ConfPatcher("config_drive_types",
["vfat", "ntfs"]):
with self.assertRaises(exception.CloudbaseInitException):
self._config_drive._preprocess_options()
with testutils.ConfPatcher("config_drive_locations",
["device"]):
with self.assertRaises(exception.CloudbaseInitException):
self._config_drive._preprocess_options()
return
options = {
"config_drive_raw_hhd": False,
"config_drive_cdrom": False,
"config_drive_vfat": True,
# Deprecated options above.
"config_drive_types": ["vfat", "iso"],
"config_drive_locations": ["partition"]
}
contexts = [testutils.ConfPatcher(key, value)
for key, value in options.items()]
with contexts[0], contexts[1], contexts[2], \
contexts[3], contexts[4]:
self._config_drive._preprocess_options()
self.assertEqual({"vfat", "iso"},
self._config_drive._searched_types)
self.assertEqual({"hdd", "partition"},
self._config_drive._searched_locations)
def test_preprocess_options_fail(self):
self._test_preprocess_options(fail=True)
def test_preprocess_options(self):
self._test_preprocess_options()
@mock.patch('tempfile.gettempdir')
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.factory.'
'get_config_drive_manager')
def test_load(self, mock_get_config_drive_manager,
mock_gettempdir):
def test_load(self, mock_get_config_drive_manager):
mock_manager = mock.MagicMock()
mock_manager.get_config_drive_files.return_value = True
fake_path = "fake\\fake_id"
mock_manager.target_path = fake_path
mock_get_config_drive_manager.return_value = mock_manager
mock_gettempdir.return_value = 'fake'
uuid.uuid4 = mock.MagicMock(return_value='fake_id')
fake_path = os.path.join('fake', str('fake_id'))
expected_log = [
"Metadata copied to folder: %r" % fake_path]
response = self._config_drive.load()
with self.snatcher:
response = self._config_drive.load()
mock_gettempdir.assert_called_once_with()
mock_get_config_drive_manager.assert_called_once_with()
mock_manager.get_config_drive_files.assert_called_once_with(
fake_path,
check_raw_hhd=CONF.config_drive_raw_hhd,
check_cdrom=CONF.config_drive_cdrom,
check_vfat=CONF.config_drive_vfat)
searched_types=self.configdrive_module.CD_TYPES,
searched_locations=self.configdrive_module.CD_LOCATIONS)
self.assertEqual(expected_log, self.snatcher.output)
self.assertTrue(response)
self.assertEqual(fake_path, self._config_drive._metadata_path)
@ -85,10 +117,19 @@ class ConfigDriveServiceTest(unittest.TestCase):
self.assertEqual('fake data', response)
mock_join.assert_called_with(
self._config_drive._metadata_path, fake_path)
mock_normpath.assert_called_once_with(mock_join.return_value)
@mock.patch('shutil.rmtree')
def test_cleanup(self, mock_rmtree):
fake_path = os.path.join('fake', 'path')
self._config_drive._metadata_path = fake_path
self._config_drive.cleanup()
mock_mgr = mock.Mock()
self._config_drive._mgr = mock_mgr
mock_mgr.target_path = fake_path
with self.snatcher:
self._config_drive.cleanup()
self.assertEqual(["Deleting metadata folder: %r" % fake_path],
self.snatcher.output)
mock_rmtree.assert_called_once_with(fake_path,
ignore_errors=True)
self.assertEqual(None, self._config_drive._metadata_path)

View File

@ -66,7 +66,7 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
self._win32net_mock.error = Exception
module_path = "cloudbaseinit.osutils.windows"
self._module_patcher = mock.patch.dict(
_module_patcher = mock.patch.dict(
'sys.modules',
{'win32com': self._win32com_mock,
'win32process': self._win32process_mock,
@ -78,10 +78,15 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
'six.moves.xmlrpc_client': self._xmlrpc_client_mock,
'ctypes': self._ctypes_mock,
'pywintypes': self._pywintypes_mock,
'tzlocal': self._tzlocal_mock})
'tzlocal': self._tzlocal_mock,
'winioctlcon': mock.MagicMock()})
_module_patcher.start()
self.addCleanup(_module_patcher.stop)
self._module_patcher.start()
self.windows_utils = importlib.import_module(module_path)
exception.ctypes.GetLastError = mock.MagicMock()
exception.ctypes.FormatError = mock.MagicMock()
with mock.patch("cloudbaseinit.utils.windows.disk.GUID"):
self.windows_utils = importlib.import_module(module_path)
self._winreg_mock = self._moves_mock.winreg
self._windll_mock = self._ctypes_mock.windll
@ -96,9 +101,6 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
self.snatcher = testutils.LogSnatcher(module_path)
def tearDown(self):
self._module_patcher.stop()
@mock.patch('cloudbaseinit.osutils.windows.privilege')
def _test_reboot(self, mock_privilege_module, ret_value,
expected_ret_value=None):
@ -523,7 +525,8 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
def _test_get_user_home(self, user_sid, mock_get_user_sid):
mock_get_user_sid.return_value = user_sid
response = self._winutils.get_user_home(self._USERNAME)
with self.snatcher:
response = self._winutils.get_user_home(self._USERNAME)
if user_sid:
mock_get_user_sid.assert_called_with(self._USERNAME)
@ -536,6 +539,9 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
self._winreg_mock.OpenKey.return_value.__enter__.return_value,
'ProfileImagePath')
else:
self.assertEqual(
["Home directory not found for user %r" % self._USERNAME],
self.snatcher.output)
self.assertTrue(response is None)
def test_get_user_home(self):
@ -593,6 +599,13 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
)
return
expected_log = []
for (ret_val,), msg in ((static_val, "Setting static IP address"),
(gateway_val, "Setting static gateways"),
(dns_val, "Setting static DNS servers")):
if ret_val in (0, 1):
expected_log.append(msg)
conn.return_value.query.return_value = adapter
adapter_config = adapter[0].associators.return_value[0]
adapter_config.EnableStatic.return_value = static_val
@ -604,11 +617,13 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
exception.CloudbaseInitException,
set_static_call)
else:
response = set_static_call()
with self.snatcher:
response = set_static_call()
if static_val[0] or gateway_val[0] or dns_val[0]:
self.assertTrue(response)
else:
self.assertFalse(response)
self.assertEqual(expected_log, self.snatcher.output)
select = ("SELECT * FROM Win32_NetworkAdapter WHERE "
"MACAddress = '{}'".format(mac_address))
@ -655,13 +670,19 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
set_static_call = functools.partial(
self._winutils.set_static_network_config_v6,
mac_address, address6, netmask6, gateway6)
expected_log = []
if not mock_check_os_version.return_value:
expected_log.append("Setting IPv6 info not available "
"on this system")
if not v6adapters or v6error:
self.assertRaises(
exception.CloudbaseInitException,
set_static_call)
else:
set_static_call()
expected_log.append("Setting IPv6 info for %s" % friendly_name)
with self.snatcher:
set_static_call()
mock_get_adapter_addresses.assert_called_once_with()
select = ("SELECT * FROM MSFT_NetIPAddress "
"WHERE InterfaceAlias = '{}'".format(friendly_name))
@ -686,6 +707,7 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
"PassThru": False,
}
netip.Create.assert_called_once_with(**params)
self.assertEqual(expected_log, self.snatcher.output)
def test_set_static_network_config(self):
ret_val1 = (1,)
@ -820,10 +842,11 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
self._test_get_config_value(None)
@mock.patch('time.sleep')
def _test_wait_for_boot_completion(self, ret_val, mock_sleep):
self._winreg_mock.QueryValueEx.side_effect = [ret_val]
def _test_wait_for_boot_completion(self, _, ret_vals=None):
self._winreg_mock.QueryValueEx.side_effect = ret_vals
self._winutils.wait_for_boot_completion()
with self.snatcher:
self._winutils.wait_for_boot_completion()
key = self._winreg_mock.OpenKey.return_value.__enter__.return_value
self._winreg_mock.OpenKey.assert_called_with(
@ -831,12 +854,24 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
"SYSTEM\\Setup\\Status\\SysprepStatus", 0,
self._winreg_mock.KEY_READ)
expected_log = []
for gen_states in ret_vals:
gen_state = gen_states[0]
if gen_state == 7:
break
expected_log.append('Waiting for sysprep completion. '
'GeneralizationState: %d' % gen_state)
self._winreg_mock.QueryValueEx.assert_called_with(
key, "GeneralizationState")
self.assertEqual(expected_log, self.snatcher.output)
def test_wait_for_boot_completion(self):
ret_val = [7]
self._test_wait_for_boot_completion(ret_val)
ret_vals = [[7]]
self._test_wait_for_boot_completion(ret_vals=ret_vals)
def test_wait_for_boot_completion_wait(self):
ret_vals = [[1], [7]]
self._test_wait_for_boot_completion(ret_vals=ret_vals)
def test_get_service(self):
conn = self._wmi_mock.WMI
@ -911,7 +946,10 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
self._winutils.start_service,
'fake name')
else:
self._winutils.start_service('fake name')
with self.snatcher:
self._winutils.start_service('fake name')
self.assertEqual(["Starting service fake name"],
self.snatcher.output)
mock_service.StartService.assert_called_once_with()
@ -933,7 +971,10 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
self._winutils.stop_service,
'fake name')
else:
self._winutils.stop_service('fake name')
with self.snatcher:
self._winutils.stop_service('fake name')
self.assertEqual(["Stopping service fake name"],
self.snatcher.output)
mock_service.StopService.assert_called_once_with()
@ -1302,6 +1343,49 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
interface_detail='fake interface detail',
disk_handle=mock_disk_handle, io_control=True)
def _test_get_volumes(self, find_first=True, find_next=True):
count = 3
expected_volumes = ["ID_{}".format(idx) for idx in range(count)]
volume_values = mock.PropertyMock(side_effect=expected_volumes)
mock_volume = mock.Mock()
type(mock_volume).value = volume_values
self._ctypes_mock.create_unicode_buffer.return_value = mock_volume
if not find_first:
self._kernel32.FindFirstVolumeW.return_value = self._winutils\
.INVALID_HANDLE_VALUE
side_effects = [1] * (count - 1)
side_effects.append(0)
self._kernel32.FindNextVolumeW.side_effect = side_effects
if find_next:
self._ctypes_mock.GetLastError.return_value = self._winutils\
.ERROR_NO_MORE_FILES
if not (find_first and find_next):
with self.assertRaises(exception.WindowsCloudbaseInitException):
self._winutils.get_volumes()
return
volumes = self._winutils.get_volumes()
self._kernel32.FindFirstVolumeW.assert_called_once_with(
mock_volume, self._winutils.MAX_PATH)
find_next_calls = [
mock.call(self._kernel32.FindFirstVolumeW.return_value,
mock_volume, self._winutils.MAX_PATH)] * count
self._kernel32.FindNextVolumeW.assert_has_calls(find_next_calls)
self._kernel32.FindVolumeClose.assert_called_once_with(
self._kernel32.FindFirstVolumeW.return_value)
self.assertEqual(expected_volumes, volumes)
def test_get_volumes(self):
self._test_get_volumes()
def test_get_volumes_first_fail(self):
self._test_get_volumes(find_first=False)
def test_get_volumes_next_fail(self):
self._test_get_volumes(find_next=False)
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils._get_fw_protocol')
def test_firewall_create_rule(self, mock_get_fw_protocol):
expected = [mock.call("HNetCfg.FWOpenPort"),

View File

@ -12,201 +12,323 @@
# License for the specific language governing permissions and limitations
# under the License.
import importlib
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from cloudbaseinit import exception as cbinit_exception
from cloudbaseinit import exception
from cloudbaseinit.tests import testutils
class WindowsPhysicalDiskUtilsTests(testutils.CloudbaseInitTestBase):
class BaseTestDevice(unittest.TestCase):
def setUp(self):
self._ctypes_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict(
_module_patcher = mock.patch.dict(
'sys.modules',
{'ctypes': self._ctypes_mock})
{'ctypes': self._ctypes_mock,
'ctypes.windll': mock.MagicMock(),
'ctypes.wintypes': mock.MagicMock(),
'winioctlcon': mock.MagicMock()})
_module_patcher.start()
self.addCleanup(_module_patcher.stop)
self._module_patcher.start()
self.physical_disk = importlib.import_module(
self.disk = importlib.import_module(
"cloudbaseinit.utils.windows.disk")
self.mock_dword = mock.Mock()
self._ctypes_mock.wintypes.DWORD = self.mock_dword
self.disk.kernel32 = mock.MagicMock()
self.geom = mock.Mock()
self.geom.MediaType = self.disk.Win32_DiskGeometry.FixedMedia = 12
self.geom.Cylinders = 2
self.geom.TracksPerCylinder = 5
self.geom.SectorsPerTrack = 512
self.geom.BytesPerSector = 512
class TestBaseDevice(BaseTestDevice, testutils.CloudbaseInitTestBase):
def setUp(self):
super(TestBaseDevice, self).setUp()
class FinalBaseDevice(self.disk.BaseDevice):
def size(self):
return 0
self.fake_path = mock.sentinel.fake_path
self._phys_disk_class = self.physical_disk.PhysicalDisk(
self._device_class = FinalBaseDevice(
path=self.fake_path)
self._device_class._sector_size = self.geom.BytesPerSector
self._device_class._disk_size = 2 * 5 * 512 * 512
self.physical_disk.kernel32 = mock.MagicMock()
def tearDown(self):
self._module_patcher.stop()
@mock.patch('cloudbaseinit.utils.windows.disk'
'.PhysicalDisk.close')
def _test_open(self, mock_close, _handle, exception):
self._phys_disk_class._handle = _handle
if exception:
self.physical_disk.kernel32.CreateFileW.return_value = \
self._phys_disk_class.INVALID_HANDLE_VALUE
self.assertRaises(cbinit_exception.CloudbaseInitException,
self._phys_disk_class.open)
def _test_open(self, exc):
if exc:
self.disk.kernel32.CreateFileW.return_value = \
self._device_class.INVALID_HANDLE_VALUE
with self.assert_raises_windows_message(
"Cannot open file: %r", 100):
self._device_class.open()
else:
self._phys_disk_class.open()
self._device_class.open()
self.physical_disk.kernel32.CreateFileW.assert_called_once_with(
self.disk.kernel32.CreateFileW.assert_called_once_with(
self._ctypes_mock.c_wchar_p.return_value,
self._phys_disk_class.GENERIC_READ,
self._phys_disk_class.FILE_SHARE_READ,
0, self._phys_disk_class.OPEN_EXISTING,
self._phys_disk_class.FILE_ATTRIBUTE_READONLY, 0
self._device_class.GENERIC_READ,
self._device_class.FILE_SHARE_READ,
0, self._device_class.OPEN_EXISTING,
self._device_class.FILE_ATTRIBUTE_READONLY, 0
)
self._ctypes_mock.c_wchar_p.assert_called_once_with(self.fake_path)
self.assertEqual(
self.physical_disk.kernel32.CreateFileW.return_value,
self._phys_disk_class._handle)
if _handle:
mock_close.assert_called_once_with()
self.disk.kernel32.CreateFileW.return_value,
self._device_class._handle)
def test_open(self):
self._test_open(_handle=None, exception=None)
self._test_open(exc=False)
def test_open_exeption(self):
self._test_open(_handle=None, exception=True)
def test_open_with_close(self):
self._test_open(_handle=mock.sentinel._handle, exception=True)
def test_open_exception(self):
self._test_open(exc=True)
def test_close(self):
self._phys_disk_class._handle = mock.sentinel._handle
self._phys_disk_class._geom = mock.sentinel._geom
self._device_class._handle = mock.sentinel._handle
self._phys_disk_class.close()
self.physical_disk.kernel32.CloseHandle.assert_called_once_with(
self._device_class.close()
self.disk.kernel32.CloseHandle.assert_called_once_with(
mock.sentinel._handle)
self.assertEqual(None, self._device_class._handle)
self.assertEqual(0, self._phys_disk_class._handle)
self.assertEqual(None, self._phys_disk_class._geom)
def _test_get_geometry(self, ret_val, last_error=None):
mock_disk_geom = self.disk.Win32_DiskGeometry
mock_disk_geom.side_effect = None
mock_disk_geom.return_value = self.geom
@mock.patch('cloudbaseinit.utils.windows.disk'
'.Win32_DiskGeometry')
def _test_get_geometry(self, mock_Win32_DiskGeometry, _geom, ret_val,
last_error=None):
mock_DeviceIoControl = self.physical_disk.kernel32.DeviceIoControl
expect_byref = [mock.call(mock_Win32_DiskGeometry.return_value),
mock_DeviceIoControl = self.disk.kernel32.DeviceIoControl
expect_byref = [mock.call(self.geom),
mock.call(
self._ctypes_mock.wintypes.DWORD.return_value)]
self._phys_disk_class._geom = _geom
self.physical_disk.kernel32.DeviceIoControl.return_value = ret_val
self.mock_dword.return_value)]
self.disk.kernel32.DeviceIoControl.return_value = ret_val
if not ret_val:
with self.assert_raises_windows_message(
"Cannot get disk geometry: %r", last_error):
self._phys_disk_class.get_geometry()
elif _geom:
response = self._phys_disk_class.get_geometry()
self.assertEqual(_geom, response)
self._device_class._get_geometry()
else:
response = self._phys_disk_class.get_geometry()
mock_Win32_DiskGeometry.assert_called_once_with()
self._ctypes_mock.wintypes.DWORD.assert_called_once_with()
response = self._device_class._get_geometry()
self.mock_dword.assert_called_once_with()
mock_DeviceIoControl.assert_called_once_with(
self._phys_disk_class._handle,
self._phys_disk_class.IOCTL_DISK_GET_DRIVE_GEOMETRY, 0, 0,
self._device_class._handle,
self.disk.winioctlcon.IOCTL_DISK_GET_DRIVE_GEOMETRY, 0, 0,
self._ctypes_mock.byref.return_value,
self._ctypes_mock.sizeof.return_value,
self._ctypes_mock.byref.return_value, 0)
self.assertEqual(expect_byref,
self._ctypes_mock.byref.call_args_list)
self.assertEqual(mock_Win32_DiskGeometry.return_value,
self._phys_disk_class._geom)
self.assertEqual(self._phys_disk_class._geom, response)
self.assertEqual((self._device_class._sector_size,
self._device_class._disk_size, True),
response)
def test_get_geometry(self):
self._test_get_geometry(_geom=mock.sentinel._geom,
ret_val=mock.sentinel.ret_val)
self._test_get_geometry(ret_val=mock.sentinel.ret_val)
def test_get_geometry_no_geom(self):
self._test_get_geometry(_geom=None,
ret_val=mock.sentinel.ret_val,
last_error=100)
def test_get_geometry_exception(self):
self._test_get_geometry(ret_val=0, last_error=100)
def test_get_geometry_no_geom_exception(self):
self._test_get_geometry(_geom=None, ret_val=None,
last_error=100)
def _test_seek(self, exception):
def _test__seek(self, exc):
expect_DWORD = [mock.call(0), mock.call(1)]
if exception:
self.physical_disk.kernel32.SetFilePointer.return_value = \
self._phys_disk_class.INVALID_SET_FILE_POINTER
self.assertRaises(cbinit_exception.CloudbaseInitException,
self._phys_disk_class.seek, 1)
if exc:
self.disk.kernel32.SetFilePointer.return_value = \
self._device_class.INVALID_SET_FILE_POINTER
with self.assert_raises_windows_message(
"Seek error: %r", 100):
self._device_class._seek(1)
else:
self._phys_disk_class.seek(1)
self.physical_disk.kernel32.SetFilePointer.assert_called_once_with(
self._phys_disk_class._handle,
self._ctypes_mock.wintypes.DWORD.return_value,
self._device_class._seek(1)
self.disk.kernel32.SetFilePointer.assert_called_once_with(
self._device_class._handle,
self.mock_dword.return_value,
self._ctypes_mock.byref.return_value,
self._phys_disk_class.FILE_BEGIN)
self._device_class.FILE_BEGIN)
self._ctypes_mock.byref.assert_called_once_with(
self._ctypes_mock.wintypes.DWORD.return_value)
self.mock_dword.return_value)
self.assertEqual(expect_DWORD,
self._ctypes_mock.wintypes.DWORD.call_args_list)
self.mock_dword.call_args_list)
def test__seek(self):
self._test__seek(exc=False)
def test__seek_exception(self):
self._test__seek(exc=True)
def test_seek(self):
self._test_seek(exception=False)
offset = self._device_class.seek(1025)
self.assertEqual(1024, offset)
def test_seek_exception(self):
self._test_seek(exception=True)
def _test_read(self, ret_val, last_error=None):
def _test__read(self, ret_val, last_error=None):
bytes_to_read = mock.sentinel.bytes_to_read
self.physical_disk.kernel32.ReadFile.return_value = ret_val
self.disk.kernel32.ReadFile.return_value = ret_val
if not ret_val:
with self.assert_raises_windows_message(
"Read exception: %r", last_error):
self._phys_disk_class.read(bytes_to_read)
self._device_class._read(bytes_to_read)
else:
response = self._phys_disk_class.read(bytes_to_read)
response = self._device_class._read(bytes_to_read)
mock_buffer = self._ctypes_mock.create_string_buffer
self._ctypes_mock.create_string_buffer.assert_called_once_with(
bytes_to_read)
self._ctypes_mock.wintypes.DWORD.assert_called_once_with()
self.physical_disk.kernel32.ReadFile.assert_called_once_with(
self._phys_disk_class._handle,
self._ctypes_mock.create_string_buffer.return_value,
mock_buffer.assert_called_once_with(bytes_to_read)
self.mock_dword.assert_called_once_with()
self.disk.kernel32.ReadFile.assert_called_once_with(
self._device_class._handle,
mock_buffer.return_value,
bytes_to_read, self._ctypes_mock.byref.return_value, 0)
self._ctypes_mock.byref.assert_called_once_with(
self._ctypes_mock.wintypes.DWORD.return_value)
self.mock_dword.return_value)
self.assertEqual(
(self._ctypes_mock.create_string_buffer.return_value,
self._ctypes_mock.wintypes.DWORD.return_value.value),
response)
mock_buffer.return_value.raw[
:self.mock_dword.return_value.value], response)
def test__read(self):
self._test__read(ret_val=mock.sentinel.ret_val)
def test__read_exception(self):
self._test__read(ret_val=None, last_error=100)
def test_read(self):
self._test_read(ret_val=mock.sentinel.ret_val)
_read_func = mock.Mock()
mock_content = mock.MagicMock()
_read_func.return_value = mock_content
self._device_class._read = _read_func
response = self._device_class.read(512, 10)
self._device_class._read.assert_called_once_with(1024)
self.assertEqual(response, mock_content[10, 522])
def test_read_exception(self):
self._test_read(ret_val=None, last_error=100)
class TestDisk(BaseTestDevice, testutils.CloudbaseInitTestBase):
def setUp(self):
super(TestDisk, self).setUp()
self.fake_disk_path = mock.sentinel.fake_disk_path
self._disk_class = self.disk.Disk(
path=self.fake_disk_path)
self._disk_class._disk_size = 2 * 5 * 512 * 512
@mock.patch("cloudbaseinit.utils.windows.disk"
".Win32_DRIVE_LAYOUT_INFORMATION_EX")
def _test_get_layout(self, mock_layout_struct, fail=False):
mock_layout = mock.Mock()
mock_layout_struct.return_value = mock_layout
mock_devio = self.disk.kernel32.DeviceIoControl
if fail:
mock_devio.return_value = 0
with self.assert_raises_windows_message(
"Cannot get disk layout: %r", 100):
self._disk_class._get_layout()
return
mock_devio.return_value = 1
response = self._disk_class._get_layout()
mock_devio.assert_called_once_with(
self._disk_class._handle,
self.disk.winioctlcon.IOCTL_DISK_GET_DRIVE_LAYOUT_EX,
0,
0,
self._ctypes_mock.byref(mock_layout),
self._ctypes_mock.sizeof(mock_layout),
self._ctypes_mock.byref(self.mock_dword.return_value),
0)
self.assertEqual(mock_layout, response)
def test_get_layout_fail(self):
self._test_get_layout(fail=True)
def test_get_layout(self):
self._test_get_layout()
def _test_get_partition_indexes(self, fail=False, gpt=True):
layout = mock.MagicMock()
if fail:
with self.assertRaises(exception.CloudbaseInitException):
self._disk_class._get_partition_indexes(layout)
return
part_style = (self._disk_class.PARTITION_STYLE_GPT if gpt
else self._disk_class.PARTITION_STYLE_MBR)
layout.PartitionStyle = part_style
count = 8
layout.PartitionCount = count
if gpt:
expected = list(range(count))
else:
layout.PartitionEntry = [mock.Mock() for _ in range(count)]
layout.PartitionEntry[-1].Mbr.PartitionType = \
self._disk_class.PARTITION_ENTRY_UNUSED
expected = list(range(count - 1))
response = self._disk_class._get_partition_indexes(layout)
self.assertEqual(expected, response)
def test_get_partition_indexes_fail(self):
self._test_get_partition_indexes(fail=True)
def test_get_partition_indexes_gpt(self):
self._test_get_partition_indexes()
def test_get_partition_indexes_mbr(self):
self._test_get_partition_indexes(gpt=False)
@mock.patch("cloudbaseinit.utils.windows.disk"
".Partition")
@mock.patch("cloudbaseinit.utils.windows.disk"
".Disk._get_partition_indexes")
@mock.patch("cloudbaseinit.utils.windows.disk"
".Disk._get_layout")
def test_partitions(self, mock_get_layout, mock_get_partition_indexes,
mock_partition):
size = 512
layout = mock.MagicMock()
layout.PartitionEntry[0].PartitionLength = size
indexes = [0, 1, 2]
mock_get_layout.return_value = layout
mock_get_partition_indexes.return_value = indexes
self._disk_class._path = r"\\?\GLOBALROOT\Device\Harddisk0"
response = self._disk_class.partitions()
mock_get_layout.assert_called_once_with()
mock_get_partition_indexes.assert_called_once_with(layout)
paths = [r"\\?\GLOBALROOT\Device\Harddisk{}\Partition{}".format(
0, idx + 1) for idx in indexes]
calls = [mock.call(path, size) for path in paths]
mock_partition.assert_has_calls(calls)
expected = [mock_partition(path, size) for path in paths]
self.assertEqual(expected, response)
class TestPartition(unittest.TestCase):
def setUp(self):
_module_patcher = mock.patch.dict(
'sys.modules',
{'ctypes': mock.MagicMock(),
'winioctlcon': mock.MagicMock()})
_module_patcher.start()
self.addCleanup(_module_patcher.stop)
self.disk = importlib.import_module(
"cloudbaseinit.utils.windows.disk")
def test_size(self):
size = mock.sentinel.size
partition = self.disk.Partition(mock.Mock(), size)
self.assertEqual(size, partition.size)

View File

@ -1,263 +0,0 @@
# Copyright 2014 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import importlib
try:
import unittest.mock as mock
except ImportError:
import mock
from cloudbaseinit.tests import testutils
class WindowsVirtualDiskUtilsTests(testutils.CloudbaseInitTestBase):
def setUp(self):
self._ctypes_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict(
'sys.modules',
{'ctypes': self._ctypes_mock})
self._module_patcher.start()
self.virtual_disk = importlib.import_module(
"cloudbaseinit.utils.windows.virtual_disk")
self.fake_path = mock.sentinel.fake_path
self._vdisk_class = self.virtual_disk.VirtualDisk(path=self.fake_path)
self.virtual_disk.virtdisk = None
self.virtual_disk.kernel32 = mock.MagicMock()
def tearDown(self):
self._module_patcher.stop()
def test_load_virtdisk_dll(self):
self._vdisk_class._load_virtdisk_dll()
self.assertEqual(self._ctypes_mock.windll.virtdisk,
self.virtual_disk.virtdisk)
@mock.patch('cloudbaseinit.utils.windows.virtual_disk'
'.Win32_VIRTUAL_STORAGE_TYPE')
@mock.patch('cloudbaseinit.utils.windows.virtual_disk'
'.get_WIN32_VIRTUAL_STORAGE_TYPE_VENDOR_MICROSOFT')
@mock.patch('cloudbaseinit.utils.windows.virtual_disk'
'.VirtualDisk._load_virtdisk_dll')
@mock.patch('cloudbaseinit.utils.windows.virtual_disk'
'.VirtualDisk.close')
def _test_open(self, mock_close, mock_load_virtdisk_dll,
mock_get_virtual_storage_type_vendor,
mock_Win32_VIRTUAL_STORAGE_TYPE, handle, ret_val):
virtdisk = self._ctypes_mock.windll.virtdisk
virtdisk.OpenVirtualDisk.return_value = ret_val
self.virtual_disk.virtdisk = virtdisk
self._vdisk_class._handle = None
if handle:
self._vdisk_class._handle = handle
if ret_val:
with self.assert_raises_windows_message(
"Cannot open virtual disk: %r",
ret_val):
self._vdisk_class.open()
else:
self._vdisk_class.open()
if handle:
mock_close.assert_called_once_with()
mock_load_virtdisk_dll.assert_called_once_with()
mock_Win32_VIRTUAL_STORAGE_TYPE.assert_called_once_with()
mock_get_virtual_storage_type_vendor.assert_called_once_with()
self.assertEqual(
self._vdisk_class.VIRTUAL_STORAGE_TYPE_DEVICE_ISO,
mock_Win32_VIRTUAL_STORAGE_TYPE.return_value.DeviceId)
self.assertEqual(self._ctypes_mock.wintypes.HANDLE.return_value,
self._vdisk_class._handle)
def test_open(self):
self._test_open(handle=None, ret_val=None)
def test_open_exception(self):
self._test_open(handle=None, ret_val=100)
def test_open_handle_exists(self):
self._test_open(handle=None, ret_val=None)
def _test_attach(self, ret_val):
virtdisk = self._ctypes_mock.windll.virtdisk
self.virtual_disk.virtdisk = virtdisk
virtdisk.AttachVirtualDisk.return_value = ret_val
if ret_val:
with self.assert_raises_windows_message(
"Cannot attach virtual disk: %r",
ret_val):
self._vdisk_class.attach()
else:
self._vdisk_class.attach()
virtdisk.AttachVirtualDisk.assert_called_once_with(
self._vdisk_class._handle, 0,
self._vdisk_class.ATTACH_VIRTUAL_DISK_FLAG_READ_ONLY, 0, 0, 0)
def test_attach(self):
self._test_attach(ret_val=None)
def test_attach_exception(self):
self._test_attach(ret_val=100)
def _test_detach(self, ret_val):
virtdisk = self._ctypes_mock.windll.virtdisk
self.virtual_disk.virtdisk = virtdisk
virtdisk.DetachVirtualDisk.return_value = ret_val
if ret_val:
with self.assert_raises_windows_message(
"Cannot detach virtual disk: %r", ret_val):
self._vdisk_class.detach()
else:
self._vdisk_class.detach()
virtdisk.DetachVirtualDisk.assert_called_once_with(
self._vdisk_class._handle,
self._vdisk_class.DETACH_VIRTUAL_DISK_FLAG_NONE, 0)
def test_detach(self):
self._test_detach(ret_val=None)
def test_detach_exception(self):
self._test_detach(ret_val=100)
def _test_get_physical_path(self, ret_val):
virtdisk = self._ctypes_mock.windll.virtdisk
self.virtual_disk.virtdisk = virtdisk
virtdisk.GetVirtualDiskPhysicalPath.return_value = ret_val
buf = self._ctypes_mock.create_unicode_buffer.return_value
if ret_val:
with self.assert_raises_windows_message(
"Cannot get virtual disk physical path: %r", ret_val):
self._vdisk_class.get_physical_path()
else:
response = self._vdisk_class.get_physical_path()
self.assertEqual(buf.value, response)
self._ctypes_mock.create_unicode_buffer.assert_called_once_with(1024)
self._ctypes_mock.wintypes.DWORD.assert_called_once_with(
self._ctypes_mock.sizeof.return_value)
self._ctypes_mock.sizeof.assert_called_once_with(
buf)
virtdisk.GetVirtualDiskPhysicalPath.assert_called_once_with(
self._vdisk_class._handle,
self._ctypes_mock.byref.return_value,
self._ctypes_mock.create_unicode_buffer.return_value)
self._ctypes_mock.byref.assert_called_once_with(
self._ctypes_mock.wintypes.DWORD.return_value)
self._ctypes_mock.create_unicode_buffer.assert_called_once_with(1024)
def test_get_physical_path(self):
self._test_get_physical_path(ret_val=None)
def test_get_physical_path_fails(self):
self._test_get_physical_path(ret_val=100)
@mock.patch('cloudbaseinit.utils.windows.virtual_disk'
'.VirtualDisk.get_physical_path')
def _test_get_cdrom_drive_mount_point(self, mock_get_physical_path,
buf_len, ret_val, last_error=None):
buf = self._ctypes_mock.create_unicode_buffer.return_value
kernel32 = self.virtual_disk.kernel32
kernel32.GetLogicalDriveStringsW.return_value = buf_len
kernel32.QueryDosDeviceW.return_value = ret_val
self._ctypes_mock.wstring_at.return_value = [mock.sentinel.value1,
mock.sentinel.value2]
dev = self._ctypes_mock.create_unicode_buffer.return_value
dev.value = mock_get_physical_path.return_value
self._ctypes_mock.sizeof.return_value = 1
expected_sizeof = [mock.call(buf),
mock.call(self._ctypes_mock.wintypes.WCHAR)]
expected_create_unicode_buffer = [mock.call(2048)]
if not buf_len:
with self.assert_raises_windows_message(
"Cannot enumerate logical devices: %r", last_error):
self._vdisk_class.get_cdrom_drive_mount_point()
elif not ret_val:
with self.assert_raises_windows_message(
"Cannot query NT device: %r", last_error):
self._vdisk_class.get_cdrom_drive_mount_point()
expected_create_unicode_buffer.append(mock.call(2048))
expected_sizeof.append(mock.call(self._ctypes_mock.wintypes.WCHAR))
expected_sizeof.append(
mock.call(
self._ctypes_mock.create_unicode_buffer.return_value))
expected_sizeof.append(mock.call(self._ctypes_mock.wintypes.WCHAR))
else:
response = self._vdisk_class.get_cdrom_drive_mount_point()
mock_get_physical_path.assert_called_once_with()
self._ctypes_mock.wstring_at.assert_called_once_with(
self._ctypes_mock.addressof.return_value + 0 * 1)
self._ctypes_mock.addressof.assert_called_once_with(buf)
kernel32.QueryDosDeviceW.assert_called_once_with(
[mock.sentinel.value1],
self._ctypes_mock.create_unicode_buffer.return_value, 1)
expected_sizeof.append(mock.call(self._ctypes_mock.wintypes.WCHAR))
expected_sizeof.append(
mock.call(
self._ctypes_mock.create_unicode_buffer.return_value))
expected_sizeof.append(mock.call(self._ctypes_mock.wintypes.WCHAR))
expected_create_unicode_buffer.append(mock.call(2048))
self.assertEqual(self._ctypes_mock.wstring_at.return_value[:-1],
response)
self.assertEqual(
expected_create_unicode_buffer,
self._ctypes_mock.create_unicode_buffer.call_args_list)
self.assertEqual(expected_sizeof,
self._ctypes_mock.sizeof.call_args_list)
kernel32.GetLogicalDriveStringsW.assert_called_once_with(1, buf)
def test_get_cdrom_drive_mount_point_exception_buf_len(self):
self._test_get_cdrom_drive_mount_point(buf_len=0, ret_val=1,
last_error=100)
def test_get_cdrom_drive_mount_point_exception_query(self):
self._test_get_cdrom_drive_mount_point(buf_len=1, ret_val=0,
last_error=100)
def test_get_cdrom_drive_mount_point(self):
self._test_get_cdrom_drive_mount_point(buf_len=1, ret_val=1)
def test_close(self):
self._vdisk_class.close()
self.virtual_disk.kernel32.CloseHandle.assert_called_once_with(
self._vdisk_class._handle)
self.assertEqual(0, self._vdisk_class._handle)

View File

@ -12,17 +12,24 @@
# License for the specific language governing permissions and limitations
# under the License.
import ctypes
import abc
import ctypes
from ctypes import windll
from ctypes import wintypes
import re
import six
import winioctlcon
from cloudbaseinit import exception
kernel32 = windll.kernel32
class Win32_DiskGeometry(ctypes.Structure):
FixedMedia = 12
_fields_ = [
@ -30,29 +37,193 @@ class Win32_DiskGeometry(ctypes.Structure):
('MediaType', wintypes.DWORD),
('TracksPerCylinder', wintypes.DWORD),
('SectorsPerTrack', wintypes.DWORD),
('BytesPerSector', wintypes.DWORD),
('BytesPerSector', wintypes.DWORD)
]
class PhysicalDisk(object):
class Win32_DRIVE_LAYOUT_INFORMATION_MBR(ctypes.Structure):
_fields_ = [
('Signature', wintypes.ULONG)
]
class GUID(ctypes.Structure):
_fields_ = [
("data1", wintypes.DWORD),
("data2", wintypes.WORD),
("data3", wintypes.WORD),
("data4", wintypes.BYTE * 8)
]
def __init__(self, l, w1, w2, b1, b2, b3, b4, b5, b6, b7, b8):
self.data1 = l
self.data2 = w1
self.data3 = w2
self.data4[0] = b1
self.data4[1] = b2
self.data4[2] = b3
self.data4[3] = b4
self.data4[4] = b5
self.data4[5] = b6
self.data4[6] = b7
self.data4[7] = b8
class Win32_DRIVE_LAYOUT_INFORMATION_GPT(ctypes.Structure):
_fields_ = [
('DiskId', GUID),
('StartingUsableOffset', wintypes.LARGE_INTEGER),
('UsableLength', wintypes.LARGE_INTEGER),
('MaxPartitionCount', wintypes.ULONG)
]
class DRIVE_FORMAT(ctypes.Union):
_fields_ = [
('Mbr', Win32_DRIVE_LAYOUT_INFORMATION_MBR),
('Gpt', Win32_DRIVE_LAYOUT_INFORMATION_GPT)
]
class Win32_PARTITION_INFORMATION_MBR(ctypes.Structure):
_fields_ = [
('PartitionType', wintypes.BYTE),
('BootIndicator', wintypes.BOOLEAN),
('RecognizedPartition', wintypes.BOOLEAN),
('HiddenSectors', wintypes.DWORD)
]
class Win32_PARTITION_INFORMATION_GPT(ctypes.Structure):
_fields_ = [
('PartitionType', GUID),
('PartitionId', GUID),
('Attributes', wintypes.ULARGE_INTEGER),
('Name', wintypes.WCHAR * 36)
]
class PARTITION_INFORMATION(ctypes.Union):
_fields_ = [
('Mbr', Win32_PARTITION_INFORMATION_MBR),
('Gpt', Win32_PARTITION_INFORMATION_GPT)
]
class Win32_PARTITION_INFORMATION_EX(ctypes.Structure):
_anonymous_ = ('PartitionInformation',)
_fields_ = [
('PartitionStyle', wintypes.DWORD),
('StartingOffset', wintypes.LARGE_INTEGER),
('PartitionLength', wintypes.LARGE_INTEGER),
('PartitionNumber', wintypes.DWORD),
('RewritePartition', wintypes.BOOLEAN),
('PartitionInformation', PARTITION_INFORMATION)
]
class Win32_DRIVE_LAYOUT_INFORMATION_EX(ctypes.Structure):
_anonymous_ = ('DriveFormat',)
_fields_ = [
('PartitionStyle', wintypes.DWORD),
('PartitionCount', wintypes.DWORD),
('DriveFormat', DRIVE_FORMAT),
('PartitionEntry', Win32_PARTITION_INFORMATION_EX * 128)
]
@six.add_metaclass(abc.ABCMeta)
class BaseDevice(object):
"""Base class for devices like disks and partitions.
It has common methods for getting physical disk geometry,
opening/closing the device and also seeking through it
for reading certain amounts of bytes.
"""
GENERIC_READ = 0x80000000
FILE_SHARE_READ = 1
OPEN_EXISTING = 3
FILE_ATTRIBUTE_READONLY = 1
INVALID_HANDLE_VALUE = -1
IOCTL_DISK_GET_DRIVE_GEOMETRY = 0x70000
INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
FILE_BEGIN = 0
INVALID_SET_FILE_POINTER = 0xFFFFFFFF
def __init__(self, path):
self._path = path
self._handle = 0
self._geom = None
self._handle = None
self._sector_size = None
self._disk_size = None
self.fixed = None
def __repr__(self):
return "<{}: {}>".format(self.__class__.__name__, self._path)
def __enter__(self):
self.open()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def _get_geometry(self):
"""Get details about the disk size bounds."""
geom = Win32_DiskGeometry()
bytes_returned = wintypes.DWORD()
ret_val = kernel32.DeviceIoControl(
self._handle,
winioctlcon.IOCTL_DISK_GET_DRIVE_GEOMETRY,
0,
0,
ctypes.byref(geom),
ctypes.sizeof(geom),
ctypes.byref(bytes_returned),
0)
if not ret_val:
raise exception.WindowsCloudbaseInitException(
"Cannot get disk geometry: %r")
_sector_size = geom.BytesPerSector
_disk_size = (geom.Cylinders * geom.TracksPerCylinder *
geom.SectorsPerTrack * geom.BytesPerSector)
fixed = geom.MediaType == Win32_DiskGeometry.FixedMedia
return _sector_size, _disk_size, fixed
def _seek(self, offset):
high = wintypes.DWORD(offset >> 32)
low = wintypes.DWORD(offset & 0xFFFFFFFF)
ret_val = kernel32.SetFilePointer(self._handle, low,
ctypes.byref(high),
self.FILE_BEGIN)
if ret_val == self.INVALID_SET_FILE_POINTER:
raise exception.WindowsCloudbaseInitException(
"Seek error: %r")
def _read(self, size):
buff = ctypes.create_string_buffer(size)
bytes_read = wintypes.DWORD()
ret_val = kernel32.ReadFile(self._handle, buff, size,
ctypes.byref(bytes_read), 0)
if not ret_val:
raise exception.WindowsCloudbaseInitException(
"Read exception: %r")
return buff.raw[:bytes_read.value] # all bytes without the null byte
def open(self):
if self._handle:
self.close()
handle = kernel32.CreateFileW(
ctypes.c_wchar_p(self._path),
self.GENERIC_READ,
@ -62,49 +233,119 @@ class PhysicalDisk(object):
self.FILE_ATTRIBUTE_READONLY,
0)
if handle == self.INVALID_HANDLE_VALUE:
raise exception.CloudbaseInitException('Cannot open file')
raise exception.WindowsCloudbaseInitException(
'Cannot open file: %r')
self._handle = handle
self._sector_size, self._disk_size, self.fixed =\
self._get_geometry()
def close(self):
kernel32.CloseHandle(self._handle)
self._handle = 0
self._geom = None
def get_geometry(self):
if not self._geom:
geom = Win32_DiskGeometry()
bytes_returned = wintypes.DWORD()
ret_val = kernel32.DeviceIoControl(
self._handle,
self.IOCTL_DISK_GET_DRIVE_GEOMETRY,
0,
0,
ctypes.byref(geom),
ctypes.sizeof(geom),
ctypes.byref(bytes_returned),
0)
if not ret_val:
raise exception.WindowsCloudbaseInitException(
"Cannot get disk geometry: %r")
self._geom = geom
return self._geom
if self._handle:
kernel32.CloseHandle(self._handle)
self._handle = None
def seek(self, offset):
high = wintypes.DWORD(offset >> 32)
low = wintypes.DWORD(offset & 0xFFFFFFFF)
"""Drive geometry safe seek.
ret_val = kernel32.SetFilePointer(self._handle, low,
ctypes.byref(high),
self.FILE_BEGIN)
if ret_val == self.INVALID_SET_FILE_POINTER:
raise exception.CloudbaseInitException("Seek error")
Seek for a given offset and return the valid set one.
"""
safe_offset = int(offset / self._sector_size) * self._sector_size
self._seek(safe_offset)
return safe_offset
def read(self, size, skip=0):
"""Drive geometry safe read.
Read and extract exactly the requested content.
"""
# Compute a size to fit both of the bytes we need to skip and
# also the minimum read size.
total = size + skip
safe_size = ((int(total / self._sector_size) +
bool(total % self._sector_size)) * self._sector_size)
content = self._read(safe_size)
return content[skip:total]
@abc.abstractmethod
def size(self):
"""Returns the size in bytes of the actual opened device."""
class Disk(BaseDevice):
"""Disk class with seek/read support.
It also has the capability of obtaining partition objects.
"""
PARTITION_ENTRY_UNUSED = 0
PARTITION_STYLE_MBR = 0
PARTITION_STYLE_GPT = 1
def _get_layout(self):
layout = Win32_DRIVE_LAYOUT_INFORMATION_EX()
bytes_returned = wintypes.DWORD()
ret_val = kernel32.DeviceIoControl(
self._handle,
winioctlcon.IOCTL_DISK_GET_DRIVE_LAYOUT_EX,
0,
0,
ctypes.byref(layout),
ctypes.sizeof(layout),
ctypes.byref(bytes_returned),
0)
def read(self, bytes_to_read):
buf = ctypes.create_string_buffer(bytes_to_read)
bytes_read = wintypes.DWORD()
ret_val = kernel32.ReadFile(self._handle, buf, bytes_to_read,
ctypes.byref(bytes_read), 0)
if not ret_val:
raise exception.WindowsCloudbaseInitException(
"Read exception: %r")
return buf, bytes_read.value
"Cannot get disk layout: %r")
return layout
def _get_partition_indexes(self, layout):
partition_style = layout.PartitionStyle
if partition_style not in (self.PARTITION_STYLE_MBR,
self.PARTITION_STYLE_GPT):
raise exception.CloudbaseInitException(
"Invalid partition style %r" % partition_style)
# If is GPT, then the count reflects the actual number of partitions
# but if is MBR, then the number of partitions is a multiple of 4
# and just the indexes for the used partitions must be saved.
partition_indexes = []
if partition_style == self.PARTITION_STYLE_GPT:
partition_indexes.extend(range(layout.PartitionCount))
else:
for idx in range(layout.PartitionCount):
if (layout.PartitionEntry[idx].Mbr.PartitionType !=
self.PARTITION_ENTRY_UNUSED):
partition_indexes.append(idx)
return partition_indexes
def partitions(self):
"""Return a list of partition objects available on disk."""
layout = self._get_layout()
partition_indexes = self._get_partition_indexes(layout)
# Create and return the partition objects containing their sizes.
partitions = []
disk_index = re.search(r"(disk|drive)(\d+)", self._path,
re.I | re.M).group(2)
for partition_index in partition_indexes:
path = r'\\?\GLOBALROOT\Device\Harddisk{}\Partition{}'.format(
disk_index, partition_index + 1)
size = layout.PartitionEntry[partition_index].PartitionLength
partition = Partition(path, size)
partitions.append(partition)
return partitions
@property
def size(self):
return self._disk_size
class Partition(BaseDevice):
"""Partition class with seek/read support."""
def __init__(self, path, size):
super(Partition, self).__init__(path)
self._partition_size = size
@property
def size(self):
return self._partition_size

View File

@ -1,151 +0,0 @@
# Copyright 2012 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ctypes
from ctypes import windll
from ctypes import wintypes
from cloudbaseinit import exception
kernel32 = windll.kernel32
# VirtDisk.dll is available starting from Windows Server 2008 R2 / Windows7
virtdisk = None
class Win32_GUID(ctypes.Structure):
_fields_ = [("Data1", wintypes.DWORD),
("Data2", wintypes.WORD),
("Data3", wintypes.WORD),
("Data4", wintypes.BYTE * 8)]
def get_WIN32_VIRTUAL_STORAGE_TYPE_VENDOR_MICROSOFT():
guid = Win32_GUID()
guid.Data1 = 0xec984aec
guid.Data2 = 0xa0f9
guid.Data3 = 0x47e9
ByteArray8 = wintypes.BYTE * 8
guid.Data4 = ByteArray8(0x90, 0x1f, 0x71, 0x41, 0x5a, 0x66, 0x34, 0x5b)
return guid
class Win32_VIRTUAL_STORAGE_TYPE(ctypes.Structure):
_fields_ = [
('DeviceId', wintypes.DWORD),
('VendorId', Win32_GUID)
]
class VirtualDisk(object):
VIRTUAL_STORAGE_TYPE_DEVICE_ISO = 1
VIRTUAL_DISK_ACCESS_ATTACH_RO = 0x10000
VIRTUAL_DISK_ACCESS_READ = 0xd0000
OPEN_VIRTUAL_DISK_FLAG_NONE = 0
DETACH_VIRTUAL_DISK_FLAG_NONE = 0
ATTACH_VIRTUAL_DISK_FLAG_READ_ONLY = 1
ATTACH_VIRTUAL_DISK_FLAG_NO_DRIVE_LETTER = 2
def __init__(self, path):
self._path = path
self._handle = 0
def _load_virtdisk_dll(self):
global virtdisk
if not virtdisk:
virtdisk = windll.virtdisk
def open(self):
if self._handle:
self.close()
self._load_virtdisk_dll()
vst = Win32_VIRTUAL_STORAGE_TYPE()
vst.DeviceId = self.VIRTUAL_STORAGE_TYPE_DEVICE_ISO
vst.VendorId = get_WIN32_VIRTUAL_STORAGE_TYPE_VENDOR_MICROSOFT()
handle = wintypes.HANDLE()
ret_val = virtdisk.OpenVirtualDisk(ctypes.byref(vst),
ctypes.c_wchar_p(self._path),
self.VIRTUAL_DISK_ACCESS_ATTACH_RO |
self.VIRTUAL_DISK_ACCESS_READ,
self.OPEN_VIRTUAL_DISK_FLAG_NONE, 0,
ctypes.byref(handle))
if ret_val:
raise exception.WindowsCloudbaseInitException(
"Cannot open virtual disk: %r", ret_val)
self._handle = handle
def attach(self):
ret_val = virtdisk.AttachVirtualDisk(
self._handle, 0, self.ATTACH_VIRTUAL_DISK_FLAG_READ_ONLY, 0, 0, 0)
if ret_val:
raise exception.WindowsCloudbaseInitException(
"Cannot attach virtual disk: %r", ret_val)
def detach(self):
ret_val = virtdisk.DetachVirtualDisk(
self._handle, self.DETACH_VIRTUAL_DISK_FLAG_NONE, 0)
if ret_val:
raise exception.WindowsCloudbaseInitException(
"Cannot detach virtual disk: %r", ret_val)
def get_physical_path(self):
buf = ctypes.create_unicode_buffer(1024)
bufLen = wintypes.DWORD(ctypes.sizeof(buf))
ret_val = virtdisk.GetVirtualDiskPhysicalPath(self._handle,
ctypes.byref(bufLen),
buf)
if ret_val:
raise exception.WindowsCloudbaseInitException(
"Cannot get virtual disk physical path: %r", ret_val)
return buf.value
def get_cdrom_drive_mount_point(self):
mount_point = None
buf = ctypes.create_unicode_buffer(2048)
buf_len = kernel32.GetLogicalDriveStringsW(
ctypes.sizeof(buf) / ctypes.sizeof(wintypes.WCHAR), buf)
if not buf_len:
raise exception.WindowsCloudbaseInitException(
"Cannot enumerate logical devices: %r")
cdrom_dev = self.get_physical_path().rsplit('\\')[-1].upper()
i = 0
while not mount_point and i < buf_len:
curr_drive = ctypes.wstring_at(ctypes.addressof(buf) + i *
ctypes.sizeof(wintypes.WCHAR))[:-1]
dev = ctypes.create_unicode_buffer(2048)
ret_val = kernel32.QueryDosDeviceW(curr_drive, dev,
ctypes.sizeof(dev) /
ctypes.sizeof(wintypes.WCHAR))
if not ret_val:
raise exception.WindowsCloudbaseInitException(
"Cannot query NT device: %r")
if dev.value.rsplit('\\')[-1].upper() == cdrom_dev:
mount_point = curr_drive
else:
i += len(curr_drive) + 2
return mount_point
def close(self):
kernel32.CloseHandle(self._handle)
self._handle = 0