From 2c6ea11810db5e3220c788b1946cdf7657e5d271 Mon Sep 17 00:00:00 2001 From: cecinestpasunepipe <110607403+cecinestpasunepipe@users.noreply.github.com> Date: Thu, 16 May 2024 12:57:05 +0200 Subject: [PATCH 1/8] Add monitoring option to MQTT Loader (DIS-3213) --- dissect/target/loaders/mqtt.py | 89 +++++++++++++++++++++++++++++++++- 1 file changed, 87 insertions(+), 2 deletions(-) diff --git a/dissect/target/loaders/mqtt.py b/dissect/target/loaders/mqtt.py index 5ef744216..c668baed4 100644 --- a/dissect/target/loaders/mqtt.py +++ b/dissect/target/loaders/mqtt.py @@ -1,13 +1,18 @@ from __future__ import annotations +import atexit import logging +import math +import os import ssl +import sys import time import urllib from dataclasses import dataclass from functools import lru_cache from pathlib import Path from struct import pack, unpack_from +from threading import Thread from typing import Any, Callable, Iterator, Optional, Union import paho.mqtt.client as mqtt @@ -62,12 +67,79 @@ def _read(self, offset: int, length: int, optimization_strategy: int = 0) -> byt return data +class MQTTDiagnosticLine: + def __init__(self, connection: MQTTConnection, total_peers: int): + self.connection = connection + self.total_peers = total_peers + self._columns, self._rows = os.get_terminal_size(0) + atexit.register(self._detach) + self._attach() + + def _attach(self) -> None: + sys.stderr.write(f"\0337\033[r\0338\033D\033M\0337\033[1;{self._rows - 1}r\0338") + + def _detach(self) -> None: + sys.stderr.write(f"\0337\033[{self._rows};1H\033[K\033[r\0338") + sys.stderr.flush() + + def display(self) -> None: + prefix = "\x1b[44m\x1b[37m\r" + suffix = "\x1b[0m" + separator = "\x1b[41m\x1b[1m" + logo = "TARGETD" + start = time.time() + t2 = start + mark = start + _bytes = 0 + subtract = 0 + while True: + time.sleep(0.05) + peers = "?" + try: + peers = len(self.connection.broker.peers(self.connection.host)) + except Exception: + pass + total = self.total_peers + recv = self.connection.broker.bytes_received + now = time.time() + + # to avoid endless countdowns if no data is transferred in reality anymore. + if (now - mark) > 3 and not _bytes: + _bytes = recv + t2 = now + if (now - mark) > 9 and _bytes: + subtract = _bytes + _bytes = 0 + mark = t2 + + recv -= subtract + failures = self.connection.retries + transfer = (recv / (now - mark)) / 1000 + seconds_elapsed = round(now - start) % 60 + minutes_elapsed = math.floor(seconds_elapsed / 60) % 60 + hours_elapsed = math.floor(minutes_elapsed / 60) + timer = f"{hours_elapsed:02d}:{minutes_elapsed:02d}:{seconds_elapsed:02d}" + display = f"{timer} {peers}/{total} peers {transfer:>8.2f} KB p/s {failures:>4} failures" + rest = self._columns - len(display) + padding = (rest - len(logo) - 1) * " " + sys.stderr.write(f"\0337\033[{self._rows};1H\033[?7l\033[0m") + sys.stderr.write(prefix + display + padding + separator + logo + suffix) + sys.stderr.write("\033[?7h\0338") + sys.stderr.flush() + + def start(self) -> None: + t = Thread(target=self.display) + t.daemon = True + t.start() + + class MQTTConnection: broker = None host = None prev = -1 factor = 1 prefetch_factor_inc = 10 + retries = 0 def __init__(self, broker: Broker, host: str): self.broker = broker @@ -125,6 +197,7 @@ def read(self, disk_id: int, offset: int, length: int, optimization_strategy: in # message might have not reached agent, resend... self.broker.seek(self.host, disk_id, offset, flength, optimization_strategy) attempts = 0 + self.retries += 1 return message.data @@ -138,6 +211,7 @@ class Broker: mqtt_client = None connected = False case = None + bytes_received = 0 diskinfo = {} index = {} @@ -217,6 +291,8 @@ def _on_message(self, client: mqtt.Client, userdata: Any, msg: mqtt.client.MQTTM if casename != self.case: return + self.bytes_received += sys.getsizeof(msg.payload) + if response == "DISKS": self._on_disk(hostname, msg.payload) elif response == "READ": @@ -238,9 +314,12 @@ def info(self, host: str) -> None: self.mqtt_client.publish(f"{self.case}/{host}/INFO") def topology(self, host: str) -> None: - self.topo[host] = [] + if host not in self.topo: + self.topo[host] = [] self.mqtt_client.subscribe(f"{self.case}/{host}/ID") time.sleep(1) # need some time to avoid race condition, i.e. MQTT might react too fast + # send a simple clear command (invalid, just clears the prev. msg) just in case TOPO is stale + self.mqtt_client.publish(f"{self.case}/{host}/CLR") self.mqtt_client.publish(f"{self.case}/{host}/TOPO") def connect(self) -> None: @@ -272,6 +351,7 @@ def connect(self) -> None: @arg("--mqtt-crt", dest="crt", help="client certificate file") @arg("--mqtt-ca", dest="ca", help="certificate authority file") @arg("--mqtt-command", dest="command", help="direct command to client(s)") +@arg("--mqtt-diag", action="store_true", dest="diag", help="show MQTT diagnostic information") class MQTTLoader(Loader): """Load remote targets through a broker.""" @@ -292,6 +372,7 @@ def detect(path: Path) -> bool: def find_all(path: Path, **kwargs) -> Iterator[str]: cls = MQTTLoader num_peers = 1 + if cls.broker is None: if (uri := kwargs.get("parsed_path")) is None: raise LoaderError("No URI connection details have been passed.") @@ -299,8 +380,12 @@ def find_all(path: Path, **kwargs) -> Iterator[str]: cls.broker = Broker(**options) cls.broker.connect() num_peers = int(options.get("peers", 1)) + cls.connection = MQTTConnection(cls.broker, path) + if options.get("diag", None): + MQTTDiagnosticLine(cls.connection, num_peers).start() + else: + cls.connection = MQTTConnection(cls.broker, path) - cls.connection = MQTTConnection(cls.broker, path) cls.peers = cls.connection.topo(num_peers) yield from cls.peers From 7d0731c21fe4bcded27d354a0945445f41fb5a9d Mon Sep 17 00:00:00 2001 From: cecinestpasunepipe <110607403+cecinestpasunepipe@users.noreply.github.com> Date: Thu, 16 May 2024 14:12:29 +0200 Subject: [PATCH 2/8] adjust test --- tests/loaders/test_mqtt.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/loaders/test_mqtt.py b/tests/loaders/test_mqtt.py index 7cba68788..15ba108c3 100644 --- a/tests/loaders/test_mqtt.py +++ b/tests/loaders/test_mqtt.py @@ -44,6 +44,8 @@ def publish(self, topic: str, *args) -> None: begin = int(tokens[4], 16) end = int(tokens[5], 16) response.payload = self.disks[int(tokens[3])][begin : begin + end] + else: + return self.on_message(self, None, response) From bd4c82438dcdfa2f886204dc81987b26f8ac8db5 Mon Sep 17 00:00:00 2001 From: cecinestpasunepipe <110607403+cecinestpasunepipe@users.noreply.github.com> Date: Thu, 16 May 2024 14:29:55 +0200 Subject: [PATCH 3/8] Make compatible with pypy. --- dissect/target/loaders/mqtt.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dissect/target/loaders/mqtt.py b/dissect/target/loaders/mqtt.py index c668baed4..29e3963fb 100644 --- a/dissect/target/loaders/mqtt.py +++ b/dissect/target/loaders/mqtt.py @@ -212,6 +212,7 @@ class Broker: connected = False case = None bytes_received = 0 + monitor = False diskinfo = {} index = {} @@ -291,7 +292,8 @@ def _on_message(self, client: mqtt.Client, userdata: Any, msg: mqtt.client.MQTTM if casename != self.case: return - self.bytes_received += sys.getsizeof(msg.payload) + if self.monitor: + self.bytes_received += len(msg.payload) if response == "DISKS": self._on_disk(hostname, msg.payload) @@ -382,6 +384,7 @@ def find_all(path: Path, **kwargs) -> Iterator[str]: num_peers = int(options.get("peers", 1)) cls.connection = MQTTConnection(cls.broker, path) if options.get("diag", None): + cls.broker.monitor = True MQTTDiagnosticLine(cls.connection, num_peers).start() else: cls.connection = MQTTConnection(cls.broker, path) From 86529a9ad8bd88f0568df8ae3e5171378250d687 Mon Sep 17 00:00:00 2001 From: cecinestpasunepipe <110607403+cecinestpasunepipe@users.noreply.github.com> Date: Tue, 21 May 2024 13:11:03 +0200 Subject: [PATCH 4/8] Update dissect/target/loaders/mqtt.py Co-authored-by: pyrco <105293448+pyrco@users.noreply.github.com> --- dissect/target/loaders/mqtt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dissect/target/loaders/mqtt.py b/dissect/target/loaders/mqtt.py index 29e3963fb..6f31a42d2 100644 --- a/dissect/target/loaders/mqtt.py +++ b/dissect/target/loaders/mqtt.py @@ -119,7 +119,7 @@ def display(self) -> None: minutes_elapsed = math.floor(seconds_elapsed / 60) % 60 hours_elapsed = math.floor(minutes_elapsed / 60) timer = f"{hours_elapsed:02d}:{minutes_elapsed:02d}:{seconds_elapsed:02d}" - display = f"{timer} {peers}/{total} peers {transfer:>8.2f} KB p/s {failures:>4} failures" + display = f"{timer} {peers}/{self.total_peers} peers {transfer:>8.2f} KB p/s {failures:>4} failures" rest = self._columns - len(display) padding = (rest - len(logo) - 1) * " " sys.stderr.write(f"\0337\033[{self._rows};1H\033[?7l\033[0m") From af623e7a6810d0926852947149867c4b329a7df3 Mon Sep 17 00:00:00 2001 From: cecinestpasunepipe <110607403+cecinestpasunepipe@users.noreply.github.com> Date: Tue, 21 May 2024 13:11:10 +0200 Subject: [PATCH 5/8] Update dissect/target/loaders/mqtt.py Co-authored-by: pyrco <105293448+pyrco@users.noreply.github.com> --- dissect/target/loaders/mqtt.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dissect/target/loaders/mqtt.py b/dissect/target/loaders/mqtt.py index 6f31a42d2..4e05c4eaf 100644 --- a/dissect/target/loaders/mqtt.py +++ b/dissect/target/loaders/mqtt.py @@ -87,11 +87,14 @@ def display(self) -> None: suffix = "\x1b[0m" separator = "\x1b[41m\x1b[1m" logo = "TARGETD" + start = time.time() t2 = start mark = start + _bytes = 0 subtract = 0 + while True: time.sleep(0.05) peers = "?" From 3f624df20d987a39ca4008094e5fb375061a64ca Mon Sep 17 00:00:00 2001 From: cecinestpasunepipe <110607403+cecinestpasunepipe@users.noreply.github.com> Date: Tue, 21 May 2024 13:11:16 +0200 Subject: [PATCH 6/8] Update dissect/target/loaders/mqtt.py Co-authored-by: pyrco <105293448+pyrco@users.noreply.github.com> --- dissect/target/loaders/mqtt.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dissect/target/loaders/mqtt.py b/dissect/target/loaders/mqtt.py index 4e05c4eaf..4ba4aed5e 100644 --- a/dissect/target/loaders/mqtt.py +++ b/dissect/target/loaders/mqtt.py @@ -102,7 +102,6 @@ def display(self) -> None: peers = len(self.connection.broker.peers(self.connection.host)) except Exception: pass - total = self.total_peers recv = self.connection.broker.bytes_received now = time.time() From fe74cb1d9d0b884c94e85a27da7500f75523c7a4 Mon Sep 17 00:00:00 2001 From: cecinestpasunepipe <110607403+cecinestpasunepipe@users.noreply.github.com> Date: Tue, 21 May 2024 17:08:48 +0200 Subject: [PATCH 7/8] Improvements --- dissect/target/loaders/mqtt.py | 99 ++++++++++++++++++++++++++-------- 1 file changed, 77 insertions(+), 22 deletions(-) diff --git a/dissect/target/loaders/mqtt.py b/dissect/target/loaders/mqtt.py index 4ba4aed5e..962e4c991 100644 --- a/dissect/target/loaders/mqtt.py +++ b/dissect/target/loaders/mqtt.py @@ -56,6 +56,34 @@ class SeekMessage: data: bytes = b"" +class MQTTTransferRatePerSecond: + def __init__(self, window_size: int = 10): + self.window_size = window_size + self.timestamps = [] + self.bytes = [] + + def record(self, timestamp: float, byte_count: int) -> MQTTTransferRatePerSecond: + while self.timestamps and (timestamp - self.timestamps[0] > self.window_size): + self.timestamps.pop(0) + self.bytes.pop(0) + + self.timestamps.append(timestamp) + self.bytes.append(byte_count) + return self + + def value(self, current_time: float) -> float: + if not self.timestamps: + return 0 + + elapsed_time = current_time - self.timestamps[0] + if elapsed_time == 0: + return 0 + + total_bytes = self.bytes[-1] - self.bytes[0] + + return total_bytes / elapsed_time + + class MQTTStream(AlignedStream): def __init__(self, stream: MQTTConnection, disk_id: int, size: Optional[int] = None): self.stream = stream @@ -76,24 +104,48 @@ def __init__(self, connection: MQTTConnection, total_peers: int): self._attach() def _attach(self) -> None: - sys.stderr.write(f"\0337\033[r\0338\033D\033M\0337\033[1;{self._rows - 1}r\0338") + # save cursor position + sys.stderr.write("\0337") + # set top and bottom margins of the scrolling region to default + sys.stderr.write("\033[r") + # restore cursor position + sys.stderr.write("\0338") + # move cursor down one line in the same column; if at the bottom, the screen scrolls up + sys.stderr.write("\033D") + # move cursor up one line in the same column; if at the top, screen scrolls down + sys.stderr.write("\033M") + # save cursor position again + sys.stderr.write("\0337") + # restrict scrolling to a region from the first line to one before the last line + sys.stderr.write(f"\033[1;{self._rows - 1}r") + # restore cursor position after setting scrolling region + sys.stderr.write("\0338") def _detach(self) -> None: - sys.stderr.write(f"\0337\033[{self._rows};1H\033[K\033[r\0338") + # save cursor position + sys.stderr.write("\0337") + # move cursor to the specified position (last line, first column) + sys.stderr.write(f"\033[{self._rows};1H") + # clear from cursor to end of the line + sys.stderr.write("\033[K") + # reset scrolling region to include the entire display + sys.stderr.write("\033[r") + # restore cursor position + sys.stderr.write("\0338") + # ensure the written content is displayed (flush output) sys.stderr.flush() def display(self) -> None: + # prepare: set background color to blue and text color to white at the beginning of the line prefix = "\x1b[44m\x1b[37m\r" + # reset all attributes (colors, styles) to their defaults afterwards suffix = "\x1b[0m" + # separator to set background color to red and text style to bold separator = "\x1b[41m\x1b[1m" logo = "TARGETD" start = time.time() - t2 = start - mark = start - - _bytes = 0 - subtract = 0 + transfer_rate = MQTTTransferRatePerSecond(window_size=7) while True: time.sleep(0.05) @@ -102,31 +154,34 @@ def display(self) -> None: peers = len(self.connection.broker.peers(self.connection.host)) except Exception: pass + recv = self.connection.broker.bytes_received now = time.time() - - # to avoid endless countdowns if no data is transferred in reality anymore. - if (now - mark) > 3 and not _bytes: - _bytes = recv - t2 = now - if (now - mark) > 9 and _bytes: - subtract = _bytes - _bytes = 0 - mark = t2 - - recv -= subtract + transfer = transfer_rate.record(now, recv).value(now) / 1000 # convert to KB/s failures = self.connection.retries - transfer = (recv / (now - mark)) / 1000 seconds_elapsed = round(now - start) % 60 minutes_elapsed = math.floor(seconds_elapsed / 60) % 60 hours_elapsed = math.floor(minutes_elapsed / 60) timer = f"{hours_elapsed:02d}:{minutes_elapsed:02d}:{seconds_elapsed:02d}" display = f"{timer} {peers}/{self.total_peers} peers {transfer:>8.2f} KB p/s {failures:>4} failures" rest = self._columns - len(display) - padding = (rest - len(logo) - 1) * " " - sys.stderr.write(f"\0337\033[{self._rows};1H\033[?7l\033[0m") + padding = (rest - len(logo)) * " " + + # save cursor position + sys.stderr.write("\0337") + # move cursor to specified position (last line, first column) + sys.stderr.write(f"\033[{self._rows};1H") + # disable line wrapping + sys.stderr.write("\033[?7l") + # reset all attributes + sys.stderr.write("\033[0m") + # write the display line with prefix, calculated display content, padding, separator, and logo sys.stderr.write(prefix + display + padding + separator + logo + suffix) - sys.stderr.write("\033[?7h\0338") + # enable line wrapping again + sys.stderr.write("\033[?7h") + # restore cursor position + sys.stderr.write("\0338") + # flush output to ensure it is displayed sys.stderr.flush() def start(self) -> None: From 2134ec5427b2ea8550c2d9fbb9cf968ddec36564 Mon Sep 17 00:00:00 2001 From: cecinestpasunepipe <110607403+cecinestpasunepipe@users.noreply.github.com> Date: Tue, 21 May 2024 17:16:35 +0200 Subject: [PATCH 8/8] Improvements --- dissect/target/loaders/mqtt.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dissect/target/loaders/mqtt.py b/dissect/target/loaders/mqtt.py index 962e4c991..cf87f4973 100644 --- a/dissect/target/loaders/mqtt.py +++ b/dissect/target/loaders/mqtt.py @@ -160,8 +160,8 @@ def display(self) -> None: transfer = transfer_rate.record(now, recv).value(now) / 1000 # convert to KB/s failures = self.connection.retries seconds_elapsed = round(now - start) % 60 - minutes_elapsed = math.floor(seconds_elapsed / 60) % 60 - hours_elapsed = math.floor(minutes_elapsed / 60) + minutes_elapsed = math.floor((now - start) / 60) % 60 + hours_elapsed = math.floor((now - start) / 60**2) timer = f"{hours_elapsed:02d}:{minutes_elapsed:02d}:{seconds_elapsed:02d}" display = f"{timer} {peers}/{self.total_peers} peers {transfer:>8.2f} KB p/s {failures:>4} failures" rest = self._columns - len(display)