Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LUKS volume support #404

Merged
merged 7 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions dissect/target/helpers/keychain.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,50 @@
import logging
from enum import Enum
from pathlib import Path
from typing import List, NamedTuple, Set
from typing import NamedTuple, Union

log = logging.getLogger(__name__)


class KeyType(Enum):
RAW = "raw"
PASSPHRASE = "passphrase"
RECOVERY_KEY = "recovery_key"
FILE = "file"


class Key(NamedTuple):
key_type: KeyType
value: str
value: Union[str, bytes]
provider: str = None
identifier: str = None


KEYCHAIN: Set[Key] = set()
KEYCHAIN: list[Key] = []


def register_key(key_type: KeyType, value: str, identifier: str = None, provider: str = None):
key = Key(provider=provider, key_type=key_type, value=value, identifier=identifier)
KEYCHAIN.add(key)
def register_key(key_type: KeyType, value: str, identifier: str = None, provider: str = None) -> None:
if key_type == KeyType.RAW:
try:
value = bytes.fromhex(value)
except ValueError:
log.warning("Failed to decode raw key as hex, ignoring: %s", value)
return

key = Key(key_type, value, provider, identifier)
KEYCHAIN.append(key)
log.info("Registered key %s", key)


def get_all_keys() -> List[Key]:
return list(KEYCHAIN)
def get_all_keys() -> list[Key]:
return KEYCHAIN[:]


def get_keys_for_provider(provider: str) -> List[Key]:
def get_keys_for_provider(provider: str) -> list[Key]:
return [key for key in KEYCHAIN if key.provider and key.provider.lower() == provider.lower()]


def get_keys_without_provider() -> List[Key]:
def get_keys_without_provider() -> list[Key]:
return [key for key in KEYCHAIN if not key.provider]


Expand All @@ -48,12 +56,12 @@ def parse_key_type(key_type_name: str) -> KeyType:
raise ValueError("No KeyType enum values that match %s", key_type_name)


def register_wildcard_value(value: str):
def register_wildcard_value(value: str) -> None:
for key_type in KeyType:
register_key(key_type=key_type, value=value)
register_key(key_type, value)


def register_keychain_file(keychain_path: Path):
def register_keychain_file(keychain_path: Path) -> None:
"""Register all keys from provided keychain file.

The keychain file is a CSV file in "excel" dialect with the columns:
Expand Down
11 changes: 8 additions & 3 deletions dissect/target/volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
"""A lazy import of :mod:`dissect.target.volumes.md`."""
bde = import_lazy("dissect.target.volumes.bde")
"""A lazy import of :mod:`dissect.target.volumes.bde`."""
luks = import_lazy("dissect.target.volumes.luks")
"""A lazy import of :mod:`dissect.target.volumes.luks`."""

log = logging.getLogger(__name__)
"""A logger instance for this module."""
Expand All @@ -34,7 +36,7 @@
md.MdVolumeSystem,
]
"""All available :class:`LogicalVolumeSystem` classes."""
ENCRYPTED_VOLUME_MANAGERS: list[type[EncryptedVolumeSystem]] = [bde.BitlockerVolumeSystem]
ENCRYPTED_VOLUME_MANAGERS: list[type[EncryptedVolumeSystem]] = [bde.BitlockerVolumeSystem, luks.LUKSVolumeSystem]
"""All available :class:`EncryptedVolumeSystem` classes."""


Expand Down Expand Up @@ -387,8 +389,11 @@ def open_encrypted(volume: BinaryIO) -> Iterator[Volume]:
except ImportError as e:
log.info("Failed to import %s", manager_cls)
log.debug("", exc_info=e)
except VolumeSystemError:
log.exception(f"Failed to open an encrypted volume {volume} with volume manager {manager_cls}")
except Exception as e:
log.error(
"Failed to open an encrypted volume %s with volume manager %s: %s", volume, manager_cls.PROVIDER, e
)
log.debug("", exc_info=e)
return None


Expand Down
116 changes: 116 additions & 0 deletions dissect/target/volumes/luks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import logging
import pathlib
from typing import BinaryIO, Iterator, Optional, Union

from dissect.fve import luks
from dissect.util.stream import AlignedStream

from dissect.target.exceptions import VolumeSystemError
from dissect.target.helpers.keychain import KeyType
from dissect.target.volume import EncryptedVolumeSystem, Volume

log = logging.getLogger(__name__)


class LUKSVolumeSystemError(VolumeSystemError):
pass


class LUKSVolumeSystem(EncryptedVolumeSystem):
PROVIDER = "luks"

def __init__(self, fh: Union[BinaryIO, list[BinaryIO]], *args, **kwargs):
super().__init__(fh, *args, **kwargs)
self.luks = luks.LUKS(fh)

@staticmethod
def _detect(fh: BinaryIO) -> bool:
return luks.is_luks_volume(fh)

def _volumes(self) -> Iterator[Volume]:
if isinstance(self.fh, Volume):
volume_details = dict(
number=self.fh.number,
offset=self.fh.offset,
vtype=self.fh.type,
name=self.fh.name,
)
else:
volume_details = dict(
number=1,
offset=0,
vtype=None,
name=None,
)

stream = self.unlock_volume()
yield Volume(
fh=stream,
size=stream.size,
raw=self.fh,
vs=self,
**volume_details,
)

def unlock_with_volume_encryption_key(self, key: bytes, keyslot: Optional[int] = None) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does not seem to work?

self.unlock_with_passphrase("luks", 0)

ValueError: No valid keyslot found.
Tested on latest Ubuntu. Some usage documentation might be handy.
Also, is there a mechanism to provide a key through an ENV var or option?

Copy link
Member Author

@Schamper Schamper Oct 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works for me, are you using the correct passphrase and keyslot?

Also, the intended way is to use the keychain, for which you can either provide a csv file using-K <path> or a value using -Kv <value>. For example:

❯ target-shell -p ../dissect.fve/tests/data/luks_aes-cbc-essiv.bin -Kv password -q
Python 3.11.5 (main, Aug 24 2023, 12:23:19) [Clang 15.0.0 (clang-1500.0.40.1)]
Type 'copyright', 'credits' or 'license' for more information
IPython 8.13.2 -- An enhanced Interactive Python. Type '?' for help.


Loaded targets in 'targets' variable. First target is in 't'.

In [1]: t.volumes[1].vs.luks.unlocked
Out[1]: True

There's also documentation available here: https://docs.dissect.tools/en/latest/usage/disk-encryption.html

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't get it to work with the -Kv parameter, let's say I make an empty luks volume like this:

dd if=/dev/zero of=/path/to/lucky_luks.img bs=1M count=512
cryptsetup -vy luksFormat /path/to/lucky_luks.img
sudo cryptsetup luksOpen /path/to/lucky_luks.img lucky_luks_volume
sudo mkfs.ext4 /dev/mapper/lucky_luks_volume

But opening it with:

target-shell /path/to/lucky_luks.img -Kv luks

gives an error (Error: Group descriptor block locations exceed last block), yet:

dd if=/dev/zero of=/path/to/noluks.img bs=1M count=512
mkfs.ext4 -F /path/to/noluks.img
target-shell /path/to/noluks.img

works as expected?
What do I miss?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For documentation purposes: this was picked up offline and turned out to be a two-fold problem:

  • LUKS1 is not yet supported, which it turns out is what cryptsetup formatted it as by default
  • There was a bug in dissect.fve that resulted in LUKS volumes with a large sector size (4k) being decrypted incorrectly

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed, seems to work now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add a warning or error if the LUKS version is 1 because now I can still open it but I will only see an empty disk.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I improved the error logging in volume.py to make the existing error more visible.

try:
if keyslot is None:
for keyslot in self.luks.keyslots.keys():
try:
self.luks.unlock(key, keyslot)
break
except ValueError:
continue
else:
raise ValueError("Failed to find matching keyslot for provided volume encryption key")
else:
self.luks.unlock(key, keyslot)

log.debug("Unlocked LUKS volume with provided volume encryption key")
except ValueError:
log.exception("Failed to unlock LUKS volume with provided volume encryption key")

def unlock_with_passphrase(self, passphrase: str, keyslot: Optional[int] = None) -> None:
try:
self.luks.unlock_with_passphrase(passphrase, keyslot)
log.debug("Unlocked LUKS volume with provided passphrase")
except ValueError:
log.exception("Failed to unlock LUKS volume with provided passphrase")

def unlock_with_key_file(self, key_file: pathlib.Path, keyslot: Optional[int] = None) -> None:
if not key_file.exists():
log.error("Provided key file does not exist: %s", key_file)
return

try:
self.luks.unlock_with_key_file(key_file, keyslot=keyslot)
log.debug("Unlocked LUKS volume with key file %s", key_file)
except ValueError:
log.exception("Failed to unlock LUKS volume with key file %s", key_file)

def unlock_volume(self) -> AlignedStream:
keyslots = list(map(str, self.luks.keyslots.keys()))
keys = self.get_keys_for_identifiers(keyslots) + self.get_keys_without_identifier()

for key in keys:
try:
keyslot = int(key.identifier)
except Exception:
keyslot = None

if key.key_type == KeyType.RAW:
self.unlock_with_volume_encryption_key(key.value, keyslot)
if key.key_type == KeyType.PASSPHRASE:
self.unlock_with_passphrase(key.value, keyslot)
elif key.key_type == KeyType.FILE:
key_file = pathlib.Path(key.value)
self.unlock_with_key_file(key_file, keyslot)

if self.luks.unlocked:
log.info("Volume %s unlocked with %s (keyslot: %d)", self.fh, key, self.luks._active_keyslot_id)
break

if self.luks.unlocked:
return self.luks.open()
else:
raise LUKSVolumeSystemError("Failed to unlock LUKS volume")
8 changes: 7 additions & 1 deletion tests/test_helpers_keychain.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,11 @@ def test_keychain_register_keychain_file(guarded_keychain):
def test_keychain_register_wildcard_value(guarded_keychain):
keychain.register_wildcard_value("test-value")

# number of keys registered is equal number of supported key types
# Number of keys registered is equal number of supported key types, minus one for an invalid raw key
assert len(keychain.get_keys_without_provider()) == len(keychain.KeyType) - 1

keychain.KEYCHAIN.clear()
keychain.register_wildcard_value("0000")

# Valid raw key now included
assert len(keychain.get_keys_without_provider()) == len(keychain.KeyType)