Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add symlink support to ZipFilesystem #808

Merged
merged 2 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 81 additions & 46 deletions dissect/target/filesystems/zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,21 @@
import stat
import zipfile
from datetime import datetime, timezone
from typing import BinaryIO, Optional
from typing import BinaryIO, Iterator

from dissect.util.stream import BufferedStream

from dissect.target.exceptions import FileNotFoundError
from dissect.target.exceptions import (
FileNotFoundError,
FilesystemError,
IsADirectoryError,
NotADirectoryError,
NotASymlinkError,
)
from dissect.target.filesystem import (
Filesystem,
FilesystemEntry,
VirtualDirectory,
VirtualFile,
VirtualFilesystem,
)
from dissect.target.helpers import fsutil
Expand All @@ -33,7 +38,7 @@
def __init__(
self,
fh: BinaryIO,
base: Optional[str] = None,
base: str | None = None,
*args,
**kwargs,
):
Expand All @@ -52,12 +57,7 @@
continue

rel_name = fsutil.normpath(mname[len(self.base) :], alt_separator=self.alt_separator)

# NOTE: Normally we would check here if the member is a symlink or not

entry_cls = ZipFilesystemDirectoryEntry if member.is_dir() else ZipFilesystemEntry
file_entry = entry_cls(self, rel_name, member)
self._fs.map_file_entry(rel_name, file_entry)
self._fs.map_file_entry(rel_name, ZipFilesystemEntry(self, rel_name, member))

@staticmethod
def _detect(fh: BinaryIO) -> bool:
Expand All @@ -69,60 +69,95 @@
return self._fs.get(path, relentry=relentry)


class ZipFilesystemEntry(VirtualFile):
# Note: We subclass from VirtualDirectory because VirtualFilesystem is currently only compatible with VirtualDirectory
# Subclass from VirtualDirectory so we get that compatibility for free, and override the rest to do our own thing
class ZipFilesystemEntry(VirtualDirectory):
fs: ZipFilesystem
entry: zipfile.ZipInfo

def __init__(self, fs: ZipFilesystem, path: str, entry: zipfile.ZipInfo):
super().__init__(fs, path)
self.entry = entry

def open(self) -> BinaryIO:
"""Returns file handle (file-like object)."""
if self.is_dir():
raise IsADirectoryError(self.path)

if self.is_symlink():
return self._resolve().open()

try:
return BufferedStream(self.fs.zip.open(self.entry), size=self.entry.file_size)
except Exception:
raise FileNotFoundError()
raise FileNotFoundError(self.path)

Check warning on line 92 in dissect/target/filesystems/zip.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/zip.py#L92

Added line #L92 was not covered by tests

def readlink(self) -> str:
"""Read the link if this entry is a symlink. Returns a string."""
raise NotImplementedError()
def iterdir(self) -> Iterator[str]:
if not self.is_dir():
raise NotADirectoryError(self.path)

def readlink_ext(self) -> FilesystemEntry:
"""Read the link if this entry is a symlink. Returns a filesystem entry."""
raise NotImplementedError()
entry = self._resolve()
if isinstance(entry, ZipFilesystemEntry):
yield from super(ZipFilesystemEntry, entry).iterdir()
else:
yield from entry.iterdir()

Check warning on line 102 in dissect/target/filesystems/zip.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/zip.py#L102

Added line #L102 was not covered by tests

def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result:
"""Return the stat information of this entry."""
return self.lstat()
def scandir(self) -> Iterator[FilesystemEntry]:
if not self.is_dir():
raise NotADirectoryError(self.path)

def lstat(self) -> fsutil.stat_result:
"""Return the stat information of the given path, without resolving links."""
# ['mode', 'addr', 'dev', 'nlink', 'uid', 'gid', 'size', 'atime', 'mtime', 'ctime']
return fsutil.stat_result(
[
stat.S_IFREG | 0o777,
self.entry.header_offset,
id(self.fs),
1,
0,
0,
self.entry.file_size,
0,
datetime(*self.entry.date_time, tzinfo=timezone.utc).timestamp(),
0,
]
)
entry = self._resolve()
if isinstance(entry, ZipFilesystemEntry):
yield from super(ZipFilesystemEntry, entry).scandir()
else:
yield from entry.scandir()

def is_dir(self, follow_symlinks: bool = True) -> bool:
try:
entry = self._resolve(follow_symlinks=follow_symlinks)
except FilesystemError:
return False

Check warning on line 118 in dissect/target/filesystems/zip.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/zip.py#L117-L118

Added lines #L117 - L118 were not covered by tests

class ZipFilesystemDirectoryEntry(VirtualDirectory):
def __init__(self, fs: ZipFilesystem, path: str, entry: zipfile.ZipInfo):
super().__init__(fs, path)
self.entry = entry
if isinstance(entry, ZipFilesystemEntry):
return entry.entry.is_dir()
return isinstance(entry, VirtualDirectory)

def is_file(self, follow_symlinks: bool = True) -> bool:
try:
entry = self._resolve(follow_symlinks=follow_symlinks)
except FilesystemError:
return False

Check warning on line 128 in dissect/target/filesystems/zip.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/zip.py#L127-L128

Added lines #L127 - L128 were not covered by tests

if isinstance(entry, ZipFilesystemEntry):
return not entry.entry.is_dir()
return False

def is_symlink(self) -> bool:
return stat.S_ISLNK(self.entry.external_attr >> 16)

def readlink(self) -> str:
if not self.is_symlink():
raise NotASymlinkError()
return self.fs.zip.open(self.entry).read().decode()

def readlink_ext(self) -> FilesystemEntry:
return FilesystemEntry.readlink_ext(self)

def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result:
"""Return the stat information of this entry."""
return self.lstat()
return self._resolve(follow_symlinks=follow_symlinks).lstat()

def lstat(self) -> fsutil.stat_result:
"""Return the stat information of the given path, without resolving links."""
# ['mode', 'addr', 'dev', 'nlink', 'uid', 'gid', 'size', 'atime', 'mtime', 'ctime']
mode = self.entry.external_attr >> 16

if self.entry.is_dir() and not stat.S_ISDIR(mode):
mode = stat.S_IFDIR | mode

Check warning on line 154 in dissect/target/filesystems/zip.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/zip.py#L154

Added line #L154 was not covered by tests
elif not self.entry.is_dir() and not stat.S_ISREG(mode):
mode = stat.S_IFREG | mode

return fsutil.stat_result(
[
stat.S_IFDIR | 0o777,
mode,
self.entry.header_offset,
id(self.fs),
1,
Expand Down
73 changes: 58 additions & 15 deletions tests/filesystems/test_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@

import pytest

from dissect.target.exceptions import (
IsADirectoryError,
NotADirectoryError,
NotASymlinkError,
)
from dissect.target.filesystems.zip import ZipFilesystem, ZipFilesystemEntry


def _mkdir(zf, name):
def _mkdir(zf: zipfile.ZipFile, name: str) -> None:
# There's no easy way to make a directory with Python zipfile, so copy what zipfile.py does
zinfo = zipfile.ZipInfo(name)
zinfo.compress_size = 0
Expand All @@ -32,7 +37,7 @@ def _mkdir(zf, name):
zf.start_dir = zf.fp.tell()


def _create_zip(prefix="", zip_dir=True):
def _create_zip(prefix: str = "", zip_dir: bool = True) -> io.BytesIO:
buf = io.BytesIO()
zf = zipfile.ZipFile(buf, "w")

Expand All @@ -51,34 +56,42 @@ def _create_zip(prefix="", zip_dir=True):
for i in range(100):
zf.writestr(f"{prefix}dir/{i}", f"contents {i}")

symlink = zipfile.ZipInfo(f"{prefix}symlink_dir")
symlink.external_attr = 0o120777 << 16
zf.writestr(symlink, "dir/")

symlink = zipfile.ZipInfo(f"{prefix}symlink_file")
symlink.external_attr = 0o120777 << 16
zf.writestr(symlink, "file_1")

zf.close()
buf.seek(0)
return buf


@pytest.fixture
def zip_simple():
yield _create_zip()
def zip_simple() -> io.BytesIO:
return _create_zip()


@pytest.fixture
def zip_base():
yield _create_zip("base/")
def zip_base() -> io.BytesIO:
return _create_zip("base/")


@pytest.fixture
def zip_relative():
yield _create_zip("./", False)
def zip_relative() -> io.BytesIO:
return _create_zip("./", False)


@pytest.fixture
def zip_relative_dir():
yield _create_zip("./")
def zip_relative_dir() -> io.BytesIO:
return _create_zip("./")


@pytest.fixture
def zip_virtual_dir():
yield _create_zip("", False)
def zip_virtual_dir() -> io.BytesIO:
return _create_zip("", False)


@pytest.mark.parametrize(
Expand All @@ -91,38 +104,68 @@ def zip_virtual_dir():
("zip_virtual_dir", ""),
],
)
def test_filesystems_zip(obj, base, request):
def test_filesystems_zip(obj: str, base: str, request: pytest.FixtureRequest) -> None:
fh = request.getfixturevalue(obj)

assert ZipFilesystem.detect(fh)

fs = ZipFilesystem(fh, base)
assert isinstance(fs, ZipFilesystem)

assert len(fs.listdir("/")) == 3
assert len(fs.listdir("/")) == 5

assert fs.get("./file_1").open().read() == b"file 1 contents"
assert fs.get("./file_2").open().read() == b"file 2 contents"
assert fs.get("./symlink_file").open().read() == b"file 1 contents"
assert len(list(fs.glob("./dir/*"))) == 100
assert len(list(fs.glob("./symlink_dir/*"))) == 100

zfile = fs.get("./file_1")
zdir = fs.get("./dir")
zsymd = fs.get("./symlink_dir")
zsymf = fs.get("./symlink_file")

assert zfile.is_file()
assert not zfile.is_dir()
assert not zfile.is_symlink()

with pytest.raises(NotADirectoryError):
list(zfile.iterdir())

with pytest.raises(NotADirectoryError):
next(zfile.scandir())

with pytest.raises(NotASymlinkError):
zfile.readlink()

assert zdir.is_dir()
assert not zdir.is_file()
assert not zdir.is_symlink()
assert len(list(zdir.iterdir())) == 100
assert len(list(zdir.scandir())) == 100

with pytest.raises(IsADirectoryError):
zdir.open()

assert zsymd.is_dir()
assert not zsymd.is_file()
assert zsymd.is_symlink()
assert zsymd.readlink() == "dir/"

assert not zsymf.is_dir()
assert zsymf.is_file()
assert zsymf.is_symlink()
assert zsymf.readlink() == "file_1"

file1 = zdir.get("1")
assert file1.is_file()
assert not file1.is_dir()
assert not file1.is_symlink()
assert file1.open().read() == b"contents 1"

assert zfile.stat().st_mode == 0o100777
assert file1.stat() == zsymd.get("1").stat()

assert zfile.stat().st_mode == 0o100600
assert zfile.stat(follow_symlinks=False) == zfile.lstat()

if isinstance(zdir, ZipFilesystemEntry):
Expand Down