From 2f51dae5d77cbbc542dd668e3102db9e4a987739 Mon Sep 17 00:00:00 2001 From: Poeloe <22234727+Poeloe@users.noreply.github.com> Date: Mon, 19 Feb 2024 03:48:57 -0800 Subject: [PATCH 1/2] Fix bug in WER plugin caused by special characters in field name (DIS-2507) --- dissect/target/plugins/os/windows/wer.py | 11 ++++++++--- .../_data/plugins/os/windows/wer/wer_test.wer | Bin 1450 -> 1556 bytes 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/dissect/target/plugins/os/windows/wer.py b/dissect/target/plugins/os/windows/wer.py index f5c6f73e4..79629d3a4 100644 --- a/dissect/target/plugins/os/windows/wer.py +++ b/dissect/target/plugins/os/windows/wer.py @@ -49,7 +49,7 @@ def _collect_wer_data(wer_file: Path) -> tuple[list[tuple[str, str]], dict[str, record_type = "datetime" key = "ts" - key = _key_to_snake_case(key if key else name) + key = _sanitize_key(key if key else name) record_values[key] = value record_fields.append((record_type, key)) if key != "ts" else record_fields.insert(0, (record_type, key)) @@ -70,7 +70,7 @@ def _collect_wer_metadata(metadata_xml_file: Path) -> tuple[list[tuple[str, str] for category in metadata: for value in category: if record_value := value.text.strip("\t\n"): - key = _key_to_snake_case(f"{category.tag}{value.tag}") + key = _sanitize_key(f"{category.tag}{value.tag}") record_fields.append(("string", key)) record_values[key] = record_value @@ -87,9 +87,14 @@ def _create_record_descriptor(record_name: str, record_fields: list[tuple[str, s return TargetRecordDescriptor(record_name, record_fields) -def _key_to_snake_case(key: str) -> str: +def _sanitize_key(key: str) -> str: + # Convert camel case to snake case for pattern in camel_case_patterns: key = pattern.sub(r"\1_\2", key) + + # Keep only basic characters in key + key = re.sub(r"[^a-zA-Z0-9_]", "", key) + return key.lower() diff --git a/tests/_data/plugins/os/windows/wer/wer_test.wer b/tests/_data/plugins/os/windows/wer/wer_test.wer index 1c836d1754528c959ee241ff16ec6e2cf689d48f..e260767aaa4221f56a1fe8bcf540875666e933aa 100644 GIT binary patch delta 114 zcmZ3*J%wk(D%LD6hEj%1hG+&uhFAt$h7yKUhGHNY4;D3K;9>}7C}4QWkPK9l$dJR} f%#Z=3i-0&8sOlw_vgTf_hWpePtR delta 7 OcmbQjvx Date: Tue, 20 Feb 2024 02:35:10 -0800 Subject: [PATCH 2/2] Process review feedback --- dissect/target/plugins/os/windows/wer.py | 170 ++++++++++-------- .../_data/plugins/os/windows/wer/wer_test.wer | Bin 1556 -> 1612 bytes tests/plugins/os/windows/test_wer.py | 17 +- 3 files changed, 110 insertions(+), 77 deletions(-) diff --git a/dissect/target/plugins/os/windows/wer.py b/dissect/target/plugins/os/windows/wer.py index 79629d3a4..e40718d53 100644 --- a/dissect/target/plugins/os/windows/wer.py +++ b/dissect/target/plugins/os/windows/wer.py @@ -14,69 +14,6 @@ camel_case_patterns = [re.compile(r"(\S)([A-Z][a-z]+)"), re.compile(r"([a-z0-9])([A-Z])"), re.compile(r"(\w)[.\s](\w)")] -def _collect_wer_data(wer_file: Path) -> tuple[list[tuple[str, str]], dict[str, str]]: - """Parse data from a .wer file.""" - record_values = {} - record_fields = [] - key = None - - # Default encoding when no BOM is present - encoding = "utf-16-le" - - # If a BOM header is present we can decode it using utf-16 - with wer_file.open("rb") as fh: - if fh.read(len(codecs.BOM)) in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE): - encoding = "utf-16" - - for line in wer_file.read_text(encoding).splitlines(): - if len(line_split := line.rstrip().split("=", 1)) == 2: - name, value = line_split - record_type = "string" - - # dynamic entry with key and value on separate lines - if "].Name" in name and not key: - key = value - # set key and continue to get value on the next line - continue - - # dynamic entry with key and value on the same line - elif "]." in name and not key: - category, name = name.split(".", 1) - key = f"{category.split('[')[0]}{name}" - - if "EventTime" in name: - value = wintimestamp(int(value)) - record_type = "datetime" - key = "ts" - - key = _sanitize_key(key if key else name) - - record_values[key] = value - record_fields.append((record_type, key)) if key != "ts" else record_fields.insert(0, (record_type, key)) - # reset key necessary for dynamic entries and ts - key = None - - return record_fields, record_values - - -def _collect_wer_metadata(metadata_xml_file: Path) -> tuple[list[tuple[str, str]], dict[str, str]]: - """Parse data from a metadata .xml file linked to a .wer file.""" - record_fields = [] - record_values = {} - file = metadata_xml_file.read_text("utf-16") - - tree = ElementTree.fromstring(file) - for metadata in tree.iter("WERReportMetadata"): - for category in metadata: - for value in category: - if record_value := value.text.strip("\t\n"): - key = _sanitize_key(f"{category.tag}{value.tag}") - record_fields.append(("string", key)) - record_values[key] = record_value - - return record_fields, record_values - - def _create_record_descriptor(record_name: str, record_fields: list[tuple[str, str]]) -> TargetRecordDescriptor: record_fields.extend( [ @@ -87,17 +24,6 @@ def _create_record_descriptor(record_name: str, record_fields: list[tuple[str, s return TargetRecordDescriptor(record_name, record_fields) -def _sanitize_key(key: str) -> str: - # Convert camel case to snake case - for pattern in camel_case_patterns: - key = pattern.sub(r"\1_\2", key) - - # Keep only basic characters in key - key = re.sub(r"[^a-zA-Z0-9_]", "", key) - - return key.lower() - - class WindowsErrorReportingPlugin(Plugin): """Plugin for parsing Windows Error Reporting files.""" @@ -121,6 +47,98 @@ def check_compatible(self) -> None: if not self.wer_files: raise UnsupportedPluginError("No Windows Error Reporting directories found.") + def _sanitize_key(self, key: str) -> str: + # Convert camel case to snake case + for pattern in camel_case_patterns: + key = pattern.sub(r"\1_\2", key) + + # Keep only basic characters in key + key = re.sub(r"[^a-zA-Z0-9_]", "", key) + + return key.lower() + + def _collect_wer_data(self, wer_file: Path) -> tuple[list[tuple[str, str]], dict[str, str]]: + """Parse data from a .wer file.""" + record_values = {} + record_fields = [] + key = None + + # Default encoding when no BOM is present + encoding = "utf-16-le" + + # If a BOM header is present we can decode it using utf-16 + with wer_file.open("rb") as fh: + if fh.read(len(codecs.BOM)) in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE): + encoding = "utf-16" + + for line in wer_file.read_text(encoding).splitlines(): + if len(line_split := line.rstrip().split("=", 1)) != 2: + continue + + name, value = line_split + record_type = "string" + + # Dynamic entry with key and value on separate lines + if "].Name" in name and not key: + key = value + # Set key and continue to get value on the next line + continue + + # Dynamic entry with key and value on the same line + elif "]." in name and not key: + category, name = name.split(".", 1) + key = f"{category.split('[')[0]}{name}" + + if "EventTime" in name: + value = wintimestamp(int(value)) + record_type = "datetime" + key = "ts" + + key = self._sanitize_key(key if key else name) + if not key: + self.target.log.warning(f"Sanitizing key resulted in empty key, skipping line '{line}'.") + key = None + continue + + if key in record_values: + self.target.log.warning(f"Key does already exist, skipping line '{line}'.") + key = None + continue + + record_values[key] = value + record_fields.append((record_type, key)) if key != "ts" else record_fields.insert(0, (record_type, key)) + # Reset key necessary for dynamic entries and ts + key = None + + return record_fields, record_values + + def _collect_wer_metadata(self, metadata_xml_file: Path) -> tuple[list[tuple[str, str]], dict[str, str]]: + """Parse data from a metadata .xml file linked to a .wer file.""" + record_fields = [] + record_values = {} + file = metadata_xml_file.read_text("utf-16") + + tree = ElementTree.fromstring(file) + for metadata in tree.iter("WERReportMetadata"): + for category in metadata: + for value in category: + if not (record_value := value.text.strip("\t\n")): + continue + + key = self._sanitize_key(f"{category.tag}{value.tag}") + if not key: + self.target.log.warning(f"Sanitizing key resulted in empty key, skipping value '{value}'.") + continue + + if key in record_values: + self.target.log.warning(f"Key already exists, skipping value '{value}'.") + continue + + record_fields.append(("string", key)) + record_values[key] = record_value + + return record_fields, record_values + @export(record=DynamicDescriptor(["path", "string", "datetime"])) def wer(self) -> Iterator[DynamicDescriptor]: """Return information from Windows Error Reporting (WER) files. @@ -163,13 +181,13 @@ def wer(self) -> Iterator[DynamicDescriptor]: for file in files: if file.suffix == ".wer": record_values["wer_file_path"] = file - wer_report_fields, wer_report_values = _collect_wer_data(file) + wer_report_fields, wer_report_values = self._collect_wer_data(file) # make sure wer_report_fields are the first entries in the list record_fields = wer_report_fields + record_fields record_values = record_values | wer_report_values elif ".WERInternalMetadata" in file.suffixes: record_values["metadata_file_path"] = file - metadata_fields, metadata_values = _collect_wer_metadata(file) + metadata_fields, metadata_values = self._collect_wer_metadata(file) record_fields.extend(metadata_fields) record_values = metadata_values | record_values diff --git a/tests/_data/plugins/os/windows/wer/wer_test.wer b/tests/_data/plugins/os/windows/wer/wer_test.wer index e260767aaa4221f56a1fe8bcf540875666e933aa..ad2231563c38b8aa00fe39aeda3b3ee812f984fe 100644 GIT binary patch delta 64 zcmbQjbB1Sw2%7~LgDs0GixG