Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in WER plugin caused by special characters in field name #544

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 94 additions & 71 deletions dissect/target/plugins/os/windows/wer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,69 +14,6 @@
# Regexes used to turn CamelCase / dotted names into snake_case. The key helpers
# apply them in order, rewriting every match as r"\1_\2":
#   1. a non-whitespace character followed by a capitalised word ("ReportType" -> "Report_Type")
#   2. a lowercase letter or digit followed by an uppercase letter ("sigA" -> "sig_A")
#   3. two word characters separated by a single "." or whitespace character ("a.b" -> "a_b")
camel_case_patterns = [re.compile(r"(\S)([A-Z][a-z]+)"), re.compile(r"([a-z0-9])([A-Z])"), re.compile(r"(\w)[.\s](\w)")]


def _collect_wer_data(wer_file: Path) -> tuple[list[tuple[str, str]], dict[str, str]]:
    """Read the ``name=value`` entries of a .wer report.

    Returns a ``(record_fields, record_values)`` pair. ``record_fields`` is a list of
    ``(type, key)`` tuples and ``record_values`` maps each key to its value. The
    ``EventTime`` entry is stored under the key ``ts`` with type ``datetime`` and is
    placed at the front of ``record_fields``; every other entry is a ``string``.
    """
    values = {}
    fields = []
    pending_key = None

    # With a UTF-16 BOM the generic "utf-16" codec is used; without one the file
    # is decoded as little-endian.
    with wer_file.open("rb") as fh:
        has_bom = fh.read(len(codecs.BOM)) in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE)
    encoding = "utf-16" if has_bom else "utf-16-le"

    for line in wer_file.read_text(encoding).splitlines():
        name, separator, value = line.rstrip().partition("=")
        if not separator:
            # Not a "name=value" line
            continue

        field_type = "string"

        if not pending_key:
            if "].Name" in name:
                # Dynamic entry split over two lines: this line carries the key,
                # the value follows on the next line
                pending_key = value
                continue

            if "]." in name:
                # Dynamic entry with key and value on the same line: drop the
                # "[index]." part between the category and the entry name
                prefix, _, name = name.partition(".")
                pending_key = prefix.partition("[")[0] + name

        if "EventTime" in name:
            value = wintimestamp(int(value))
            field_type = "datetime"
            pending_key = "ts"

        key = _key_to_snake_case(pending_key if pending_key else name)
        # The pending key only applies to a single entry
        pending_key = None

        values[key] = value
        if key == "ts":
            fields.insert(0, (field_type, key))
        else:
            fields.append((field_type, key))

    return fields, values


def _collect_wer_metadata(metadata_xml_file: Path) -> tuple[list[tuple[str, str]], dict[str, str]]:
    """Parse data from a metadata .xml file linked to a .wer file.

    The file is read as UTF-16. For every ``WERReportMetadata`` element, each
    grandchild with non-empty text yields a ``string`` field whose key is the
    snake-cased concatenation of the child (category) tag and the grandchild tag.

    Returns a ``(record_fields, record_values)`` pair: a list of ``(type, key)``
    tuples and a mapping of key to text value.
    """
    record_fields = []
    record_values = {}
    file = metadata_xml_file.read_text("utf-16")

    tree = ElementTree.fromstring(file)
    for metadata in tree.iter("WERReportMetadata"):
        for category in metadata:
            for value in category:
                # Element.text is None for elements without any text (e.g. <Tag/>),
                # treat those like empty values instead of raising AttributeError
                record_value = (value.text or "").strip("\t\n")
                if not record_value:
                    continue

                key = _key_to_snake_case(f"{category.tag}{value.tag}")
                record_fields.append(("string", key))
                record_values[key] = record_value

    return record_fields, record_values


def _create_record_descriptor(record_name: str, record_fields: list[tuple[str, str]]) -> TargetRecordDescriptor:
record_fields.extend(
[
Expand All @@ -87,12 +24,6 @@
return TargetRecordDescriptor(record_name, record_fields)


def _key_to_snake_case(key: str) -> str:
    """Return ``key`` in lowercase after applying every ``camel_case_patterns`` regex in order."""
    converted = key
    for regex in camel_case_patterns:
        # Each match gets an underscore inserted between its two groups
        converted = regex.sub(r"\1_\2", converted)

    return converted.lower()


class WindowsErrorReportingPlugin(Plugin):
"""Plugin for parsing Windows Error Reporting files."""

Expand All @@ -116,6 +47,98 @@
if not self.wer_files:
raise UnsupportedPluginError("No Windows Error Reporting directories found.")

def _sanitize_key(self, key: str) -> str:
    """Turn an arbitrary entry name into a lowercase key of ASCII letters, digits and underscores.

    The result can be an empty string when ``key`` contains none of those characters.
    """
    # Camel case to snake case: every pattern inserts an underscore between its two groups
    snake = key
    for regex in camel_case_patterns:
        snake = regex.sub(r"\1_\2", snake)

    # Drop everything that is not a basic character
    basic = re.sub(r"[^a-zA-Z0-9_]", "", snake)

    return basic.lower()

def _collect_wer_data(self, wer_file: Path) -> tuple[list[tuple[str, str]], dict[str, str]]:
    """Read the ``name=value`` entries of a .wer report.

    Returns a ``(record_fields, record_values)`` pair. ``record_fields`` is a list of
    ``(type, key)`` tuples and ``record_values`` maps each key to its value. The
    ``EventTime`` entry is stored under the key ``ts`` with type ``datetime`` and is
    placed at the front of ``record_fields``; every other entry is a ``string``.
    Lines whose sanitized key is empty or already present are skipped with a warning.
    """
    values = {}
    fields = []
    pending_key = None

    # With a UTF-16 BOM the generic "utf-16" codec is used; without one the file
    # is decoded as little-endian.
    with wer_file.open("rb") as fh:
        has_bom = fh.read(len(codecs.BOM)) in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE)
    encoding = "utf-16" if has_bom else "utf-16-le"

    for line in wer_file.read_text(encoding).splitlines():
        name, separator, value = line.rstrip().partition("=")
        if not separator:
            # Not a "name=value" line.
            # NOTE(review): the coverage report on this change marks this branch as untested.
            continue

        field_type = "string"

        if not pending_key:
            if "].Name" in name:
                # Dynamic entry split over two lines: this line carries the key,
                # the value follows on the next line
                pending_key = value
                continue

            if "]." in name:
                # Dynamic entry with key and value on the same line: drop the
                # "[index]." part between the category and the entry name
                prefix, _, name = name.partition(".")
                pending_key = prefix.partition("[")[0] + name

        if "EventTime" in name:
            value = wintimestamp(int(value))
            field_type = "datetime"
            pending_key = "ts"

        key = self._sanitize_key(pending_key if pending_key else name)
        # The pending key only applies to a single entry, whether it is kept or skipped
        pending_key = None

        if not key:
            self.target.log.warning(f"Sanitizing key resulted in empty key, skipping line '{line}'.")
            continue

        if key in values:
            # NOTE(review): the coverage report on this change marks this branch as untested.
            self.target.log.warning(f"Key does already exist, skipping line '{line}'.")
            continue

        values[key] = value
        if key == "ts":
            fields.insert(0, (field_type, key))
        else:
            fields.append((field_type, key))

    return fields, values

def _collect_wer_metadata(self, metadata_xml_file: Path) -> tuple[list[tuple[str, str]], dict[str, str]]:
    """Parse data from a metadata .xml file linked to a .wer file.

    The file is read as UTF-16. For every ``WERReportMetadata`` element, each
    grandchild with non-empty text yields a ``string`` field whose key is the
    sanitized concatenation of the child (category) tag and the grandchild tag.
    Values whose key sanitizes to an empty string, or whose key was already seen,
    are skipped with a warning.

    Returns a ``(record_fields, record_values)`` pair: a list of ``(type, key)``
    tuples and a mapping of key to text value.
    """
    record_fields = []
    record_values = {}
    file = metadata_xml_file.read_text("utf-16")

    tree = ElementTree.fromstring(file)
    for metadata in tree.iter("WERReportMetadata"):
        for category in metadata:
            for value in category:
                # Element.text is None for elements without any text (e.g. <Tag/>),
                # treat those like empty values instead of raising AttributeError
                record_value = (value.text or "").strip("\t\n")
                if not record_value:
                    # NOTE(review): the coverage report on this change marks this branch as untested.
                    continue

                key = self._sanitize_key(f"{category.tag}{value.tag}")
                if not key:
                    # NOTE(review): the coverage report on this change marks this branch as untested.
                    self.target.log.warning(f"Sanitizing key resulted in empty key, skipping value '{value}'.")
                    continue

                if key in record_values:
                    # NOTE(review): the coverage report on this change marks this branch as untested.
                    self.target.log.warning(f"Key already exists, skipping value '{value}'.")
                    continue

                record_fields.append(("string", key))
                record_values[key] = record_value

    return record_fields, record_values

@export(record=DynamicDescriptor(["path", "string", "datetime"]))
def wer(self) -> Iterator[DynamicDescriptor]:
"""Return information from Windows Error Reporting (WER) files.
Expand Down Expand Up @@ -158,13 +181,13 @@
for file in files:
if file.suffix == ".wer":
record_values["wer_file_path"] = file
wer_report_fields, wer_report_values = _collect_wer_data(file)
wer_report_fields, wer_report_values = self._collect_wer_data(file)
# make sure wer_report_fields are the first entries in the list
record_fields = wer_report_fields + record_fields
record_values = record_values | wer_report_values
elif ".WERInternalMetadata" in file.suffixes:
record_values["metadata_file_path"] = file
metadata_fields, metadata_values = _collect_wer_metadata(file)
metadata_fields, metadata_values = self._collect_wer_metadata(file)
record_fields.extend(metadata_fields)
record_values = metadata_values | record_values

Expand Down
Binary file modified tests/_data/plugins/os/windows/wer/wer_test.wer
Binary file not shown.
17 changes: 16 additions & 1 deletion tests/plugins/os/windows/test_wer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,16 @@ def test_wer_plugin(target_win, fs_win):
wer_dir = absolute_path("_data/plugins/os/windows/wer")
fs_win.map_dir("ProgramData/Microsoft/Windows/WER/ReportQueue/test", wer_dir)
target_win.add_plugin(WindowsErrorReportingPlugin)
tests = ["os_version_information_lcid", "response_type", "sig", "dynamic_sig", "dynamic_signatures_parameter1"]
tests = [
"os_version_information_lcid",
"response_type",
"sig",
"dynamic_sig",
"dynamic_signatures_parameter1",
"ui1",
"spcial_charactr",
"невидимый",
]

records = list(target_win.wer())
assert len(records) == 2
Expand All @@ -20,6 +29,12 @@ def test_wer_plugin(target_win, fs_win):
record = wer_record_map["wer_test.wer"]
for test in tests:
record_field = getattr(record, test, None)

# Check if expected line has been skipped
if record_field is None:
assert test == "невидимый"
continue

assert record_field == f"test_{test}"

assert record.ts == datetime.datetime(2022, 10, 4, 11, 0, 0, 0, tzinfo=datetime.timezone.utc)
Expand Down
Loading