Merge "Fix x64 architecture related issues"

This commit is contained in:
Jenkins 2015-01-12 14:15:26 +00:00 committed by Gerrit Code Review
commit f6ba6c7481
4 changed files with 42 additions and 55 deletions

View File

@ -17,6 +17,7 @@ import ctypes
from ctypes import wintypes
import os
import re
import struct
import time
import pywintypes
@ -193,9 +194,8 @@ kernel32.DeviceIoControl.restype = wintypes.BOOL
kernel32.GetProcessHeap.argtypes = []
kernel32.GetProcessHeap.restype = wintypes.HANDLE
# Note: wintypes.ULONG must be replaced with a 64 bit variable on x64
kernel32.HeapAlloc.argtypes = [wintypes.HANDLE, wintypes.DWORD,
wintypes.ULONG]
ctypes.c_size_t]
kernel32.HeapAlloc.restype = wintypes.LPVOID
kernel32.HeapFree.argtypes = [wintypes.HANDLE, wintypes.DWORD,
@ -689,7 +689,7 @@ class WindowsUtils(base.BaseOSUtils):
heap = kernel32.GetProcessHeap()
size = wintypes.ULONG(ctypes.sizeof(Win32_MIB_IPFORWARDTABLE))
p = kernel32.HeapAlloc(heap, 0, size)
p = kernel32.HeapAlloc(heap, 0, ctypes.c_size_t(size.value))
if not p:
raise exception.CloudbaseInitException(
'Unable to allocate memory for the IP forward table')
@ -701,7 +701,7 @@ class WindowsUtils(base.BaseOSUtils):
ctypes.byref(size), 0)
if err == self.ERROR_INSUFFICIENT_BUFFER:
kernel32.HeapFree(heap, 0, p_forward_table)
p = kernel32.HeapAlloc(heap, 0, size)
p = kernel32.HeapAlloc(heap, 0, ctypes.c_size_t(size.value))
if not p:
raise exception.CloudbaseInitException(
'Unable to allocate memory for the IP forward table')
@ -828,6 +828,10 @@ class WindowsUtils(base.BaseOSUtils):
return [d for d in drives if kernel32.GetDriveTypeW(d) ==
self.DRIVE_CDROM]
def _is_64bit_arch(self):
# interpreter's bits
return struct.calcsize("P") == 8
def get_physical_disks(self):
physical_disks = []
@ -860,15 +864,17 @@ class WindowsUtils(base.BaseOSUtils):
"SetupDiGetDeviceInterfaceDetailW failed")
pdidd = ctypes.cast(
msvcrt.malloc(required_size),
msvcrt.malloc(ctypes.c_size_t(required_size.value)),
ctypes.POINTER(Win32_SP_DEVICE_INTERFACE_DETAIL_DATA_W))
try:
# NOTE(alexpilotti): the size provided by ctypes.sizeof
# is not the expected one
# pdidd.contents.cbSize = ctypes.sizeof(
# Win32_SP_DEVICE_INTERFACE_DETAIL_DATA_W)
pdidd.contents.cbSize = 6
pdidd.contents.cbSize = ctypes.sizeof(
Win32_SP_DEVICE_INTERFACE_DETAIL_DATA_W)
if not self._is_64bit_arch():
# NOTE(cpoieana): For some reason, on x86 platforms
# the alignment or content of the struct
# is not taken into consideration.
pdidd.contents.cbSize = 6
if not setupapi.SetupDiGetDeviceInterfaceDetailW(
handle_disks, ctypes.byref(did), pdidd,
@ -942,11 +948,7 @@ class WindowsUtils(base.BaseOSUtils):
fw_profile = fw_profile.GloballyOpenPorts.Remove(port, fw_protocol)
def is_wow64(self):
ret_val = wintypes.BOOL()
if not kernel32.IsWow64Process(kernel32.GetCurrentProcess(),
ctypes.byref(ret_val)):
raise exception.CloudbaseInitException("IsWow64Process failed")
return bool(ret_val.value)
return win32process.IsWow64Process()
def get_system32_dir(self):
return os.path.expandvars('%windir%\\system32')

View File

@ -27,7 +27,7 @@ from cloudbaseinit.tests import fake
CONF = cfg.CONF
class WindowsUtilsTest(unittest.TestCase):
class TestWindowsUtils(unittest.TestCase):
'''Tests for the windows utils class.'''
_CONFIG_NAME = 'FakeConfig'
@ -1055,9 +1055,15 @@ class WindowsUtilsTest(unittest.TestCase):
mock_msvcrt, handle_disks, last_error,
interface_detail, disk_handle, io_control):
sizeof_calls = [mock.call(
self.windows_utils.Win32_SP_DEVICE_INTERFACE_DATA),
mock.call(mock_sdn())]
sizeof_calls = [
mock.call(
self.windows_utils.Win32_SP_DEVICE_INTERFACE_DATA
),
mock.call(
self.windows_utils.Win32_SP_DEVICE_INTERFACE_DETAIL_DATA_W
),
mock.call(mock_sdn())
]
device_interfaces_calls = [
mock.call(
handle_disks, None, self._ctypes_mock.byref.return_value, 0,
@ -1106,7 +1112,8 @@ class WindowsUtilsTest(unittest.TestCase):
self._ctypes_mock.POINTER.assert_called_with(
self.windows_utils.Win32_SP_DEVICE_INTERFACE_DETAIL_DATA_W)
mock_msvcrt.malloc.assert_called_with(
self._wintypes_mock.DWORD.return_value)
self._ctypes_mock.c_size_t.return_value
)
self.assertEqual(cast_calls, self._ctypes_mock.cast.call_args_list)
@ -1201,33 +1208,6 @@ class WindowsUtilsTest(unittest.TestCase):
mock_get_fw_protocol.assert_called_once_with(
self._winutils.PROTOCOL_TCP)
@mock.patch('cloudbaseinit.osutils.windows.kernel32.IsWow64Process')
@mock.patch('cloudbaseinit.osutils.windows.kernel32.GetCurrentProcess')
def _test_is_wow64(self, mock_GetCurrentProcess,
mock_IsWow64Process, ret_val):
mock_IsWow64Process.return_value = ret_val
self._wintypes_mock.BOOL.return_value.value = ret_val
if ret_val is False:
self.assertRaises(exception.CloudbaseInitException,
self._winutils.is_wow64)
else:
response = self._winutils.is_wow64()
self._ctypes_mock.byref.assert_called_once_with(
self._wintypes_mock.BOOL.return_value)
mock_IsWow64Process.assert_called_once_with(
mock_GetCurrentProcess(), self._ctypes_mock.byref.return_value)
self.assertTrue(response)
def test_is_wow64(self):
self._test_is_wow64(ret_val=True)
def test_is_wow64_exception(self):
self._test_is_wow64(ret_val=False)
@mock.patch('os.path.expandvars')
def test_get_system32_dir(self, mock_expandvars):
mock_expandvars.return_value = 'fake_system32'

View File

@ -45,6 +45,7 @@ class CryptoAPICertManagerTests(unittest.TestCase):
def _test_get_cert_thumprint(self, mock_CertGetCertificateContextProperty,
mock_malloc, mock_free, ret_val):
mock_DWORD = self._ctypes.wintypes.DWORD
mock_CSIZET = self._ctypes.c_size_t
mock_cast = self._ctypes.cast
mock_POINTER = self._ctypes.POINTER
mock_byref = self._ctypes.byref
@ -52,6 +53,7 @@ class CryptoAPICertManagerTests(unittest.TestCase):
mock_pointer = mock.MagicMock()
fake_cert_context_p = 'fake context'
mock_DWORD.return_value.value = 10
mock_CSIZET.return_value.value = mock_DWORD.return_value.value
mock_CertGetCertificateContextProperty.return_value = ret_val
mock_POINTER.return_value = mock_pointer
mock_cast.return_value.contents = [16]
@ -76,7 +78,7 @@ class CryptoAPICertManagerTests(unittest.TestCase):
expected,
mock_CertGetCertificateContextProperty.call_args_list)
mock_malloc.assert_called_with(mock_DWORD.return_value)
mock_malloc.assert_called_with(mock_CSIZET.return_value)
mock_cast.assert_called_with(mock_malloc(), mock_pointer)
mock_free.assert_called_with(mock_malloc())
self.assertEqual('10', response)

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -14,17 +12,19 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import ctypes
import six
from ctypes import wintypes
import uuid
from ctypes import wintypes
import six
from cloudbaseinit.utils import encoding
from cloudbaseinit.utils.windows import cryptoapi
from cloudbaseinit.utils import x509constants
malloc = ctypes.cdll.msvcrt.malloc
malloc.restype = ctypes.c_void_p
malloc.argtypes = [ctypes.c_size_t]
@ -51,7 +51,8 @@ class CryptoAPICertManager(object):
None, ctypes.byref(thumprint_len)):
raise cryptoapi.CryptoAPIException()
thumbprint = malloc(thumprint_len)
size = ctypes.c_size_t(thumprint_len.value)
thumbprint = malloc(size)
if not cryptoapi.CertGetCertificateContextProperty(
cert_context_p,
@ -127,7 +128,8 @@ class CryptoAPICertManager(object):
None):
raise cryptoapi.CryptoAPIException()
subject_encoded = ctypes.cast(malloc(subject_encoded_len),
size = ctypes.c_size_t(subject_encoded_len.value)
subject_encoded = ctypes.cast(malloc(size),
ctypes.POINTER(wintypes.BYTE))
if not cryptoapi.CertStrToName(cryptoapi.X509_ASN_ENCODING,
@ -230,7 +232,8 @@ class CryptoAPICertManager(object):
None, None):
raise cryptoapi.CryptoAPIException()
cert_encoded = ctypes.cast(malloc(cert_encoded_len),
size = ctypes.c_size_t(cert_encoded_len.value)
cert_encoded = ctypes.cast(malloc(size),
ctypes.POINTER(wintypes.BYTE))
if not cryptoapi.CryptStringToBinaryA(