From c1d36945ff0703beb01a176f3f77a639c4ce1f4b Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Fri, 29 Sep 2023 17:58:41 +0200 Subject: [PATCH 1/6] Add LUKS volume support --- dissect/target/volume.py | 4 +- dissect/target/volumes/luks.py | 99 ++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 dissect/target/volumes/luks.py diff --git a/dissect/target/volume.py b/dissect/target/volume.py index 8e8d6adc4..5eea0fc20 100644 --- a/dissect/target/volume.py +++ b/dissect/target/volume.py @@ -24,6 +24,8 @@ """A lazy import of :mod:`dissect.target.volumes.md`.""" bde = import_lazy("dissect.target.volumes.bde") """A lazy import of :mod:`dissect.target.volumes.bde`.""" +luks = import_lazy("dissect.target.volumes.luks") +"""A lazy import of :mod:`dissect.target.volumes.luks`.""" log = logging.getLogger(__name__) """A logger instance for this module.""" @@ -34,7 +36,7 @@ md.MdVolumeSystem, ] """All available :class:`LogicalVolumeSystem` classes.""" -ENCRYPTED_VOLUME_MANAGERS: list[type[EncryptedVolumeSystem]] = [bde.BitlockerVolumeSystem] +ENCRYPTED_VOLUME_MANAGERS: list[type[EncryptedVolumeSystem]] = [bde.BitlockerVolumeSystem, luks.LUKSVolumeSystem] """All available :class:`EncryptedVolumeSystem` classes.""" diff --git a/dissect/target/volumes/luks.py b/dissect/target/volumes/luks.py new file mode 100644 index 000000000..349958641 --- /dev/null +++ b/dissect/target/volumes/luks.py @@ -0,0 +1,99 @@ +import logging +import pathlib +from typing import BinaryIO, Iterator, Optional, Union + +from dissect.fve import luks +from dissect.util.stream import AlignedStream + +from dissect.target.exceptions import VolumeSystemError +from dissect.target.helpers.keychain import KeyType +from dissect.target.volume import EncryptedVolumeSystem, Volume + +log = logging.getLogger(__name__) + + +class LUKSVolumeSystemError(VolumeSystemError): + pass + + +class LUKSVolumeSystem(EncryptedVolumeSystem): + PROVIDER = "luks" + + def __init__(self, fh: Union[BinaryIO, list[BinaryIO]], *args, **kwargs): + super().__init__(fh, *args, **kwargs) + self.luks = luks.LUKS(fh) + + @staticmethod + def _detect(fh: BinaryIO) -> bool: + return luks.is_luks_volume(fh) + + def _volumes(self) -> Iterator[Volume]: + if isinstance(self.fh, Volume): + volume_details = dict( + number=self.fh.number, + offset=self.fh.offset, + vtype=self.fh.type, + name=self.fh.name, + ) + else: + volume_details = dict( + number=1, + offset=0, + vtype=None, + name=None, + ) + + stream = self.unlock_volume() + yield Volume( + fh=stream, + size=stream.size, + raw=self.fh, + vs=self, + **volume_details, + ) + + def unlock_with_passphrase(self, passphrase: str, keyslot: Optional[int] = None) -> None: + try: + self.luks.unlock_with_passphrase(passphrase, keyslot) + log.debug("Unlocked LUKS volume with provided passphrase") + except ValueError: + log.exception("Failed to unlock LUKS volume with provided passphrase") + + def unlock_with_key_file(self, key_file: pathlib.Path, keyslot: Optional[int] = None) -> None: + if not key_file.exists(): + log.error("Provided key file does not exist: %s", key_file) + return + + try: + self.luks.unlock_with_key_file(key_file, keyslot=keyslot) + log.debug("Unlocked LUKS volume with key file %s", key_file) + except ValueError: + log.exception("Failed to unlock LUKS volume with key file %s", key_file) + + def unlock_volume(self) -> AlignedStream: + import ipdb + + ipdb.set_trace() + keyslots = list(map(str, self.luks.keyslots.keys())) + keys = self.get_keys_for_identifiers(keyslots) + self.get_keys_without_identifier() + + for key in keys: + try: + keyslot = int(key.identifier) + except Exception: + keyslot = None + + if key.key_type == KeyType.PASSPHRASE: + self.unlock_with_passphrase(key.value, keyslot) + elif key.key_type == KeyType.FILE: + key_file = pathlib.Path(key.value) + self.unlock_with_key_file(key_file, keyslot) + + if self.luks.unlocked: + log.info("Volume %s unlocked with %s (keyslot: %d)", self.fh, key, self.luks._active_keyslot_id) + break + + if self.luks.unlocked: + return self.luks.open() + else: + raise LUKSVolumeSystemError("Failed to unlock LUKS volume") From f522b6f98c07d8c4dd9c26f928e7a1ac68aea5c9 Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Fri, 29 Sep 2023 17:59:30 +0200 Subject: [PATCH 2/6] Remove debug code --- dissect/target/volumes/luks.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/dissect/target/volumes/luks.py b/dissect/target/volumes/luks.py index 349958641..d7802d513 100644 --- a/dissect/target/volumes/luks.py +++ b/dissect/target/volumes/luks.py @@ -71,9 +71,6 @@ def unlock_with_key_file(self, key_file: pathlib.Path, keyslot: Optional[int] = log.exception("Failed to unlock LUKS volume with key file %s", key_file) def unlock_volume(self) -> AlignedStream: - import ipdb - - ipdb.set_trace() keyslots = list(map(str, self.luks.keyslots.keys())) keys = self.get_keys_for_identifiers(keyslots) + self.get_keys_without_identifier() From cc0cf63800190fef224cfe43a981c9b409b44ecc Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Wed, 11 Oct 2023 16:53:43 +0200 Subject: [PATCH 3/6] Add support for opening LUKS with volume key --- dissect/target/helpers/keychain.py | 30 +++++++++++++++++++----------- dissect/target/volumes/luks.py | 20 ++++++++++++++++++++ 2 files changed, 39 insertions(+), 11 deletions(-) diff --git a/dissect/target/helpers/keychain.py b/dissect/target/helpers/keychain.py index b5dd290cf..a59ee508f 100644 --- a/dissect/target/helpers/keychain.py +++ b/dissect/target/helpers/keychain.py @@ -2,7 +2,7 @@ import logging from enum import Enum from pathlib import Path -from typing import List, NamedTuple, Set +from typing import NamedTuple, Union log = logging.getLogger(__name__) @@ -11,33 +11,41 @@ class KeyType(Enum): PASSPHRASE = "passphrase" RECOVERY_KEY = "recovery_key" FILE = "file" + RAW = "raw" class Key(NamedTuple): key_type: KeyType - value: str + value: Union[str, bytes] provider: str = None identifier: str = None -KEYCHAIN: Set[Key] = set() +KEYCHAIN: set[Key] = set() -def register_key(key_type: KeyType, value: str, identifier: str = None, provider: str = None): - key = Key(provider=provider, key_type=key_type, value=value, identifier=identifier) +def register_key(key_type: KeyType, value: str, identifier: str = None, provider: str = None) -> None: + if key_type == KeyType.RAW: + try: + value = bytes.fromhex(value) + except ValueError: + log.warning("Failed to decode raw key as hex, ignoring: %s", value) + return + + key = Key(key_type, value, provider, identifier) KEYCHAIN.add(key) log.info("Registered key %s", key) -def get_all_keys() -> List[Key]: +def get_all_keys() -> list[Key]: return list(KEYCHAIN) -def get_keys_for_provider(provider: str) -> List[Key]: +def get_keys_for_provider(provider: str) -> list[Key]: return [key for key in KEYCHAIN if key.provider and key.provider.lower() == provider.lower()] -def get_keys_without_provider() -> List[Key]: +def get_keys_without_provider() -> list[Key]: return [key for key in KEYCHAIN if not key.provider] @@ -48,12 +56,12 @@ def parse_key_type(key_type_name: str) -> KeyType: raise ValueError("No KeyType enum values that match %s", key_type_name) -def register_wildcard_value(value: str): +def register_wildcard_value(value: str) -> None: for key_type in KeyType: - register_key(key_type=key_type, value=value) + register_key(key_type, value) -def register_keychain_file(keychain_path: Path): +def register_keychain_file(keychain_path: Path) -> None: """Register all keys from provided keychain file. The keychain file is a CSV file in "excel" dialect with the columns: diff --git a/dissect/target/volumes/luks.py b/dissect/target/volumes/luks.py index d7802d513..d673187cd 100644 --- a/dissect/target/volumes/luks.py +++ b/dissect/target/volumes/luks.py @@ -52,6 +52,24 @@ def _volumes(self) -> Iterator[Volume]: **volume_details, ) + def unlock_with_volume_encryption_key(self, key: bytes, keyslot: Optional[int] = None) -> None: + try: + if keyslot is None: + for keyslot in self.luks.keyslots.keys(): + try: + self.luks.unlock(key, keyslot) + break + except ValueError: + continue + else: + raise ValueError("Failed to find matching keyslot for provided volume encryption key") + else: + self.luks.unlock(key, keyslot) + + log.debug("Unlocked LUKS volume with provided volume encryption key") + except ValueError: + log.exception("Failed to unlock LUKS volume with provided volume encryption key") + def unlock_with_passphrase(self, passphrase: str, keyslot: Optional[int] = None) -> None: try: self.luks.unlock_with_passphrase(passphrase, keyslot) @@ -80,6 +98,8 @@ def unlock_volume(self) -> AlignedStream: except Exception: keyslot = None + if key.key_type == KeyType.RAW: + self.unlock_with_volume_encryption_key(key.value, keyslot) if key.key_type == KeyType.PASSPHRASE: self.unlock_with_passphrase(key.value, keyslot) elif key.key_type == KeyType.FILE: From 8fa79efbe7788b6c71072bd768b9bfe0d7365f23 Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Wed, 11 Oct 2023 17:24:24 +0200 Subject: [PATCH 4/6] Fix unit test --- tests/test_helpers_keychain.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/test_helpers_keychain.py b/tests/test_helpers_keychain.py index 1adefdd6d..265928d6d 100644 --- a/tests/test_helpers_keychain.py +++ b/tests/test_helpers_keychain.py @@ -27,5 +27,11 @@ def test_keychain_register_keychain_file(guarded_keychain): def test_keychain_register_wildcard_value(guarded_keychain): keychain.register_wildcard_value("test-value") - # number of keys registered is equal number of supported key types + # Number of keys registered is equal number of supported key types, minus one for an invalid raw key + assert len(keychain.get_keys_without_provider()) == len(keychain.KeyType) - 1 + + keychain.KEYCHAIN.clear() + keychain.register_wildcard_value("0000") + + # Valid raw key now included assert len(keychain.get_keys_without_provider()) == len(keychain.KeyType) From 57c2335a9ef0acf70749af61cf013c3fc1bab0a4 Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Sun, 22 Oct 2023 00:26:10 +0200 Subject: [PATCH 5/6] Change keychain list and order for improved logs --- dissect/target/helpers/keychain.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dissect/target/helpers/keychain.py b/dissect/target/helpers/keychain.py index a59ee508f..5c3530cc7 100644 --- a/dissect/target/helpers/keychain.py +++ b/dissect/target/helpers/keychain.py @@ -8,10 +8,10 @@ class KeyType(Enum): + RAW = "raw" PASSPHRASE = "passphrase" RECOVERY_KEY = "recovery_key" FILE = "file" - RAW = "raw" class Key(NamedTuple): @@ -21,7 +21,7 @@ class Key(NamedTuple): identifier: str = None -KEYCHAIN: set[Key] = set() +KEYCHAIN: list[Key] = [] def register_key(key_type: KeyType, value: str, identifier: str = None, provider: str = None) -> None: @@ -33,12 +33,12 @@ def register_key(key_type: KeyType, value: str, identifier: str = None, provider return key = Key(key_type, value, provider, identifier) - KEYCHAIN.add(key) + KEYCHAIN.append(key) log.info("Registered key %s", key) def get_all_keys() -> list[Key]: - return list(KEYCHAIN) + return KEYCHAIN[:] def get_keys_for_provider(provider: str) -> list[Key]: From 73826c7ba29829aa29e8c74cda3da136fefd0bca Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Tue, 31 Oct 2023 09:58:39 +0100 Subject: [PATCH 6/6] Improve the error logging in volume.py --- dissect/target/volume.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/dissect/target/volume.py b/dissect/target/volume.py index 5eea0fc20..0fbdc935f 100644 --- a/dissect/target/volume.py +++ b/dissect/target/volume.py @@ -389,8 +389,11 @@ def open_encrypted(volume: BinaryIO) -> Iterator[Volume]: except ImportError as e: log.info("Failed to import %s", manager_cls) log.debug("", exc_info=e) - except VolumeSystemError: - log.exception(f"Failed to open an encrypted volume {volume} with volume manager {manager_cls}") + except Exception as e: + log.error( + "Failed to open an encrypted volume %s with volume manager %s: %s", volume, manager_cls.PROVIDER, e + ) + log.debug("", exc_info=e) return None