Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor target-shell #812

Merged
merged 24 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4d7e0c7
Refactor reusable functions out of TargetCli class
JSCU-CNI Aug 8, 2024
7a45275
Move functionality from TargetCli to ExtendedCmd
JSCU-CNI Aug 8, 2024
4cacdc7
Add trailing slash to directories in target shell autocomplete
JSCU-CNI Aug 8, 2024
8153fee
Add stat function to target-fs
JSCU-CNI Aug 8, 2024
178b9b7
Merge branch 'main' into improvement/target-shell-fs-refactor
JSCU-CNI Aug 8, 2024
99bb146
Fixes #624 and implements #623
JSCU-CNI Aug 8, 2024
81b2dae
Apply suggestions from code review
JSCU-CNI Aug 8, 2024
112efd3
fix fix
JSCU-CNI Aug 8, 2024
b7f6765
UI/UX improvements for target-shell
JSCU-CNI Aug 12, 2024
2ed01b9
tar and zip files can now be opened inside a target using target-shel…
JSCU-CNI Aug 12, 2024
0d09cdb
Apply suggestions from code review
JSCU-CNI Aug 12, 2024
7b25da4
implement review feedback
JSCU-CNI Aug 12, 2024
5c48aee
Merge branch 'main' into improvement/target-shell-fs-refactor
JSCU-CNI Aug 12, 2024
0777881
use stat.filemode instead of our own func
JSCU-CNI Aug 12, 2024
df9861a
use kwargs instead of _arg
JSCU-CNI Aug 12, 2024
82c3973
Make check_custom_command_execution return a bool or None
JSCU-CNI Aug 12, 2024
bfffee5
implement review feedback
JSCU-CNI Aug 12, 2024
49717fe
Merge branch 'main' into improvement/target-shell-fs-refactor
JSCU-CNI Aug 12, 2024
58810db
add alias tests
JSCU-CNI Aug 12, 2024
6963f99
add clone_alias
JSCU-CNI Aug 12, 2024
57157e0
Add macOS readline compat
JSCU-CNI Aug 13, 2024
cc8e801
fix clone_alias for ExtendedCmd
JSCU-CNI Aug 13, 2024
5b9c9b4
fix linter
JSCU-CNI Aug 13, 2024
5845f28
Merge branch 'main' into improvement/target-shell-fs-refactor
JSCU-CNI Aug 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,19 @@ Opening a shell on a target is straight-forward. You can do so by specifying a p

```bash
target-shell targets/EXAMPLE.vmx
EXAMPLE /> help
WIN-EXAMPLE:/$ help

Documented commands (type help <topic>):
========================================
cat disks filesystems help less python save
cd exit find hexdump ls readlink stat
clear file hash info pwd registry volumes
attr cls enter find info man registry volumes
cat cyber exit hash less pwd save zcat
cd debug file help ll python stat zless
clear disks filesystems hexdump ls readlink tree

EXAMPLE /> ls
WIN-EXAMPLE:/$ ls
$fs$
c:
efi
sysvol
```

Expand Down
4 changes: 4 additions & 0 deletions dissect/target/filesystems/extfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@
st_info.st_mtime_ns = self.entry.mtime_ns
st_info.st_ctime_ns = self.entry.ctime_ns

# Set blocks
st_info.st_blocks = self.entry.inode.i_blocks_lo
st_info.st_blksize = self.entry.extfs.block_size

Check warning on line 139 in dissect/target/filesystems/extfs.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/extfs.py#L138-L139

Added lines #L138 - L139 were not covered by tests

return st_info

def attr(self) -> Any:
Expand Down
12 changes: 8 additions & 4 deletions dissect/target/loaders/tar.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from __future__ import annotations

import logging
import re
import tarfile
from pathlib import Path
from typing import Union

from dissect.target import filesystem, target
from dissect.target.filesystems.tar import (
Expand All @@ -21,22 +22,25 @@
class TarLoader(Loader):
"""Load tar files."""

def __init__(self, path: Union[Path, str], **kwargs):
def __init__(self, path: Path | str, **kwargs):
super().__init__(path)

if isinstance(path, str):
path = Path(path)

if self.is_compressed(path):
log.warning(
f"Tar file {path!r} is compressed, which will affect performance. "
"Consider uncompressing the archive before passing the tar file to Dissect."
)

self.tar = tarfile.open(path)
self.tar = tarfile.open(fileobj=path.open("rb"))

@staticmethod
def detect(path: Path) -> bool:
return path.name.lower().endswith((".tar", ".tar.gz", ".tgz"))

def is_compressed(self, path: Union[Path, str]) -> bool:
def is_compressed(self, path: Path | str) -> bool:
return str(path).lower().endswith((".tar.gz", ".tgz"))

def map(self, target: target.Target) -> None:
Expand Down
12 changes: 6 additions & 6 deletions dissect/target/loaders/velociraptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import zipfile
from pathlib import Path
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING

from dissect.target.loaders.dir import DirLoader, find_dirs, map_dirs
from dissect.target.plugin import OperatingSystem
Expand All @@ -18,7 +18,7 @@
WINDOWS_ACCESSORS = ["mft", "ntfs", "lazy_ntfs", "ntfs_vss", "auto"]


def find_fs_directories(path: Path) -> tuple[Optional[OperatingSystem], Optional[list[Path]]]:
def find_fs_directories(path: Path) -> tuple[OperatingSystem | None, list[Path] | None]:
fs_root = path.joinpath(FILESYSTEMS_ROOT)

# Unix
Expand Down Expand Up @@ -56,7 +56,7 @@ def find_fs_directories(path: Path) -> tuple[Optional[OperatingSystem], Optional
return None, None


def extract_drive_letter(name: str) -> Optional[str]:
def extract_drive_letter(name: str) -> str | None:
# \\.\X: in URL encoding
if len(name) == 14 and name.startswith("%5C%5C.%5C") and name.endswith("%3A"):
return name[10].lower()
Expand Down Expand Up @@ -91,7 +91,7 @@ def __init__(self, path: Path, **kwargs):
f"Velociraptor target {path!r} is compressed, which will slightly affect performance. "
"Consider uncompressing the archive and passing the uncompressed folder to Dissect."
)
self.root = zipfile.Path(path)
self.root = zipfile.Path(path.open("rb"))
else:
self.root = path

Expand All @@ -105,8 +105,8 @@ def detect(path: Path) -> bool:
# results/
# uploads.json
# [...] other files related to the collection
if path.suffix == ".zip": # novermin
path = zipfile.Path(path)
if path.exists() and path.suffix == ".zip": # novermin
path = zipfile.Path(path.open("rb"))

if path.joinpath(FILESYSTEMS_ROOT).exists() and path.joinpath("uploads.json").exists():
_, dirs = find_fs_directories(path)
Expand Down
50 changes: 50 additions & 0 deletions dissect/target/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

See dissect/target/plugins/general/example.py for an example plugin.
"""

from __future__ import annotations

import fnmatch
import functools
import importlib
import importlib.util
import inspect
Expand Down Expand Up @@ -196,6 +198,8 @@
The :func:`internal` decorator and :class:`InternalPlugin` set the ``__internal__`` attribute.
Finally. :func:`args` decorator sets the ``__args__`` attribute.

The :func:`alias` decorator populates the ``__aliases__`` private attribute of :class:`Plugin` methods.

Args:
target: The :class:`~dissect.target.target.Target` object to load the plugin for.
"""
Expand Down Expand Up @@ -448,6 +452,11 @@
exports = []
functions = []

# First pass to resolve aliases
for attr in get_nonprivate_attributes(plugincls):
for alias in getattr(attr, "__aliases__", []):
clone_alias(plugincls, attr, alias)

for attr in get_nonprivate_attributes(plugincls):
if isinstance(attr, property):
attr = attr.fget
Expand Down Expand Up @@ -542,6 +551,47 @@
return decorator


def alias(*args, **kwargs: dict[str, Any]) -> Callable:
"""Decorator to be used on :class:`Plugin` functions to register an alias of that function."""

if not kwargs.get("name") and not args:
raise ValueError("Missing argument 'name'")

Check warning on line 558 in dissect/target/plugin.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugin.py#L558

Added line #L558 was not covered by tests

def decorator(obj: Callable) -> Callable:
if not hasattr(obj, "__aliases__"):
obj.__aliases__ = []

if name := (kwargs.get("name") or args[0]):
obj.__aliases__.append(name)

return obj

return decorator


def clone_alias(cls: type, attr: Callable, alias: str) -> None:
"""Clone the given attribute to an alias in the provided class."""

# Clone the function object
clone = type(attr)(attr.__code__, attr.__globals__, alias, attr.__defaults__, attr.__closure__)
clone.__kwdefaults__ = attr.__kwdefaults__

# Copy some attributes
functools.update_wrapper(clone, attr)
if wrapped := getattr(attr, "__wrapped__", None):
# update_wrapper sets a new wrapper, we want the original
clone.__wrapped__ = wrapped

# Update module path so we can fool inspect.getmodule with subclassed Plugin classes
clone.__module__ = cls.__module__

# Update the names
clone.__name__ = alias
clone.__qualname__ = f"{cls.__name__}.{alias}"

setattr(cls, alias, clone)


def plugins(
osfilter: Optional[type[OSPlugin]] = None,
special_keys: set[str] = set(),
Expand Down
10 changes: 3 additions & 7 deletions dissect/target/plugins/os/unix/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dissect.target.helpers.descriptor_extensions import UserRecordDescriptorExtension
from dissect.target.helpers.fsutil import TargetPath
from dissect.target.helpers.record import UnixUserRecord, create_extended_descriptor
from dissect.target.plugin import Plugin, export, internal
from dissect.target.plugin import Plugin, alias, export, internal

CommandHistoryRecord = create_extended_descriptor([UserRecordDescriptorExtension])(
"unix/history",
Expand Down Expand Up @@ -36,6 +36,7 @@ class CommandHistoryPlugin(Plugin):
("sqlite", ".sqlite_history"),
("zsh", ".zsh_history"),
("ash", ".ash_history"),
("dissect", ".dissect_history"), # wow so meta
)

def __init__(self, target: Target):
Expand All @@ -56,12 +57,7 @@ def _find_history_files(self) -> List[Tuple[str, TargetPath, UnixUserRecord]]:
history_files.append((shell, history_path, user_details.user))
return history_files

@export(record=CommandHistoryRecord)
def bashhistory(self):
"""Deprecated, use commandhistory function."""
self.target.log.warn("Function 'bashhistory' is deprecated, use the 'commandhistory' function instead.")
return self.commandhistory()

@alias("bashhistory")
@export(record=CommandHistoryRecord)
def commandhistory(self):
"""Return shell history for all users.
Expand Down
2 changes: 1 addition & 1 deletion dissect/target/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def __init__(self, path: Union[str, Path] = None):
self._applied = False

try:
self._config = config.load([self.path, os.getcwd()])
self._config = config.load([self.path, Path.cwd(), Path.home()])
except Exception as e:
self.log.warning("Error loading config file: %s", self.path)
self.log.debug("", exc_info=e)
Expand Down
90 changes: 25 additions & 65 deletions dissect/target/tools/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
# -*- coding: utf-8 -*-

import argparse
import datetime
import logging
import operator
import os
import pathlib
import shutil
Expand All @@ -13,7 +11,7 @@
from dissect.target import Target
from dissect.target.exceptions import TargetError
from dissect.target.helpers.fsutil import TargetPath
from dissect.target.tools.shell import stat_modestr
from dissect.target.tools.fsutils import print_ls, print_stat
from dissect.target.tools.utils import (
catch_sigpipe,
configure_generic_arguments,
Expand All @@ -25,75 +23,27 @@
logging.raiseExceptions = False


def human_size(bytes: int, units: list[str] = ["", "K", "M", "G", "T", "P", "E"]) -> str:
"""Helper function to return the human readable string representation of bytes."""
return str(bytes) + units[0] if bytes < 1024 else human_size(bytes >> 10, units[1:])


def ls(t: Target, path: TargetPath, args: argparse.Namespace) -> None:
if args.use_ctime and args.use_atime:
log.error("Can't specify -c and -u at the same time")
return
if not path or not path.exists():
return

_print_ls(args, path, 0)


def _print_ls(args: argparse.Namespace, path: TargetPath, depth: int) -> None:
subdirs = []

if path.is_dir():
contents = sorted(path.iterdir(), key=operator.attrgetter("name"))
elif path.is_file():
contents = [path]

if depth > 0:
print(f"\n{str(path)}:")

if not args.l:
for entry in contents:
print(entry.name)

if entry.is_dir():
subdirs.append(entry)
else:
if len(contents) > 1:
print(f"total {len(contents)}")

for entry in contents:
_print_extensive_file_stat(args, entry, entry.name)

if entry.is_dir():
subdirs.append(entry)

if args.recursive and subdirs:
for subdir in subdirs:
_print_ls(args, subdir, depth + 1)


def _print_extensive_file_stat(args: argparse.Namespace, path: TargetPath, name: str) -> None:
try:
entry = path.get()
stat = entry.lstat()
symlink = f" -> {entry.readlink()}" if entry.is_symlink() else ""
show_time = stat.st_mtime

if args.use_ctime:
show_time = stat.st_ctime
elif args.use_atime:
show_time = stat.st_atime

utc_time = datetime.datetime.utcfromtimestamp(show_time).isoformat()

if args.human_readable:
size = human_size(stat.st_size)
else:
size = stat.st_size

print(f"{stat_modestr(stat)} {stat.st_uid:4d} {stat.st_gid:4d} {size:>6s} {utc_time} {name}{symlink}")
except FileNotFoundError:
print(f"?????????? ? ? ? ????-??-??T??:??:??.?????? {name}")
# Only output with colors if stdout is a tty
use_colors = sys.stdout.buffer.isatty()

Check warning on line 34 in dissect/target/tools/fs.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/tools/fs.py#L34

Added line #L34 was not covered by tests

print_ls(

Check warning on line 36 in dissect/target/tools/fs.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/tools/fs.py#L36

Added line #L36 was not covered by tests
path,
0,
sys.stdout,
args.l,
args.human_readable,
args.recursive,
args.use_ctime,
args.use_atime,
use_colors,
)


def cat(t: Target, path: TargetPath, args: argparse.Namespace) -> None:
Expand All @@ -120,6 +70,12 @@
print("[!] Failed, unsuported file type: %s" % path)


def stat(t: Target, path: TargetPath, args: argparse.Namespace) -> None:
if not path or not path.exists():
return
print_stat(path, sys.stdout, args.dereference)

Check warning on line 76 in dissect/target/tools/fs.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/tools/fs.py#L74-L76

Added lines #L74 - L76 were not covered by tests


def _extract_path(path: TargetPath, output_path: str) -> None:
print("%s -> %s" % (path, output_path))

Expand Down Expand Up @@ -172,6 +128,10 @@
parser_cat = subparsers.add_parser("cat", help="dump file contents", parents=[baseparser])
parser_cat.set_defaults(handler=cat)

parser_stat = subparsers.add_parser("stat", help="display file status", parents=[baseparser])
parser_stat.add_argument("-L", "--dereference", action="store_true")
parser_stat.set_defaults(handler=stat)

Check warning on line 133 in dissect/target/tools/fs.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/tools/fs.py#L131-L133

Added lines #L131 - L133 were not covered by tests

parser_find = subparsers.add_parser("walk", help="perform a walk", parents=[baseparser])
parser_find.set_defaults(handler=walk)

Expand Down
Loading