From 419bd056a2571e318897ebe6619b8279083f608d Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Thu, 14 Mar 2024 13:30:53 +0100 Subject: [PATCH 1/6] Create separate LayerFilesystem from RootFilesystem --- dissect/target/filesystem.py | 241 +++++++++++++++----- dissect/target/plugins/filesystem/walkfs.py | 4 +- dissect/target/tools/shell.py | 4 +- tests/test_filesystem.py | 51 ++++- 4 files changed, 226 insertions(+), 74 deletions(-) diff --git a/dissect/target/filesystem.py b/dissect/target/filesystem.py index cd1c89684..719989d90 100644 --- a/dissect/target/filesystem.py +++ b/dissect/target/filesystem.py @@ -525,21 +525,21 @@ def _resolve(self, follow_symlinks: bool = True) -> FilesystemEntry: follow_symlinks: Whether to resolve the entry if it is a symbolic link. Returns: - The resolved symbolic link if ``follow_symlinks`` is ``True`` and the ``FilesystemEntry`` is a - symbolic link or else the ``FilesystemEntry`` itself. + The resolved symbolic link if ``follow_symlinks`` is ``True`` and the :class:`FilesystemEntry` is a + symbolic link or else the :class:`FilesystemEntry` itself. """ if follow_symlinks and self.is_symlink(): return self.readlink_ext() return self def get(self, path: str) -> FilesystemEntry: - """Retrieve a FilesystemEntry relative to this entry. + """Retrieve a :class:`FilesystemEntry` relative to this entry. Args: path: The path relative to this filesystem entry. Returns: - A relative FilesystemEntry. + A relative :class:`FilesystemEntry`. """ raise NotImplementedError() @@ -560,10 +560,10 @@ def iterdir(self) -> Iterator[str]: raise NotImplementedError() def scandir(self) -> Iterator[FilesystemEntry]: - """Iterate over the contents of a directory, return them as FilesystemEntry's. + """Iterate over the contents of a directory, yields :class:`FilesystemEntry`. Returns: - An iterator of directory entries as FilesystemEntry's. + An iterator of :class:`FilesystemEntry`. """ raise NotImplementedError() @@ -576,10 +576,10 @@ def listdir(self) -> list[str]: return list(self.iterdir()) def listdir_ext(self) -> list[FilesystemEntry]: - """List the contents of a directory as FilesystemEntry's. + """List the contents of a directory as a list of :class:`FilesystemEntry`. Returns: - A list of FilesystemEntry's. + A list of :class:`FilesystemEntry`. """ return list(self.scandir()) @@ -614,7 +614,7 @@ def walk_ext( onerror: Optional[Callable] = None, followlinks: bool = False, ) -> Iterator[FilesystemEntry]: - """Walk a directory and show its contents as FilesystemEntry's. + """Walk a directory and show its contents as :class:`FilesystemEntry`. It walks across all the files inside the entry recursively. @@ -629,7 +629,7 @@ def walk_ext( followlinks: ``True`` if we want to follow any symbolic link Returns: - An iterator of directory entries as FilesystemEntry's. + An iterator of :class:`FilesystemEntry`. """ yield from fsutil.walk_ext(self, topdown, onerror, followlinks) @@ -646,20 +646,22 @@ def glob(self, pattern) -> Iterator[str]: yield entry.path def glob_ext(self, pattern) -> Iterator[FilesystemEntry]: - """Iterate over the directory part of ``pattern``, returning entries matching ``pattern`` as FilesysmteEntry's. + """Iterate over the directory part of ``pattern``, returning entries matching + ``pattern`` as :class:`FilesysmteEntry`. Args: pattern: The pattern to glob for. Returns: - An iterator of FilesystemEntry's that match the pattern. + An iterator of :class:`FilesystemEntry` that match the pattern. """ yield from fsutil.glob_ext(self, pattern) def exists(self, path: str) -> bool: """Determines whether a ``path``, relative to this entry, exists. - If the `path` is a symbolic link, it will attempt to resolve it to find the FilesystemEntry it points to. + If the `path` is a symbolic link, it will attempt to resolve it to find + the :class:`FilesystemEntry` it points to. Args: path: The path relative to this entry. @@ -737,7 +739,7 @@ def readlink(self) -> str: raise NotImplementedError() def readlink_ext(self) -> FilesystemEntry: - """Read the link where this entry points to, return the resulting path as FilesystemEntry. + """Read the link where this entry points to, return the resulting path as :class:`FilesystemEntry`. If it is a symlink and returns the string that corresponds to that path. This means it follows the path a link points to, it tries to do it recursively. @@ -860,7 +862,7 @@ def lattr(self) -> Any: raise TypeError(f"lattr is not allowed on VirtualDirectory: {self.path}") def add(self, name: str, entry: FilesystemEntry) -> None: - """Add an entry to this VirtualDirectory.""" + """Add an entry to this :class:`VirtualDirectory`.""" if not self.fs.case_sensitive: name = name.lower() @@ -1214,7 +1216,7 @@ def map_file_fh(self, vfspath: str, fh: BinaryIO) -> None: self.map_file_entry(vfspath, VirtualFile(self, file_path, fh)) def map_file_entry(self, vfspath: str, entry: FilesystemEntry) -> None: - """Map a FilesystemEntry into the VFS. + """Map a :class:`FilesystemEntry` into the VFS. Any missing subdirectories up to, but not including, the last part of ``vfspath`` will be created. @@ -1271,7 +1273,7 @@ def map_file_from_tar(self, vfspath: str, tar_file: str | pathlib.Path) -> None: return self.map_dir_from_tar(vfspath.lstrip("/"), tar_file, map_single_file=True) def link(self, src: str, dst: str) -> None: - """Hard link a FilesystemEntry to another location. + """Hard link a :class:`FilesystemEntry` to another location. Args: src: The path to the target of the link. @@ -1291,31 +1293,49 @@ def symlink(self, src: str, dst: str) -> None: self.map_file_entry(dst, VirtualSymlink(self, dst, src)) -class RootFilesystem(Filesystem): - __type__ = "root" +class LayerFilesystem(Filesystem): + __type__ = "layer" - def __init__(self, target: Target): - self.target = target - self.layers = [] + def __init__(self, **kwargs): + self.layers: list[Filesystem] = [] self.mounts = {} self._alt_separator = "/" self._case_sensitive = True - self._root_entry = RootFilesystemEntry(self, "/", []) + self._root_entry = LayerFilesystemEntry(self, "/", []) self.root = self.add_layer() - super().__init__(None) + super().__init__(None, **kwargs) + + def __getattr__(self, attr: str) -> Any: + """Provide "magic" access to filesystem specific attributes from any of the layers. + + For example, on a :class:`LayerFilesystem` ``fs``, you can do ``fs.ntfs`` to access the + internal NTFS object if it has an NTFS layer. + """ + for fs in self.layers: + try: + return getattr(fs, attr) + except AttributeError: + continue + else: + return object.__getattribute__(self, attr) @staticmethod def detect(fh: BinaryIO) -> bool: - raise TypeError("Detect is not allowed on RootFilesystem class") + raise TypeError("Detect is not allowed on LayerFilesystem class") - def mount(self, path: str, fs: Filesystem) -> None: + def mount(self, path: str, fs: Filesystem, ignore_existing: bool = True) -> None: """Mount a filesystem at a given path. If there's an overlap with an existing mount, creates a new layer. + + Args: + path: The path to mount the filesystem at. + fs: The filesystem to mount. + ignore_existing: Whether to ignore existing mounts and create a new layer. Defaults to ``True``. """ root = self.root for mount in self.mounts.keys(): - if path == mount: + if ignore_existing and path == mount: continue if path.startswith(mount): @@ -1326,8 +1346,7 @@ def mount(self, path: str, fs: Filesystem) -> None: self.mounts[path] = fs def link(self, dst: str, src: str) -> None: - """Hard link a RootFilesystemEntry to another location.""" - dst = fsutil.normalize(dst, alt_separator=self.alt_separator) + """Hard link a :class:`FilesystemEntry` to another location.""" self.root.map_file_entry(dst, self.get(src)) def symlink(self, dst: str, src: str) -> None: @@ -1335,21 +1354,65 @@ def symlink(self, dst: str, src: str) -> None: self.root.symlink(dst, src) def add_layer(self, **kwargs) -> VirtualFilesystem: + """Append a new layer.""" layer = VirtualFilesystem(case_sensitive=self.case_sensitive, alt_separator=self.alt_separator, **kwargs) - self.layers.append(layer) - self._root_entry.entries.append(layer.root) + self.add_fs_layer(layer) return layer + def prepend_layer(self, **kwargs) -> VirtualFilesystem: + """Prepend a new layer.""" + layer = VirtualFilesystem(case_sensitive=self.case_sensitive, alt_separator=self.alt_separator, **kwargs) + self.prepend_fs_layer(layer) + return layer + + def add_fs_layer(self, fs: Filesystem) -> None: + """Append a filesystem as a layer. + + Args: + fs: The filesystem to append. + """ + self.layers.append(fs) + self._root_entry.entries.append(fs.get("/")) + + def prepend_fs_layer(self, fs: Filesystem) -> None: + """Prepend a filesystem as a layer. + + Args: + fs: The filesystem to prepend. + """ + self.layers.insert(0, fs) + self._root_entry.entries.insert(0, fs.get("/")) + + def remove_fs_layer(self, fs: Filesystem) -> None: + """Remove a filesystem layer. + + Args: + fs: The filesystem to remove. + """ + self.remove_layer(self.layers.index(fs)) + + def remove_layer(self, idx: int) -> None: + """Remove a layer by index. + + Args: + idx: The index of the layer to remove. + """ + del self.layers[idx] + del self._root_entry.entries[idx] + @property def case_sensitive(self) -> bool: + """Whether the filesystem is case sensitive.""" return self._case_sensitive @property def alt_separator(self) -> str: + """The alternative separator for the filesystem.""" return self._alt_separator @case_sensitive.setter def case_sensitive(self, value: bool) -> None: + """Set the case sensitivity of the filesystem (and all layers).""" self._case_sensitive = value self.root.case_sensitive = value for layer in self.layers: @@ -1357,14 +1420,14 @@ def case_sensitive(self, value: bool) -> None: @alt_separator.setter def alt_separator(self, value: str) -> None: + """Set the alternative separator for the filesystem (and all layers).""" self._alt_separator = value self.root.alt_separator = value for layer in self.layers: layer.alt_separator = value - def get(self, path: str, relentry: FilesystemEntry = None) -> FilesystemEntry: - self.target.log.debug("%r::get(%r)", self, path) - + def get(self, path: str, relentry: LayerFilesystemEntry = None) -> LayerFilesystemEntry: + """Get a :class:`FilesystemEntry` from the filesystem.""" entry = relentry or self._root_entry path = fsutil.normalize(path, alt_separator=self.alt_separator).strip("/") full_path = fsutil.join(entry.path, path, alt_separator=self.alt_separator) @@ -1388,9 +1451,10 @@ def get(self, path: str, relentry: FilesystemEntry = None) -> FilesystemEntry: raise NotASymlinkError(full_path) raise FileNotFoundError(full_path) - return RootFilesystemEntry(self, full_path, entries) + return LayerFilesystemEntry(self, full_path, entries) - def _get_from_entry(self, path: str, entry: FilesystemEntry) -> FilesystemEntry: + def _get_from_entry(self, path: str, entry: FilesystemEntry) -> LayerFilesystemEntry: + """Get a :class:`FilesystemEntry` relative from a specific entry.""" parts = path.split("/") for part in parts: @@ -1405,11 +1469,11 @@ def _get_from_entry(self, path: str, entry: FilesystemEntry) -> FilesystemEntry: class EntryList(list): """Wrapper list for filesystem entries. - Expose a getattr on a list of items. Useful in cases where - there's a virtual filesystem entry as well as a real one. + Expose a getattr on a list of items. Useful to access internal objects from specific filesystem implementations. + For example, access the underlying NTFS object from a list of virtual and NTFS entries. """ - def __init__(self, value: Any): + def __init__(self, value: FilesystemEntry | list[FilesystemEntry]): if not isinstance(value, list): value = [value] super().__init__(value) @@ -1424,20 +1488,12 @@ def __getattr__(self, attr: str) -> Any: return object.__getattribute__(self, attr) -class RootFilesystemEntry(FilesystemEntry): +class LayerFilesystemEntry(FilesystemEntry): def __init__(self, fs: Filesystem, path: str, entry: FilesystemEntry): super().__init__(fs, path, EntryList(entry)) - self.entries = self.entry + self.entries: EntryList = self.entry self._link = None - def __getattr__(self, attr): - for entry in self.entries: - try: - return getattr(entry, attr) - except AttributeError: - continue - return object.__getattribute__(self, attr) - def _exec(self, func: str, *args, **kwargs) -> Any: """Helper method to execute a method over all contained entries.""" exc = [] @@ -1451,18 +1507,16 @@ def _exec(self, func: str, *args, **kwargs) -> Any: exceptions = ",".join(exc) else: exceptions = "No entries" + raise FilesystemError(f"Can't resolve {func} for {self}: {exceptions}") def get(self, path: str) -> FilesystemEntry: - self.fs.target.log.debug("%r::get(%r)", self, path) return self.fs.get(path, self._resolve()) def open(self) -> BinaryIO: - self.fs.target.log.debug("%r::open()", self) return self._resolve()._exec("open") def iterdir(self) -> Iterator[str]: - self.fs.target.log.debug("%r::iterdir()", self) yielded = {".", ".."} selfentry = self._resolve() for fsentry in selfentry.entries: @@ -1475,7 +1529,6 @@ def iterdir(self) -> Iterator[str]: yielded.add(name) def scandir(self) -> Iterator[FilesystemEntry]: - self.fs.target.log.debug("%r::scandir()", self) # Every entry is actually a list of entries from the different # overlaying FSes, of which each may implement a different function # like .stat() or .open() @@ -1495,49 +1548,113 @@ def scandir(self) -> Iterator[FilesystemEntry]: # overlaying FSes may have different casing of the name. entry_name = entries[0].name path = fsutil.join(selfentry.path, entry_name, alt_separator=selfentry.fs.alt_separator) - yield RootFilesystemEntry(selfentry.fs, path, entries) + yield LayerFilesystemEntry(selfentry.fs, path, entries) def is_file(self, follow_symlinks: bool = True) -> bool: - self.fs.target.log.debug("%r::is_file()", self) try: return self._resolve(follow_symlinks=follow_symlinks)._exec("is_file", follow_symlinks=follow_symlinks) except FileNotFoundError: return False def is_dir(self, follow_symlinks: bool = True) -> bool: - self.fs.target.log.debug("%r::is_dir()", self) try: return self._resolve(follow_symlinks=follow_symlinks)._exec("is_dir", follow_symlinks=follow_symlinks) except FileNotFoundError: return False def is_symlink(self) -> bool: - self.fs.target.log.debug("%r::is_symlink()", self) return self._exec("is_symlink") def readlink(self) -> str: - self.fs.target.log.debug("%r::readlink()", self) if not self.is_symlink(): raise NotASymlinkError(f"Not a link: {self}") return self._exec("readlink") def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result: - self.fs.target.log.debug("%r::stat()", self) return self._resolve(follow_symlinks=follow_symlinks)._exec("stat", follow_symlinks=follow_symlinks) def lstat(self) -> fsutil.stat_result: - self.fs.target.log.debug("%r::lstat()", self) return self._exec("lstat") def attr(self) -> Any: - self.fs.target.log.debug("%r::attr()", self) return self._resolve()._exec("attr") def lattr(self) -> Any: - self.fs.target.log.debug("%r::lattr()", self) return self._exec("lattr") +class RootFilesystem(LayerFilesystem): + __type__ = "root" + + def __init__(self, target: Target): + self.target = target + super().__init__() + + @staticmethod + def detect(fh: BinaryIO) -> bool: + raise TypeError("Detect is not allowed on RootFilesystem class") + + def get(self, path: str, relentry: LayerFilesystemEntry = None) -> LayerFilesystemEntry: + self.target.log.debug("%r::get(%r)", self, path) + entry = super().get(path, relentry) + entry.__class__ = RootFilesystemEntry + return entry + + +class RootFilesystemEntry(LayerFilesystemEntry): + fs: RootFilesystem + + def get(self, path: str) -> RootFilesystemEntry: + self.fs.target.log.debug("%r::get(%r)", self, path) + entry = super().get(path) + entry.__class__ = RootFilesystemEntry + return entry + + def open(self) -> BinaryIO: + self.fs.target.log.debug("%r::open()", self) + return super().open() + + def iterdir(self) -> Iterator[str]: + self.fs.target.log.debug("%r::iterdir()", self) + yield from super().iterdir() + + def scandir(self) -> Iterator[FilesystemEntry]: + self.fs.target.log.debug("%r::scandir()", self) + yield from super().scandir() + + def is_file(self, follow_symlinks: bool = True) -> bool: + self.fs.target.log.debug("%r::is_file()", self) + return super().is_file(follow_symlinks=follow_symlinks) + + def is_dir(self, follow_symlinks: bool = True) -> bool: + self.fs.target.log.debug("%r::is_dir()", self) + return super().is_dir(follow_symlinks=follow_symlinks) + + def is_symlink(self) -> bool: + self.fs.target.log.debug("%r::is_symlink()", self) + return super().is_symlink() + + def readlink(self) -> str: + self.fs.target.log.debug("%r::readlink()", self) + return super().readlink() + + def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result: + self.fs.target.log.debug("%r::stat()", self) + return super().stat(follow_symlinks=follow_symlinks) + + def lstat(self) -> fsutil.stat_result: + self.fs.target.log.debug("%r::lstat()", self) + return super().lstat() + + def attr(self) -> Any: + self.fs.target.log.debug("%r::attr()", self) + return super().attr() + + def lattr(self) -> Any: + self.fs.target.log.debug("%r::lattr()", self) + return super().lattr() + + def register(module: str, class_name: str, internal: bool = True) -> None: """Register a filesystem implementation to use when opening a filesystem. diff --git a/dissect/target/plugins/filesystem/walkfs.py b/dissect/target/plugins/filesystem/walkfs.py index 626eec4df..ba22c0a2d 100644 --- a/dissect/target/plugins/filesystem/walkfs.py +++ b/dissect/target/plugins/filesystem/walkfs.py @@ -3,7 +3,7 @@ from dissect.util.ts import from_unix from dissect.target.exceptions import FileNotFoundError, UnsupportedPluginError -from dissect.target.filesystem import RootFilesystemEntry +from dissect.target.filesystem import LayerFilesystemEntry from dissect.target.helpers.fsutil import TargetPath from dissect.target.helpers.record import TargetRecordDescriptor from dissect.target.plugin import Plugin, export @@ -50,7 +50,7 @@ def generate_record(target: Target, path: TargetPath) -> FilesystemRecord: stat = path.lstat() btime = from_unix(stat.st_birthtime) if stat.st_birthtime else None entry = path.get() - if isinstance(entry, RootFilesystemEntry): + if isinstance(entry, LayerFilesystemEntry): fs_types = [sub_entry.fs.__type__ for sub_entry in entry.entries] else: fs_types = [entry.fs.__type__] diff --git a/dissect/target/tools/shell.py b/dissect/target/tools/shell.py index 77ab76ae2..2f8cb34c3 100644 --- a/dissect/target/tools/shell.py +++ b/dissect/target/tools/shell.py @@ -31,7 +31,7 @@ RegistryValueNotFoundError, TargetError, ) -from dissect.target.filesystem import FilesystemEntry, RootFilesystemEntry +from dissect.target.filesystem import FilesystemEntry, LayerFilesystemEntry from dissect.target.helpers import cyber, fsutil, regutil from dissect.target.plugin import arg from dissect.target.target import Target @@ -469,7 +469,7 @@ def scandir(self, path: str, color: bool = False) -> list[tuple[fsutil.TargetPat # If we happen to scan an NTFS filesystem see if any of the # entries has an alternative data stream and also list them. entry = file_.get() - if isinstance(entry, RootFilesystemEntry): + if isinstance(entry, LayerFilesystemEntry): if entry.entries.fs.__type__ == "ntfs": attrs = entry.lattr() for data_stream in attrs.DATA: diff --git a/tests/test_filesystem.py b/tests/test_filesystem.py index df9764b3b..0d0439342 100644 --- a/tests/test_filesystem.py +++ b/tests/test_filesystem.py @@ -20,6 +20,7 @@ ) from dissect.target.filesystem import ( FilesystemEntry, + LayerFilesystem, MappedFile, NotASymlinkError, RootFilesystem, @@ -797,7 +798,7 @@ def top_virt_dir() -> VirtualDirectory: return VirtualDirectory(Mock(), "") -def test_virutal_directory_stat(virt_dir: VirtualDirectory, top_virt_dir: VirtualDirectory) -> None: +def test_virtual_directory_stat(virt_dir: VirtualDirectory, top_virt_dir: VirtualDirectory) -> None: assert virt_dir.stat(follow_symlinks=False) == virt_dir._stat() assert virt_dir.stat(follow_symlinks=True) == virt_dir._stat() @@ -806,7 +807,7 @@ def test_virutal_directory_stat(virt_dir: VirtualDirectory, top_virt_dir: Virtua assert virt_dir.stat(follow_symlinks=True) == top_virt_dir.stat(follow_symlinks=True) -def test_virutal_directory_lstat(virt_dir: VirtualDirectory, top_virt_dir: VirtualDirectory) -> None: +def test_virtual_directory_lstat(virt_dir: VirtualDirectory, top_virt_dir: VirtualDirectory) -> None: assert virt_dir.lstat() == virt_dir._stat() assert virt_dir.lstat() == virt_dir.stat(follow_symlinks=False) assert virt_dir.lstat().st_mode == stat.S_IFDIR @@ -815,12 +816,12 @@ def test_virutal_directory_lstat(virt_dir: VirtualDirectory, top_virt_dir: Virtu assert virt_dir.lstat() == top_virt_dir.lstat() -def test_virutal_directory_is_dir(virt_dir: VirtualDirectory) -> None: +def test_virtual_directory_is_dir(virt_dir: VirtualDirectory) -> None: assert virt_dir.is_dir(follow_symlinks=True) assert virt_dir.is_dir(follow_symlinks=False) -def test_virutal_directory_is_file(virt_dir: VirtualDirectory) -> None: +def test_virtual_directory_is_file(virt_dir: VirtualDirectory) -> None: assert not virt_dir.is_file(follow_symlinks=True) assert not virt_dir.is_file(follow_symlinks=False) @@ -830,21 +831,21 @@ def virt_file() -> VirtualFile: return VirtualFile(Mock(), "", Mock()) -def test_virutal_file_stat(virt_file: VirtualFile) -> None: +def test_virtual_file_stat(virt_file: VirtualFile) -> None: assert virt_file.stat(follow_symlinks=False) == virt_file.lstat() assert virt_file.stat(follow_symlinks=True) == virt_file.lstat() -def test_virutal_file_lstat(virt_file: VirtualFile) -> None: +def test_virtual_file_lstat(virt_file: VirtualFile) -> None: assert virt_file.lstat().st_mode == stat.S_IFREG -def test_virutal_file_is_dir(virt_file: VirtualFile) -> None: +def test_virtual_file_is_dir(virt_file: VirtualFile) -> None: assert not virt_file.is_dir(follow_symlinks=True) assert not virt_file.is_dir(follow_symlinks=False) -def test_virutal_file_is_file(virt_file: VirtualFile) -> None: +def test_virtual_file_is_file(virt_file: VirtualFile) -> None: assert virt_file.is_file(follow_symlinks=True) assert virt_file.is_file(follow_symlinks=False) @@ -1164,3 +1165,37 @@ def test_virtual_filesystem_map_file_from_tar() -> None: stat = mock_fs.path("/var/example/test.txt").stat() assert ts.from_unix(stat.st_mtime) == datetime(2021, 12, 6, 9, 51, 40, tzinfo=timezone.utc) + + +def test_layer_filesystem() -> None: + lfs = LayerFilesystem() + + vfs1 = VirtualFilesystem() + vfs1.map_file_fh("file1", BytesIO(b"value1")) + + vfs2 = VirtualFilesystem() + vfs2.map_file_fh("file2", BytesIO(b"value2")) + + vfs3 = VirtualFilesystem() + vfs3.map_file_fh("file3", BytesIO(b"value3")) + + vfs4 = VirtualFilesystem() + vfs4.map_file_fh("file1", BytesIO(b"value4")) + + lfs.add_fs_layer(vfs1) + assert lfs.path("file1").read_text() == "value1" + + lfs.add_fs_layer(vfs2) + assert lfs.path("file1").read_text() == "value1" + assert lfs.path("file2").read_text() == "value2" + + lfs.add_fs_layer(vfs3) + assert lfs.path("file1").read_text() == "value1" + assert lfs.path("file2").read_text() == "value2" + assert lfs.path("file3").read_text() == "value3" + + lfs.add_fs_layer(vfs4) + assert lfs.path("file1").read_text() == "value1" + lfs.remove_fs_layer(vfs4) + lfs.prepend_fs_layer(vfs4) + assert lfs.path("file1").read_text() == "value4" From f3e01ad3be32b270cd6d94edee9605b64930f84a Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Thu, 14 Mar 2024 13:39:58 +0100 Subject: [PATCH 2/6] Allow multiple accessors in Velociraptor loader --- dissect/target/loaders/dir.py | 27 +++++++++++++++++++++----- dissect/target/loaders/velociraptor.py | 4 ++++ tests/loaders/test_velociraptor.py | 21 ++++++++++++++------ 3 files changed, 41 insertions(+), 11 deletions(-) diff --git a/dissect/target/loaders/dir.py b/dissect/target/loaders/dir.py index a86a540b0..d5a231b5a 100644 --- a/dissect/target/loaders/dir.py +++ b/dissect/target/loaders/dir.py @@ -4,6 +4,7 @@ from pathlib import Path from typing import TYPE_CHECKING +from dissect.target.filesystem import LayerFilesystem from dissect.target.filesystems.dir import DirectoryFilesystem from dissect.target.filesystems.zip import ZipFilesystem from dissect.target.helpers import loaderutil @@ -48,6 +49,7 @@ def map_dirs(target: Target, dirs: list[Path | tuple[str, Path]], os_type: str, alt_separator = "\\" case_sensitive = False + drive_letter_map = {} for path in dirs: drive_letter = None if isinstance(path, tuple): @@ -59,13 +61,28 @@ def map_dirs(target: Target, dirs: list[Path | tuple[str, Path]], os_type: str, dfs = ZipFilesystem(path.root.fp, path.at, alt_separator=alt_separator, case_sensitive=case_sensitive) else: dfs = DirectoryFilesystem(path, alt_separator=alt_separator, case_sensitive=case_sensitive) - target.filesystems.add(dfs) - if os_type == OperatingSystem.WINDOWS: - loaderutil.add_virtual_ntfs_filesystem(target, dfs, **kwargs) + drive_letter_map.setdefault(drive_letter, []).append(dfs) + + fs_to_add = [] + for drive_letter, dfs in drive_letter_map.items(): + if drive_letter is not None: + if len(dfs) > 1: + vfs = LayerFilesystem() + for fs in dfs: + vfs.add_fs_layer(fs) + else: + vfs = dfs[0] - if drive_letter is not None: - target.fs.mount(drive_letter.lower() + ":", dfs) + fs_to_add.append(vfs) + target.fs.mount(drive_letter.lower() + ":", vfs) + else: + fs_to_add.extend(dfs) + + for fs in fs_to_add: + target.filesystems.add(fs) + if os_type == OperatingSystem.WINDOWS: + loaderutil.add_virtual_ntfs_filesystem(target, fs, **kwargs) def find_and_map_dirs(target: Target, path: Path, **kwargs) -> None: diff --git a/dissect/target/loaders/velociraptor.py b/dissect/target/loaders/velociraptor.py index c570337f6..3d3e53f74 100644 --- a/dissect/target/loaders/velociraptor.py +++ b/dissect/target/loaders/velociraptor.py @@ -61,6 +61,10 @@ def extract_drive_letter(name: str) -> Optional[str]: if len(name) == 14 and name.startswith("%5C%5C.%5C") and name.endswith("%3A"): return name[10].lower() + # X: in URL encoding + if len(name) == 4 and name.endswith("%3A"): + return name[0].lower() + class VelociraptorLoader(DirLoader): """Load Rapid7 Velociraptor forensic image files. diff --git a/tests/loaders/test_velociraptor.py b/tests/loaders/test_velociraptor.py index bf2318401..3b1291e19 100644 --- a/tests/loaders/test_velociraptor.py +++ b/tests/loaders/test_velociraptor.py @@ -43,11 +43,19 @@ def create_root(sub_dir: str, tmp_path: Path) -> Path: @pytest.mark.parametrize( - "sub_dir", - ["mft", "ntfs", "ntfs_vss", "lazy_ntfs", "auto"], + "sub_dir, other_dir", + [ + ("mft", "auto"), + ("ntfs", "auto"), + ("ntfs_vss", "auto"), + ("lazy_ntfs", "auto"), + ("auto", "ntfs"), + ], ) -def test_velociraptor_loader_windows_ntfs(sub_dir: str, target_bare: Target, tmp_path: Path) -> None: +def test_windows_ntfs(sub_dir: str, other_dir: str, target_bare: Target, tmp_path: Path) -> None: root = create_root(sub_dir, tmp_path) + root.joinpath(f"uploads/{other_dir}/C%3A").mkdir(parents=True, exist_ok=True) + root.joinpath(f"uploads/{other_dir}/C%3A/other.txt").write_text("my first file") assert VelociraptorLoader.detect(root) is True @@ -65,13 +73,14 @@ def test_velociraptor_loader_windows_ntfs(sub_dir: str, target_bare: Target, tmp assert usnjrnl_records == 2 assert len(target_bare.filesystems) == 4 assert target_bare.fs.path("sysvol/C-DRIVE.txt").exists() + assert target_bare.fs.path("sysvol/other.txt").read_text() == "my first file" @pytest.mark.parametrize( "sub_dir", ["mft", "ntfs", "ntfs_vss", "lazy_ntfs", "auto"], ) -def test_velociraptor_loader_windows_ntfs_zip(sub_dir: str, target_bare: Target, tmp_path: Path) -> None: +def test_windows_ntfs_zip(sub_dir: str, target_bare: Target, tmp_path: Path) -> None: create_root(sub_dir, tmp_path) shutil.make_archive(tmp_path.joinpath("test_ntfs"), "zip", tmp_path) @@ -106,7 +115,7 @@ def test_velociraptor_loader_windows_ntfs_zip(sub_dir: str, target_bare: Target, (["uploads/auto/Library", "uploads/auto/Applications"]), ], ) -def test_dir_loader_unix(paths: list[str], target_bare: Target, tmp_path: Path) -> None: +def test_unix(paths: list[str], target_bare: Target, tmp_path: Path) -> None: root = tmp_path mkdirs(root, paths) @@ -131,7 +140,7 @@ def test_dir_loader_unix(paths: list[str], target_bare: Target, tmp_path: Path) (["uploads/auto/Library", "uploads/auto/Applications"]), ], ) -def test_dir_loader_unix_zip(paths: list[str], target_bare: Target, tmp_path: Path) -> None: +def test_unix_zip(paths: list[str], target_bare: Target, tmp_path: Path) -> None: root = tmp_path mkdirs(root, paths) From f2f11293a090c8eeb8177d6be2411c30555f1be3 Mon Sep 17 00:00:00 2001 From: Erik Schamper <1254028+Schamper@users.noreply.github.com> Date: Wed, 3 Apr 2024 16:05:37 +0200 Subject: [PATCH 3/6] Update test_filesystem.py --- tests/test_filesystem.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_filesystem.py b/tests/test_filesystem.py index 5451a146d..97aa3f79f 100644 --- a/tests/test_filesystem.py +++ b/tests/test_filesystem.py @@ -1193,6 +1193,7 @@ def test_layer_filesystem() -> None: assert lfs.path("file1").read_text() == "value1" assert lfs.path("file2").read_text() == "value2" assert lfs.path("file3").read_text() == "value3" + lfs.append_fs_layer(vfs4) assert lfs.path("file1").read_text() == "value4" lfs.remove_fs_layer(vfs4) From 161d655182188ed9481942f22febab235cfc806e Mon Sep 17 00:00:00 2001 From: Erik Schamper <1254028+Schamper@users.noreply.github.com> Date: Wed, 3 Apr 2024 16:06:37 +0200 Subject: [PATCH 4/6] Update dir.py --- dissect/target/loaders/dir.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dissect/target/loaders/dir.py b/dissect/target/loaders/dir.py index d5a231b5a..b33e4aa37 100644 --- a/dissect/target/loaders/dir.py +++ b/dissect/target/loaders/dir.py @@ -70,7 +70,7 @@ def map_dirs(target: Target, dirs: list[Path | tuple[str, Path]], os_type: str, if len(dfs) > 1: vfs = LayerFilesystem() for fs in dfs: - vfs.add_fs_layer(fs) + vfs.append_fs_layer(fs) else: vfs = dfs[0] From a0eeecd0f04ab84990ccb37bdab35c1a395a5f8d Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Wed, 17 Apr 2024 18:47:57 +0200 Subject: [PATCH 5/6] Update docstring --- dissect/target/loaders/velociraptor.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/dissect/target/loaders/velociraptor.py b/dissect/target/loaders/velociraptor.py index 3d3e53f74..68737cf7f 100644 --- a/dissect/target/loaders/velociraptor.py +++ b/dissect/target/loaders/velociraptor.py @@ -75,10 +75,7 @@ class VelociraptorLoader(DirLoader): {"Generic.Collectors.File":{"Root":"/","collectionSpec":"Glob\\netc/**\\nvar/log/**"}} Generic.Collectors.File (Windows) and Windows.KapeFiles.Targets (Windows) uses the accessors mft, ntfs, lazy_ntfs, - ntfs_vss and auto. The loader only supports a collection where a single accessor is used, which can be forced by - using the following configuration:: - - {"Windows.KapeFiles.Targets":{"VSSAnalysisAge":"1000","_SANS_Triage":"Y"}} + ntfs_vss and auto. The loader supports a collection where multiple accessors were used. References: - https://www.rapid7.com/products/velociraptor/ From af01ff06ea697672f3e0af521316601f1b79a080 Mon Sep 17 00:00:00 2001 From: Erik Schamper <1254028+Schamper@users.noreply.github.com> Date: Thu, 18 Apr 2024 08:11:05 +0000 Subject: [PATCH 6/6] Use defaultdict --- dissect/target/loaders/dir.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dissect/target/loaders/dir.py b/dissect/target/loaders/dir.py index b33e4aa37..22a55e549 100644 --- a/dissect/target/loaders/dir.py +++ b/dissect/target/loaders/dir.py @@ -1,6 +1,7 @@ from __future__ import annotations import zipfile +from collections import defaultdict from pathlib import Path from typing import TYPE_CHECKING @@ -49,7 +50,7 @@ def map_dirs(target: Target, dirs: list[Path | tuple[str, Path]], os_type: str, alt_separator = "\\" case_sensitive = False - drive_letter_map = {} + drive_letter_map = defaultdict(list) for path in dirs: drive_letter = None if isinstance(path, tuple): @@ -62,7 +63,7 @@ def map_dirs(target: Target, dirs: list[Path | tuple[str, Path]], os_type: str, else: dfs = DirectoryFilesystem(path, alt_separator=alt_separator, case_sensitive=case_sensitive) - drive_letter_map.setdefault(drive_letter, []).append(dfs) + drive_letter_map[drive_letter].append(dfs) fs_to_add = [] for drive_letter, dfs in drive_letter_map.items():