Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make adding virtual NTFS filesystem more resilient #691

Merged
merged 3 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions dissect/target/helpers/loaderutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import urllib
from os import PathLike
from pathlib import Path
from typing import TYPE_CHECKING, BinaryIO, Optional, Union
from typing import TYPE_CHECKING, BinaryIO

from dissect.target.exceptions import FileNotFoundError
from dissect.target.filesystem import Filesystem
Expand Down Expand Up @@ -42,12 +42,31 @@ def add_virtual_ntfs_filesystem(
fh_sds = _try_open(fs, sds_path)

if any([fh_boot, fh_mft]):
ntfs = NtfsFilesystem(boot=fh_boot, mft=fh_mft, usnjrnl=fh_usnjrnl, sds=fh_sds)
target.filesystems.add(ntfs)
fs.ntfs = ntfs.ntfs
ntfs = None


def _try_open(fs: Filesystem, path: str) -> BinaryIO:
try:
ntfs = NtfsFilesystem(boot=fh_boot, mft=fh_mft, usnjrnl=fh_usnjrnl, sds=fh_sds)
except Exception as e:
if fh_boot:
log.warning("Failed to load NTFS filesystem from %s, retrying without $Boot file", fs)
log.debug("", exc_info=e)

try:
# Try once more without the $Boot file
ntfs = NtfsFilesystem(mft=fh_mft, usnjrnl=fh_usnjrnl, sds=fh_sds)
except Exception:
log.warning("Failed to load NTFS filesystem from %s without $Boot file, skipping", fs)
return

# Only add it if we have a valid NTFS with an MFT
if ntfs and ntfs.ntfs.mft:
target.filesystems.add(ntfs)
fs.ntfs = ntfs.ntfs
else:
log.warning("Opened NTFS filesystem from %s but could not find $MFT, skipping", fs)


def _try_open(fs: Filesystem, path: str) -> BinaryIO | None:
paths = [path] if not isinstance(path, list) else path

for path in paths:
Expand All @@ -61,7 +80,7 @@ def _try_open(fs: Filesystem, path: str) -> BinaryIO:
pass


def extract_path_info(path: Union[str, Path]) -> tuple[Path, Optional[urllib.parse.ParseResult]]:
def extract_path_info(path: str | Path) -> tuple[Path, urllib.parse.ParseResult | None]:
"""
Extracts a ParseResult from a path if it has
a scheme and adjusts the path if necessary.
Expand Down
110 changes: 107 additions & 3 deletions tests/helpers/test_loaderutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@

import urllib
from pathlib import Path
from typing import Optional
from typing import TYPE_CHECKING
from unittest.mock import Mock, patch

import pytest

from dissect.target.filesystem import VirtualFilesystem
from dissect.target.filesystems.dir import DirectoryFilesystem
from dissect.target.helpers.fsutil import TargetPath
from dissect.target.helpers.loaderutil import extract_path_info
from dissect.target.helpers.loaderutil import (
add_virtual_ntfs_filesystem,
extract_path_info,
)

if TYPE_CHECKING:
from dissect.target.target import Target


@pytest.mark.parametrize(
Expand Down Expand Up @@ -36,6 +44,102 @@
],
)
def test_extract_path_info(
path: Path | str, expected: tuple[Optional[Path], Optional[urllib.parse.ParseResult[str]]]
path: Path | str, expected: tuple[Path | None, urllib.parse.ParseResult[str] | None]
) -> None:
assert extract_path_info(path) == expected


@pytest.mark.parametrize(
"boot, mft, expected_logs",
[
(None, None, []),
(
None,
False,
[
"Opened NTFS filesystem from <VirtualFilesystem> but could not find $MFT, skipping",
],
),
(None, True, []),
(
False,
None,
[
"Failed to load NTFS filesystem from <VirtualFilesystem>, retrying without $Boot file",
"Opened NTFS filesystem from <VirtualFilesystem> but could not find $MFT, skipping",
],
),
(
False,
False,
[
"Failed to load NTFS filesystem from <VirtualFilesystem>, retrying without $Boot file",
"Failed to load NTFS filesystem from <VirtualFilesystem> without $Boot file, skipping",
],
),
(
False,
True,
[
"Failed to load NTFS filesystem from <VirtualFilesystem>, retrying without $Boot file",
],
),
(
True,
None,
[
"Opened NTFS filesystem from <VirtualFilesystem> but could not find $MFT, skipping",
],
),
(
True,
False,
[
"Failed to load NTFS filesystem from <VirtualFilesystem>, retrying without $Boot file",
"Failed to load NTFS filesystem from <VirtualFilesystem> without $Boot file, skipping",
],
),
(True, True, []),
],
)
def test_virtual_ntfs_resiliency(
boot: bool | None,
mft: bool | None,
expected_logs: list[str],
target_default: Target,
caplog: pytest.LogCaptureFixture,
) -> None:
sentinels = {
None: None,
False: Mock(),
True: Mock(),
}

def _try_open(fs, path):
state = None
if path == "$Boot":
state = boot
elif path == "$MFT":
state = mft
return sentinels[state]

def NtfsFilesystem(boot=None, mft=None, **kwargs):
if boot is sentinels[False] or mft is sentinels[False]:
raise Exception("Oopsiewoopsie")

fake_ntfs = Mock()
fake_ntfs.ntfs.mft = None

if mft is sentinels[True]:
fake_ntfs.ntfs.mft = Mock()

return fake_ntfs

vfs = VirtualFilesystem()
with patch("dissect.target.helpers.loaderutil._try_open", new=_try_open), patch(
"dissect.target.helpers.loaderutil.NtfsFilesystem", new=NtfsFilesystem
):
add_virtual_ntfs_filesystem(target_default, vfs)

assert caplog.messages == expected_logs
assert hasattr(vfs, "ntfs") == (True if mft else False)