Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic cpio filesystem #531

Merged
merged 9 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ tests/_data/volumes/bde/enc-volume.bin filter=lfs diff=lfs merge=lfs -text
tests/_data/volumes/md/md-nested.bin.gz filter=lfs diff=lfs merge=lfs -text
tests/_data/loaders/tar/test-anon-filesystems.tar filter=lfs diff=lfs merge=lfs -text
tests/_data/plugins/apps/browser/firefox/cookies.sqlite filter=lfs diff=lfs merge=lfs -text
tests/_data/plugins/apps/container/docker/docker.tgz filter=lfs diff=lfs merge=lfs -text
tests/_data/loaders/cpio/initrd.img-6.1.0-15-amd64 filter=lfs diff=lfs merge=lfs -text
tests/_data/loaders/cpio/initrd.img-6.1.0-17-amd64 filter=lfs diff=lfs merge=lfs -text
tests/_data/plugins/os/unix/locate/locatedb filter=lfs diff=lfs merge=lfs -text
tests/_data/plugins/os/unix/locate/mlocate.db filter=lfs diff=lfs merge=lfs -text
tests/_data/plugins/os/unix/locate/plocate.db filter=lfs diff=lfs merge=lfs -text
tests/_data/plugins/apps/container/docker/docker.tgz filter=lfs diff=lfs merge=lfs -text
2 changes: 2 additions & 0 deletions dissect/target/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -1588,5 +1588,7 @@ def open_multi_volume(fhs: list[BinaryIO], *args, **kwargs) -> Filesystem:
# Filesystem registrations: each call pairs a short identifier with a class name.
# NOTE(review): presumably the first argument is the module name under
# dissect.target.filesystems and the second the class to load from it — confirm against register().
register("exfat", "ExfatFilesystem")
register("squashfs", "SquashFSFilesystem")
register("zip", "ZipFilesystem")
register("tar", "TarFilesystem")
register("cpio", "CpioFilesystem")
register("ad1", "AD1Filesystem")
register("jffs", "JFFSFilesystem")
18 changes: 18 additions & 0 deletions dissect/target/filesystems/cpio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import BinaryIO, Optional

from dissect.util import cpio

from dissect.target.filesystems.tar import TarFilesystem
from dissect.target.helpers.fsutil import open_decompress


class CpioFilesystem(TarFilesystem):
    """Filesystem implementation for cpio archives.

    Reuses ``TarFilesystem`` for everything and only swaps in ``cpio.CpioInfo`` as the
    ``tarinfo`` class. The given file-like object is passed through ``open_decompress``
    before it is handed to the parent class.
    """

    __type__ = "cpio"

    def __init__(self, fh: BinaryIO, base: Optional[str] = None, *args, **kwargs):
        """Open a cpio archive from ``fh``.

        Args:
            fh: The file-like object of the (optionally compressed) cpio archive.
            base: Forwarded unchanged to ``TarFilesystem`` as its ``base`` argument.
            *args: Extra positional arguments forwarded to ``TarFilesystem``.
            **kwargs: Extra keyword arguments forwarded to ``TarFilesystem``.
        """
        stream = open_decompress(fileobj=fh)
        super().__init__(stream, base, *args, tarinfo=cpio.CpioInfo, **kwargs)

    @staticmethod
    def _detect(fh: BinaryIO) -> bool:
        """Detect a cpio file on a given file-like object.

        The object is run through ``open_decompress`` first, after which the header
        format is probed; anything other than ``cpio.FORMAT_CPIO_UNKNOWN`` counts as a match.
        """
        header_format = cpio.detect_header(open_decompress(fileobj=fh))
        return header_format != cpio.FORMAT_CPIO_UNKNOWN
43 changes: 37 additions & 6 deletions dissect/target/helpers/fsutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
except ImportError:
HAVE_BZ2 = False

try:
import zstandard

HAVE_ZSTD = True
except ImportError:
HAVE_ZSTD = False

Check warning on line 28 in dissect/target/helpers/fsutil.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/helpers/fsutil.py#L27-L28

Added lines #L27 - L28 were not covered by tests

import dissect.target.filesystem as filesystem
from dissect.target.exceptions import FileNotFoundError, SymlinkRecursionError
from dissect.target.helpers.polypath import (
Expand Down Expand Up @@ -445,17 +452,22 @@


def open_decompress(
path: TargetPath,
path: Optional[TargetPath] = None,
mode: str = "rb",
*,
fileobj: Optional[BinaryIO] = None,
encoding: Optional[str] = "UTF-8",
errors: Optional[str] = "backslashreplace",
newline: Optional[str] = None,
) -> Union[BinaryIO, TextIO]:
"""Open and decompress a file. Handles gz and bz2 files. Uncompressed files are opened as-is.
"""Open and decompress a file. Handles gz, bz2 and zstd files. Uncompressed files are opened as-is.

When passing in an already opened ``fileobj``, the mode, encoding, errors and newline arguments are ignored.

Args:
path: The path to the file to open and decompress. It is assumed this path exists.
mode: The mode in which to open the file.
fileobj: The file-like object to open and decompress. This is mutually exclusive with path.
encoding: The decoding for text streams. By default UTF-8 encoding is used.
errors: The error handling for text streams. By default we're more lenient and use ``backslashreplace``.
newline: How newlines are handled for text streams.
Expand All @@ -469,7 +481,17 @@
for line in open_decompress(Path("/dir/file.gz"), "rt"):
print(line)
"""
file = path.open()
if path and fileobj:
raise ValueError("path and fileobj are mutually exclusive")

Check warning on line 485 in dissect/target/helpers/fsutil.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/helpers/fsutil.py#L485

Added line #L485 was not covered by tests

if not path and not fileobj:
raise ValueError("path or fileobj is required")

Check warning on line 488 in dissect/target/helpers/fsutil.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/helpers/fsutil.py#L488

Added line #L488 was not covered by tests

if path:
file = path.open("rb")
else:
file = fileobj

magic = file.read(4)
file.seek(0)

Expand All @@ -480,13 +502,22 @@

if magic[:2] == b"\x1f\x8b":
return gzip.open(file, mode, encoding=encoding, errors=errors, newline=newline)
# In a valid bz2 header the 4th byte is in the range b'1' ... b'9'.
elif HAVE_BZ2 and magic[:3] == b"BZh" and 0x31 <= magic[3] <= 0x39:

if HAVE_BZ2 and magic[:3] == b"BZh" and 0x31 <= magic[3] <= 0x39:
# In a valid bz2 header the 4th byte is in the range b'1' ... b'9'.
return bz2.open(file, mode, encoding=encoding, errors=errors, newline=newline)
else:

if HAVE_ZSTD and magic[:4] in [b"\xfd\x2f\xb5\x28", b"\x28\xb5\x2f\xfd"]:
# stream_reader is not seekable, so we have to resort to the less
# efficient decompressor which returns bytes.
return io.BytesIO(zstandard.decompress(file.read()))

if path:
file.close()
return path.open(mode, encoding=encoding, errors=errors, newline=newline)

return file


def reverse_readlines(fh: TextIO, chunk_size: int = 1024 * 1024 * 8) -> Iterator[str]:
"""Like iterating over a ``TextIO`` file-like object, but starting from the end of the file.
Expand Down
3 changes: 3 additions & 0 deletions tests/_data/loaders/cpio/initrd.img-6.1.0-15-amd64
Git LFS file not shown
3 changes: 3 additions & 0 deletions tests/_data/loaders/cpio/initrd.img-6.1.0-17-amd64
Git LFS file not shown
35 changes: 35 additions & 0 deletions tests/filesystems/test_cpio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from pathlib import Path

from dissect.target.filesystems.cpio import CpioFilesystem
from tests._utils import absolute_path


def test_cpio_uncompressed() -> None:
    """The ``initrd.img-6.1.0-17-amd64`` fixture is detected as cpio and lists only ``kernel`` at its root."""
    archive = Path(absolute_path("_data/loaders/cpio/initrd.img-6.1.0-17-amd64"))

    with archive.open("rb") as fh:
        assert CpioFilesystem.detect(fh)

        cpio_fs = CpioFilesystem(fh)
        root_entries = [entry.name for entry in cpio_fs.path("/").iterdir()]
        assert root_entries == ["kernel"]


def test_cpio_compressed_zstd() -> None:
    """The ``initrd.img-6.1.0-15-amd64`` fixture is detected as cpio and exposes the expected root listing."""
    archive = Path(absolute_path("_data/loaders/cpio/initrd.img-6.1.0-15-amd64"))
    expected_root = [
        "bin",
        "conf",
        "etc",
        "init",
        "lib",
        "lib64",
        "run",
        "sbin",
        "scripts",
        "usr",
    ]

    with archive.open("rb") as fh:
        assert CpioFilesystem.detect(fh)

        cpio_fs = CpioFilesystem(fh)
        root_entries = [entry.name for entry in cpio_fs.path("/").iterdir()]
        assert root_entries == expected_root
Loading