diff --git a/oslo_utils/imageutils/format_inspector.py b/oslo_utils/imageutils/format_inspector.py index 2e46a160..e7ce9741 100644 --- a/oslo_utils/imageutils/format_inspector.py +++ b/oslo_utils/imageutils/format_inspector.py @@ -1276,6 +1276,45 @@ class GPTInspector(FileInspector): raise SafetyViolation('GPT MBR has no partitions defined') +# The LUKSv1 format consists of a header with some metadata and key +# information followed by a bulk non-sparse data payload which is the +# encyrpted disk image. +# https://gitlab.com/cryptsetup/cryptsetup/-/wikis/LUKS-standard/on-disk-format.pdf +# LUKSv2 is a different but similar spec, which is not yet covered here (or +# in qemu). +class LUKSInspector(FileInspector): + NAME = 'luks' + + def _initialize(self): + self.new_region('header', CaptureRegion(0, 592)) + self.add_safety_check(SafetyCheck('version', self.check_version)) + + @property + def format_match(self): + return self.region('header').data[:6] == b'LUKS\xBA\xBE' + + @property + def header_items(self): + fields = struct.unpack('>6sh32s32s32sI', + self.region('header').data[:108]) + names = ['magic', 'version', 'cipher_alg', 'cipher_mode', 'hash', + 'payload_offset'] + return dict(zip(names, fields)) + + def check_version(self): + header = self.header_items + if header['version'] != 1: + raise SafetyViolation( + 'LUKS version %i is not supported' % header['version']) + + @property + def virtual_size(self): + # NOTE(danms): This will not be correct until/unless the whole stream + # has been read, since all we have is (effectively the size of the + # header. This is similar to how RawFileInspector works. + return super().virtual_size - self.header_items['payload_offset'] * 512 + + class InspectWrapper: """A file-like object that wraps another and detects the format. @@ -1437,6 +1476,7 @@ ALL_FORMATS = { 'qed': QEDInspector, 'iso': ISOInspector, 'gpt': GPTInspector, + 'luks': LUKSInspector, } diff --git a/oslo_utils/tests/imageutils/test_format_inspector.py b/oslo_utils/tests/imageutils/test_format_inspector.py index be7e4976..5b8b60ee 100644 --- a/oslo_utils/tests/imageutils/test_format_inspector.py +++ b/oslo_utils/tests/imageutils/test_format_inspector.py @@ -127,6 +127,14 @@ class TestFormatInspectors(test_base.BaseTestCase): self._created_files.append(fn) return fn + def _create_luks(self, image_size, subformat): + fn = tempfile.mktemp(suffix='.luks') + cmd = ['qemu-img', 'create', '-f', 'luks', + '--object', 'secret,id=sec0,data=secret-passphrase', + '-o', 'key-secret=sec0', fn, '%i' % image_size] + subprocess.check_output(' '.join(cmd), shell=True) + return fn + def _create_img( self, fmt, size, subformat=None, options=None, backing_file=None): @@ -143,6 +151,8 @@ class TestFormatInspectors(test_base.BaseTestCase): return self._create_iso(size, subformat) if fmt == 'gpt': return self._create_gpt(size, subformat) + if fmt == 'luks': + return self._create_luks(size, subformat) if fmt == 'vhd': # QEMU calls the vhd format vpc @@ -216,11 +226,22 @@ class TestFormatInspectors(test_base.BaseTestCase): def _test_format_at_block_size(self, format_name, img, block_size): wrapper = format_inspector.InspectWrapper(open(img, 'rb'), format_name) - + current_block_size = block_size while True: - chunk = wrapper.read(block_size) + chunk = wrapper.read(current_block_size) if not chunk: break + # If we've already settled on a format, the block size no longer + # really matters for correctness since we won't be capturing and + # parsing anything else. Bump up the block size so we will eat + # the rest of the file more efficiently. This matters for formats + # that are non-sparse and for which the virtual_size calculation + # relies on the actual size of the file (i.e. raw, gpt, luks, etc) + try: + if current_block_size == block_size and wrapper.format: + current_block_size = 64 * units.Ki + except Exception: + pass wrapper.close() self.assertIsNotNone(wrapper.format, 'Failed to detect format') @@ -279,7 +300,7 @@ class TestFormatInspectors(test_base.BaseTestCase): subformat=subformat, safety_check=True) - @ddt.data('qcow2', 'vhd', 'vhdx', 'vmdk', 'gpt') + @ddt.data('qcow2', 'vhd', 'vhdx', 'vmdk', 'gpt', 'luks') def test_format(self, format): self._test_format(format)