diff --git a/dissect/target/helpers/keychain.py b/dissect/target/helpers/keychain.py index b5dd290cf..5c3530cc7 100644 --- a/dissect/target/helpers/keychain.py +++ b/dissect/target/helpers/keychain.py @@ -2,12 +2,13 @@ import logging from enum import Enum from pathlib import Path -from typing import List, NamedTuple, Set +from typing import NamedTuple, Union log = logging.getLogger(__name__) class KeyType(Enum): + RAW = "raw" PASSPHRASE = "passphrase" RECOVERY_KEY = "recovery_key" FILE = "file" @@ -15,29 +16,36 @@ class KeyType(Enum): class Key(NamedTuple): key_type: KeyType - value: str + value: Union[str, bytes] provider: str = None identifier: str = None -KEYCHAIN: Set[Key] = set() +KEYCHAIN: list[Key] = [] -def register_key(key_type: KeyType, value: str, identifier: str = None, provider: str = None): - key = Key(provider=provider, key_type=key_type, value=value, identifier=identifier) - KEYCHAIN.add(key) +def register_key(key_type: KeyType, value: str, identifier: str = None, provider: str = None) -> None: + if key_type == KeyType.RAW: + try: + value = bytes.fromhex(value) + except ValueError: + log.warning("Failed to decode raw key as hex, ignoring: %s", value) + return + + key = Key(key_type, value, provider, identifier) + KEYCHAIN.append(key) log.info("Registered key %s", key) -def get_all_keys() -> List[Key]: - return list(KEYCHAIN) +def get_all_keys() -> list[Key]: + return KEYCHAIN[:] -def get_keys_for_provider(provider: str) -> List[Key]: +def get_keys_for_provider(provider: str) -> list[Key]: return [key for key in KEYCHAIN if key.provider and key.provider.lower() == provider.lower()] -def get_keys_without_provider() -> List[Key]: +def get_keys_without_provider() -> list[Key]: return [key for key in KEYCHAIN if not key.provider] @@ -48,12 +56,12 @@ def parse_key_type(key_type_name: str) -> KeyType: raise ValueError("No KeyType enum values that match %s", key_type_name) -def register_wildcard_value(value: str): +def register_wildcard_value(value: str) -> None: for key_type in KeyType: - register_key(key_type=key_type, value=value) + register_key(key_type, value) -def register_keychain_file(keychain_path: Path): +def register_keychain_file(keychain_path: Path) -> None: """Register all keys from provided keychain file. The keychain file is a CSV file in "excel" dialect with the columns: diff --git a/dissect/target/volume.py b/dissect/target/volume.py index 8e8d6adc4..0fbdc935f 100644 --- a/dissect/target/volume.py +++ b/dissect/target/volume.py @@ -24,6 +24,8 @@ """A lazy import of :mod:`dissect.target.volumes.md`.""" bde = import_lazy("dissect.target.volumes.bde") """A lazy import of :mod:`dissect.target.volumes.bde`.""" +luks = import_lazy("dissect.target.volumes.luks") +"""A lazy import of :mod:`dissect.target.volumes.luks`.""" log = logging.getLogger(__name__) """A logger instance for this module.""" @@ -34,7 +36,7 @@ md.MdVolumeSystem, ] """All available :class:`LogicalVolumeSystem` classes.""" -ENCRYPTED_VOLUME_MANAGERS: list[type[EncryptedVolumeSystem]] = [bde.BitlockerVolumeSystem] +ENCRYPTED_VOLUME_MANAGERS: list[type[EncryptedVolumeSystem]] = [bde.BitlockerVolumeSystem, luks.LUKSVolumeSystem] """All available :class:`EncryptedVolumeSystem` classes.""" @@ -387,8 +389,11 @@ def open_encrypted(volume: BinaryIO) -> Iterator[Volume]: except ImportError as e: log.info("Failed to import %s", manager_cls) log.debug("", exc_info=e) - except VolumeSystemError: - log.exception(f"Failed to open an encrypted volume {volume} with volume manager {manager_cls}") + except Exception as e: + log.error( + "Failed to open an encrypted volume %s with volume manager %s: %s", volume, manager_cls.PROVIDER, e + ) + log.debug("", exc_info=e) return None diff --git a/dissect/target/volumes/luks.py b/dissect/target/volumes/luks.py new file mode 100644 index 000000000..d673187cd --- /dev/null +++ b/dissect/target/volumes/luks.py @@ -0,0 +1,116 @@ +import logging +import pathlib +from typing import BinaryIO, Iterator, Optional, Union + +from dissect.fve import luks +from dissect.util.stream import AlignedStream + +from dissect.target.exceptions import VolumeSystemError +from dissect.target.helpers.keychain import KeyType +from dissect.target.volume import EncryptedVolumeSystem, Volume + +log = logging.getLogger(__name__) + + +class LUKSVolumeSystemError(VolumeSystemError): + pass + + +class LUKSVolumeSystem(EncryptedVolumeSystem): + PROVIDER = "luks" + + def __init__(self, fh: Union[BinaryIO, list[BinaryIO]], *args, **kwargs): + super().__init__(fh, *args, **kwargs) + self.luks = luks.LUKS(fh) + + @staticmethod + def _detect(fh: BinaryIO) -> bool: + return luks.is_luks_volume(fh) + + def _volumes(self) -> Iterator[Volume]: + if isinstance(self.fh, Volume): + volume_details = dict( + number=self.fh.number, + offset=self.fh.offset, + vtype=self.fh.type, + name=self.fh.name, + ) + else: + volume_details = dict( + number=1, + offset=0, + vtype=None, + name=None, + ) + + stream = self.unlock_volume() + yield Volume( + fh=stream, + size=stream.size, + raw=self.fh, + vs=self, + **volume_details, + ) + + def unlock_with_volume_encryption_key(self, key: bytes, keyslot: Optional[int] = None) -> None: + try: + if keyslot is None: + for keyslot in self.luks.keyslots.keys(): + try: + self.luks.unlock(key, keyslot) + break + except ValueError: + continue + else: + raise ValueError("Failed to find matching keyslot for provided volume encryption key") + else: + self.luks.unlock(key, keyslot) + + log.debug("Unlocked LUKS volume with provided volume encryption key") + except ValueError: + log.exception("Failed to unlock LUKS volume with provided volume encryption key") + + def unlock_with_passphrase(self, passphrase: str, keyslot: Optional[int] = None) -> None: + try: + self.luks.unlock_with_passphrase(passphrase, keyslot) + log.debug("Unlocked LUKS volume with provided passphrase") + except ValueError: + log.exception("Failed to unlock LUKS volume with provided passphrase") + + def unlock_with_key_file(self, key_file: pathlib.Path, keyslot: Optional[int] = None) -> None: + if not key_file.exists(): + log.error("Provided key file does not exist: %s", key_file) + return + + try: + self.luks.unlock_with_key_file(key_file, keyslot=keyslot) + log.debug("Unlocked LUKS volume with key file %s", key_file) + except ValueError: + log.exception("Failed to unlock LUKS volume with key file %s", key_file) + + def unlock_volume(self) -> AlignedStream: + keyslots = list(map(str, self.luks.keyslots.keys())) + keys = self.get_keys_for_identifiers(keyslots) + self.get_keys_without_identifier() + + for key in keys: + try: + keyslot = int(key.identifier) + except Exception: + keyslot = None + + if key.key_type == KeyType.RAW: + self.unlock_with_volume_encryption_key(key.value, keyslot) + if key.key_type == KeyType.PASSPHRASE: + self.unlock_with_passphrase(key.value, keyslot) + elif key.key_type == KeyType.FILE: + key_file = pathlib.Path(key.value) + self.unlock_with_key_file(key_file, keyslot) + + if self.luks.unlocked: + log.info("Volume %s unlocked with %s (keyslot: %d)", self.fh, key, self.luks._active_keyslot_id) + break + + if self.luks.unlocked: + return self.luks.open() + else: + raise LUKSVolumeSystemError("Failed to unlock LUKS volume") diff --git a/tests/test_helpers_keychain.py b/tests/test_helpers_keychain.py index 1adefdd6d..265928d6d 100644 --- a/tests/test_helpers_keychain.py +++ b/tests/test_helpers_keychain.py @@ -27,5 +27,11 @@ def test_keychain_register_keychain_file(guarded_keychain): def test_keychain_register_wildcard_value(guarded_keychain): keychain.register_wildcard_value("test-value") - # number of keys registered is equal number of supported key types + # Number of keys registered is equal number of supported key types, minus one for an invalid raw key + assert len(keychain.get_keys_without_provider()) == len(keychain.KeyType) - 1 + + keychain.KEYCHAIN.clear() + keychain.register_wildcard_value("0000") + + # Valid raw key now included assert len(keychain.get_keys_without_provider()) == len(keychain.KeyType)