diff --git a/dissect/target/helpers/loaderutil.py b/dissect/target/helpers/loaderutil.py index 475cabb2a..ad376b566 100644 --- a/dissect/target/helpers/loaderutil.py +++ b/dissect/target/helpers/loaderutil.py @@ -5,7 +5,7 @@ import urllib from os import PathLike from pathlib import Path -from typing import TYPE_CHECKING, BinaryIO, Optional, Union +from typing import TYPE_CHECKING, BinaryIO from dissect.target.exceptions import FileNotFoundError from dissect.target.filesystem import Filesystem @@ -42,12 +42,31 @@ def add_virtual_ntfs_filesystem( fh_sds = _try_open(fs, sds_path) if any([fh_boot, fh_mft]): - ntfs = NtfsFilesystem(boot=fh_boot, mft=fh_mft, usnjrnl=fh_usnjrnl, sds=fh_sds) - target.filesystems.add(ntfs) - fs.ntfs = ntfs.ntfs + ntfs = None - -def _try_open(fs: Filesystem, path: str) -> BinaryIO: + try: + ntfs = NtfsFilesystem(boot=fh_boot, mft=fh_mft, usnjrnl=fh_usnjrnl, sds=fh_sds) + except Exception as e: + if fh_boot: + log.warning("Failed to load NTFS filesystem from %s, retrying without $Boot file", fs) + log.debug("", exc_info=e) + + try: + # Try once more without the $Boot file + ntfs = NtfsFilesystem(mft=fh_mft, usnjrnl=fh_usnjrnl, sds=fh_sds) + except Exception: + log.warning("Failed to load NTFS filesystem from %s without $Boot file, skipping", fs) + return + + # Only add it if we have a valid NTFS with an MFT + if ntfs and ntfs.ntfs.mft: + target.filesystems.add(ntfs) + fs.ntfs = ntfs.ntfs + else: + log.warning("Opened NTFS filesystem from %s but could not find $MFT, skipping", fs) + + +def _try_open(fs: Filesystem, path: str) -> BinaryIO | None: paths = [path] if not isinstance(path, list) else path for path in paths: @@ -61,7 +80,7 @@ def _try_open(fs: Filesystem, path: str) -> BinaryIO: pass -def extract_path_info(path: Union[str, Path]) -> tuple[Path, Optional[urllib.parse.ParseResult]]: +def extract_path_info(path: str | Path) -> tuple[Path, urllib.parse.ParseResult | None]: """ Extracts a ParseResult from a path if it has a scheme and adjusts the path if necessary. diff --git a/tests/helpers/test_loaderutil.py b/tests/helpers/test_loaderutil.py index 8dad73703..1d8d0996a 100644 --- a/tests/helpers/test_loaderutil.py +++ b/tests/helpers/test_loaderutil.py @@ -2,13 +2,21 @@ import urllib from pathlib import Path -from typing import Optional +from typing import TYPE_CHECKING +from unittest.mock import Mock, patch import pytest +from dissect.target.filesystem import VirtualFilesystem from dissect.target.filesystems.dir import DirectoryFilesystem from dissect.target.helpers.fsutil import TargetPath -from dissect.target.helpers.loaderutil import extract_path_info +from dissect.target.helpers.loaderutil import ( + add_virtual_ntfs_filesystem, + extract_path_info, +) + +if TYPE_CHECKING: + from dissect.target.target import Target @pytest.mark.parametrize( @@ -36,6 +44,102 @@ ], ) def test_extract_path_info( - path: Path | str, expected: tuple[Optional[Path], Optional[urllib.parse.ParseResult[str]]] + path: Path | str, expected: tuple[Path | None, urllib.parse.ParseResult[str] | None] ) -> None: assert extract_path_info(path) == expected + + +@pytest.mark.parametrize( + "boot, mft, expected_logs", + [ + (None, None, []), + ( + None, + False, + [ + "Opened NTFS filesystem from but could not find $MFT, skipping", + ], + ), + (None, True, []), + ( + False, + None, + [ + "Failed to load NTFS filesystem from , retrying without $Boot file", + "Opened NTFS filesystem from but could not find $MFT, skipping", + ], + ), + ( + False, + False, + [ + "Failed to load NTFS filesystem from , retrying without $Boot file", + "Failed to load NTFS filesystem from without $Boot file, skipping", + ], + ), + ( + False, + True, + [ + "Failed to load NTFS filesystem from , retrying without $Boot file", + ], + ), + ( + True, + None, + [ + "Opened NTFS filesystem from but could not find $MFT, skipping", + ], + ), + ( + True, + False, + [ + "Failed to load NTFS filesystem from , retrying without $Boot file", + "Failed to load NTFS filesystem from without $Boot file, skipping", + ], + ), + (True, True, []), + ], +) +def test_virtual_ntfs_resiliency( + boot: bool | None, + mft: bool | None, + expected_logs: list[str], + target_default: Target, + caplog: pytest.LogCaptureFixture, +) -> None: + sentinels = { + None: None, + False: Mock(), + True: Mock(), + } + + def _try_open(fs, path): + state = None + if path == "$Boot": + state = boot + elif path == "$MFT": + state = mft + return sentinels[state] + + def NtfsFilesystem(boot=None, mft=None, **kwargs): + if boot is sentinels[False] or mft is sentinels[False]: + raise Exception("Oopsiewoopsie") + + fake_ntfs = Mock() + fake_ntfs.ntfs.mft = None + + if mft is sentinels[True]: + fake_ntfs.ntfs.mft = Mock() + + return fake_ntfs + + vfs = VirtualFilesystem() + with patch("dissect.target.helpers.loaderutil._try_open", new=_try_open), patch( + "dissect.target.helpers.loaderutil.NtfsFilesystem", new=NtfsFilesystem + ): + add_virtual_ntfs_filesystem(target_default, vfs) + + assert caplog.messages == expected_logs + assert hasattr(vfs, "ntfs") == (True if mft else False)