Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Velociraptor Loader Windows root filesystem detection #490

Merged
merged 6 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions dissect/target/loaders/velociraptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ def find_fs_directories(path: Path) -> tuple[Optional[OperatingSystem], Optional
# As of Velociraptor version 0.7.0 the structure of the Velociraptor Offline Collector varies by operating system.
# Generic.Collectors.File (Unix) uses the accessors file and auto.
# Generic.Collectors.File (Windows) and Windows.KapeFiles.Targets (Windows) uses the accessors
# mft, ntfs, lazy_ntfs, ntfs_vss and auto.
# mft, ntfs, lazy_ntfs, ntfs_vss and auto. The loader only supports a collection where a single accessor is used.
# For Windows usage of the ntfs_vss accessor can be forced by configuring VSSAnalysisAge to be greater than 0.

fs_root = path.joinpath(FILESYSTEMS_ROOT)

Expand All @@ -36,14 +37,22 @@ def find_fs_directories(path: Path) -> tuple[Optional[OperatingSystem], Optional

# Windows
volumes = set()
vss_volumes = set()
for accessor in WINDOWS_ACCESSORS:
accessor_root = fs_root.joinpath(accessor)
if accessor_root.exists():
# If the accessor directory exists, assume all the subdirectories are volumes
volumes.update(accessor_root.iterdir())
for volume in accessor_root.iterdir():
# https://github.com/Velocidex/velociraptor/blob/87368e7cc678144592a1614bb3bbd0a0f900ded9/accessors/ntfs/vss.go#L82
if "HarddiskVolumeShadowCopy" in volume.name:
vss_volumes.add(volume)
else:
volumes.add(volume)

if volumes:
return OperatingSystem.WINDOWS, list(volumes)
# The volumes that represent drives (C, D) are mounted first,
# otherwise one of the volume shadow copies could be detected as the root filesystem which results in errors.
return OperatingSystem.WINDOWS, list(volumes) + list(vss_volumes)

return None, None

Expand Down
65 changes: 30 additions & 35 deletions tests/loaders/test_velociraptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,26 @@
from tests._utils import absolute_path, mkdirs


def create_paths(sub_dir: str) -> list[str]:
return [
def create_root(sub_dir: str, tmp_path: Path) -> Path:
paths = [
f"uploads/{sub_dir}/%5C%5C.%5CC%3A/",
f"uploads/{sub_dir}/%5C%5C.%5CC%3A/$Extend",
f"uploads/{sub_dir}/%5C%5C.%5CC%3A/windows/system32",
f"uploads/{sub_dir}/%5C%5C%3F%5CGLOBALROOT%5CDevice%5CHarddiskVolumeShadowCopy1",
f"uploads/{sub_dir}/%5C%5C%3F%5CGLOBALROOT%5CDevice%5CHarddiskVolumeShadowCopy1/",
f"uploads/{sub_dir}/%5C%5C%3F%5CGLOBALROOT%5CDevice%5CHarddiskVolumeShadowCopy1/$Extend",
f"uploads/{sub_dir}/%5C%5C%3F%5CGLOBALROOT%5CDevice%5CHarddiskVolumeShadowCopy1/windows/system32",
]


@pytest.mark.parametrize(
"sub_dir",
["mft", "ntfs", "ntfs_vss", "lazy_ntfs", "auto"],
)
def test_velociraptor_loader_windows_ntfs(sub_dir: str, target_bare: Target, tmp_path: Path) -> None:
paths = create_paths(sub_dir)
root = tmp_path
mkdirs(root, paths)

(root / "uploads.json").write_bytes(b"{}")
(root / f"uploads/{sub_dir}/%5C%5C.%5CC%3A/C-DRIVE.txt").write_bytes(b"{}")

with open(absolute_path("_data/plugins/filesystem/ntfs/mft/mft.raw"), "rb") as fh:
root.joinpath(paths[0]).joinpath("$MFT").write_bytes(fh.read(10 * 1025))
mft = fh.read(10 * 1025)

root.joinpath(paths[0]).joinpath("$MFT").write_bytes(mft)
root.joinpath(paths[3]).joinpath("$MFT").write_bytes(mft)

# Add one record so we can test if it works
data = bytes.fromhex(
Expand All @@ -39,44 +37,39 @@ def test_velociraptor_loader_windows_ntfs(sub_dir: str, target_bare: Target, tmp
"2d00310035005000320036002e0074006d00700000000000"
)
root.joinpath(paths[1]).joinpath("$UsnJrnl%3A$J").write_bytes(data)
root.joinpath(paths[4]).joinpath("$UsnJrnl%3A$J").write_bytes(data)

return root


@pytest.mark.parametrize(
"sub_dir",
["mft", "ntfs", "ntfs_vss", "lazy_ntfs", "auto"],
)
def test_velociraptor_loader_windows_ntfs(sub_dir: str, target_bare: Target, tmp_path: Path) -> None:
root = create_root(sub_dir, tmp_path)

assert VelociraptorLoader.detect(root) is True

loader = VelociraptorLoader(root)
loader.map(target_bare)
target_bare.apply()

# TODO: Add fake Secure:SDS and verify mft function
usnjrnl_records = 0
for fs in target_bare.filesystems:
if isinstance(fs, NtfsFilesystem):
usnjrnl_records += len(list(fs.ntfs.usnjrnl.records()))
assert usnjrnl_records == 1

# The 2 found directories + the fake NTFS filesystem
assert len(target_bare.filesystems) == 3
assert usnjrnl_records == 2
assert len(target_bare.filesystems) == 4
assert target_bare.fs.path("sysvol/C-DRIVE.txt").exists()


@pytest.mark.parametrize(
"sub_dir",
["mft", "ntfs", "ntfs_vss", "lazy_ntfs", "auto"],
)
def test_velociraptor_loader_windows_ntfs_zip(sub_dir: str, target_bare: Target, tmp_path: Path) -> None:
paths = create_paths(sub_dir)
root = tmp_path
mkdirs(root, paths)

(root / "uploads.json").write_bytes(b"{}")

with open(absolute_path("_data/plugins/filesystem/ntfs/mft/mft.raw"), "rb") as fh:
root.joinpath(paths[0]).joinpath("$MFT").write_bytes(fh.read(10 * 1025))

# Add one record so we can test if it works
data = bytes.fromhex(
"5800000002000000c100000000000100bf000000000001002003010000000000"
"6252641a86a4d7010381008000000000000000002000000018003c0069007300"
"2d00310035005000320036002e0074006d00700000000000"
)
root.joinpath(paths[1]).joinpath("$UsnJrnl%3A$J").write_bytes(data)
create_root(sub_dir, tmp_path)

shutil.make_archive(tmp_path.joinpath("test_ntfs"), "zip", tmp_path)

Expand All @@ -85,13 +78,15 @@ def test_velociraptor_loader_windows_ntfs_zip(sub_dir: str, target_bare: Target,

loader = VelociraptorLoader(zip_path)
loader.map(target_bare)
target_bare.apply()

usnjrnl_records = 0
for fs in target_bare.filesystems:
if isinstance(fs, NtfsFilesystem):
usnjrnl_records += len(list(fs.ntfs.usnjrnl.records()))
assert usnjrnl_records == 1
assert len(target_bare.filesystems) == 3
assert usnjrnl_records == 2
assert len(target_bare.filesystems) == 4
assert target_bare.fs.path("sysvol/C-DRIVE.txt").exists()


@pytest.mark.parametrize(
Expand Down