diff --git a/dissect/target/plugins/os/windows/wer.py b/dissect/target/plugins/os/windows/wer.py
index f5c6f73e4..e40718d53 100644
--- a/dissect/target/plugins/os/windows/wer.py
+++ b/dissect/target/plugins/os/windows/wer.py
@@ -14,69 +14,6 @@
 camel_case_patterns = [re.compile(r"(\S)([A-Z][a-z]+)"), re.compile(r"([a-z0-9])([A-Z])"), re.compile(r"(\w)[.\s](\w)")]
 
 
-def _collect_wer_data(wer_file: Path) -> tuple[list[tuple[str, str]], dict[str, str]]:
-    """Parse data from a .wer file."""
-    record_values = {}
-    record_fields = []
-    key = None
-
-    # Default encoding when no BOM is present
-    encoding = "utf-16-le"
-
-    # If a BOM header is present we can decode it using utf-16
-    with wer_file.open("rb") as fh:
-        if fh.read(len(codecs.BOM)) in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
-            encoding = "utf-16"
-
-    for line in wer_file.read_text(encoding).splitlines():
-        if len(line_split := line.rstrip().split("=", 1)) == 2:
-            name, value = line_split
-            record_type = "string"
-
-            # dynamic entry with key and value on separate lines
-            if "].Name" in name and not key:
-                key = value
-                # set key and continue to get value on the next line
-                continue
-
-            # dynamic entry with key and value on the same line
-            elif "]." in name and not key:
-                category, name = name.split(".", 1)
-                key = f"{category.split('[')[0]}{name}"
-
-            if "EventTime" in name:
-                value = wintimestamp(int(value))
-                record_type = "datetime"
-                key = "ts"
-
-            key = _key_to_snake_case(key if key else name)
-
-            record_values[key] = value
-            record_fields.append((record_type, key)) if key != "ts" else record_fields.insert(0, (record_type, key))
-            # reset key necessary for dynamic entries and ts
-            key = None
-
-    return record_fields, record_values
-
-
-def _collect_wer_metadata(metadata_xml_file: Path) -> tuple[list[tuple[str, str]], dict[str, str]]:
-    """Parse data from a metadata .xml file linked to a .wer file."""
-    record_fields = []
-    record_values = {}
-    file = metadata_xml_file.read_text("utf-16")
-
-    tree = ElementTree.fromstring(file)
-    for metadata in tree.iter("WERReportMetadata"):
-        for category in metadata:
-            for value in category:
-                if record_value := value.text.strip("\t\n"):
-                    key = _key_to_snake_case(f"{category.tag}{value.tag}")
-                    record_fields.append(("string", key))
-                    record_values[key] = record_value
-
-    return record_fields, record_values
-
-
 def _create_record_descriptor(record_name: str, record_fields: list[tuple[str, str]]) -> TargetRecordDescriptor:
     record_fields.extend(
         [
@@ -87,12 +24,6 @@ def _create_record_descriptor(record_name: str, record_fields: list[tuple[str, s
     return TargetRecordDescriptor(record_name, record_fields)
 
 
-def _key_to_snake_case(key: str) -> str:
-    for pattern in camel_case_patterns:
-        key = pattern.sub(r"\1_\2", key)
-    return key.lower()
-
-
 class WindowsErrorReportingPlugin(Plugin):
     """Plugin for parsing Windows Error Reporting files."""
 
@@ -116,6 +47,115 @@ def check_compatible(self) -> None:
         if not self.wer_files:
             raise UnsupportedPluginError("No Windows Error Reporting directories found.")
 
+    def _sanitize_key(self, key: str) -> str:
+        """Convert a key to lowercase snake case limited to ``[a-z0-9_]``.
+
+        The result is empty when ``key`` contains no ASCII letters, digits or underscores.
+        """
+        # Convert camel case to snake case
+        for pattern in camel_case_patterns:
+            key = pattern.sub(r"\1_\2", key)
+
+        # Keep only basic characters in key
+        key = re.sub(r"[^a-zA-Z0-9_]", "", key)
+
+        return key.lower()
+
+    def _collect_wer_data(self, wer_file: Path) -> tuple[list[tuple[str, str]], dict[str, str]]:
+        """Parse data from a .wer file.
+
+        Lines whose sanitized key is empty or already seen are skipped with a warning.
+        """
+        record_values = {}
+        record_fields = []
+        key = None
+
+        # Default encoding when no BOM is present
+        encoding = "utf-16-le"
+
+        # If a BOM header is present we can decode it using utf-16
+        with wer_file.open("rb") as fh:
+            if fh.read(len(codecs.BOM)) in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
+                encoding = "utf-16"
+
+        for line in wer_file.read_text(encoding).splitlines():
+            if len(line_split := line.rstrip().split("=", 1)) != 2:
+                continue
+
+            name, value = line_split
+            record_type = "string"
+
+            # Dynamic entry with key and value on separate lines
+            if "].Name" in name and not key:
+                key = value
+                # Set key and continue to get value on the next line
+                continue
+
+            # Dynamic entry with key and value on the same line
+            elif "]." in name and not key:
+                category, name = name.split(".", 1)
+                key = f"{category.split('[')[0]}{name}"
+
+            if "EventTime" in name:
+                value = wintimestamp(int(value))
+                record_type = "datetime"
+                key = "ts"
+
+            key = self._sanitize_key(key if key else name)
+            if not key:
+                self.target.log.warning("Sanitizing key resulted in empty key, skipping line '%s'.", line)
+                key = None
+                continue
+
+            if key in record_values:
+                self.target.log.warning("Key already exists, skipping line '%s'.", line)
+                key = None
+                continue
+
+            record_values[key] = value
+            # The timestamp field always goes first in the list of record fields
+            if key == "ts":
+                record_fields.insert(0, (record_type, key))
+            else:
+                record_fields.append((record_type, key))
+
+            # Reset key necessary for dynamic entries and ts
+            key = None
+
+        return record_fields, record_values
+
+    def _collect_wer_metadata(self, metadata_xml_file: Path) -> tuple[list[tuple[str, str]], dict[str, str]]:
+        """Parse data from a metadata .xml file linked to a .wer file.
+
+        Elements without text and elements whose sanitized key is empty or already seen are skipped.
+        """
+        record_fields = []
+        record_values = {}
+        file = metadata_xml_file.read_text("utf-16")
+
+        tree = ElementTree.fromstring(file)
+        for metadata in tree.iter("WERReportMetadata"):
+            for category in metadata:
+                for value in category:
+                    # Empty elements have their ``text`` set to None, treat them like blank values
+                    if not (record_value := (value.text or "").strip("\t\n")):
+                        continue
+
+                    name = f"{category.tag}{value.tag}"
+                    key = self._sanitize_key(name)
+                    if not key:
+                        self.target.log.warning("Sanitizing key resulted in empty key, skipping value '%s'.", name)
+                        continue
+
+                    if key in record_values:
+                        self.target.log.warning("Key already exists, skipping value '%s'.", name)
+                        continue
+
+                    record_fields.append(("string", key))
+                    record_values[key] = record_value
+
+        return record_fields, record_values
+
     @export(record=DynamicDescriptor(["path", "string", "datetime"]))
     def wer(self) -> Iterator[DynamicDescriptor]:
         """Return information from Windows Error Reporting (WER) files.
@@ -158,13 +198,13 @@ def wer(self) -> Iterator[DynamicDescriptor]:
             for file in files:
                 if file.suffix == ".wer":
                     record_values["wer_file_path"] = file
-                    wer_report_fields, wer_report_values = _collect_wer_data(file)
+                    wer_report_fields, wer_report_values = self._collect_wer_data(file)
                     # make sure wer_report_fields are the first entries in the list
                     record_fields = wer_report_fields + record_fields
                     record_values = record_values | wer_report_values
                 elif ".WERInternalMetadata" in file.suffixes:
                     record_values["metadata_file_path"] = file
-                    metadata_fields, metadata_values = _collect_wer_metadata(file)
+                    metadata_fields, metadata_values = self._collect_wer_metadata(file)
                     record_fields.extend(metadata_fields)
                     record_values = metadata_values | record_values
 
diff --git a/tests/_data/plugins/os/windows/wer/wer_test.wer b/tests/_data/plugins/os/windows/wer/wer_test.wer
index 1c836d175..ad2231563 100644
Binary files a/tests/_data/plugins/os/windows/wer/wer_test.wer and b/tests/_data/plugins/os/windows/wer/wer_test.wer differ
diff --git a/tests/plugins/os/windows/test_wer.py b/tests/plugins/os/windows/test_wer.py
index caa24b820..6d4765c06 100644
--- a/tests/plugins/os/windows/test_wer.py
+++ b/tests/plugins/os/windows/test_wer.py
@@ -8,7 +8,16 @@ def test_wer_plugin(target_win, fs_win):
     wer_dir = absolute_path("_data/plugins/os/windows/wer")
     fs_win.map_dir("ProgramData/Microsoft/Windows/WER/ReportQueue/test", wer_dir)
     target_win.add_plugin(WindowsErrorReportingPlugin)
-    tests = ["os_version_information_lcid", "response_type", "sig", "dynamic_sig", "dynamic_signatures_parameter1"]
+    tests = [
+        "os_version_information_lcid",
+        "response_type",
+        "sig",
+        "dynamic_sig",
+        "dynamic_signatures_parameter1",
+        "ui1",
+        "spcial_charactr",
+        "невидимый",
+    ]
 
     records = list(target_win.wer())
     assert len(records) == 2
@@ -20,6 +29,12 @@ def test_wer_plugin(target_win, fs_win):
     record = wer_record_map["wer_test.wer"]
     for test in tests:
         record_field = getattr(record, test, None)
+
+        # Check if expected line has been skipped
+        if record_field is None:
+            assert test == "невидимый"
+            continue
+
         assert record_field == f"test_{test}"
 
     assert record.ts == datetime.datetime(2022, 10, 4, 11, 0, 0, 0, tzinfo=datetime.timezone.utc)