class Xml(ConfigurationParser):
    """Parses an XML file. Ignores any constructor parameters passed from ``ConfigurationParser``.

    The document is converted into nested dictionaries with a fixed layout (see ``_tree``).
    When the XML parser reports an error, the characters around the reported position are
    removed and parsing is retried, up to a fixed number of attempts.
    """

    def _tree(self, tree: ElementTree, root: bool = False) -> dict:
        """Convert an XML element and all of its descendants into nested dictionaries.

        The layout is the same for every element, regardless of the document structure:

        * ``attributes``: a copy of the element's attributes (only present if there are any).
        * ``nodes``: the converted child elements, keyed by tag name (only present if there are any).
          When a tag occurs more than once under the same parent, the second and later
          occurrences are keyed ``<tag>-2``, ``<tag>-3``, and so on.
        * ``text``: the element text with leading/trailing spaces and line breaks removed
          (only present if something remains after stripping).

        Keeping attributes, child nodes and text in separate keys means no information is lost
        about where a value came from, and no special key prefixes are needed.

        Args:
            tree: The element to convert. Only ``findall``, ``attrib``, ``text`` and ``tag`` are used.
            root: If ``True``, wrap the result as ``{tree.tag: result}`` so the root tag is kept.

        Returns:
            The dictionary representation of ``tree``.
        """
        nodes = {}
        counter = {}

        # Every direct child becomes an entry in ``nodes``; repeated tags get a numeric suffix.
        for node in tree.findall("*"):
            seen = counter.get(node.tag, 0) + 1
            counter[node.tag] = seen
            key = node.tag if seen == 1 else f"{node.tag}-{seen}"
            nodes[key] = self._tree(node)

        result = {}

        # Copy the attributes so the parsed data does not alias the element's own mapping.
        if tree.attrib:
            result["attributes"] = dict(tree.attrib)

        if nodes:
            result["nodes"] = nodes

        # No special prefix (such as ``$``) is used for the text key, because XML documents
        # may use such names themselves.
        if tree.text:
            if text := tree.text.strip(" \n\r"):
                result["text"] = text

        # Further metadata (CDATA, comments, errors) could be added as extra keys here.
        return {tree.tag: result} if root else result

    def _fix(self, content: str, position: tuple[int, int]) -> str:
        """Quick heuristic fix. If there is an invalid token, just remove it.

        Args:
            content: The complete document text.
            position: ``(lineno, offset)`` of the error; ``lineno`` is treated as 1-based and
                ``offset`` as an index into that line.

        Returns:
            ``content`` with the character at ``offset`` and the one before it removed from the
            given line. ``content`` is returned unchanged if ``lineno`` is outside the document.
        """
        lineno, offset = position
        lines = content.split("\n")

        if not 0 < lineno <= len(lines):
            return content

        line = lines[lineno - 1]
        # Clamp the start: a negative slice end would count from the end of the line and
        # duplicate most of it instead of removing the offending character.
        start = max(offset - 1, 0)
        lines[lineno - 1] = line[:start] + line[offset + 1 :]

        return "\n".join(lines)

    def parse_file(self, fh: TextIO) -> None:
        """Parse the XML document read from ``fh`` and store the result in ``self.parsed_data``.

        Args:
            fh: A text file object positioned at the start of the document.

        Raises:
            ConfigurationParsingError: If the document still cannot be parsed after the retry
                limit is reached, or a repair attempt leaves the document unchanged.
        """
        document = fh.read()
        limit = 20
        errors = 0
        tree = {}
        last_error = None

        while errors < limit:
            try:
                tree = self._tree(ElementTree.fromstring(document), root=True)
                break
            except ElementTree.ParseError as err:
                errors += 1
                last_error = err
                repaired = self._fix(document, err.position)
                if repaired == document:
                    # The heuristic could not change anything; retrying would fail identically.
                    break
                document = repaired

        if not tree:
            # Error limit reached (or no further repair possible): the document is not parseable.
            name = getattr(fh, "name", "<unknown>")
            raise ConfigurationParsingError(
                f"Could not parse XML file: {name} after {errors} attempts."
            ) from last_error

        self.parsed_data = tree
@@ -528,11 +603,12 @@ def create_parser(self, options: Optional[ParserOptions] = None) -> Configuratio "*/systemd/*": ParserConfig(SystemD), "*/sysconfig/network-scripts/ifcfg-*": ParserConfig(Default), "*/sysctl.d/*.conf": ParserConfig(Default), + "*/xml/*": ParserConfig(Xml), } CONFIG_MAP: dict[tuple[str, ...], ParserConfig] = { "ini": ParserConfig(Ini), - "xml": ParserConfig(Txt), + "xml": ParserConfig(Xml), "json": ParserConfig(Txt), "cnf": ParserConfig(Default), "conf": ParserConfig(Default, separator=(r"\s",)), @@ -549,6 +625,7 @@ def create_parser(self, options: Optional[ParserOptions] = None) -> Configuratio "hosts": ParserConfig(Default, separator=(r"\s",)), "nsswitch.conf": ParserConfig(Default, separator=(":",)), "lsb-release": ParserConfig(Default), + "catalog": ParserConfig(Xml), } diff --git a/tests/_data/helpers/configutil/test.xml b/tests/_data/helpers/configutil/test.xml new file mode 100644 index 000000000..15500dea4 --- /dev/null +++ b/tests/_data/helpers/configutil/test.xml @@ -0,0 +1,18 @@ + + + a + b + + + + + + + + + diff --git a/tests/filesystems/test_config.py b/tests/filesystems/test_config.py index d201afefa..2041ab2dc 100644 --- a/tests/filesystems/test_config.py +++ b/tests/filesystems/test_config.py @@ -73,6 +73,61 @@ def mapped_file(test_file: str, fs_unix: VirtualFilesystem) -> VirtualFilesystem }, }, ), + ( + "_data/helpers/configutil/test.xml", + { + "Server": { + "attributes": {"port": "8005", "shutdown": "SHUTDOWN"}, + "nodes": { + "Listener": { + "attributes": {"className": "org.apache.catalina.core.JasperListener1"}, + "text": "a", + }, + "Listener-2": { + "attributes": {"className": "org.apache.catalina.core.JasperListener2"}, + "text": "b", + }, + "Service": { + "attributes": {"name": "Catalina"}, + "nodes": { + "Connector": { + "attributes": { + "port": "8080", + "protocol": "HTTP/1.1", + "connectionTimeout": "20000", + "redirectPort": "8443", + }, + }, + "Engine": { + "attributes": {"name": "Catalina", "defaultHost": 
"localhost"}, + "nodes": { + "Host": { + "attributes": { + "name": "localhost", + "appBase": "webapps", + "unpackWARs": "true", + "autoDeploy": "true", + }, + "nodes": { + "Valve": { + "attributes": { + "className": "org.apache.catalina.valves.AccessLogValve", + "directory": "logs", + "prefix": "localhost_access_log.", + "suffix": ".txt", + "pattern": "%h %l %u %t s", + }, + } + }, + } + }, + }, + }, + }, + }, + }, + }, + ), ], ) def test_parse_file_input(target_unix: Target, mapped_file: str, expected_output: dict) -> None: diff --git a/tests/plugins/general/test_config.py b/tests/plugins/general/test_config.py index 0954da08b..d5cf0d63c 100644 --- a/tests/plugins/general/test_config.py +++ b/tests/plugins/general/test_config.py @@ -68,7 +68,7 @@ def test_collapse_types( "hint, data_bytes", [ ("ini", b"[DEFAULT]\nkey=value"), - ("xml", b"currently_just_text"), + ("xml", b"currently_just_text"), ("json", b"currently_just_text"), ("cnf", b"key=value"), ("conf", b"key value"),