diff --git a/README.md b/README.md index b3dd4774f..2d648f429 100644 --- a/README.md +++ b/README.md @@ -58,16 +58,19 @@ Opening a shell on a target is straight-forward. You can do so by specifying a p ```bash target-shell targets/EXAMPLE.vmx - EXAMPLE /> help + WIN-EXAMPLE:/$ help Documented commands (type help ): ======================================== - cat disks filesystems help less python save - cd exit find hexdump ls readlink stat - clear file hash info pwd registry volumes + attr cls enter find info man registry volumes + cat cyber exit hash less pwd save zcat + cd debug file help ll python stat zless + clear disks filesystems hexdump ls readlink tree - EXAMPLE /> ls + WIN-EXAMPLE:/$ ls + $fs$ c: + efi sysvol ``` diff --git a/dissect/target/filesystems/extfs.py b/dissect/target/filesystems/extfs.py index 424756aa2..2877fe921 100644 --- a/dissect/target/filesystems/extfs.py +++ b/dissect/target/filesystems/extfs.py @@ -134,6 +134,10 @@ def lstat(self) -> fsutil.stat_result: st_info.st_mtime_ns = self.entry.mtime_ns st_info.st_ctime_ns = self.entry.ctime_ns + # Set blocks + st_info.st_blocks = self.entry.inode.i_blocks_lo + st_info.st_blksize = self.entry.extfs.block_size + return st_info def attr(self) -> Any: diff --git a/dissect/target/loaders/tar.py b/dissect/target/loaders/tar.py index 549f7a5ca..092cd1a31 100644 --- a/dissect/target/loaders/tar.py +++ b/dissect/target/loaders/tar.py @@ -1,8 +1,9 @@ +from __future__ import annotations + import logging import re import tarfile from pathlib import Path -from typing import Union from dissect.target import filesystem, target from dissect.target.filesystems.tar import ( @@ -21,22 +22,25 @@ class TarLoader(Loader): """Load tar files.""" - def __init__(self, path: Union[Path, str], **kwargs): + def __init__(self, path: Path | str, **kwargs): super().__init__(path) + if isinstance(path, str): + path = Path(path) + if self.is_compressed(path): log.warning( f"Tar file {path!r} is compressed, which will affect performance. " "Consider uncompressing the archive before passing the tar file to Dissect." ) - self.tar = tarfile.open(path) + self.tar = tarfile.open(fileobj=path.open("rb")) @staticmethod def detect(path: Path) -> bool: return path.name.lower().endswith((".tar", ".tar.gz", ".tgz")) - def is_compressed(self, path: Union[Path, str]) -> bool: + def is_compressed(self, path: Path | str) -> bool: return str(path).lower().endswith((".tar.gz", ".tgz")) def map(self, target: target.Target) -> None: diff --git a/dissect/target/loaders/velociraptor.py b/dissect/target/loaders/velociraptor.py index 68737cf7f..572402ba3 100644 --- a/dissect/target/loaders/velociraptor.py +++ b/dissect/target/loaders/velociraptor.py @@ -3,7 +3,7 @@ import logging import zipfile from pathlib import Path -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING from dissect.target.loaders.dir import DirLoader, find_dirs, map_dirs from dissect.target.plugin import OperatingSystem @@ -18,7 +18,7 @@ WINDOWS_ACCESSORS = ["mft", "ntfs", "lazy_ntfs", "ntfs_vss", "auto"] -def find_fs_directories(path: Path) -> tuple[Optional[OperatingSystem], Optional[list[Path]]]: +def find_fs_directories(path: Path) -> tuple[OperatingSystem | None, list[Path] | None]: fs_root = path.joinpath(FILESYSTEMS_ROOT) # Unix @@ -56,7 +56,7 @@ def find_fs_directories(path: Path) -> tuple[Optional[OperatingSystem], Optional return None, None -def extract_drive_letter(name: str) -> Optional[str]: +def extract_drive_letter(name: str) -> str | None: # \\.\X: in URL encoding if len(name) == 14 and name.startswith("%5C%5C.%5C") and name.endswith("%3A"): return name[10].lower() @@ -91,7 +91,7 @@ def __init__(self, path: Path, **kwargs): f"Velociraptor target {path!r} is compressed, which will slightly affect performance. " "Consider uncompressing the archive and passing the uncompressed folder to Dissect." ) - self.root = zipfile.Path(path) + self.root = zipfile.Path(path.open("rb")) else: self.root = path @@ -105,8 +105,8 @@ def detect(path: Path) -> bool: # results/ # uploads.json # [...] other files related to the collection - if path.suffix == ".zip": # novermin - path = zipfile.Path(path) + if path.exists() and path.suffix == ".zip": # novermin + path = zipfile.Path(path.open("rb")) if path.joinpath(FILESYSTEMS_ROOT).exists() and path.joinpath("uploads.json").exists(): _, dirs = find_fs_directories(path) diff --git a/dissect/target/plugin.py b/dissect/target/plugin.py index dfd08d1ad..76d91dc3d 100644 --- a/dissect/target/plugin.py +++ b/dissect/target/plugin.py @@ -2,9 +2,11 @@ See dissect/target/plugins/general/example.py for an example plugin. """ + from __future__ import annotations import fnmatch +import functools import importlib import importlib.util import inspect @@ -196,6 +198,8 @@ class attribute. Namespacing results in your plugin needing to be prefixed The :func:`internal` decorator and :class:`InternalPlugin` set the ``__internal__`` attribute. Finally. :func:`args` decorator sets the ``__args__`` attribute. + The :func:`alias` decorator populates the ``__aliases__`` private attribute of :class:`Plugin` methods. + Args: target: The :class:`~dissect.target.target.Target` object to load the plugin for. """ @@ -448,6 +452,11 @@ def register(plugincls: Type[Plugin]) -> None: exports = [] functions = [] + # First pass to resolve aliases + for attr in get_nonprivate_attributes(plugincls): + for alias in getattr(attr, "__aliases__", []): + clone_alias(plugincls, attr, alias) + for attr in get_nonprivate_attributes(plugincls): if isinstance(attr, property): attr = attr.fget @@ -542,6 +551,47 @@ def decorator(obj): return decorator +def alias(*args, **kwargs: dict[str, Any]) -> Callable: + """Decorator to be used on :class:`Plugin` functions to register an alias of that function.""" + + if not kwargs.get("name") and not args: + raise ValueError("Missing argument 'name'") + + def decorator(obj: Callable) -> Callable: + if not hasattr(obj, "__aliases__"): + obj.__aliases__ = [] + + if name := (kwargs.get("name") or args[0]): + obj.__aliases__.append(name) + + return obj + + return decorator + + +def clone_alias(cls: type, attr: Callable, alias: str) -> None: + """Clone the given attribute to an alias in the provided class.""" + + # Clone the function object + clone = type(attr)(attr.__code__, attr.__globals__, alias, attr.__defaults__, attr.__closure__) + clone.__kwdefaults__ = attr.__kwdefaults__ + + # Copy some attributes + functools.update_wrapper(clone, attr) + if wrapped := getattr(attr, "__wrapped__", None): + # update_wrapper sets a new wrapper, we want the original + clone.__wrapped__ = wrapped + + # Update module path so we can fool inspect.getmodule with subclassed Plugin classes + clone.__module__ = cls.__module__ + + # Update the names + clone.__name__ = alias + clone.__qualname__ = f"{cls.__name__}.{alias}" + + setattr(cls, alias, clone) + + def plugins( osfilter: Optional[type[OSPlugin]] = None, special_keys: set[str] = set(), diff --git a/dissect/target/plugins/os/unix/history.py b/dissect/target/plugins/os/unix/history.py index 2798cdc98..7c4b96559 100644 --- a/dissect/target/plugins/os/unix/history.py +++ b/dissect/target/plugins/os/unix/history.py @@ -8,7 +8,7 @@ from dissect.target.helpers.descriptor_extensions import UserRecordDescriptorExtension from dissect.target.helpers.fsutil import TargetPath from dissect.target.helpers.record import UnixUserRecord, create_extended_descriptor -from dissect.target.plugin import Plugin, export, internal +from dissect.target.plugin import Plugin, alias, export, internal CommandHistoryRecord = create_extended_descriptor([UserRecordDescriptorExtension])( "unix/history", @@ -36,6 +36,7 @@ class CommandHistoryPlugin(Plugin): ("sqlite", ".sqlite_history"), ("zsh", ".zsh_history"), ("ash", ".ash_history"), + ("dissect", ".dissect_history"), # wow so meta ) def __init__(self, target: Target): @@ -56,12 +57,7 @@ def _find_history_files(self) -> List[Tuple[str, TargetPath, UnixUserRecord]]: history_files.append((shell, history_path, user_details.user)) return history_files - @export(record=CommandHistoryRecord) - def bashhistory(self): - """Deprecated, use commandhistory function.""" - self.target.log.warn("Function 'bashhistory' is deprecated, use the 'commandhistory' function instead.") - return self.commandhistory() - + @alias("bashhistory") @export(record=CommandHistoryRecord) def commandhistory(self): """Return shell history for all users. diff --git a/dissect/target/target.py b/dissect/target/target.py index 1b732a322..73a44c86b 100644 --- a/dissect/target/target.py +++ b/dissect/target/target.py @@ -87,7 +87,7 @@ def __init__(self, path: Union[str, Path] = None): self._applied = False try: - self._config = config.load([self.path, os.getcwd()]) + self._config = config.load([self.path, Path.cwd(), Path.home()]) except Exception as e: self.log.warning("Error loading config file: %s", self.path) self.log.debug("", exc_info=e) diff --git a/dissect/target/tools/fs.py b/dissect/target/tools/fs.py index a9ba4bd5e..f14d9d8a9 100644 --- a/dissect/target/tools/fs.py +++ b/dissect/target/tools/fs.py @@ -2,9 +2,7 @@ # -*- coding: utf-8 -*- import argparse -import datetime import logging -import operator import os import pathlib import shutil @@ -13,7 +11,7 @@ from dissect.target import Target from dissect.target.exceptions import TargetError from dissect.target.helpers.fsutil import TargetPath -from dissect.target.tools.shell import stat_modestr +from dissect.target.tools.fsutils import print_ls, print_stat from dissect.target.tools.utils import ( catch_sigpipe, configure_generic_arguments, @@ -25,11 +23,6 @@ logging.raiseExceptions = False -def human_size(bytes: int, units: list[str] = ["", "K", "M", "G", "T", "P", "E"]) -> str: - """Helper function to return the human readable string representation of bytes.""" - return str(bytes) + units[0] if bytes < 1024 else human_size(bytes >> 10, units[1:]) - - def ls(t: Target, path: TargetPath, args: argparse.Namespace) -> None: if args.use_ctime and args.use_atime: log.error("Can't specify -c and -u at the same time") @@ -37,63 +30,20 @@ def ls(t: Target, path: TargetPath, args: argparse.Namespace) -> None: if not path or not path.exists(): return - _print_ls(args, path, 0) - - -def _print_ls(args: argparse.Namespace, path: TargetPath, depth: int) -> None: - subdirs = [] - - if path.is_dir(): - contents = sorted(path.iterdir(), key=operator.attrgetter("name")) - elif path.is_file(): - contents = [path] - - if depth > 0: - print(f"\n{str(path)}:") - - if not args.l: - for entry in contents: - print(entry.name) - - if entry.is_dir(): - subdirs.append(entry) - else: - if len(contents) > 1: - print(f"total {len(contents)}") - - for entry in contents: - _print_extensive_file_stat(args, entry, entry.name) - - if entry.is_dir(): - subdirs.append(entry) - - if args.recursive and subdirs: - for subdir in subdirs: - _print_ls(args, subdir, depth + 1) - - -def _print_extensive_file_stat(args: argparse.Namespace, path: TargetPath, name: str) -> None: - try: - entry = path.get() - stat = entry.lstat() - symlink = f" -> {entry.readlink()}" if entry.is_symlink() else "" - show_time = stat.st_mtime - - if args.use_ctime: - show_time = stat.st_ctime - elif args.use_atime: - show_time = stat.st_atime - - utc_time = datetime.datetime.utcfromtimestamp(show_time).isoformat() - - if args.human_readable: - size = human_size(stat.st_size) - else: - size = stat.st_size - - print(f"{stat_modestr(stat)} {stat.st_uid:4d} {stat.st_gid:4d} {size:>6s} {utc_time} {name}{symlink}") - except FileNotFoundError: - print(f"?????????? ? ? ? ????-??-??T??:??:??.?????? {name}") + # Only output with colors if stdout is a tty + use_colors = sys.stdout.buffer.isatty() + + print_ls( + path, + 0, + sys.stdout, + args.l, + args.human_readable, + args.recursive, + args.use_ctime, + args.use_atime, + use_colors, + ) def cat(t: Target, path: TargetPath, args: argparse.Namespace) -> None: @@ -120,6 +70,12 @@ def cp(t: Target, path: TargetPath, args: argparse.Namespace) -> None: print("[!] Failed, unsuported file type: %s" % path) +def stat(t: Target, path: TargetPath, args: argparse.Namespace) -> None: + if not path or not path.exists(): + return + print_stat(path, sys.stdout, args.dereference) + + def _extract_path(path: TargetPath, output_path: str) -> None: print("%s -> %s" % (path, output_path)) @@ -172,6 +128,10 @@ def main() -> None: parser_cat = subparsers.add_parser("cat", help="dump file contents", parents=[baseparser]) parser_cat.set_defaults(handler=cat) + parser_stat = subparsers.add_parser("stat", help="display file status", parents=[baseparser]) + parser_stat.add_argument("-L", "--dereference", action="store_true") + parser_stat.set_defaults(handler=stat) + parser_find = subparsers.add_parser("walk", help="perform a walk", parents=[baseparser]) parser_find.set_defaults(handler=walk) diff --git a/dissect/target/tools/fsutils.py b/dissect/target/tools/fsutils.py new file mode 100644 index 000000000..c44bb634e --- /dev/null +++ b/dissect/target/tools/fsutils.py @@ -0,0 +1,243 @@ +from __future__ import annotations + +import os +import stat +from datetime import datetime, timezone +from typing import TextIO + +from dissect.target.exceptions import FileNotFoundError +from dissect.target.filesystem import FilesystemEntry, LayerFilesystemEntry +from dissect.target.helpers import fsutil +from dissect.target.helpers.fsutil import TargetPath + +# ['mode', 'addr', 'dev', 'nlink', 'uid', 'gid', 'size', 'atime', 'mtime', 'ctime'] +STAT_TEMPLATE = """ File: {path} {symlink} + Size: {size} Blocks: {blocks} IO Block: {blksize} {filetype} +Device: {device} Inode: {inode} Links: {nlink} +Access: ({modeord}/{modestr}) Uid: ( {uid} ) Gid: ( {gid} ) +Access: {atime} +Modify: {mtime} +Change: {ctime} + Birth: {btime}""" + +FALLBACK_LS_COLORS = "rs=0:di=01;34:ln=01;36:mh=00:pi=40;33:so=01;35:do=01;35:bd=40;33;01:cd=40;33;01:or=40;31;01:mi=00:su=37;41:sg=30;43:ca=30;41:tw=30;42:ow=34;42:st=37;44:ex=01;32" # noqa: E501 + + +def prepare_ls_colors() -> dict[str, str]: + """Parse the LS_COLORS environment variable so we can use it later.""" + d = {} + ls_colors = os.environ.get("LS_COLORS", FALLBACK_LS_COLORS) + for line in ls_colors.split(":"): + if not line: + continue + + ft, _, value = line.partition("=") + if ft.startswith("*"): + ft = ft[1:] + + d[ft] = f"\x1b[{value}m{{}}\x1b[0m" + + return d + + +LS_COLORS = prepare_ls_colors() + + +def fmt_ls_colors(ft: str, name: str) -> str: + """Helper method to colorize strings according to LS_COLORS.""" + try: + return LS_COLORS[ft].format(name) + except KeyError: + pass + + try: + return LS_COLORS[fsutil.splitext(name)[1]].format(name) + except KeyError: + pass + + return name + + +def human_size(bytes: int, units: list[str] = ["", "K", "M", "G", "T", "P", "E"]) -> str: + """Helper function to return the human readable string representation of bytes.""" + return str(bytes) + units[0] if bytes < 1024 else human_size(bytes >> 10, units[1:]) + + +def stat_modestr(st: fsutil.stat_result) -> str: + """Helper method for generating a mode string from a numerical mode value.""" + return stat.filemode(st.st_mode) + + +def print_extensive_file_stat_listing( + stdout: TextIO, + name: str, + entry: FilesystemEntry | None = None, + timestamp: datetime | None = None, + human_readable: bool = False, +) -> None: + """Print the file status as a single line.""" + if entry is not None: + try: + entry_stat = entry.lstat() + if timestamp is None: + timestamp = entry_stat.st_mtime + symlink = f" -> {entry.readlink()}" if entry.is_symlink() else "" + utc_time = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat(timespec="microseconds") + size = f"{human_size(entry_stat.st_size):5s}" if human_readable else f"{entry_stat.st_size:10d}" + + print( + ( + f"{stat_modestr(entry_stat)} {entry_stat.st_uid:4d} {entry_stat.st_gid:4d} {size} " + f"{utc_time} {name}{symlink}" + ), + file=stdout, + ) + return + except FileNotFoundError: + pass + + hr_spaces = f"{'':5s}" if human_readable else " " + regular_spaces = f"{'':10s}" if not human_readable else " " + + print(f"?????????? ? ?{regular_spaces}?{hr_spaces}????-??-??T??:??:??.??????+??:?? {name}", file=stdout) + + +def ls_scandir(path: fsutil.TargetPath, color: bool = False) -> list[tuple[fsutil.TargetPath, str]]: + """List a directory for the given path.""" + result = [] + if not path.exists() or not path.is_dir(): + return [] + + for file_ in path.iterdir(): + file_type = None + if color: + if file_.is_symlink(): + file_type = "ln" + elif file_.is_dir(): + file_type = "di" + elif file_.is_file(): + file_type = "fi" + + result.append((file_, fmt_ls_colors(file_type, file_.name) if color else file_.name)) + + # If we happen to scan an NTFS filesystem see if any of the + # entries has an alternative data stream and also list them. + entry = file_.get() + if isinstance(entry, LayerFilesystemEntry): + if entry.entries.fs.__type__ == "ntfs": + attrs = entry.lattr() + for data_stream in attrs.DATA: + if data_stream.name != "": + name = f"{file_.name}:{data_stream.name}" + result.append((file_, fmt_ls_colors(file_type, name) if color else name)) + + result.sort(key=lambda e: e[0].name) + + return result + + +def print_ls( + path: fsutil.TargetPath, + depth: int, + stdout: TextIO, + long_listing: bool = False, + human_readable: bool = False, + recursive: bool = False, + use_ctime: bool = False, + use_atime: bool = False, + color: bool = True, +) -> None: + """Print ls output""" + subdirs = [] + + if path.is_dir(): + contents = ls_scandir(path, color) + elif path.is_file(): + contents = [(path, path.name)] + + if depth > 0: + print(f"\n{str(path)}:", file=stdout) + + if not long_listing: + for target_path, name in contents: + print(name, file=stdout) + if target_path.is_dir(): + subdirs.append(target_path) + else: + if len(contents) > 1: + print(f"total {len(contents)}", file=stdout) + for target_path, name in contents: + try: + entry = target_path.get() + entry_stat = entry.lstat() + show_time = entry_stat.st_mtime + if use_ctime: + show_time = entry_stat.st_ctime + elif use_atime: + show_time = entry_stat.st_atime + except FileNotFoundError: + entry = None + show_time = None + print_extensive_file_stat_listing(stdout, name, entry, show_time, human_readable) + if target_path.is_dir(): + subdirs.append(target_path) + + if recursive and subdirs: + for subdir in subdirs: + print_ls(subdir, depth + 1, stdout, long_listing, human_readable, recursive, use_ctime, use_atime, color) + + +def print_stat(path: fsutil.TargetPath, stdout: TextIO, dereference: bool = False) -> None: + """Print file status.""" + symlink = f"-> {path.readlink()}" if path.is_symlink() else "" + s = path.stat() if dereference else path.lstat() + + def filetype(path: TargetPath) -> str: + if path.is_dir(): + return "directory" + elif path.is_symlink(): + return "symbolic link" + elif path.is_file(): + return "regular file" + + res = STAT_TEMPLATE.format( + path=path, + symlink=symlink, + size=s.st_size, + filetype=filetype(path), + device="?", + inode=s.st_ino, + blocks=s.st_blocks or "?", + blksize=s.st_blksize or "?", + nlink=s.st_nlink, + modeord=oct(stat.S_IMODE(s.st_mode)), + modestr=stat_modestr(s), + uid=s.st_uid, + gid=s.st_gid, + atime=datetime.fromtimestamp(s.st_atime, tz=timezone.utc).isoformat(timespec="microseconds"), + mtime=datetime.fromtimestamp(s.st_mtime, tz=timezone.utc).isoformat(timespec="microseconds"), + ctime=datetime.fromtimestamp(s.st_ctime, tz=timezone.utc).isoformat(timespec="microseconds"), + btime=datetime.fromtimestamp(s.st_birthtime, tz=timezone.utc).isoformat(timespec="microseconds") + if hasattr(s, "st_birthtime") and s.st_birthtime + else "?", + ) + print(res, file=stdout) + + try: + if (xattr := path.get().attr()) and isinstance(xattr, list) and hasattr(xattr[0], "name"): + print(" Attr:") + print_xattr(path.name, xattr, stdout) + except Exception: + pass + + +def print_xattr(basename: str, xattr: list, stdout: TextIO) -> None: + """Mimics getfattr -d {file} behaviour.""" + if not hasattr(xattr[0], "name"): + return + + XATTR_TEMPLATE = "# file: {basename}\n{attrs}" + res = XATTR_TEMPLATE.format( + basename=basename, attrs="\n".join([f'{attr.name}="{attr.value.decode()}"' for attr in xattr]) + ) + print(res, file=stdout) diff --git a/dissect/target/tools/info.py b/dissect/target/tools/info.py index 5fda3470a..e105d2df3 100644 --- a/dissect/target/tools/info.py +++ b/dissect/target/tools/info.py @@ -4,6 +4,7 @@ import argparse import json import logging +from datetime import datetime from pathlib import Path from typing import Union @@ -138,10 +139,13 @@ def print_target_info(target: Target) -> None: if isinstance(value, list): value = ", ".join(value) + if isinstance(value, datetime): + value = value.isoformat(timespec="microseconds") + if name == "hostname": print() - print(f"{name.capitalize().replace('_', ' ')}" + (14 - len(name)) * " " + f" : {value}") + print(f"{name.capitalize().replace('_', ' '):14s} : {value}") def get_disks_info(target: Target) -> list[dict[str, Union[str, int]]]: diff --git a/dissect/target/tools/shell.py b/dissect/target/tools/shell.py index 29b06781d..175f960fc 100644 --- a/dissect/target/tools/shell.py +++ b/dissect/target/tools/shell.py @@ -1,7 +1,8 @@ +from __future__ import annotations + import argparse import cmd import contextlib -import datetime import fnmatch import io import itertools @@ -13,27 +14,33 @@ import re import shlex import shutil -import stat import subprocess import sys from contextlib import contextmanager -from typing import Any, BinaryIO, Callable, Iterator, Optional, TextIO, Union +from datetime import datetime, timedelta, timezone +from typing import Any, BinaryIO, Callable, Iterator, TextIO from dissect.cstruct import hexdump from flow.record import RecordOutput from dissect.target.exceptions import ( - FileNotFoundError, PluginError, RegistryError, RegistryKeyNotFoundError, RegistryValueNotFoundError, TargetError, ) -from dissect.target.filesystem import FilesystemEntry, LayerFilesystemEntry +from dissect.target.filesystem import FilesystemEntry from dissect.target.helpers import cyber, fsutil, regutil -from dissect.target.plugin import PluginFunction, arg +from dissect.target.plugin import PluginFunction, alias, arg, clone_alias from dissect.target.target import Target +from dissect.target.tools.fsutils import ( + fmt_ls_colors, + ls_scandir, + print_ls, + print_stat, + print_xattr, +) from dissect.target.tools.info import print_target_info from dissect.target.tools.utils import ( args_to_uri, @@ -52,47 +59,23 @@ try: import readline - # remove `-` as an autocomplete delimeter on Linux + # remove `-`, `$` and `{` as an autocomplete delimeter on Linux # https://stackoverflow.com/questions/27288340/python-cmd-on-linux-does-not-autocomplete-special-characters-or-symbols - readline.set_completer_delims(readline.get_completer_delims().replace("-", "").replace("$", "")) + readline.set_completer_delims(readline.get_completer_delims().replace("-", "").replace("$", "").replace("{", "")) + + # Fix autocomplete on macOS + # https://stackoverflow.com/a/7116997 + if "libedit" in readline.__doc__: + readline.parse_and_bind("bind ^I rl_complete") + else: + readline.parse_and_bind("tab: complete") except ImportError: # Readline is not available on Windows log.warning("Readline module is not available") readline = None -# ['mode', 'addr', 'dev', 'nlink', 'uid', 'gid', 'size', 'atime', 'mtime', 'ctime'] -STAT_TEMPLATE = """ File: {path} {symlink} - Size: {size} {filetype} - Inode: {inode} Links: {nlink} -Access: ({modeord}/{modestr}) Uid: ( {uid} ) Gid: ( {gid} ) -Access: {atime} -Modify: {mtime} -Change: {ctime}""" - -FALLBACK_LS_COLORS = "rs=0:di=01;34:ln=01;36:mh=00:pi=40;33:so=01;35:do=01;35:bd=40;33;01:cd=40;33;01:or=40;31;01:mi=00:su=37;41:sg=30;43:ca=30;41:tw=30;42:ow=34;42:st=37;44:ex=01;32" # noqa: E501 - - -def prepare_ls_colors() -> dict[str, str]: - """Parse the LS_COLORS environment variable so we can use it later.""" - d = {} - ls_colors = os.environ.get("LS_COLORS", FALLBACK_LS_COLORS) - for line in ls_colors.split(":"): - if not line: - continue - - ft, _, value = line.partition("=") - if ft.startswith("*"): - ft = ft[1:] - - d[ft] = f"\x1b[{value}m{{}}\x1b[0m" - return d - - -LS_COLORS = prepare_ls_colors() - - -class TargetCmd(cmd.Cmd): +class ExtendedCmd(cmd.Cmd): """Subclassed cmd.Cmd to provide some additional features. Add new simple commands by implementing: @@ -112,49 +95,25 @@ class TargetCmd(cmd.Cmd): CMD_PREFIX = "cmd_" - DEFAULT_HISTFILE = "~/.dissect_history" - DEFAULT_HISTFILESIZE = 10_000 - DEFAULT_HISTDIR = None - DEFAULT_HISTDIRFMT = ".dissect_history_{uid}_{target}" - - def __init__(self, target: Target): + def __init__(self, cyber: bool = False): cmd.Cmd.__init__(self) - self.target = target self.debug = False + self.cyber = cyber self.identchars += "." - self.histfilesize = getattr(target._config, "HISTFILESIZE", self.DEFAULT_HISTFILESIZE) - self.histdir = getattr(target._config, "HISTDIR", self.DEFAULT_HISTDIR) - - if self.histdir: - self.histdirfmt = getattr(target._config, "HISTDIRFMT", self.DEFAULT_HISTDIRFMT) - self.histfile = pathlib.Path(self.histdir).resolve() / pathlib.Path( - self.histdirfmt.format(uid=os.getuid(), target=target.name) - ) - else: - self.histfile = pathlib.Path(getattr(target._config, "HISTFILE", self.DEFAULT_HISTFILE)).expanduser() - - def preloop(self) -> None: - if readline and self.histfile.exists(): - try: - readline.read_history_file(self.histfile) - except Exception as e: - log.debug("Error reading history file: %s", e) - - def postloop(self) -> None: - if readline: - readline.set_history_length(self.histfilesize) - try: - readline.write_history_file(self.histfile) - except Exception as e: - log.debug("Error writing history file: %s", e) + self.register_aliases() def __getattr__(self, attr: str) -> Any: if attr.startswith("help_"): _, _, command = attr.partition("_") + + def print_help(command: str, func: Callable) -> None: + parser = generate_argparse_for_bound_method(func, usage_tmpl=f"{command} {{usage}}") + parser.print_help() + try: func = getattr(self, self.CMD_PREFIX + command) - return lambda: print(func.__doc__) + return lambda: print_help(command, func) except AttributeError: pass @@ -164,6 +123,16 @@ def __getattr__(self, attr: str) -> Any: def check_compatible(target: Target) -> bool: return True + def register_aliases(self) -> None: + for name in self.get_names(): + if name.startswith(self.CMD_PREFIX): + func = getattr(self.__class__, name) + for alias_name in getattr(func, "__aliases__", []): + if not alias_name.startswith(self.CMD_PREFIX): + alias_name = self.CMD_PREFIX + alias_name + + clone_alias(self.__class__, func, alias_name) + def get_names(self) -> list[str]: names = cmd.Cmd.get_names(self) @@ -175,44 +144,49 @@ def get_names(self) -> list[str]: return names - def default(self, line: str) -> Optional[bool]: + def _handle_command(self, line: str) -> bool | None: + """Check whether custom handling of the cmd can be performed and if so, do it. + + If a custom handling of the cmd was performed, return the result (a boolean indicating whether the shell should + exit). If not, return None. Can be overridden by subclasses to perform further / other 'custom command' checks. + """ if line == "EOF": return True - # Override default command execution to first attempt complex - # command execution, and then target plugin command execution + # Override default command execution to first attempt complex command execution command, command_args_str, line = self.parseline(line) - try: + if hasattr(self, self.CMD_PREFIX + command): return self._exec_command(command, command_args_str) - except AttributeError: - pass - if plugins := list(find_and_filter_plugins(self.target, command, [])): - return self._exec_target(plugins, command_args_str) + # Return None if no custom command was found to be run + return None - return cmd.Cmd.default(self, line) + def default(self, line: str) -> bool: + if (should_exit := self._handle_command(line)) is not None: + return should_exit + + # Fallback to default + cmd.Cmd.default(self, line) + return False def emptyline(self) -> None: """This function forces Python's cmd.Cmd module to behave like a regular shell. When entering an empty command, the cmd module will by default repeat the previous command. By defining an empty ``emptyline`` function we make sure no command is executed instead. - See https://stackoverflow.com/a/16479030 + Resources: + - https://stackoverflow.com/a/16479030 + - https://github.com/python/cpython/blob/3.12/Lib/cmd.py#L10 """ pass - def _exec( - self, func: Callable[[list[str], TextIO], bool], command_args_str: str, no_cyber: bool = False - ) -> Optional[bool]: + def _exec(self, func: Callable[[list[str], TextIO], bool], command_args_str: str, no_cyber: bool = False) -> bool: """Command execution helper that chains initial command and piped subprocesses (if any) together.""" argparts = [] if command_args_str is not None: - lexer = shlex.shlex(command_args_str, posix=True, punctuation_chars=True) - lexer.wordchars += "$" - lexer.whitespace_split = True - argparts = list(lexer) + argparts = arg_str_to_arg_list(command_args_str) if "|" in argparts: pipeidx = argparts.index("|") @@ -223,15 +197,16 @@ def _exec( except OSError as e: # in case of a failure in a subprocess print(e) + return False else: ctx = contextlib.nullcontext() - if self.target.props.get("cyber") and not no_cyber: + if self.cyber and not no_cyber: ctx = cyber.cyber(color=None, run_at_end=True) with ctx: return func(argparts, sys.stdout) - def _exec_command(self, command: str, command_args_str: str) -> Optional[bool]: + def _exec_command(self, command: str, command_args_str: str) -> bool: """Command execution helper for ``cmd_`` commands.""" cmdfunc = getattr(self, self.CMD_PREFIX + command) argparser = generate_argparse_for_bound_method(cmdfunc, usage_tmpl=f"{command} {{usage}}") @@ -240,21 +215,122 @@ def _exec_(argparts: list[str], stdout: TextIO) -> bool: try: args = argparser.parse_args(argparts) except SystemExit: - return + return False return cmdfunc(args, stdout) # These commands enter a subshell, which doesn't work well with cyber no_cyber = cmdfunc.__func__ in (TargetCli.cmd_registry, TargetCli.cmd_enter) return self._exec(_exec_, command_args_str, no_cyber) - def _exec_target(self, funcs: list[PluginFunction], command_args_str: str) -> Optional[bool]: + def do_man(self, line: str) -> bool: + """alias for help""" + return self.do_help(line) + + def complete_man(self, *args) -> list[str]: + return cmd.Cmd.complete_help(self, *args) + + def do_clear(self, line: str) -> bool: + """clear the terminal screen""" + os.system("cls||clear") + return False + + def do_cls(self, line: str) -> bool: + """alias for clear""" + return self.do_clear(line) + + def do_exit(self, line: str) -> bool: + """exit shell""" + return True + + def do_cyber(self, line: str) -> bool: + """cyber""" + self.cyber = not self.cyber + word, color = {False: ("D I S E N", cyber.Color.RED), True: ("E N", cyber.Color.YELLOW)}[self.cyber] + with cyber.cyber(color=color): + print(f"C Y B E R - M O D E - {word} G A G E D") + return False + + def do_debug(self, line: str) -> bool: + """toggle debug mode""" + self.debug = not self.debug + if self.debug: + print("Debug mode on") + else: + print("Debug mode off") + return False + + +class TargetCmd(ExtendedCmd): + DEFAULT_HISTFILE = "~/.dissect_history" + DEFAULT_HISTFILESIZE = 10_000 + DEFAULT_HISTDIR = None + DEFAULT_HISTDIRFMT = ".dissect_history_{uid}_{target}" + + def __init__(self, target: Target): + self.target = target + + # history file + self.histfilesize = getattr(target._config, "HISTFILESIZE", self.DEFAULT_HISTFILESIZE) + self.histdir = getattr(target._config, "HISTDIR", self.DEFAULT_HISTDIR) + + if self.histdir: + self.histdirfmt = getattr(target._config, "HISTDIRFMT", self.DEFAULT_HISTDIRFMT) + self.histfile = pathlib.Path(self.histdir).resolve() / pathlib.Path( + self.histdirfmt.format(uid=os.getuid(), target=target.name) + ) + else: + self.histfile = pathlib.Path(getattr(target._config, "HISTFILE", self.DEFAULT_HISTFILE)).expanduser() + + # prompt format + if ps1 := getattr(target._config, "PS1", None): + if "{cwd}" in ps1 and "{base}" in ps1: + self.prompt_ps1 = ps1 + + elif getattr(target._config, "NO_COLOR", None) or os.getenv("NO_COLOR"): + self.prompt_ps1 = "{base}:{cwd}$ " + + else: + self.prompt_ps1 = "\x1b[1;32m{base}\x1b[0m:\x1b[1;34m{cwd}\x1b[0m$ " + + super().__init__(self.target.props.get("cyber")) + + def preloop(self) -> None: + if readline and self.histfile.exists(): + try: + readline.read_history_file(self.histfile) + except Exception as e: + log.debug("Error reading history file: %s", e) + + def postloop(self) -> None: + if readline: + readline.set_history_length(self.histfilesize) + try: + readline.write_history_file(self.histfile) + except Exception as e: + log.debug("Error writing history file: %s", e) + + def _handle_command(self, line: str) -> bool | None: + if (should_exit := super()._handle_command(line)) is not None: + return should_exit + + # The parent class has already attempted complex command execution, we now attempt target plugin command + # execution + command, command_args_str, line = self.parseline(line) + + if plugins := list(find_and_filter_plugins(self.target, command, [])): + return self._exec_target(plugins, command_args_str) + + # We didn't execute a function on the target + return None + + def _exec_target(self, funcs: list[PluginFunction], command_args_str: str) -> bool: """Command exection helper for target plugins.""" - def _exec_(argparts: list[str], stdout: TextIO) -> Optional[bool]: + def _exec_(argparts: list[str], stdout: TextIO) -> None: try: output, value, _ = execute_function_on_target(self.target, func, argparts) except SystemExit: - return False + return if output == "record": # if the command results are piped to another process, @@ -276,45 +352,21 @@ def _exec_(argparts: list[str], stdout: TextIO) -> Optional[bool]: else: print(value, file=stdout) - result = None for func in funcs: try: - result = self._exec(_exec_, command_args_str) + self._exec(_exec_, command_args_str) except PluginError as err: if self.debug: raise err self.target.log.error(err) - return result + # Keep the shell open + return False - def do_python(self, line: str) -> Optional[bool]: + def do_python(self, line: str) -> bool: """drop into a Python shell""" python_shell([self.target]) - - def do_clear(self, line: str) -> Optional[bool]: - """clear the terminal screen""" - os.system("cls||clear") - - def do_cyber(self, line: str) -> Optional[bool]: - """cyber""" - self.target.props["cyber"] = not bool(self.target.props.get("cyber")) - word, color = {False: ("D I S E N", cyber.Color.RED), True: ("E N", cyber.Color.YELLOW)}[ - self.target.props["cyber"] - ] - with cyber.cyber(color=color): - print(f"C Y B E R - M O D E - {word} G A G E D") - - def do_exit(self, line: str) -> Optional[bool]: - """exit shell""" - return True - - def do_debug(self, line: str) -> Optional[bool]: - """toggle debug mode""" - self.debug = not self.debug - if self.debug: - print("Debug mode on") - else: - print("Debug mode off") + return False class TargetHubCli(cmd.Cmd): @@ -337,24 +389,26 @@ def __init__(self, targets: list[Target], cli: TargetCmd): self._clicache = {} - def default(self, line: str) -> Optional[bool]: + def default(self, line: str) -> bool: if line == "EOF": return True - return cmd.Cmd.default(self, line) + cmd.Cmd.default(self, line) + return False def emptyline(self) -> None: pass - def do_exit(self, line: str) -> Optional[bool]: + def do_exit(self, line: str) -> bool: """exit shell""" return True - def do_list(self, line: str) -> Optional[bool]: + def do_list(self, line: str) -> bool: """list the loaded targets""" print("\n".join([f"{i:2d}: {e}" for i, e in enumerate(self._names)])) + return False - def do_enter(self, line: str) -> Optional[bool]: + def do_enter(self, line: str) -> bool: """enter a target by number or name""" if line.isdigit(): @@ -364,24 +418,25 @@ def do_enter(self, line: str) -> Optional[bool]: idx = self._names_lower.index(line.lower()) except ValueError: print("Unknown name") - return + return False if idx >= len(self.targets): print("Unknown target") - return + return False try: cli = self._clicache[idx] except KeyError: target = self.targets[idx] if not self._targetcli.check_compatible(target): - return + return False cli = self._targetcli(self.targets[idx]) self._clicache[idx] = cli print(f"Entering {idx}: {self._names[idx]}") run_cli(cli) + return False def complete_enter(self, text: str, line: str, begidx: int, endidx: int) -> list[str]: if not text: @@ -395,32 +450,41 @@ def complete_enter(self, text: str, line: str, begidx: int, endidx: int) -> list return compl - def do_python(self, line: str) -> Optional[bool]: + def do_python(self, line: str) -> bool: """drop into a Python shell""" python_shell(self.targets) + return False class TargetCli(TargetCmd): """CLI for interacting with a target and browsing the filesystem.""" def __init__(self, target: Target): + self.prompt_base = _target_name(target) + TargetCmd.__init__(self, target) - self.prompt_base = target.name - self._clicache = {} + self._clicache = {} self.cwd = None self.chdir("/") @property def prompt(self) -> str: - return f"{self.prompt_base} {self.cwd}> " + return self.prompt_ps1.format(base=self.prompt_base, cwd=self.cwd) - def completedefault(self, text: str, line: str, begidx: int, endidx: int): - path = line[:begidx].rsplit(" ")[-1] + def completedefault(self, text: str, line: str, begidx: int, endidx: int) -> list[str]: + path = self.resolve_path(line[:begidx].rsplit(" ")[-1]) textlower = text.lower() - r = [fname for _, fname in self.scandir(path) if fname.lower().startswith(textlower)] - return r + suggestions = [] + for fpath, fname in ls_scandir(path): + if not fname.lower().startswith(textlower): + continue + + # Add a trailing slash to directories, to allow for easier traversal of the filesystem + suggestion = f"{fname}/" if fpath.is_dir() else fname + suggestions.append(suggestion) + return suggestions def resolve_path(self, path: str) -> fsutil.TargetPath: if not path: @@ -448,7 +512,7 @@ def resolve_glob_path(self, path: str) -> Iterator[fsutil.TargetPath]: # component print(err) - def check_file(self, path: str) -> Optional[fsutil.TargetPath]: + def check_file(self, path: str) -> fsutil.TargetPath | None: path = self.resolve_path(path) if not path.exists(): print(f"{path}: No such file") @@ -464,7 +528,7 @@ def check_file(self, path: str) -> Optional[fsutil.TargetPath]: return path - def check_dir(self, path: str) -> Optional[fsutil.TargetPath]: + def check_dir(self, path: str) -> fsutil.TargetPath | None: path = self.resolve_path(path) if not path.exists(): print(f"{path}: No such directory") @@ -480,70 +544,51 @@ def check_dir(self, path: str) -> Optional[fsutil.TargetPath]: return path + def check_path(self, path: str) -> fsutil.TargetPath | None: + path = self.resolve_path(path) + if not path.exists(): + print(f"{path}: No such file or directory") + return + + return path + def chdir(self, path: str) -> None: """Change directory to the given path.""" if path := self.check_dir(path): self.cwd = path - def scandir(self, path: str, color: bool = False) -> list[tuple[fsutil.TargetPath, str]]: - """List a directory for the given path.""" - path = self.resolve_path(path) - result = [] - - if path.exists() and path.is_dir(): - for file_ in path.iterdir(): - file_type = None - if color: - if file_.is_symlink(): - file_type = "ln" - elif file_.is_dir(): - file_type = "di" - elif file_.is_file(): - file_type = "fi" - - result.append((file_, fmt_ls_colors(file_type, file_.name) if color else file_.name)) - - # If we happen to scan an NTFS filesystem see if any of the - # entries has an alternative data stream and also list them. - entry = file_.get() - if isinstance(entry, LayerFilesystemEntry): - if entry.entries.fs.__type__ == "ntfs": - attrs = entry.lattr() - for data_stream in attrs.DATA: - if data_stream.name != "": - name = f"{file_.name}:{data_stream.name}" - result.append((file_, fmt_ls_colors(file_type, name) if color else name)) - - result.sort(key=lambda e: e[0].name) - - return result - - def do_cd(self, line: str) -> Optional[bool]: + def do_cd(self, line: str) -> bool: """change directory""" self.chdir(line) + return False - def do_pwd(self, line: str) -> Optional[bool]: + def do_pwd(self, line: str) -> bool: """print current directory""" print(self.cwd) + return False - def do_disks(self, line: str) -> Optional[bool]: + def do_disks(self, line: str) -> bool: """print target disks""" for d in self.target.disks: print(str(d)) + return False - def do_volumes(self, line: str) -> Optional[bool]: + def do_volumes(self, line: str) -> bool: """print target volumes""" for v in self.target.volumes: print(str(v)) + return False - def do_filesystems(self, line: str) -> Optional[bool]: + def do_filesystems(self, line: str) -> bool: """print target filesystems""" for fs in self.target.filesystems: print(str(fs)) + return False - def do_info(self, line: str) -> Optional[bool]: + def do_info(self, line: str) -> bool: """print target information""" - return print_target_info(self.target) + print_target_info(self.target) + return False @arg("path", nargs="?") @arg("-l", action="store_true") @@ -552,129 +597,137 @@ def do_info(self, line: str) -> Optional[bool]: @arg("-R", "--recursive", action="store_true", help="recursively list subdirectories encountered") @arg("-c", action="store_true", dest="use_ctime", help="show time when file status was last changed") @arg("-u", action="store_true", dest="use_atime", help="show time of last access") - def cmd_ls(self, args: argparse.Namespace, stdout: TextIO) -> Optional[bool]: + @alias("l") + @alias("dir") + def cmd_ls(self, args: argparse.Namespace, stdout: TextIO) -> bool: """list directory contents""" path = self.resolve_path(args.path) if args.use_ctime and args.use_atime: print("can't specify -c and -u at the same time") - return + return False - if not path or not path.exists(): - return + if not path or not self.check_dir(path): + return False - self._print_ls(args, path, 0, stdout) + if path.is_file(): + print(args.path) # mimic ls behaviour + return False - def _print_ls(self, args: argparse.Namespace, path: fsutil.TargetPath, depth: int, stdout: TextIO) -> None: - path = self.resolve_path(path) - subdirs = [] + print_ls(path, 0, stdout, args.l, args.human_readable, args.recursive, args.use_ctime, args.use_atime) + return False - if path.is_dir(): - contents = self.scandir(path, color=True) - elif path.is_file(): - contents = [(path, path.name)] - - if depth > 0: - print(f"\n{str(path)}:", file=stdout) - - if not args.l: - for target_path, name in contents: - print(name, file=stdout) - if target_path.is_dir(): - subdirs.append(target_path) - else: - if len(contents) > 1: - print(f"total {len(contents)}", file=stdout) - for target_path, name in contents: - self.print_extensive_file_stat(args=args, stdout=stdout, target_path=target_path, name=name) - if target_path.is_dir(): - subdirs.append(target_path) - - if args.recursive and subdirs: - for subdir in subdirs: - self._print_ls(args, subdir, depth + 1, stdout) - - def print_extensive_file_stat( - self, args: argparse.Namespace, stdout: TextIO, target_path: fsutil.TargetPath, name: str - ) -> None: - """Print the file status.""" - try: - entry = target_path.get() - stat = entry.lstat() - symlink = f" -> {entry.readlink()}" if entry.is_symlink() else "" - show_time = stat.st_mtime - if args.use_ctime: - show_time = stat.st_ctime - elif args.use_atime: - show_time = stat.st_atime - utc_time = datetime.datetime.utcfromtimestamp(show_time).isoformat() - - print( - f"{stat_modestr(stat)} {stat.st_uid:4d} {stat.st_gid:4d} {stat.st_size:6d} {utc_time} {name}{symlink}", - file=stdout, - ) + @arg("path", nargs="?") + def cmd_ll(self, args: argparse.Namespace, stdout: TextIO) -> bool: + """alias for ls -la""" + args = extend_args(args, self.cmd_ls) + args.l = True # noqa: E741 + args.a = True + return self.cmd_ls(args, stdout) - except FileNotFoundError: - print( - f"?????????? ? ? ? ????-??-??T??:??:??.?????? {name}", - file=stdout, - ) + @arg("path", nargs="?") + def cmd_tree(self, args: argparse.Namespace, stdout: TextIO) -> bool: + """alias for ls -R""" + args = extend_args(args, self.cmd_ls) + args.recursive = True + return self.cmd_ls(args, stdout) @arg("path", nargs="?") - @arg("-name", default="*") - @arg("-iname") - def cmd_find(self, args: argparse.Namespace, stdout: TextIO) -> Optional[bool]: + @arg("-name", default="*", help="path to match with") + @arg("-iname", help="like -name, but the match is case insensitive") + @arg("-atime", type=int, help="file was last accessed n*24 hours ago") + @arg("-mtime", type=int, help="file was last modified n*24 hours ago") + @arg("-ctime", type=int, help="file (windows) or metadata (unix) was last changed n*24 hours ago") + @arg("-btime", type=int, help="file was born n*24 hours ago (ext4)") + def cmd_find(self, args: argparse.Namespace, stdout: TextIO) -> bool: """search for files in a directory hierarchy""" path = self.resolve_path(args.path) - if not path: - return + if not path or not self.check_dir(path): + return False + + matches = [] + now = datetime.now(tz=timezone.utc) + do_time_compare = any([args.mtime, args.atime]) if args.iname: pattern = re.compile(fnmatch.translate(args.iname), re.IGNORECASE) for f in path.rglob("*"): if pattern.match(f.name): - print(f, file=stdout) - - elif args.name: + matches.append(f) if do_time_compare else print(f, file=stdout) + else: for f in path.rglob(args.name): + matches.append(f) if do_time_compare else print(f, file=stdout) + + def compare(now: datetime, then_ts: float, offset: int) -> bool: + then = datetime.fromtimestamp(then_ts, tz=timezone.utc) + return now - timedelta(hours=offset * 24) > then + + if do_time_compare: + for f in matches: + s = f.lstat() + + if args.mtime and compare(now, s.st_mtime, offset=args.mtime): + continue + + if args.atime and compare(now, s.st_atime, offset=args.atime): + continue + + if args.ctime and compare(now, s.st_ctime, offset=args.ctime): + continue + + if args.btime and compare(now, s.st_birthtime, offset=args.btime): + continue + print(f, file=stdout) + return False + @arg("path") @arg("-L", "--dereference", action="store_true") - def cmd_stat(self, args: argparse.Namespace, stdout: TextIO) -> Optional[bool]: + def cmd_stat(self, args: argparse.Namespace, stdout: TextIO) -> bool: """display file status""" path = self.resolve_path(args.path) - if not path: - return + if not path or not self.check_path(path): + return False + + print_stat(path, stdout, args.dereference) + return False + + @arg("path") + @arg("-d", "--dump", action="store_true") + @arg("-R", "--recursive", action="store_true") + @alias("getfattr") + def cmd_attr(self, args: argparse.Namespace, stdout: TextIO) -> bool: + """display file attributes""" + path = self.resolve_path(args.path) + if not path or not self.check_path(path): + return False + + try: + if attr := path.get().attr(): + print_xattr(path.name, attr, stdout) + except Exception: + pass + + if args.recursive: + for child in path.rglob("*"): + try: + if child_attr := child.get().attr(): + print_xattr(child, child_attr, stdout) + print() + except Exception: + pass - symlink = f"-> {path.readlink()}" if path.is_symlink() else "" - - s = path.stat() if args.dereference else path.lstat() - - res = STAT_TEMPLATE.format( - path=path, - symlink=symlink, - size=s.st_size, - filetype="", - inode=s.st_ino, - nlink=s.st_nlink, - modeord=oct(stat.S_IMODE(s.st_mode)), - modestr=stat_modestr(s), - uid=s.st_uid, - gid=s.st_gid, - atime=datetime.datetime.utcfromtimestamp(s.st_atime).isoformat(), - mtime=datetime.datetime.utcfromtimestamp(s.st_mtime).isoformat(), - ctime=datetime.datetime.utcfromtimestamp(s.st_ctime).isoformat(), - ) - print(res, file=stdout) + return False @arg("path") - def cmd_file(self, args: argparse.Namespace, stdout: TextIO) -> Optional[bool]: + def cmd_file(self, args: argparse.Namespace, stdout: TextIO) -> bool: """determine file type""" + path = self.check_file(args.path) if not path: - return + return False fh = path.open() @@ -686,11 +739,12 @@ def cmd_file(self, args: argparse.Namespace, stdout: TextIO) -> Optional[bool]: p.wait() filetype = p.stdout.read().decode().split(":", 1)[1].strip() print(f"{path}: {filetype}", file=stdout) + return False @arg("path", nargs="+") @arg("-o", "--out", default=".") @arg("-v", "--verbose", action="store_true") - def cmd_save(self, args: argparse.Namespace, stdout: TextIO) -> Optional[bool]: + def cmd_save(self, args: argparse.Namespace, stdout: TextIO) -> bool: """save a common file or directory to the host filesystem""" dst_path = pathlib.Path(args.out).resolve() @@ -719,7 +773,7 @@ def get_diverging_path(path: pathlib.Path, reference_path: pathlib.Path) -> path return diverging_path def save_path( - src_path: pathlib.Path, dst_path: pathlib.Path, create_dst_subdir: Optional[pathlib.Path] = None + src_path: pathlib.Path, dst_path: pathlib.Path, create_dst_subdir: pathlib.Path | None = None ) -> None: """Save a common file or directory in src_path to dst_path. @@ -798,8 +852,8 @@ def save_path( try: first_src_path = next(src_paths) except StopIteration: - print(f"{path}: no such file or directory") - return + print(f"{path}: No such file or directory") + return False try: second_src_path = next(src_paths) @@ -820,10 +874,18 @@ def save_path( extra_dir = get_diverging_path(src_path, reference_path).parent save_path(src_path, dst_path, create_dst_subdir=extra_dir) + return False + @arg("path") - def cmd_cat(self, args: argparse.Namespace, stdout: TextIO) -> Optional[bool]: + @alias("type") + def cmd_cat(self, args: argparse.Namespace, stdout: TextIO) -> bool: """print file content""" - paths = self.resolve_glob_path(args.path) + paths = list(self.resolve_glob_path(args.path)) + + if not paths: + print(f"{args.path}: No such file or directory") + return False + stdout = stdout.buffer for path in paths: path = self.check_file(path) @@ -834,11 +896,17 @@ def cmd_cat(self, args: argparse.Namespace, stdout: TextIO) -> Optional[bool]: shutil.copyfileobj(fh, stdout) stdout.flush() print("") + return False @arg("path") - def cmd_zcat(self, args: argparse.Namespace, stdout: TextIO) -> Optional[bool]: + def cmd_zcat(self, args: argparse.Namespace, stdout: TextIO) -> bool: """print file content from compressed files""" - paths = self.resolve_glob_path(args.path) + paths = list(self.resolve_glob_path(args.path)) + + if not paths: + print(f"{args.path}: No such file or directory") + return False + stdout = stdout.buffer for path in paths: path = self.check_file(path) @@ -849,45 +917,70 @@ def cmd_zcat(self, args: argparse.Namespace, stdout: TextIO) -> Optional[bool]: shutil.copyfileobj(fh, stdout) stdout.flush() + return False + @arg("path") - def cmd_hexdump(self, args: argparse.Namespace, stdout: TextIO) -> Optional[bool]: - """print a hexdump of the first X bytes""" + @arg("-n", "--length", type=int, default=16 * 20, help="amount of bytes to read") + @arg("-s", "--skip", type=int, default=0, help="skip offset bytes from the beginning") + @arg("-p", "--hex", action="store_true", default=False, help="output in plain hexdump style") + @arg("-C", "--canonical", action="store_true") + @alias("xxd") + def cmd_hexdump(self, args: argparse.Namespace, stdout: TextIO) -> bool: + """print a hexdump of a file""" path = self.check_file(args.path) if not path: - return + return False + + fh = path.open("rb") + if args.skip > 0: + fh.seek(args.skip + 1) + + if args.hex: + print(fh.read(args.length).hex(), file=stdout) + else: + print(hexdump(fh.read(args.length), output="string"), file=stdout) - print(hexdump(path.open().read(16 * 20), output="string"), file=stdout) + return False @arg("path") - def cmd_hash(self, args: argparse.Namespace, stdout: TextIO) -> Optional[bool]: + @alias("digest") + @alias("shasum") + def cmd_hash(self, args: argparse.Namespace, stdout: TextIO) -> bool: """print the MD5, SHA1 and SHA256 hashes of a file""" path = self.check_file(args.path) if not path: - return + return False md5, sha1, sha256 = path.get().hash() print(f"MD5:\t{md5}\nSHA1:\t{sha1}\nSHA256:\t{sha256}", file=stdout) + return False @arg("path") - def cmd_less(self, args: argparse.Namespace, stdout: TextIO) -> Optional[bool]: + @alias("head") + @alias("more") + def cmd_less(self, args: argparse.Namespace, stdout: TextIO) -> bool: """open the first 10 MB of a file with less""" path = self.check_file(args.path) if not path: - return + return False pydoc.pager(path.open("rt", errors="ignore").read(10 * 1024 * 1024)) + return False @arg("path") - def cmd_zless(self, args: argparse.Namespace, stdout: TextIO) -> Optional[bool]: + @alias("zhead") + @alias("zmore") + def cmd_zless(self, args: argparse.Namespace, stdout: TextIO) -> bool: """open the first 10 MB of a compressed file with zless""" path = self.check_file(args.path) if not path: - return + return False pydoc.pager(fsutil.open_decompress(path, "rt").read(10 * 1024 * 1024)) + return False @arg("path", nargs="+") - def cmd_readlink(self, args: argparse.Namespace, stdout: TextIO) -> Optional[bool]: + def cmd_readlink(self, args: argparse.Namespace, stdout: TextIO) -> bool: """print resolved symbolic links or canonical file names""" for path in args.path: path = self.resolve_path(path) @@ -896,12 +989,14 @@ def cmd_readlink(self, args: argparse.Namespace, stdout: TextIO) -> Optional[boo print(path.get().readlink(), file=stdout) + return False + @arg("path", nargs="?", help="load a hive from the given path") - def cmd_registry(self, args: argparse.Namespace, stdout: TextIO) -> Optional[bool]: + def cmd_registry(self, args: argparse.Namespace, stdout: TextIO) -> bool: """drop into a registry shell""" if self.target.os == "linux": run_cli(UnixConfigTreeCli(self.target)) - return + return False hive = None @@ -909,7 +1004,7 @@ def cmd_registry(self, args: argparse.Namespace, stdout: TextIO) -> Optional[boo if args.path: path = self.check_file(args.path) if not path: - return + return False hive = regutil.RegfHive(path) clikey = f"registry_{path}" @@ -918,26 +1013,26 @@ def cmd_registry(self, args: argparse.Namespace, stdout: TextIO) -> Optional[boo cli = self._clicache[clikey] except KeyError: if not hive and not RegistryCli.check_compatible(self.target): - return + return False cli = RegistryCli(self.target, hive) self._clicache[clikey] = cli run_cli(cli) - - # Print an additional empty newline after exit print() + return False @arg("targets", metavar="TARGETS", nargs="*", help="targets to load") @arg("-p", "--python", action="store_true", help="(I)Python shell") @arg("-r", "--registry", action="store_true", help="registry shell") - def cmd_enter(self, args: argparse.Namespace, stdout: TextIO) -> None: + def cmd_enter(self, args: argparse.Namespace, stdout: TextIO) -> bool: """load one or more files as sub-targets and drop into a sub-shell""" paths = [self.resolve_path(path) for path in args.targets] if args.python: # Quick path that doesn't require CLI caching - return open_shell(paths, args.python, args.registry) + open_shell(paths, args.python, args.registry) + return False clikey = tuple(str(path) for path in paths) try: @@ -946,33 +1041,32 @@ def cmd_enter(self, args: argparse.Namespace, stdout: TextIO) -> None: targets = list(Target.open_all(paths)) cli = create_cli(targets, RegistryCli if args.registry else TargetCli) if not cli: - return + return False self._clicache[clikey] = cli run_cli(cli) - - # Print an additional empty newline after exit print() + return False class UnixConfigTreeCli(TargetCli): def __init__(self, target: Target): TargetCmd.__init__(self, target) self.config_tree = target.etc() - self.prompt_base = target.name + self.prompt_base = _target_name(target) self.cwd = None self.chdir("/") @property def prompt(self) -> str: - return f"{self.prompt_base}/config_tree {self.cwd}> " + return f"(config tree) {self.prompt_base}:{self.cwd}$ " def check_compatible(target: Target) -> bool: return target.has_function("etc") - def resolve_path(self, path: Optional[Union[str, fsutil.TargetPath]]) -> fsutil.TargetPath: + def resolve_path(self, path: str | fsutil.TargetPath | None) -> fsutil.TargetPath: if not path: return self.cwd @@ -1006,12 +1100,12 @@ def resolve_glob_path(self, path: fsutil.TargetPath) -> Iterator[fsutil.TargetPa class RegistryCli(TargetCmd): """CLI for browsing the registry.""" - def __init__(self, target: Target, registry: Optional[regutil.RegfHive] = None): - TargetCmd.__init__(self, target) - self.registry = registry or target.registry + def __init__(self, target: Target, registry: regutil.RegfHive | None = None): + self.prompt_base = _target_name(target) - self.prompt_base = target.name + TargetCmd.__init__(self, target) + self.registry = registry or target.registry self.cwd = None self.chdir("\\") @@ -1024,8 +1118,7 @@ def check_compatible(target: Target) -> bool: @property def prompt(self) -> str: - prompt_end = self.cwd.strip("\\") - return f"{self.prompt_base}/registry {prompt_end}> " + return "(registry) " + self.prompt_ps1.format(base=self.prompt_base, cwd=self.cwd) def completedefault(self, text: str, line: str, begidx: int, endidx: int) -> list[str]: path = line[:begidx].rsplit(" ")[-1] @@ -1065,9 +1158,7 @@ def chdir(self, path: str) -> None: if self.check_key(path): self.cwd = "\\" + path.strip("\\") - def scandir( - self, path: str, color: bool = False - ) -> list[tuple[Union[regutil.RegistryKey, regutil.RegistryValue], str]]: + def scandir(self, path: str, color: bool = False) -> list[tuple[regutil.RegistryKey | regutil.RegistryValue, str]]: try: key = self.resolve_key(path) except RegistryError: @@ -1083,56 +1174,95 @@ def scandir( r.sort(key=lambda e: e[0].name) return r - def do_cd(self, line: str) -> Optional[bool]: + def do_cd(self, line: str) -> bool: """change subkey""" + if line == "..": + try: + self.resolve_key(self.cwd + "\\..") + except RegistryError: + self.do_up(line) + return False + self.chdir(line) + return False - def do_up(self, line: str) -> Optional[bool]: + def do_up(self, line: str) -> bool: """go up a subkey""" parent = self.cwd.rpartition("\\")[0] if not parent: parent = "\\" self.chdir(parent) + return False - def do_pwd(self, line: str) -> Optional[bool]: + def do_pwd(self, line: str) -> bool: """print current path""" - print(self.cwd) + print(self.cwd.lstrip("\\")) + return False - def do_recommend(self, line: str) -> Optional[bool]: + def do_recommend(self, line: str) -> bool: """recommend a key""" print(random.choice([name for _, name in self.scandir(None)])) + return False @arg("path", nargs="?") - def cmd_ls(self, args: argparse.Namespace, stdout: TextIO) -> Optional[bool]: + def cmd_ls(self, args: argparse.Namespace, stdout: TextIO) -> bool: key = self.check_key(args.path) if not key: - return + return False r = self.scandir(key, color=True) print("\n".join([name for _, name in r]), file=stdout) + return False @arg("value") - def cmd_cat(self, args: argparse.Namespace, stdout: TextIO) -> Optional[bool]: + def cmd_cat(self, args: argparse.Namespace, stdout: TextIO) -> bool: value = self.check_value(args.value) if not value: - return + return False print(repr(value.value), file=stdout) + return False + + @arg("value") + @arg("-p", "--hex", action="store_true") + @alias("xxd") + def cmd_hexdump(self, args: argparse.Namespace, stdout: TextIO) -> bool: + value = self.check_value(args.value) + if not value: + return False + if args.hex: + print(value.value.hex(), file=stdout) + else: + print(hexdump(value.value, output="string"), file=stdout) -def fmt_ls_colors(ft: str, name: str) -> str: - """Helper method to colorize strings according to LS_COLORS.""" - try: - return LS_COLORS[ft].format(name) - except KeyError: - pass + return False + + +def arg_str_to_arg_list(args: str) -> list[str]: + """Convert a commandline string to a list of command line arguments.""" + lexer = shlex.shlex(args, posix=True, punctuation_chars=True) + lexer.wordchars += "$" + lexer.whitespace_split = True + return list(lexer) - try: - return LS_COLORS[fsutil.splitext(name)[1]].format(name) - except KeyError: - pass - return name +def extend_args(args: argparse.Namespace, func: Callable) -> argparse.Namespace: + """Extend the arguments of the given ``func`` with the provided ``argparse.Namespace``.""" + for short, kwargs in func.__args__: + name = kwargs.get("dest", short[-1]).lstrip("-").replace("-", "_") + if not hasattr(args, name): + setattr(args, name, None) + return args + + +def _target_name(target: Target) -> str: + """Return a target name for cmd.Cmd base prompts.""" + + if target.has_function("domain") and target.domain: + return f"{target.name}.{target.domain}" + + return target.name @contextmanager @@ -1200,15 +1330,7 @@ def build_pipe_stdout(pipe_parts: list[str]) -> Iterator[TextIO]: yield pipe_stdin -def stat_modestr(st: fsutil.stat_result) -> str: - """Helper method for generating a mode string from a numerical mode value.""" - is_dir = "d" if stat.S_ISDIR(st.st_mode) else "-" - dic = {"7": "rwx", "6": "rw-", "5": "r-x", "4": "r--", "0": "---"} - perm = str(oct(st.st_mode)[-3:]) - return is_dir + "".join(dic.get(x, x) for x in perm) - - -def open_shell(targets: list[Union[str, pathlib.Path]], python: bool, registry: bool) -> None: +def open_shell(targets: list[str | pathlib.Path], python: bool, registry: bool) -> None: """Helper method for starting a regular, Python or registry shell for one or multiple targets.""" targets = list(Target.open_all(targets)) @@ -1246,7 +1368,7 @@ def python_shell(targets: list[Target]) -> None: print() -def create_cli(targets: list[Target], cli_cls: type[TargetCmd]) -> Optional[cmd.Cmd]: +def create_cli(targets: list[Target], cli_cls: type[TargetCmd]) -> cmd.Cmd | None: """Helper method for instatiating the appropriate CLI.""" if len(targets) == 1: target = targets[0] @@ -1275,6 +1397,9 @@ def run_cli(cli: cmd.Cmd) -> None: return except KeyboardInterrupt: + # Run postloop so the interrupted command is added to the history file + cli.postloop() + # Add a line when pressing ctrl+c, so the next one starts at a new line print() @@ -1286,6 +1411,8 @@ def run_cli(cli: cmd.Cmd) -> None: print(f"*** Unhandled error: {e}") print("If you wish to see the full debug trace, enable debug mode.") + cli.postloop() + @catch_sigpipe def main() -> None: @@ -1311,8 +1438,7 @@ def main() -> None: args.targets = args_to_uri(args.targets, args.loader, rest) if args.loader else args.targets process_generic_arguments(args) - # For the shell tool we want -q to log slightly more then just CRITICAL - # messages. + # For the shell tool we want -q to log slightly more then just CRITICAL messages. if args.quiet: logging.getLogger("dissect").setLevel(level=logging.ERROR) diff --git a/dissect/target/tools/utils.py b/dissect/target/tools/utils.py index 04730b01e..d69317cca 100644 --- a/dissect/target/tools/utils.py +++ b/dissect/target/tools/utils.py @@ -8,6 +8,7 @@ import urllib from datetime import datetime from functools import wraps +from importlib.metadata import PackageNotFoundError, version from pathlib import Path from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Type, Union @@ -32,6 +33,7 @@ def configure_generic_arguments(args_parser: argparse.ArgumentParser) -> None: args_parser.add_argument("-K", "--keychain-file", type=Path, help="keychain file in CSV format") args_parser.add_argument("-Kv", "--keychain-value", help="passphrase, recovery key or key file path value") args_parser.add_argument("-v", "--verbose", action="count", default=0, help="increase output verbosity") + args_parser.add_argument("--version", action="store_true", help="print version") args_parser.add_argument("-q", "--quiet", action="store_true", help="do not output logging information") args_parser.add_argument( "--plugin-path", @@ -45,6 +47,13 @@ def configure_generic_arguments(args_parser: argparse.ArgumentParser) -> None: def process_generic_arguments(args: argparse.Namespace) -> None: configure_logging(args.verbose, args.quiet, as_plain_text=True) + if args.version: + try: + print("dissect.target version " + version("dissect.target")) + except PackageNotFoundError: + print("unable to determine version") + sys.exit(0) + if args.keychain_file: keychain.register_keychain_file(args.keychain_file) diff --git a/tests/test_plugin.py b/tests/test_plugin.py index e109d9ddf..bc15d9ab8 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -1,7 +1,7 @@ import os from functools import reduce from pathlib import Path -from typing import Optional +from typing import Iterator, Optional from unittest.mock import Mock, patch import pytest @@ -15,6 +15,7 @@ NamespacePlugin, OSPlugin, Plugin, + alias, environment_variable_paths, export, find_plugin_functions, @@ -533,3 +534,24 @@ def test_os_plugin___init_subclass__(subclass: type[OSPlugin], replaced: bool) - assert (os_docstring == subclass_docstring) is replaced if not replaced: assert subclass_docstring == f"Test docstring {method_name}" + + +class ExampleFooPlugin(Plugin): + def check_compatible(self) -> None: + return + + @export + @alias("bar") + @alias(name="baz") + def foo(self) -> Iterator[str]: + yield "foo!" + + +def test_plugin_alias(target_bare: Target) -> None: + """test ``@alias`` decorator behaviour""" + target_bare.add_plugin(ExampleFooPlugin) + assert target_bare.has_function("foo") + assert target_bare.foo.__aliases__ == ["baz", "bar"] + assert target_bare.has_function("bar") + assert target_bare.has_function("baz") + assert list(target_bare.foo()) == list(target_bare.bar()) == list(target_bare.baz()) diff --git a/tests/tools/test_fsutils.py b/tests/tools/test_fsutils.py new file mode 100644 index 000000000..3fabb9c82 --- /dev/null +++ b/tests/tools/test_fsutils.py @@ -0,0 +1,44 @@ +import sys +from unittest.mock import MagicMock + +import pytest + +from dissect.target.exceptions import FileNotFoundError +from dissect.target.filesystem import FilesystemEntry +from dissect.target.helpers.fsutil import stat_result +from dissect.target.target import Target +from dissect.target.tools.fsutils import print_extensive_file_stat_listing + + +def test_target_cli_print_extensive_file_stat(target_win: Target, capsys: pytest.CaptureFixture) -> None: + mock_stat = stat_result([0o100777, 1, 2, 3, 1337, 7331, 999, 0, 0, 0]) + mock_entry = MagicMock(spec_set=FilesystemEntry) + mock_entry.lstat.return_value = mock_stat + mock_entry.is_symlink.return_value = False + + print_extensive_file_stat_listing(sys.stdout, "foo", mock_entry) + + captured = capsys.readouterr() + assert captured.out == "-rwxrwxrwx 1337 7331 999 1970-01-01T00:00:00.000000+00:00 foo\n" + + +def test_print_extensive_file_stat_symlink(target_win: Target, capsys: pytest.CaptureFixture) -> None: + mock_stat = stat_result([0o120777, 1, 2, 3, 1337, 7331, 999, 0, 0, 0]) + mock_entry = MagicMock(spec_set=FilesystemEntry) + mock_entry.lstat.return_value = mock_stat + mock_entry.is_symlink.return_value = True + mock_entry.readlink.return_value = "bar" + + print_extensive_file_stat_listing(sys.stdout, "foo", mock_entry) + + captured = capsys.readouterr() + assert captured.out == "lrwxrwxrwx 1337 7331 999 1970-01-01T00:00:00.000000+00:00 foo -> bar\n" + + +def test_print_extensive_file_stat_fail(target_win: Target, capsys: pytest.CaptureFixture) -> None: + mock_entry = MagicMock(spec_set=FilesystemEntry) + mock_entry.lstat.side_effect = FileNotFoundError("ERROR") + print_extensive_file_stat_listing(sys.stdout, "foo", mock_entry) + + captured = capsys.readouterr() + assert captured.out == "?????????? ? ? ? ????-??-??T??:??:??.??????+??:?? foo\n" diff --git a/tests/tools/test_shell.py b/tests/tools/test_shell.py index 78533ff4a..b209c7f9c 100644 --- a/tests/tools/test_shell.py +++ b/tests/tools/test_shell.py @@ -3,14 +3,14 @@ import sys from io import BytesIO, StringIO from pathlib import Path +from typing import Callable from unittest.mock import MagicMock import pytest -from dissect.target.exceptions import FileNotFoundError -from dissect.target.filesystem import FilesystemEntry -from dissect.target.helpers.fsutil import TargetPath, normalize, stat_result -from dissect.target.tools import shell +from dissect.target.helpers.fsutil import TargetPath, normalize +from dissect.target.target import Target +from dissect.target.tools import fsutils from dissect.target.tools.shell import ( TargetCli, TargetHubCli, @@ -32,7 +32,7 @@ @pytest.mark.skipif(platform.system() == "Windows", reason="Unix-specific test.") -def test_build_pipe(): +def test_build_pipe() -> None: pipeparts = [ "grep", "test2", @@ -49,7 +49,7 @@ def test_build_pipe(): @pytest.mark.skipif(platform.system() == "Windows", reason="Unix-specific test.") -def test_build_pipe_nonexisting_command(): +def test_build_pipe_nonexisting_command() -> None: dummy_command = "non-existing-command" pipeparts = ["grep", "test1", "|", dummy_command] input_stream = "input data test1" @@ -64,7 +64,7 @@ def test_build_pipe_nonexisting_command(): @pytest.mark.skipif(platform.system() == "Windows", reason="Unix-specific test.") -def test_build_pipe_broken_pipe(): +def test_build_pipe_broken_pipe() -> None: # `ls` command does not accept stdin, so pipe closes prematurely pipeparts = ["grep", "test1", "|", "ls"] input_stream = "input data test1" @@ -78,7 +78,7 @@ def test_build_pipe_broken_pipe(): print(input_stream, file=pipe_stdin) -def test_targethubcli_autocomplete_enter(make_mock_targets): +def test_targethubcli_autocomplete_enter(make_mock_targets: Callable) -> None: target1, target2 = make_mock_targets(2) target1.hostname = "dev-null-1.localhost" @@ -101,28 +101,37 @@ def test_targethubcli_autocomplete_enter(make_mock_targets): assert suggestions == ["1"] -def test_targetcli_autocomplete(target_bare): +def test_targetcli_autocomplete(target_bare: Target, monkeypatch: pytest.MonkeyPatch) -> None: target_cli = TargetCli(target_bare) - base_path = "/base-path/" - subpath_match = "subpath1" + mock_subfolder = MagicMock(spec_set=TargetPath) + mock_subfolder.is_dir.return_value = True + mock_subfile = MagicMock(spec_set=TargetPath) + mock_subfile.is_dir.return_value = False + + base_path = "/base-path" + + subfolder_name = "subfolder" + subfile_name = "subfile" subpath_mismatch = "mismatch" - def dummy_scandir(path): - assert path == base_path + def dummy_scandir(path: TargetPath): + assert str(path) == base_path return [ - (None, subpath_match), + (mock_subfolder, subfolder_name), + (mock_subfile, subfile_name), (None, subpath_mismatch), ] - target_cli.scandir = dummy_scandir + monkeypatch.setattr("dissect.target.tools.shell.ls_scandir", dummy_scandir) + suggestions = target_cli.completedefault("sub", f"ls {base_path}/sub", 3 + len(base_path), 3 + len(base_path) + 3) - suggestions = target_cli.completedefault("sub", f"ls {base_path}sub", 3 + len(base_path), 3 + len(base_path) + 3) - assert suggestions == [subpath_match] + # We expect folder suggestions to be trailed with a '/' + assert suggestions == [f"{subfolder_name}/", subfile_name] @pytest.mark.skipif(platform.system() == "Windows", reason="Unix-specific test.") -def test_pipe_symbol_parsing(capfd, target_bare): +def test_pipe_symbol_parsing(capfd: pytest.CaptureFixture, target_bare: Target) -> None: cli = TargetCli(target_bare) def mock_func(func_args, func_stdout): @@ -146,7 +155,7 @@ def mock_func(func_args, func_stdout): @pytest.mark.skipif(platform.system() == "Windows", reason="Unix-specific test.") -def test_exec_target_command(capfd, target_default): +def test_exec_target_command(capfd: pytest.CaptureFixture, target_default: Target) -> None: cli = TargetCli(target_default) # `users` from the general OSPlugin does not ouput any records, but as the # ouput is piped to, the records are transformed to a binary record stream, @@ -161,9 +170,9 @@ def test_exec_target_command(capfd, target_default): assert captured.out.endswith("0\n") -def test_target_cli_ls(target_win, capsys, monkeypatch): +def test_target_cli_ls(target_win: Target, capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None: # disable colorful output in `target-shell` - monkeypatch.setattr(shell, "LS_COLORS", {}) + monkeypatch.setattr(fsutils, "LS_COLORS", {}) cli = TargetCli(target_win) cli.onecmd("ls") @@ -172,54 +181,6 @@ def test_target_cli_ls(target_win, capsys, monkeypatch): assert captured.out == "\n".join(["c:", "sysvol"]) + "\n" -def test_target_cli_print_extensive_file_stat(target_win, capsys): - mock_stat = stat_result([0o1777, 1, 2, 3, 1337, 7331, 999, 0, 0, 0]) - mock_entry = MagicMock(spec_set=FilesystemEntry) - mock_entry.lstat.return_value = mock_stat - mock_entry.is_symlink.return_value = False - mock_path = MagicMock(spec_set=TargetPath) - mock_path.get.return_value = mock_entry - - mock_args = MagicMock() - - cli = TargetCli(target_win) - cli.print_extensive_file_stat(mock_args, sys.stdout, mock_path, "foo") - - captured = capsys.readouterr() - assert captured.out == "-rwxrwxrwx 1337 7331 999 1970-01-01T00:00:00 foo\n" - - -def test_target_cli_print_extensive_file_stat_symlink(target_win, capsys): - mock_stat = stat_result([0o1777, 1, 2, 3, 1337, 7331, 999, 0, 0, 0]) - mock_entry = MagicMock(spec_set=FilesystemEntry) - mock_entry.lstat.return_value = mock_stat - mock_entry.is_symlink.return_value = True - mock_entry.readlink.return_value = "bar" - mock_path = MagicMock(spec_set=TargetPath) - mock_path.get.return_value = mock_entry - - mock_args = MagicMock() - - cli = TargetCli(target_win) - cli.print_extensive_file_stat(mock_args, sys.stdout, mock_path, "foo") - - captured = capsys.readouterr() - assert captured.out == "-rwxrwxrwx 1337 7331 999 1970-01-01T00:00:00 foo -> bar\n" - - -def test_target_cli_print_extensive_file_stat_fail(target_win, capsys): - mock_path = MagicMock(spec_set=TargetPath) - mock_path.get.side_effect = FileNotFoundError("ERROR") - - mock_args = MagicMock() - - cli = TargetCli(target_win) - cli.print_extensive_file_stat(mock_args, sys.stdout, mock_path, "foo") - - captured = capsys.readouterr() - assert captured.out == "?????????? ? ? ? ????-??-??T??:??:??.?????? foo\n" - - @pytest.mark.parametrize( "folders, files, save, expected", [ @@ -236,7 +197,9 @@ def test_target_cli_print_extensive_file_stat_fail(target_win, capsys): ), ], ) -def test_target_cli_save(target_win, tmp_path, folders, files, save, expected): +def test_target_cli_save( + target_win: Target, tmp_path: Path, folders: str, files: str, save: str, expected: str +) -> None: output_dir = tmp_path / "output" output_dir.mkdir(parents=True, exist_ok=True) @@ -258,6 +221,17 @@ def _map_function(path: Path) -> str: assert tree == expected +def run_target_shell( + monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture, target_path: str, stdin: str +) -> tuple[bytes, bytes]: + with monkeypatch.context() as m: + m.setattr("sys.argv", ["target-shell", target_path]) + m.setattr("sys.stdin", StringIO(stdin)) + m.setenv("NO_COLOR", "1") + target_shell() + return capsys.readouterr() + + @pytest.mark.parametrize( "provided_input, expected_output", [ @@ -275,13 +249,24 @@ def test_target_cli_unicode_argparse( provided_input: str, expected_output: str, ) -> None: - with monkeypatch.context() as m: - target_file = absolute_path("_data/tools/shell/unicode.tar") - m.setattr("sys.argv", ["target-shell", target_file]) - m.setattr("sys.stdin", StringIO(f"ls unicode/charsets/{provided_input}")) - target_shell() - out, err = capsys.readouterr() - out = out.replace("unicode.tar />", "").strip() - - assert out == expected_output - assert "unrecognized arguments" not in err + out, err = run_target_shell( + monkeypatch, capsys, absolute_path("_data/tools/shell/unicode.tar"), f"ls unicode/charsets/{provided_input}" + ) + out = out.replace("unicode.tar:/$", "").strip() + assert out == expected_output + assert "unrecognized arguments" not in err + + +def test_shell_cmd_alias(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture) -> None: + """test if alias commands call their parent attribute correctly.""" + target_path = absolute_path("_data/tools/info/image.tar") + + # 'dir' and 'ls' should return the same output + dir_out, _ = run_target_shell(monkeypatch, capsys, target_path, "dir") + ls_out, _ = run_target_shell(monkeypatch, capsys, target_path, "ls") + assert dir_out == ls_out + + # ll is not really a standard aliased command so we test that separately. + ls_la_out, _ = run_target_shell(monkeypatch, capsys, target_path, "ls -la") + ll_out, _ = run_target_shell(monkeypatch, capsys, target_path, "ll") + assert ls_la_out == ll_out