# Copyright 2020 Red Hat, Inc # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import io import os import struct import subprocess import tempfile from unittest import mock import ddt from oslo_utils import units from oslo_utils.imageutils import format_inspector from oslo_utils.imageutils import QemuImgInfo from oslotest import base as test_base TEST_IMAGE_PREFIX = 'oslo-unittest-formatinspector-' def get_size_format_from_qemu_img(filename): output = subprocess.check_output( 'qemu-img info --output=json "%s"' % filename, shell=True) info = QemuImgInfo(output, format='json') return info.virtual_size, info.file_format @ddt.ddt class TestFormatInspectors(test_base.BaseTestCase): def setUp(self): super(TestFormatInspectors, self).setUp() self._created_files = [] def tearDown(self): super(TestFormatInspectors, self).tearDown() for fn in self._created_files: try: os.remove(fn) except Exception: pass def _create_iso(self, image_size, subformat='9660'): """Create an ISO file of the given size. :param image_size: The size of the image to create in bytes :param subformat: The subformat to use, if any """ # these tests depend on mkisofs # being installed and in the path, # if it is not installed, skip try: subprocess.check_output('mkisofs --version', shell=True) except Exception: self.skipTest('mkisofs not installed') size = image_size // units.Mi base_cmd = "mkisofs" if subformat == 'udf': # depending on the distribution mkisofs may not support udf # and may be provided by genisoimage instead. As a result we # need to check if the command supports udf via help # instead of checking the installed version. # mkisofs --help outputs to stderr so we need to # redirect it to stdout to use grep. try: subprocess.check_output( 'mkisofs --help 2>&1 | grep udf', shell=True) except Exception: self.skipTest('mkisofs does not support udf format') base_cmd += " -udf" prefix = TEST_IMAGE_PREFIX prefix += '-%s-' % subformat fn = tempfile.mktemp(prefix=prefix, suffix='.iso') self._created_files.append(fn) subprocess.check_output( 'dd if=/dev/zero of=%s bs=1M count=%i' % (fn, size), shell=True) # We need to use different file as input and output as the behavior # of mkisofs is version dependent if both the input and the output # are the same and can cause test failures out_fn = "%s.iso" % fn subprocess.check_output( '%s -V "TEST" -o %s %s' % (base_cmd, out_fn, fn), shell=True) self._created_files.append(out_fn) return out_fn def _create_gpt(self, image_size, subformat='gpt'): data = bytearray(b'\x00' * 512 * 10) # The last two bytes of the first sector is the little-endian signature # value 0xAA55 data[510:512] = b'\x55\xAA' # This is one EFI Protective MBR partition in the first PTE slot, # which is 16 bytes starting at offset 446. data[446:446 + 16] = struct.pack('