diff --git a/dissect/target/plugins/os/unix/log/journal.py b/dissect/target/plugins/os/unix/log/journal.py index 4c9e3b05e..278b35026 100644 --- a/dissect/target/plugins/os/unix/log/journal.py +++ b/dissect/target/plugins/os/unix/log/journal.py @@ -1,7 +1,8 @@ from __future__ import annotations +import logging import lzma -from typing import BinaryIO, Callable, Iterator +from typing import Any, BinaryIO, Callable, Iterator import zstandard from dissect.cstruct import cstruct @@ -13,6 +14,8 @@ from dissect.target.helpers.record import TargetRecordDescriptor from dissect.target.plugin import Plugin, export +log = logging.getLogger(__name__) + # The events have undocumented fields that are not part of the record JournalRecord = TargetRecordDescriptor( "linux/log/journal", @@ -28,7 +31,7 @@ ("varint", "errno"), ("string", "invocation_id"), ("string", "user_invocation_id"), - ("varint", "syslog_facility"), + ("string", "syslog_facility"), ("string", "syslog_identifier"), ("varint", "syslog_pid"), ("string", "syslog_raw"), @@ -70,11 +73,13 @@ ("path", "udev_devlink"), # Other fields ("string", "journal_hostname"), - ("path", "filepath"), + ("path", "source"), ], ) journal_def = """ +#define HEADER_SIGNATURE b"LPKSHHRH" + typedef uint8 uint8_t; typedef uint32 le32_t; typedef uint64 le64_t; @@ -100,7 +105,7 @@ }; struct Header { - uint8_t signature[8]; + char signature[8]; le32_t compatible_flags; IncompatibleFlag incompatible_flags; State state; @@ -165,7 +170,7 @@ // The first four members are copied from ObjectHeader, so that the size can be used as the length of payload struct DataObject { - ObjectType type; + // ObjectType type; ObjectFlag flags; uint8_t reserved[6]; le64_t size; @@ -181,7 +186,7 @@ // If the HEADER_INCOMPATIBLE_COMPACT flag is set, two extra fields are stored to allow immediate access // to the tail entry array in the DATA object's entry array chain. struct DataObject_Compact { - ObjectType type; + // ObjectType type; ObjectFlag flags; uint8_t reserved[6]; le64_t size; @@ -236,7 +241,7 @@ // The first four members are copied from from ObjectHeader, so that the size can be used as the length of entry_object_offsets struct EntryArrayObject { - ObjectType type; + // ObjectType type; uint8_t flags; uint8_t reserved[6]; le64_t size; @@ -245,7 +250,7 @@ }; struct EntryArrayObject_Compact { - ObjectType type; + // ObjectType type; uint8_t flags; uint8_t reserved[6]; le64_t size; @@ -257,9 +262,19 @@ c_journal = cstruct().load(journal_def) -def get_optional(value: str, to_type: Callable): +def get_optional(value: str, to_type: Callable) -> Any | None: """Return the value if True, otherwise return None.""" - return to_type(value) if value else None + + if not value: + return None + + try: + return to_type(value) + + except ValueError as e: + log.error("Unable to cast '%s' to %s", value, to_type) + log.debug("", exc_info=e) + return None class JournalFile: @@ -273,136 +288,138 @@ class JournalFile: def __init__(self, fh: BinaryIO, target: Target): self.fh = fh self.target = target - self.header = c_journal.Header(self.fh) - self.signature = "".join(chr(c) for c in self.header.signature) - self.entry_array_offset = self.header.entry_array_offset - def entry_object_offsets(self) -> Iterator[int]: - """Read object entry arrays.""" + try: + self.header = c_journal.Header(self.fh) + except EOFError as e: + raise ValueError(f"Invalid systemd Journal file: {e}") - offset = self.entry_array_offset - - # Entry Array with next_entry_array_offset set to 0 is the last in the list - while offset != 0: - self.fh.seek(offset) - - object = c_journal.ObjectHeader(self.fh) - - if object.type == c_journal.ObjectType.OBJECT_ENTRY_ARRAY: - # After the object is checked, read again but with EntryArrayObject instead of ObjectHeader - self.fh.seek(offset) - - if self.header.incompatible_flags & c_journal.IncompatibleFlag.HEADER_INCOMPATIBLE_COMPACT: - entry_array_object = c_journal.EntryArrayObject_Compact(self.fh) - else: - entry_array_object = c_journal.EntryArrayObject(self.fh) - - for entry_object_offset in entry_array_object.entry_object_offsets: - # Check if the offset is not zero and points to nothing - if entry_object_offset: - yield entry_object_offset - - offset = entry_array_object.next_entry_array_offset + if self.header.signature != c_journal.HEADER_SIGNATURE: + raise ValueError(f"Journal file has invalid magic header: {self.header.signature!r}'") def decode_value(self, value: bytes) -> tuple[str, str]: - value = value.decode(encoding="utf-8", errors="surrogateescape").strip() - - # Strip leading underscores part of the field name - value = value.lstrip("_") - + """Decode the given bytes to a key value pair.""" + value = value.decode(errors="surrogateescape").strip().lstrip("_") key, value = value.split("=", 1) key = key.lower() - return key, value def __iter__(self) -> Iterator[dict[str, int | str]]: "Iterate over the entry objects to read payloads." - for offset in self.entry_object_offsets(): + offset = self.header.entry_array_offset + while offset != 0: self.fh.seek(offset) + if self.fh.read(1)[0] != c_journal.ObjectType.OBJECT_ENTRY_ARRAY: + raise ValueError(f"Expected OBJECT_ENTRY_ARRAY at offset {offset}") + + if self.header.incompatible_flags & c_journal.IncompatibleFlag.HEADER_INCOMPATIBLE_COMPACT: + entry_array_object = c_journal.EntryArrayObject_Compact(self.fh) + else: + entry_array_object = c_journal.EntryArrayObject(self.fh) + + for entry_object_offset in entry_array_object.entry_object_offsets: + if entry_object_offset: + yield from self._parse_entry_object(offset=entry_object_offset) + + offset = entry_array_object.next_entry_array_offset + + def _parse_entry_object(self, offset: int) -> Iterator[dict]: + self.fh.seek(offset) + + try: if self.header.incompatible_flags & c_journal.IncompatibleFlag.HEADER_INCOMPATIBLE_COMPACT: entry = c_journal.EntryObject_Compact(self.fh) else: entry = c_journal.EntryObject(self.fh) - event = {} - event["ts"] = ts.from_unix_us(entry.realtime) + except EOFError as e: + self.target.log.warning("Unable to read Journal EntryObject at offset %s in: %s", offset, self.fh) + self.target.log.debug("", exc_info=e) + return - for item in entry.items: - try: - self.fh.seek(item.object_offset) + event = {"ts": ts.from_unix_us(entry.realtime)} + for item in entry.items: + try: + self.fh.seek(item.object_offset) - object = c_journal.ObjectHeader(self.fh) + if self.fh.read(1)[0] != c_journal.ObjectType.OBJECT_DATA: + continue - if object.type == c_journal.ObjectType.OBJECT_DATA: - self.fh.seek(item.object_offset) + if self.header.incompatible_flags & c_journal.IncompatibleFlag.HEADER_INCOMPATIBLE_COMPACT: + data_object = c_journal.DataObject_Compact(self.fh) + else: + data_object = c_journal.DataObject(self.fh) - if self.header.incompatible_flags & c_journal.IncompatibleFlag.HEADER_INCOMPATIBLE_COMPACT: - data_object = c_journal.DataObject_Compact(self.fh) - else: - data_object = c_journal.DataObject(self.fh) + if not data_object.payload: + continue - data = data_object.payload + data = data_object.payload - if not data: - # If the payload is empty - continue - elif data_object.flags & c_journal.ObjectFlag.OBJECT_COMPRESSED_XZ: - data = lzma.decompress(data) - elif data_object.flags & c_journal.ObjectFlag.OBJECT_COMPRESSED_LZ4: - data = lz4.decompress(data[8:]) - elif data_object.flags & c_journal.ObjectFlag.OBJECT_COMPRESSED_ZSTD: - data = zstandard.decompress(data) + if data_object.flags & c_journal.ObjectFlag.OBJECT_COMPRESSED_XZ: + data = lzma.decompress(data) - key, value = self.decode_value(data) - event[key] = value + elif data_object.flags & c_journal.ObjectFlag.OBJECT_COMPRESSED_LZ4: + data = lz4.decompress(data[8:]) - except Exception as e: - self.target.log.warning( - "The data object in Journal file %s could not be parsed", - getattr(self.fh, "name", None), - exc_info=e, - ) - continue + elif data_object.flags & c_journal.ObjectFlag.OBJECT_COMPRESSED_ZSTD: + data = zstandard.decompress(data) + + key, value = self.decode_value(data) + event[key] = value + + except Exception as e: + self.target.log.warning( + "Journal DataObject could not be parsed at offset %s in %s", + item.object_offset, + getattr(self.fh, "name", None), + ) + self.target.log.debug("", exc_info=e) + continue - yield event + yield event class JournalPlugin(Plugin): + """Systemd Journal plugin.""" + JOURNAL_PATHS = ["/var/log/journal"] # TODO: /run/systemd/journal JOURNAL_GLOB = "*/*.journal*" # The extensions .journal and .journal~ - JOURNAL_SIGNATURE = "LPKSHHRH" def __init__(self, target: Target): super().__init__(target) - self.journal_paths = [] + self.journal_files = [] - for _path in self.JOURNAL_PATHS: - self.journal_paths.extend(self.target.fs.path(_path).glob(self.JOURNAL_GLOB)) + for journal_path in self.JOURNAL_PATHS: + self.journal_files.extend(self.target.fs.path(journal_path).glob(self.JOURNAL_GLOB)) def check_compatible(self) -> None: - if not len(self.journal_paths): + if not self.journal_files: raise UnsupportedPluginError("No journald files found") @export(record=JournalRecord) def journal(self) -> Iterator[JournalRecord]: - """Return the content of Systemd Journal log files. + """Return the contents of Systemd Journal log files. References: - https://wiki.archlinux.org/title/Systemd/Journal - https://github.com/systemd/systemd/blob/9203abf79f1d05fdef9b039e7addf9fc5a27752d/man/systemd.journal-fields.xml """ # noqa: E501 - path_function = self.target.fs.path - for _path in self.journal_paths: - fh = _path.open() + for journal_file in self.journal_files: + if not journal_file.is_file(): + self.target.log.warning("Unable to parse journal file as it is not a file: %s", journal_file) + continue - journal = JournalFile(fh, self.target) + try: + fh = journal_file.open() + journal = JournalFile(fh, self.target) - if not journal.signature == self.JOURNAL_SIGNATURE: - self.target.log.warning("The Journal log file %s has an invalid magic header", _path) + except Exception as e: + self.target.log.warning("Unable to parse journal file structure: %s: %s", journal_file, str(e)) + self.target.log.debug("", exc_info=e) continue for entry in journal: @@ -417,7 +434,7 @@ def journal(self) -> Iterator[JournalRecord]: errno=get_optional(entry.get("errno"), int), invocation_id=entry.get("invocation_id"), user_invocation_id=entry.get("user_invocation_id"), - syslog_facility=get_optional(entry.get("syslog_facility"), int), + syslog_facility=entry.get("syslog_facility"), syslog_identifier=entry.get("syslog_identifier"), syslog_pid=get_optional(entry.get("syslog_pid"), int), syslog_raw=entry.get("syslog_raw"), @@ -456,6 +473,6 @@ def journal(self) -> Iterator[JournalRecord]: udev_devnode=get_optional(entry.get("udev_devnode"), path_function), udev_devlink=get_optional(entry.get("udev_devlink"), path_function), journal_hostname=entry.get("hostname"), - filepath=_path, + source=journal_file, _target=self.target, ) diff --git a/tests/plugins/os/unix/test_journal.py b/tests/plugins/os/unix/log/test_journal.py similarity index 50% rename from tests/plugins/os/unix/test_journal.py rename to tests/plugins/os/unix/log/test_journal.py index 883be854f..82675bb2f 100644 --- a/tests/plugins/os/unix/test_journal.py +++ b/tests/plugins/os/unix/log/test_journal.py @@ -1,27 +1,30 @@ from flow.record.fieldtypes import datetime as dt +from dissect.target.filesystem import VirtualFilesystem from dissect.target.plugins.os.unix.log.journal import JournalPlugin +from dissect.target.target import Target from tests._utils import absolute_path -def test_journal_plugin(target_unix, fs_unix): +def test_journal_plugin(target_unix: Target, fs_unix: VirtualFilesystem) -> None: + """test linux systemd journal file parsing.""" + data_file = absolute_path("_data/plugins/os/unix/log/journal/journal") fs_unix.map_file("var/log/journal/1337/user-1000.journal", data_file) - target_unix.add_plugin(JournalPlugin) results = list(target_unix.journal()) - record = results[0] - assert len(results) == 2 + record = results[0] assert record.ts == dt("2023-05-19T16:22:38.841870+00:00") - assert ( - record.message - == "Window manager warning: last_user_time (928062) is greater than comparison timestamp (928031). This most likely represents a buggy client sending inaccurate timestamps in messages such as _NET_ACTIVE_WINDOW. Trying to work around..." # noqa: E501 + assert record.message == ( + "Window manager warning: last_user_time (928062) is greater than comparison timestamp (928031). " + "This most likely represents a buggy client sending inaccurate timestamps in messages such as " + "_NET_ACTIVE_WINDOW. Trying to work around..." ) - assert record.syslog_facility == 3 + assert record.syslog_facility == "3" assert record.syslog_identifier == "gnome-shell" assert record.pid == 2096 assert record.transport == "stdout" - assert str(record.filepath) == "/var/log/journal/1337/user-1000.journal" + assert record.source == "/var/log/journal/1337/user-1000.journal"