diff --git a/dissect/target/loaders/velociraptor.py b/dissect/target/loaders/velociraptor.py index 6a0811052..0eaf77b25 100644 --- a/dissect/target/loaders/velociraptor.py +++ b/dissect/target/loaders/velociraptor.py @@ -22,7 +22,8 @@ def find_fs_directories(path: Path) -> tuple[Optional[OperatingSystem], Optional # As of Velociraptor version 0.7.0 the structure of the Velociraptor Offline Collector varies by operating system. # Generic.Collectors.File (Unix) uses the accessors file and auto. # Generic.Collectors.File (Windows) and Windows.KapeFiles.Targets (Windows) uses the accessors - # mft, ntfs, lazy_ntfs, ntfs_vss and auto. + # mft, ntfs, lazy_ntfs, ntfs_vss and auto. The loader only supports a collection where a single accessor is used. + # For Windows usage of the ntfs_vss accessor can be forced by configuring VSSAnalysisAge to be greater than 0. fs_root = path.joinpath(FILESYSTEMS_ROOT) @@ -36,14 +37,22 @@ def find_fs_directories(path: Path) -> tuple[Optional[OperatingSystem], Optional # Windows volumes = set() + vss_volumes = set() for accessor in WINDOWS_ACCESSORS: accessor_root = fs_root.joinpath(accessor) if accessor_root.exists(): # If the accessor directory exists, assume all the subdirectories are volumes - volumes.update(accessor_root.iterdir()) + for volume in accessor_root.iterdir(): + # https://github.com/Velocidex/velociraptor/blob/87368e7cc678144592a1614bb3bbd0a0f900ded9/accessors/ntfs/vss.go#L82 + if "HarddiskVolumeShadowCopy" in volume.name: + vss_volumes.add(volume) + else: + volumes.add(volume) if volumes: - return OperatingSystem.WINDOWS, list(volumes) + # The volumes that represent drives (C, D) are mounted first, + # otherwise one of the volume shadow copies could be detected as the root filesystem which results in errors. + return OperatingSystem.WINDOWS, list(volumes) + list(vss_volumes) return None, None diff --git a/tests/loaders/test_velociraptor.py b/tests/loaders/test_velociraptor.py index 902fb115e..f0b299c38 100644 --- a/tests/loaders/test_velociraptor.py +++ b/tests/loaders/test_velociraptor.py @@ -9,28 +9,26 @@ from tests._utils import absolute_path, mkdirs -def create_paths(sub_dir: str) -> list[str]: - return [ +def create_root(sub_dir: str, tmp_path: Path) -> Path: + paths = [ f"uploads/{sub_dir}/%5C%5C.%5CC%3A/", f"uploads/{sub_dir}/%5C%5C.%5CC%3A/$Extend", f"uploads/{sub_dir}/%5C%5C.%5CC%3A/windows/system32", - f"uploads/{sub_dir}/%5C%5C%3F%5CGLOBALROOT%5CDevice%5CHarddiskVolumeShadowCopy1", + f"uploads/{sub_dir}/%5C%5C%3F%5CGLOBALROOT%5CDevice%5CHarddiskVolumeShadowCopy1/", + f"uploads/{sub_dir}/%5C%5C%3F%5CGLOBALROOT%5CDevice%5CHarddiskVolumeShadowCopy1/$Extend", + f"uploads/{sub_dir}/%5C%5C%3F%5CGLOBALROOT%5CDevice%5CHarddiskVolumeShadowCopy1/windows/system32", ] - - -@pytest.mark.parametrize( - "sub_dir", - ["mft", "ntfs", "ntfs_vss", "lazy_ntfs", "auto"], -) -def test_velociraptor_loader_windows_ntfs(sub_dir: str, target_bare: Target, tmp_path: Path) -> None: - paths = create_paths(sub_dir) root = tmp_path mkdirs(root, paths) (root / "uploads.json").write_bytes(b"{}") + (root / f"uploads/{sub_dir}/%5C%5C.%5CC%3A/C-DRIVE.txt").write_bytes(b"{}") with open(absolute_path("_data/plugins/filesystem/ntfs/mft/mft.raw"), "rb") as fh: - root.joinpath(paths[0]).joinpath("$MFT").write_bytes(fh.read(10 * 1025)) + mft = fh.read(10 * 1025) + + root.joinpath(paths[0]).joinpath("$MFT").write_bytes(mft) + root.joinpath(paths[3]).joinpath("$MFT").write_bytes(mft) # Add one record so we can test if it works data = bytes.fromhex( @@ -39,21 +37,31 @@ def test_velociraptor_loader_windows_ntfs(sub_dir: str, target_bare: Target, tmp "2d00310035005000320036002e0074006d00700000000000" ) root.joinpath(paths[1]).joinpath("$UsnJrnl%3A$J").write_bytes(data) + root.joinpath(paths[4]).joinpath("$UsnJrnl%3A$J").write_bytes(data) + + return root + + +@pytest.mark.parametrize( + "sub_dir", + ["mft", "ntfs", "ntfs_vss", "lazy_ntfs", "auto"], +) +def test_velociraptor_loader_windows_ntfs(sub_dir: str, target_bare: Target, tmp_path: Path) -> None: + root = create_root(sub_dir, tmp_path) assert VelociraptorLoader.detect(root) is True loader = VelociraptorLoader(root) loader.map(target_bare) + target_bare.apply() - # TODO: Add fake Secure:SDS and verify mft function usnjrnl_records = 0 for fs in target_bare.filesystems: if isinstance(fs, NtfsFilesystem): usnjrnl_records += len(list(fs.ntfs.usnjrnl.records())) - assert usnjrnl_records == 1 - - # The 2 found directories + the fake NTFS filesystem - assert len(target_bare.filesystems) == 3 + assert usnjrnl_records == 2 + assert len(target_bare.filesystems) == 4 + assert target_bare.fs.path("sysvol/C-DRIVE.txt").exists() @pytest.mark.parametrize( @@ -61,22 +69,7 @@ def test_velociraptor_loader_windows_ntfs(sub_dir: str, target_bare: Target, tmp ["mft", "ntfs", "ntfs_vss", "lazy_ntfs", "auto"], ) def test_velociraptor_loader_windows_ntfs_zip(sub_dir: str, target_bare: Target, tmp_path: Path) -> None: - paths = create_paths(sub_dir) - root = tmp_path - mkdirs(root, paths) - - (root / "uploads.json").write_bytes(b"{}") - - with open(absolute_path("_data/plugins/filesystem/ntfs/mft/mft.raw"), "rb") as fh: - root.joinpath(paths[0]).joinpath("$MFT").write_bytes(fh.read(10 * 1025)) - - # Add one record so we can test if it works - data = bytes.fromhex( - "5800000002000000c100000000000100bf000000000001002003010000000000" - "6252641a86a4d7010381008000000000000000002000000018003c0069007300" - "2d00310035005000320036002e0074006d00700000000000" - ) - root.joinpath(paths[1]).joinpath("$UsnJrnl%3A$J").write_bytes(data) + create_root(sub_dir, tmp_path) shutil.make_archive(tmp_path.joinpath("test_ntfs"), "zip", tmp_path) @@ -85,13 +78,15 @@ def test_velociraptor_loader_windows_ntfs_zip(sub_dir: str, target_bare: Target, loader = VelociraptorLoader(zip_path) loader.map(target_bare) + target_bare.apply() usnjrnl_records = 0 for fs in target_bare.filesystems: if isinstance(fs, NtfsFilesystem): usnjrnl_records += len(list(fs.ntfs.usnjrnl.records())) - assert usnjrnl_records == 1 - assert len(target_bare.filesystems) == 3 + assert usnjrnl_records == 2 + assert len(target_bare.filesystems) == 4 + assert target_bare.fs.path("sysvol/C-DRIVE.txt").exists() @pytest.mark.parametrize(