diff --git a/dissect/target/filesystem.py b/dissect/target/filesystem.py index cd1c89684..b95939556 100644 --- a/dissect/target/filesystem.py +++ b/dissect/target/filesystem.py @@ -525,21 +525,21 @@ def _resolve(self, follow_symlinks: bool = True) -> FilesystemEntry: follow_symlinks: Whether to resolve the entry if it is a symbolic link. Returns: - The resolved symbolic link if ``follow_symlinks`` is ``True`` and the ``FilesystemEntry`` is a - symbolic link or else the ``FilesystemEntry`` itself. + The resolved symbolic link if ``follow_symlinks`` is ``True`` and the :class:`FilesystemEntry` is a + symbolic link or else the :class:`FilesystemEntry` itself. """ if follow_symlinks and self.is_symlink(): return self.readlink_ext() return self def get(self, path: str) -> FilesystemEntry: - """Retrieve a FilesystemEntry relative to this entry. + """Retrieve a :class:`FilesystemEntry` relative to this entry. Args: path: The path relative to this filesystem entry. Returns: - A relative FilesystemEntry. + A relative :class:`FilesystemEntry`. """ raise NotImplementedError() @@ -560,10 +560,10 @@ def iterdir(self) -> Iterator[str]: raise NotImplementedError() def scandir(self) -> Iterator[FilesystemEntry]: - """Iterate over the contents of a directory, return them as FilesystemEntry's. + """Iterate over the contents of a directory, yields :class:`FilesystemEntry`. Returns: - An iterator of directory entries as FilesystemEntry's. + An iterator of :class:`FilesystemEntry`. """ raise NotImplementedError() @@ -576,10 +576,10 @@ def listdir(self) -> list[str]: return list(self.iterdir()) def listdir_ext(self) -> list[FilesystemEntry]: - """List the contents of a directory as FilesystemEntry's. + """List the contents of a directory as a list of :class:`FilesystemEntry`. Returns: - A list of FilesystemEntry's. + A list of :class:`FilesystemEntry`. """ return list(self.scandir()) @@ -614,7 +614,7 @@ def walk_ext( onerror: Optional[Callable] = None, followlinks: bool = False, ) -> Iterator[FilesystemEntry]: - """Walk a directory and show its contents as FilesystemEntry's. + """Walk a directory and show its contents as :class:`FilesystemEntry`. It walks across all the files inside the entry recursively. @@ -629,11 +629,11 @@ def walk_ext( followlinks: ``True`` if we want to follow any symbolic link Returns: - An iterator of directory entries as FilesystemEntry's. + An iterator of :class:`FilesystemEntry`. """ yield from fsutil.walk_ext(self, topdown, onerror, followlinks) - def glob(self, pattern) -> Iterator[str]: + def glob(self, pattern: str) -> Iterator[str]: """Iterate over this directory part of ``patern``, returning entries matching ``pattern`` as strings. Args: @@ -645,21 +645,23 @@ def glob(self, pattern) -> Iterator[str]: for entry in self.glob_ext(pattern): yield entry.path - def glob_ext(self, pattern) -> Iterator[FilesystemEntry]: - """Iterate over the directory part of ``pattern``, returning entries matching ``pattern`` as FilesysmteEntry's. + def glob_ext(self, pattern: str) -> Iterator[FilesystemEntry]: + """Iterate over the directory part of ``pattern``, returning entries matching + ``pattern`` as :class:`FilesysmteEntry`. Args: pattern: The pattern to glob for. Returns: - An iterator of FilesystemEntry's that match the pattern. + An iterator of :class:`FilesystemEntry` that match the pattern. """ yield from fsutil.glob_ext(self, pattern) def exists(self, path: str) -> bool: """Determines whether a ``path``, relative to this entry, exists. - If the `path` is a symbolic link, it will attempt to resolve it to find the FilesystemEntry it points to. + If the `path` is a symbolic link, it will attempt to resolve it to find + the :class:`FilesystemEntry` it points to. Args: path: The path relative to this entry. @@ -737,7 +739,7 @@ def readlink(self) -> str: raise NotImplementedError() def readlink_ext(self) -> FilesystemEntry: - """Read the link where this entry points to, return the resulting path as FilesystemEntry. + """Read the link where this entry points to, return the resulting path as :class:`FilesystemEntry`. If it is a symlink and returns the string that corresponds to that path. This means it follows the path a link points to, it tries to do it recursively. @@ -860,7 +862,7 @@ def lattr(self) -> Any: raise TypeError(f"lattr is not allowed on VirtualDirectory: {self.path}") def add(self, name: str, entry: FilesystemEntry) -> None: - """Add an entry to this VirtualDirectory.""" + """Add an entry to this :class:`VirtualDirectory`.""" if not self.fs.case_sensitive: name = name.lower() @@ -1214,7 +1216,7 @@ def map_file_fh(self, vfspath: str, fh: BinaryIO) -> None: self.map_file_entry(vfspath, VirtualFile(self, file_path, fh)) def map_file_entry(self, vfspath: str, entry: FilesystemEntry) -> None: - """Map a FilesystemEntry into the VFS. + """Map a :class:`FilesystemEntry` into the VFS. Any missing subdirectories up to, but not including, the last part of ``vfspath`` will be created. @@ -1271,7 +1273,7 @@ def map_file_from_tar(self, vfspath: str, tar_file: str | pathlib.Path) -> None: return self.map_dir_from_tar(vfspath.lstrip("/"), tar_file, map_single_file=True) def link(self, src: str, dst: str) -> None: - """Hard link a FilesystemEntry to another location. + """Hard link a :class:`FilesystemEntry` to another location. Args: src: The path to the target of the link. @@ -1291,65 +1293,132 @@ def symlink(self, src: str, dst: str) -> None: self.map_file_entry(dst, VirtualSymlink(self, dst, src)) -class RootFilesystem(Filesystem): - __type__ = "root" +class LayerFilesystem(Filesystem): + __type__ = "layer" - def __init__(self, target: Target): - self.target = target - self.layers = [] + def __init__(self, **kwargs): + self.layers: list[Filesystem] = [] self.mounts = {} self._alt_separator = "/" self._case_sensitive = True - self._root_entry = RootFilesystemEntry(self, "/", []) - self.root = self.add_layer() - super().__init__(None) + self._root_entry = LayerFilesystemEntry(self, "/", []) + self.root = self.append_layer() + super().__init__(None, **kwargs) + + def __getattr__(self, attr: str) -> Any: + """Provide "magic" access to filesystem specific attributes from any of the layers. + + For example, on a :class:`LayerFilesystem` ``fs``, you can do ``fs.ntfs`` to access the + internal NTFS object if it has an NTFS layer. + """ + for fs in self.layers: + try: + return getattr(fs, attr) + except AttributeError: + continue + else: + return object.__getattribute__(self, attr) @staticmethod def detect(fh: BinaryIO) -> bool: - raise TypeError("Detect is not allowed on RootFilesystem class") + raise TypeError("Detect is not allowed on LayerFilesystem class") - def mount(self, path: str, fs: Filesystem) -> None: + def mount(self, path: str, fs: Filesystem, ignore_existing: bool = True) -> None: """Mount a filesystem at a given path. If there's an overlap with an existing mount, creates a new layer. + + Args: + path: The path to mount the filesystem at. + fs: The filesystem to mount. + ignore_existing: Whether to ignore existing mounts and create a new layer. Defaults to ``True``. """ root = self.root for mount in self.mounts.keys(): - if path == mount: + if ignore_existing and path == mount: continue if path.startswith(mount): - root = self.add_layer() + root = self.append_layer() break root.map_fs(path, fs) self.mounts[path] = fs def link(self, dst: str, src: str) -> None: - """Hard link a RootFilesystemEntry to another location.""" - dst = fsutil.normalize(dst, alt_separator=self.alt_separator) + """Hard link a :class:`FilesystemEntry` to another location.""" self.root.map_file_entry(dst, self.get(src)) def symlink(self, dst: str, src: str) -> None: """Create a symlink to another location.""" self.root.symlink(dst, src) - def add_layer(self, **kwargs) -> VirtualFilesystem: + def append_layer(self, **kwargs) -> VirtualFilesystem: + """Append a new layer.""" + layer = VirtualFilesystem(case_sensitive=self.case_sensitive, alt_separator=self.alt_separator, **kwargs) + self.append_fs_layer(layer) + return layer + + add_layer = append_layer + + def prepend_layer(self, **kwargs) -> VirtualFilesystem: + """Prepend a new layer.""" layer = VirtualFilesystem(case_sensitive=self.case_sensitive, alt_separator=self.alt_separator, **kwargs) - self.layers.append(layer) - self._root_entry.entries.append(layer.root) + self.prepend_fs_layer(layer) return layer + def append_fs_layer(self, fs: Filesystem) -> None: + """Append a filesystem as a layer. + + Args: + fs: The filesystem to append. + """ + # Counterintuitively, we prepend the filesystem to the list of layers + # We could reverse the list of layers upon iteration, but that is a hot path + self.layers.insert(0, fs) + self._root_entry.entries.insert(0, fs.get("/")) + + def prepend_fs_layer(self, fs: Filesystem) -> None: + """Prepend a filesystem as a layer. + + Args: + fs: The filesystem to prepend. + """ + # Counterintuitively, we append the filesystem to the list of layers + # We could reverse the list of layers upon iteration, but that is a hot path + self.layers.append(fs) + self._root_entry.entries.append(fs.get("/")) + + def remove_fs_layer(self, fs: Filesystem) -> None: + """Remove a filesystem layer. + + Args: + fs: The filesystem to remove. + """ + self.remove_layer(self.layers.index(fs)) + + def remove_layer(self, idx: int) -> None: + """Remove a layer by index. + + Args: + idx: The index of the layer to remove. + """ + del self.layers[idx] + del self._root_entry.entries[idx] + @property def case_sensitive(self) -> bool: + """Whether the filesystem is case sensitive.""" return self._case_sensitive @property def alt_separator(self) -> str: + """The alternative separator of the filesystem.""" return self._alt_separator @case_sensitive.setter def case_sensitive(self, value: bool) -> None: + """Set the case sensitivity of the filesystem (and all layers).""" self._case_sensitive = value self.root.case_sensitive = value for layer in self.layers: @@ -1357,14 +1426,14 @@ def case_sensitive(self, value: bool) -> None: @alt_separator.setter def alt_separator(self, value: str) -> None: + """Set the alternative separator of the filesystem (and all layers).""" self._alt_separator = value self.root.alt_separator = value for layer in self.layers: layer.alt_separator = value - def get(self, path: str, relentry: FilesystemEntry = None) -> FilesystemEntry: - self.target.log.debug("%r::get(%r)", self, path) - + def get(self, path: str, relentry: Optional[LayerFilesystemEntry] = None) -> LayerFilesystemEntry: + """Get a :class:`FilesystemEntry` from the filesystem.""" entry = relentry or self._root_entry path = fsutil.normalize(path, alt_separator=self.alt_separator).strip("/") full_path = fsutil.join(entry.path, path, alt_separator=self.alt_separator) @@ -1388,9 +1457,10 @@ def get(self, path: str, relentry: FilesystemEntry = None) -> FilesystemEntry: raise NotASymlinkError(full_path) raise FileNotFoundError(full_path) - return RootFilesystemEntry(self, full_path, entries) + return LayerFilesystemEntry(self, full_path, entries) def _get_from_entry(self, path: str, entry: FilesystemEntry) -> FilesystemEntry: + """Get a :class:`FilesystemEntry` relative to a specific entry.""" parts = path.split("/") for part in parts: @@ -1405,11 +1475,11 @@ def _get_from_entry(self, path: str, entry: FilesystemEntry) -> FilesystemEntry: class EntryList(list): """Wrapper list for filesystem entries. - Expose a getattr on a list of items. Useful in cases where - there's a virtual filesystem entry as well as a real one. + Exposes a ``__getattr__`` on a list of items. Useful to access internal objects from filesystem implementations. + For example, access the underlying NTFS object from a list of virtual and NTFS entries. """ - def __init__(self, value: Any): + def __init__(self, value: FilesystemEntry | list[FilesystemEntry]): if not isinstance(value, list): value = [value] super().__init__(value) @@ -1424,20 +1494,12 @@ def __getattr__(self, attr: str) -> Any: return object.__getattribute__(self, attr) -class RootFilesystemEntry(FilesystemEntry): +class LayerFilesystemEntry(FilesystemEntry): def __init__(self, fs: Filesystem, path: str, entry: FilesystemEntry): super().__init__(fs, path, EntryList(entry)) - self.entries = self.entry + self.entries: EntryList = self.entry self._link = None - def __getattr__(self, attr): - for entry in self.entries: - try: - return getattr(entry, attr) - except AttributeError: - continue - return object.__getattribute__(self, attr) - def _exec(self, func: str, *args, **kwargs) -> Any: """Helper method to execute a method over all contained entries.""" exc = [] @@ -1451,18 +1513,16 @@ def _exec(self, func: str, *args, **kwargs) -> Any: exceptions = ",".join(exc) else: exceptions = "No entries" + raise FilesystemError(f"Can't resolve {func} for {self}: {exceptions}") def get(self, path: str) -> FilesystemEntry: - self.fs.target.log.debug("%r::get(%r)", self, path) return self.fs.get(path, self._resolve()) def open(self) -> BinaryIO: - self.fs.target.log.debug("%r::open()", self) return self._resolve()._exec("open") def iterdir(self) -> Iterator[str]: - self.fs.target.log.debug("%r::iterdir()", self) yielded = {".", ".."} selfentry = self._resolve() for fsentry in selfentry.entries: @@ -1474,8 +1534,7 @@ def iterdir(self) -> Iterator[str]: yield entry_name yielded.add(name) - def scandir(self) -> Iterator[FilesystemEntry]: - self.fs.target.log.debug("%r::scandir()", self) + def scandir(self) -> Iterator[LayerFilesystemEntry]: # Every entry is actually a list of entries from the different # overlaying FSes, of which each may implement a different function # like .stat() or .open() @@ -1495,49 +1554,115 @@ def scandir(self) -> Iterator[FilesystemEntry]: # overlaying FSes may have different casing of the name. entry_name = entries[0].name path = fsutil.join(selfentry.path, entry_name, alt_separator=selfentry.fs.alt_separator) - yield RootFilesystemEntry(selfentry.fs, path, entries) + yield LayerFilesystemEntry(selfentry.fs, path, entries) def is_file(self, follow_symlinks: bool = True) -> bool: - self.fs.target.log.debug("%r::is_file()", self) try: return self._resolve(follow_symlinks=follow_symlinks)._exec("is_file", follow_symlinks=follow_symlinks) except FileNotFoundError: return False def is_dir(self, follow_symlinks: bool = True) -> bool: - self.fs.target.log.debug("%r::is_dir()", self) try: return self._resolve(follow_symlinks=follow_symlinks)._exec("is_dir", follow_symlinks=follow_symlinks) except FileNotFoundError: return False def is_symlink(self) -> bool: - self.fs.target.log.debug("%r::is_symlink()", self) return self._exec("is_symlink") def readlink(self) -> str: - self.fs.target.log.debug("%r::readlink()", self) if not self.is_symlink(): raise NotASymlinkError(f"Not a link: {self}") return self._exec("readlink") def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result: - self.fs.target.log.debug("%r::stat()", self) return self._resolve(follow_symlinks=follow_symlinks)._exec("stat", follow_symlinks=follow_symlinks) def lstat(self) -> fsutil.stat_result: - self.fs.target.log.debug("%r::lstat()", self) return self._exec("lstat") def attr(self) -> Any: - self.fs.target.log.debug("%r::attr()", self) return self._resolve()._exec("attr") def lattr(self) -> Any: - self.fs.target.log.debug("%r::lattr()", self) return self._exec("lattr") +class RootFilesystem(LayerFilesystem): + __type__ = "root" + + def __init__(self, target: Target): + self.target = target + super().__init__() + + @staticmethod + def detect(fh: BinaryIO) -> bool: + raise TypeError("Detect is not allowed on RootFilesystem class") + + def get(self, path: str, relentry: Optional[LayerFilesystemEntry] = None) -> RootFilesystemEntry: + self.target.log.debug("%r::get(%r)", self, path) + entry = super().get(path, relentry) + entry.__class__ = RootFilesystemEntry + return entry + + +class RootFilesystemEntry(LayerFilesystemEntry): + fs: RootFilesystem + + def get(self, path: str) -> RootFilesystemEntry: + self.fs.target.log.debug("%r::get(%r)", self, path) + entry = super().get(path) + entry.__class__ = RootFilesystemEntry + return entry + + def open(self) -> BinaryIO: + self.fs.target.log.debug("%r::open()", self) + return super().open() + + def iterdir(self) -> Iterator[str]: + self.fs.target.log.debug("%r::iterdir()", self) + yield from super().iterdir() + + def scandir(self) -> Iterator[RootFilesystemEntry]: + self.fs.target.log.debug("%r::scandir()", self) + for entry in super().scandir(): + entry.__class__ = RootFilesystemEntry + yield entry + + def is_file(self, follow_symlinks: bool = True) -> bool: + self.fs.target.log.debug("%r::is_file()", self) + return super().is_file(follow_symlinks=follow_symlinks) + + def is_dir(self, follow_symlinks: bool = True) -> bool: + self.fs.target.log.debug("%r::is_dir()", self) + return super().is_dir(follow_symlinks=follow_symlinks) + + def is_symlink(self) -> bool: + self.fs.target.log.debug("%r::is_symlink()", self) + return super().is_symlink() + + def readlink(self) -> str: + self.fs.target.log.debug("%r::readlink()", self) + return super().readlink() + + def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result: + self.fs.target.log.debug("%r::stat()", self) + return super().stat(follow_symlinks=follow_symlinks) + + def lstat(self) -> fsutil.stat_result: + self.fs.target.log.debug("%r::lstat()", self) + return super().lstat() + + def attr(self) -> Any: + self.fs.target.log.debug("%r::attr()", self) + return super().attr() + + def lattr(self) -> Any: + self.fs.target.log.debug("%r::lattr()", self) + return super().lattr() + + def register(module: str, class_name: str, internal: bool = True) -> None: """Register a filesystem implementation to use when opening a filesystem. diff --git a/dissect/target/loaders/vb.py b/dissect/target/loaders/vb.py index ecaa7d37b..543ef0a46 100644 --- a/dissect/target/loaders/vb.py +++ b/dissect/target/loaders/vb.py @@ -14,8 +14,8 @@ def detect(path): return (mft_exists or c_drive_exists) and config_exists def map(self, target): - ntfs_overlay = target.fs.add_layer() - remap_overlay = target.fs.add_layer() + remap_overlay = target.fs.append_layer() + ntfs_overlay = target.fs.append_layer() dfs = DirectoryFilesystem(self.path, case_sensitive=False) target.filesystems.add(dfs) diff --git a/dissect/target/plugins/filesystem/walkfs.py b/dissect/target/plugins/filesystem/walkfs.py index 626eec4df..ba22c0a2d 100644 --- a/dissect/target/plugins/filesystem/walkfs.py +++ b/dissect/target/plugins/filesystem/walkfs.py @@ -3,7 +3,7 @@ from dissect.util.ts import from_unix from dissect.target.exceptions import FileNotFoundError, UnsupportedPluginError -from dissect.target.filesystem import RootFilesystemEntry +from dissect.target.filesystem import LayerFilesystemEntry from dissect.target.helpers.fsutil import TargetPath from dissect.target.helpers.record import TargetRecordDescriptor from dissect.target.plugin import Plugin, export @@ -50,7 +50,7 @@ def generate_record(target: Target, path: TargetPath) -> FilesystemRecord: stat = path.lstat() btime = from_unix(stat.st_birthtime) if stat.st_birthtime else None entry = path.get() - if isinstance(entry, RootFilesystemEntry): + if isinstance(entry, LayerFilesystemEntry): fs_types = [sub_entry.fs.__type__ for sub_entry in entry.entries] else: fs_types = [entry.fs.__type__] diff --git a/dissect/target/plugins/os/unix/esxi/_os.py b/dissect/target/plugins/os/unix/esxi/_os.py index d79764244..c46b005ff 100644 --- a/dissect/target/plugins/os/unix/esxi/_os.py +++ b/dissect/target/plugins/os/unix/esxi/_os.py @@ -97,7 +97,7 @@ def create(cls, target: Target, sysvol: Filesystem) -> ESXiPlugin: # Create a root layer for the "local state" filesystem # This stores persistent configuration data - local_layer = target.fs.add_layer() + local_layer = target.fs.append_layer() # Mount all the visor tars in individual filesystem layers _mount_modules(target, sysvol, cfg) @@ -209,7 +209,7 @@ def _mount_modules(target: Target, sysvol: Filesystem, cfg: dict[str, str]): tfs = tar.TarFilesystem(cfile, tarinfo=vmtar.VisorTarInfo) if tfs: - target.fs.add_layer().mount("/", tfs) + target.fs.append_layer().mount("/", tfs) def _mount_local(target: Target, local_layer: VirtualFilesystem): diff --git a/dissect/target/plugins/os/unix/linux/debian/vyos/_os.py b/dissect/target/plugins/os/unix/linux/debian/vyos/_os.py index 745af912c..f090defb7 100644 --- a/dissect/target/plugins/os/unix/linux/debian/vyos/_os.py +++ b/dissect/target/plugins/os/unix/linux/debian/vyos/_os.py @@ -24,7 +24,7 @@ def __init__(self, target: Target): self._version, rootpath = latest # VyOS does some additional magic with base system files - layer = target.fs.add_layer() + layer = target.fs.append_layer() layer.map_file_entry("/", target.fs.root.get(f"/boot/{self._version}/{rootpath}")) super().__init__(target) diff --git a/dissect/target/plugins/os/unix/linux/fortios/_os.py b/dissect/target/plugins/os/unix/linux/fortios/_os.py index 1c8b006ef..026774b0f 100644 --- a/dissect/target/plugins/os/unix/linux/fortios/_os.py +++ b/dissect/target/plugins/os/unix/linux/fortios/_os.py @@ -113,7 +113,7 @@ def create(cls, target: Target, sysvol: Filesystem) -> FortiOSPlugin: # FortiGate if (datafs_tar := sysvol.path("/datafs.tar.gz")).exists(): - target.fs.add_layer().mount("/data", TarFilesystem(datafs_tar.open("rb"))) + target.fs.append_layer().mount("/data", TarFilesystem(datafs_tar.open("rb"))) # Additional FortiGate or FortiManager tars with corrupt XZ streams target.log.warning("Attempting to load XZ files, this can take a while.") @@ -127,11 +127,11 @@ def create(cls, target: Target, sysvol: Filesystem) -> FortiOSPlugin: ): if (tar := target.fs.path(path)).exists() or (tar := sysvol.path(path)).exists(): fh = xz.repair_checksum(tar.open("rb")) - target.fs.add_layer().mount("/", TarFilesystem(fh)) + target.fs.append_layer().mount("/", TarFilesystem(fh)) # FortiAnalyzer and FortiManager if (rootfs_ext_tar := sysvol.path("rootfs-ext.tar.xz")).exists(): - target.fs.add_layer().mount("/", TarFilesystem(rootfs_ext_tar.open("rb"))) + target.fs.append_layer().mount("/", TarFilesystem(rootfs_ext_tar.open("rb"))) # Filesystem mounts can be discovered in the FortiCare debug report # or using ``fnsysctl ls`` and ``fnsysctl df`` in the cli. diff --git a/dissect/target/tools/shell.py b/dissect/target/tools/shell.py index 2252ac813..2471cd490 100644 --- a/dissect/target/tools/shell.py +++ b/dissect/target/tools/shell.py @@ -31,7 +31,7 @@ RegistryValueNotFoundError, TargetError, ) -from dissect.target.filesystem import FilesystemEntry, RootFilesystemEntry +from dissect.target.filesystem import FilesystemEntry, LayerFilesystemEntry from dissect.target.helpers import cyber, fsutil, regutil from dissect.target.plugin import arg from dissect.target.target import Target @@ -469,7 +469,7 @@ def scandir(self, path: str, color: bool = False) -> list[tuple[fsutil.TargetPat # If we happen to scan an NTFS filesystem see if any of the # entries has an alternative data stream and also list them. entry = file_.get() - if isinstance(entry, RootFilesystemEntry): + if isinstance(entry, LayerFilesystemEntry): if entry.entries.fs.__type__ == "ntfs": attrs = entry.lattr() for data_stream in attrs.DATA: diff --git a/tests/_data/plugins/os/windows/catroot/catdb b/tests/_data/plugins/os/windows/catroot/catdb index cc3a9b4e0..5c5ec98d6 100644 Binary files a/tests/_data/plugins/os/windows/catroot/catdb and b/tests/_data/plugins/os/windows/catroot/catdb differ diff --git a/tests/_data/plugins/os/windows/catroot/catroot_file_hint.cat b/tests/_data/plugins/os/windows/catroot/catroot_file_hint.cat index 205dcaee6..a6237ed3d 100644 Binary files a/tests/_data/plugins/os/windows/catroot/catroot_file_hint.cat and b/tests/_data/plugins/os/windows/catroot/catroot_file_hint.cat differ diff --git a/tests/_data/plugins/os/windows/catroot/catroot_package_name.cat b/tests/_data/plugins/os/windows/catroot/catroot_package_name.cat index 2f3aea615..35bacea69 100644 Binary files a/tests/_data/plugins/os/windows/catroot/catroot_package_name.cat and b/tests/_data/plugins/os/windows/catroot/catroot_package_name.cat differ diff --git a/tests/_data/plugins/os/windows/catroot/catroot_package_name_2.cat b/tests/_data/plugins/os/windows/catroot/catroot_package_name_2.cat index 167dbff79..312bf976c 100644 Binary files a/tests/_data/plugins/os/windows/catroot/catroot_package_name_2.cat and b/tests/_data/plugins/os/windows/catroot/catroot_package_name_2.cat differ diff --git a/tests/test_filesystem.py b/tests/test_filesystem.py index df9764b3b..97aa3f79f 100644 --- a/tests/test_filesystem.py +++ b/tests/test_filesystem.py @@ -20,6 +20,7 @@ ) from dissect.target.filesystem import ( FilesystemEntry, + LayerFilesystem, MappedFile, NotASymlinkError, RootFilesystem, @@ -98,10 +99,10 @@ def test_symlink_across_layers(target_bare: Target) -> None: vfs2 = VirtualFilesystem() target_dir = vfs2.makedirs("/path/to/target") - layer1 = target_bare.fs.add_layer() + layer1 = target_bare.fs.append_layer() layer1.mount("/", vfs1) - layer2 = target_bare.fs.add_layer() + layer2 = target_bare.fs.append_layer() layer2.mount("/", vfs2) target_entry = target_bare.fs.get("/path/to/symlink/target").readlink_ext() @@ -117,10 +118,10 @@ def test_symlink_files_across_layers(target_bare: Target) -> None: vfs2 = VirtualFilesystem() target_dir = vfs2.makedirs("/path/to/target/derp") - layer1 = target_bare.fs.add_layer() + layer1 = target_bare.fs.append_layer() layer1.mount("/", vfs1) - layer2 = target_bare.fs.add_layer() + layer2 = target_bare.fs.append_layer() layer2.mount("/", vfs2) target_entry = target_bare.fs.get("/path/to/symlink/target/derp") @@ -138,10 +139,10 @@ def test_symlink_to_symlink_across_layers(target_bare: Target) -> None: vfs2 = VirtualFilesystem() vfs2.symlink("../target", "/path/to/target") - layer1 = target_bare.fs.add_layer() + layer1 = target_bare.fs.append_layer() layer1.mount("/", vfs1) - layer2 = target_bare.fs.add_layer() + layer2 = target_bare.fs.append_layer() layer2.mount("/", vfs2) target_entry = target_bare.fs.get("/path/to/symlink/target/").readlink_ext() @@ -157,10 +158,10 @@ def test_recursive_symlink_across_layers(target_bare: Target) -> None: vfs2 = VirtualFilesystem() vfs2.symlink("symlink/target", "/path/to/target") - layer1 = target_bare.fs.add_layer() + layer1 = target_bare.fs.append_layer() layer1.mount("/", vfs1) - layer2 = target_bare.fs.add_layer() + layer2 = target_bare.fs.append_layer() layer2.mount("/", vfs2) with pytest.raises(SymlinkRecursionError): @@ -178,13 +179,13 @@ def test_symlink_across_3_layers(target_bare: Target) -> None: vfs3 = VirtualFilesystem() target_dir = vfs3.makedirs("/path/target") - layer1 = target_bare.fs.add_layer() + layer1 = target_bare.fs.append_layer() layer1.mount("/", vfs1) - layer2 = target_bare.fs.add_layer() + layer2 = target_bare.fs.append_layer() layer2.mount("/", vfs2) - layer3 = target_bare.fs.add_layer() + layer3 = target_bare.fs.append_layer() layer3.mount("/", vfs3) target_entry = target_bare.fs.get("/path/to/symlink/target/").readlink_ext() @@ -203,10 +204,10 @@ def test_recursive_symlink_open_across_layers(target_bare: Target) -> None: vfs2 = VirtualFilesystem() vfs2.symlink("symlink/target", "/path/to/target") - layer1 = target_bare.fs.add_layer() + layer1 = target_bare.fs.append_layer() layer1.mount("/", vfs1) - layer2 = target_bare.fs.add_layer() + layer2 = target_bare.fs.append_layer() layer2.mount("/", vfs2) with pytest.raises(SymlinkRecursionError): @@ -797,7 +798,7 @@ def top_virt_dir() -> VirtualDirectory: return VirtualDirectory(Mock(), "") -def test_virutal_directory_stat(virt_dir: VirtualDirectory, top_virt_dir: VirtualDirectory) -> None: +def test_virtual_directory_stat(virt_dir: VirtualDirectory, top_virt_dir: VirtualDirectory) -> None: assert virt_dir.stat(follow_symlinks=False) == virt_dir._stat() assert virt_dir.stat(follow_symlinks=True) == virt_dir._stat() @@ -806,7 +807,7 @@ def test_virutal_directory_stat(virt_dir: VirtualDirectory, top_virt_dir: Virtua assert virt_dir.stat(follow_symlinks=True) == top_virt_dir.stat(follow_symlinks=True) -def test_virutal_directory_lstat(virt_dir: VirtualDirectory, top_virt_dir: VirtualDirectory) -> None: +def test_virtual_directory_lstat(virt_dir: VirtualDirectory, top_virt_dir: VirtualDirectory) -> None: assert virt_dir.lstat() == virt_dir._stat() assert virt_dir.lstat() == virt_dir.stat(follow_symlinks=False) assert virt_dir.lstat().st_mode == stat.S_IFDIR @@ -815,12 +816,12 @@ def test_virutal_directory_lstat(virt_dir: VirtualDirectory, top_virt_dir: Virtu assert virt_dir.lstat() == top_virt_dir.lstat() -def test_virutal_directory_is_dir(virt_dir: VirtualDirectory) -> None: +def test_virtual_directory_is_dir(virt_dir: VirtualDirectory) -> None: assert virt_dir.is_dir(follow_symlinks=True) assert virt_dir.is_dir(follow_symlinks=False) -def test_virutal_directory_is_file(virt_dir: VirtualDirectory) -> None: +def test_virtual_directory_is_file(virt_dir: VirtualDirectory) -> None: assert not virt_dir.is_file(follow_symlinks=True) assert not virt_dir.is_file(follow_symlinks=False) @@ -830,21 +831,21 @@ def virt_file() -> VirtualFile: return VirtualFile(Mock(), "", Mock()) -def test_virutal_file_stat(virt_file: VirtualFile) -> None: +def test_virtual_file_stat(virt_file: VirtualFile) -> None: assert virt_file.stat(follow_symlinks=False) == virt_file.lstat() assert virt_file.stat(follow_symlinks=True) == virt_file.lstat() -def test_virutal_file_lstat(virt_file: VirtualFile) -> None: +def test_virtual_file_lstat(virt_file: VirtualFile) -> None: assert virt_file.lstat().st_mode == stat.S_IFREG -def test_virutal_file_is_dir(virt_file: VirtualFile) -> None: +def test_virtual_file_is_dir(virt_file: VirtualFile) -> None: assert not virt_file.is_dir(follow_symlinks=True) assert not virt_file.is_dir(follow_symlinks=False) -def test_virutal_file_is_file(virt_file: VirtualFile) -> None: +def test_virtual_file_is_file(virt_file: VirtualFile) -> None: assert virt_file.is_file(follow_symlinks=True) assert virt_file.is_file(follow_symlinks=False) @@ -1164,3 +1165,62 @@ def test_virtual_filesystem_map_file_from_tar() -> None: stat = mock_fs.path("/var/example/test.txt").stat() assert ts.from_unix(stat.st_mtime) == datetime(2021, 12, 6, 9, 51, 40, tzinfo=timezone.utc) + + +def test_layer_filesystem() -> None: + lfs = LayerFilesystem() + + vfs1 = VirtualFilesystem() + vfs1.map_file_fh("file1", BytesIO(b"value1")) + + vfs2 = VirtualFilesystem() + vfs2.map_file_fh("file2", BytesIO(b"value2")) + + vfs3 = VirtualFilesystem() + vfs3.map_file_fh("file3", BytesIO(b"value3")) + + vfs4 = VirtualFilesystem() + vfs4.map_file_fh("file1", BytesIO(b"value4")) + + lfs.append_fs_layer(vfs1) + assert lfs.path("file1").read_text() == "value1" + + lfs.append_fs_layer(vfs2) + assert lfs.path("file1").read_text() == "value1" + assert lfs.path("file2").read_text() == "value2" + + lfs.append_fs_layer(vfs3) + assert lfs.path("file1").read_text() == "value1" + assert lfs.path("file2").read_text() == "value2" + assert lfs.path("file3").read_text() == "value3" + + lfs.append_fs_layer(vfs4) + assert lfs.path("file1").read_text() == "value4" + lfs.remove_fs_layer(vfs4) + lfs.prepend_fs_layer(vfs4) + assert lfs.path("file1").read_text() == "value1" + + +def test_layer_filesystem_mount() -> None: + lfs = LayerFilesystem() + + vfs1 = VirtualFilesystem() + vfs1.map_file_fh("file1", BytesIO(b"value1")) + + vfs2 = VirtualFilesystem() + vfs2.map_file_fh("file2", BytesIO(b"value2")) + + lfs.mount("/vfs", vfs1) + lfs.mount("/vfs", vfs2, ignore_existing=True) + + assert lfs.listdir("/vfs") == ["file2"] + + lfs.mount("/vfs", vfs1, ignore_existing=True) + + assert lfs.listdir("/vfs") == ["file1"] + + lfs.mount("/vfs", vfs2, ignore_existing=False) + + assert sorted(lfs.listdir("/vfs")) == ["file1", "file2"] + assert lfs.path("/vfs/file1").read_text() == "value1" + assert lfs.path("/vfs/file2").read_text() == "value2"