Merge "Add LUKSv1 inspector"
This commit is contained in:
commit
7ab82d4a60
@ -1276,6 +1276,45 @@ class GPTInspector(FileInspector):
|
|||||||
raise SafetyViolation('GPT MBR has no partitions defined')
|
raise SafetyViolation('GPT MBR has no partitions defined')
|
||||||
|
|
||||||
|
|
||||||
|
# The LUKSv1 format consists of a header with some metadata and key
|
||||||
|
# information followed by a bulk non-sparse data payload which is the
|
||||||
|
# encyrpted disk image.
|
||||||
|
# https://gitlab.com/cryptsetup/cryptsetup/-/wikis/LUKS-standard/on-disk-format.pdf
|
||||||
|
# LUKSv2 is a different but similar spec, which is not yet covered here (or
|
||||||
|
# in qemu).
|
||||||
|
class LUKSInspector(FileInspector):
|
||||||
|
NAME = 'luks'
|
||||||
|
|
||||||
|
def _initialize(self):
|
||||||
|
self.new_region('header', CaptureRegion(0, 592))
|
||||||
|
self.add_safety_check(SafetyCheck('version', self.check_version))
|
||||||
|
|
||||||
|
@property
|
||||||
|
def format_match(self):
|
||||||
|
return self.region('header').data[:6] == b'LUKS\xBA\xBE'
|
||||||
|
|
||||||
|
@property
|
||||||
|
def header_items(self):
|
||||||
|
fields = struct.unpack('>6sh32s32s32sI',
|
||||||
|
self.region('header').data[:108])
|
||||||
|
names = ['magic', 'version', 'cipher_alg', 'cipher_mode', 'hash',
|
||||||
|
'payload_offset']
|
||||||
|
return dict(zip(names, fields))
|
||||||
|
|
||||||
|
def check_version(self):
|
||||||
|
header = self.header_items
|
||||||
|
if header['version'] != 1:
|
||||||
|
raise SafetyViolation(
|
||||||
|
'LUKS version %i is not supported' % header['version'])
|
||||||
|
|
||||||
|
@property
|
||||||
|
def virtual_size(self):
|
||||||
|
# NOTE(danms): This will not be correct until/unless the whole stream
|
||||||
|
# has been read, since all we have is (effectively the size of the
|
||||||
|
# header. This is similar to how RawFileInspector works.
|
||||||
|
return super().virtual_size - self.header_items['payload_offset'] * 512
|
||||||
|
|
||||||
|
|
||||||
class InspectWrapper:
|
class InspectWrapper:
|
||||||
"""A file-like object that wraps another and detects the format.
|
"""A file-like object that wraps another and detects the format.
|
||||||
|
|
||||||
@ -1437,6 +1476,7 @@ ALL_FORMATS = {
|
|||||||
'qed': QEDInspector,
|
'qed': QEDInspector,
|
||||||
'iso': ISOInspector,
|
'iso': ISOInspector,
|
||||||
'gpt': GPTInspector,
|
'gpt': GPTInspector,
|
||||||
|
'luks': LUKSInspector,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -127,6 +127,14 @@ class TestFormatInspectors(test_base.BaseTestCase):
|
|||||||
self._created_files.append(fn)
|
self._created_files.append(fn)
|
||||||
return fn
|
return fn
|
||||||
|
|
||||||
|
def _create_luks(self, image_size, subformat):
|
||||||
|
fn = tempfile.mktemp(suffix='.luks')
|
||||||
|
cmd = ['qemu-img', 'create', '-f', 'luks',
|
||||||
|
'--object', 'secret,id=sec0,data=secret-passphrase',
|
||||||
|
'-o', 'key-secret=sec0', fn, '%i' % image_size]
|
||||||
|
subprocess.check_output(' '.join(cmd), shell=True)
|
||||||
|
return fn
|
||||||
|
|
||||||
def _create_img(
|
def _create_img(
|
||||||
self, fmt, size, subformat=None, options=None,
|
self, fmt, size, subformat=None, options=None,
|
||||||
backing_file=None):
|
backing_file=None):
|
||||||
@ -143,6 +151,8 @@ class TestFormatInspectors(test_base.BaseTestCase):
|
|||||||
return self._create_iso(size, subformat)
|
return self._create_iso(size, subformat)
|
||||||
if fmt == 'gpt':
|
if fmt == 'gpt':
|
||||||
return self._create_gpt(size, subformat)
|
return self._create_gpt(size, subformat)
|
||||||
|
if fmt == 'luks':
|
||||||
|
return self._create_luks(size, subformat)
|
||||||
|
|
||||||
if fmt == 'vhd':
|
if fmt == 'vhd':
|
||||||
# QEMU calls the vhd format vpc
|
# QEMU calls the vhd format vpc
|
||||||
@ -216,11 +226,22 @@ class TestFormatInspectors(test_base.BaseTestCase):
|
|||||||
def _test_format_at_block_size(self, format_name, img, block_size):
|
def _test_format_at_block_size(self, format_name, img, block_size):
|
||||||
wrapper = format_inspector.InspectWrapper(open(img, 'rb'),
|
wrapper = format_inspector.InspectWrapper(open(img, 'rb'),
|
||||||
format_name)
|
format_name)
|
||||||
|
current_block_size = block_size
|
||||||
while True:
|
while True:
|
||||||
chunk = wrapper.read(block_size)
|
chunk = wrapper.read(current_block_size)
|
||||||
if not chunk:
|
if not chunk:
|
||||||
break
|
break
|
||||||
|
# If we've already settled on a format, the block size no longer
|
||||||
|
# really matters for correctness since we won't be capturing and
|
||||||
|
# parsing anything else. Bump up the block size so we will eat
|
||||||
|
# the rest of the file more efficiently. This matters for formats
|
||||||
|
# that are non-sparse and for which the virtual_size calculation
|
||||||
|
# relies on the actual size of the file (i.e. raw, gpt, luks, etc)
|
||||||
|
try:
|
||||||
|
if current_block_size == block_size and wrapper.format:
|
||||||
|
current_block_size = 64 * units.Ki
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
wrapper.close()
|
wrapper.close()
|
||||||
self.assertIsNotNone(wrapper.format, 'Failed to detect format')
|
self.assertIsNotNone(wrapper.format, 'Failed to detect format')
|
||||||
@ -279,7 +300,7 @@ class TestFormatInspectors(test_base.BaseTestCase):
|
|||||||
subformat=subformat,
|
subformat=subformat,
|
||||||
safety_check=True)
|
safety_check=True)
|
||||||
|
|
||||||
@ddt.data('qcow2', 'vhd', 'vhdx', 'vmdk', 'gpt')
|
@ddt.data('qcow2', 'vhd', 'vhdx', 'vmdk', 'gpt', 'luks')
|
||||||
def test_format(self, format):
|
def test_format(self, format):
|
||||||
self._test_format(format)
|
self._test_format(format)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user